Skip to content

gRPC

遠程過程調用 rpc 應該是微服務當中必須要學習的一個點了,在學習的過程中會遇到各式各樣的 rpc 框架,不過在 go 這個領域,幾乎所有的 rpc 框架都是基於 gRPC 的,並且它還成為了雲原生領域的一個基礎協議,為什麼選擇它,官方如下回答:

gRPC 是一個現代化的開源高性能遠程過程調用(Remote Process Call,RPC) 框架,可以在任何環境中運行。它可以通過可插拔的負載平衡、跟蹤、健康檢查和身份驗證支持,有效地連接數據中心內和數據中心之間的服務。它還適用於連接設備、移動應用程序和瀏覽器到後端服務的最後一英裡分布式計算。

官方網址:gRPC

官方文檔:Documentation | gRPC

gRPC 技術教程:Basics tutorial | Go | gRPC

ProtocBuf 官網:Reference Guides | Protocol Buffers Documentation (protobuf.dev)

它也是 CNCF 基金會下一個的開源項目,CNCF 全名 CLOUD NATIVE COMPUTING FOUNDATION,譯名雲原生計算基金會

特點

簡單的服務定義

使用 Protocol Buffers 定義服務,這是一個強大的二進制序列化工具集和語言。

啟動和擴容都十分迅捷

只需一行代碼即可安裝運行時和開發環境,僅需幾秒鐘既可以擴張到每秒數百萬個 RPC

跨語言,跨平台

根據不同平台不同語言自動生成客戶端和服務端的服務存根

雙向流和集成授權

基於 HTTP/2 的雙向流和可插拔的認證授權

雖然 GRPC 是語言無關的,但是本站的內容大部分都是 go 相關的,所以本文也會使用 go 作為主要語言進行講解,後續用到的 pb 編譯器和生成器如果是其他語言的使用者可以自行到 Protobuf 官網查找。為了方便起見,接下來會直接省略項目的創建過程。

依賴安裝

先下載 Protocol Buffer 編譯器,下載地址:Releases · protocolbuffers/protobuf (github.com)

根據自己的情況選擇系統和版本即可,下載完成後需要將 bin 目錄添加到環境變量中。

然後還要下載代碼生成器,編譯器是將 proto 文件生成對應語言的序列化代碼,生成器是用於生成業務代碼。

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

創建一個空的項目,名字這裡取 grpc_learn,然後引入如下依賴

sh
$ go get google.golang.org/grpc

最後看一下版本,是不是真的安裝成功了

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

項目結構

下面將以一個 Hello World 示例來進行演示,創建如下的項目結構。

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

定義 protobuf 文件

其中,在pb/hello.proto中,寫入如下內容,這是一個相當簡單的示例,如果不會 protoc 語法,請移步相關文檔。

protobuf
syntax = "proto3";

// .表示就直接生成在輸出路徑下,hello是包名
option go_package = ".;hello";

