Skip to content

gRPC

Удалённый вызов процедур (RPC) — это важная тема в микросервисах. В процессе обучения вы столкнётесь с различными фреймворками RPC. Однако в экосистеме Go почти все фреймворки RPC основаны на gRPC, и он стал фундаментальным протоколом в области облачных нативных технологий. Почему выбирают его? Вот официальный ответ:

gRPC — это современный фреймворк удалённого вызова процедур (RPC) с открытым исходным кодом и высокой производительностью, который может работать где угодно. Он может эффективно соединять службы внутри и между центрами данных с подключаемой поддержкой балансировки нагрузки, трассировки, проверки работоспособности и аутентификации. Он также применим в распределённых вычислениях последнего километра для соединения устройств, мобильных приложений и браузеров с серверными службами.

Официальный сайт: gRPC

Официальная документация: Documentation | gRPC

Учебник по gRPC: Basics tutorial | Go | gRPC

Официальный сайт Protobuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

Это также проект с открытым исходным кодом под эгидой CNCF Foundation. CNCF расшифровывается как Cloud Native Computing Foundation.

Возможности

Простое определение службы

Определяйте службы с помощью Protocol Buffers, мощного инструмента бинарной сериализации и языка.

Быстрый запуск и масштабирование

Установите среду выполнения и среду разработки всего одной строкой кода и масштабируйтесь до миллионов RPC в секунду за секунды.

Кросс-языковой, кросс-платформенный

Автоматически генерируйте заглушки клиента и сервера для разных платформ и языков.

Двунаправленная потоковая передача и встроенная авторизация

Потоковая передача на основе HTTP/2 с подключаемой аутентификацией и авторизацией.

Хотя gRPC не зависит от языка, большая часть контента на этом сайте связана с Go, поэтому в этой статье также будет использоваться Go в качестве основного языка для объяснения. Для пользователей других языков компилятор pb и генератор, используемые далее, можно найти на официальном сайте Protobuf. Для удобства процесс создания проекта будет опущен в дальнейшем.

Установка зависимостей

Сначала загрузите компилятор Protocol Buffer с: Releases · protocolbuffers/protobuf (github.com)

Выберите систему и версию в соответствии с вашей ситуацией. После загрузки добавьте каталог bin в переменные среды.

Затем загрузите генератор кода. Компилятор генерирует код сериализации для соответствующего языка из файлов proto, в то время как генератор используется для генерации бизнес-кода.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Создайте пустой проект с именем grpc_learn, затем импортируйте следующую зависимость:

sh
$ go get google.golang.org/grpc

Наконец, проверьте версии для подтверждения успешной установки:

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Структура проекта

Ниже демонстрируется пример Hello World. Создайте следующую структуру проекта:

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

Определение файла Protobuf

В pb/hello.proto запишите следующее содержимое. Это очень простой пример. Если вы не знакомы с синтаксисом protoc, обратитесь к соответствующей документации.

protobuf
syntax = "proto3";

// . означает генерировать непосредственно в выходном пути, hello — имя пакета
option go_package = ".;hello";

