Skip to content

gRPC

La chiamata di procedura remota (RPC) è un punto fondamentale da apprendere nei microservizi. Durante il processo di apprendimento, si incontrano vari framework RPC. Tuttavia, nel mondo Go, quasi tutti i framework RPC sono basati su gRPC, ed è diventato un protocollo fondamentale nel campo del cloud native. Perché sceglierlo? Ecco la risposta ufficiale:

gRPC è un framework open source moderno, ad alte prestazioni e per chiamate di procedura remota (Remote Process Call, RPC) che può funzionare in qualsiasi ambiente. Può connettere efficacemente i servizi all'interno e tra i datacenter con supporto per bilanciamento del carico plug-in, tracciamento, controlli di integrità e autenticazione. È anche adatto per l'ultimo miglio del calcolo distribuito che connette dispositivi, applicazioni mobili e browser ai servizi backend.

Sito ufficiale: gRPC

Documentazione ufficiale: Documentation | gRPC

Tutorial tecnico gRPC: Basics tutorial | Go | gRPC

Sito ufficiale ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

È anche un progetto open source della fondazione CNCF. CNCF è l'acronimo di CLOUD NATIVE COMPUTING FOUNDATION.

Caratteristiche

Definizione semplice dei servizi

Utilizza Protocol Buffers per definire i servizi, un potente insieme di strumenti di serializzazione binaria e linguaggio.

Avvio e scalabilità molto rapidi

Basta una riga di codice per installare il runtime e l'ambiente di sviluppo. Può scalare a milioni di RPC al secondo in pochi secondi.

Multipiattaforma e multilingua

Genera automaticamente stub di servizi client e server per diverse piattaforme e linguaggi.

Flusso bidirezionale e autorizzazione integrata

Flusso bidirezionale basato su HTTP/2 e autorizzazione di autenticazione plug-in.

Sebbene gRPC sia indipendente dal linguaggio, la maggior parte del contenuto di questo sito è relativo a Go, quindi questo articolo utilizzerà Go come linguaggio principale per le spiegazioni. I compilatori pb e i generatori utilizzati successivamente possono essere consultati sul sito ufficiale Protobuf per utenti di altri linguaggi. Per comodità, il processo di creazione del progetto verrà omesso.

Installazione delle Dipendenze

Scaricare prima il compilatore Protocol Buffer. Indirizzo per il download: Releases · protocolbuffers/protobuf (github.com)

Selezionare il sistema e la versione in base alle proprie esigenze. Dopo il download, aggiungere la directory bin alle variabili d'ambiente.

Poi scaricare il generatore di codice. Il compilatore genera codice di serializzazione nel linguaggio corrispondente dai file proto, mentre il generatore è utilizzato per generare codice business.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Creare un progetto vuoto, chiamato grpc_learn, e introdurre le seguenti dipendenze:

sh
$ go get google.golang.org/grpc

Infine, verificare la versione per assicurarsi che l'installazione sia riuscita:

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Struttura del Progetto

Di seguito verrà dimostrato un esempio Hello World. Creare la seguente struttura del progetto:

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

Definizione del File Protobuf

In pb/hello.proto, inserire il seguente contenuto. Questo è un esempio molto semplice. Se non si conosce la sintassi protoc, consultare la documentazione correlata.

protobuf
syntax = "proto3";

// . indica che viene generato direttamente nel percorso di output, hello è il nome del pacchetto
option go_package = ".;hello";

// Richiesta
message HelloReq {
  string name = 1;
}

// Risposta
message HelloRep {
  string msg = 1;
}

// Definizione del servizio
service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Generazione del Codice

Dopo aver scritto, utilizzare il compilatore protoc per generare il codice relativo alla serializzazione dei dati e il generatore per generare il codice RPC:

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

A questo punto, si può notare che nella cartella hello sono stati generati i file hello.pb.go e hello_grpc.pb.go. Sfogliando hello.pb.go si possono trovare i message definiti:

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Campi definiti
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Campi definiti
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

In hello_grpc.pb.go si può trovare il servizio definito:

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// Successivamente, se implementiamo l'interfaccia del servizio, dobbiamo incorporare questa struttura
// senza bisogno di implementare il metodo mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// Restituisce nil per impostazione predefinita
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Vincolo di interfaccia
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Scrittura del Server

In server/main.go, scrivere il seguente codice:

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // Ascolta la porta
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // Crea il server gRPC
  server := grpc.NewServer()
  // Registra il servizio
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Esegui
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

Scrittura del Client

In client/main.go, inserire il seguente codice:

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Stabilisce la connessione, senza verifica crittografica
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Crea il client
  client := pb.NewSayHelloClient(conn)
  // Chiamata remota
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Esecuzione

Eseguire prima il server, poi il client. L'output del server è il seguente:

2023/07/16 16:26:51 received grpc req: name:"client"

L'output del client è il seguente:

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

In questo esempio, dopo che il client ha stabilito la connessione, quando chiama il metodo remoto, è come chiamare un metodo locale, accedendo direttamente al metodo Hello del client e ottenendo il risultato. Questo è l'esempio GRPC più semplice. Molti framework open source incapsulano anche questo processo.

bufbuild

Negli esempi precedenti, il codice è stato generato direttamente utilizzando i comandi. Se in seguito ci sono più plugin, i comandi possono diventare piuttosto noiosi. A questo punto, è possibile utilizzare uno strumento per gestire i file protobuf. Esiste proprio uno strumento di gestione open source: bufbuild/buf.

Repository open source: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Documentazione: Buf - Install the Buf CLI