// 請求
message HelloReq {
  string name = 1;


  // 響應
  message HelloRep {
    string msg = 1;
  }

  // 定義服務
  service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

生成代碼

編寫完成後,使用 protoc 編譯器生成數據序列化相關的代碼,使用生成器生成 rpc 相關代碼

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

此時可以發現hello文件夾生成了hello.pb.gohello_grpc.pb.go文件,瀏覽hello.pb.go可以發現我們定義的 message

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // 定義的字段
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // 定義的字段
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

hello_grpc.pb.go中可以發現我們定義的服務

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// 後續如果我們自己實現服務接口,必須要嵌入該結構體,就不用實現mustEmbedUnimplementedSayHelloServer方法
type UnimplementedSayHelloServer struct {
}

// 默認返回nil
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// 接口約束
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

編寫服務端

server/main.go中編寫如下代碼

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // 監聽端口
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // 創建gprc服務器
  server := grpc.NewServer()
  // 注冊服務
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // 運行
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

編寫客戶端

client/main.go中寫入如下代碼

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // 建立連接,沒有加密驗證
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // 創建客戶端
  client := pb.NewSayHelloClient(conn)
  // 遠程調用
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

運行

先運行服務端,再運行客戶端,服務端輸出如下

2023/07/16 16:26:51 received grpc req: name:"client"

客戶端輸出如下

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

在本例中,客戶端建立好連接後,在調用遠程方法時就跟調用本地方法一樣,直接訪問clientHello 方法並獲取結果,這就是一個最簡單的 GRPC 例子,許多開源的框架也都是對這一個流程進行了封裝。

bufbuild

在上述例子中,是直接使用命令生成的代碼,如果後期插件多了命令會顯得相當繁瑣,這時可以通過工具來進行管理 protobuf 文件,正好就有這麼一個開源的管理工具bufbuild/buf

開源地址:bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

文檔地址:Buf - Install the Buf CLI

特點

  • BSR 管理
  • Linter
  • 代碼生成
  • 格式化
  • 依賴管理

有了這個工具可以相當方便的管理 protobuf 文件。

文檔中提供了相當多的安裝方式,可以自己選擇。如果本地安裝了 go 環境的話,直接使用go install安裝即可

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

安裝完畢後查看版本

sh
$ buf --version
1.24.0

來到helloworld/pb文件夾,執行如下命令創建一個 module 來管理 pb 文件。

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

buf.yaml文件內容默認如下

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

再來到helloworld/目錄下,創建buf.gen.yaml,寫入如下內容

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

再執行命令生成代碼

sh
$ buf generate

完成後就可以看到生成的文件了,當然 buf 不止這點功能,其他的功能可以自己去文檔學習。

流式 RPC

grpc 的調用方式有兩大類,一元 RPC(Unary RPC)和流式 RPC(Stream RPC)。Hello World 中的示例就是一個典型的一元 RPC。

一元 rpc(或者叫普通 rpc 更能理解些,實在不知道怎麼翻譯這個unary了)用起來就跟普通的 http 一樣,客戶端請求,服務端返回數據,一問一答的方式。而流式 RPC 的請求和響應都 可以是流式的,如下圖

使用流式請求時,只返回一次響應,客戶端可以通過流來多次發送參數給服務端,服務端可以不需要像一元 RPC 那樣等到所有參數都接收完畢再處理,具體處理邏輯可以由服務端決定。正常情況下,只有客戶端可以主動關閉流式請求,一旦流被關閉,當前 RPC 請求也就會結束。

使用流式響應時,只發送一次參數,服務端可以通過流多次發送數據給客戶端,客戶端不需要像一元 RPC 那樣接受完所有數據再處理,具體的處理邏輯可以由客戶端自己決定。正常請求下,只有服務端可以主動關閉流式響應,一旦流被關閉,當前 RPC 請求也就會結束。

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

也可以是只有響應是流式的(Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

或者請求和響應都是流式的(Bi-driectional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

單向流式

下面通過一個例子來演示單向流式的操作,首先創建如下的項目結構

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

message.proto內容如下

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

通過 buf 生成代碼

sh
$ buf generate

這裡演示是消息服務,receiveMessage接收一個指定的用戶名,類型為字符串,返回消息流,sendMessage 接收消息流,返回成功發送的消息數目,類型為 64 位整型。接下來創建server/message_service.go,自己實現默認的代碼生成的服務

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

可以看到接收消息和發送消息的參數裡面都有一個流包裝接口

go
type MessageService_ReceiveMessageServer interface {
    // 發送消息
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // 發送返回值並關閉連接
  SendAndClose(*wrapperspb.StringValue) error
    // 接收消息
  Recv() (*Message, error)
  grpc.ServerStream
}

它們都嵌入了gprc.ServerStream接口

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

可以看到,流式 RPC 並不像一元 RPC 那樣入參和返回值都可以很明確的體現在函數簽名上,這些方法乍一看是看不出來入參和返回值是什麼類型的,需要調用傳入的 Stream 類型完成流式傳輸,接下來開始編寫服務端的具體邏輯。在編寫服務端邏輯的時候,用了一個sync.map 來模擬消息隊列,當客戶端發送ReceiveMessage 請求時,服務端通過流式響應不斷返回客戶端想要的消息,直到超時過後斷開請求。當客戶端請求SendMessage 時,通過流式請求不斷發送消息過來,服務端不斷的將消息放入隊列中,直到客戶端主動斷開請求,並返回給客戶端消息發送條數。

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// 一個模擬的消息隊列
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// 接收指定用戶的消息
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("5秒鐘內沒有收到%s的消息,關閉連接", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // 拿到消息
      msg := queue[0]
      // 通過流式傳輸將消息發送給客戶端
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// 發送消息給指定用戶
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // 從客戶端接收消息
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // 將消息放入消息隊列中
    messageQueue.Store(msg.From, queue)
    count++
  }
}

客戶端開了兩個協程,一個協程用來發送消息,另一個協程用來接收消息,當然也可以一邊發送一邊接收,代碼如下。

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // 接收消息請求
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("暫無消息,關閉連接")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "在嗎",
      "下午有沒有時間一起打游戲",
      "那行吧,以後有時間一起約",
      "就這個周末應該可以吧",
      "那就這麼定了",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // 消息發送完了,主動關閉請求並獲取返回值
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("發送完畢,總共發送了%d條消息\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

執行過後服務端輸出如下

server  2023/07/18 16:28:24 send from:"jack" content:"在嗎" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"在嗎" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"下午有沒有時間一起打游戲" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"下午有沒有時間一起打游戲" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"那行吧,以後有時間一起約" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以後有時間一起約" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"就這個周末應該可以吧" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"就這個周末應該可以吧" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"那就這麼定了" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"那就這麼定了" to:"mike"
server  2023/07/18 16:28:33 5秒鐘內沒有收到jack的消息,關閉連接

客戶端輸出如下

client  2023/07/18 16:28:24 receive from:"jack" content:"在嗎" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"下午有沒有時間一起打游戲" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以後有時間一起約" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"就這個周末應該可以吧" to:"mike"
client  2023/07/18 16:28:28 發送完畢,總共發送了5條消息
client  2023/07/18 16:28:28 receive from:"jack" content:"那就這麼定了" to:"mike"
client  2023/07/18 16:28:33 暫無消息,關閉連接

通過這個例子可以發現單向流式 RPC 請求處理起來的話不論是客戶端還是服務端都要比一元 rpc 復雜,不過雙向流式 RPC 比它們還要更復雜些。

雙向流式

雙向流式 PRC,即請求和響應都是流式的,就相當於把上例中的兩個服務結合成一個。對於流式 RPC 而言,第一個請求肯定是由客戶端發起的,隨後客戶端可以隨時通過流來發送請求參數,服務端也可以隨時通過流來返回數據,不管哪一方主動關閉流,當前請求都會結束。

TIP

後續的內容除非必要,都會直接省略掉 pb 代碼生成以及創建 rpc 客戶端服務端這些步驟的代碼描述

首先創建如下項目結構

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

message.proto內容如下

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

服務端邏輯中,建立連接後,開啟兩個協程,一個協程負責接收消息,一個負責發送消息,具體的處理邏輯與上個例子類似,不過這次去掉了超時的判定邏輯。

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue 模擬的消息隊列
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// 聊天服務,服務端邏輯我們用多協程來進行處理
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "end chat")

  var chatErr error
  chatCh := make(chan error)

  // 創建兩個協程,一個收消息,一個發消息
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // 接收消息的協程
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // 發送消息的協程
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

客戶端邏輯中,開啟了兩個子協程來模擬兩個人的聊天過程,兩個子協程中分別又各有兩個孫協程負責收發消息(客戶端邏輯中並沒有保證兩個人聊天的消息收發順序正確,只是一個簡單的雙方發送與接收的例子)

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "你好", "有沒有時間一起打游戲?", "好吧")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "你好", "沒有", "沒時間,你找別人吧")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("end chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // 消息發完了,就關閉連接
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // 接收消息的協程
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

正常情況下,服務端輸出

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有沒有時間一起打游戲?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"沒有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"沒有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有沒有時間一起打游戲?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"沒時間,你找別人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"沒時間,你找別人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

正常情況下,客戶端輸出(可以看到消息的順序邏輯是亂的)

client 2023/07/19 17:26:24 receive from:"jack"  content:"你好"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"你好"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"沒有"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"有沒有時間一起打游戲?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"好吧"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"沒時間,你找別人吧"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

通過示例可以看到的是,雙向流式的處理邏輯無論是客戶端還是服務端,都要比單向流式更復雜,需要結合多協程開啟異步任務才能更好的處理邏輯。

metadata

metadata 本質上是一個 map,它的 value 是一個字符串切片,就類似 http1 的 header 一樣,並且它在 gRPC 中扮演的角色也和 http header 類似,提供一些本次 RPC 調用的一些信息,同時 metadata 的生命周期跟隨著一次 rpc 調用的整個過程,調用結束,它的生命周期也就結束了。

它在 gRPC 中主要通過context來進行傳輸和存儲,不過 gRPC 提供了metadata 包,裡面有相當多的方便函數來簡化操作,不需要我們去手動操作context 。metadata 在 gRPC 中對應的類型為metadata.MD,如下所示。

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

我們可以直接使用metadata.New函數來創建,不過在創建之前,有幾個點需要注意

go
func New(m map[string]string) MD

metadata 對鍵名有所限制,僅能是以下規則限制的字符:

  • ASCII 字符
  • 數字:0-9
  • 小寫字母:a-z
  • 大寫字母:A-Z
  • 特殊字符:-_

TIP

在 metadata 中,大寫的字母都會被轉換為小寫,也就是說會佔用同一個 key,值也會被覆蓋。

TIP

grpc-開頭的 key 是 grpc 保留使用的內部 key,如果使用這類 key 的話可能會導致一些錯誤。

手動創建

創建 metadata 的方式有很多,這裡介紹手動創建 metadata 最常用的兩種方法,第一種就是使用metadata.New函數,直接傳入一個 map。

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

第二種是metadata.Pairs,傳入偶數長度的字符串切片,會自動的解析成鍵值對。

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

還可以使用metadata.join來合並多個 metadata

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)