// Запрос
message HelloReq {
  string name = 1;


  // Ответ
  message HelloRep {
    string msg = 1;
  }

  // Определение службы
  service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Генерация кода

После написания используйте компилятор protoc для генерации кода, связанного с сериализацией данных, и используйте генератор для генерации кода RPC:

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

На этом этапе вы можете найти сгенерированные файлы hello.pb.go и hello_grpc.pb.go в папке hello. Просматривая hello.pb.go, вы можете найти наше определённое сообщение:

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Определённые поля
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Определённые поля
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

В hello_grpc.pb.go вы можете найти нашу определённую службу:

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// Позже, если мы реализуем интерфейс службы самостоятельно, мы должны встроить эту структуру, поэтому нам не нужно реализовывать метод mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// Возвращает nil по умолчанию
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Ограничение интерфейса
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Написание сервера

Напишите следующий код в server/main.go:

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("получен grpc запрос: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("привет мир! %s", req.Name)}, nil
}

func main() {
  // Прослушивание порта
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // Создание gRPC сервера
  server := grpc.NewServer()
  // Регистрация службы
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Запуск
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

Написание клиента

Напишите следующий код в client/main.go:

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Установление соединения, без проверки шифрования
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Создание клиента
  client := pb.NewSayHelloClient(conn)
  // Удалённый вызов
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("получен grpc ответ: %+v", helloRep.String())
}

Запуск

Сначала запустите сервер, затем запустите клиент. Вывод сервера:

2023/07/16 16:26:51 получен grpc запрос: name:"client"

Вывод клиента:

2023/07/16 16:26:51 получен grpc ответ: msg:"привет мир! client"

В этом примере после установления соединения клиентом вызов удалённого метода так же прост, как вызов локального метода — напрямую обращайтесь к методу Hello client и получайте результат. Это простейший пример gRPC, и многие фреймворки с открытым исходным кодом также инкапсулируют этот процесс.

bufbuild

В приведённом выше примере код генерируется непосредственно с помощью команд. Если позже будет много плагинов, команды станут довольно громоздкими. В этом случае вы можете использовать инструмент для управления файлами protobuf. Существует такой инструмент управления с открытым исходным кодом: bufbuild/buf.

Репозиторий с открытым исходным кодом: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Документация: Buf - Install the Buf CLI

Возможности

  • Управление BSR
  • Линтер
  • Генерация кода
  • Форматирование
  • Управление зависимостями

С этим инструментом вы можете удобно управлять файлами protobuf.

Документация предоставляет множество способов установки; вы можете выбрать свой собственный. Если у вас локально установлена среда Go, вы можете установить напрямую с помощью go install:

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

После установки проверьте версию:

sh
$ buf --version
1.24.0

Перейдите в папку helloworld/pb и выполните следующую команду для создания модуля для управления файлами pb:

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

Содержимое файла buf.yaml по умолчанию:

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Затем перейдите в каталог helloworld/ и создайте buf.gen.yaml со следующим содержимым:

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Затем выполните команду для генерации кода:

sh
$ buf generate

После завершения вы можете увидеть сгенерированные файлы. Конечно, у buf есть больше функций; вы можете изучить их из документации.

Потоковый RPC

Существует два основных типа вызовов gRPC: Unary RPC и Stream RPC. Пример Hello World является типичным Unary RPC.

Unary RPC (или обычный RPC — не уверен, как лучше перевести unary) работает так же, как обычный HTTP: клиент запрашивает, сервер возвращает данные, шаблон вопрос-ответ. В Stream RPC как запросы, так и ответы могут быть потоковыми, как показано ниже:

При использовании потоковых запросов возвращается только один ответ. Клиент может отправлять параметры серверу несколько раз через поток. Серверу не нужно ждать, пока все параметры будут получены перед обработкой, как в Unary RPC; конкретная логика обработки может быть определена сервером. Обычно только клиент может активно закрывать потоковый запрос. Как только поток закрыт, текущий RPC запрос заканчивается.

При использовании потоковых ответов отправляется только один параметр. Сервер может отправлять данные клиенту несколько раз через поток. Клиенту не нужно ждать, пока все данные будут получены перед обработкой, как в Unary RPC; конкретная логика обработки может быть определена клиентом. В обычных запросах только сервер может активно закрывать потоковый ответ. Как только поток закрыт, текущий RPC запрос заканчивается.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Он также может быть только с потоковым ответом (Server-Streaming RPC):

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Или с потоковыми запросом и ответом (Bi-directional-Streaming RPC):

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Однонаправленная потоковая передача

Ниже демонстрируются операции однонаправленной потоковой передачи. Сначала создайте следующую структуру проекта:

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

Содержимое message.proto:

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

Генерируйте код с помощью buf:

sh
$ buf generate

Это демонстрирует службу сообщений. receiveMessage получает указанное имя пользователя (строковый тип) и возвращает поток сообщений. sendMessage получает поток сообщений и возвращает количество успешно отправленных сообщений (64-битный целый тип). Далее создайте server/message_service.go для реализации службы, сгенерированной по умолчанию:

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

Вы можете видеть, что оба параметра получения сообщения и отправки сообщения имеют обёртку интерфейса потока:

go
type MessageService_ReceiveMessageServer interface {
    // Отправка сообщения
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Отправка возвращаемого значения и закрытие соединения
  SendAndClose(*wrapperspb.StringValue) error
    // Получение сообщения
  Recv() (*Message, error)
  grpc.ServerStream
}

Оба встраивают интерфейс grpc.ServerStream:

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

Как вы можете видеть, потоковый RPC не имеет явных входных параметров и типов возвращаемых значений в сигнатуре функции, как Unary RPC. Эти методы не сразу раскрывают, какие типы имеют входные параметры и возвращаемые значения. Вам нужно использовать переданный тип Stream для завершения потоковой передачи. Далее начните писать конкретную логику сервера. При написании логики сервера используется sync.map для имитации очереди сообщений. Когда клиент отправляет запрос ReceiveMessage, сервер непрерывно возвращает сообщения, которые хочет клиент, через потоковый ответ до тех пор, пока не произойдёт разрыв по таймауту. Когда клиент запрашивает SendMessage, он непрерывно отправляет сообщения через потоковый запрос, и сервер непрерывно помещает сообщения в очередь до тех пор, пока клиент активно не отключится, возвращая количество отправленных сообщений клиенту.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Имитированная очередь сообщений
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Получение сообщений для указанного пользователя
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("Нет сообщений от %s в течение 5 секунд, закрытие соединения", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Получение сообщения
      msg := queue[0]
      // Отправка сообщения клиенту через потоковую передачу
      err := recvServer.Send(msg)
      log.Printf("получено %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Отправка сообщения указанному пользователю
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // Получение сообщения от клиента
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("отправлено %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Помещение сообщения в очередь сообщений
    messageQueue.Store(msg.From, queue)
    count++
  }
}

Клиент открывает две goroutine: одна для отправки сообщений, а другая для получения сообщений. Конечно, вы также можете отправлять и получать одновременно. Код следующий:

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Запрос на получение сообщения
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("Нет сообщений, закрытие соединения")
        break
      } else if err != nil {
        break
      }
      log.Printf("получено %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "Ты здесь",
      "Есть ли у тебя время поиграть в игры сегодня днём",
      "Хорошо, давай поиграем вместе когда-нибудь",
      "В эти выходные должно подойти",
      "Тогда решено",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Сообщения отправлены, активно закрываем запрос и получаем возвращаемое значение
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("Отправка завершена, всего отправлено %d сообщений\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

После выполнения вывод сервера:

server  2023/07/18 16:28:24 отправлено from:"jack" content:"Ты здесь" to:"mike"
server  2023/07/18 16:28:24 получено from:"jack" content:"Ты здесь" to:"mike"
server  2023/07/18 16:28:25 отправлено from:"jack" content:"Есть ли у тебя время поиграть в игры сегодня днём" to:"mike"
server  2023/07/18 16:28:25 получено from:"jack" content:"Есть ли у тебя время поиграть в игры сегодня днём" to:"mike"
server  2023/07/18 16:28:26 отправлено from:"jack" content:"Хорошо, давай поиграем вместе когда-нибудь" to:"mike"
server  2023/07/18 16:28:26 получено from:"jack" content:"Хорошо, давай поиграем вместе когда-нибудь" to:"mike"
server  2023/07/18 16:28:27 отправлено from:"jack" content:"В эти выходные должно подойти" to:"mike"
server  2023/07/18 16:28:27 получено from:"jack" content:"В эти выходные должно подойти" to:"mike"
server  2023/07/18 16:28:28 отправлено from:"jack" content:"Тогда решено" to:"mike"
server  2023/07/18 16:28:28 получено from:"jack" content:"Тогда решено" to:"mike"
server  2023/07/18 16:28:33 Нет сообщений от jack в течение 5 секунд, закрытие соединения

Вывод клиента:

client  2023/07/18 16:28:24 получено from:"jack" content:"Ты здесь" to:"mike"
client  2023/07/18 16:28:25 получено from:"jack" content:"Есть ли у тебя время поиграть в игры сегодня днём" to:"mike"
client  2023/07/18 16:28:26 получено from:"jack" content:"Хорошо, давай поиграем вместе когда-нибудь" to:"mike"
client  2023/07/18 16:28:27 получено from:"jack" content:"В эти выходные должно подойти" to:"mike"
client  2023/07/18 16:28:28 Отправка завершена, всего отправлено 5 сообщений
client  2023/07/18 16:28:28 получено from:"jack" content:"Тогда решено" to:"mike"
client  2023/07/18 16:28:33 Нет сообщений, закрытие соединения

Через этот пример вы можете обнаружить, что обработка однонаправленных потоковых RPC запросов более сложна как для клиента, так и для сервера по сравнению с Unary RPC. Однако двунаправленный потоковый RPC ещё более сложен.

Двунаправленная потоковая передача

Двунаправленный потоковый RPC означает, что и запрос, и ответ являются потоковыми, по существу объединяя две службы из предыдущего примера в одну. Для потокового RPC первый запрос всегда инициируется клиентом. Затем клиент может отправлять параметры запроса через поток в любое время, и сервер может возвращать данные через поток в любое время. Независимо от того, какая сторона активно закрывает поток, текущий запрос завершится.

TIP

Последующий контент будет напрямую опускать описания кода для генерации кода pb и создания шагов клиента/сервера RPC, если это не необходимо.

Сначала создайте следующую структуру проекта:

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

Содержимое message.proto:

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

В логике сервера после установления соединения откройте две goroutine: одна для получения сообщений, а другая для отправки сообщений. Конкретная логика обработки аналогична предыдущему примеру, но на этот раз логика проверки таймаута удалена.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue имитированная очередь сообщений
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Служба чата, мы используем несколько goroutine для логики сервера
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "завершение чата")

  var chatErr error
  chatCh := make(chan error)

  // Создание двух goroutine, одна для получения сообщений, одна для отправки
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Goroutine для получения сообщений
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("получено %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Goroutine для отправки сообщений
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "закрытие отправки")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("отправлено %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

В логике клиента открываются две дочерние goroutine для имитации процесса чата между двумя людьми. Каждая дочерняя goroutine имеет две внучатые goroutine, ответственные за отправку и получение сообщений (логика клиента не гарантирует правильный порядок отправки/получения сообщений между двумя людьми; это просто простой пример двунаправленной отправки и получения):

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "Hello", "Есть ли у тебя время поиграть в игры?", "Хорошо")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "Hello", "Нет", "Нет времени, найди кого-нибудь другого")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("завершение чата", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Сообщения отправлены, закрытие соединения
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Goroutine для получения сообщений
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("получено %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

При нормальных обстоятельствах вывод сервера:

server 2023/07/19 17:18:44 сервер прослушивает [::]:9090
server 2023/07/19 17:18:49 получено from:"mike" content:"Hello" to:"jack" err <nil>
server 2023/07/19 17:18:49 получено from:"jack" content:"Hello" to:"mike" err <nil>
server 2023/07/19 17:18:49 отправлено from:"jack" content:"Hello" to:"mike"
server 2023/07/19 17:18:49 отправлено from:"mike" content:"Hello" to:"jack"
server 2023/07/19 17:18:50 получено from:"jack" content:"Есть ли у тебя время поиграть в игры?" to:"mike" err <nil>
server 2023/07/19 17:18:50 получено from:"mike" content:"Нет" to:"jack" err <nil>
server 2023/07/19 17:18:50 отправлено from:"mike" content:"Нет" to:"jack"
server 2023/07/19 17:18:50 отправлено from:"jack" content:"Есть ли у тебя время поиграть в игры?" to:"mike"
server 2023/07/19 17:18:51 получено from:"jack" content:"Хорошо" to:"mike" err <nil>
server 2023/07/19 17:18:51 получено from:"mike" content:"Нет времени, найди кого-нибудь другого" to:"jack" err <nil>
server 2023/07/19 17:18:51 отправлено from:"jack" content:"Хорошо" to:"mike"
server 2023/07/19 17:18:51 отправлено from:"mike" content:"Нет времени, найди кого-нибудь другого" to:"jack"
server 2023/07/19 17:18:56 получено <nil> err EOF
server 2023/07/19 17:18:56 получено <nil> err EOF
server 2023/07/19 17:18:56 jack закрывает отправку
server 2023/07/19 17:18:56 jack завершает чат
server 2023/07/19 17:18:56 mike закрывает отправку
server 2023/07/19 17:18:56 mike завершает чат

При нормальных обстоятельствах вывод клиента (вы можете видеть, что логика порядка сообщений хаотична):

client 2023/07/19 17:26:24 получено from:"jack"  content:"Hello"  to:"mike"
client 2023/07/19 17:26:24 получено from:"mike"  content:"Hello"  to:"jack"
client 2023/07/19 17:26:25 получено from:"mike"  content:"Нет"  to:"jack"
client 2023/07/19 17:26:25 получено from:"jack"  content:"Есть ли у тебя время поиграть в игры?"  to:"mike"
client 2023/07/19 17:26:26 получено from:"jack"  content:"Хорошо"  to:"mike"
client 2023/07/19 17:26:26 получено from:"mike"  content:"Нет времени, найди кого-нибудь другого"  to:"jack"
client 2023/07/19 17:26:32 получено <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 завершение чата jack
client 2023/07/19 17:26:32 получено <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 завершение чата mike

Через пример вы можете видеть, что логика обработки двунаправленной потоковой передачи более сложна как для клиента, так и для сервера по сравнению с однонаправленной потоковой передачей, требуя многозадачности асинхронных задач goroutine для лучшей обработки.

Метаданные

Метаданные по сути являются картой, где значение — это срез строк, подобный заголовкам HTTP/1, и он играет аналогичную роль в gRPC, как заголовки HTTP, предоставляя информацию о вызове RPC. Жизненный цикл метаданных следует за всем процессом вызова RPC; когда вызов заканчивается, его жизненный цикл также заканчивается.

В gRPC он передаётся и хранится через context. Однако gRPC предоставляет пакет metadata с множеством удобных функций для упрощения операций, поэтому нам не нужно вручную оперировать context. Тип, соответствующий метаданным в gRPC, — это metadata.MD, как показано ниже:

go
// MD — это отображение от ключей метаданных к значениям. Пользователи должны использовать следующие
// две удобные функции New и Pairs для генерации MD.
type MD map[string][]string

Мы можем напрямую использовать функцию metadata.New для создания, но перед созданием есть несколько моментов, которые следует отметить:

go
func New(m map[string]string) MD

Метаданные имеют ограничения на имена ключей, разрешая только символы, ограниченные следующими правилами:

  • Символы ASCII
  • Цифры: 0-9
  • Строчные буквы: a-z
  • Прописные буквы: A-Z
  • Специальные символы: - _

TIP

В метаданных прописные буквы будут преобразованы в строчные, что означает, что они будут занимать один и тот же ключ, и значения будут перезаписаны.

TIP

Ключи, начинающиеся с grpc-, являются внутренними ключами, зарезервированными gRPC. Использование таких ключей может вызвать ошибки.

Ручное создание

Существует множество способов создания метаданных. Вот два наиболее распространённых метода для ручного создания метаданных. Первый — использование функции metadata.New, напрямую передавая карту:

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

Второй — metadata.Pairs, передача строкового среза чётной длины, который автоматически разбирается в пары ключ-значение:

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

Вы также можете использовать metadata.Join для объединения нескольких метаданных:

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)

Использование на сервере

Получение метаданных

Сервер может использовать функцию metadata.FromIncomingContext для получения метаданных:

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Для Unary RPC параметр службы будет иметь параметр context, из которого вы можете напрямую получить метаданные:

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Для потокового RPC параметр службы будет иметь объект потока, из которого вы можете получить context потока:

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

Отправка метаданных

Вы можете использовать функцию grpc.SendHeader для отправки метаданных:

go
func SendHeader(ctx context.Context, md metadata.MD) error

Эту функцию можно вызвать не более одного раза, и она не вступит в силу после некоторых событий, которые вызывают автоматическую отправку заголовков. В некоторых случаях, если вы не хотите отправлять заголовки напрямую, вы можете использовать функцию grpc.SetHeader:

go
func SetHeader(ctx context.Context, md metadata.MD) error

Если эта функция вызывается несколько раз, метаданные, переданные каждый раз, будут объединены и отправлены клиенту в следующих ситуациях:

  • Когда вызваны grpc.SendHeader и ServerStream.SendHeader
  • Когда обработчик Unary RPC возвращает
  • При вызове метода Stream.SendMsg объекта потока в потоковом RPC
  • Когда статус RPC запроса становится send out — RPC запрос успешен или произошла ошибка.

Для потокового RPC рекомендуется использовать методы SendHeader и SetHeader объекта потока:

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

Во время использования вы обнаружите, что функции Header и Trailer похожи. Однако их основное различие заключается во времени отправки. Вы можете не чувствовать этого в Unary RPC, но это различие особенно очевидно в потоковом RPC, потому что Header в потоковом RPC может быть отправлен до завершения запроса. Как упоминалось ранее, Header будет отправлен в определённых ситуациях, в то время как Trailer будет отправлен только после завершения всего RPC запроса. До этого полученный trailer пуст.

Использование на клиенте

Получение метаданных

Если клиент хочет получить заголовок ответа, это можно сделать через grpc.Header и grpc.Trailer:

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Однако обратите внимание, что вы не можете получить их напрямую. Как вы можете видеть, возвращаемые значения двух вышеуказанных функций — CallOption, что означает, что они передаются как параметры опции при инициировании RPC запроса:

go
// Объявление md для получения значений
var header, trailer metadata.MD

// Передача опции при вызове RPC запроса
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

После завершения запроса значения будут записаны в переданный md. Для потокового RPC вы можете получить их напрямую через объект потока, возвращённый при инициировании запроса:

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

Отправка метаданных

Для клиента отправить метаданные просто. Как упоминалось ранее, метаданные проявляются как valueContext. Объедините метаданные в context, затем передайте context при запросе. Пакет metadata предоставляет две функции для удобного построения context:

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// Unary RPC
res,err := client.SomeRPC(outgoingContext,data)
// Потоковый RPC
stream,err := client.StreamRPC(outgoingContext)

Если в исходном ctx уже есть метаданные, использование NewOutgoingContext напрямую перезапишет предыдущие данные. Чтобы избежать этого, вы можете использовать следующую функцию, которая не перезапишет, а объединит данные:

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// Unary RPC
res,err := client.SomeRPC(appendContext,data)
// Потоковый RPC
stream,err := client.StreamRPC(appendContext)

Перехватчики

Перехватчики gRPC похожи на Middleware в gin, оба предназначены для выполнения специальной работы до или после запросов без влияния на саму бизнес-логику. В gRPC существует два основных типа перехватчиков: перехватчики сервера и перехватчики клиента. Согласно типу запроса, существуют перехватчики Unary RPC и перехватчики Streaming RPC, как показано ниже:

Чтобы лучше понять перехватчики, далее будет описано на очень простом примере:

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

Содержимое person.proto:

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

Код сервера, логика всё из предыдущего контента, относительно проста и не будет подробно рассматриваться:

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Хранение данных
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Перехватчики сервера

Перехватчики для RPC запросов сервера — это UnaryServerInterceptor и StreamServerInterceptor, с конкретными типами, как показано ниже:

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

Unary RPC

Для создания перехватчика Unary RPC вам нужно только реализовать тип UnaryServerInterceptor. Ниже приведён простой пример перехватчика Unary RPC, который выводит каждый RPC запрос и ответ:

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} RPC данные запроса
// param info *grpc.UnaryServerInfo Информация запроса для этого Unary RPC
// param unaryHandler grpc.UnaryHandler Конкретный обработчик
// return resp interface{} RPC данные ответа
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("до unary rpc перехватчика путь: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("после unary rpc перехватчика путь: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Для Unary RPC перехватчики перехватывают каждый RPC запрос и ответ, т.е. перехватывают фазу RPC запроса и фазу ответа. Если перехватчик возвращает ошибку, запрос завершится.

Streaming RPC

Для создания перехватчика Streaming RPC вам нужно только реализовать тип StreamServerInterceptor. Ниже приведён простой пример перехватчика Streaming RPC:

go
// StreamPersonLogInterceptor
// param srv interface{} Соответствует серверу, реализованному сервером
// param stream grpc.ServerStream Объект потока
// param info *grpc.StreamServerInfo Информация потока
// param streamHandler grpc.StreamHandler Обработчик
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("до stream rpc перехватчика путь: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("после stream rpc перехватчика путь: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Для Streaming RPC перехватчики перехватывают, когда вызываются методы Send и Recv каждого объекта потока. Если перехватчик возвращает ошибку, это не приведёт к завершению RPC запроса; это только указывает на то, что этот send или recv столкнулся с ошибкой.

Использование перехватчиков

Чтобы созданные перехватчики вступили в силу, их нужно передать как опции при создании gRPC сервера. Официально также предоставлены соответствующие функции для использования. Как показано ниже, существуют функции для добавления одиночных перехватчиков и функции для добавления цепочечных перехватчиков:

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

Повторное использование UnaryInterceptor вызовет следующую панику:

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor аналогичен. Цепочечные перехватчики, если вызваны повторно, будут добавлены в ту же цепочку.

Пример использования:

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Добавление цепочечных перехватчиков
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

Перехватчики клиента

Перехватчики клиента похожи на перехватчики сервера: один перехватчик Unary UnaryClientInterceptor и один перехватчик Streaming StreamClientInterceptor, с конкретными типами, как показано ниже:

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

Unary RPC

Для создания перехватчика клиента Unary RPC реализуйте UnaryClientInterceptor. Ниже приведён простой пример:

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string Имя метода
// param req interface{} Данные запроса
// param reply interface{} Данные ответа
// param cc *grpc.ClientConn Объект клиентского соединения
// param invoker grpc.UnaryInvoker Перехваченный конкретный метод клиента
// param opts ...grpc.CallOption Элементы конфигурации для этого запроса
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("до unary запроса путь: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("после unary запроса путь: %s req: %+v rep: %+v", method, req, reply))
  return err
}

Через перехватчик Unary RPC клиента вы можете получить локальные данные запроса, данные ответа и другую информацию о запросе.

Streaming RPC

Для создания перехватчика клиента Streaming RPC реализуйте StreamClientInterceptor. Ниже приведён пример:

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Описание объекта потока
// param cc *grpc.ClientConn Объект соединения
// param method string Имя метода
// param streamer grpc.Streamer Объект для создания объектов потока
// param opts ...grpc.CallOption Элементы конфигурации соединения
// return grpc.ClientStream Созданный объект клиентского потока
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("до создания потока  путь: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("после создания потока  путь: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Через перехватчик клиента Streaming RPC вы можете перехватывать только когда клиент устанавливает соединение с сервером, т.е. при создании потока. Вы не можете перехватывать каждый раз, когда объект клиентского потока отправляет или получает сообщения. Однако, если мы обернём объект потока, созданный в перехватчике, мы можем достичь перехвата отправки и получения сообщений, вот так:

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Описание объекта потока
// param cc *grpc.ClientConn Объект соединения
// param method string Имя метода
// param streamer grpc.Streamer Объект для создания объектов потока
// param opts ...grpc.CallOption Элементы конфигурации соединения
// return grpc.ClientStream Созданный объект клиентского потока
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("до создания потока  путь: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("после создания потока  путь: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // До отправки сообщения
  err := c.ClientStream.SendMsg(m)
  // После отправки сообщения
  log.Println(fmt.Sprintf("%s отправлено %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // До получения сообщения
  err := c.ClientStream.RecvMsg(m)
  // После получения сообщения
  log.Println(fmt.Sprintf("%s получено %+v err: %+v", c.method, m, err))
  return err
}

Использование перехватчиков

При использовании, аналогично серверу, существуют четыре служебные функции для добавления перехватчиков через опции, разделённые на одиночные перехватчики и цепочечные перехватчики:

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

Повторное использование WithUnaryInterceptor на клиенте не вызовет паники, но вступит в силу только последний.

Ниже приведён случай использования:

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

На этом весь случай написан. Пришло время запустить его и увидеть результаты. Вывод сервера:

server 2023/07/20 17:27:57 до stream rpc перехватчика путь: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 после stream rpc перехватчика путь: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 до unary rpc перехватчика путь: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 после unary rpc перехватчика путь: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 до unary rpc перехватчика путь: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 после unary rpc перехватчика путь: /person/getPersonInfo resp: <nil> err: person not found

Вывод клиента:

client 2023/07/20 17:27:57 до создания потока  путь: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 после создания потока  путь: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo отправлено name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo отправлено name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo получено value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 до unary запроса путь: /person/getPersonInfo req: value:"jack"
client 2023/07/20 17:27:57 после unary запроса путь: /person/getPersonInfo req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 до unary запроса путь: /person/getPersonInfo req: value:"jenny"
client 2023/07/20 17:27:57 после unary запроса путь: /person/getPersonInfo req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

Вы можете видеть, что вывод с обеих сторон соответствует ожиданиям, достигая эффекта перехвата. Этот случай — просто простой пример. Используя перехватчики gRPC, вы можете делать много вещей, таких как авторизация, ведение журнала, мониторинг и другие функции. Вы можете выбрать создание собственных колёс или использование существующих колёс из сообщества с открытым исходным кодом. Экосистема gRPC собирает серию промежуточного ПО перехватчиков gRPC с открытым исходным кодом. Адрес: grpc-ecosystem/go-grpc-middleware.

Обработка ошибок

Прежде чем начать, давайте посмотрим на пример. В предыдущем случае перехватчика, если запрос пользователя не удался, клиенту возвращается ошибка person not found. Вопрос: может ли клиент сделать специальную обработку на основе возвращённой ошибки? Давайте попробуем. В коде клиента попробуйте использовать errors.Is для оценки ошибки:

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

Вывод результата:

client 2023/07/21 16:46:10 до создания потока  путь: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 после создания потока  путь: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo отправлено name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo отправлено name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo получено value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 до unary запроса путь: /person/getPersonInfo req: value:"john"
client 2023/07/21 16:46:10 после unary запроса путь: /person/getPersonInfo req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

Вы можете видеть, что ошибка, полученная клиентом, выглядит так. Вы обнаружите, что ошибка, возвращённая сервером, находится в поле desc:

rpc error: code = Unknown desc = person not found

Естественно, логика errors.Is не была выполнена. Даже использование errors.As даст тот же результат:

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Для этого gRPC предоставляет пакет status для решения таких проблем. Это также причина, по которой ошибка, полученная клиентом, имеет поля code и desc — потому что gRPC фактически возвращает Status клиенту. Его конкретный тип следующий, что также является сообщением, определённым protobuf:

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // Код состояния, который должен быть значением enum
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // Сообщение об ошибке, обращённое к разработчику, которое должно быть на английском. Любое
  // сообщение об ошибке, обращённое к пользователю, должно быть локализовано и отправлено в
  // поле [google.rpc.Status.details][google.rpc.Status.details], или локализовано
  // клиентом.
  string message = 2;

  // Список сообщений, которые несут детали ошибки. Существует общий набор
  // типов сообщений для использования API.
  repeated google.protobuf.Any details = 3;
}

Коды ошибок

Code в структуре Status похож на HTTP Status, используется для указания состояния текущего RPC запроса. gRPC определяет 16 кодов, расположенных в grpc/codes, охватывающих большинство сценариев, как показано ниже:

go
// Code — это беззнаковый 32-битный код ошибки, как определено в спецификации gRPC.
type Code uint32

const (
  // Вызов успешен
  OK Code = 0

  // Запрос отменён
  Canceled Code = 1

  // Неизвестная ошибка
  Unknown Code = 2

  // Неверный аргумент
  InvalidArgument Code = 3

    // Таймаут запроса
  DeadlineExceeded Code = 4

  // Ресурс не найден
  NotFound Code = 5

    // Ресурс уже существует (не ожидал этого)
  AlreadyExists Code = 6

  // Доступ запрещён из-за недостаточных разрешений
  PermissionDenied Code = 7

  // Ресурс исчерпан, оставшейся ёмкости недостаточно, например, заканчивается место на диске
  ResourceExhausted Code = 8

  // Предусловие не выполнено, например, использование rm для удаления непустого каталога, где условие удаления — каталог пуст
  FailedPrecondition Code = 9

  // Запрос прерван
  Aborted Code = 10

  // Операция вне диапазона
  OutOfRange Code = 11

  // Служба не реализована
  Unimplemented Code = 12

  // Внутренняя системная ошибка
  Internal Code = 13

  // Служба недоступна
  Unavailable Code = 14

  // Потеря данных
  DataLoss Code = 15

  // Аутентификация не удалась
  Unauthenticated Code = 16

  _maxCode = 17
)

Пакет grpc/status предоставляет множество функций для преобразования между status и error. Мы можем напрямую использовать status.New для создания Status, или Newf:

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Например:

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

Через метод err status вы можете получить ошибку. Когда status равен OK, ошибка равна nil:

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

Вы также можете напрямую создавать ошибки:

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

Таким образом, мы можем изменить код сервера следующим образом:

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

До этого все коды, возвращённые сервером, были unknown. Теперь после модификации они имеют более ясную семантику. Таким образом, на клиенте вы можете использовать status.FromError или следующие функции для получения status из error, тем самым делая соответствующую обработку на основе разных кодов:

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Пример:

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Однако, хотя коды gRPC охватывают некоторые общие сценарии насколько это возможно, иногда они всё ещё не могут удовлетворить потребности разработчиков. В это время вы можете использовать поле Details в Status, которое также является срезом, который может содержать несколько частей информации. Передайте некоторую пользовательскую информацию через Status.WithDetails:

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Получите информацию через Status.Details:

go
func (s *Status) Details() []interface{}

Обратите внимание, что переданная информация предпочтительно должна быть определена protobuf, чтобы и сервер, и клиент могли удобно её разобрать. Официально предоставлено несколько примеров:

protobuf
message ErrorInfo {
  // Причина ошибки
  string reason = 1;

  // Определение субъекта службы
  string domain = 2;

  // Другая информация
  map<string, string> metadata = 3;
}

// Информация о повторной попытке
message RetryInfo {
  // Интервал ожидания для того же запроса
  google.protobuf.Duration retry_delay = 1;
}

// Информация об отладке
message DebugInfo {
  // Трассировка стека
  repeated string stack_entries = 1;

  // Некоторая детальная информация
  string detail = 2;
}

    ...
    ...

Больше примеров можно найти по адресу googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com). Если нужно, вы можете импортировать с помощью следующего кода:

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

Используйте ErrorInfo как details:

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

На клиенте вы можете получить данные и сделать обработку. Однако вышеприведённое — это просто некоторые примеры, рекомендованные gRPC. Кроме того, вы также можете определить свои собственные сообщения, чтобы лучше соответствовать соответствующим бизнес-потребностям. Если вы хотите сделать некоторую унифицированную обработку ошибок, вы также можете поместить её в перехватчик.

Управление таймаутом

В большинстве случаев обычно существует не только одна служба, и может быть много вышестоящих служб и много нижестоящих служб. Когда клиент инициирует запрос, от самой верхней службы до самой нижней, образуется цепочка вызовов служб, как на диаграмме, возможно, даже длиннее, чем показано.

С такой длинной цепочкой вызовов, если логика обработки одной службы занимает много времени, это приведёт к тому, что вышестоящая будет в состоянии ожидания. Чтобы уменьшить ненужную потерю ресурсов, необходимо ввести механизм таймаута. Таким образом, таймаут, переданный самым верхним вызовом, — это максимальное допустимое время выполнения для всей цепочки вызовов. gRPC может передавать таймауты через процессы и языки. Он помещает некоторые данные, которые должны быть переданы через процессы, в HEADERS Frame HTTP/2, как показано ниже:

Данные таймаута в запросах gRPC соответствуют полю grpc-timeout в HEADERS Frame. Обратите внимание, что не все библиотеки gRPC реализуют этот механизм передачи таймаута, но gRPC-go определённо поддерживает его. Если используете библиотеки на других языках и используете эту функцию, вам нужно обратить особое внимание на этот момент.

Таймаут соединения

Когда gRPC клиент устанавливает соединение с сервером, по умолчанию используется асинхронное установление. Если соединение не удалось, оно только возвращает пустой Client. Если вы хотите, чтобы соединение было синхронным, вы можете использовать grpc.WithBlock() для блокировки во время ожидания установления соединения:

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Если вы хотите установить таймаут, вам нужно только передать TimeoutContext, используя grpc.DialContext вместо grpc.Dial для передачи context:

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Таким образом, если установление соединения превысит таймаут, будет возвращена ошибка:

context deadline exceeded

На стороне сервера вы также можете установить таймаут соединения, установив таймаут при установлении нового соединения с клиентом. По умолчанию 120 секунд. Если соединение не успешно установлено в течение указанного времени, сервер активно разорвёт соединение.

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout всё ещё находится на экспериментальной стадии, и API может быть изменён или удалён в будущем.

Таймаут запроса

Когда gRPC клиент инициирует запрос, первый параметр имеет тип Context. Аналогично, если вы хотите добавить таймаут к RPC запросу, вам нужно только передать TimeoutContext:

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Обработка логики таймаута
}

Через обработку gRPC таймаут передаётся серверу. Во время передачи он существует в поле frame. В Go он существует в форме context, таким образом проходя через всю ссылку. Во время передачи ссылки не рекомендуется изменять таймаут. Как долго устанавливать таймаут во время запросов должно быть рассмотрением самого верхнего уровня.

Аутентификация и авторизация

В области микросервисов каждая служба должна проверять идентификатор пользователя и разрешения для запросов. Если каждая служба реализует свою собственную логику аутентификации, как в монолитных приложениях, это явно нереалистично. Поэтому необходима унифицированная служба аутентификации и авторизации. Распространённые решения включают OAuth2, распределённые сессии и JWT. Среди них OAuth2 является наиболее широко используемым и стал отраслевым стандартом. Наиболее распространённым типом токена для OAuth2 является JWT. Ниже приведена блок-схема режима кода авторизации OAuth2, с основным процессом, как показано.

Безопасная передача

Регистрация и обнаружение служб

Прежде чем клиент сможет вызвать конкретную службу на сервере, ему нужно знать IP и порт сервера. В предыдущих случаях адреса серверов были жёстко закодированы. В реальных сетевых средах это не всегда так стабильно. Некоторые службы могут уйти из-за сбоев и стать недоступными, или адреса могут измениться из-за развития бизнеса и миграции машин. В этих случаях статические адреса не могут использоваться для доступа к службам. Эти динамические вопросы — это то, что решают обнаружение и регистрация служб. Обнаружение служб отвечает за мониторинг изменений адреса службы и обновление, в то время как регистрация служб отвечает за сообщение внешнему миру своего адреса. В gRPC предоставлена базовая функциональность обнаружения служб, и она поддерживает расширение и настройку.

Вместо статических адресов вы можете использовать некоторые конкретные имена для их замены. Например, браузеры получают адреса через DNS разрешение доменных имён. Аналогично, обнаружение служб gRPC по умолчанию через DNS. Измените ваш локальный файл host и добавьте следующее отображение:

127.0.0.1 example.grpc.com

Затем измените адрес Dial клиента в примере helloworld на соответствующее доменное имя:

go
func main() {
  // Установление соединения, без проверки шифрования
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Создание клиента
  client := hello2.NewSayHelloClient(conn)
  // Удалённый вызов
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("получен grpc ответ: %+v", helloRep.String())
}

Вы всё ещё можете видеть нормальный вывод:

2023/08/26 15:52:52 получен grpc ответ: msg:"привет мир! client"

В gRPC такие имена должны соответствовать синтаксису URI, определённому в RFC 3986, с форматом:

                   иерархическая часть
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  информация о пользователе     host     port                  query         fragment

URI в приведённом выше примере имеет следующую форму. Поскольку DNS поддерживается по умолчанию, префикс scheme опущен:

dns:example.grpc.com:8080

Кроме того, gRPC также поддерживает Unix domain sockets по умолчанию. Для других методов нам нужно реализовать пользовательские расширения согласно gRPC. Для этого нам нужно реализовать пользовательский resolver resolver.Resolver. Resolver отвечает за мониторинг целевого адреса и обновлений конфигурации службы:

go
type Resolver interface {
    // gRPC вызовет ResolveNow для повторной попытки разрешения имени цели снова. Это просто подсказка, и resolver может игнорировать его, если не нужно. Метод может вызываться одновременно.
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC требует, чтобы мы передали построитель Resolver, который является resolver.Builder, отвечающий за создание экземпляров Resolver:

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Метод Scheme построителя возвращает тип Scheme, за который он отвечает за разбор. Например, dnsBuilder по умолчанию возвращает dns. Построитель должен быть зарегистрирован в глобальном Builder с помощью resolver.Register во время инициализации, или передан как опции с помощью grpc.WithResolver внутренне в ClientConn. Последний имеет более высокий приоритет, чем первый.

Приведённая выше диаграмма просто описывает рабочий процесс resolver. Далее давайте продемонстрируем, как настроить resolver.

Пользовательское разрешение служб

Далее пишется пользовательский resolver, который требует пользовательского построителя resolver для построения:

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("неподдерживаемая схема: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Должно обновить состояние здесь, иначе произойдёт взаимоблокировка
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Конфигурация, loadBalancingPolicy относится к стратегии балансировки нагрузки
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

Пользовательский resolver передаёт сопоставленные адреса из карты в updateState и также указывает стратегию балансировки нагрузки. round_robin относится к циклическому перебору.

go
// структура конфигурации службы следующая
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

Код клиента следующий:

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // Регистрация построителя
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Установление соединения, без проверки шифрования
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Создание клиента
  client := hello2.NewSayHelloClient(conn)
     // Вызов один раз в секунду
  for range time.Tick(time.Second) {
    // Удалённый вызов
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("получен grpc ответ: %+v", helloRep.String())
  }

}

При нормальных обстоятельствах процесс должен быть: сервер регистрирует свою службу в реестре, затем клиент получает список служб из реестра и выполняет сопоставление. Карта, переданная здесь, — это имитированный реестр. Поскольку данные статичны, шаг регистрации службы опущен, остаётся только обнаружение служб. Цель, используемая клиентом, — hello:myworld, где hello — пользовательская схема, а myworld — имя службы. После разбора пользовательским resolver получается реальный адрес 127.0.0.1:8080. В реальных ситуациях, для достижения балансировки нагрузки, имя службы может сопоставляться с несколькими реальными адресами, поэтому имя службы соответствует срезу. Здесь запущены два сервера, занимающих разные порты. Стратегия балансировки нагрузки — циклический перебор. Выводы серверов следующие. Из времени запросов вы можете видеть, что стратегия балансировки нагрузки действительно работает. Если стратегия не указана, по умолчанию выбирается только первая служба:

// server01
2023/08/29 17:32:21 получен grpc запрос: name:"client"
2023/08/29 17:32:23 получен grpc запрос: name:"client"
2023/08/29 17:32:25 получен grpc запрос: name:"client"
2023/08/29 17:32:27 получен grpc запрос: name:"client"
2023/08/29 17:32:29 получен grpc запрос: name:"client"

// server02
2023/08/29 17:32:20 получен grpc запрос: name:"client"
2023/08/29 17:32:22 получен grpc запрос: name:"client"
2023/08/29 17:32:24 получен grpc запрос: name:"client"
2023/08/29 17:32:26 получен grpc запрос: name:"client"
2023/08/29 17:32:28 получен grpc запрос: name:"client"

Реестр по сути хранит коллекцию отображений имён регистрации служб и реальных адресов служб. Любое промежуточное ПО, способное к хранению данных, может удовлетворить требования. Даже использование MySQL в качестве реестра возможно (хотя, вероятно, никто не стал бы этого делать). Обычно реестры используют хранилище KV. Redis — хороший выбор, но если использовать Redis в качестве реестра, нам нужно реализовать много логики самостоятельно, таких как проверки работоспособности службы, обработка отключения службы, планирование службы и т.д., что довольно хлопотно. Хотя Redis имеет определённые приложения в этой области, они относительно немногочисленны. Как говорится, пусть профессионалы занимаются профессиональными задачами. В этой области есть много известных решений: ZooKeeper, Consul, Eureka, ETCD, Nacos и т.д.

Вы можете посетить 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn), чтобы узнать о различиях между этими промежуточными ПО.

Комбинирование с Consul

Для случаев комбинирования с Consul посетите consul

Балансировка нагрузки

Golang by www.golangdev.cn edit