Skip to content

gRPC

遠隔手順呼び出し rpc は、マイクロサービスにおいて学習すべき重要なポイントです。学習過程では様々な rpc フレームワークに出会いますが、Go という領域では、ほぼすべての rpc フレームワークが gRPC に基づいています。さらに、クラウドネイティブ領域における基本的なプロトコルにもなっています。なぜそれを選ぶのか、公式の回答は以下の通りです。

gRPC は、あらゆる環境で実行可能な現代的なオープンソースの高性能な遠隔手順呼び出し(Remote Process Call、RPC)フレームワークです。プラグイン可能なロードバランシング、トラッキング、ヘルスチェック、認証サポートにより、データセンター内およびデータセンター間のサービスを効果的に接続します。また、デバイス、モバイルアプリケーション、ブラウザをバックエンドサービスに接続するラストマイルの分散コンピューティングにも適しています。

公式ウェブサイト:gRPC

公式ドキュメント:Documentation | gRPC

gRPC 技術チュートリアル:Basics tutorial | Go | gRPC

ProtocBuf 公式サイト:Reference Guides | Protocol Buffers Documentation (protobuf.dev)

gRPC は CNCF 財団のオープンソースプロジェクトでもあります。CNCF は CLOUD NATIVE COMPUTING FOUNDATION の略で、日本語ではクラウドネイティブコンピューティング財団と訳されます。

特徴

シンプルなサービス定義

Protocol Buffers を使用してサービスを定義します。これは強力なバイナリシリアライゼーションツールセットと言語です。

起動とスケールアップが非常に迅速

1 行のコードだけでランタイムと開発環境をインストールでき、わずか数秒で毎秒数百万の RPC にスケールできます。

クロス言語、クロスプラットフォーム

異なるプラットフォームと言語に応じて、クライアントとサーバーのサービススタブを自動的に生成します。

双方向ストリーミングと統合認証

HTTP/2 に基づく双方向ストリーミングとプラグイン可能な認証サポート。

gRPC は言語に依存しませんが、当サイトのコンテンツのほとんどは Go に関連しているため、この記事でも Go を主な言語として使用して説明します。後続で使用する pb コンパイラーとジェネレーターが他の言語のユーザーである場合は、Protobuf 公式サイトで各自お調べください。便宜上、プロジェクトの作成プロセスは省略します。

依存関係のインストール

まず Protocol Buffer コンパイラーをダウンロードします。ダウンロードアドレス:Releases · protocolbuffers/protobuf (github.com)

状況に応じてシステムとバージョンを選択してください。ダウンロード完了後、bin ディレクトリを環境変数に追加する必要があります。

次にコードジェネレーターをダウンロードします。コンパイラーは proto ファイルから対応言語のシリアライゼーションコードを生成し、ジェネレーターはビジネスコードを生成するために使用します。

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

空のプロジェクトを作成します。名前は grpc_learn とします。次に以下の依存関係をインポートします。

sh
$ go get google.golang.org/grpc

最後にバージョンを確認して、本当にインストールが成功したか確認します。

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

プロジェクト構造

以下では Hello World サンプルを使用してデモンストレーションします。以下のようなプロジェクト構造を作成します。

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

protobuf ファイルの定義

pb/hello.proto に以下の内容を記述します。これは非常にシンプルなサンプルです。protoc 構文がわからない場合は、関連ドキュメントを参照してください。

protobuf
syntax = "proto3";

// .は出力パスに直接生成することを意味し、hello はパッケージ名
option go_package = ".;hello";

// リクエスト
message HelloReq {
  string name = 1;
}

// 応答
message HelloRep {
  string msg = 1;
}

// サービス定義
service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

コード生成

記述完了後、protoc コンパイラーを使用してデータシリアライゼーション関連のコードを生成し、ジェネレーターを使用して rpc 関連のコードを生成します。

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

この時、hello フォルダーに hello.pb.gohello_grpc.pb.go ファイルが生成されていることがわかります。hello.pb.go を閲覧すると、定義した message が確認できます。

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // 定義したフィールド
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // 定義したフィールド
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

hello_grpc.pb.go では、定義したサービスが確認できます。

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// 後続でサービスインターフェースを実装する場合、この構造体を埋め込む必要があります。mustEmbedUnimplementedSayHelloServer メソッドを実装しなくて済みます
type UnimplementedSayHelloServer struct {
}

// デフォルトは nil を返します
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// インターフェース制約
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

サーバー側の記述

server/main.go に以下のコードを記述します。

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // ポートをリッスン
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // gprc サーバーを作成
  server := grpc.NewServer()
  // サービス登録
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // 実行
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

クライアント側の記述

client/main.go に以下のコードを記述します。

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // 接続を確立。暗号化検証なし
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // クライアントを作成
  client := pb.NewSayHelloClient(conn)
  // 遠隔呼び出し
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

実行

まずサーバー側を実行し、次にクライアント側を実行します。サーバー側の出力は以下の通りです。

2023/07/16 16:26:51 received grpc req: name:"client"