服務端使用

獲取 metadata

服務端獲取 metadata 可以使用metadata.FromIncomingContext函數來獲取

go
func FromIncomingContext(ctx context.Context) (MD, bool)

對於一元 rpc 而言,service 的參數裡面會帶一個context參數,直接從裡面獲取 metadata 即可

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

對於流式 rpc,service 的參數裡面會有一個流對象,通過它可以獲取流的context

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

發送 metadata

發送 metadata 可以使用grpc.sendHeader函數

go
func SendHeader(ctx context.Context, md metadata.MD) error

該函數最多調用一次,在一些導致 header 被自動發送的事件發生後使用則不會生效。在一些情況下,如果不想直接發送 header,這時可以使用grpc.SetHeader 函數。

go
func SetHeader(ctx context.Context, md metadata.MD) error

該函數多次調用的話,會將每次傳入的 metadata 合並,並在以下幾種情況發送給客戶端

  • gprc.SendHeaderServertream.SendHeader被調用時
  • 一元 rpc 的 handler 返回時
  • 調用流式 rpc 中流對象的Stream.SendMsg
  • rpc 請求的狀態變為send out,這種情況要麼是 rpc 請求成功了,要麼就是出錯了。

對於流式 rpc 而言,建議使用流對象的SendHeader方法和SetHeader方法。

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

