Skip to content

gRPC

L'appel de procédure à distance RPC (Remote Procedure Call) est un élément incontournable dans les microservices. Au cours de l'apprentissage, vous rencontrerez divers frameworks RPC. Cependant, dans le domaine de Go, presque tous les frameworks RPC sont basés sur gRPC, qui est devenu un protocole fondamental dans le domaine du cloud native. Pourquoi le choisir ? Voici la réponse officielle :

gRPC est un framework d'appel de procédure à distance (Remote Process Call, RPC) open source hautes performances et moderne, qui peut fonctionner dans n'importe quel environnement. Il peut connecter efficacement les services au sein et entre les centres de données, avec un support enfichable pour l'équilibrage de charge, le traçage, les contrôles de santé et l'authentification. Il convient également à la connectivité de dernière génération entre les appareils, les applications mobiles, les navigateurs et les services backend.

Site officiel : gRPC

Documentation officielle : Documentation | gRPC

Tutoriel technique gRPC : Basics tutorial | Go | gRPC

Site officiel Protocol Buffers : Reference Guides | Protocol Buffers Documentation (protobuf.dev)

C'est également un projet open source sous la fondation CNCF (Cloud Native Computing Foundation).

Caractéristiques

Définition de service simple

Utilisez Protocol Buffers pour définir les services, un ensemble d'outils de sérialisation binaire puissant et indépendant du langage.

Démarrage et mise à l'échelle rapides

Installez et exécutez l'environnement d'exécution et de développement avec une seule ligne de code, et passez à des millions de RPC par seconde en quelques secondes.

Multi-langage, multi-plateforme

Génération automatique de stubs client et serveur pour différentes plateformes et langages.

Flux bidirectionnel et autorisation intégrée

Flux bidirectionnel basé sur HTTP/2 et authentification/autorisation enfichable.

Bien que gRPC soit indépendant du langage, la plupart du contenu de ce site concerne Go, donc cet article utilisera principalement Go comme langage principal. Les compilateurs pb et générateurs utilisés pour d'autres langages peuvent être trouvés sur le site officiel Protocol Buffers. Pour simplifier, le processus de création de projet sera omis.

Installation des dépendances

Téléchargez d'abord le compilateur Protocol Buffer : Releases · protocolbuffers/protobuf (github.com)

Choisissez le système et la version appropriés. Après le téléchargement, ajoutez le répertoire bin aux variables d'environnement.

Ensuite, téléchargez les générateurs de code. Le compilateur génère le code de sérialisation pour les langages correspondants à partir des fichiers proto, tandis que les générateurs produisent le code métier.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Créez un projet vide, nommé grpc_learn ici, puis ajoutez la dépendance suivante

sh
$ go get google.golang.org/grpc

Enfin, vérifiez la version pour confirmer l'installation

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Structure du projet

L'exemple suivant utilise un Hello World pour la démonstration. Créez la structure de projet suivante.

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

Définition du fichier protobuf

Dans pb/hello.proto, écrivez le contenu suivant. C'est un exemple assez simple. Si vous ne connaissez pas la syntaxe protobuf, veuillez consulter la documentation correspondante.

protobuf
syntax = "proto3";

// . signifie générer directement dans le chemin de sortie, hello est le nom du package
option go_package = ".;hello";

// Requête
message HelloReq {
  string name = 1;
}

// Réponse
message HelloRep {
  string msg = 1;
}

// Définition du service
service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Génération du code

Après l'écriture, utilisez le compilateur protoc pour générer le code de sérialisation des données, et le générateur pour produire le code RPC.

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

Vous verrez alors que les fichiers hello.pb.go et hello_grpc.pb.go ont été générés dans le dossier hello. En parcourant hello.pb.go, vous trouverez les messages que nous avons définis.

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Champ défini
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Champ défini
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

Dans hello_grpc.pb.go, vous trouverez le service que nous avons défini.

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// Par la suite, si nous implémentons nous-mêmes l'interface de service, nous devons intégrer cette structure,
// ce qui évite d'implémenter la méthode mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// Retourne nil par défaut
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Contrainte d'interface
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Écriture du serveur

Dans server/main.go, écrivez le code suivant

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // Écouter le port
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // Créer le serveur gRPC
  server := grpc.NewServer()
  // Enregistrer le service
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Exécuter
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

Écriture du client

Dans client/main.go, écrivez le code suivant

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Établir la connexion, sans chiffrement
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Créer le client
  client := pb.NewSayHelloClient(conn)
  // Appel distant
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Exécution

Exécutez d'abord le serveur, puis le client. La sortie du serveur est la suivante

2023/07/16 16:26:51 received grpc req: name:"client"

La sortie du client est la suivante

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

Dans cet exemple, une fois la connexion établie par le client, l'appel de la méthode distante se fait comme un appel de méthode locale. Il suffit d'accéder directement à la méthode Hello de client et d'obtenir le résultat. C'est un exemple gRPC très simple, et de nombreux frameworks open source encapsulent ce processus.

bufbuild

Dans l'exemple ci-dessus, le code a été généré directement avec des commandes. Si de nombreux plugins sont ajoutés par la suite, les commandes deviendront fastidieuses. Un outil open source appelé bufbuild/buf permet de gérer les fichiers protobuf.

