Skip to content

gRPC

Gọi thủ tục từ xa rpc có lẽ là điểm bắt buộc phải học trong microservices, trong quá trình học sẽ gặp đủ loại khung rpc, nhưng trong lĩnh vực go, hầu hết các khung rpc đều dựa trên gRPC, và nó còn trở thành một giao thức cơ bản trong lĩnh vực cloud native, tại sao chọn nó, câu trả lời chính thức như sau:

gRPC là một khung gọi thủ tục từ xa (Remote Process Call, RPC) mã nguồn mở hiệu năng cao hiện đại, có thể chạy trong bất kỳ môi trường nào. Nó có thể kết nối hiệu quả các dịch vụ trong và giữa các trung tâm dữ liệu với sự hỗ trợ của cân bằng tải có thể cắm, theo dõi, kiểm tra sức khỏe và xác thực. Nó cũng phù hợp cho điện toán phân tán last-mile để kết nối các thiết bị, ứng dụng di động và trình duyệt với các dịch vụ backend.

Trang web chính thức: gRPC

Tài liệu chính thức: Documentation | gRPC

Hướng dẫn kỹ thuật gRPC: Basics tutorial | Go | gRPC

Trang web ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

Nó cũng là một dự án mã nguồn mở thuộc quỹ CNCF, CNCF viết tắt của CLOUD NATIVE COMPUTING FOUNDATION

Đặc điểm

Định nghĩa dịch vụ đơn giản

Sử dụng Protocol Buffers để định nghĩa dịch vụ, đây là một bộ công cụ và ngôn ngữ tuần tự hóa nhị phân mạnh mẽ.

Khởi động và mở rộng quy mô rất nhanh chóng

Chỉ cần một dòng code để cài đặt runtime và môi trường phát triển, chỉ cần vài giây để mở rộng đến hàng triệu RPC mỗi giây

Đa ngôn ngữ, đa nền tảng

Tự động tạo stub dịch vụ client và server cho các ngôn ngữ và nền tảng khác nhau

Luồng hai chiều và ủy quyền tích hợp

Luồng hai chiều dựa trên HTTP/2 và ủy quyền xác thực có thể cắm

Mặc dù GRPC không phụ thuộc vào ngôn ngữ, nhưng nội dung của trang web này phần lớn liên quan đến go, vì vậy bài viết này cũng sẽ sử dụng go làm ngôn ngữ chính để giải thích, các trình biên dịch pb và trình tạo được sử dụng sau này nếu là người dùng ngôn ngữ khác có thể tự tìm trên trang web Protobuf. Để thuận tiện, sẽ bỏ qua quá trình tạo dự án.

Cài đặt phụ thuộc

Trước tiên tải xuống trình biên dịch Protocol Buffer, địa chỉ tải xuống: Releases · protocolbuffers/protobuf (github.com)

Chọn hệ thống và phiên bản phù hợp với tình hình của bạn, sau khi tải xuống cần thêm thư mục bin vào biến môi trường.

Sau đó còn cần tải xuống trình tạo code, trình biên dịch dùng để tạo code tuần tự hóa ngôn ngữ tương ứng từ tệp proto, trình tạo dùng để tạo code business.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Tạo một dự án trống, tên ở đây lấy là grpc_learn, sau đó thêm các phụ thuộc sau

sh
$ go get google.golang.org/grpc

Cuối cùng xem phiên bản, xem có thực sự cài đặt thành công không

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Cấu trúc dự án

Dưới đây sẽ demo với một ví dụ Hello World, tạo cấu trúc dự án như sau.

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

Định nghĩa tệp protobuf

Trong đó, trong pb/hello.proto, nhập nội dung sau, đây là một ví dụ khá đơn giản, nếu không biết cú pháp protoc, vui lòng xem tài liệu liên quan.

protobuf
syntax = "proto3";

// . biểu thị sinh ra trực tiếp trong đường dẫn đầu ra, hello là tên gói
option go_package = ".;hello";

// Yêu cầu
message HelloReq {
  string name = 1;
}

// Phản hồi
message HelloRep {
  string msg = 1;
}

// Định nghĩa dịch vụ
service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Tạo code

Sau khi viết xong, sử dụng trình biên dịch protoc để tạo code liên quan đến tuần tự hóa dữ liệu, sử dụng trình tạo để tạo code rpc

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

Lúc này có thể thấy thư mục hello đã sinh ra tệp hello.pb.gohello_grpc.pb.go, xem qua hello.pb.go có thể thấy message mà chúng ta đã định nghĩa

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Trường đã định nghĩa
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Trường đã định nghĩa
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

Trong hello_grpc.pb.go có thể thấy dịch vụ mà chúng ta đã định nghĩa

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// Sau này nếu chúng ta tự triển khai giao diện dịch vụ, phải nhúng cấu trúc này, không cần triển khai phương thức mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// Mặc định trả về nil
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Ràng buộc giao diện
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Viết server

Viết code sau trong server/main.go

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // Lắng nghe cổng
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // Tạo máy chủ gprc
  server := grpc.NewServer()
  // Đăng ký dịch vụ
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Chạy
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

Viết client

Nhập code sau trong client/main.go

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Thiết lập kết nối, không có xác minh mã hóa
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Tạo client
  client := pb.NewSayHelloClient(conn)
  // Gọi từ xa
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Chạy

Trước tiên chạy server, sau đó chạy client, server xuất ra như sau

2023/07/16 16:26:51 received grpc req: name:"client"

Client xuất ra như sau

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

