Skip to content

gRPC

A chamada de procedimento remoto rpc deve ser um ponto que deve ser aprendido em microsserviços, durante o processo de aprendizado você encontrará vários frameworks rpc, porém no domínio go, quase todos os frameworks rpc são baseados em gRPC, e também se tornou um protocolo básico no domínio de nuvem nativa, por que escolhê-lo, a resposta oficial é a seguinte:

gRPC é um framework de chamada de procedimento remoto (Remote Process Call, RPC) moderno, de código aberto e de alto desempenho que pode ser executado em qualquer ambiente. Ele pode conectar serviços dentro e entre datacenters de forma eficaz com suporte a balanceamento de carga plugável, rastreamento, verificação de saúde e autenticação. Também é adequado para computação distribuída de última milha conectando dispositivos, aplicativos móveis e navegadores a serviços de backend.

Site oficial: gRPC

Documentação oficial: Documentation | gRPC

Tutorial técnico gRPC: Basics tutorial | Go | gRPC

Site ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

Também é um projeto de código aberto da fundação CNCF, CNCF é o acrônimo de CLOUD NATIVE COMPUTING FOUNDATION

Características

Definição simples de serviço

Use Protocol Buffers para definir serviços, esta é uma poderosa ferramenta de serialização binária e linguagem.

Inicialização e escalonamento são muito rápidos

Basta uma linha de código para instalar o ambiente de execução e desenvolvimento, e apenas alguns segundos para escalar para milhões de RPCs por segundo

Multi-linguagem, multi-plataforma

Gera automaticamente stubs de serviço cliente e servidor para diferentes plataformas e linguagens

Fluxo bidirecional e autorização integrada

Fluxo bidirecional baseado em HTTP/2 e autenticação e autorização plugáveis

Embora o GRPC seja independente de linguagem, a maior parte do conteúdo deste site está relacionado a go, então este artigo também usará go como linguagem principal para explicação, os compiladores pb e geradores usados posteriormente, se os usuários de outras linguagens podem procurar por conta própria no site oficial do Protobuf. Para conveniência, o processo de criação do projeto será omitido diretamente a seguir.

Instalação de Dependências

Primeiro baixe o compilador Protocol Buffer, endereço de download: Releases · protocolbuffers/protobuf (github.com)

Selecione o sistema e versão de acordo com sua situação, após o download, adicione o diretório bin ao PATH.

Em seguida, baixe o gerador de código, o compilador gera código de serialização da linguagem correspondente a partir do arquivo proto, o gerador é usado para gerar código de negócio.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Crie um projeto vazio, o nome aqui é grpc_learn, e importe as seguintes dependências

sh
$ go get google.golang.org/grpc

Finalmente, verifique a versão para ver se a instalação foi bem-sucedida

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Estrutura do Projeto

Abaixo será demonstrado com um exemplo Hello World, crie a seguinte estrutura de projeto.

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

Definir Arquivo Protobuf

Em pb/hello.proto, insira o seguinte conteúdo, este é um exemplo bastante simples, se não estiver familiarizado com a sintaxe protoc, consulte a documentação relacionada.

protobuf
syntax = "proto3";

// . significa gerar diretamente no caminho de saída, hello é o nome do pacote
option go_package = ".;hello";