Dépôt open source : bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Documentation : Buf - Install the Buf CLI

Caractéristiques

  • Gestion BSR
  • Linter
  • Génération de code
  • Formatage
  • Gestion des dépendances

Avec cet outil, vous pouvez gérer très facilement les fichiers protobuf.

La documentation propose de nombreuses méthodes d'installation. Si vous avez un environnement Go local, installez directement avec go install

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

Après l'installation, vérifiez la version

sh
$ buf --version
1.24.0

Allez dans le dossier helloworld/pb et exécutez la commande suivante pour créer un module pour gérer les fichiers pb.

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

Le contenu par défaut du fichier buf.yaml est

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Allez ensuite dans le répertoire helloworld/ et créez buf.gen.yaml avec le contenu suivant

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Exécutez ensuite la commande pour générer le code

sh
$ buf generate

Une fois terminé, vous verrez les fichiers générés. Bien sûr, buf a d'autres fonctionnalités que vous pouvez explorer dans la documentation.

RPC en flux (Streaming RPC)

Les appels gRPC se divisent en deux grandes catégories : RPC unaire (Unary RPC) et RPC en flux (Stream RPC). L'exemple Hello World est un RPC unaire typique.

Le RPC unaire (ou RPC simple, plus compréhensible) fonctionne comme une requête HTTP classique : le client fait une requête, le serveur renvoie les données, un échange question-réponse.

Le RPC en flux permet à la fois la requête et la réponse d'être sous forme de flux, comme illustré ci-dessous

Avec une requête en flux, une seule réponse est renvoyée. Le client peut envoyer plusieurs fois des paramètres au serveur via le flux. Le serveur n'a pas besoin d'attendre tous les paramètres comme dans un RPC unaire pour commencer le traitement. La logique de traitement spécifique peut être décidée par le serveur. Normalement, seul le client peut fermer activement une requête en flux. Une fois le flux fermé, la requête RPC en cours prend fin.

Avec une réponse en flux, un seul ensemble de paramètres est envoyé. Le serveur peut envoyer plusieurs fois des données au client via le flux. Le client n'a pas besoin d'attendre toutes les données comme dans un RPC unaire pour commencer le traitement. La logique de traitement spécifique peut être décidée par le client. Normalement, seul le serveur peut fermer activement une réponse en flux. Une fois le flux fermé, la requête RPC en cours prend fin.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Il peut aussi n'y avoir que la réponse en flux (Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Ou les deux, requête et réponse, sont en flux (Bi-directional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Flux unidirectionnel

Voici un exemple d'opération de flux unidirectionnel. Créez d'abord la structure de projet suivante

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

Le contenu de message.proto est

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

Générez le code avec buf

sh
$ buf generate

Il s'agit d'un service de messagerie. receiveMessage prend un nom d'utilisateur spécifique (type chaîne) et renvoie un flux de messages. sendMessage prend un flux de messages et renvoie le nombre de messages envoyés avec succès (type entier 64 bits). Créez ensuite server/message_service.go et implémentez vous-même le service généré par défaut.

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

On peut voir que les paramètres de réception et d'envoi de messages contiennent une interface de wrapper de flux

go
type MessageService_ReceiveMessageServer interface {
    // Envoyer un message
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Envoyer la valeur de retour et fermer la connexion
  SendAndClose(*wrapperspb.StringValue) error
    // Recevoir un message
  Recv() (*Message, error)
  grpc.ServerStream
}

Elles intègrent toutes deux l'interface gprc.ServerStream

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

On peut voir que le RPC en flux ne fonctionne pas comme le RPC unaire où les paramètres d'entrée et les valeurs de retour sont clairement visibles dans la signature de la fonction. Ces méthodes ne permettent pas de déterminer directement les types d'entrée et de retour. Il faut utiliser le type Stream passé en paramètre pour effectuer la transmission en flux. Commençons maintenant à écrire la logique spécifique du serveur. Lors de l'écriture de la logique du serveur, un sync.map est utilisé pour simuler une file de messages. Lorsqu'un client envoie une requête ReceiveMessage, le serveur renvoie continuellement les messages souhaités par le client via une réponse en flux jusqu'à ce qu'un timeout se produise et que la connexion soit fermée. Lorsqu'un client envoie une requête SendMessage, il envoie continuellement des messages via une requête en flux. Le serveur place continuellement les messages dans la file d'attente jusqu'à ce que le client ferme activement la connexion, puis renvoie au client le nombre de messages envoyés.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Une file de messages simulée
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Recevoir les messages d'un utilisateur spécifié
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("Pas de message reçu pour %s en 5 secondes, fermeture de la connexion", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Récupérer le message
      msg := queue[0]
      // Envoyer le message au client via le flux
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Envoyer un message à un utilisateur spécifié
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // Recevoir le message du client
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Placer le message dans la file d'attente
    messageQueue.Store(msg.From, queue)
    count++
  }
}