Trong ví dụ này, sau khi client thiết lập kết nối, khi gọi phương thức từ xa cũng giống như gọi phương thức local, trực tiếp truy cập phương thức Hello của client và lấy kết quả, đây là một ví dụ GRPC đơn giản nhất, nhiều khung mã nguồn mở cũng đều đóng gói quy trình này.

bufbuild

Trong ví dụ trên, sử dụng lệnh để tạo code trực tiếp, nếu sau này có nhiều plugin thì lệnh sẽ rất phức tạp, lúc này có thể sử dụng công cụ để quản lý tệp protobuf, có một công cụ quản lý mã nguồn mở bufbuild/buf.

Địa chỉ mã nguồn mở: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Địa chỉ tài liệu: Buf - Install the Buf CLI

Đặc điểm

  • Quản lý BSR
  • Linter
  • Tạo code
  • Định dạng
  • Quản lý phụ thuộc

Với công cụ này có thể quản lý tệp protobuf rất thuận tiện.

Tài liệu cung cấp rất nhiều cách cài đặt, có thể tự chọn. Nếu local đã cài đặt môi trường go, có thể sử dụng go install để cài đặt

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

Sau khi cài đặt xong xem phiên bản

sh
$ buf --version
1.24.0

Đến thư mục helloworld/pb, thực hiện lệnh sau để tạo một module quản lý tệp pb.

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

Nội dung tệp buf.yaml mặc định như sau

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Sau đó đến thư mục helloworld/, tạo buf.gen.yaml, nhập nội dung sau

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Sau đó thực hiện lệnh để tạo code

sh
$ buf generate

Sau khi hoàn thành có thể thấy các tệp đã tạo, tất nhiên buf không chỉ có chức năng này, các chức năng khác có thể tự học trong tài liệu.

RPC luồng

Cách gọi grpc có hai loại lớn, Unary RPC và Stream RPC. Ví dụ trong Hello World là một Unary RPC điển hình.

Unary rpc (hoặc gọi là ordinary rpc dễ hiểu hơn, thực sự không biết dịch unary này thế nào) sử dụng giống như http thông thường, client yêu cầu, server trả về dữ liệu, cách hỏi đáp. Còn yêu cầu và phản hồi của Stream RPC đều có thể là luồng, như hình dưới

Khi sử dụng yêu cầu luồng, chỉ trả về một lần phản hồi, client có thể gửi tham số nhiều lần cho server thông qua luồng, server không cần đợi tất cả tham số đều nhận xong mới xử lý như Unary RPC, logic xử lý cụ thể có thể do server quyết định. Bình thường, chỉ client có thể chủ động đóng yêu cầu luồng, một khi luồng bị đóng, yêu cầu RPC hiện tại cũng sẽ kết thúc.