在使用過程中會發現 Header 和 Trailer 兩個功能差不多,不過它們的主要區別在於發送的時機,一元 rpc 中可能體會不到,但是這一差別在流式 RPC 中尤為明顯,因為流式 RPC 中的 Header 可以不用等待請求結束就可以發送 Header。前面提到過了 Header 會在特定的情況下被發送,而 Trailer 僅僅只會在整個 RPC 請求結束後才會被發送,在此之前,獲取到的 trailer 都是空的。

客戶端使用

獲取 metadata

客戶端想要獲取響應的 header,可以通過grpc.Headergrpc.Trailer來實現

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

不過需要注意的是,並不能直接獲取,可以看到以上兩個函數返回值是CallOption,也就是說是在發起 RPC 請求時作為 option 參數傳入的。

go
// 聲明用於接收值的md
var header, trailer metadata.MD

// 調用rpc請求時傳入option
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

在請求完成後,會將值寫到傳入的 md 中。對於流式 rpc 而言,可以通過發起請求時返回的流對象直接獲取

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

發送 metadata

客戶端想要發送 metadata 很簡單,之前提到過 metadata 的表現形式就是 valueContext,將 metadata 結合到 context 中,然後在請求的時候把 context 傳入即可,metadata 包提供了兩個函數來方便構造 context。

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// 一元rpc
res,err := client.SomeRPC(outgoingContext,data)
// 流式rpc
stream,err := client.StreamRPC(outgoingContext)

如果原有的 ctx 已經有 metadata 了的話,再使用NewOutgoingContext會將先前的數據直接覆蓋掉,為了避免這種情況,可以使用下面這個函數,它不會覆蓋,而是會將數據合並。

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// 一元rpc
res,err := client.SomeRPC(appendContext,data)
// 流式rpc
stream,err := client.StreamRPC(appendContext)

攔截器

gRPC 的攔截器就類似於 gin 中的 Middleware 一樣,都是為了在請求前或者請求後做一些特殊的工作並且不影響到本身的業務邏輯。在 gRPC 中,攔截器有兩大類,服務端攔截器和客戶端攔截器,根據請求類型來分則有一元 RPC 攔截器,和流式 RPC 攔截器,下圖