Le client a ouvert deux goroutines, une pour envoyer des messages, l'autre pour recevoir des messages. Bien sûr, il est aussi possible d'envoyer et de recevoir simultanément. Voici le code.

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Requête de réception de messages
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("Pas de message pour le moment, fermeture de la connexion")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "Tu es là ?",
      "Tu as du temps cet après-midi pour jouer ensemble ?",
      "D'accord, on se capte plus tard alors",
      "Ce week-end devrait aller non ?",
      "C'est noté alors",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Messages envoyés, fermeture active de la requête et récupération de la valeur de retour
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("Envoi terminé, %d messages envoyés au total\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

Après exécution, la sortie du serveur est la suivante

server  2023/07/18 16:28:24 send from:"jack" content:"Tu es là ?" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"Tu es là ?" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"Tu as du temps cet après-midi pour jouer ensemble ?" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"Tu as du temps cet après-midi pour jouer ensemble ?" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"D'accord, on se capte plus tard alors" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"D'accord, on se capte plus tard alors" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"Ce week-end devrait aller non ?" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"Ce week-end devrait aller non ?" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"C'est noté alors" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"C'est noté alors" to:"mike"
server  2023/07/18 16:28:33 Pas de message reçu pour jack en 5 secondes, fermeture de la connexion

La sortie du client est la suivante

client  2023/07/18 16:28:24 receive from:"jack" content:"Tu es là ?" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"Tu as du temps cet après-midi pour jouer ensemble ?" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"D'accord, on se capte plus tard alors" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"Ce week-end devrait aller non ?" to:"mike"
client  2023/07/18 16:28:28 Envoi terminé, 5 messages envoyés au total
client  2023/07/18 16:28:28 receive from:"jack" content:"C'est noté alors" to:"mike"
client  2023/07/18 16:28:33 Pas de message pour le moment, fermeture de la connexion

À travers cet exemple, on peut voir que le traitement des requêtes RPC en flux unidirectionnel est plus complexe que le RPC unaire, tant pour le client que pour le serveur. Cependant, le RPC en flux bidirectionnel est encore plus complexe.

Flux bidirectionnel

Le RPC en flux bidirectionnel signifie que la requête et la réponse sont toutes deux en flux. C'est comme combiner les deux services de l'exemple précédent en un seul. Pour un RPC en flux, la première requête est toujours initiée par le client. Ensuite, le client peut envoyer les paramètres de la requête à tout moment via le flux, et le serveur peut également renvoyer des données à tout moment via le flux. Quel que soit le côté qui ferme activement le flux, la requête en cours prend fin.

TIP

Dans la suite, sauf nécessité, les étapes de génération de code pb et de création des clients/serveurs RPC seront omises.

Créez d'abord la structure de projet suivante

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

Le contenu de message.proto est

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

Dans la logique du serveur, une fois la connexion établie, deux goroutines sont lancées : une responsable de la réception des messages, l'autre de l'envoi. La logique de traitement est similaire à l'exemple précédent, mais la logique de timeout a été supprimée.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue file de messages simulée
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Service de chat, la logique du serveur est gérée avec plusieurs goroutines
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "fin du chat")

  var chatErr error
  chatCh := make(chan error)

  // Créer deux goroutines, une pour recevoir, une pour envoyer
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Goroutine de réception
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Goroutine d'envoi
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