// Requisição
message HelloReq {
  string name = 1;


  // Resposta
  message HelloRep {
    string msg = 1;
  }

  // Definir serviço
  service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Gerar Código

Após a escrita, use o compilador protoc para gerar código relacionado à serialização de dados, use o gerador para gerar código rpc

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

Neste ponto, pode-se descobrir que a pasta hello gerou os arquivos hello.pb.go e hello_grpc.pb.go, ao navegar por hello.pb.go pode-se encontrar nossa message definida

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Campo definido
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Campo definido
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

Em hello_grpc.pb.go pode-se encontrar nosso serviço definido

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// Posteriormente, se implementarmos a interface de serviço, devemos incorporar esta estrutura, não precisando implementar o método mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// Retorna nil por padrão
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Restrição de interface
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Escrever Servidor

Em server/main.go escreva o seguinte código

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // Escutar porta
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // Criar servidor grpc
  server := grpc.NewServer()
  // Registrar serviço
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Executar
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

Escrever Cliente

Em client/main.go insira o seguinte código

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Estabelecer conexão, sem verificação de criptografia
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Criar cliente
  client := pb.NewSayHelloClient(conn)
  // Chamada remota
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Executar

Primeiro execute o servidor, depois execute o cliente, a saída do servidor é a seguinte

2023/07/16 16:26:51 received grpc req: name:"client"

A saída do cliente é a seguinte

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

Neste exemplo, após o cliente estabelecer a conexão, ao chamar o método remoto, é como chamar um método local, acessando diretamente o método Hello do client e obtendo o resultado, este é o exemplo GRPC mais simples, muitos frameworks de código aberto também encapsulam este processo.

bufbuild

No exemplo acima, o código foi gerado diretamente usando comandos, se posteriormente houver muitos plugins, o comando parecerá bastante tedioso, neste ponto, pode-se usar uma ferramenta para gerenciar arquivos protobuf, existe exatamente uma ferramenta de gerenciamento de código aberto bufbuild/buf.

Endereço de código aberto: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Endereço da documentação: Buf - Install the Buf CLI

Características

  • Gerenciamento BSR
  • Linter
  • Geração de código
  • Formatação
  • Gerenciamento de dependências

Com esta ferramenta, é muito conveniente gerenciar arquivos protobuf.

A documentação fornece muitas maneiras de instalar, você pode escolher por conta própria. Se o ambiente go estiver instalado localmente, use diretamente go install para instalar

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

Após a instalação, verifique a versão

sh
$ buf --version
1.24.0

Vá para a pasta helloworld/pb e execute o seguinte comando para criar um módulo para gerenciar arquivos pb.

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

O conteúdo do arquivo buf.yaml é o seguinte por padrão

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Em seguida, vá para o diretório helloworld/ e crie buf.gen.yaml, insira o seguinte conteúdo

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Em seguida, execute o comando para gerar código

sh
$ buf generate

Após a conclusão, você pode ver os arquivos gerados, claro, buf não tem apenas esta funcionalidade, outras funcionalidades podem ser aprendidas por conta própria na documentação.

RPC de Fluxo

Os métodos de chamada grpc são divididos em duas categorias, Unary RPC e Stream RPC. O exemplo no Hello World é um típico Unary RPC.

Unary rpc (ou rpc comum seria mais compreensível, realmente não sei como traduzir este unary) é usado como http comum, o cliente solicita, o servidor retorna dados, uma pergunta e uma resposta. Enquanto o Stream RPC pode ter solicitações e respostas em fluxo, como na figura abaixo

Ao usar solicitação de fluxo, apenas uma resposta é retornada, o cliente pode enviar parâmetros várias vezes ao servidor através do fluxo, o servidor não precisa esperar até que todos os parâmetros sejam recebidos antes de processar como no Unary RPC, a lógica de processamento específica pode ser decidida pelo servidor. Normalmente, apenas o cliente pode fechar ativamente a solicitação de fluxo, uma vez que o fluxo é fechado, a solicitação RPC atual terminará.

Ao usar resposta de fluxo, apenas um parâmetro é enviado, o servidor pode enviar dados várias vezes ao cliente através do fluxo, o cliente não precisa esperar até receber todos os dados antes de processar como no Unary RPC, a lógica de processamento específica pode ser decidida pelo próprio cliente. Em uma solicitação normal, apenas o servidor pode fechar ativamente a resposta de fluxo, uma vez que o fluxo é fechado, a solicitação RPC atual terminará.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Também pode ser apenas a resposta em fluxo (Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Ou tanto a solicitação quanto a resposta em fluxo (Bi-directional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Fluxo Unidirecional

Abaixo está um exemplo para demonstrar a operação de fluxo unidirecional, primeiro crie a seguinte estrutura de projeto

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

O conteúdo de message.proto é o seguinte

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

Gere código através do buf

sh
$ buf generate

Aqui a demonstração é um serviço de mensagem, receiveMessage recebe um nome de usuário especificado, tipo string, retorna um fluxo de mensagens, sendMessage recebe um fluxo de mensagens, retorna o número de mensagens enviadas com sucesso, tipo inteiro de 64 bits. Em seguida, crie server/message_service.go, implemente por conta própria o serviço gerado pelo código padrão

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

Pode-se ver que os parâmetros de recebimento e envio de mensagens têm uma interface de empacotamento de fluxo

go
type MessageService_ReceiveMessageServer interface {
    // Enviar mensagem
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Enviar valor de retorno e fechar conexão
  SendAndClose(*wrapperspb.StringValue) error
    // Receber mensagem
  Recv() (*Message, error)
  grpc.ServerStream
}

Ambos incorporam a interface gprc.ServerStream

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

Pode-se ver que o RPC de fluxo não é como o Unary RPC onde os parâmetros de entrada e valores de retorno podem ser claramente refletidos na assinatura da função, estes métodos à primeira vista não mostram quais tipos são os parâmetros de entrada e valores de retorno, é necessário chamar o tipo Stream passado para completar a transmissão de fluxo, em seguida, comece a escrever a lógica específica do servidor. Ao escrever a lógica do servidor, use um sync.map para simular uma fila de mensagens, quando o cliente solicita ReceiveMessage, o servidor retorna continuamente as mensagens que o cliente deseja através da resposta de fluxo, até que o tempo limite expire e a conexão seja interrompida. Quando o cliente solicita SendMessage, envia continuamente mensagens através da solicitação de fluxo, o servidor coloca continuamente as mensagens na fila, até que o cliente desconecte ativamente a solicitação e retorne ao cliente o número de mensagens enviadas.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Uma fila de mensagens simulada
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Receber mensagens de usuário especificado
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("Não recebeu mensagem de %s em 5 segundos, fechando conexão", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Obter mensagem
      msg := queue[0]
      // Enviar mensagem ao cliente através de transmissão de fluxo
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Enviar mensagem para usuário especificado
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // Receber mensagem do cliente
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Colocar mensagem na fila de mensagens
    messageQueue.Store(msg.From, queue)
    count++
  }
}

O cliente abre duas gorotinas, uma para enviar mensagens e outra para receber mensagens, claro, também pode enviar e receber ao mesmo tempo, o código é o seguinte.

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Solicitação de recebimento de mensagem
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("Nenhuma mensagem, fechando conexão")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "Olá",
      "Tem tempo para jogar jogos à tarde",
      "Tudo bem, vamos marcar um jogo quando tiver tempo",
      "Deve dar neste fim de semana",
      "Então está combinado",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Mensagens enviadas, fechar ativamente a solicitação e obter valor de retorno
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("Envio concluído, total de %d mensagens enviadas\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

Após a execução, a saída do servidor é a seguinte

server  2023/07/18 16:28:24 send from:"jack" content:"Olá" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"Olá" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"Tem tempo para jogar jogos à tarde" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"Tem tempo para jogar jogos à tarde" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"Tudo bem, vamos marcar um jogo quando tiver tempo" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"Tudo bem, vamos marcar um jogo quando tiver tempo" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"Deve dar neste fim de semana" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"Deve dar neste fim de semana" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"Então está combinado" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"Então está combinado" to:"mike"
server  2023/07/18 16:28:33 Não recebeu mensagem de jack em 5 segundos, fechando conexão

A saída do cliente é a seguinte

client  2023/07/18 16:28:24 receive from:"jack" content:"Olá" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"Tem tempo para jogar jogos à tarde" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"Tudo bem, vamos marcar um jogo quando tiver tempo" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"Deve dar neste fim de semana" to:"mike"
client  2023/07/18 16:28:28 Envio concluído, total de 5 mensagens enviadas
client  2023/07/18 16:28:28 receive from:"jack" content:"Então está combinado" to:"mike"
client  2023/07/18 16:28:33 Nenhuma mensagem, fechando conexão

Através deste exemplo, pode-se descobrir que o processamento de solicitação RPC de fluxo unidirecional é mais complexo tanto para o cliente quanto para o servidor do que o rpc unário, porém o RPC de fluxo bidirecional é ainda mais complexo.

Fluxo Bidirecional

RPC de fluxo bidirecional, ou seja, tanto a solicitação quanto a resposta são em fluxo, é equivalente a combinar os dois serviços do exemplo anterior em um. Para RPC de fluxo, a primeira solicitação é sempre iniciada pelo cliente, em seguida, o cliente pode enviar parâmetros de solicitação a qualquer momento através do fluxo, o servidor também pode retornar dados a qualquer momento através do fluxo, não importa qual parte feche ativamente o fluxo, a solicitação atual terminará.

TIP

O conteúdo subsequente, a menos que necessário, omitirá diretamente a descrição do código de geração de código pb e a criação de etapas de código de cliente e servidor rpc

Primeiro crie a seguinte estrutura de projeto

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

O conteúdo de message.proto é o seguinte

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

Na lógica do servidor, após estabelecer a conexão, abre duas gorotinas, uma responsável por receber mensagens e outra por enviar mensagens, a lógica de processamento específica é semelhante ao exemplo anterior, mas desta vez a lógica de determinação de tempo limite foi removida.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue Fila de mensagens simulada
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Serviço de chat, usaremos múltiplas gorotinas para processar a lógica do servidor
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "fim do chat")

  var chatErr error
  chatCh := make(chan error)

  // Criar duas gorotinas, uma para receber mensagens e outra para enviar mensagens
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Gorotina para receber mensagens
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Gorotina para enviar mensagens
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

Na lógica do cliente, abre duas sub-gorotinas para simular o processo de chat entre duas pessoas, cada sub-gorotina tem duas net-gorotinas responsáveis por enviar e receber mensagens (a lógica do cliente não garante que a ordem de envio e recebimento das mensagens entre as duas pessoas esteja correta, é apenas um exemplo simples de envio e recebimento de ambas as partes)

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "Olá", "Tem tempo para jogar jogos?", "Tudo bem")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "Olá", "Não", "Sem tempo, procure outra pessoa")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("fim do chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Mensagens enviadas, fechar conexão
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Gorotina para receber mensagens
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

Normalmente, a saída do servidor é

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"Olá" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"Olá" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"Olá" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"Olá" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"Tem tempo para jogar jogos?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"Não" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"Não" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"Tem tempo para jogar jogos?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"Tudo bem" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"Sem tempo, procure outra pessoa" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"Tudo bem" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"Sem tempo, procure outra pessoa" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

Normalmente, a saída do cliente é (pode-se ver que a ordem lógica das mensagens está bagunçada)

client 2023/07/19 17:26:24 receive from:"jack"  content:"Olá"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"Olá"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"Não"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"Tem tempo para jogar jogos?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"Tudo bem"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"Sem tempo, procure outra pessoa"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

Através do exemplo, pode-se ver que a lógica de processamento de fluxo bidirecional, tanto para o cliente quanto para o servidor, é mais complexa do que o fluxo unidirecional, é necessário combinar múltiplas gorotinas para abrir tarefas assíncronas para melhor processar a lógica.

metadata

metadata é essencialmente um map, seu value é um slice de string, assim como o header do http1, e o papel que desempenha no gRPC também é similar ao http header, fornecendo algumas informações sobre esta chamada RPC, e o ciclo de vida do metadata segue todo o processo de uma chamada rpc, quando a chamada termina, seu ciclo de vida também termina.

No gRPC, é principalmente transmitido e armazenado através de context, porém o gRPC fornece o pacote metadata, que tem muitas funções convenientes para simplificar a operação, não precisamos operar manualmente o context. O tipo correspondente ao metadata no gRPC é metadata.MD, como mostrado abaixo.

go
// MD é um mapeamento de chaves de metadata para valores. Os usuários devem usar as seguintes
// duas funções de conveniência New e Pairs para gerar MD.
type MD map[string][]string

Podemos usar diretamente a função metadata.New para criar, porém antes de criar, há alguns pontos a serem observados

go
func New(m map[string]string) MD

O metadata tem restrições quanto ao nome da chave, apenas pode ser os caracteres restritos pelas seguintes regras:

  • Caracteres ASCII
  • Números: 0-9
  • Letras minúsculas: a-z
  • Letras maiúsculas: A-Z
  • Caracteres especiais: -_

TIP

No metadata, letras maiúsculas serão convertidas para minúsculas, ou seja, ocuparão a mesma chave, e o valor também será sobrescrito.

TIP

Chaves que começam com grpc- são chaves internas reservadas para uso do grpc, se usar este tipo de chave pode causar alguns erros.

Criação Manual

Há muitas maneiras de criar metadata, aqui são apresentados os dois métodos mais comuns de criação manual de metadata, o primeiro é usar a função metadata.New, passando diretamente um map.

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

O segundo é metadata.Pairs, passando um slice de string de comprimento par, será automaticamente analisado em pares chave-valor.

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

Também pode usar metadata.join para mesclar múltiplos metadata

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)