為了能更好的理解攔截器,下面會根據一個非常簡單的示例來進行描述。

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

person.proto內容如下

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

服務端代碼如下,邏輯全是之前的內容,比較簡單不再贅述。

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// 存放數據
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

服務端攔截

攔截服務端 rpc 請求的有UnaryServerInterceptorStreamServerInterceptor,具體類型如下所示

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

一元 RPC

創建一元 RPC 攔截器,只需要實現UnaryserverInterceptor類型即可,下面是一個簡單的一元 RPC 攔截器的例子,功能是輸出每一次 rpc 的請求和響應。

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc的請求數據
// param info *grpc.UnaryServerInfo 本次一元RPC的一些請求信息
// param unaryHandler grpc.UnaryHandler 具體的handler
// return resp interface{} rpc的響應數據
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

對於一元 RPC 而言,攔截器攔截的是每一個 RPC 的請求和響應,即攔截的是 RPC 的請求階段和響應階段,如果攔截器返回 error,那麼本次請求就會結束。

流式 rpc

創建流式 RPC 攔截器,只需要實現StreamServerInterceptor類型即可,下面是一個簡單的流式 RPC 攔截器的例子。

go
// StreamPersonLogInterceptor
// param srv interface{} 對應服務端實現的server
// param stream grpc.ServerStream 流對象
// param info *grpc.StreamServerInfo 流信息
// param streamHandler grpc.StreamHandler 處理器
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

對於流式 RPC 而言,攔截器攔截的是每一個流對象的SendRecve 方法被調用的時機,如果攔截器返回 error,並不會導致本次 RPC 請求的結束,僅僅只是代表著本次send recv出現了錯誤。

使用攔截器

要想使創建的攔截器生效,需要在創建 gRPC 服務器的時候作為 option 傳入,官方也提供了相關的函數以供使用。如下所示,有添加單個攔截器的函數,也有添加鏈式攔截器的函數。

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

重復使用UnaryInterceptor會拋出如下 panic

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor也是同理,而鏈式攔截器重復調用則會 append 到同一個鏈上。

使用示例如下

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // 添加鏈式攔截器
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

客戶端攔截

客戶端攔截器跟服務端差不多,一個一元攔截器UnaryClientInterceptor,一個流式攔截器StreamClientInterceptor,具體類型如下所示。

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

一元 RPC

創建一元 RPC 客戶端攔截器,實現UnaryClientInterceptor即可,下面就是一個簡單的例子。

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string 方法名
// param req interface{} 請求數據
// param reply interface{} 響應數據
// param cc *grpc.ClientConn 客戶端連接對象
// param invoker grpc.UnaryInvoker 被攔截的具體客戶端方法
// param opts ...grpc.CallOption 本次請求的配置項
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

通過客戶端的一元 RPC 攔截器,可以獲取到本地請求的請求數據和響應數據以及一些其他的請求信息。

流式 RPC

創建一個流式 RPC 客戶端攔截器,實現StreamClientInterceptor即可,下面就是一個例子。

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流對象的描述信息
// param cc *grpc.ClientConn 連接對象
// param method string 方法名
// param streamer grpc.Streamer 用於創建流對象的對象
// param opts ...grpc.CallOption 連接配置項
// return grpc.ClientStream 創建好的客戶端流對象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

通過流式 RPC 客戶端攔截器,只能攔截到客戶端與服務端建立連接的時候也就是創建流的時機,並不能攔截到客戶端流對象每一次收發消息的時候,不過我們把攔截器中創建好的流對象包裝一下就可以實現攔截收發消息了,就像下面這樣

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流對象的描述信息
// param cc *grpc.ClientConn 連接對象
// param method string 方法名
// param streamer grpc.Streamer 用於創建流對象的對象
// param opts ...grpc.CallOption 連接配置項
// return grpc.ClientStream 創建好的客戶端流對象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // 消息發送前
  err := c.ClientStream.SendMsg(m)
  // 消息發送後
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // 消息接收前
  err := c.ClientStream.RecvMsg(m)
  // 消息接收後
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

使用攔截器

使用時,與服務端類似也是四個工具函數通過 option 來添加攔截器,分為單個攔截器和鏈式攔截器。

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

客戶端重復使用WithUnaryInterceptor不會拋出 panic,但是僅最後一個會生效。

下面是一個使用案例

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