Caratteristiche

  • Gestione BSR
  • Linter
  • Generazione di codice
  • Formattazione
  • Gestione delle dipendenze

Con questo strumento, è possibile gestire comodamente i file protobuf.

La documentazione offre molte modalità di installazione. Se è installato l'ambiente Go locale, è possibile utilizzare direttamente go install:

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

Dopo l'installazione, verificare la versione:

sh
$ buf --version
1.24.0

Andare nella cartella helloworld/pb ed eseguire il seguente comando per creare un module per gestire i file pb:

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

Il contenuto predefinito del file buf.yaml è il seguente:

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Poi, nella directory helloworld/, creare buf.gen.yaml e inserire il seguente contenuto:

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Poi eseguire il comando per generare il codice:

sh
$ buf generate

Dopo il completamento, è possibile vedere i file generati. Naturalmente, buf ha più funzionalità di queste. È possibile apprendere le altre funzionalità dalla documentazione.

RPC Stream

I metodi di chiamata gRPC sono di due categorie: Unary RPC e Stream RPC. L'esempio Hello World è un tipico Unary RPC.

Unary RPC (o RPC normale) funziona come HTTP normale. Il client richiede, il server risponde, un modo domanda-risposta. Mentre le richieste e le risposte di Stream RPC possono essere entrambe stream, come mostrato di seguito:

Quando si utilizza una richiesta stream, viene restituita solo una risposta. Il client può inviare più volte i parametri al server tramite stream. Il server non deve attendere che tutti i parametri vengano ricevuti prima di elaborarli, come nell'RPC unario. La logica di elaborazione specifica può essere decisa dal server. Normalmente, solo il client può chiudere attivamente la richiesta stream. Una volta chiuso lo stream, la richiesta RPC corrente termina.

Quando si utilizza una risposta stream, viene inviato solo un parametro. Il server può inviare più volte i dati al client tramite stream. Il client non deve attendere di ricevere tutti i dati prima di elaborarli, come nell'RPC unario. La logica di elaborazione specifica può essere decisa dal client. Normalmente, solo il server può chiudere attivamente la risposta stream. Una volta chiuso lo stream, la richiesta RPC corrente termina.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Può anche essere solo la risposta come stream (Server-Streaming RPC):

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Oppure sia la richiesta che la risposta come stream (Bi-directional-Streaming RPC):

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Stream Unidirezionale

Di seguito viene dimostrato un esempio di operazione stream unidirezionale. Prima creare la seguente struttura del progetto:

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

Il contenuto di message.proto è il seguente:

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

Generare il codice tramite buf:

sh
$ buf generate

Qui viene dimostrato un servizio di messaggi. receiveMessage riceve un nome utente specificato, tipo stringa, e restituisce un flusso di messaggi. sendMessage riceve un flusso di messaggi e restituisce il numero di messaggi inviati con successo, tipo intero a 64 bit. Successivamente, creare server/message_service.go per implementare il servizio generato dal codice predefinito:

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

Si può vedere che nei parametri di ricezione e invio messaggi c'è un'interfaccia di wrapping dello stream:

go
type MessageService_ReceiveMessageServer interface {
    // Invia messaggio
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Invia valore di ritorno e chiude la connessione
  SendAndClose(*wrapperspb.StringValue) error
    // Ricevi messaggio
  Recv() (*Message, error)
  grpc.ServerStream
}

Entrambi incorporano l'interfaccia gprc.ServerStream:

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