クライアント側の出力は以下の通りです。

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

この例では、クライアントが接続を確立した後、遠隔メソッドを呼び出す際にローカルメソッドを呼び出すのと同じように、直接 clientHello メソッドにアクセスして結果を取得できます。これが最もシンプルな GRPC の例です。多くのオープンソースフレームワークもこのプロセスをカプセル化しています。

bufbuild

上記の例では、直接使用コマンドでコードを生成しましたが、後からプラグインが増えるとコマンドが非常に煩雑になります。この場合、protobuf ファイルを管理するツールを使用できます。まさにそのようなオープンソース管理ツール bufbuild/buf があります。

オープンソースアドレス:bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

ドキュメントアドレス:Buf - Install the Buf CLI

特徴

  • BSR 管理
  • リンター
  • コード生成
  • フォーマット
  • 依存関係管理

このツールを使用すると、protobuf ファイルを非常に簡単に管理できます。

ドキュメントには非常に多くのインストール方法が提供されています。自分で選択できます。ローカルに Go 環境がインストールされている場合は、go install を使用してインストールできます。

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

インストール完了後、バージョンを確認します。

sh
$ buf --version
1.24.0

helloworld/pb フォルダーに移動し、以下のコマンドを実行して pb ファイルを管理するモジュールを作成します。

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

buf.yaml ファイルの内容はデフォルトで以下の通りです。

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

次に helloworld/ ディレクトリに移動し、buf.gen.yaml を作成して以下の内容を記述します。

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

次にコマンドを実行してコードを生成します。

sh
$ buf generate

完了後、生成されたファイルを確認できます。もちろん、buf にはこれだけの機能だけでなく、他の機能もドキュメントで学習できます。

ストリーミング RPC

grpc の呼び出し方法には 2 つのカテゴリがあります。一元 RPC(Unary RPC)とストリーミング RPC(Stream RPC)です。Hello World のサンプルは典型的な一元 RPC です。

一元 rpc(または普通 rpc の方が理解しやすいかもしれません。unary の翻訳方法がわかりません)は、通常の http と同じように使用できます。クライアントがリクエストし、サーバーがデータを返す、一问一答的方式です。一方、ストリーミング RPC のリクエストと応答はストリーミングになる可能性があります。下図の通りです。

ストリーミングリクエストを使用する場合、応答は 1 回のみ返されます。クライアントはストリームを通じてパラメータを複数回サーバーに送信できます。サーバーは一元 RPC のようにすべてのパラメータを受信するまで待ってから処理する必要はなく、具体的な処理ロジックはサーバーが決定できます。通常、クライアントのみがストリーミングリクエストを積極的に閉じることができます。ストリームが閉じられると、現在の RPC リクエストも終了します。

ストリーミング応答を使用する場合、パラメータは 1 回のみ送信されます。サーバーはストリームを通じてデータを複数回クライアントに送信できます。クライアントは一元 RPC のようにすべてのデータを受信するまで待ってから処理する必要はなく、具体的な処理ロジックはクライアント自身が決定できます。通常のリクエストでは、サーバーのみがストリーミング応答を積極的に閉じることができます。ストリームが閉じられると、現在の RPC リクエストも終了します。

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