Uso no Servidor

Obter metadata

O servidor pode usar a função metadata.FromIncomingContext para obter metadata

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Para rpc unário, o parâmetro do service terá um parâmetro context, basta obter o metadata diretamente dele

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Para rpc de fluxo, o parâmetro do service terá um objeto de fluxo, através dele pode-se obter o context do fluxo

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

Enviar metadata

Para enviar metadata, pode-se usar a função grpc.sendHeader

go
func SendHeader(ctx context.Context, md metadata.MD) error

Esta função pode ser chamada no máximo uma vez, não terá efeito em alguns casos após alguns eventos que causam o envio automático do header. Em alguns casos, se não quiser enviar o header diretamente, pode-se usar a função grpc.SetHeader.

go
func SetHeader(ctx context.Context, md metadata.MD) error

Se esta função for chamada várias vezes, o metadata transmitido em cada vez será mesclado e enviado ao cliente nas seguintes situações

  • Quando gprc.SendHeader e Servertream.SendHeader são chamados
  • Quando o handler do rpc unário retorna
  • Quando o objeto de fluxo no rpc de fluxo chama Stream.SendMsg
  • Quando o status da solicitação rpc se torna send out, esta situação significa que a solicitação rpc foi bem-sucedida ou ocorreu um erro.

Para rpc de fluxo, é recomendado usar o método SendHeader e o método SetHeader do objeto de fluxo.

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