Si può vedere che l'RPC stream non è come l'RPC unario dove i parametri in ingresso e i valori di ritorno sono chiaramente riflessi nella firma della funzione. Questi metodi a prima vista non mostrano quali tipi sono i parametri in ingresso e i valori di ritorno. È necessario chiamare lo stream passato per completare la trasmissione stream. Successivamente, inizia a scrivere la logica specifica del server. Nella scrittura della logica del server, viene utilizzato un sync.map per simulare una coda di messaggi. Quando il client invia una richiesta ReceiveMessage, il server restituisce continuamente i messaggi desiderati dal client tramite risposta stream, fino alla scadenza del timeout e alla chiusura della richiesta. Quando il client richiede SendMessage, invia continuamente messaggi tramite richiesta stream e il server mette continuamente i messaggi nella coda, fino a quando il client chiude attivamente la richiesta e restituisce al client il numero di messaggi inviati.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Una coda di messaggi simulata
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Riceve messaggi per un utente specificato
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("Nessun messaggio ricevuto da %s per 5 secondi, chiusura connessione", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Ottieni il messaggio
      msg := queue[0]
      // Invia il messaggio al client tramite trasmissione stream
      err := recvServer.Send(msg)
      log.Printf("ricevuto %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Invia messaggi a un utente specificato
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // Ricevi messaggi dal client
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("inviato %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Metti il messaggio nella coda dei messaggi
    messageQueue.Store(msg.From, queue)
    count++
  }
}

Il client apre due goroutine, una per inviare messaggi e l'altra per ricevere messaggi. Naturalmente, è anche possibile inviare e ricevere contemporaneamente. Il codice è il seguente:

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Richiesta di ricezione messaggi
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("Nessun messaggio, chiusura connessione")
        break
      } else if err != nil {
        break
      }
      log.Printf("ricevuto %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "Ci sei",
      "Hai tempo per giocare insieme nel pomeriggio",
      "Va bene, giocheremo insieme quando avrai tempo",
      "Dovrebbe essere possibile questo weekend",
      "Allora è deciso",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Messaggi inviati, chiudi attivamente la richiesta e ottieni il valore di ritorno
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("Invio completato, totale %d messaggi inviati\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

Dopo l'esecuzione, l'output del server è il seguente:

server  2023/07/18 16:28:24 send from:"jack" content:"Ci sei" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"Ci sei" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"Hai tempo per giocare insieme nel pomeriggio" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"Hai tempo per giocare insieme nel pomeriggio" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"Va bene, giocheremo insieme quando avrai tempo" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"Va bene, giocheremo insieme quando avrai tempo" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"Dovrebbe essere possibile questo weekend" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"Dovrebbe essere possibile questo weekend" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"Allora è deciso" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"Allora è deciso" to:"mike"
server  2023/07/18 16:28:33 Nessun messaggio ricevuto da jack per 5 secondi, chiusura connessione

L'output del client è il seguente:

client  2023/07/18 16:28:24 receive from:"jack" content:"Ci sei" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"Hai tempo per giocare insieme nel pomeriggio" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"Va bene, giocheremo insieme quando avrai tempo" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"Dovrebbe essere possibile questo weekend" to:"mike"
client  2023/07/18 16:28:28 Invio completato, totale 5 messaggi inviati
client  2023/07/18 16:28:28 receive from:"jack" content:"Allora è deciso" to:"mike"
client  2023/07/18 16:28:33 Nessun messaggio, chiusura connessione

Da questo esempio si può scoprire che la gestione delle richieste RPC stream unidirezionali è più complessa sia per il client che per il server rispetto all'RPC unario. Tuttavia, l'RPC stream bidirezionale è ancora più complesso.

Stream Bidirezionale

RPC stream bidirezionale, ovvero sia la richiesta che la risposta sono stream. Equivale a combinare i due servizi dell'esempio precedente in uno. Per l'RPC stream, la prima richiesta è sempre initiata dal client. Successivamente, il client può inviare parametri di richiesta tramite stream in qualsiasi momento e il server può restituire dati tramite stream in qualsiasi momento. Indipendentemente da quale parte chiude attivamente lo stream, la richiesta corrente termina.

TIP

Il contenuto successivo, se non necessario, ometterà direttamente la descrizione del codice per la generazione del codice pb e la creazione di client e server RPC.

Prima creare la seguente struttura del progetto:

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

Il contenuto di message.proto è il seguente:

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

Nella logica del server, dopo aver stabilito la connessione, apre due goroutine: uno per ricevere messaggi e uno per inviare messaggi. La logica di elaborazione specifica è simile all'esempio precedente, ma questa volta viene rimossa la logica di determinazione del timeout.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue Coda di messaggi simulata
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Servizio di chat, la logica del server verrà gestita con più goroutine
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "fine chat")

  var chatErr error
  chatCh := make(chan error)

  // Crea due goroutine, uno per ricevere messaggi e uno per inviare messaggi
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Goroutine per ricevere messaggi
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("ricevuto %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Goroutine per inviare messaggi
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "chiudi invio")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("inviato %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

Nella logica del client, vengono aperti due sottogoroutine per simulare il processo di chat tra due persone. Nei due sottogoroutine, ci sono rispettivamente due sottogoroutine nipoti responsabili dell'invio e della ricezione di messaggi (la logica del client non garantisce che l'ordine di invio e ricezione dei messaggi tra le due persone sia corretto, è solo un semplice esempio di invio e ricezione tra le due parti):

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "Ciao", "Hai tempo per giocare insieme?", "Va bene")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "Ciao", "No", "Non ho tempo, chiedi a qualcun altro")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("fine chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Messaggi inviati, chiudi connessione
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Goroutine per ricevere messaggi
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("ricevuto %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

Normalmente, l'output del server è:

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"Ciao" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"Ciao" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"Ciao" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"Ciao" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"Hai tempo per giocare insieme?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"No" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"No" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"Hai tempo per giocare insieme?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"Va bene" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"Non ho tempo, chiedi a qualcun altro" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"Va bene" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"Non ho tempo, chiedi a qualcun altro" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

Normalmente, l'output del client (si può vedere che l'ordine logico dei messaggi è disordinato):

client 2023/07/19 17:26:24 receive from:"jack"  content:"Ciao"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"Ciao"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"No"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"Hai tempo per giocare insieme?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"Va bene"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"Non ho tempo, chiedi a qualcun altro"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

Dall'esempio si può vedere che la logica di elaborazione dello stream bidirezionale, sia per il client che per il server, è più complessa dello stream unidirezionale. È necessario combinare più goroutine per avviare attività asincrone per gestire meglio la logica.

Metadata

I metadata sono essenzialmente una mappa, il cui valore è una slice di stringhe, simile all'header di HTTP/1, e il ruolo che svolgono in gRPC è simile all'header HTTP, fornendo informazioni sulla chiamata RPC corrente. Allo stesso tempo, il ciclo di vita dei metadata segue l'intero processo di una chiamata RPC. Quando la chiamata termina, anche il loro ciclo di vita termina.

In gRPC, vengono principalmente trasmessi e archiviati tramite context. Tuttavia, gRPC fornisce il pacchetto metadata con molte funzioni convenienti per semplificare le operazioni, senza dover manipolare manualmente context. Il tipo corrispondente ai metadata in gRPC è metadata.MD, come mostrato di seguito:

go
// MD è una mappatura da chiavi metadata a valori. Gli utenti dovrebbero utilizzare
// le seguenti due funzioni di convenienza New e Pairs per generare MD.
type MD map[string][]string

Possiamo creare direttamente utilizzando la funzione metadata.New, ma prima di creare, ci sono alcuni punti da notare:

go
func New(m map[string]string) MD

I metadata hanno restrizioni sui nomi delle chiavi, possono essere solo caratteri limitati dalle seguenti regole:

  • Caratteri ASCII
  • Numeri: 0-9
  • Lettere minuscole: a-z
  • Lettere maiuscole: A-Z
  • Caratteri speciali: - _

TIP

Nei metadata, le lettere maiuscole vengono convertite in minuscole, il che significa che occuperanno la stessa chiave e i valori verranno sovrascritti.

TIP

Le chiavi che iniziano con grpc- sono chiavi interne riservate per l'uso di gRPC. L'uso di tali chiavi potrebbe causare alcuni errori.

Creazione Manuale

Esistono molti modi per creare metadata. Qui vengono introdotti i due metodi più comuni per creare manualmente i metadata. Il primo è utilizzare la funzione metadata.New, passando direttamente una mappa:

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

Il secondo è metadata.Pairs, passando una slice di stringhe di lunghezza pari, che verrà automaticamente解析成键值对。

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

È anche possibile utilizzare metadata.Join per unire più metadata:

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1, md2)