応答のみがストリーミングである場合もあります(Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

またはリクエストと応答の両方がストリーミングである場合もあります(Bi-directional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

単方向ストリーミング

以下では、単方向ストリーミングの操作をデモンストレーションする例を示します。まず以下のようなプロジェクト構造を作成します。

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

message.proto の内容は以下の通りです。

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

buf を通じてコードを生成します。

sh
$ buf generate

ここではメッセージサービスをデモンストレーションします。receiveMessage は指定されたユーザー名(文字列タイプ)を受け取り、メッセージストリームを返します。sendMessage はメッセージストリームを受け取り、正常に送信されたメッセージ数を返します(64 ビット整数タイプ)。次に server/message_service.go を作成し、デフォルトで生成されたサービスインターフェースを自分で実装します。

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

メッセージ受信と送信のパラメータには、ストリームラッパーインターフェースが含まれていることがわかります。

go
type MessageService_ReceiveMessageServer interface {
    // メッセージ送信
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // 返送値を送信して接続を閉じる
  SendAndClose(*wrapperspb.StringValue) error
    // メッセージ受信
  Recv() (*Message, error)
  grpc.ServerStream
}

両方とも gprc.ServerStream インターフェースを埋め込んでいます。

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

ストリーミング RPC は一元 RPC のように、パラメータと戻り値のタイプを関数シグネチャに明確に反映させることはできません。これらのメソッドは一見するとパラメータと戻り値のタイプがわかりません。传入された Stream タイプを呼び出してストリーミング転送を完了する必要があります。次に、サーバー側の具体的なロジックを記述します。サーバー側のロジックを記述する際、sync.map を使用してメッセージキューをシミュレートします。クライアントが ReceiveMessage リクエストを送信すると、サーバーはストリーミング応答を通じてクライアントが望むメッセージを継続的に返します。タイムアウト後にリクエストが切断されるまで続きます。クライアントが SendMessage をリクエストすると、ストリーミングリクエストを通じてメッセージを継続的に送信し、サーバーはメッセージをキューに継続的に格納します。クライアントが積極的にリクエストを切断するまで続き、クライアントにメッセージ送信数を返します。

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// シミュレートされたメッセージキュー
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// 指定されたユーザーのメッセージを受信
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("5 秒間%sからのメッセージがありませんでした。接続を閉じます", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // メッセージを取得
      msg := queue[0]
      // ストリーミング転送を通じてメッセージをクライアントに送信
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// 指定されたユーザーにメッセージを送信
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // クライアントからメッセージを受信
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // メッセージをメッセージキューに格納
    messageQueue.Store(msg.From, queue)
    count++
  }
}

クライアントは 2 つのゴルーチンを開始します。1 つはメッセージ送信用、もう 1 つはメッセージ受信用です。もちろん、送信と受信を同時に行うこともできます。コードは以下の通りです。

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // メッセージ受信リクエスト
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("メッセージがありません。接続を閉じます")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "在吗",
      "下午有没有时间一起打游戏",
      "那行吧,以后有时间一起约",
      "就这个周末应该可以吧",
      "那就这么定了",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // メッセージ送信完了後、積極的にリクエストを閉じて戻り値を取得
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("送信完了。合計%d 件のメッセージを送信しました\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

実行後、サーバー側の出力は以下の通りです。

server  2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:33 5 秒間 jack からのメッセージがありませんでした。接続を閉じます

クライアント側の出力は以下の通りです。

client  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client  2023/07/18 16:28:28 送信完了。合計 5 件のメッセージを送信しました
client  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client  2023/07/18 16:28:33 メッセージがありません。接続を閉じます

この例を通じて、単方向ストリーミング RPC リクエストの処理は、クライアント側でもサーバー側でも一元 rpc よりも複雑であることがわかります。ただし、双方向ストリーミング RPC はそれらよりもさらに複雑です。

双方向ストリーミング

双方向ストリーミング PRC は、リクエストと応答の両方がストリーミングであることを意味し、上記の例の 2 つのサービスを 1 つに結合したようなものです。ストリーミング RPC 而言、最初のリクエストは必ずクライアントによって開始され、その後クライアントはいつでもストリームを通じてリクエストパラメータを送信できます。サーバーもいつでもストリームを通じてデータを返すことができます。どちら側がストリームを閉じても、現在のリクエストは終了します。

TIP

後続の内容は、必要でない限り、pb コード生成および rpc クライアントサーバー作成などのステップのコード記述を省略します。

まず以下のようなプロジェクト構造を作成します。

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

message.proto の内容は以下の通りです。

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

サーバー側のロジックでは、接続確立後に 2 つのゴルーチンを開始します。1 つはメッセージ受信用、もう 1 つはメッセージ送信用です。具体的な処理ロジックは前の例と似ていますが、今回はタイムアウト判定ロジックを削除しました。

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue シミュレートされたメッセージキュー
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// チャットサービス。サーバー側のロジックはマルチゴルーチンで処理します
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "end chat")

  var chatErr error
  chatCh := make(chan error)

  // 2 つのゴルーチンを作成。1 つはメッセージ受信、1 つはメッセージ送信
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // メッセージ受信ゴルーチン
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // メッセージ送信ゴルーチン
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

クライアント側のロジックでは、2 つのサブゴルーチンを開始して 2 人のチャットプロセスをシミュレートします。2 つのサブゴルーチンにはそれぞれ 2 つの孫ゴルーチンがあり、メッセージの送受信を担当します(クライアント側のロジックでは、2 人のチャットメッセージの送受信順序が正しいことは保証されていません。単なる双方の送受信の例です)。

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("end chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // メッセージ送信完了後、接続を閉じる
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // メッセージ受信ゴルーチン
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

通常、サーバー側の出力は以下の通りです。

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

通常、クライアント側の出力は以下の通りです(メッセージの順序ロジックが乱れていることがわかります)。

client 2023/07/19 17:26:24 receive from:"jack"  content:"你好"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"你好"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"没有"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"有没有时间一起打游戏?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"好吧"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"没时间,你找别人吧"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

この例を通じて、双方向ストリーミングの処理ロジックは、クライアント側でもサーバー側でも、単方向ストリーミングよりも複雑であり、マルチゴルーチンを組み合わせて非同期タスクを開始することで、より効果的にロジックを処理できることがわかります。

metadata

metadata は本質的に map で、その value は文字列スライスです。http1 の header と似ており、gRPC における役割も http header と似ています。今回の RPC 呼び出しに関する情報を提供します。同時に、metadata のライフサイクルは 1 回の rpc 呼び出しのプロセス全体に追随します。呼び出しが終了すると、そのライフサイクルも終了します。

gRPC では主に context を通じて転送および保存されます。ただし、gRPC は metadata パッケージを提供しており、操作を簡素化するための多くの便利な関数があり、手動で context を操作する必要はありません。gRPC における metadata の対応タイプは metadata.MD で、以下の通りです。

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

metadata.New 関数を使用して直接作成できますが、作成前にいくつか注意すべき点があります。

go
func New(m map[string]string) MD

metadata はキー名に制限があり、以下のルールで制限された文字のみ使用できます。

  • ASCII 文字
  • 数字:0-9
  • 小文字:a-z
  • 大文字:A-Z
  • 特殊文字:-_

TIP

metadata では、大文字のアルファベットはすべて小文字に変換されます。つまり、同じキーを占有し、値も上書きされます。

TIP

grpc- で始まるキーは grpc によって予約された内部キーです。このタイプのキーを使用すると、いくつかのエラーが発生する可能性があります。

手動作成

metadata の作成方法はいくつかありますが、ここでは手動で作成する最も一般的な 2 つの方法を紹介します。1 つ目は metadata.New 関数を使用する方法で、直接 map を传入します。

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

2 つ目は metadata.Pairs で、偶数長の文字列スライスを传入すると、自動的にキーバリューペアに解析されます。

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

metadata.join を使用して複数の metadata をマージすることもできます。

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)

サーバー側での使用

metadata の取得

サーバー側で metadata を取得するには、metadata.FromIncomingContext 関数を使用できます。

go
func FromIncomingContext(ctx context.Context) (MD, bool)

一元 rpc の場合、service のパラメータに context パラメータが含まれているため、そこから直接 metadata を取得できます。

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

ストリーミング rpc の場合、service のパラメータにストリームオブジェクトが含まれており、それを通じてストリームの context を取得できます。

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

metadata の送信

metadata の送信には grpc.SendHeader 関数を使用できます。

go
func SendHeader(ctx context.Context, md metadata.MD) error

この関数は最大 1 回のみ呼び出せます。ヘッダーが自動的に送信されるいくつかのイベントが発生した後に使用すると、効果はありません。一部の状況では、ヘッダーを直接送信したくない場合、grpc.SetHeader 関数を使用できます。

go
func SetHeader(ctx context.Context, md metadata.MD) error

この関数を複数回呼び出すと、毎回传入された metadata がマージされ、以下の状況でクライアントに送信されます。

  • gprc.SendHeaderServerStream.SendHeader が呼び出された時
  • 一元 rpc の handler が戻るとき
  • ストリーミング rpc 内のストリームオブジェクトの Stream.SendMsg が呼び出された時
  • rpc リクエストの状態が send out になった時。この状況は、rpc リクエストが成功したか、エラーが発生したかのどちらかです。

ストリーミング rpc の場合、ストリームオブジェクトの SendHeader メソッドと SetHeader メソッドを使用することをお勧めします。

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

使用中に Header と Trailer の 2 つの機能がほぼ同じであることがわかりますが、主な違いは送信のタイミングです。一元 rpc では実感できないかもしれませんが、この違いはストリーミング RPC で特に顕著です。ストリーミング RPC では、Header はリクエストが終了するのを待たずに送信できるためです。前述の通り、Header は特定の状況で送信されますが、Trailer は RPC リクエストが完全に終了した後にのみ送信されます。それ之前に取得される trailer は空です。

クライアント側での使用

metadata の取得

クライアント側で応答ヘッダーを取得するには、grpc.Headergrpc.Trailer を使用できます。

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

ただし、直接取得できないことに注意してください。上記の 2 つの関数の戻り値は CallOption であることがわかります。つまり、RPC リクエストを開始する際に option パラメータとして传入されます。

go
// 値を受信するための md を宣言
var header, trailer metadata.MD

// rpc リクエストを呼び出す際に option を传入
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

リクエスト完了後、値は传入された md に書き込まれます。ストリーミング rpc の場合、リクエスト開始時に返されたストリームオブジェクトを通じて直接取得できます。

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

metadata の送信

クライアント側で metadata を送信するのは簡単です。前述の通り、metadata の表现形式は valueContext です。metadata を context に結合し、リクエスト時に context を传入するだけです。metadata パッケージは 2 つの関数を提供して context の構築を容易にしています。

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// 一元 rpc
res,err := client.SomeRPC(outgoingContext,data)
// ストリーミング rpc
stream,err := client.StreamRPC(outgoingContext)

元の ctx にすでに metadata がある場合、NewOutgoingContext を使用すると以前のデータが直接上書きされます。この状況を避けるために、以下の関数を使用できます。これは上書きせず、データをマージします。

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// 一元 rpc
res,err := client.SomeRPC(appendContext,data)
// ストリーミング rpc
stream,err := client.StreamRPC(appendContext)

インターセプター

gRPC のインターセプターは、gin の Middleware と似ており、リクエスト前またはリクエスト後に特別な作業を行い、ビジネスロジック自体に影響を与えないようにするためのものです。gRPC では、インターセプターには 2 つのカテゴリがあります。サーバー側インターセプターとクライアント側インターセプターです。リクエストタイプに応じて、一元 RPC インターセプターとストリーミング RPC インターセプターに分かれます。下図の通りです。

インターセプターをよりよく理解するために、非常にシンプルなサンプルを使用して説明します。

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

person.proto の内容は以下の通りです。

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

サーバー側のコードは以下の通りです。ロジックはすべて以前の内容で、非常にシンプルなので詳しく説明しません。

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// データを格納
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

サーバー側インターセプター

サーバー側の rpc リクエストをインターセプトするには、UnaryServerInterceptorStreamServerInterceptor があります。具体的なタイプは以下の通りです。

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

一元 RPC

一元 RPC インターセプターを作成するには、UnaryServerInterceptor タイプを実装するだけです。以下はシンプルな一元 RPC インターセプターの例で、毎回 rpc のリクエストと応答を出力する機能です。

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc のリクエストデータ
// param info *grpc.UnaryServerInfo 今回の一元 RPC のリクエスト情報
// param unaryHandler grpc.UnaryHandler 具体的な handler
// return resp interface{} rpc の応答データ
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

一元 RPC の場合、インターセプターはすべての RPC リクエストと応答をインターセプトします。つまり、RPC のリクエスト段階と応答段階をインターセプトします。インターセプターが error を返すと、今回のリクエストは終了します。

ストリーミング rpc

ストリーミング RPC インターセプターを作成するには、StreamServerInterceptor タイプを実装するだけです。以下はシンプルなストリーミング RPC インターセプターの例です。

go
// StreamPersonLogInterceptor
// param srv interface{} サーバー側実装の server に対応
// param stream grpc.ServerStream ストリームオブジェクト
// param info *grpc.StreamServerInfo ストリーム情報
// param streamHandler grpc.StreamHandler プロセッサ
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

ストリーミング RPC の場合、インターセプターはすべてのストリームオブジェクトの SendRecv メソッドが呼び出されるタイミングをインターセプトします。インターセプターが error を返しても、今回の RPC リクエストが終了するわけではなく、今回の send または recv にエラーが発生したことを示すだけです。

インターセプターの使用

作成したインターセプターを有効にするには、gRPC サーバーを作成する際に option として传入する必要があります。公式も関連関数を提供しています。以下に示すように、単一のインターセプターを追加する関数と、チェーンインターセプターを追加する関数があります。

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

UnaryInterceptor を繰り返し使用すると、以下の panic が発生します。

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor も同様です。チェーンインターセプターを繰り返し呼び出すと、同じチェーンに追加されます。

使用例は以下の通りです。

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // チェーンインターセプターを追加
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

クライアント側インターセプター

クライアント側インターセプターはサーバー側とほぼ同じです。1 つの一元インターセプター UnaryClientInterceptor と 1 つのストリーミングインターセプター StreamClientInterceptor があります。具体的なタイプは以下の通りです。

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

一元 RPC

一元 RPC クライアントインターセプターを作成するには、UnaryClientInterceptor を実装するだけです。以下はシンプルな例です。

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string メソッド名
// param req interface{} リクエストデータ
// param reply interface{} 応答データ
// param cc *grpc.ClientConn クライアント接続オブジェクト
// param invoker grpc.UnaryInvoker インターセプトされる具体的なクライアントメソッド
// param opts ...grpc.CallOption 今回のリクエストの設定項目
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

クライアント側の一元 RPC インターセプターを通じて、ローカルリクエストのリクエストデータと応答データ、およびその他のリクエスト情報を取得できます。

ストリーミング RPC

ストリーミング RPC クライアントインターセプターを作成するには、StreamClientInterceptor を実装するだけです。以下は例です。

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc ストリームオブジェクトの説明情報
// param cc *grpc.ClientConn 接続オブジェクト
// param method string メソッド名
// param streamer grpc.Streamer ストリームオブジェクトを作成するオブジェクト
// param opts ...grpc.CallOption 接続設定項目
// return grpc.ClientStream 作成されたクライアントストリームオブジェクト
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

ストリーミング RPC クライアントインターセプターを通じて、クライアントとサーバーが接続を確立する時、つまりストリームを作成するタイミングのみをインターセプトでき、クライアントストリームオブジェクトがメッセージを送受信するたびにインターセプトすることはできません。ただし、インターセプターで作成されたストリームオブジェクトをラップすることで、メッセージの送受信をインターセプトできます。以下のようになります。

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc ストリームオブジェクトの説明情報
// param cc *grpc.ClientConn 接続オブジェクト
// param method string メソッド名
// param streamer grpc.Streamer ストリームオブジェクトを作成するオブジェクト
// param opts ...grpc.CallOption 接続設定項目
// return grpc.ClientStream 作成されたクライアントストリームオブジェクト
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // メッセージ送信前
  err := c.ClientStream.SendMsg(m)
  // メッセージ送信後
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // メッセージ受信前
  err := c.ClientStream.RecvMsg(m)
  // メッセージ受信後
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

インターセプターの使用

使用時は、サーバー側と同様に 4 つのツール関数を通じて option でインターセプターを追加します。単一インターセプターとチェーンインターセプターに分かれます。

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

クライアント側で WithUnaryInterceptor を繰り返し使用しても panic は発生しませんが、最後のもののみが有効になります。

以下は使用例です。

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

これで、ケース全体が完成しました。実行して結果を確認しましょう。サーバー側の出力は以下の通りです。

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

クライアント側の出力は以下の通りです。

client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfo req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfo req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfo req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfo req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

両側の出力が予想通りであり、インターセプト効果が得られていることがわかります。このケースは非常にシンプルなサンプルですが、gRPC のインターセプターを使用して、認証、ログ、監視など多くのことを行えます。自分で車輪を作ることも、オープンソースコミュニティの既存の車輪を使用することもできます。gRPC Ecosystem は一連のオープンソース gRPC インターセプターミドルウェアを収集しています。アドレス:grpc-ecosystem/go-grpc-middleware

エラー処理

開始前に、まず例を見てみましょう。前のインターセプターケースで、ユーザーが検索できない場合、クライアントにエラー person not found を返します。ここで問題です。クライアントは返されたエラーに基づいて特別な処理を行えるでしょうか。次に試してみましょう。クライアントコードで、errors.Is を使用してエラーを判断してみます。

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

結果の出力は以下の通りです。

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfo req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfo req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

クライアントが受信する error はこのようになっていることがわかります。サーバー側から返される error は desc フィールドにあることがわかります。

rpc error: code = Unknown desc = person not found

当然、errors.Is のロジックは実行されません。errors.As に変更しても同じ結果になります。

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

そのため、gRPC は status パッケージを提供してこのような問題を解決します。これが、クライアントが受信するエラーに code と desc フィールドがある理由です。gRPC が実際にクライアントに返すのは Status であり、具体的なタイプは以下の通りです。protobuf で定義された message であることもわかります。

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // ステータスコード。[google.rpc.Code][google.rpc.Code] の enum 値である必要があります。
  int32 code = 1;

  // 開発者向けのエラーメッセージ。英語である必要があります。ユーザー向けのエラーメッセージはローカライズされ、
  // [google.rpc.Status.details][google.rpc.Status.details] フィールドに送信されるか、クライアントによってローカライズされます。
  string message = 2;

  // エラーの詳細を伝えるメッセージのリスト。API が使用する一般的なメッセージタイプがあります。
  repeated google.protobuf.Any details = 3;
}