Khi sử dụng phản hồi luồng, chỉ gửi một lần tham số, server có thể gửi dữ liệu nhiều lần cho client thông qua luồng, client không cần đợi nhận xong tất cả dữ liệu mới xử lý như Unary RPC, logic xử lý cụ thể có thể do client tự quyết định. Trong yêu cầu bình thường, chỉ server có thể chủ động đóng phản hồi luồng, một khi luồng bị đóng, yêu cầu RPC hiện tại cũng sẽ kết thúc.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Cũng có thể chỉ phản hồi là luồng (Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Hoặc yêu cầu và phản hồi đều là luồng (Bi-directional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Luồng một chiều

Dưới đây qua một ví dụ để demo thao tác luồng một chiều, trước tiên tạo cấu trúc dự án như sau

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

Nội dung message.proto như sau

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

Tạo code thông qua buf

sh
$ buf generate

Ở đây demo là dịch vụ tin nhắn, receiveMessage nhận một tên người dùng chỉ định, kiểu là chuỗi, trả về luồng tin nhắn, sendMessage nhận luồng tin nhắn, trả về số lượng tin nhắn đã gửi thành công, kiểu là số nguyên 64 bit. Tiếp theo tạo server/message_service.go, tự triển khai dịch vụ được tạo mặc định

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

Có thể thấy trong tham số của nhận tin nhắn và gửi tin nhắn đều có một giao diện đóng gói luồng

go
type MessageService_ReceiveMessageServer interface {
    // Gửi tin nhắn
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Gửi giá trị trả về và đóng kết nối
  SendAndClose(*wrapperspb.StringValue) error
    // Nhận tin nhắn
  Recv() (*Message, error)
  grpc.ServerStream
}

Chúng đều nhúng giao diện gprc.ServerStream

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

Có thể thấy, RPC luồng không giống như Unary RPC那样 tham số đầu vào và giá trị trả về đều có thể thể hiện rõ ràng trên chữ ký hàm, những phương thức này nhìn thoáng qua không thể看出 tham số đầu vào và giá trị trả về là kiểu gì, cần gọi Stream được truyền vào để hoàn thành truyền tải luồng, tiếp theo bắt đầu viết logic cụ thể của server. Khi viết logic server, sử dụng một sync.map để mô phỏng hàng đợi tin nhắn, khi client gửi yêu cầu ReceiveMessage, server liên tục trả về tin nhắn mà client muốn thông qua phản hồi luồng, cho đến khi hết thời gian chờ thì ngắt yêu cầu. Khi client yêu cầu SendMessage, liên tục gửi tin nhắn thông qua yêu cầu luồng, server liên tục đưa tin nhắn vào hàng đợi, cho đến khi client chủ động ngắt yêu cầu, và trả về cho client số lượng tin nhắn đã gửi.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Một hàng đợi tin nhắn mô phỏng
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Nhận tin nhắn của người dùng chỉ định
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("5 秒钟内没有收到%s的消息,关闭连接", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Lấy tin nhắn
      msg := queue[0]
      // Gửi tin nhắn cho client thông qua truyền tải luồng
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Gửi tin nhắn cho người dùng chỉ định
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // Nhận tin nhắn từ client
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Đưa tin nhắn vào hàng đợi tin nhắn
    messageQueue.Store(msg.From, queue)
    count++
  }
}

Client mở hai goroutine, một goroutine dùng để gửi tin nhắn, một goroutine khác dùng để nhận tin nhắn, tất nhiên cũng có thể vừa gửi vừa nhận, code như sau.

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Yêu cầu nhận tin nhắn
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("暂无消息,关闭连接")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "在吗",
      "下午有没有时间一起打游戏",
      "那行吧,以后有时间一起约",
      "就这个周末应该可以吧",
      "那就这么定了",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Tin nhắn gửi xong, chủ động đóng yêu cầu và lấy giá trị trả về
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

Sau khi thực hiện server xuất ra như sau

server  2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:33 5 秒钟内没有收到 jack 的消息,关闭连接

Client xuất ra như sau

client  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client  2023/07/18 16:28:28 发送完毕,总共发送了 5 条消息
client  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client  2023/07/18 16:28:33 暂无消息,关闭连接

Qua ví dụ này có thể thấy xử lý yêu cầu RPC luồng một chiều cho dù là client hay server đều phức tạp hơn unary rpc, tuy nhiên RPC luồng hai chiều còn phức tạp hơn chúng.

Luồng hai chiều

RPC luồng hai chiều, tức là yêu cầu và phản hồi đều là luồng, tương đương với việc kết hợp hai dịch vụ trong ví dụ trên thành một. Đối với RPC luồng而言, yêu cầu đầu tiên chắc chắn do client khởi xướng, sau đó client có thể gửi tham số yêu cầu bất cứ lúc nào thông qua luồng, server cũng có thể trả về dữ liệu bất cứ lúc nào thông qua luồng, bất kể bên nào chủ động đóng luồng, yêu cầu hiện tại đều sẽ kết thúc.

TIP

Nội dung sau này trừ khi cần thiết, sẽ trực tiếp bỏ qua mô tả code tạo code pb và tạo client server rpc

Trước tiên tạo cấu trúc dự án như sau

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

Nội dung message.proto như sau

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

Trong logic server, sau khi thiết lập kết nối, mở hai goroutine, một goroutine chịu trách nhiệm nhận tin nhắn, một chịu trách nhiệm gửi tin nhắn, logic xử lý cụ thể tương tự ví dụ trước, nhưng lần này bỏ qua logic判定 thời gian chờ.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue Hàng đợi tin nhắn mô phỏng
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Dịch vụ chat, logic server chúng ta sử dụng đa goroutine để xử lý
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "end chat")

  var chatErr error
  chatCh := make(chan error)

  // Tạo hai goroutine, một nhận tin nhắn, một gửi tin nhắn
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Goroutine nhận tin nhắn
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Goroutine gửi tin nhắn
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

Trong logic client, mở hai goroutine con để mô phỏng quá trình chat của hai người, trong hai goroutine con lại分别 có hai goroutine cháu chịu trách nhiệm gửi và nhận tin nhắn (logic client không đảm bảo thứ tự gửi nhận tin nhắn chat của hai người là đúng, chỉ là một ví dụ đơn giản về gửi và nhận của hai bên)

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("end chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Gửi xong tin nhắn thì đóng kết nối
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Goroutine nhận tin nhắn
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

Bình thường, server xuất ra

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

Bình thường, client xuất ra (có thể thấy thứ tự logic tin nhắn là lộn xộn)

client 2023/07/19 17:26:24 receive from:"jack"  content:"你好"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"你好"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"没有"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"有没有时间一起打游戏?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"好吧"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"没时间,你找别人吧"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

Qua ví dụ có thể thấy, logic xử lý luồng hai chiều cho dù là client hay server đều phức tạp hơn luồng một chiều, cần kết hợp đa goroutine mở tác vụ bất đồng bộ mới có thể xử lý logic tốt hơn.

metadata

Bản chất metadata là một map, value của nó là một slice chuỗi, giống như header của http1, và vai trò của nó trong gRPC cũng tương tự như http header, cung cấp một số thông tin về lần gọi RPC này, đồng thời vòng đời của metadata đi theo toàn bộ quá trình gọi rpc, gọi kết thúc thì vòng đời của nó cũng kết thúc.

Nó chủ yếu được truyền và lưu trữ thông qua context trong gRPC, nhưng gRPC cung cấp gói metadata, bên trong có khá nhiều hàm tiện lợi để đơn giản hóa thao tác, không cần chúng ta thao tác context thủ công. Kiểu tương ứng của metadata trong gRPC là metadata.MD, như dưới đây.

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

Chúng ta có thể sử dụng hàm metadata.New để tạo, nhưng trước khi tạo có vài điểm cần lưu ý

go
func New(m map[string]string) MD

metadata có hạn chế về tên key, chỉ có thể là các ký tự bị hạn chế bởi các quy tắc sau:

  • Ký tự ASCII
  • Số: 0-9
  • Chữ thường: a-z
  • Chữ hoa: A-Z
  • Ký tự đặc biệt: -_.

TIP

Trong metadata, chữ hoa sẽ được chuyển thành chữ thường, tức là sẽ chiếm cùng một key, value cũng sẽ bị ghi đè.

TIP

Key bắt đầu bằng grpc- là key nội bộ được grpc保留 sử dụng, nếu sử dụng key loại này có thể dẫn đến một số lỗi.

Tạo thủ công

Có rất nhiều cách để tạo metadata, ở đây giới thiệu hai cách phổ biến nhất để tạo metadata thủ công, cách thứ nhất là sử dụng hàm metadata.New, trực tiếp truyền vào một map.

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

Cách thứ hai là metadata.Pairs, truyền vào slice chuỗi có độ dài chẵn, sẽ tự động phân tích thành cặp key-value.

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

Cũng có thể sử dụng metadata.join để hợp nhất nhiều metadata

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)