到目前為止,整個案例已經編寫完畢,是時候來運行一下看看結果是什麼樣的。服務端輸出如下

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

客戶端輸出如下

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

可以看到兩邊的輸出都符合預期,起到了攔截的效果,這個案例只是一個很簡單的示例,利用 gRPC 的攔截器可以做很多事情比如授權,日志,監控等等其他功能,可以選擇自己造輪子,也可以選擇使用開源社區現成的輪子,gRPC Ecosystem 專門收集了一系列開源的 gRPC 攔截器中間件,地址:grpc-ecosystem/go-grpc-middleware

錯誤處理

在開始之前先來看一個例子,在上一個攔截器案例中,如果用戶查詢不到,會向客戶端返回錯誤person not found ,那麼問題來了,客戶端能不能根據返回的錯誤做特殊的處理呢?接下來試一試,在客戶端代碼中,嘗試使用errors.Is來判斷錯誤。

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

結果輸出如下

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

可以看到客戶端接收的 error 是這樣的,會發現服務端返回的 error 在 desc 這個字段裡面

rpc error: code = Unknown desc = person not found

自然errors.Is這段邏輯也就沒有執行,即便換成errors.As也是一樣的結果。

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

為此,gRPC 提供了status 包來解決這類問題,這也是為什麼客戶端接收到的錯誤會有 code 和 desc 字段的原因,因為 gRPC 實際上返回給客戶端的是一個Status ,其具體類型如下,可以看出也是一個 protobuf 定義的 message。

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // The status code, which should be an enum value of
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // A developer-facing error message, which should be in English. Any
  // user-facing error message should be localized and sent in the
  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
  // by the client.
  string message = 2;

  // A list of messages that carry the error details.  There is a common set of
  // message types for APIs to use.
  repeated google.protobuf.Any details = 3;
}

錯誤碼

Status 結構體中的 Code,是一種類似 Http Status 形式的存在,用於表示當前 rpc 請求的狀態,gRPC 定義了 16 個 code 位於grpc/codes ,涵蓋了大部分的場景,分別如下所示

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // 調用成功
  OK Code = 0

  // 請求被取消
  Canceled Code = 1

  // 未知錯誤
  Unknown Code = 2

  // 參數不合法
  InvalidArgument Code = 3

    // 請求超時
  DeadlineExceeded Code = 4

  // 資源不存在
  NotFound Code = 5

    // 已存在相同的資源(能出現這個我是沒想到的)
  AlreadyExists Code = 6

  // 權限不足被拒絕訪問
  PermissionDenied Code = 7

  // 資源枯竭,剩下的容量不足以使用,比如磁盤容量不夠了之類的情況
  ResourceExhausted Code = 8

  // 執行條件不足,比如使用rm刪除一個非空的目錄,刪除的條件是目錄是空的,但條件不滿足
  FailedPrecondition Code = 9

  // 請求被打斷
  Aborted Code = 10

  // 操作訪問超出限制范圍
  OutOfRange Code = 11

  // 表示當前服務沒有實現
  Unimplemented Code = 12

  // 系統內部錯誤
  Internal Code = 13

  // 服務不可用
  Unavailable Code = 14

  // 數據丟失
  DataLoss Code = 15

  // 沒有通過認證
  Unauthenticated Code = 16

  _maxCode = 17
)

grpc/status包提供了相當多的函數以方 status 與 error 之間的相互轉換。我們可以直接使用status.New來創建一個 Status,或者Newf

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

例如下面的代碼

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

通過 status 的 err 方法可以獲取到其中的 error,當狀態為 ok 的時候 error 為 nil。

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

也可以直接創建 error

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

於是我們可以將服務代碼修改成如下

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

在此之前,服務端返回的所有的 code 都是 unknown,現在經過修改後有了更加明確的語義。於是在客戶端就可以通過status.FromError 或者使用下面的函數從 error 中獲取 status,從而根據不同的 code 來做出響應的處理

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

示例如下

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

不過盡管 grpc 的 code 已經盡可能的涵蓋了一些通用場景,不過有時候還是無法滿足開發人員的需求,這個時候就可以使用 Status 中的 Details 字段,並且它還是一個切片,可以容納多個信息。通過Status.WithDetails 來傳入一些自定義的信息

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

通過Status.Details來獲取信息

go
func (s *Status) Details() []interface{}

需要注意的是,傳入的信息最好是由 protobuf 定義的,這樣才能方便服務端客戶端兩端都能解析,官方給出了幾個示例