エラーコード

Status 構造体の Code は、Http Status に似た存在で、現在の rpc リクエストの状態を示すために使用されます。gRPC は 16 の code を grpc/codes で定義しており、ほとんどのシナリオをカバーしています。それぞれ以下の通りです。

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // 呼び出し成功
  OK Code = 0

  // リクエストがキャンセルされた
  Canceled Code = 1

  // 不明なエラー
  Unknown Code = 2

  // パラメータが不正
  InvalidArgument Code = 3

    // リクエストタイムアウト
  DeadlineExceeded Code = 4

  // リソースが存在しない
  NotFound Code = 5

    // 既に同じリソースが存在する(これが存在するとは思わなかった)
  AlreadyExists Code = 6

  // 権限不足でアクセス拒否
  PermissionDenied Code = 7

  // リソース枯渇。残りの容量が不足している。例えばディスク容量が不足しているなどの状況
  ResourceExhausted Code = 8

  // 実行条件不足。例えば rm で空でないディレクトリを削除しようとする。削除の条件はディレクトリが空であることだが、条件が満たされていない
  FailedPrecondition Code = 9

  // リクエストが中断された
  Aborted Code = 10

  // 操作アクセスが制限範囲を超えている
  OutOfRange Code = 11

  // 現在のサービスが実装されていないことを示す
  Unimplemented Code = 12

  // システム内部エラー
  Internal Code = 13

  // サービス利用不可
  Unavailable Code = 14

  // データ損失
  DataLoss Code = 15

  // 認証に失敗
  Unauthenticated Code = 16

  _maxCode = 17
)