Durante o uso, descobrirá que as funcionalidades Header e Trailer são quase as mesmas, porém sua principal diferença está no momento do envio, pode não ser percebido no rpc unário, mas esta diferença é particularmente óbvia no RPC de fluxo, porque o Header no RPC de fluxo pode ser enviado sem esperar o término da solicitação. Como mencionado anteriormente, o Header será enviado em situações específicas, enquanto o Trailer só será enviado após o término de toda a solicitação RPC, antes disso, o trailer obtido está vazio.

Uso no Cliente

Obter metadata

O cliente que deseja obter o header da resposta pode usar grpc.Header e grpc.Trailer para implementar

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Porém, note que não pode obter diretamente, pode-se ver que os valores de retorno das duas funções acima são CallOption, ou seja, são passados como parâmetros option ao fazer a solicitação RPC.

go
// Declarar md para receber valores
var header, trailer metadata.MD

// Passar option ao fazer solicitação rpc
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

Após a conclusão da solicitação, o valor será escrito no md passado. Para rpc de fluxo, pode-se obter diretamente através do objeto de fluxo retornado ao fazer a solicitação

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

Enviar metadata

O cliente que deseja enviar metadata é muito simples, como mencionado anteriormente, a manifestação do metadata é valueContext, combine o metadata ao context e depois passe o context ao fazer a solicitação, o pacote metadata fornece duas funções para facilitar a construção do context.

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// Unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// Stream rpc
stream,err := client.StreamRPC(outgoingContext)

Se o ctx original já tiver metadata, usar NewOutgoingContext sobrescreverá os dados anteriores, para evitar esta situação, pode-se usar a função abaixo, que não sobrescreve, mas mescla os dados.

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// Unary rpc
res,err := client.SomeRPC(appendContext,data)
// Stream rpc
stream,err := client.StreamRPC(appendContext)

Interceptadores

Os interceptadores do gRPC são similares ao Middleware no gin, ambos fazem algum trabalho especial antes ou depois da solicitação sem afetar a lógica de negócio em si. No gRPC, há duas categorias de interceptadores, interceptadores do servidor e interceptadores do cliente, de acordo com o tipo de solicitação, há interceptadores de RPC unário e interceptadores de RPC de fluxo, veja a figura abaixo