Sử dụng server

Lấy metadata

Server lấy metadata có thể sử dụng hàm metadata.FromIncomingContext để lấy

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Đối với unary rpc, trong tham số của service sẽ có một tham số context, trực tiếp lấy metadata từ đó

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Đối với rpc luồng, trong tham số của service sẽ có một đối tượng luồng, thông qua nó có thể lấy context của luồng

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

Gửi metadata

Gửi metadata có thể sử dụng hàm grpc.SendHeader

go
func SendHeader(ctx context.Context, md metadata.MD) error

Hàm này nhiều nhất chỉ được gọi một lần, sẽ không có hiệu lực nếu sử dụng sau khi xảy ra một số sự kiện khiến header được gửi tự động. Trong một số trường hợp, nếu không muốn gửi header trực tiếp, có thể sử dụng hàm grpc.SetHeader.

go
func SetHeader(ctx context.Context, md metadata.MD) error

Nếu hàm này được gọi nhiều lần, sẽ hợp nhất metadata được truyền vào mỗi lần và gửi cho client trong các trường hợp sau

  • Khi gprc.SendHeaderServerStream.SendHeader được gọi
  • Khi handler của unary rpc trả về
  • Khi gọi Stream.SendMsg của đối tượng luồng trong rpc luồng
  • Khi trạng thái yêu cầu rpc变为 send out, trường hợp này có thể là yêu cầu rpc thành công, hoặc là xảy ra lỗi.

Đối với rpc luồng, khuyến nghị sử dụng phương thức SendHeaderSetHeader của đối tượng luồng.

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

Trong quá trình sử dụng sẽ thấy hai chức năng Header và Trailer差不多, nhưng sự khác biệt chính của chúng là thời điểm gửi, có thể không cảm nhận được trong unary rpc, nhưng sự khác biệt này đặc biệt rõ ràng trong RPC luồng, vì Header trong RPC luồng có thể gửi Header mà không cần đợi yêu cầu kết thúc.前面 đã đề cập Header sẽ được gửi trong các trường hợp đặc biệt, còn Trailer chỉ được gửi sau khi toàn bộ yêu cầu RPC kết thúc, trước đó, trailer lấy được đều là rỗng.

Sử dụng client

Lấy metadata

Client muốn lấy header phản hồi có thể thực hiện thông qua grpc.Headergrpc.Trailer

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Nhưng cần lưu ý, không thể lấy trực tiếp, có thể thấy giá trị trả về của hai hàm trên là CallOption, tức là được truyền vào làm tham số option khi khởi tạo yêu cầu RPC.

go
// Khai báo md dùng để nhận giá trị
var header, trailer metadata.MD

// Truyền vào option khi gọi yêu cầu rpc
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

Sau khi yêu cầu hoàn tất, sẽ ghi giá trị vào md được truyền vào. Đối với rpc luồng, có thể lấy trực tiếp thông qua đối tượng luồng trả về khi khởi tạo yêu cầu

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

Gửi metadata

Client muốn gửi metadata rất đơn giản, đã đề cập trước đó biểu hiện của metadata là valueContext, kết hợp metadata vào context, sau đó truyền context khi yêu cầu là được, gói metadata cung cấp hai hàm để tiện tạo context.

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// Unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// Rpc luồng
stream,err := client.StreamRPC(outgoingContext)

Nếu ctx ban đầu đã có metadata rồi, lại sử dụng NewOutgoingContext sẽ ghi đè dữ liệu trước đó, để tránh trường hợp này, có thể sử dụng hàm dưới đây, nó không ghi đè mà là hợp nhất dữ liệu.

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// Unary rpc
res,err := client.SomeRPC(appendContext,data)
// Rpc luồng
stream,err := client.StreamRPC(appendContext)

Interceptor

Interceptor của gRPC giống như Middleware trong gin, đều là để làm một số công việc đặc biệt trước hoặc sau yêu cầu mà không ảnh hưởng đến logic business của chính nó. Trong gRPC, interceptor có hai loại lớn, interceptor server và interceptor client, phân loại theo loại yêu cầu thì có interceptor RPC unary và interceptor RPC luồng, hình dưới

Để có thể hiểu rõ hơn về interceptor, dưới đây sẽ mô tả theo một ví dụ rất đơn giản.

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

Nội dung person.proto như sau

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

Code server như sau, logic toàn là nội dung trước, khá đơn giản không nói lại.

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Lưu trữ dữ liệu
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Chặn server

Chặn yêu cầu rpc server có UnaryServerInterceptorStreamServerInterceptor, kiểu cụ thể như sau

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

Unary RPC

Tạo interceptor Unary RPC, chỉ cần triển khai kiểu UnaryServerInterceptor là được, dưới đây là một ví dụ interceptor Unary RPC đơn giản, chức năng là xuất ra mỗi yêu cầu và phản hồi rpc.

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} Dữ liệu yêu cầu rpc
// param info *grpc.UnaryServerInfo Một số thông tin yêu cầu của lần Unary RPC này
// param unaryHandler grpc.UnaryHandler Handler cụ thể
// return resp interface{} Dữ liệu phản hồi rpc
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Đối với Unary RPC而言, interceptor chặn mỗi yêu cầu và phản hồi của RPC, tức là chặn giai đoạn yêu cầu và giai đoạn phản hồi của RPC, nếu interceptor trả về error thì yêu cầu này sẽ kết thúc.