grpc/status パッケージは、status と error の相互変換を容易にする多くの関数を提供しています。status.New を使用して直接 Status を作成できます。または Newf も使用できます。

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

例えば以下のコードです。

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

status の err メソッドを通じて、その中の error を取得できます。状態が ok の場合、error は nil になります。

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

直接 error を作成することもできます。

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

したがって、サーバー側のコードを以下のように変更できます。

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

これ之前、サーバー側から返されるすべての code は unknown でしたが、変更後はより明確なセマンティクスを持つようになりました。したがって、クライアント側では status.FromError を使用するか、以下の関数を使用して error から status を取得し、異なる code に応じて対応する処理を行えます。

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

例は以下の通りです。

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

ただし、grpc の code はできるだけ一般的なシナリオをカバーしていますが、時には開発者のニーズを満たせないこともあります。この場合、Status の Details フィールドを使用できます。これはスライスであり、複数の情報を格納できます。Status.WithDetails を通じてカスタム情報を传入できます。

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Status.Details を通じて情報を取得できます。

go
func (s *Status) Details() []interface{}

注意すべき点は、传入する情報は protobuf で定義されていることが望ましいです。これにより、サーバー側とクライアント側の両方で解析しやすくなります。公式はいくつかの例を提供しています。

protobuf
message ErrorInfo {
  // エラーの原因
  string reason = 1;

  // サービスを定義する主体
  string domain = 2;

  // その他の情報
  map<string, string> metadata = 3;
}