Para entender melhor os interceptadores, abaixo será descrito de acordo com um exemplo muito simples.

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

O conteúdo de person.proto é o seguinte

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

O código do servidor é o seguinte, a lógica é toda do conteúdo anterior, bastante simples, não será detalhado.

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Armazenar dados
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Interceptação do Servidor

Os interceptores de solicitação rpc do servidor são UnaryServerInterceptor e StreamServerInterceptor, os tipos específicos são mostrados abaixo

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

RPC Unário

Para criar um interceptador de RPC unário, basta implementar o tipo UnaryserverInterceptor, abaixo está um exemplo simples de interceptador de RPC unário, a funcionalidade é output de cada solicitação e resposta rpc.

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} dados da solicitação rpc
// param info *grpc.UnaryServerInfo algumas informações de solicitação deste RPC unário
// param unaryHandler grpc.UnaryHandler handler específico
// return resp interface{} dados da resposta rpc
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Para RPC unário, o interceptador intercepta cada solicitação e resposta RPC, ou seja, intercepta a fase de solicitação e a fase de resposta do RPC, se o interceptador retornar error, então esta solicitação terminará.

RPC de Fluxo

Para criar um interceptador de RPC de fluxo, basta implementar o tipo StreamServerInterceptor, abaixo está um exemplo simples de interceptador de RPC de fluxo.

go
// StreamPersonLogInterceptor
// param srv interface{} corresponde ao server implementado pelo servidor
// param stream grpc.ServerStream objeto de fluxo
// param info *grpc.StreamServerInfo informações de fluxo
// param streamHandler grpc.StreamHandler processador
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Para RPC de fluxo, o interceptador intercepta o momento em que os métodos Send e Recve de cada objeto de fluxo são chamados, se o interceptador retornar error, não causará o término desta solicitação RPC, apenas representa que este send ou recv ocorreu um erro.

Usar Interceptadores

Para fazer com que os interceptadores criados tenham efeito, é necessário passá-los como option ao criar o servidor gRPC, a função relacionada também é fornecida oficialmente para uso. Como mostrado abaixo, há funções para adicionar um único interceptador e também funções para adicionar interceptadores em cadeia.

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

O uso repetido de UnaryInterceptor causará o seguinte panic

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor é a mesma coisa, enquanto o uso repetido do interceptador em cadeia será anexado à mesma cadeia.

O exemplo de uso é o seguinte

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Adicionar interceptadores em cadeia
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

Interceptação do Cliente

Os interceptadores do cliente são quase iguais aos do servidor, um interceptador unário UnaryClientInterceptor, um interceptador de fluxo StreamClientInterceptor, os tipos específicos são mostrados abaixo.

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

RPC Unário

Para criar um interceptador de cliente de RPC unário, basta implementar UnaryClientInterceptor, abaixo está um exemplo simples.

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string nome do método
// param req interface{} dados da solicitação
// param reply interface{} dados da resposta
// param cc *grpc.ClientConn objeto de conexão do cliente
// param invoker grpc.UnaryInvoker método específico do cliente interceptado
// param opts ...grpc.CallOption opções de configuração desta solicitação
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

Através do interceptador de cliente de RPC unário, pode-se obter os dados de solicitação e resposta locais e algumas outras informações de solicitação.

RPC de Fluxo

Para criar um interceptador de cliente de RPC de fluxo, basta implementar StreamClientInterceptor, abaixo está um exemplo.

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc informações de descrição do objeto de fluxo
// param cc *grpc.ClientConn objeto de conexão
// param method string nome do método
// param streamer grpc.Streamer objeto usado para criar objeto de fluxo
// param opts ...grpc.CallOption opções de configuração de conexão
// return grpc.ClientStream objeto de fluxo do cliente criado
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Através do interceptador de cliente de RPC de fluxo, só é possível interceptar o momento em que o cliente estabelece conexão com o servidor, ou seja, o momento de criação do fluxo, não é possível interceptar cada vez que o objeto de fluxo do cliente envia ou recebe mensagens, porém podemos empacotar o objeto de fluxo criado no interceptador para implementar a interceptação de envio e recebimento de mensagens, como abaixo

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc informações de descrição do objeto de fluxo
// param cc *grpc.ClientConn objeto de conexão
// param method string nome do método
// param streamer grpc.Streamer objeto usado para criar objeto de fluxo
// param opts ...grpc.CallOption opções de configuração de conexão
// return grpc.ClientStream objeto de fluxo do cliente criado
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // Antes do envio da mensagem
  err := c.ClientStream.SendMsg(m)
  // Após o envio da mensagem
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // Antes do recebimento da mensagem
  err := c.ClientStream.RecvMsg(m)
  // Após o recebimento da mensagem
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

Usar Interceptadores

Ao usar, é similar ao servidor, também são quatro funções de ferramenta para adicionar interceptadores através de option, divididos em interceptador único e interceptador em cadeia.

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

O uso repetido de WithUnaryInterceptor no cliente não causará panic, mas apenas o último terá efeito.

Abaixo está um caso de uso

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