protobuf
message ErrorInfo {
  // 錯誤的原因
  string reason = 1;

  // 定義服務的主體
  string domain = 2;

  // 其他信息
  map<string, string> metadata = 3;
}

// 重試信息
message RetryInfo {
  // 同一個請求的等待間隔時間
  google.protobuf.Duration retry_delay = 1;
}

// 調試信息
message DebugInfo {
  // 堆棧
  repeated string stack_entries = 1;

  // 一些細節信息
  string detail = 2;
}

    ...
    ...

更多的例子可以前往googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) 查看。如果需要可以通過下面的代碼來引入。

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

使用ErrorInfo作為 details

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

在客戶端就可以拿到數據做出處理,不過上述只是 gRPC 推薦使用的一些例子,除此之外,同樣也可以自己定義 message,來更好的滿足相應的業務需求,如果想做一些統一的錯誤處理,也可以放到攔截器裡面操作。

超時控制

在大多數情況下,通常不會只有一個服務,並且可能上游有很多服務,下游也有很多服務。客戶端發起一次請求,從最上游的服務到最下游,就形成了一個服務調用鏈,就像圖中那樣,或許可能比圖中的還要長。

如此長的一個調用鏈,如果其中一個服務的邏輯處理需要花費很長時間,就會導致上游一直處於等待狀態。為了減少不必要的資源浪費,因此有必要引入超時這一機制,這樣一來最上游調用時傳入的超時時間,便是整個調用鏈所允許的執行花費最大時間。而 gRPC 可以跨進程跨語言傳遞超時,它把一些需要跨進程傳遞的數據放在了 HTTP2 的HEADERS Frame 幀中,如下圖

gRPC 請求中的超時數據對應著HEADERS Frame中的grpc-timeout 字段。需要注意的是,並不是所有的 gRPC 庫都實現了這一超時傳遞機制,不過gRPC-go肯定是支持的,如果使用其他語言的庫,並且使用了這一特性,則需要額外留意這一點。

連接超時

gRPC 客戶端在向服務端建立連接時,默認是異步建立的,如果連接建立失敗只會返回一個空的 Client。如果想要使連接同步進行,則可以使用grpc.WithBlock() 來使連接未建立成功時阻塞等待。

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

如果想要控制一個超時時間,則只需要傳入一個 TimeoutContext,使用grpc.DialContext來替代gprc.Dial以傳入 context。

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

如此一來,如果連接建立超時,就會返回 error

context deadline exceeded

在服務端同樣也可以設置連接超時,在與客戶端建立新連接的時候設置一個超時時間,默認是 120 秒,如果在規定時間內沒有成功建立連接,服務端會主動斷開連接。

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout仍處於實驗階段,未來的 API 可能會被修改或刪除。

請求超時

gRPC 客戶端在發起請求的時候,第一個參數就是Context類型,同樣的,要想給 RPC 請求加上一個超時時間,只需要傳入一個 TimeoutContext 即可

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // 超時邏輯處理
}

經過 gRPC 的處理,超時時間被傳遞到了服務端,在傳輸過程中它以在幀字段的形式存在中,在 go 裡面它以 context 的形式存在,就此在整個鏈路中進行傳遞。在鏈路傳遞過程中,不建議去修改超時時間,具體在請求時設置多長的超時時間,這應該是最上游應該考慮的問題。

認證授權

在微服務領域中,每一個服務都需要對請求驗證用戶身份和權限,如果和單體應用一樣,每個服務都要自己實現一套認證邏輯,這顯然是不太現實的。所以需要一個統一的認證與授權服務,而常見的解決方案是使用 OAuth2,分布式 Session,和 JWT,這其中,OAuth2 使用最為廣泛,一度已經成為了業界標准,OAuth2 最常用的令牌類型就是是 JWT。下面是一張 OAuth2 授權碼模式的流程圖,基本流程如圖所示。

安全傳輸

服務注冊與發現

客戶端調用服務端的指定服務之前,需要知曉服務端的 ip 和 port,在先前的案例中,服務端地址都是寫死的。在實際的網絡環境中不總是那麼穩定,一些服務可能會因故障下線而無法訪問,也有可能會因為業務發展進行機器遷移而導致地址變化,在這些情況下就不能使用靜態地址訪問服務了,而這些動態的問題就是服務發現與注冊要解決的,服務發現負責監視服務地址的變化並更新,服務注冊負責告訴外界自己的地址。gRPC 中,提供了基礎的服務發現功能,並且支持拓展和自定義。