Dans la logique du client, deux sous-goroutines sont lancées pour simuler une conversation entre deux personnes. Chacune de ces sous-goroutines a elle-même deux goroutines "petits-enfants" responsables de l'envoi et de la réception (la logique du client ne garantit pas l'ordre correct des messages, c'est juste un exemple simple d'envoi et de réception entre deux parties).

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "Bonjour", "Tu as du temps pour jouer ?", "D'accord")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "Bonjour", "Non", "Pas le temps, cherche quelqu'un d'autre")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("fin du chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Messages envoyés, fermer la connexion
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Goroutine de réception
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

Normalement, la sortie du serveur est

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"Bonjour" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"Bonjour" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"Bonjour" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"Bonjour" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"Tu as du temps pour jouer ?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"Non" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"Non" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"Tu as du temps pour jouer ?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"D'accord" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"Pas le temps, cherche quelqu'un d'autre" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"D'accord" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"Pas le temps, cherche quelqu'un d'autre" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack fin du chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike fin du chat

Normalement, la sortie du client est (on peut voir que la logique d'ordre des messages est mélangée)

client 2023/07/19 17:26:24 receive from:"jack"  content:"Bonjour"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"Bonjour"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"Non"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"Tu as du temps pour jouer ?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"D'accord"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"Pas le temps, cherche quelqu'un d'autre"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 fin du chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 fin du chat mike

À travers cet exemple, on peut voir que la logique de traitement du flux bidirectionnel est plus complexe que celle du flux unidirectionnel, tant pour le client que pour le serveur. Il faut combiner plusieurs goroutines avec des tâches asynchrones pour mieux gérer la logique.

Metadata

Les metadata sont essentiellement une map dont la valeur est une slice de chaînes, similaire aux headers HTTP/1. Elles jouent un rôle similaire dans gRPC, fournissant des informations sur l'appel RPC en cours. Le cycle de vie des metadata suit celui de l'appel RPC : une fois l'appel terminé, leur cycle de vie prend également fin.

Dans gRPC, elles sont transmises et stockées principalement via le context. Cependant, gRPC fournit un package metadata avec de nombreuses fonctions pratiques pour simplifier les opérations. Le type correspondant dans gRPC est metadata.MD, comme indiqué ci-dessous.

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

Nous pouvons utiliser directement la fonction metadata.New pour créer des metadata, mais il y a quelques points à noter.

go
func New(m map[string]string) MD

Les metadata ont des restrictions sur les noms de clés. Seuls les caractères suivants sont autorisés :

  • Caractères ASCII
  • Chiffres : 0-9
  • Minuscules : a-z
  • Majuscules : A-Z
  • Caractères spéciaux : -_

TIP

Dans les metadata, les lettres majuscules sont converties en minuscules, ce qui signifie qu'elles occuperont la même clé et que la valeur sera écrasée.

TIP

Les clés commençant par grpc- sont des clés internes réservées par grpc. Utiliser ce type de clé peut entraîner des erreurs.

Création manuelle

Il existe plusieurs façons de créer des metadata. Voici les deux méthodes les plus courantes pour la création manuelle. La première utilise la fonction metadata.New en lui passant directement une map.

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

La seconde est metadata.Pairs, qui prend une slice de chaînes de longueur paire et les analyse automatiquement en paires clé-valeur.

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

Vous pouvez aussi utiliser metadata.join pour fusionner plusieurs metadata

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)

Utilisation côté serveur

Récupérer les metadata

Le serveur peut récupérer les metadata avec la fonction metadata.FromIncomingContext

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Pour un RPC unaire, le paramètre du service contient un paramètre context, vous pouvez récupérer directement les metadata de celui-ci

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Pour un RPC en flux, le paramètre du service contient un objet flux, via lequel vous pouvez récupérer le context du flux

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

Envoyer des metadata

L'envoi de metadata peut se faire avec la fonction grpc.sendHeader

go
func SendHeader(ctx context.Context, md metadata.MD) error

Cette fonction peut être appelée au maximum une fois. Elle ne prendra pas effet si elle est utilisée après certains événements qui provoquent l'envoi automatique des headers. Si vous ne souhaitez pas envoyer directement les headers, vous pouvez utiliser la fonction grpc.SetHeader.

go
func SetHeader(ctx context.Context, md metadata.MD) error

Si cette fonction est appelée plusieurs fois, les metadata de chaque appel seront fusionnées et envoyées au client dans les cas suivants :

  • Quand gprc.SendHeader et Servertream.SendHeader sont appelés
  • Quand le handler du RPC unaire retourne
  • Quand la méthode Stream.SendMsg de l'objet flux est appelée dans un RPC en flux
  • Quand l'état de la requête RPC devient send out, c'est-à-dire soit la requête RPC a réussi, soit une erreur s'est produite.

Pour un RPC en flux, il est recommandé d'utiliser les méthodes SendHeader et SetHeader de l'objet flux.

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

En cours d'utilisation, vous remarquerez que Header et Trailer ont des fonctionnalités similaires. Cependant, leur principale différence réside dans le moment de l'envoi. Dans un RPC unaire, cela peut ne pas être perceptible, mais cette différence est particulièrement marquée dans les RPC en flux, car les headers des RPC en flux peuvent être envoyés sans attendre la fin de la requête. Comme mentionné précédemment, les headers sont envoyés dans des situations spécifiques, tandis que les trailers ne sont envoyés qu'une fois la requête RPC entièrement terminée. Avant cela, les trailers récupérés sont vides.

Utilisation côté client

Récupérer les metadata

Le client peut récupérer les headers de réponse via grpc.Header et grpc.Trailer

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Notez cependant qu'ils ne peuvent pas être récupérés directement. Les deux fonctions ci-dessus retournent un CallOption, ce qui signifie qu'ils doivent être passés en tant qu'options lors de l'appel RPC.

go
// Déclarer un md pour recevoir les valeurs
var header, trailer metadata.MD

// Passer les options lors de l'appel RPC
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

Une fois la requête terminée, les valeurs seront écrites dans les md passés. Pour un RPC en flux, vous pouvez récupérer directement via l'objet flux retourné lors de la requête

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

Envoyer des metadata

Pour le client, envoyer des metadata est simple. Comme mentionné précédemment, les metadata se présentent sous forme de valueContext. Il suffit de combiner les metadata au context, puis de passer le context lors de la requête. Le package metadata fournit deux fonctions pratiques pour construire le context.

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// RPC unaire
res,err := client.SomeRPC(outgoingContext,data)
// RPC en flux
stream,err := client.StreamRPC(outgoingContext)

Si le ctx original contient déjà des metadata, utiliser NewOutgoingContext écrasera directement les données précédentes. Pour éviter cela, vous pouvez utiliser la fonction suivante, qui fusionne les données au lieu de les écraser.

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// RPC unaire
res,err := client.SomeRPC(appendContext,data)
// RPC en flux
stream,err := client.StreamRPC(appendContext)

Intercepteurs

Les intercepteurs gRPC sont similaires aux Middleware dans Gin. Ils permettent d'effectuer un travail spécial avant ou après la requête sans affecter la logique métier elle-même. Dans gRPC, il y a deux grandes catégories d'intercepteurs : les intercepteurs serveur et les intercepteurs client. Selon le type de requête, on distingue les intercepteurs de RPC unaire et les intercepteurs de RPC en flux.

Pour mieux comprendre les intercepteurs, nous utiliserons un exemple très simple.

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

Le contenu de person.proto est

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

Le code du serveur est le suivant. La logique reprend le contenu précédent, assez simple, nous ne nous étendrons pas.

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Stockage des données
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Interception côté serveur

Les intercepteurs de requêtes RPC côté serveur sont UnaryServerInterceptor et StreamServerInterceptor. Leurs types spécifiques sont

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

RPC unaire

Pour créer un intercepteur de RPC unaire, il suffit d'implémenter le type UnaryserverInterceptor. Voici un exemple simple d'intercepteur de RPC unaire qui affiche chaque requête et réponse RPC.

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} données de la requête RPC
// param info *grpc.UnaryServerInfo informations sur cette requête RPC unaire
// param unaryHandler grpc.UnaryHandler le handler spécifique
// return resp interface{} données de réponse RPC
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Pour un RPC unaire, l'intercepteur intercepte chaque requête et réponse RPC, c'est-à-dire la phase de requête et la phase de réponse du RPC. Si l'intercepteur retourne une erreur, la requête en cours se termine.