Até agora, todo o caso foi escrito, é hora de executar para ver como é o resultado. A saída do servidor é a seguinte

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

A saída do cliente é a seguinte

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

Pode-se ver que as saídas de ambos os lados estão conforme o esperado, desempenhando o efeito de interceptação, este caso é apenas um exemplo muito simples, usando os interceptadores do gRPC pode-se fazer muitas coisas como autorização, logs, monitoramento e outras funcionalidades, pode-se escolher fazer sua própria implementação ou escolher usar implementações prontas da comunidade de código aberto, gRPC Ecosystem coleta especialmente uma série de middlewares de interceptadores gRPC de código aberto, endereço: grpc-ecosystem/go-grpc-middleware.

Tratamento de Erros

Antes de começar, vamos ver um exemplo, no caso do interceptador anterior, se o usuário não for encontrado, será retornado ao cliente o erro person not found, então surge a questão, o cliente pode fazer um tratamento especial de acordo com o erro retornado? Em seguida, tente, no código do cliente, tente usar errors.Is para julgar o erro.

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

O resultado da saída é o seguinte

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

Pode-se ver que o error recebido pelo cliente é assim, descobrirá que o error retornado pelo servidor está no campo desc

rpc error: code = Unknown desc = person not found

Naturalmente, a lógica do segmento errors.Is não foi executada, mesmo que troque por errors.As o resultado é o mesmo.

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Para isso, o gRPC fornece o pacote status para resolver este tipo de problema, esta é também a razão pela qual o error recebido pelo cliente tem os campos code e desc, porque o gRPC na verdade retorna um Status ao cliente, seu tipo específico é o seguinte, também é uma message definida por protobuf.

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // O código de status, que deve ser um valor enum de
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // Uma mensagem de erro voltada para o desenvolvedor, que deve estar em inglês. Qualquer
  // mensagem de erro voltada para o usuário deve ser localizada e enviada no
  // campo [google.rpc.Status.details][google.rpc.Status.details], ou localizada
  // pelo cliente.
  string message = 2;

  // Uma lista de mensagens que carregam os detalhes do erro. Existe um conjunto comum de
  // tipos de mensagem para APIs usarem.
  repeated google.protobuf.Any details = 3;
}

Códigos de Erro

O Code na estrutura Status é uma existência similar ao Http Status, usado para indicar o estado da solicitação rpc atual, o gRPC define 16 codes localizados em grpc/codes, cobrindo a maioria dos cenários, respectivamente mostrados abaixo

go
// A Code é um código de erro não assinado de 32 bits conforme definido na especificação gRPC.
type Code uint32

const (
  // Chamada bem-sucedida
  OK Code = 0

  // Solicitação cancelada
  Canceled Code = 1

  // Erro desconhecido
  Unknown Code = 2

  // Parâmetro inválido
  InvalidArgument Code = 3

    // Solicitação com tempo limite excedido
  DeadlineExceeded Code = 4

  // Recurso não existe
  NotFound Code = 5

    // Já existe o mesmo recurso (não esperava que este existisse)
  AlreadyExists Code = 6

  // Permissão insuficiente, acesso negado
  PermissionDenied Code = 7

  // Esgotamento de recursos, capacidade restante insuficiente para uso, como espaço em disco insuficiente
  ResourceExhausted Code = 8

  // Condições de execução insuficientes, como usar rm para excluir um diretório não vazio, a condição de exclusão é que o diretório esteja vazio, mas a condição não é atendida
  FailedPrecondition Code = 9

  // Solicitação interrompida
  Aborted Code = 10

  // Operação de acesso excede o limite de escopo
  OutOfRange Code = 11

  // Indica que o serviço atual não está implementado
  Unimplemented Code = 12

  // Erro interno do sistema
  Internal Code = 13

  // Serviço indisponível
  Unavailable Code = 14

  // Perda de dados
  DataLoss Code = 15

  // Não passou na autenticação
  Unauthenticated Code = 16

  _maxCode = 17
)

O pacote grpc/status fornece muitas funções para facilitar a conversão mútua entre status e error. Podemos usar diretamente status.New para criar um Status, ou Newf

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Por exemplo, o código abaixo

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

Através do método err do status pode-se obter o error nele, quando o estado é ok o error é nil.

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

Também pode criar error diretamente

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

Então podemos modificar o código do servidor para o seguinte

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

Antes disso, todos os codes retornados pelo servidor eram unknown, agora após a modificação têm uma semântica mais clara. Então no cliente pode-se usar status.FromError ou usar a função abaixo para obter status do error, para fazer o tratamento correspondente de acordo com diferentes codes

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Exemplo如下

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Porém, embora o code do grpc tenha coberto o máximo possível alguns cenários comuns, às vezes ainda não consegue atender às necessidades dos desenvolvedores, neste momento pode-se usar o campo Details no Status, e é um slice, pode acomodar múltiplas informações. Através de Status.WithDetails para passar algumas informações personalizadas

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Através de Status.Details para obter informações

go
func (s *Status) Details() []interface{}

Note que as informações passadas devem ser preferencialmente definidas por protobuf, para que seja conveniente para ambas as extremidades do servidor e cliente analisarem, a função oficial fornece alguns exemplos