不能用靜態地址,可以用一些特定的名稱來進行代替,比如瀏覽器通過 DNS 解析域名來獲取地址,同樣的,gRPC 默認的服務發現就是通過 DNS 來進行的,修改本地的 host 文件,添加如下映射

127.0.0.1 example.grpc.com

然後將 helloworld 示例中客戶端 Dial 的地址改為對應的域名

go
func main() {
  // 建立連接,沒有加密驗證
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // 創建客戶端
  client := hello2.NewSayHelloClient(conn)
  // 遠程調用
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

同樣能看到正常的輸出

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

在 gRPC 中,這類名稱必須要遵守 RFC 3986 中定義的 URI 語法,格式為

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

上例中的 URI 就是如下形式,由於默認支持 dns 所以省略掉了前綴的 scheme。

dns:example.grpc.com:8080

除此之外 gRPC 還默認支持 Unix domain sockets,而對於其他的方式,需要我們根據 gRPC 的拓展來進行自定義實現,為此需要實現一個自定義的解析器resolver.Resovler ,resolver 負責監控目標地址和服務配置的更新。

go
type Resolver interface {
    // gRPC將調用ResolveNow來嘗試再次解析目標名稱。這只是一個提示,如果不需要,解析器可以忽略它,該方法可能被並發的調用
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC 要求我們傳入一個 Resolver 構造器,也就是resolver.Builder,它負責構造 Resolver 實例。

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Builder 的 Scheme 方法返回它負責解析的 Scheme 類型,例如默認的 dnsBuilder 它返回的就是dns ,構造器在初始化時應該使用resolver.Register注冊到全局 Builder 中,又或者作為 options,使用grpc.WithResolver 傳入 ClientConn 內部,後者的優先級高於前者。

上圖簡單描述了一下 resolver 的工作流程,接下來就演示如何自定義 resolver

自定義服務解析

下面編寫一個自定義解析器,需要一個自定義的解析構造器來進行構造。

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // 這裡必須要updatestate,否則會死鎖
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // 配置,loadBalancingPolicy指的是負載均衡策略
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

自定義解析器就是把 map 裡面的匹配的地址傳入到 updatestate,同時還指定了負載均衡的策略,round_robin指的是輪詢的意思。

go
// service config 結構如下
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

客戶端代碼如下

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // 注冊builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // 建立連接,沒有加密驗證
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // 創建客戶端
  client := hello2.NewSayHelloClient(conn)
     // 每秒調用一次
  for range time.Tick(time.Second) {
    // 遠程調用
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

正常來說,流程應該是服務端向注冊中心注冊自身服務,然後客戶端從注冊中心中獲取服務列表然後進行匹配,這裡傳入的 map 就是一個模擬的注冊中心,數據是靜態的就省略掉了服務注冊這一環節,只剩下服務發現。客戶端使用的 target 為hello:myworld ,hello 是自定義的 scheme,myworld 就是服務名,經過自定義的解析器解析後,就得到了 127.0.0.1: 8080 的真實地址,在實際情況中,為了做負載均衡,一個服務名可能會匹配多個真實地址,所以這就是為什麼服務名對應的是一個切片,這裡開兩個服務端,佔用不同的端口,負載均衡策略為輪詢,服務端輸出分別如下,通過請求時間可以看到負載均衡策略確實是在起作用的,如果不指定策略的話默認只選取第一個服務。

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

注冊中心其實就是存放著的就是服務注冊名與真實服務地址的映射集合,只要是能夠進行數據存儲的中間件都可以滿足條件,甚至拿 mysql 來做注冊中心也不是不可以(應該沒有人會這麼做)。一般來說注冊中心都是 KV 存儲的,redis 就是一個很不錯的選擇,但如果使用 redis 來做注冊中心的話,我們就需要自行實現很多邏輯,比如服務的心跳檢查,服務下線等,服務調度等等,還是相當麻煩的,雖然 redis 在這方面有一定的應用但是較少。正所謂專業的事情交給專業的人做,這方面做的比較出名的有很多:Zookeeper,Consul,Eureka,ETCD,Nacos 等。

可以前往注冊中心對比和選型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) 來了解這幾個中間件的一些區別。

結合 consul

結合 consul 使用的案例可以前往consul

負載均衡

Golang學習網由www.golangdev.cn整理維護