RPC en flux

Pour créer un intercepteur de RPC en flux, il suffit d'implémenter le type StreamServerInterceptor. Voici un exemple simple d'intercepteur de RPC en flux.

go
// StreamPersonLogInterceptor
// param srv interface{} le serveur implémenté côté serveur
// param stream grpc.ServerStream l'objet flux
// param info *grpc.StreamServerInfo informations sur le flux
// param streamHandler grpc.StreamHandler le gestionnaire
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Pour un RPC en flux, l'intercepteur intercepte chaque appel aux méthodes Send et Recve de l'objet flux. Si l'intercepteur retourne une erreur, cela ne met pas fin à la requête RPC en cours, cela signifie simplement qu'une erreur s'est produite lors de ce send ou recv.

Utilisation des intercepteurs

Pour que les intercepteurs créés prennent effet, ils doivent être passés en option lors de la création du serveur gRPC. La bibliothèque officielle fournit également les fonctions correspondantes. Il existe des fonctions pour ajouter un seul intercepteur et des fonctions pour ajouter des intercepteurs en chaîne.

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

Utiliser UnaryInterceptor plusieurs fois provoquera la panic suivante

panic: The unary server interceptor was already set and may not be reset.

C'est pareil pour StreamInterceptor, alors que les intercepteurs en chaîne ajouteront à la même chaîne s'ils sont appelés plusieurs fois.

Exemple d'utilisation

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Ajouter des intercepteurs en chaîne
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

Interception côté client

Les intercepteurs côté client sont similaires à ceux du serveur. Il y a un intercepteur unaire UnaryClientInterceptor et un intercepteur de flux StreamClientInterceptor. Leurs types spécifiques sont

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

RPC unaire

Pour créer un intercepteur de RPC unaire côté client, il suffit d'implémenter UnaryClientInterceptor. Voici un exemple simple.

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string nom de la méthode
// param req interface{} données de la requête
// param reply interface{} données de la réponse
// param cc *grpc.ClientConn objet de connexion client
// param invoker grpc.UnaryInvoker la méthode client spécifique interceptée
// param opts ...grpc.CallOption options de configuration de cette requête
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

Via l'intercepteur de RPC unaire côté client, vous pouvez récupérer les données de requête et de réponse locales ainsi que d'autres informations sur la requête.

RPC en flux

Pour créer un intercepteur de RPC en flux côté client, il suffit d'implémenter StreamClientInterceptor. Voici un exemple.

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc informations de description de l'objet flux
// param cc *grpc.ClientConn objet de connexion
// param method string nom de la méthode
// param streamer grpc.Streamer objet utilisé pour créer l'objet flux
// param opts ...grpc.CallOption options de connexion
// return grpc.ClientStream l'objet flux client créé
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Via l'intercepteur de RPC en flux côté client, vous ne pouvez intercepter que le moment où le client établit la connexion avec le serveur, c'est-à-dire la création du flux. Vous ne pouvez pas intercepter chaque envoi et réception de message de l'objet flux client. Cependant, en enveloppant l'objet flux créé dans l'intercepteur, vous pouvez intercepter les envois et réceptions de messages, comme ceci

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc informations de description de l'objet flux
// param cc *grpc.ClientConn objet de connexion
// param method string nom de la méthode
// param streamer grpc.Streamer objet utilisé pour créer l'objet flux
// param opts ...grpc.CallOption options de connexion
// return grpc.ClientStream l'objet flux client créé
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // Avant l'envoi du message
  err := c.ClientStream.SendMsg(m)
  // Après l'envoi du message
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // Avant la réception du message
  err := c.ClientStream.RecvMsg(m)
  // Après la réception du message
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

Utilisation des intercepteurs