Utilizzo Lato Server

Ottenere metadata

Il server può ottenere i metadata utilizzando la funzione metadata.FromIncomingContext:

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Per l'RPC unario, il parametro del servizio conterrà un parametro context da cui è possibile ottenere direttamente i metadata:

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Per l'RPC stream, il parametro del servizio conterrà un oggetto stream da cui è possibile ottenere il context dello stream:

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

Inviare metadata

Per inviare metadata, è possibile utilizzare la funzione grpc.SendHeader:

go
func SendHeader(ctx context.Context, md metadata.MD) error

Questa funzione può essere chiamata al massimo una volta e non avrà effetto se chiamata dopo che alcuni eventi hanno causato l'invio automatico dell'header. In alcuni casi, se non si desidera inviare direttamente l'header, è possibile utilizzare la funzione grpc.SetHeader:

go
func SetHeader(ctx context.Context, md metadata.MD) error

Se questa funzione viene chiamata più volte, i metadata trasmessi ogni volta verranno uniti e inviati al client nelle seguenti situazioni:

  • Quando vengono chiamati gprc.SendHeader e ServerStream.SendHeader
  • Quando l'handler dell'RPC unario restituisce
  • Quando viene chiamato il metodo Stream.SendMsg dell'oggetto stream nell'RPC stream
  • Quando lo stato della richiesta RPC diventa send out, il che significa che la richiesta RPC ha avuto successo o si è verificato un errore.

Per l'RPC stream, si consiglia di utilizzare i metodi SendHeader e SetHeader dell'oggetto stream:

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

Durante l'utilizzo, si scopre che le funzionalità di Header e Trailer sono quasi le stesse, ma la loro principale differenza risiede nel momento dell'invio. Potrebbe non essere evidente nell'RPC unario, ma questa differenza è particolarmente evidente nell'RPC stream, poiché l'Header nell'RPC stream può essere inviato senza attendere la fine della richiesta. Come menzionato in precedenza, l'Header verrà inviato in determinate situazioni, mentre il Trailer verrà inviato solo al termine dell'intera richiesta RPC. Prima di allora, il trailer ottenuto sarà vuoto.

Utilizzo Lato Client

Ottenere metadata

Il client può ottenere l'header della risposta tramite grpc.Header e grpc.Trailer:

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Tuttavia, è importante notare che non è possibile ottenerli direttamente. Come si può vedere, i valori di ritorno delle due funzioni sopra sono CallOption, il che significa che vengono passati come parametri option quando si effettua una richiesta RPC:

go
// Dichiara md per ricevere i valori
var header, trailer metadata.MD

// Passa option quando chiami la richiesta RPC
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

Al termine della richiesta, i valori verranno scritti nel md passato. Per l'RPC stream, è possibile ottenerli direttamente tramite l'oggetto stream restituito quando si effettua la richiesta:

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := stream.Trailer()

Inviare metadata

È molto semplice per il client inviare metadata. Come menzionato in precedenza, i metadata si manifestano come valueContext. Combina i metadata nel context e poi passa il context quando effettui la richiesta. Il pacchetto metadata fornisce due funzioni per facilitare la costruzione del context:

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// RPC unario
res,err := client.SomeRPC(outgoingContext,data)
// RPC stream
stream,err := client.StreamRPC(outgoingContext)

Se il ctx originale ha già metadata, l'utilizzo di NewOutgoingContext sovrascriverà direttamente i dati precedenti. Per evitare questa situazione, è possibile utilizzare la funzione seguente, che non sovrascrive ma unisce i dati:

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// RPC unario
res,err := client.SomeRPC(appendContext,data)
// RPC stream
stream,err := client.StreamRPC(appendContext)

Intercettori

Gli intercettori di gRPC sono simili ai Middleware di gin, entrambi servono a fare un lavoro speciale prima o dopo la richiesta senza influenzare la logica business stessa. In gRPC, ci sono due categorie di intercettori: intercettori lato server e intercettori lato client. In base al tipo di richiesta, ci sono intercettori per RPC unario e intercettori per RPC stream.

Per comprendere meglio gli intercettori, di seguito verrà descritta una dimostrazione molto semplice.

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

Il contenuto di person.proto è il seguente:

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

Il codice del server è il seguente. La logica è tutta basata sui contenuti precedenti, abbastanza semplice, non verrà ripetuta.

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Archivia i dati
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Intercettazione Lato Server

Gli intercettori per le richieste RPC lato server sono UnaryServerInterceptor e StreamServerInterceptor. I tipi specifici sono i seguenti:

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