protobuf
message ErrorInfo {
  // A causa do erro
  string reason = 1;

  // O sujeito que define o serviço
  string domain = 2;

  // Outras informações
  map<string, string> metadata = 3;
}

// Informações de retry
message RetryInfo {
  // Intervalo de espera para a mesma solicitação
  google.protobuf.Duration retry_delay = 1;
}

// Informações de depuração
message DebugInfo {
  // Stack
  repeated string stack_entries = 1;

  // Algumas informações detalhadas
  string detail = 2;
}

    ...
    ...

Mais exemplos podem ser consultados em googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com). Se necessário, pode ser importado através do código abaixo.

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

Usar ErrorInfo como details

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

No cliente pode-se obter os dados para fazer o tratamento, porém os acima são apenas alguns exemplos recomendados pelo gRPC, além disso, também pode-se definir mensagens por conta própria para melhor atender às necessidades de negócio correspondentes, se quiser fazer algum tratamento de erro unificado, também pode ser operado dentro do interceptador.

Controle de Tempo Limite

Na maioria dos casos, normalmente não há apenas um serviço, e pode haver muitos serviços a montante e muitos serviços a jusante. O cliente inicia uma solicitação, do serviço mais a montante até o mais a jusante, forma uma cadeia de chamadas de serviço, como na figura, talvez até mais longa do que na figura.

Em uma cadeia de chamadas tão longa, se a lógica de processamento de um dos serviços levar muito tempo, causará que a montante fique sempre em estado de espera. Para reduzir o desperdício desnecessário de recursos, é necessário introduzir o mecanismo de tempo limite, desta forma o tempo limite passado na chamada mais a montante é o tempo máximo permitido para toda a cadeia de chamadas. E o gRPC pode transmitir tempo limite entre processos e linguagens, coloca alguns dados que precisam ser transmitidos entre processos no frame HEADERS Frame do HTTP2, como na figura abaixo

Os dados de tempo limite na solicitação gRPC correspondem ao campo grpc-timeout no HEADERS Frame. Note que nem todas as bibliotecas gRPC implementaram este mecanismo de transmissão de tempo limite, porém gRPC-go certamente suporta, se usar bibliotecas de outras linguagens e usar esta característica, é necessário prestar atenção adicional a este ponto.

Tempo Limite de Conexão

Quando o cliente gRPC estabelece conexão com o servidor, por padrão é estabelecida de forma assíncrona, se a conexão falhar em ser estabelecida, apenas retornará um Client vazio. Se quiser que a conexão seja síncrona, pode-se usar grpc.WithBlock() para bloquear a espera quando a conexão não for estabelecida com sucesso.

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Se quiser controlar um tempo limite, basta passar um TimeoutContext, usar grpc.DialContext para substituir gprc.Dial para passar o context.

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Desta forma, se o tempo limite de conexão for excedido, retornará error

context deadline exceeded

No servidor também pode-se definir tempo limite de conexão, ao estabelecer uma nova conexão com o cliente, defina um tempo limite, por padrão é 120 segundos, se não for estabelecida com sucesso dentro do tempo especificado, o servidor desconectará ativamente.

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout ainda está em fase experimental, a API pode ser modificada ou removida no futuro.

Tempo Limite de Solicitação

Quando o cliente gRPC inicia uma solicitação, o primeiro parâmetro é do tipo Context, da mesma forma, se quiser adicionar um tempo limite à solicitação RPC, basta passar um TimeoutContext

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Tratamento de lógica de tempo limite
}

Através do processamento do gRPC, o tempo limite é transmitido ao servidor, durante o processo de transmissão existe na forma de campo de frame, em go existe na forma de context, assim transmitindo em toda a cadeia. Durante o processo de transmissão da cadeia, não é recomendado modificar o tempo limite, quanto tempo de tempo limite definir na solicitação específica, isso deve ser considerado pela parte mais a montante.

Autenticação e Autorização

No domínio de microsserviços, cada serviço precisa verificar a identidade e permissão do usuário para cada solicitação, se for igual à aplicação monolítica, cada serviço precisa implementar seu próprio conjunto de lógica de autenticação, isso obviamente não é muito realista. Portanto, é necessário um serviço unificado de autenticação e autorização, e as soluções comuns são usar OAuth2, Sessão Distribuída e JWT, entre estes, OAuth2 é o mais amplamente usado, uma vez se tornou o padrão da indústria, o tipo de token mais comumente usado no OAuth2 é JWT. Abaixo está um fluxograma do modo de código de autorização OAuth2, o processo básico é conforme mostrado.

Transmissão Segura

Registro e Descoberta de Serviços

Antes que o cliente chame um serviço específico do servidor, precisa conhecer o ip e port do servidor, nos casos anteriores, o endereço do servidor era fixo. No ambiente de rede real nem sempre é tão estável, alguns serviços podem ficar offline devido a falhas e se tornar inacessíveis, também podem ocorrer mudanças de endereço devido à migração de máquinas para desenvolvimento de negócios, nestas situações não se pode usar endereço estático para acessar serviços, e estes problemas dinâmicos são o que o registro e descoberta de serviços devem resolver, a descoberta de serviços é responsável por monitorar mudanças no endereço do serviço e atualizar, o registro de serviços é responsável por informar ao exterior seu endereço. No gRPC, é fornecida a funcionalidade básica de descoberta de serviços, e suporta extensão e personalização.