Côté client, comme côté serveur, il y a quatre fonctions utilitaires pour ajouter des intercepteurs via des options. Il y a des intercepteurs simples et des intercepteurs en chaîne.

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

Utiliser WithUnaryInterceptor plusieurs fois côté client ne provoquera pas de panic, mais seul le dernier sera pris en compte.

Voici un exemple d'utilisation

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

Jusqu'à présent, l'exemple complet a été écrit. Il est temps de l'exécuter et de voir les résultats. La sortie du serveur est

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

La sortie du client est

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

On peut voir que les sorties des deux côtés sont conformes aux attentes et jouent le rôle d'interception. Ce cas n'est qu'un exemple très simple. Les intercepteurs gRPC peuvent faire beaucoup de choses comme l'autorisation, la journalisation, la surveillance, etc. Vous pouvez choisir de créer vos propres solutions ou utiliser des solutions existantes de la communauté open source. gRPC Ecosystem collecte spécifiquement une série de middlewares intercepteurs gRPC open source à l'adresse : grpc-ecosystem/go-grpc-middleware.

Gestion des erreurs

Avant de commencer, regardons un exemple. Dans l'exemple précédent sur les intercepteurs, si un utilisateur n'est pas trouvé, une erreur person not found est renvoyée au client. La question est : le client peut-il effectuer un traitement spécial en fonction de l'erreur retournée ? Essayons. Dans le code client, utilisons errors.Is pour vérifier l'erreur.

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

Le résultat est

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

On peut voir que l'erreur reçue par le client ressemble à ceci. L'erreur retournée par le serveur se trouve dans le champ desc

rpc error: code = Unknown desc = person not found

Naturellement, la logique errors.Is ne s'exécute pas. Même en remplaçant par errors.As, le résultat est le même.

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Pour résoudre ce type de problème, gRPC fournit le package status. C'est pourquoi l'erreur reçue par le client a des champs code et desc. En fait, gRPC renvoie un Status au client. Son type spécifique est le suivant. On peut voir que c'est aussi un message défini par protobuf.

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // The status code, which should be an enum value of
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // A developer-facing error message, which should be in English. Any
  // user-facing error message should be localized and sent in the
  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
  // by the client.
  string message = 2;

  // A list of messages that carry the error details.  There is a common set of
  // message types for APIs to use.
  repeated google.protobuf.Any details = 3;
}

Codes d'erreur

Le Code dans la structure Status est similaire au HTTP Status. Il est utilisé pour représenter l'état de la requête RPC en cours. gRPC définit 16 codes dans grpc/codes, couvrant la plupart des scénarios.

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // Appel réussi
  OK Code = 0

  // Requête annulée
  Canceled Code = 1

  // Erreur inconnue
  Unknown Code = 2

  // Paramètre invalide
  InvalidArgument Code = 3

    // Délai d'attente dépassé
  DeadlineExceeded Code = 4

  // Ressource non trouvée
  NotFound Code = 5

    // La ressource existe déjà
  AlreadyExists Code = 6

  // Permission refusée
  PermissionDenied Code = 7

  // Ressources épuisées
  ResourceExhausted Code = 8

  // Conditions préalables non remplies
  FailedPrecondition Code = 9

  // Opération interrompue
  Aborted Code = 10

  // Hors limites
  OutOfRange Code = 11

  // Service non implémenté
  Unimplemented Code = 12

  // Erreur interne
  Internal Code = 13

  // Service indisponible
  Unavailable Code = 14

  // Perte de données
  DataLoss Code = 15

  // Authentification échouée
  Unauthenticated Code = 16

  _maxCode = 17
)

Le package grpc/status fournit de nombreuses fonctions pour la conversion entre status et error. Nous pouvons utiliser directement status.New pour créer un Status, ou Newf

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Par exemple

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

Via la méthode Err de status, on peut récupérer l'erreur qu'il contient. Quand le statut est OK, l'erreur est nil.

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

On peut aussi créer directement une erreur

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name.String())

Nous pouvons donc modifier le code du service comme suit

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

Auparavant, tous les codes retournés par le serveur étaient Unknown. Après modification, ils ont une sémantique plus précise. Le client peut ensuite utiliser status.FromError ou la fonction suivante pour récupérer le status depuis l'erreur, et ainsi traiter différemment selon le code

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Exemple

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Cependant, bien que les codes gRPC couvrent autant que possible les scénarios courants, ils peuvent parfois ne pas satisfaire les besoins des développeurs. Dans ce cas, vous pouvez utiliser le champ Details du Status. C'est une slice qui peut contenir plusieurs informations. Utilisez Status.WithDetails pour passer des informations personnalisées

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Utilisez Status.Details pour récupérer les informations

go
func (s *Status) Details() []interface{}

Notez que les informations passées devraient idéalement être définies par protobuf, pour faciliter l'analyse des deux côtés, client et serveur. Voici quelques exemples officiels

protobuf
message ErrorInfo {
  // Raison de l'erreur
  string reason = 1;

  // Domaine du service
  string domain = 2;

  // Autres informations
  map<string, string> metadata = 3;
}

// Informations de nouvelle tentative
message RetryInfo {
  // Intervalle d'attente pour la même requête
  google.protobuf.Duration retry_delay = 1;
}