Rpc luồng

Tạo interceptor RPC luồng, chỉ cần triển khai kiểu StreamServerInterceptor là được, dưới đây là một ví dụ interceptor RPC luồng đơn giản.

go
// StreamPersonLogInterceptor
// param srv interface{} Tương ứng với server do server triển khai
// param stream grpc.ServerStream Đối tượng luồng
// param info *grpc.StreamServerInfo Thông tin luồng
// param streamHandler grpc.StreamHandler Bộ xử lý
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Đối với RPC luồng而言, interceptor chặn thời điểm mỗi phương thức SendRecv của đối tượng luồng được gọi, nếu interceptor trả về error, sẽ không dẫn đến kết thúc yêu cầu RPC này, chỉ đại diện cho lần send hoặc recv này xảy ra lỗi.

Sử dụng interceptor

Để interceptor đã tạo có hiệu lực, cần truyền vào làm option khi tạo máy chủ gRPC, chính thức cũng cung cấp các hàm liên quan để sử dụng. Như dưới đây, có hàm thêm interceptor đơn, cũng có hàm thêm interceptor chuỗi.

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

Sử dụng lặp lại UnaryInterceptor sẽ ném ra panic như sau

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor cũng vậy, còn interceptor chuỗi gọi lặp lại sẽ append vào cùng một chuỗi.

Ví dụ sử dụng như sau

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Thêm interceptor chuỗi
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

Chặn client

Interceptor client差不多 với server, một interceptor unary UnaryClientInterceptor, một interceptor luồng StreamClientInterceptor, kiểu cụ thể như sau.

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

Unary RPC

Tạo interceptor client Unary RPC, triển khai UnaryClientInterceptor là được, dưới đây là một ví dụ đơn giản.

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string Tên phương thức
// param req interface{} Dữ liệu yêu cầu
// param reply interface{} Dữ liệu phản hồi
// param cc *grpc.ClientConn Đối tượng kết nối client
// param invoker grpc.UnaryInvoker Phương thức client cụ thể bị chặn
// param opts ...grpc.CallOption Cấu hình của yêu cầu này
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

Thông qua interceptor Unary RPC client, có thể lấy được dữ liệu yêu cầu và dữ liệu phản hồi của yêu cầu local cùng một số thông tin yêu cầu khác.

RPC luồng

Tạo một interceptor client RPC luồng, triển khai StreamClientInterceptor là được, dưới đây là một ví dụ.

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Thông tin mô tả đối tượng luồng
// param cc *grpc.ClientConn Đối tượng kết nối
// param method string Tên phương thức
// param streamer grpc.Streamer Đối tượng dùng để tạo đối tượng luồng
// param opts ...grpc.CallOption Cấu hình kết nối
// return grpc.ClientStream Đối tượng luồng client đã tạo
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Thông qua interceptor client RPC luồng, chỉ có thể chặn được thời điểm client thiết lập kết nối với server tức là thời điểm tạo luồng, không thể chặn được mỗi lần đối tượng luồng client gửi và nhận tin nhắn, nhưng chúng ta đóng gói đối tượng luồng đã tạo trong interceptor là có thể thực hiện chặn gửi và nhận tin nhắn, như dưới đây

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Thông tin mô tả đối tượng luồng
// param cc *grpc.ClientConn Đối tượng kết nối
// param method string Tên phương thức
// param streamer grpc.Streamer Đối tượng dùng để tạo đối tượng luồng
// param opts ...grpc.CallOption Cấu hình kết nối
// return grpc.ClientStream Đối tượng luồng client đã tạo
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // Trước khi gửi tin nhắn
  err := c.ClientStream.SendMsg(m)
  // Sau khi gửi tin nhắn
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // Trước khi nhận tin nhắn
  err := c.ClientStream.RecvMsg(m)
  // Sau khi nhận tin nhắn
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

Sử dụng interceptor

Khi sử dụng, tương tự như server cũng là bốn hàm công cụ thông qua option để thêm interceptor, phân thành interceptor đơn và interceptor chuỗi.

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

Client sử dụng lặp lại WithUnaryInterceptor sẽ không ném ra panic, nhưng chỉ có cái cuối cùng có hiệu lực.

Dưới đây là một trường hợp sử dụng

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

Đến đây, toàn bộ trường hợp đã được viết xong, đã đến lúc chạy thử xem kết quả như thế nào. Server xuất ra như sau

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

Client xuất ra như sau

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

Có thể thấy đầu ra của hai bên đều phù hợp với mong đợi, đạt được hiệu quả chặn, trường hợp này chỉ là một ví dụ rất đơn giản, sử dụng interceptor của gRPC có thể làm rất nhiều việc như ủy quyền, nhật ký, giám sát và các chức năng khác, có thể tự tạo wheel, cũng có thể sử dụng wheel có sẵn của cộng đồng mã nguồn mở, gRPC Ecosystem chuyên thu thập một loạt interceptor middleware của gRPC mã nguồn mở, địa chỉ: grpc-ecosystem/go-grpc-middleware.

Xử lý lỗi

Trước khi bắt đầu hãy xem một ví dụ, trong trường hợp interceptor trước đó, nếu người dùng không truy vấn được, sẽ trả về lỗi person not found cho client, vậy vấn đề là client có thể xử lý đặc biệt dựa trên lỗi trả về không? Tiếp theo thử một chút, trong code client, thử sử dụng errors.Is để phán đoán lỗi.

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