Não se pode usar endereço estático, pode-se usar alguns nomes específicos para substituir, como o navegador obtém o endereço através da resolução DNS do nome de domínio, da mesma forma, a descoberta de serviços padrão no gRPC é feita através do DNS, modifique o arquivo host local, adicione o seguinte mapeamento

127.0.0.1 example.grpc.com

Em seguida, altere o endereço Dial no exemplo helloworld para o nome de domínio correspondente

go
func main() {
  // Estabelecer conexão, sem verificação de criptografia
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Criar cliente
  client := hello2.NewSayHelloClient(conn)
  // Chamada remota
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Também pode-se ver a saída normal

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

No gRPC, este tipo de nome deve seguir a sintaxe URI definida no RFC 3986, o formato é

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

O URI no exemplo acima é a seguinte forma, como o dns é suportado por padrão, o prefixo scheme é omitido.

dns:example.grpc.com:8080

Além disso, o gRPC também suporta por padrão Unix domain sockets, e para outras formas, precisamos implementar personalização de acordo com a extensão do gRPC, para isso é necessário implementar um resolvedor personalizado resolver.Resovler, o resolver é responsável por monitorar atualizações do endereço de destino e configuração de serviço.

go
type Resolver interface {
    // O gRPC chamará ResolveNow para tentar analisar novamente o nome de destino. Este é apenas um aviso, se não for necessário, o resolvedor pode ignorá-lo, o método pode ser chamado concorrentemente
  ResolveNow(ResolveNowOptions)
  Close()
}

O gRPC requer que passemos um construtor de Resolver, ou seja, resolver.Builder, que é responsável por construir instâncias de Resolver.

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

O método Scheme do Builder retorna o tipo de Scheme que é responsável por analisar, por exemplo, o dnsBuilder padrão retorna dns, o construtor deve ser registrado no Builder global usando resolver.Register durante a inicialização, ou como options, usar grpc.WithResolver passado internamente para ClientConn, a prioridade deste último é maior que a do primeiro.

A figura acima descreve simplesmente o fluxo de trabalho do resolver, em seguida, será demonstrado como personalizar o resolver

Análise de Serviço Personalizada

Abaixo está a escrita de um resolvedor personalizado, é necessário um construtor de resolvedor personalizado para construção.

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Aqui é necessário updatestate, caso contrário ocorrerá deadlock
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Configuração, loadBalancingPolicy refere-se à estratégia de balanceamento de carga
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

O resolvedor personalizado passa o endereço correspondente do map para o updatestate, e também especifica a estratégia de balanceamento de carga, round_robin refere-se a round-robin.

go
// A estrutura da config de serviço é a seguinte
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

O código do cliente é o seguinte

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // Registrar builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Estabelecer conexão, sem verificação de criptografia
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Criar cliente
  client := hello2.NewSayHelloClient(conn)
     // Chamar uma vez por segundo
  for range time.Tick(time.Second) {
    // Chamada remota
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

Normalmente, o fluxo deve ser o servidor registrando seu serviço no centro de registro, e então o cliente obtém a lista de serviços do centro de registro e faz a correspondência, aqui o map passado é um centro de registro simulado, os dados são estáticos, então a etapa de registro de serviço é omitida, restando apenas a descoberta de serviços. O target usado pelo cliente é hello:myworld, hello é o scheme personalizado, myworld é o nome do serviço, após a análise do resolvedor personalizado, obtém-se o endereço real 127.0.0.1:8080, na situação real, para fazer balanceamento de carga, um nome de serviço pode corresponder a múltiplos endereços reais, então é por isso que o nome do serviço corresponde a um slice, aqui são abertos dois servidores, ocupando portas diferentes, a estratégia de balanceamento de carga é round-robin, as saídas do servidor são respectivamente, pode-se ver a partir do tempo de solicitação que a estratégia de balanceamento de carga está realmente funcionando, se a estratégia não for especificada, por padrão apenas o primeiro serviço é selecionado.

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

O centro de registro na verdade armazena o conjunto de mapeamento entre nomes de registro de serviço e endereços de serviço reais, qualquer middleware de armazenamento de dados pode atender às condições, até mesmo usar mysql como centro de registro não é impossível (não deve haver alguém que faça isso). Geralmente o centro de registro é armazenamento KV, redis é uma escolha muito boa, mas se usar redis como centro de registro, precisamos implementar muitas lógicas por conta própria, como verificação de heartbeat do serviço, serviço offline, etc, agendamento de serviços e assim por diante, ainda é bastante problemático, embora redis tenha certa aplicação neste aspecto, mas é menos comum. Como diz o ditado, deixe as coisas profissionais para pessoas profissionais, há muitos que fazem bem neste aspecto: Zookeeper, Consul, Eureka, ETCD, Nacos, etc.

Pode-se consultar 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) para entender algumas diferenças entre estes middlewares.

Combinar com Consul

O caso de uso combinado com consul pode ser consultado em consul

Balanceamento de Carga

Golang por www.golangdev.cn edit