// 再試行情報
message RetryInfo {
  // 同じリクエストの待機間隔
  google.protobuf.Duration retry_delay = 1;
}

// デバッグ情報
message DebugInfo {
  // スタック
  repeated string stack_entries = 1;

  // 詳細情報
  string detail = 2;
}

    ...
    ...

さらに多くの例は googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) で確認できます。必要に応じて、以下のコードを通じてインポートできます。

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

ErrorInfo を details として使用します。

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

クライアント側でデータを取得して処理できます。ただし、上記は gRPC が推奨する例ですが、それ以外にも message を自分で定義して、より効果的にビジネスニーズを満たすこともできます。統一されたエラー処理を行いたい場合は、インターセプター内で操作することもできます。

タイムアウト制御

ほとんどの場合、通常は 1 つのサービスだけでなく、上流に多くのサービスがあり、下流にも多くのサービスがあります。クライアントが 1 回リクエストを開始すると、最上流のサービスから最下流まで、サービス呼び出しチェーンが形成されます。図の通りです。場合によっては、図よりもさらに長くなる可能性があります。

これほど長い呼び出しチェーンで、1 つのサービスのロジック処理に很长时间がかかると、上流が常に待機状態になります。不要なリソースの浪費を減らすために、タイムアウトメカニズムを導入する必要があります。これにより、最上流が呼び出し時に传入するタイムアウト時間が、呼び出しチェーン全体の実行に許容される最大時間になります。gRPC はプロセス間と言語を越えてタイムアウトを伝達できます。伝達が必要なデータを HTTP2 の HEADERS Frame フレームに配置します。下図の通りです。