Kết quả xuất ra như sau

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

Có thể thấy error mà client nhận được là như thế này, sẽ thấy error mà server trả về nằm trong trường desc

rpc error: code = Unknown desc = person not found

Tất nhiên logic đoạn errors.Is này cũng không được thực thi, cho dù đổi thành errors.As cũng là kết quả như vậy.

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Vì vậy, gRPC cung cấp gói status để giải quyết loại vấn đề này, đây cũng là lý do tại sao error mà client nhận được có trường code và desc, bởi vì gRPC thực tế trả về cho client là một Status, kiểu cụ thể như sau, có thể thấy cũng là một message được định nghĩa bởi protobuf.

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // The status code, which should be an enum value of
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // A developer-facing error message, which should be in English. Any
  // user-facing error message should be localized and sent in the
  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
  // by the client.
  string message = 2;

  // A list of messages that carry the error details.  There is a common set of
  // message types for APIs to use.
  repeated google.protobuf.Any details = 3;
}

Mã lỗi

Code trong cấu trúc Status là một dạng tương tự Http Status, dùng để biểu thị trạng thái của yêu cầu rpc hiện tại, gRPC định nghĩa 16 code nằm trong grpc/codes, bao quát hầu hết các trường hợp, lần lượt như sau

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // Gọi thành công
  OK Code = 0

  // Yêu cầu bị hủy
  Canceled Code = 1

  // Lỗi không xác định
  Unknown Code = 2

  // Tham số không hợp lệ
  InvalidArgument Code = 3

    // Yêu cầu hết thời gian chờ
  DeadlineExceeded Code = 4

  // Tài nguyên không tồn tại
  NotFound Code = 5

    // Đã tồn tại tài nguyên giống nhau (không nghĩ ra tại sao có cái này)
  AlreadyExists Code = 6

  // Quyền không đủ bị từ chối truy cập
  PermissionDenied Code = 7

  // Tài nguyên cạn kiệt, dung lượng còn lại không đủ để sử dụng, ví dụ như dung lượng đĩa không đủ
  ResourceExhausted Code = 8

  // Điều kiện thực thi không đủ, ví dụ như sử dụng rm để xóa một thư mục không rỗng, điều kiện xóa là thư mục phải rỗng nhưng điều kiện không thỏa mãn
  FailedPrecondition Code = 9

  // Yêu cầu bị gián đoạn
  Aborted Code = 10

  // Hoạt động truy cập vượt quá phạm vi giới hạn
  OutOfRange Code = 11

  // Biểu thị dịch vụ hiện tại chưa được triển khai
  Unimplemented Code = 12

  // Lỗi nội bộ hệ thống
  Internal Code = 13

  // Dịch vụ không khả dụng
  Unavailable Code = 14

  // Mất dữ liệu
  DataLoss Code = 15

  // Không vượt qua xác thực
  Unauthenticated Code = 16

  _maxCode = 17
)

Gói grpc/status cung cấp khá nhiều hàm để chuyển đổi qua lại giữa status và error. Chúng ta có thể trực tiếp sử dụng status.New để tạo một Status, hoặc Newf

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Ví dụ code dưới đây

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

Thông qua phương thức err của status có thể lấy được error trong đó, khi trạng thái là ok thì error là nil.

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

Cũng có thể trực tiếp tạo error

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

Vì vậy chúng ta có thể sửa code server như sau

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

Trước đó, tất cả code mà server trả về đều là unknown, bây giờ sau khi sửa đổi có ngữ nghĩa rõ ràng hơn. Vì vậy trong client có thể thông qua status.FromError hoặc sử dụng các hàm dưới đây để lấy status từ error, từ đó đưa ra xử lý tương ứng dựa trên code khác nhau

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Ví dụ như sau

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Tuy nhiên mặc dù code của grpc đã cố gắng bao quát một số trường hợp chung, nhưng đôi khi vẫn không thể đáp ứng nhu cầu của nhà phát triển, lúc này có thể sử dụng trường Details trong Status, và nó còn là một slice, có thể chứa nhiều thông tin. Thông qua Status.WithDetails để truyền vào một số thông tin tùy chỉnh

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Thông qua Status.Details để lấy thông tin

go
func (s *Status) Details() []interface{}

Cần lưu ý, thông tin truyền vào tốt nhất là được định nghĩa bởi protobuf, như vậy mới tiện cho cả server và client đều có thể phân tích, chính thức đưa ra một vài ví dụ

protobuf
message ErrorInfo {
  // Nguyên nhân lỗi
  string reason = 1;

  // Chủ thể định nghĩa dịch vụ
  string domain = 2;

  // Thông tin khác
  map<string, string> metadata = 3;
}

// Thông tin thử lại
message RetryInfo {
  // Thời gian chờ giữa các lần của cùng một yêu cầu
  google.protobuf.Duration retry_delay = 1;
}

// Thông tin gỡ lỗi
message DebugInfo {
  // Stack
  repeated string stack_entries = 1;

  // Một số thông tin chi tiết
  string detail = 2;
}

    ...
    ...

Thêm ví dụ có thể đến googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) để xem. Nếu cần có thể引入 thông qua code dưới đây.

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

Sử dụng ErrorInfo làm details

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

Trong client có thể lấy dữ liệu để xử lý, tuy nhiên trên đây chỉ là một vài ví dụ mà gRPC khuyến nghị sử dụng, ngoài ra cũng có thể tự định nghĩa message để đáp ứng tốt hơn nhu cầu business tương ứng, nếu muốn làm một số xử lý lỗi thống nhất, cũng có thể放到 trong interceptor để thao tác.