RPC Unario

Per creare un intercettore per RPC unario, è sufficiente implementare il tipo UnaryServerInterceptor. Di seguito è riportato un semplice esempio di intercettore per RPC unario, la cui funzione è di output di ogni richiesta e risposta RPC:

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} Dati della richiesta RPC
// param info *grpc.UnaryServerInfo Informazioni sulla richiesta RPC unaria corrente
// param unaryHandler grpc.UnaryHandler Handler specifico
// return resp interface{} Dati della risposta RPC
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("prima dell'intercettazione RPC unaria path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("dopo l'intercettazione RPC unaria path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Per l'RPC unario, l'intercettore intercetta ogni richiesta e risposta RPC, ovvero intercetta la fase di richiesta e la fase di risposta dell'RPC. Se l'intercettore restituisce un errore, la richiesta corrente terminerà.

RPC Stream

Per creare un intercettore per RPC stream, è sufficiente implementare il tipo StreamServerInterceptor. Di seguito è riportato un semplice esempio di intercettore per RPC stream:

go
// StreamPersonLogInterceptor
// param srv interface{} Corrisponde al server implementato lato server
// param stream grpc.ServerStream Oggetto stream
// param info *grpc.StreamServerInfo Informazioni sullo stream
// param streamHandler grpc.StreamHandler Handler
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("prima dell'intercettazione RPC stream path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("dopo l'intercettazione RPC stream path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Per l'RPC stream, l'intercettore intercetta il momento in cui i metodi Send e Recv di ogni oggetto stream vengono chiamati. Se l'intercettore restituisce un errore, ciò non causerà la fine della richiesta RPC corrente, ma indica solo che si è verificato un errore in questo send o recv.

Utilizzo degli Intercettori

Per rendere effettivi gli intercettori creati, è necessario passarli come option quando si crea il server gRPC. L'ufficio fornisce anche funzioni correlate per l'uso. Come mostrato di seguito, ci sono funzioni per aggiungere un singolo intercettore e funzioni per aggiungere intercettori a catena:

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

L'utilizzo ripetuto di UnaryInterceptor causerà il seguente panic:

panic: The unary server interceptor was already set and may not be reset.

Lo stesso vale per StreamInterceptor, mentre la chiamata ripetuta degli intercettori a catena li aggiungerà alla stessa catena.

Esempio di utilizzo:

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Aggiungi intercettori a catena
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

Intercettazione Lato Client

Gli intercettori lato client sono simili a quelli lato server, un intercettore unario UnaryClientInterceptor e un intercettore stream StreamClientInterceptor. I tipi specifici sono i seguenti:

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

RPC Unario

Per creare un intercettore per RPC unario lato client, è sufficiente implementare UnaryClientInterceptor. Di seguito è riportato un semplice esempio:

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string Nome del metodo
// param req interface{} Dati della richiesta
// param reply interface{} Dati della risposta
// param cc *grpc.ClientConn Oggetto connessione client
// param invoker grpc.UnaryInvoker Metodo client specifico intercettato
// param opts ...grpc.CallOption Opzioni di configurazione per la richiesta corrente
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("prima della richiesta unaria path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("dopo la richiesta unaria path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

Tramite l'intercettore per RPC unario lato client, è possibile ottenere i dati della richiesta e della risposta locali e alcune altre informazioni sulla richiesta.

RPC Stream

Per creare un intercettore per RPC stream lato client, è sufficiente implementare StreamClientInterceptor. Di seguito è riportato un esempio:

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Informazioni di descrizione dell'oggetto stream
// param cc *grpc.ClientConn Oggetto connessione
// param method string Nome del metodo
// param streamer grpc.Streamer Oggetto utilizzato per creare l'oggetto stream
// param opts ...grpc.CallOption Opzioni di configurazione connessione
// return grpc.ClientStream Oggetto stream client creato
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("prima della creazione dello stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("dopo la creazione dello stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Tramite l'intercettore per RPC stream lato client, è possibile intercettare solo il momento in cui il client stabilisce una connessione con il server, ovvero il momento in cui viene creato lo stream. Non è possibile intercettare ogni volta che l'oggetto stream client invia o riceve messaggi. Tuttavia, possiamo incapsulare l'oggetto stream creato nell'intercettore per realizzare l'intercettazione dell'invio e della ricezione di messaggi, come mostrato di seguito:

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Informazioni di descrizione dell'oggetto stream
// param cc *grpc.ClientConn Oggetto connessione
// param method string Nome del metodo
// param streamer grpc.Streamer Oggetto utilizzato per creare l'oggetto stream
// param opts ...grpc.CallOption Opzioni di configurazione connessione
// return grpc.ClientStream Oggetto stream client creato
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("prima della creazione dello stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("dopo la creazione dello stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // Prima dell'invio del messaggio
  err := c.ClientStream.SendMsg(m)
  // Dopo l'invio del messaggio
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // Prima della ricezione del messaggio
  err := c.ClientStream.RecvMsg(m)
  // Dopo la ricezione del messaggio
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

Utilizzo degli Intercettori

Quando si utilizza, simile al server, ci sono anche quattro funzioni utility per aggiungere intercettori tramite option, divisi in intercettori singoli e intercettori a catena:

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

L'utilizzo ripetuto di WithUnaryInterceptor lato client non causerà panic, ma solo l'ultimo avrà effetto.

Di seguito è riportato un caso d'uso:

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

Finora, l'intero caso è stato completato. È il momento di eseguirlo per vedere il risultato. L'output del server è il seguente:

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

L'output del client è il seguente:

client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

Si può vedere che l'output di entrambe le parti è conforme alle aspettative, ottenendo l'effetto di intercettazione. Questo caso è solo un semplice esempio. Utilizzando gli intercettori di gRPC, si possono fare molte cose come autorizzazione, registrazione, monitoraggio e altre funzioni. È possibile scegliere di creare i propri strumenti o utilizzare strumenti già pronti della comunità open source. gRPC Ecosystem raccoglie una serie di middleware intercettori gRPC open source. Indirizzo: grpc-ecosystem/go-grpc-middleware.

Gestione degli Errori

Prima di iniziare, diamo un'occhiata a un esempio. Nel caso dell'intercettore precedente, se l'utente non viene trovato, verrà restituito un errore al client person not found. La domanda è: il client può gestire in modo speciale in base all'errore restituito? Successivamente, proviamo a utilizzare errors.Is nel codice del client per giudicare l'errore:

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

Il risultato dell'output è il seguente:

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

Si può vedere che l'errore ricevuto dal client è il seguente. Si scopre che l'errore restituito dal server è nel campo desc:

rpc error: code = Unknown desc = person not found

Naturalmente, la logica di errors.Is non viene eseguita. Anche se si sostituisce con errors.As, il risultato è lo stesso:

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Per questo motivo, gRPC fornisce il pacchetto status per risolvere questo tipo di problema. Questo è anche il motivo per cui l'errore ricevuto dal client ha i campi code e desc. Perché in realtà gRPC restituisce al client un Status, il cui tipo specifico è il seguente. Si può vedere che è anche un message definito da protobuf:

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // Il codice di stato, che dovrebbe essere un valore enum di
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // Un messaggio di errore rivolto allo sviluppatore, che dovrebbe essere in inglese. Qualsiasi
  // messaggio di errore rivolto all'utente dovrebbe essere localizzato e inviato nel
  // campo [google.rpc.Status.details][google.rpc.Status.details], o localizzato
  // dal client.
  string message = 2;

  // Un elenco di messaggi che trasportano i dettagli dell'errore. Esiste un insieme comune di
  // tipi di messaggi da utilizzare per le API.
  repeated google.protobuf.Any details = 3;
}

Codici di Errore

Il Code nella struttura Status è simile a Http Status, utilizzato per indicare lo stato della richiesta RPC corrente. gRPC definisce 16 code in grpc/codes, che coprono la maggior parte degli scenari, come mostrato di seguito:

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // Chiamata riuscita
  OK Code = 0

  // Richiesta annullata
  Canceled Code = 1

  // Errore sconosciuto
  Unknown Code = 2

  // Parametro non valido
  InvalidArgument Code = 3

    // Richiesta scaduta
  DeadlineExceeded Code = 4

  // Risorsa non esistente
  NotFound Code = 5

    // Esiste già la stessa risorsa
  AlreadyExists Code = 6

  // Permesso insufficiente, accesso negato
  PermissionDenied Code = 7

  // Risorsa esaurita, la capacità rimanente non è sufficiente per l'uso, ad esempio spazio su disco insufficiente
  ResourceExhausted Code = 8

  // Condizione di esecuzione insufficiente, ad esempio utilizzo di rm per eliminare una directory non vuota, la condizione di eliminazione è che la directory sia vuota, ma la condizione non è soddisfatta
  FailedPrecondition Code = 9

  // Richiesta interrotta
  Aborted Code = 10

  // L'operazione di accesso supera l'intervallo limitato
  OutOfRange Code = 11

  // Indica che il servizio corrente non è implementato
  Unimplemented Code = 12

  // Errore interno del sistema
  Internal Code = 13

  // Servizio non disponibile
  Unavailable Code = 14

  // Perdita di dati
  DataLoss Code = 15

  // Autenticazione non riuscita
  Unauthenticated Code = 16

  _maxCode = 17
)

Il pacchetto grpc/status fornisce molte funzioni per facilitare la conversione reciproca tra status ed error. Possiamo utilizzare direttamente status.New per creare uno Status, oppure Newf:

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Ad esempio, il seguente codice:

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

Tramite il metodo err di status, è possibile ottenere l'error在其中. Quando lo stato è ok, l'error è nil:

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

È anche possibile creare direttamente un error:

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

Quindi possiamo modificare il codice del server come segue:

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

Prima di questa modifica, tutti i code restituiti dal server erano unknown. Ora, dopo la modifica, hanno una semantica più chiara. Quindi, lato client, è possibile ottenere lo status dall'error tramite status.FromError o utilizzare le funzioni seguenti per ottenere lo status dall'error, in modo da poter gestire di conseguenza in base a code diversi:

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Esempio:

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Tuttavia, sebbene i code di grpc abbiano coperto il più possibile alcuni scenari comuni, a volte non riescono ancora a soddisfare le esigenze degli sviluppatori. A questo punto, è possibile utilizzare il campo Details di Status, che è anche una slice e può contenere più informazioni. Tramite Status.WithDetails è possibile passare alcune informazioni personalizzate:

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Tramite Status.Details è possibile ottenere le informazioni:

go
func (s *Status) Details() []interface{}

È importante notare che le informazioni passate dovrebbero essere preferibilmente definite da protobuf, in modo che sia il server che il client possano analizzarle facilmente. L'ufficio fornisce alcuni esempi:

protobuf
message ErrorInfo {
  // La ragione dell'errore
  string reason = 1;

  // Il soggetto che definisce il servizio
  string domain = 2;

  // Altre informazioni
  map<string, string> metadata = 3;
}

// Informazioni di retry
message RetryInfo {
  // L'intervallo di attesa per la stessa richiesta
  google.protobuf.Duration retry_delay = 1;
}

// Informazioni di debug
message DebugInfo {
  // Stack
  repeated string stack_entries = 1;

  // Alcuni dettagli
  string detail = 2;
}

    ...
    ...

Più esempi possono essere consultati su googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com). Se necessario, è possibile introdurre tramite il seguente codice:

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

Utilizzo di ErrorInfo come details:

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

Lato client è possibile ottenere i dati e gestirli. Tuttavia, questi sono solo alcuni esempi consigliati da gRPC. Oltre a questi, è anche possibile definire i propri message per soddisfare meglio le esigenze business corrispondenti. Se si desidera eseguire una gestione uniforme degli errori, è anche possibile operare negli intercettori.

Controllo del Timeout

Nella maggior parte dei casi, di solito non c'è solo un servizio e potrebbe esserci un gran numero di servizi a monte e anche molti servizi a valle. Quando il client invia una richiesta, dal servizio più a monte a quello più a valle, si forma una catena di chiamate ai servizi, come mostrato nella figura, forse anche più lunga di quella nella figura.

In una catena di chiamate così lunga, se la logica di elaborazione di uno dei servizi richiede molto tempo, causerà uno stato di attesa a monte. Per ridurre gli sprechi di risorse non necessari, è necessario introdurre il meccanismo di timeout. In questo modo, il tempo di timeout passato dalla chiamata più a monte è il tempo massimo di esecuzione consentito per l'intera catena di chiamate. gRPC può trasmettere il timeout tra processi e linguaggi. Mette alcuni dati che devono essere trasmessi tra processi nel frame HEADERS Frame di HTTP/2, come mostrato di seguito:

I dati di timeout nella richiesta gRPC corrispondono al campo grpc-timeout nel HEADERS Frame. È importante notare che non tutte le librerie gRPC hanno implementato questo meccanismo di trasmissione del timeout. Tuttavia, gRPC-go è sicuramente supportato. Se si utilizzano librerie di altri linguaggi e si utilizza questa caratteristica, è necessario prestare particolare attenzione a questo punto.

Timeout di Connessione

Quando il client gRPC stabilisce una connessione con il server, per impostazione predefinita è asincrona. Se la connessione non riesce a stabilirsi, restituirà solo un Client vuoto. Se si desidera che la connessione sia sincrona, è possibile utilizzare grpc.WithBlock() per bloccare l'attesa quando la connessione non è stata stabilita con successo:

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Se si desidera controllare un tempo di timeout, è sufficiente passare un TimeoutContext, utilizzando grpc.DialContext al posto di gprc.Dial per passare il context:

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

In questo modo, se la connessione scade, verrà restituito un error:

context deadline exceeded

Anche lato server è possibile impostare il timeout di connessione. Quando si stabilisce una nuova connessione con il client, è possibile impostare un tempo di timeout. Il valore predefinito è 120 secondi. Se la connessione non viene stabilita con successo entro il tempo specificato, il server disconnetterà attivamente la connessione:

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout è ancora in fase sperimentale e l'API potrebbe essere modificata o rimossa in futuro.

Timeout di Richiesta

Quando il client gRPC invia una richiesta, il primo parametro è di tipo Context. Allo stesso modo, se si desidera aggiungere un tempo di timeout alla richiesta RPC, è sufficiente passare un TimeoutContext:

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Gestione della logica di timeout
}

Attraverso l'elaborazione di gRPC, il tempo di timeout viene trasmesso al server. Durante la trasmissione, esiste come campo nel frame. In Go, esiste come context, trasmesso in tutto il链路. Durante la trasmissione nel链路, non è consigliabile modificare il tempo di timeout. Quanto tempo di timeout impostare durante la richiesta dovrebbe essere una considerazione del servizio più a monte.

Autenticazione e Autorizzazione

Nel campo dei microservizi, ogni servizio deve verificare l'identità e i permessi dell'utente per ogni richiesta. Se, come nelle applicazioni monolitiche, ogni servizio deve implementare la propria logica di autenticazione, questo non è realistico. Pertanto, è necessario un servizio di autenticazione e autorizzazione unificato. Le soluzioni comuni includono OAuth2, Sessioni distribuite e JWT. Tra questi, OAuth2 è il più utilizzato e una volta diventato lo standard del settore. Il tipo di token più comunemente utilizzato in OAuth2 è JWT. Di seguito è riportato un diagramma del flusso della modalità codice di autorizzazione OAuth2, con il processo di base come mostrato.

Trasmissione Sicura

Registrazione e Scoperta dei Servizi

Prima che il client chiami un servizio specifico del server, deve conoscere l'ip e la porta del server. Nei casi precedenti, gli indirizzi del server erano scritti in modo fisso. Nell'ambiente di rete reale, non è sempre così stabile. Alcuni servizi potrebbero andare offline a causa di guasti e diventare inaccessibili, oppure potrebbero subire migrazioni di macchine a causa dello sviluppo del business, causando cambiamenti negli indirizzi. In questi casi, non è possibile utilizzare indirizzi statici per accedere ai servizi. Questi problemi dinamici sono ciò che la scoperta e la registrazione dei servizi devono risolvere. La scoperta dei servizi è responsabile del monitoraggio dei cambiamenti negli indirizzi dei servizi e dell'aggiornamento, mentre la registrazione dei servizi è responsabile di comunicare il proprio indirizzo al mondo esterno. gRPC fornisce funzionalità di base per la scoperta dei servizi e supporta l'estensione e la personalizzazione.

Non potendo utilizzare indirizzi statici, è possibile utilizzare alcuni nomi specifici per sostituirli. Ad esempio, il browser risolve i nomi di dominio tramite DNS per ottenere gli indirizzi. Allo stesso modo, la scoperta dei servizi predefinita in gRPC viene effettuata tramite DNS. Modificare il file host locale e aggiungere la seguente mappatura:

127.0.0.1 example.grpc.com

Poi modificare l'indirizzo Dial nel client dell'esempio helloworld con il nome di dominio corrispondente:

go
func main() {
  // Stabilisce la connessione, senza verifica crittografica
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Crea il client
  client := hello2.NewSayHelloClient(conn)
  // Chiamata remota
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Si può anche vedere l'output normale:

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

In gRPC, questi nomi devono rispettare la sintassi URI definita in RFC 3986, il formato è:

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

L'URI nell'esempio precedente è nella seguente forma. Poiché il dns è supportato per impostazione predefinita, il prefisso scheme viene omesso:

dns:example.grpc.com:8080

Oltre a questo, gRPC supporta anche per impostazione predefinita Unix domain sockets. Per altri modi, è necessario implementare personalizzazioni in base alle estensioni di gRPC. Per questo, è necessario implementare un risolutore personalizzato resolver.Resolver. Il resolver è responsabile del monitoraggio degli aggiornamenti degli indirizzi di destinazione e della configurazione dei servizi:

go
type Resolver interface {
    // gRPC chiamerà ResolveNow per tentare di risolvere nuovamente il nome di destinazione. Questo è solo un suggerimento. Se non necessario, il resolver può ignorarlo. Il metodo può essere chiamato concorrentemente
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC richiede di passare un costruttore di Resolver, ovvero resolver.Builder, che è responsabile della creazione di istanze di Resolver:

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Il metodo Scheme del Builder restituisce il tipo di Scheme di cui è responsabile l'analisi. Ad esempio, il dnsBuilder predefinito restituisce dns. Il costruttore dovrebbe essere registrato nel Builder globale utilizzando resolver.Register durante l'inizializzazione, oppure passato come options all'interno di ClientConn utilizzando grpc.WithResolver. Quest'ultimo ha priorità sul primo.

La figura sopra descrive brevemente il flusso di lavoro del resolver. Successivamente, verrà dimostrato come personalizzare un resolver.

Risolutore di Servizi Personalizzato

Di seguito viene scritto un risolutore personalizzato. È necessario un costruttore di risoluzione personalizzato per la costruzione.

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Qui è necessario updatestate, altrimenti si verificherà un deadlock
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Configurazione, loadBalancingPolicy si riferisce alla strategia di bilanciamento del carico
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

Il risolutore personalizzato passa gli indirizzi corrispondenti nella mappa a updatestate e specifica anche la strategia di bilanciamento del carico. round_robin si riferisce al round-robin.

go
// La struttura della service config è la seguente
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

Il codice del client è il seguente:

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // Registra il builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Stabilisce la connessione, senza verifica crittografica
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Crea il client
  client := hello2.NewSayHelloClient(conn)
     // Chiama una volta al secondo
  for range time.Tick(time.Second) {
    // Chiamata remota
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

Normalmente, il processo dovrebbe essere che il server si registra al registro dei servizi, quindi il client ottiene l'elenco dei servizi dal registro e li abbina. Qui la mappa passata è un registro dei servizi simulato. I dati sono statici, quindi viene omessa la fase di registrazione del servizio, lasciando solo la scoperta dei servizi. Il target utilizzato dal client è hello:myworld, dove hello è lo scheme personalizzato e myworld è il nome del servizio. Dopo l'analisi del risolutore personalizzato, si ottiene l'indirizzo reale 127.0.0.1:8080. Nella situazione reale, per fare il bilanciamento del carico, un nome di servizio potrebbe corrispondere a più indirizzi reali, motivo per cui il nome del servizio corrisponde a una slice. Qui vengono avviati due server, occupando porte diverse. La strategia di bilanciamento del carico è round-robin. Gli output del server sono rispettivamente i seguenti. Dal tempo della richiesta, si può vedere che la strategia di bilanciamento del carico sta effettivamente funzionando. Se non si specifica la strategia, per impostazione predefinita viene selezionato solo il primo servizio:

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

Il registro dei servizi contiene essenzialmente una raccolta di mappature tra nomi di servizi registrati e indirizzi di servizi reali. Qualsiasi middleware di archiviazione dati può soddisfare le condizioni. Anche l'utilizzo di mysql come registro dei servizi non è impossibile (anche se probabilmente nessuno lo farebbe). In generale, i registri dei servizi sono archiviati come chiave-valore. Redis è una scelta molto buona. Tuttavia, se si utilizza Redis come registro dei servizi, è necessario implementare autonomamente molte logiche, come il controllo dell'heartbeat del servizio, la disattivazione del servizio, la pianificazione dei servizi, ecc., il che è piuttosto complicato. Sebbene Redis abbia una certa applicazione in questo ambito, è meno comune. Come si suol dire, lascia fare ai professionisti ciò che sanno fare meglio. Ci sono molti nomi famosi in questo campo: Zookeeper, Consul, Eureka, ETCD, Nacos, ecc.

È possibile consultare 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) per comprendere alcune differenze tra questi middleware.

Integrazione con Consul

Per il caso d'uso dell'integrazione con Consul, consultare consul

Bilanciamento del Carico

Golang by www.golangdev.cn edit