// Informations de débogage
message DebugInfo {
  // Pile d'appels
  repeated string stack_entries = 1;

  // Détails
  string detail = 2;
}

    ...
    ...

Pour plus d'exemples, consultez googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com). Si nécessaire, importez avec le code suivant.

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

Utiliser ErrorInfo comme details

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

Le client peut alors récupérer les données et les traiter. Les exemples ci-dessus sont ceux recommandés par gRPC. Vous pouvez aussi définir vos propres messages pour mieux répondre aux besoins métier. Si vous souhaitez faire une gestion d'erreurs unifiée, vous pouvez aussi l'intégrer dans un intercepteur.

Contrôle du timeout

Dans la plupart des cas, il n'y a pas qu'un seul service. Il peut y avoir de nombreux services en amont et en aval. Le client initie une requête, du service le plus en amont jusqu'au plus en aval, formant une chaîne d'appels de services, comme illustré. La chaîne pourrait être encore plus longue.

Avec une chaîne d'appels aussi longue, si le traitement logique d'un service prend beaucoup de temps, les services en amont resteront en attente. Pour réduire le gaspillage inutile de ressources, il est nécessaire d'introduire un mécanisme de timeout. Ainsi, le timeout passé par le client le plus en amont lors de l'appel est le temps maximum autorisé pour l'exécution de toute la chaîne. gRPC peut transmettre le timeout entre processus et langages. Il place certaines données à transmettre entre processus dans la trame HEADERS Frame de HTTP/2, comme illustré

Les données de timeout dans une requête gRPC correspondent au champ grpc-timeout dans la trame HEADERS Frame. Notez que toutes les bibliothèques gRPC n'implémentent pas ce mécanisme de transmission de timeout. Cependant, gRPC-go le supporte certainement. Si vous utilisez des bibliothèques dans d'autres langages et utilisez cette fonctionnalité, vous devez être attentif à ce point.

Timeout de connexion

Lorsqu'un client gRPC établit une connexion avec un serveur, par défaut, la connexion est établie de manière asynchrone. Si la connexion échoue, un Client vide est retourné. Si vous souhaitez que la connexion soit établie de manière synchrone, vous pouvez utiliser grpc.WithBlock() pour bloquer l'attente jusqu'à ce que la connexion soit établie.

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Si vous souhaitez contrôler un timeout, il suffit de passer un TimeoutContext et d'utiliser grpc.DialContext au lieu de gprc.Dial pour passer le context.

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Ainsi, si le délai de connexion est dépassé, une erreur sera retournée

context deadline exceeded

Côté serveur, vous pouvez aussi définir un timeout de connexion. Lors de l'établissement d'une nouvelle connexion avec un client, vous pouvez définir un délai. Par défaut, c'est 120 secondes. Si la connexion n'est pas établie avec succès dans le délai imparti, le serveur fermera activement la connexion.

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout est encore en phase expérimentale, l'API pourrait être modifiée ou supprimée à l'avenir.

Timeout de requête

Lorsqu'un client gRPC envoie une requête, le premier paramètre est de type Context. De même, pour ajouter un timeout à la requête RPC, il suffit de passer un TimeoutContext

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Traitement du timeout
}

Après traitement par gRPC, le timeout est transmis au serveur. Pendant la transmission, il existe sous forme de champ dans la trame. En Go, il existe sous forme de context, et est ainsi transmis tout au long de la chaîne. Il n'est pas recommandé de modifier le timeout en cours de transmission dans la chaîne. La durée du timeout à définir lors de la requête devrait être décidée par le service le plus en amont.

Authentification et autorisation

Dans le domaine des microservices, chaque service doit vérifier l'identité et les permissions des utilisateurs pour chaque requête. Si chaque service implémente sa propre logique d'authentification comme dans une application monolithique, ce n'est clairement pas réaliste. Un service d'authentification et d'autorisation unifié est donc nécessaire. Les solutions courantes sont OAuth2, Session distribuée et JWT. Parmi celles-ci, OAuth2 est la plus utilisée et est devenue un standard de l'industrie. Le type de jeton le plus couramment utilisé dans OAuth2 est JWT. Voici un diagramme de flux du mode code d'autorisation OAuth2.

Transmission sécurisée

Enregistrement et découverte de services

Avant qu'un client n'appelle un service spécifique sur un serveur, il doit connaître l'IP et le port du serveur. Dans les exemples précédents, l'adresse du serveur était codée en dur. Dans un environnement réseau réel, la stabilité n'est pas toujours garantie. Certains services peuvent être mis hors ligne pour des raisons de panne et devenir inaccessibles, ou l'adresse peut changer en raison d'une migration de machines pour des raisons commerciales. Dans ces cas, on ne peut pas utiliser une adresse statique pour accéder aux services. Ces problèmes dynamiques sont ceux que l'enregistrement et la découverte de services doivent résoudre. La découverte de services est responsable de surveiller les changements d'adresse des services et de les mettre à jour. L'enregistrement de services est responsable d'informer le monde extérieur de sa propre adresse.

gRPC fournit des fonctionnalités de base de découverte de services, avec support pour l'extension et la personnalisation.