gRPC リクエストのタイムアウトデータは、HEADERS Framegrpc-timeout フィールドに対応します。注意すべき点は、すべての gRPC ライブラリがこのタイムアウト伝達メカニズムを実装しているわけではないことです。ただし、gRPC-go は確かにサポートしています。他の言語のライブラリを使用し、この機能を使用する場合は、この点に特に注意する必要があります。

接続タイムアウト

gRPC クライアントがサーバーに接続を確立する際、デフォルトでは非同期で確立されます。接続の確立に失敗した場合、空の Client が返されます。接続を同期で行いたい場合は、grpc.WithBlock() を使用して、接続が正常に確立されるまでブロック待機できます。

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

タイムアウト時間を制御したい場合は、TimeoutContext を传入するだけです。grpc.Dial の代わりに grpc.DialContext を使用して context を传入します。

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

これにより、接続確立がタイムアウトした場合、error が返されます。

context deadline exceeded

サーバー側でも接続タイムアウトを設定できます。クライアントと新しい接続を確立する際にタイムアウト時間を設定します。デフォルトは 120 秒です。規定時間内に正常に接続が確立されない場合、サーバーは積極的に接続を切断します。

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout はまだ実験段階です。将来 API が変更または削除される可能性があります。

リクエストタイムアウト

gRPC クライアントがリクエストを開始する際、最初のパラメータは Context タイプです。同様に、RPC リクエストにタイムアウト時間を設定したい場合は、TimeoutContext を传入するだけです。

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // タイムアウトロジック処理
}

gRPC の処理を通じて、タイムアウト時間がサーバー側に伝達されます。伝送プロセス中、それはフレームフィールドの形式で存在し、Go 内では context の形式で存在し、リンク全体で伝達されます。リンク伝送プロセス中、タイムアウト時間を変更することはお勧めしません。リクエスト時にどのくらいのタイムアウト時間を設定するかは、最上流が考慮すべき問題です。

認証認可

マイクロサービス領域では、すべてのサービスがリクエストのユーザー身元と権限を検証する必要があります。モノリシックアプリケーションと同じように、各サービスが独自の認証ロジックを実装するのは現実的ではありません。そのため、統一された認証と認可サービスが必要です。一般的な解決策は OAuth2、分散 Session、および JWT です。この中で、OAuth2 が最も広く使用されており、業界標準にもなっています。OAuth2 で最も一般的に使用されるトークンタイプは JWT です。以下は OAuth2 認証コードモードのフロー図です。基本的なフローは図の通りです。

安全な転送

サービス登録と発見

クライアントがサーバーの指定されたサービスを呼び出す前に、サーバーの ip と port を知る必要があります。以前のケースでは、サーバーアドレスはすべて固定されていました。実際のネットワーク環境は常に安定しているわけではなく、一部のサービスは障害によりオフラインになりアクセスできなくなる可能性があります。また、ビジネスの発展に伴い、マシン移行によりアドレスが変更される可能性もあります。これらの状況では、静的アドレスを使用してサービスにアクセスできません。これらの動的な問題は、サービス発見と登録が解決する問題です。サービス発見はサービスアドレスの変更を監視して更新し、サービス登録は外部に自分のアドレスを通知します。gRPC では、基本的なサービス発見機能が提供されており、拡張とカスタマイズがサポートされています。