Kiểm soát thời gian chờ

Trong hầu hết trường hợp, thường sẽ không chỉ có một dịch vụ, và có thể có rất nhiều dịch vụ ở upstream và cũng có rất nhiều dịch vụ ở downstream. Client khởi tạo một yêu cầu, từ dịch vụ upstream nhất đến dịch vụ downstream nhất, tạo thành một chuỗi gọi dịch vụ, giống như trong hình, có thể còn dài hơn trong hình.

Một chuỗi gọi dài như vậy, nếu logic xử lý của một trong các dịch vụ cần花费 thời gian dài, sẽ dẫn đến upstream luôn ở trạng thái chờ đợi. Để giảm lãng phí tài nguyên không cần thiết, do đó cần thiết引入 cơ chế timeout, như vậy thời gian timeout được truyền vào khi gọi ở upstream nhất便是 thời gian thực thi tối đa cho phép của toàn bộ chuỗi gọi. Và gRPC có thể truyền timeout qua tiến trình và ngôn ngữ, nó đặt một số dữ liệu cần truyền qua tiến trình vào khung HEADERS Frame của HTTP2, như hình dưới

Dữ liệu timeout trong yêu cầu gRPC tương ứng với trường grpc-timeout trong HEADERS Frame. Cần lưu ý, không phải tất cả các thư viện gRPC đều triển khai cơ chế truyền timeout này, nhưng gRPC-go chắc chắn hỗ trợ, nếu sử dụng thư viện ngôn ngữ khác và sử dụng tính năng này, thì cần lưu ý điểm này.

Timeout kết nối

Khi client gRPC thiết lập kết nối với server, mặc định là bất đồng bộ, nếu thiết lập kết nối thất bại sẽ chỉ trả về một Client rỗng. Nếu muốn kết nối diễn ra đồng bộ, có thể sử dụng grpc.WithBlock() để chặn chờ đợi khi kết nối chưa thiết lập thành công.

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Nếu muốn kiểm soát một thời gian timeout, chỉ cần truyền vào một TimeoutContext, sử dụng grpc.DialContext để thay thế gprc.Dial để truyền context.

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Như vậy, nếu kết nối thiết lập timeout, sẽ trả về error

context deadline exceeded

Ở server cũng có thể thiết lập timeout kết nối, thiết lập một thời gian timeout khi thiết lập kết nối mới với client, mặc định là 120 giây, nếu không thiết lập kết nối thành công trong thời gian quy định, server sẽ chủ động ngắt kết nối.

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout vẫn đang trong giai đoạn thử nghiệm, API trong tương lai có thể bị sửa đổi hoặc xóa.

Timeout yêu cầu

Khi client gRPC khởi tạo yêu cầu, tham số đầu tiên là kiểu Context, tương tự, nếu muốn thêm một thời gian timeout cho yêu cầu RPC, chỉ cần truyền vào một TimeoutContext là được

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Xử lý logic timeout
}

Thông qua xử lý của gRPC, thời gian timeout được truyền đến server, trong quá trình truyền nó tồn tại dưới dạng trường trong khung, trong go nó tồn tại dưới dạng context, cứ như vậy truyền đi trong toàn bộ liên kết. Trong quá trình truyền liên kết, không khuyến nghị sửa đổi thời gian timeout, cụ thể nên thiết lập thời gian timeout bao lâu khi yêu cầu, đây nên là vấn đề mà upstream nhất cần cân nhắc.

Xác thực ủy quyền

Trong lĩnh vực microservices, mỗi dịch vụ đều cần xác thực danh tính người dùng và quyền cho yêu cầu, nếu giống như ứng dụng monolithic, mỗi dịch vụ đều phải tự triển khai một bộ logic xác thực, điều này rõ ràng là không thực tế. Vì vậy cần một dịch vụ xác thực và ủy quyền thống nhất, và các giải pháp phổ biến là sử dụng OAuth2, Distributed Session, và JWT, trong đó OAuth2 được sử dụng rộng rãi nhất,一度 đã trở thành tiêu chuẩn ngành, loại token phổ biến nhất của OAuth2 là JWT. Dưới đây là sơ đồ luồng chế độ mã ủy quyền OAuth2, quy trình cơ bản như hình.

Truyền tải an toàn

Đăng ký và khám phá dịch vụ

Trước khi client gọi dịch vụ chỉ định của server, cần biết ip và port của server, trong các trường hợp trước đó địa chỉ server đều được viết cứng. Trong môi trường mạng thực tế không phải lúc nào cũng ổn định như vậy, một số dịch vụ có thể offline do lỗi và không thể truy cập, cũng có thể do phát triển business tiến hành di chuyển máy dẫn đến địa chỉ thay đổi, trong những trường hợp này không thể sử dụng địa chỉ tĩnh để truy cập dịch vụ, và những vấn đề động này là những gì đăng ký và khám phá dịch vụ cần giải quyết, khám phá dịch vụ chịu trách nhiệm giám sát thay đổi địa chỉ dịch vụ và cập nhật, đăng ký dịch vụ chịu trách nhiệm告诉外界 địa chỉ của mình. Trong gRPC, cung cấp chức năng khám phá dịch vụ cơ bản, và hỗ trợ mở rộng và tùy chỉnh.

Không thể sử dụng địa chỉ tĩnh, có thể sử dụng một số tên cụ thể để thay thế, ví dụ như trình duyệt thông qua phân giải DNS để lấy địa chỉ từ tên miền, tương tự, khám phá dịch vụ mặc định của gRPC là thông qua DNS để thực hiện, sửa tệp host local, thêm ánh xạ sau