On ne peut pas utiliser d'adresses statiques, mais on peut utiliser des noms spécifiques à la place. Par exemple, les navigateurs utilisent la résolution DNS pour obtenir l'adresse à partir d'un nom de domaine. De même, la découverte de services par défaut dans gRPC se fait via DNS. Modifiez le fichier hosts local et ajoutez la correspondance suivante

127.0.0.1 example.grpc.com

Modifiez ensuite l'adresse Dial dans l'exemple helloworld pour utiliser le nom de domaine correspondant

go
func main() {
  // Établir la connexion, sans chiffrement
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Créer le client
  client := hello2.NewSayHelloClient(conn)
  // Appel distant
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

On peut voir une sortie normale

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

Dans gRPC, ces noms doivent suivre la syntaxe URI définie dans la RFC 3986. Le format est

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

L'URI dans l'exemple ci-dessus a la forme suivante. Comme DNS est supporté par défaut, le préfixe scheme est omis.

dns:example.grpc.com:8080

En plus de cela, gRPC supporte par défaut les sockets de domaine Unix. Pour les autres méthodes, nous devons implémenter un résolveur personnalisé selon les extensions gRPC, resolver.Resovler. Le resolver est responsable de surveiller les mises à jour des adresses cibles et de la configuration des services.

go
type Resolver interface {
    // gRPC appellera ResolveNow pour tenter de résoudre à nouveau le nom cible.
    // Ce n'est qu'un indice, le resolver peut l'ignorer s'il n'est pas nécessaire.
    // Cette méthode peut être appelée de manière concurrente
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC demande de passer un constructeur de Resolver, c'est-à-dire resolver.Builder, qui est responsable de construire les instances de Resolver.

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

La méthode Scheme du Builder retourne le type de Scheme qu'il est chargé de résoudre. Par exemple, le dnsBuilder par défaut retourne dns. Le constructeur devrait être enregistré dans le Builder global avec resolver.Register lors de l'initialisation, ou passé en option avec grpc.WithResolver dans le ClientConn interne. Ce dernier a une priorité plus élevée que le premier.

L'image ci-dessus décrit simplement le flux de travail du resolver. Voyons maintenant comment personnaliser un resolver.

Résolution de service personnalisée

Voici un résolveur personnalisé. Il faut un constructeur de résolveur personnalisé pour le construire.

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Il faut absolument appeler updatestate, sinon il y aura un deadlock
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Configuration, loadBalancingPolicy est la stratégie d'équilibrage de charge
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

Le résolveur personnalisé passe simplement les adresses correspondantes dans la map à updatestate. Il spécifie également la stratégie d'équilibrage de charge. round_robin signifie une stratégie de tourniquet (round-robin).

go
// La structure service config est la suivante
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

Le code client est le suivant

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // Enregistrer le builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Établir la connexion, sans chiffrement
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Créer le client
  client := hello2.NewSayHelloClient(conn)
     // Appeler une fois par seconde
  for range time.Tick(time.Second) {
    // Appel distant
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

Normalement, le flux devrait être : le serveur s'enregistre auprès du registre, puis le client récupère la liste des services depuis le registre et effectue la correspondance. Ici, la map passée est un registre simulé. Les données sont statiques, donc l'étape d'enregistrement du service est omise, ne laissant que la découverte de services. Le target utilisé par le client est hello:myworld. hello est le scheme personnalisé, myworld est le nom du service. Après résolution par le résolveur personnalisé, l'adresse réelle 127.0.0.1:8080 est obtenue. En réalité, pour l'équilibrage de charge, un nom de service peut correspondre à plusieurs adresses réelles. C'est pourquoi le nom de service correspond à une slice. Ici, deux serveurs sont lancés, écoutant sur des ports différents. La stratégie d'équilibrage de charge est le round-robin. Les sorties des serveurs sont respectivement les suivantes. En regardant les horodatages, on peut voir que la stratégie d'équilibrage de charge fonctionne effectivement. Si aucune stratégie n'est spécifiée, seul le premier service est sélectionné par défaut.

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

Le registre stocke en fait la correspondance entre les noms d'enregistrement de service et les adresses de service réelles. Tout middleware capable de stocker des données peut faire l'affaire. On pourrait même utiliser MySQL comme registre (bien que personne ne le fasse probablement). Généralement, les registres sont des stockages clé-valeur. Redis est un très bon choix. Mais si on utilise Redis comme registre, il faut implémenter beaucoup de logique soi-même, comme les health checks, la mise hors ligne des services, l'ordonnancement des services, etc. C'est assez fastidieux. Bien que Redis ait certaines applications dans ce domaine, elles sont rares. Chaque métier a ses experts. Les solutions les plus connues dans ce domaine sont Zookeeper, Consul, Eureka, ETCD, Nacos, etc.

Vous pouvez consulter Comparaison et sélection de registres : Zookeeper, Eureka, Nacos, Consul et ETCD - 掘金 (juejin.cn) pour comprendre les différences entre ces middlewares.

Intégration avec Consul

Pour un exemple d'intégration avec Consul, consultez consul

Équilibrage de charge

Golang by www.golangdev.cn edit