静的アドレスを使用できない場合は、特定の名称を使用して置き換えることができます。例えば、ブラウザは DNS 解析を通じてドメイン名からアドレスを取得します。同様に、gRPC のデフォルトのサービス発見は DNS を通じて行われます。ローカルの host ファイルを変更し、以下のマッピングを追加します。

127.0.0.1 example.grpc.com

次に、helloworld サンプルのクライアント Dial のアドレスを対応するドメイン名に変更します。

go
func main() {
  // 接続を確立。暗号化検証なし
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // クライアントを作成
  client := hello2.NewSayHelloClient(conn)
  // 遠隔呼び出し
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

同様に正常な出力を確認できます。

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

gRPC では、このタイプの名称は RFC 3986 で定義された URI 構文に従う必要があります。形式は以下の通りです。

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

上記の例の URI は以下の形式です。デフォルトで dns をサポートしているため、接頭辞の scheme は省略されています。

dns:example.grpc.com:8080

これに加えて、gRPC は Unix domain sockets もデフォルトでサポートしています。他の方式については、gRPC の拡張に応じてカスタム実装する必要があります。そのためには、カスタムリゾルバー resolver.Resolver を実装する必要があります。resolver はターゲットアドレスとサービス設定の更新を監視する責任があります。

go
type Resolver interface {
    // gRPC は ResolveNow を呼び出して、ターゲット名の再解析を試みます。これは単なるヒントであり、必要ない場合、リゾルバーは無視できます。このメソッドは並行して呼び出される可能性があります
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC は Resolver コンストラクター、つまり resolver.Builder を传入する必要があります。これは Resolver インスタンスを作成する責任があります。

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Builder の Scheme メソッドは、それが担当する Scheme タイプを返します。例えば、デフォルトの dnsBuilder は dns を返します。コンストラクターは初期化時に resolver.Register を使用してグローバル Builder に登録するか、options として grpc.WithResolver を使用して ClientConn 内部に传入する必要があります。後者の優先度は前者より高いです。

上記の図は resolver のワークフローを簡単に説明しています。次に、カスタム resolver をデモンストレーションします。

カスタムサービス解析

以下では、カスタムリゾルバーを記述します。カスタム解析コンストラクターが必要です。

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // ここで updatestate する必要があります。そうでないとデッドロックします
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // 設定。loadBalancingPolicy はロードバランシングポリシー
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

カスタムリゾルバーは、map 内の一致するアドレスを updatestate に传入し、同時にロードバランシングポリシーを指定します。round_robin はラウンドロビンを意味します。

go
// service config 構造は以下の通り
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

クライアント側のコードは以下の通りです。

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // builder を登録
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // 接続を確立。暗号化検証なし
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // クライアントを作成
  client := hello2.NewSayHelloClient(conn)
     // 毎秒 1 回呼び出し
  for range time.Tick(time.Second) {
    // 遠隔呼び出し
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

通常、フローはサーバー側がレジストリセンターに自身サービスを登録し、クライアントがレジストリセンターからサービスリストを取得してマッチングします。ここで传入される map はシミュレートされたレジストリセンターです。データは静的なため、サービス登録のステップは省略され、サービス発見のみが残ります。クライアントが使用する target は hello:myworld です。hello はカスタム scheme で、myworld はサービス名です。カスタムリゾルバーを通じて解析後、127.0.0.1:8080 の実際のアドレスが取得されます。実際の状況では、ロードバランシングを行うために、1 つのサービス名が複数の実際のアドレスにマッチする可能性があります。そのため、サービス名がスライスに対応している理由です。ここでは 2 つのサーバー側を起動し、異なるポートを占有します。ロードバランシングポリシーはラウンドロビンです。サーバー側の出力はそれぞれ以下の通りです。リクエスト時間を通じて、ロードバランシングポリシーが実際に機能していることがわかります。ポリシーを指定しない場合、デフォルトで最初のサービスのみが選択されます。

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

レジストリセンターは、サービス登録名と実際のサービスアドレスのマッピングコレクションを格納しています。データ格納が可能なミドルウェアであれば、条件を満たせます。mysql をレジストリセンターとして使用することも不可能ではありません(おそらく誰も这样做ないでしょう)。一般的に、レジストリセンターは KV 格納です。redis は非常に良い選択ですが、redis をレジストリセンターとして使用する場合、多くのロジックを自行で実装する必要があります。例えばサービスのハートビートチェック、サービスオフラインなど、サービススケジューリングなど、非常に面倒です。redis はこの方面で一定の応用がありますが、少ないです。専門的なことは専門的な人に任せるべきです。この方面で有名なものはたくさんあります:Zookeeper、Consul、Eureka、ETCD、Nacos など。

注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) を参照して、これらのミドルウェアの違いを理解できます。

consul との連携

consul を使用したケースについては、consul を参照してください。

ロードバランシング

Golang学习网由www.golangdev.cn整理维护