127.0.0.1 example.grpc.com

Sau đó sửa địa chỉ Dial trong ví dụ helloworld thành tên miền tương ứng

go
func main() {
  // Thiết lập kết nối, không có xác minh mã hóa
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Tạo client
  client := hello2.NewSayHelloClient(conn)
  // Gọi từ xa
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Cũng có thể thấy đầu ra bình thường

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

Trong gRPC, loại tên này phải tuân thủ cú pháp URI được định nghĩa trong RFC 3986, định dạng là

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

URI trong ví dụ trên là dạng sau, vì mặc định hỗ trợ dns nên bỏ qua scheme ở prefix.

dns:example.grpc.com:8080

Ngoài ra gRPC còn mặc định hỗ trợ Unix domain sockets, còn đối với các cách khác, cần chúng ta tùy chỉnh thực hiện theo mở rộng của gRPC, vì vậy cần triển khai một bộ phân tích cú pháp tùy chỉnh resolver.Resolver, resolver chịu trách nhiệm giám sát cập nhật địa chỉ đích và cấu hình dịch vụ.

go
type Resolver interface {
    // gRPC sẽ gọi ResolveNow để thử phân giải lại tên đích. Đây chỉ là một gợi ý, nếu không cần resolver có thể bỏ qua nó, phương thức này có thể được gọi đồng thời
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC yêu cầu chúng ta truyền vào một bộ tạo Resolver, tức là resolver.Builder, nó chịu trách nhiệm tạo phiên bản Resolver.

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Phương thức Scheme của Builder trả về loại Scheme mà nó chịu trách nhiệm phân tích, ví dụ như dnsBuilder mặc định nó trả về là dns, bộ tạo khi khởi tạo nên sử dụng resolver.Register đăng ký vào Builder toàn cục, hoặc làm options, sử dụng grpc.WithResolver truyền vào ClientConn nội bộ, ưu tiên của cái sau cao hơn cái trước.

Hình trên mô tả đơn giản quy trình làm việc của resolver, tiếp theo sẽ demo cách tùy chỉnh resolver

Phân tích dịch vụ tùy chỉnh

Dưới đây viết một bộ phân tích cú pháp tùy chỉnh, cần một bộ tạo phân tích cú pháp tùy chỉnh để tạo.

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Ở đây bắt buộc phải updatestate, nếu không sẽ deadlock
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Cấu hình, loadBalancingPolicy chỉ chiến lược cân bằng tải
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

Bộ phân tích cú pháp tùy chỉnh là truyền địa chỉ khớp trong map vào updatestate, đồng thời chỉ định chiến lược cân bằng tải, round_robin có nghĩa là luân phiên.

go
// Cấu trúc service config như sau
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

Code client như sau

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // Đăng ký builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Thiết lập kết nối, không có xác minh mã hóa
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Tạo client
  client := hello2.NewSayHelloClient(conn)
     // Gọi một lần mỗi giây
  for range time.Tick(time.Second) {
    // Gọi từ xa
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

Bình thường, quy trình nên là server đăng ký dịch vụ của chính mình với trung tâm đăng ký, sau đó client lấy danh sách dịch vụ từ trung tâm đăng ký rồi khớp, ở đây map truyền vào là một trung tâm đăng ký mô phỏng, dữ liệu là tĩnh nên bỏ qua phần đăng ký dịch vụ, chỉ còn lại khám phá dịch vụ. Target mà client sử dụng là hello:myworld, hello là scheme tùy chỉnh, myworld là tên dịch vụ, sau khi được bộ phân tích cú pháp tùy chỉnh phân tích, sẽ được địa chỉ thực tế 127.0.0.1:8080, trong trường hợp thực tế, để làm cân bằng tải, một tên dịch vụ có thể khớp nhiều địa chỉ thực tế, vì vậy đây là lý do tại sao tên dịch vụ tương ứng là một slice, ở đây mở hai server, chiếm các cổng khác nhau, chiến lược cân bằng tải là luân phiên, đầu ra server lần lượt như sau, thông qua thời gian yêu cầu có thể thấy chiến lược cân bằng tải thực sự đang hoạt động, nếu không chỉ định chiến lược thì mặc định chỉ chọn dịch vụ đầu tiên.

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

Trung tâm đăng ký thực chất là lưu trữ tập hợp ánh xạ giữa tên dịch vụ đăng ký và địa chỉ dịch vụ thực tế, chỉ cần là middleware lưu trữ dữ liệu đều có thể đáp ứng điều kiện, thậm chí lấy mysql làm trung tâm đăng ký cũng không phải không được (chắc không ai làm vậy). Nói chung trung tâm đăng ký đều là lưu trữ KV, redis là một lựa chọn rất tốt, nhưng nếu sử dụng redis làm trung tâm đăng ký thì chúng ta cần tự triển khai nhiều logic, như kiểm tra nhịp tim dịch vụ, dịch vụ offline, điều phối dịch vụ等等, vẫn khá phức tạp, mặc dù redis có một số ứng dụng trong khía cạnh này nhưng ít. Đúng như câu nói việc chuyên môn giao cho người chuyên môn làm, có nhiều bên làm nổi tiếng trong khía cạnh này: Zookeeper, Consul, Eureka, ETCD, Nacos等等.

Có thể đến 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) để tìm hiểu một số khác biệt của các middleware này.

Kết hợp consul

Trường hợp kết hợp sử dụng consul có thể đến consul

Cân bằng tải

Golang by www.golangdev.cn edit