gRPC

Gọi thủ tục từ xa rpc có lẽ là điểm bắt buộc phải học trong microservices, trong quá trình học sẽ gặp đủ loại khung rpc, nhưng trong lĩnh vực go, hầu hết các khung rpc đều dựa trên gRPC, và nó còn trở thành một giao thức cơ bản trong lĩnh vực cloud native, tại sao chọn nó, câu trả lời chính thức như sau:
gRPC là một khung gọi thủ tục từ xa (Remote Process Call, RPC) mã nguồn mở hiệu năng cao hiện đại, có thể chạy trong bất kỳ môi trường nào. Nó có thể kết nối hiệu quả các dịch vụ trong và giữa các trung tâm dữ liệu với sự hỗ trợ của cân bằng tải có thể cắm, theo dõi, kiểm tra sức khỏe và xác thực. Nó cũng phù hợp cho điện toán phân tán last-mile để kết nối các thiết bị, ứng dụng di động và trình duyệt với các dịch vụ backend.
Trang web chính thức: gRPC
Tài liệu chính thức: Documentation | gRPC
Hướng dẫn kỹ thuật gRPC: Basics tutorial | Go | gRPC
Trang web ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)
Nó cũng là một dự án mã nguồn mở thuộc quỹ CNCF, CNCF viết tắt của CLOUD NATIVE COMPUTING FOUNDATION

Đặc điểm
Định nghĩa dịch vụ đơn giản
Sử dụng Protocol Buffers để định nghĩa dịch vụ, đây là một bộ công cụ và ngôn ngữ tuần tự hóa nhị phân mạnh mẽ.
Khởi động và mở rộng quy mô rất nhanh chóng
Chỉ cần một dòng code để cài đặt runtime và môi trường phát triển, chỉ cần vài giây để mở rộng đến hàng triệu RPC mỗi giây
Đa ngôn ngữ, đa nền tảng
Tự động tạo stub dịch vụ client và server cho các ngôn ngữ và nền tảng khác nhau
Luồng hai chiều và ủy quyền tích hợp
Luồng hai chiều dựa trên HTTP/2 và ủy quyền xác thực có thể cắm
Mặc dù GRPC không phụ thuộc vào ngôn ngữ, nhưng nội dung của trang web này phần lớn liên quan đến go, vì vậy bài viết này cũng sẽ sử dụng go làm ngôn ngữ chính để giải thích, các trình biên dịch pb và trình tạo được sử dụng sau này nếu là người dùng ngôn ngữ khác có thể tự tìm trên trang web Protobuf. Để thuận tiện, sẽ bỏ qua quá trình tạo dự án.
TIP
Bài viết này tham khảo nội dung từ các bài viết sau:
写给 go 开发者的 gRPC 教程-protobuf 基础 - 掘金 (juejin.cn)
gRPC 中的 Metadata - 熊喵君的博客 | PANDAYCHEN
Cài đặt phụ thuộc
Trước tiên tải xuống trình biên dịch Protocol Buffer, địa chỉ tải xuống: Releases · protocolbuffers/protobuf (github.com)

Chọn hệ thống và phiên bản phù hợp với tình hình của bạn, sau khi tải xuống cần thêm thư mục bin vào biến môi trường.
Sau đó còn cần tải xuống trình tạo code, trình biên dịch dùng để tạo code tuần tự hóa ngôn ngữ tương ứng từ tệp proto, trình tạo dùng để tạo code business.
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latestTạo một dự án trống, tên ở đây lấy là grpc_learn, sau đó thêm các phụ thuộc sau
$ go get google.golang.org/grpcCuối cùng xem phiên bản, xem có thực sự cài đặt thành công không
$ protoc --version
libprotoc 23.4
$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1
$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0Hello World
Cấu trúc dự án
Dưới đây sẽ demo với một ví dụ Hello World, tạo cấu trúc dự án như sau.
grpc_learn\helloworld
|
+---client
| main.go
|
+---hello
|
|
+---pb
| hello.proto
|
\---server
main.goĐịnh nghĩa tệp protobuf
Trong đó, trong pb/hello.proto, nhập nội dung sau, đây là một ví dụ khá đơn giản, nếu không biết cú pháp protoc, vui lòng xem tài liệu liên quan.
syntax = "proto3";
// . biểu thị sinh ra trực tiếp trong đường dẫn đầu ra, hello là tên gói
option go_package = ".;hello";
// Yêu cầu
message HelloReq {
string name = 1;
}
// Phản hồi
message HelloRep {
string msg = 1;
}
// Định nghĩa dịch vụ
service SayHello {
rpc Hello(HelloReq) returns (HelloRep) {}
}Tạo code
Sau khi viết xong, sử dụng trình biên dịch protoc để tạo code liên quan đến tuần tự hóa dữ liệu, sử dụng trình tạo để tạo code rpc
$ protoc -I ./pb \
--go_out=./hello ./pb/*.proto\
--go-grpc_out=./hello ./pb/*.protoLúc này có thể thấy thư mục hello đã sinh ra tệp hello.pb.go và hello_grpc.pb.go, xem qua hello.pb.go có thể thấy message mà chúng ta đã định nghĩa
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Trường đã định nghĩa
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
type HelloRep struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Trường đã định nghĩa
Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}Trong hello_grpc.pb.go có thể thấy dịch vụ mà chúng ta đã định nghĩa
type SayHelloServer interface {
Hello(context.Context, *HelloReq) (*HelloRep, error)
mustEmbedUnimplementedSayHelloServer()
}
// Sau này nếu chúng ta tự triển khai giao diện dịch vụ, phải nhúng cấu trúc này, không cần triển khai phương thức mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}
// Mặc định trả về nil
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}
// Ràng buộc giao diện
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}
type UnsafeSayHelloServer interface {
mustEmbedUnimplementedSayHelloServer()
}Viết server
Viết code sau trong server/main.go
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_learn/server/protoc"
"log"
"net"
)
type GrpcServer struct {
pb.UnimplementedSayHelloServer
}
func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
log.Printf("received grpc req: %+v", req.String())
return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}
func main() {
// Lắng nghe cổng
listen, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
// Tạo máy chủ gprc
server := grpc.NewServer()
// Đăng ký dịch vụ
pb.RegisterSayHelloServer(server, &GrpcServer{})
// Chạy
err = server.Serve(listen)
if err != nil {
panic(err)
}
}Viết client
Nhập code sau trong client/main.go
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc_learn/server/protoc"
"log"
)
func main() {
// Thiết lập kết nối, không có xác minh mã hóa
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer conn.Close()
// Tạo client
client := pb.NewSayHelloClient(conn)
// Gọi từ xa
helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}Chạy
Trước tiên chạy server, sau đó chạy client, server xuất ra như sau
2023/07/16 16:26:51 received grpc req: name:"client"Client xuất ra như sau
2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"Trong ví dụ này, sau khi client thiết lập kết nối, khi gọi phương thức từ xa cũng giống như gọi phương thức local, trực tiếp truy cập phương thức Hello của client và lấy kết quả, đây là một ví dụ GRPC đơn giản nhất, nhiều khung mã nguồn mở cũng đều đóng gói quy trình này.
bufbuild
Trong ví dụ trên, sử dụng lệnh để tạo code trực tiếp, nếu sau này có nhiều plugin thì lệnh sẽ rất phức tạp, lúc này có thể sử dụng công cụ để quản lý tệp protobuf, có một công cụ quản lý mã nguồn mở bufbuild/buf.
Địa chỉ mã nguồn mở: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)
Địa chỉ tài liệu: Buf - Install the Buf CLI
Đặc điểm
- Quản lý BSR
- Linter
- Tạo code
- Định dạng
- Quản lý phụ thuộc
Với công cụ này có thể quản lý tệp protobuf rất thuận tiện.
Tài liệu cung cấp rất nhiều cách cài đặt, có thể tự chọn. Nếu local đã cài đặt môi trường go, có thể sử dụng go install để cài đặt
$ go install github.com/bufbuild/buf/cmd/buf@latestSau khi cài đặt xong xem phiên bản
$ buf --version
1.24.0Đến thư mục helloworld/pb, thực hiện lệnh sau để tạo một module quản lý tệp pb.
$ buf mod init
$ ls
buf.yaml hello.protoNội dung tệp buf.yaml mặc định như sau
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULTSau đó đến thư mục helloworld/, tạo buf.gen.yaml, nhập nội dung sau
version: v1
plugins:
- plugin: go
out: hello
opt:
- plugin: go-grpc
out: hello
opt:Sau đó thực hiện lệnh để tạo code
$ buf generateSau khi hoàn thành có thể thấy các tệp đã tạo, tất nhiên buf không chỉ có chức năng này, các chức năng khác có thể tự học trong tài liệu.
RPC luồng
Cách gọi grpc có hai loại lớn, Unary RPC và Stream RPC. Ví dụ trong Hello World là một Unary RPC điển hình.

Unary rpc (hoặc gọi là ordinary rpc dễ hiểu hơn, thực sự không biết dịch unary này thế nào) sử dụng giống như http thông thường, client yêu cầu, server trả về dữ liệu, cách hỏi đáp. Còn yêu cầu và phản hồi của Stream RPC đều có thể là luồng, như hình dưới

Khi sử dụng yêu cầu luồng, chỉ trả về một lần phản hồi, client có thể gửi tham số nhiều lần cho server thông qua luồng, server không cần đợi tất cả tham số đều nhận xong mới xử lý như Unary RPC, logic xử lý cụ thể có thể do server quyết định. Bình thường, chỉ client có thể chủ động đóng yêu cầu luồng, một khi luồng bị đóng, yêu cầu RPC hiện tại cũng sẽ kết thúc.
Khi sử dụng phản hồi luồng, chỉ gửi một lần tham số, server có thể gửi dữ liệu nhiều lần cho client thông qua luồng, client không cần đợi nhận xong tất cả dữ liệu mới xử lý như Unary RPC, logic xử lý cụ thể có thể do client tự quyết định. Trong yêu cầu bình thường, chỉ server có thể chủ động đóng phản hồi luồng, một khi luồng bị đóng, yêu cầu RPC hiện tại cũng sẽ kết thúc.
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}Cũng có thể chỉ phản hồi là luồng (Server-Streaming RPC)
service MessageService {
rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}Hoặc yêu cầu và phản hồi đều là luồng (Bi-directional-Streaming RPC)
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}Luồng một chiều
Dưới đây qua một ví dụ để demo thao tác luồng một chiều, trước tiên tạo cấu trúc dự án như sau
grpc_learn\server_client_stream
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.goNội dung message.proto như sau
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service MessageService {
rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}Tạo code thông qua buf
$ buf generateỞ đây demo là dịch vụ tin nhắn, receiveMessage nhận một tên người dùng chỉ định, kiểu là chuỗi, trả về luồng tin nhắn, sendMessage nhận luồng tin nhắn, trả về số lượng tin nhắn đã gửi thành công, kiểu là số nguyên 64 bit. Tiếp theo tạo server/message_service.go, tự triển khai dịch vụ được tạo mặc định
package main
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
)
type MessageService struct {
message.UnimplementedMessageServiceServer
}
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}Có thể thấy trong tham số của nhận tin nhắn và gửi tin nhắn đều có một giao diện đóng gói luồng
type MessageService_ReceiveMessageServer interface {
// Gửi tin nhắn
Send(*Message) error
grpc.ServerStream
}
type MessageService_SendMessageServer interface {
// Gửi giá trị trả về và đóng kết nối
SendAndClose(*wrapperspb.StringValue) error
// Nhận tin nhắn
Recv() (*Message, error)
grpc.ServerStream
}Chúng đều nhúng giao diện gprc.ServerStream
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}Có thể thấy, RPC luồng không giống như Unary RPC那样 tham số đầu vào và giá trị trả về đều có thể thể hiện rõ ràng trên chữ ký hàm, những phương thức này nhìn thoáng qua không thể看出 tham số đầu vào và giá trị trả về là kiểu gì, cần gọi Stream được truyền vào để hoàn thành truyền tải luồng, tiếp theo bắt đầu viết logic cụ thể của server. Khi viết logic server, sử dụng một sync.map để mô phỏng hàng đợi tin nhắn, khi client gửi yêu cầu ReceiveMessage, server liên tục trả về tin nhắn mà client muốn thông qua phản hồi luồng, cho đến khi hết thời gian chờ thì ngắt yêu cầu. Khi client yêu cầu SendMessage, liên tục gửi tin nhắn thông qua yêu cầu luồng, server liên tục đưa tin nhắn vào hàng đợi, cho đến khi client chủ động ngắt yêu cầu, và trả về cho client số lượng tin nhắn đã gửi.
package main
import (
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"sync"
"time"
)
// Một hàng đợi tin nhắn mô phỏng
var messageQueue sync.Map
type MessageService struct {
message.UnimplementedMessageServiceServer
}
// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Nhận tin nhắn của người dùng chỉ định
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
timer := time.NewTimer(time.Second * 5)
for {
time.Sleep(time.Millisecond * 100)
select {
case <-timer.C:
log.Printf("5 秒钟内没有收到%s的消息,关闭连接", user.GetValue())
return nil
default:
value, ok := messageQueue.Load(user.GetValue())
if !ok {
messageQueue.Store(user.GetValue(), []*message.Message{})
continue
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue
}
// Lấy tin nhắn
msg := queue[0]
// Gửi tin nhắn cho client thông qua truyền tải luồng
err := recvServer.Send(msg)
log.Printf("receive %+v\n", msg)
if err != nil {
return err
}
queue = queue[1:]
messageQueue.Store(user.GetValue(), queue)
timer.Reset(time.Second * 5)
}
}
}
// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Gửi tin nhắn cho người dùng chỉ định
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
count := 0
for {
// Nhận tin nhắn từ client
msg, err := sendServer.Recv()
if errors.Is(err, io.EOF) {
return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
}
if err != nil {
return err
}
log.Printf("send %+v\n", msg)
value, ok := messageQueue.Load(msg.From)
if !ok {
messageQueue.Store(msg.From, []*message.Message{msg})
continue
}
queue := value.([]*message.Message)
queue = append(queue, msg)
// Đưa tin nhắn vào hàng đợi tin nhắn
messageQueue.Store(msg.From, queue)
count++
}
}Client mở hai goroutine, một goroutine dùng để gửi tin nhắn, một goroutine khác dùng để nhận tin nhắn, tất nhiên cũng có thể vừa gửi vừa nhận, code như sau.
package main
import (
"context"
"errors"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"time"
)
var Client message.MessageServiceClient
func main() {
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panicln(err)
}
defer dial.Close()
Client = message.NewMessageServiceClient(dial)
log.SetPrefix("client\t")
msgTask := task.NewTask(func(err error) {
log.Panicln(err)
})
ctx := context.Background()
// Yêu cầu nhận tin nhắn
msgTask.AddJobs(func() {
receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
if err != nil {
log.Panicln(err)
}
for {
recv, err := receiveMessageStream.Recv()
if errors.Is(err, io.EOF) {
log.Println("暂无消息,关闭连接")
break
} else if err != nil {
break
}
log.Printf("receive %+v", recv)
}
})
msgTask.AddJobs(func() {
from := "jack"
to := "mike"
sendMessageStream, err := Client.SendMessage(ctx)
if err != nil {
log.Panicln(err)
}
msgs := []string{
"在吗",
"下午有没有时间一起打游戏",
"那行吧,以后有时间一起约",
"就这个周末应该可以吧",
"那就这么定了",
}
for _, msg := range msgs {
time.Sleep(time.Second)
sendMessageStream.Send(&message.Message{
From: from,
Content: msg,
To: to,
})
}
// Tin nhắn gửi xong, chủ động đóng yêu cầu và lấy giá trị trả về
recv, err := sendMessageStream.CloseAndRecv()
if err != nil {
log.Println(err)
} else {
log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
}
})
msgTask.Run()
}Sau khi thực hiện server xuất ra như sau
server 2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server 2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server 2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server 2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server 2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server 2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server 2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server 2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server 2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server 2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server 2023/07/18 16:28:33 5 秒钟内没有收到 jack 的消息,关闭连接Client xuất ra như sau
client 2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client 2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client 2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client 2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client 2023/07/18 16:28:28 发送完毕,总共发送了 5 条消息
client 2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client 2023/07/18 16:28:33 暂无消息,关闭连接Qua ví dụ này có thể thấy xử lý yêu cầu RPC luồng một chiều cho dù là client hay server đều phức tạp hơn unary rpc, tuy nhiên RPC luồng hai chiều còn phức tạp hơn chúng.
Luồng hai chiều
RPC luồng hai chiều, tức là yêu cầu và phản hồi đều là luồng, tương đương với việc kết hợp hai dịch vụ trong ví dụ trên thành một. Đối với RPC luồng而言, yêu cầu đầu tiên chắc chắn do client khởi xướng, sau đó client có thể gửi tham số yêu cầu bất cứ lúc nào thông qua luồng, server cũng có thể trả về dữ liệu bất cứ lúc nào thông qua luồng, bất kể bên nào chủ động đóng luồng, yêu cầu hiện tại đều sẽ kết thúc.
TIP
Nội dung sau này trừ khi cần thiết, sẽ trực tiếp bỏ qua mô tả code tạo code pb và tạo client server rpc
Trước tiên tạo cấu trúc dự án như sau
bi_stream\
| buf.gen.yaml
|
+---client
| main.go
|
+---message
| message.pb.go
| message_grpc.pb.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.go
message_service.goNội dung message.proto như sau
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service ChatService {
rpc chat(stream Message) returns (stream Message);
}Trong logic server, sau khi thiết lập kết nối, mở hai goroutine, một goroutine chịu trách nhiệm nhận tin nhắn, một chịu trách nhiệm gửi tin nhắn, logic xử lý cụ thể tương tự ví dụ trước, nhưng lần này bỏ qua logic判定 thời gian chờ.
package main
import (
"github.com/dstgo/task"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"sync"
"time"
)
// MessageQueue Hàng đợi tin nhắn mô phỏng
var MessageQueue sync.Map
type ChatService struct {
message.UnimplementedChatServiceServer
}
// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Dịch vụ chat, logic server chúng ta sử dụng đa goroutine để xử lý
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, _ := metadata.FromIncomingContext(chatServer.Context())
from := md.Get("from")[0]
defer log.Println(from, "end chat")
var chatErr error
chatCh := make(chan error)
// Tạo hai goroutine, một nhận tin nhắn, một gửi tin nhắn
chatTask := task.NewTask(func(err error) {
chatErr = err
})
// Goroutine nhận tin nhắn
chatTask.AddJobs(func() {
for {
msg, err := chatServer.Recv()
log.Printf("receive %+v err %+v\n", msg, err)
if err != nil {
chatErr = err
chatCh <- err
break
}
value, ok := MessageQueue.Load(msg.To)
if !ok {
MessageQueue.Store(msg.To, []*message.Message{msg})
} else {
queue := value.([]*message.Message)
queue = append(queue, msg)
MessageQueue.Store(msg.To, queue)
}
}
})
// Goroutine gửi tin nhắn
chatTask.AddJobs(func() {
Send:
for {
time.Sleep(time.Millisecond * 100)
select {
case <-chatCh:
log.Println(from, "close send")
break Send
default:
value, ok := MessageQueue.Load(from)
if !ok {
value = []*message.Message{}
MessageQueue.Store(from, value)
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue Send
}
msg := queue[0]
queue = queue[1:]
MessageQueue.Store(from, queue)
err := chatServer.Send(msg)
log.Printf("send %+v\n", msg)
if err != nil {
chatErr = err
break Send
}
}
}
})
chatTask.Run()
return chatErr
}Trong logic client, mở hai goroutine con để mô phỏng quá trình chat của hai người, trong hai goroutine con lại分别 có hai goroutine cháu chịu trách nhiệm gửi và nhận tin nhắn (logic client không đảm bảo thứ tự gửi nhận tin nhắn chat của hai người là đúng, chỉ là một ví dụ đơn giản về gửi và nhận của hai bên)
package main
import (
"context"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"time"
)
var Client message.ChatServiceClient
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer dial.Close()
if err != nil {
log.Panicln(err)
}
Client = message.NewChatServiceClient(dial)
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
})
chatTask.AddJobs(func() {
NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
})
chatTask.Run()
}
func NewChat(from string, to string, contents ...string) {
ctx := context.Background()
mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
chat, err := Client.Chat(mdCtx)
defer log.Println("end chat", from)
if err != nil {
log.Panicln(err)
}
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
for _, content := range contents {
time.Sleep(time.Second)
chat.Send(&message.Message{
From: from,
Content: content,
To: to,
})
}
// Gửi xong tin nhắn thì đóng kết nối
time.Sleep(time.Second * 5)
chat.CloseSend()
})
// Goroutine nhận tin nhắn
chatTask.AddJobs(func() {
for {
msg, err := chat.Recv()
log.Printf("receive %+v\n", msg)
if err != nil {
log.Println(err)
break
}
}
})
chatTask.Run()
}Bình thường, server xuất ra
server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chatBình thường, client xuất ra (có thể thấy thứ tự logic tin nhắn là lộn xộn)
client 2023/07/19 17:26:24 receive from:"jack" content:"你好" to:"mike"
client 2023/07/19 17:26:24 receive from:"mike" content:"你好" to:"jack"
client 2023/07/19 17:26:25 receive from:"mike" content:"没有" to:"jack"
client 2023/07/19 17:26:25 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike"
client 2023/07/19 17:26:26 receive from:"jack" content:"好吧" to:"mike"
client 2023/07/19 17:26:26 receive from:"mike" content:"没时间,你找别人吧" to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mikeQua ví dụ có thể thấy, logic xử lý luồng hai chiều cho dù là client hay server đều phức tạp hơn luồng một chiều, cần kết hợp đa goroutine mở tác vụ bất đồng bộ mới có thể xử lý logic tốt hơn.
metadata
Bản chất metadata là một map, value của nó là một slice chuỗi, giống như header của http1, và vai trò của nó trong gRPC cũng tương tự như http header, cung cấp một số thông tin về lần gọi RPC này, đồng thời vòng đời của metadata đi theo toàn bộ quá trình gọi rpc, gọi kết thúc thì vòng đời của nó cũng kết thúc.
Nó chủ yếu được truyền và lưu trữ thông qua context trong gRPC, nhưng gRPC cung cấp gói metadata, bên trong có khá nhiều hàm tiện lợi để đơn giản hóa thao tác, không cần chúng ta thao tác context thủ công. Kiểu tương ứng của metadata trong gRPC là metadata.MD, như dưới đây.
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]stringChúng ta có thể sử dụng hàm metadata.New để tạo, nhưng trước khi tạo có vài điểm cần lưu ý
func New(m map[string]string) MDmetadata có hạn chế về tên key, chỉ có thể là các ký tự bị hạn chế bởi các quy tắc sau:
- Ký tự ASCII
- Số: 0-9
- Chữ thường: a-z
- Chữ hoa: A-Z
- Ký tự đặc biệt: -_.
TIP
Trong metadata, chữ hoa sẽ được chuyển thành chữ thường, tức là sẽ chiếm cùng một key, value cũng sẽ bị ghi đè.
TIP
Key bắt đầu bằng grpc- là key nội bộ được grpc保留 sử dụng, nếu sử dụng key loại này có thể dẫn đến một số lỗi.
Tạo thủ công
Có rất nhiều cách để tạo metadata, ở đây giới thiệu hai cách phổ biến nhất để tạo metadata thủ công, cách thứ nhất là sử dụng hàm metadata.New, trực tiếp truyền vào một map.
func New(m map[string]string) MDmd := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})Cách thứ hai là metadata.Pairs, truyền vào slice chuỗi có độ dài chẵn, sẽ tự động phân tích thành cặp key-value.
func Pairs(kv ...string) MDmd := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")Cũng có thể sử dụng metadata.join để hợp nhất nhiều metadata
func Join(mds ...MD) MDmd1 := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)Sử dụng server
Lấy metadata
Server lấy metadata có thể sử dụng hàm metadata.FromIncomingContext để lấy
func FromIncomingContext(ctx context.Context) (MD, bool)Đối với unary rpc, trong tham số của service sẽ có một tham số context, trực tiếp lấy metadata từ đó
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
md, b := metadata.FromIncomingContext(ctx)
...
}Đối với rpc luồng, trong tham số của service sẽ có một đối tượng luồng, thông qua nó có thể lấy context của luồng
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, b := metadata.FromIncomingContext(chatServer.Context())
...
}Gửi metadata
Gửi metadata có thể sử dụng hàm grpc.SendHeader
func SendHeader(ctx context.Context, md metadata.MD) errorHàm này nhiều nhất chỉ được gọi một lần, sẽ không có hiệu lực nếu sử dụng sau khi xảy ra một số sự kiện khiến header được gửi tự động. Trong một số trường hợp, nếu không muốn gửi header trực tiếp, có thể sử dụng hàm grpc.SetHeader.
func SetHeader(ctx context.Context, md metadata.MD) errorNếu hàm này được gọi nhiều lần, sẽ hợp nhất metadata được truyền vào mỗi lần và gửi cho client trong các trường hợp sau
- Khi
gprc.SendHeadervàServerStream.SendHeaderđược gọi - Khi handler của unary rpc trả về
- Khi gọi
Stream.SendMsgcủa đối tượng luồng trong rpc luồng - Khi trạng thái yêu cầu rpc变为
send out, trường hợp này có thể là yêu cầu rpc thành công, hoặc là xảy ra lỗi.
Đối với rpc luồng, khuyến nghị sử dụng phương thức SendHeader và SetHeader của đối tượng luồng.
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
...
}TIP
Trong quá trình sử dụng sẽ thấy hai chức năng Header và Trailer差不多, nhưng sự khác biệt chính của chúng là thời điểm gửi, có thể không cảm nhận được trong unary rpc, nhưng sự khác biệt này đặc biệt rõ ràng trong RPC luồng, vì Header trong RPC luồng có thể gửi Header mà không cần đợi yêu cầu kết thúc.前面 đã đề cập Header sẽ được gửi trong các trường hợp đặc biệt, còn Trailer chỉ được gửi sau khi toàn bộ yêu cầu RPC kết thúc, trước đó, trailer lấy được đều là rỗng.
Sử dụng client
Lấy metadata
Client muốn lấy header phản hồi có thể thực hiện thông qua grpc.Header và grpc.Trailer
func Header(md *metadata.MD) CallOptionfunc Trailer(md *metadata.MD) CallOptionNhưng cần lưu ý, không thể lấy trực tiếp, có thể thấy giá trị trả về của hai hàm trên là CallOption, tức là được truyền vào làm tham số option khi khởi tạo yêu cầu RPC.
// Khai báo md dùng để nhận giá trị
var header, trailer metadata.MD
// Truyền vào option khi gọi yêu cầu rpc
res, err := client.SomeRPC(
ctx,
data,
grpc.Header(&header),
grpc.Trailer(&trailer)
)Sau khi yêu cầu hoàn tất, sẽ ghi giá trị vào md được truyền vào. Đối với rpc luồng, có thể lấy trực tiếp thông qua đối tượng luồng trả về khi khởi tạo yêu cầu
type ClientStream interface {
Header() (metadata.MD, error)
Trailer() metadata.MD
...
}stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()Gửi metadata
Client muốn gửi metadata rất đơn giản, đã đề cập trước đó biểu hiện của metadata là valueContext, kết hợp metadata vào context, sau đó truyền context khi yêu cầu là được, gói metadata cung cấp hai hàm để tiện tạo context.
func NewOutgoingContext(ctx context.Context, md MD) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
// Unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// Rpc luồng
stream,err := client.StreamRPC(outgoingContext)Nếu ctx ban đầu đã có metadata rồi, lại sử dụng NewOutgoingContext sẽ ghi đè dữ liệu trước đó, để tránh trường hợp này, có thể sử dụng hàm dưới đây, nó không ghi đè mà là hợp nhất dữ liệu.
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")
// Unary rpc
res,err := client.SomeRPC(appendContext,data)
// Rpc luồng
stream,err := client.StreamRPC(appendContext)Interceptor
Interceptor của gRPC giống như Middleware trong gin, đều là để làm một số công việc đặc biệt trước hoặc sau yêu cầu mà không ảnh hưởng đến logic business của chính nó. Trong gRPC, interceptor có hai loại lớn, interceptor server và interceptor client, phân loại theo loại yêu cầu thì có interceptor RPC unary và interceptor RPC luồng, hình dưới

Để có thể hiểu rõ hơn về interceptor, dưới đây sẽ mô tả theo một ví dụ rất đơn giản.
grpc_learn\interceptor
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| person.proto
|
+---person
| person.pb.go
| person_grpc.pb.go
|
\---server
main.goNội dung person.proto như sau
syntax = "proto3";
option go_package = ".;person";
import "google/protobuf/wrappers.proto";
message personInfo {
string name = 1;
int64 age = 2;
string address = 3;
}
service person {
rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}Code server như sau, logic toàn là nội dung trước, khá đơn giản không nói lại.
package main
import (
"context"
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"io"
"sync"
)
// Lưu trữ dữ liệu
var personData sync.Map
type PersonService struct {
person.UnimplementedPersonServer
}
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, person.PersonNotFoundErr
}
personInfo := value.(*person.PersonInfo)
return personInfo, nil
}
func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
count := 0
for {
personInfo, err := personStream.Recv()
if errors.Is(err, io.EOF) {
return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
} else if err != nil {
return err
}
personData.Store(personInfo.Name, personInfo)
count++
}
}Chặn server
Chặn yêu cầu rpc server có UnaryServerInterceptor và StreamServerInterceptor, kiểu cụ thể như sau
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) errorUnary RPC
Tạo interceptor Unary RPC, chỉ cần triển khai kiểu UnaryServerInterceptor là được, dưới đây là một ví dụ interceptor Unary RPC đơn giản, chức năng là xuất ra mỗi yêu cầu và phản hồi rpc.
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} Dữ liệu yêu cầu rpc
// param info *grpc.UnaryServerInfo Một số thông tin yêu cầu của lần Unary RPC này
// param unaryHandler grpc.UnaryHandler Handler cụ thể
// return resp interface{} Dữ liệu phản hồi rpc
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
resp, err = unaryHandler(ctx, req)
log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
return resp, err
}Đối với Unary RPC而言, interceptor chặn mỗi yêu cầu và phản hồi của RPC, tức là chặn giai đoạn yêu cầu và giai đoạn phản hồi của RPC, nếu interceptor trả về error thì yêu cầu này sẽ kết thúc.
Rpc luồng
Tạo interceptor RPC luồng, chỉ cần triển khai kiểu StreamServerInterceptor là được, dưới đây là một ví dụ interceptor RPC luồng đơn giản.
// StreamPersonLogInterceptor
// param srv interface{} Tương ứng với server do server triển khai
// param stream grpc.ServerStream Đối tượng luồng
// param info *grpc.StreamServerInfo Thông tin luồng
// param streamHandler grpc.StreamHandler Bộ xử lý
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
err := streamHandler(srv, stream)
log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
return err
}Đối với RPC luồng而言, interceptor chặn thời điểm mỗi phương thức Send và Recv của đối tượng luồng được gọi, nếu interceptor trả về error, sẽ không dẫn đến kết thúc yêu cầu RPC này, chỉ đại diện cho lần send hoặc recv này xảy ra lỗi.
Sử dụng interceptor
Để interceptor đã tạo có hiệu lực, cần truyền vào làm option khi tạo máy chủ gRPC, chính thức cũng cung cấp các hàm liên quan để sử dụng. Như dưới đây, có hàm thêm interceptor đơn, cũng có hàm thêm interceptor chuỗi.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
func StreamInterceptor(i StreamServerInterceptor) ServerOption
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptionTIP
Sử dụng lặp lại UnaryInterceptor sẽ ném ra panic như sau
panic: The unary server interceptor was already set and may not be reset.StreamInterceptor cũng vậy, còn interceptor chuỗi gọi lặp lại sẽ append vào cùng một chuỗi.
Ví dụ sử dụng như sau
package main
import (
"google.golang.org/grpc"
"grpc_learn/interceptor/person"
"log"
"net"
)
func main() {
log.SetPrefix("server ")
listen, err := net.Listen("tcp", "9090")
if err != nil {
log.Panicln(err)
}
server := grpc.NewServer(
// Thêm interceptor chuỗi
grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
)
person.RegisterPersonServer(server, &PersonService{})
server.Serve(listen)
}Chặn client
Interceptor client差不多 với server, một interceptor unary UnaryClientInterceptor, một interceptor luồng StreamClientInterceptor, kiểu cụ thể như sau.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)Unary RPC
Tạo interceptor client Unary RPC, triển khai UnaryClientInterceptor là được, dưới đây là một ví dụ đơn giản.
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string Tên phương thức
// param req interface{} Dữ liệu yêu cầu
// param reply interface{} Dữ liệu phản hồi
// param cc *grpc.ClientConn Đối tượng kết nối client
// param invoker grpc.UnaryInvoker Phương thức client cụ thể bị chặn
// param opts ...grpc.CallOption Cấu hình của yêu cầu này
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
err := invoker(ctx, method, req, reply, cc, opts...)
log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
return err
}Thông qua interceptor Unary RPC client, có thể lấy được dữ liệu yêu cầu và dữ liệu phản hồi của yêu cầu local cùng một số thông tin yêu cầu khác.
RPC luồng
Tạo một interceptor client RPC luồng, triển khai StreamClientInterceptor là được, dưới đây là một ví dụ.
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Thông tin mô tả đối tượng luồng
// param cc *grpc.ClientConn Đối tượng kết nối
// param method string Tên phương thức
// param streamer grpc.Streamer Đối tượng dùng để tạo đối tượng luồng
// param opts ...grpc.CallOption Cấu hình kết nối
// return grpc.ClientStream Đối tượng luồng client đã tạo
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return stream, err
}Thông qua interceptor client RPC luồng, chỉ có thể chặn được thời điểm client thiết lập kết nối với server tức là thời điểm tạo luồng, không thể chặn được mỗi lần đối tượng luồng client gửi và nhận tin nhắn, nhưng chúng ta đóng gói đối tượng luồng đã tạo trong interceptor là có thể thực hiện chặn gửi và nhận tin nhắn, như dưới đây
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Thông tin mô tả đối tượng luồng
// param cc *grpc.ClientConn Đối tượng kết nối
// param method string Tên phương thức
// param streamer grpc.Streamer Đối tượng dùng để tạo đối tượng luồng
// param opts ...grpc.CallOption Cấu hình kết nối
// return grpc.ClientStream Đối tượng luồng client đã tạo
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}
type ClientStreamInterceptorWrapper struct {
method string
desc *grpc.StreamDesc
grpc.ClientStream
}
func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
// Trước khi gửi tin nhắn
err := c.ClientStream.SendMsg(m)
// Sau khi gửi tin nhắn
log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
return err
}
func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
// Trước khi nhận tin nhắn
err := c.ClientStream.RecvMsg(m)
// Sau khi nhận tin nhắn
log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
return err
}Sử dụng interceptor
Khi sử dụng, tương tự như server cũng là bốn hàm công cụ thông qua option để thêm interceptor, phân thành interceptor đơn và interceptor chuỗi.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
func WithStreamInterceptor(f StreamClientInterceptor) DialOption
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOptionTIP
Client sử dụng lặp lại WithUnaryInterceptor sẽ không ném ra panic, nhưng chỉ có cái cuối cùng có hiệu lực.
Dưới đây là một trường hợp sử dụng
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"log"
)
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}Đến đây, toàn bộ trường hợp đã được viết xong, đã đến lúc chạy thử xem kết quả như thế nào. Server xuất ra như sau
server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not foundClient xuất ra như sau
C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not foundCó thể thấy đầu ra của hai bên đều phù hợp với mong đợi, đạt được hiệu quả chặn, trường hợp này chỉ là một ví dụ rất đơn giản, sử dụng interceptor của gRPC có thể làm rất nhiều việc như ủy quyền, nhật ký, giám sát và các chức năng khác, có thể tự tạo wheel, cũng có thể sử dụng wheel có sẵn của cộng đồng mã nguồn mở, gRPC Ecosystem chuyên thu thập một loạt interceptor middleware của gRPC mã nguồn mở, địa chỉ: grpc-ecosystem/go-grpc-middleware.
Xử lý lỗi
Trước khi bắt đầu hãy xem một ví dụ, trong trường hợp interceptor trước đó, nếu người dùng không truy vấn được, sẽ trả về lỗi person not found cho client, vậy vấn đề là client có thể xử lý đặc biệt dựa trên lỗi trả về không? Tiếp theo thử một chút, trong code client, thử sử dụng errors.Is để phán đoán lỗi.
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
log.Println(info, err)
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}
}Kết quả xuất ra như sau
client 2023/07/21 16:46:10 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not foundCó thể thấy error mà client nhận được là như thế này, sẽ thấy error mà server trả về nằm trong trường desc
rpc error: code = Unknown desc = person not foundTất nhiên logic đoạn errors.Is này cũng không được thực thi, cho dù đổi thành errors.As cũng là kết quả như vậy.
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}Vì vậy, gRPC cung cấp gói status để giải quyết loại vấn đề này, đây cũng là lý do tại sao error mà client nhận được có trường code và desc, bởi vì gRPC thực tế trả về cho client là một Status, kiểu cụ thể như sau, có thể thấy cũng là một message được định nghĩa bởi protobuf.
type Status struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}message Status {
// The status code, which should be an enum value of
// [google.rpc.Code][google.rpc.Code].
int32 code = 1;
// A developer-facing error message, which should be in English. Any
// user-facing error message should be localized and sent in the
// [google.rpc.Status.details][google.rpc.Status.details] field, or localized
// by the client.
string message = 2;
// A list of messages that carry the error details. There is a common set of
// message types for APIs to use.
repeated google.protobuf.Any details = 3;
}Mã lỗi
Code trong cấu trúc Status là một dạng tương tự Http Status, dùng để biểu thị trạng thái của yêu cầu rpc hiện tại, gRPC định nghĩa 16 code nằm trong grpc/codes, bao quát hầu hết các trường hợp, lần lượt như sau
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32
const (
// Gọi thành công
OK Code = 0
// Yêu cầu bị hủy
Canceled Code = 1
// Lỗi không xác định
Unknown Code = 2
// Tham số không hợp lệ
InvalidArgument Code = 3
// Yêu cầu hết thời gian chờ
DeadlineExceeded Code = 4
// Tài nguyên không tồn tại
NotFound Code = 5
// Đã tồn tại tài nguyên giống nhau (không nghĩ ra tại sao có cái này)
AlreadyExists Code = 6
// Quyền không đủ bị từ chối truy cập
PermissionDenied Code = 7
// Tài nguyên cạn kiệt, dung lượng còn lại không đủ để sử dụng, ví dụ như dung lượng đĩa không đủ
ResourceExhausted Code = 8
// Điều kiện thực thi không đủ, ví dụ như sử dụng rm để xóa một thư mục không rỗng, điều kiện xóa là thư mục phải rỗng nhưng điều kiện không thỏa mãn
FailedPrecondition Code = 9
// Yêu cầu bị gián đoạn
Aborted Code = 10
// Hoạt động truy cập vượt quá phạm vi giới hạn
OutOfRange Code = 11
// Biểu thị dịch vụ hiện tại chưa được triển khai
Unimplemented Code = 12
// Lỗi nội bộ hệ thống
Internal Code = 13
// Dịch vụ không khả dụng
Unavailable Code = 14
// Mất dữ liệu
DataLoss Code = 15
// Không vượt qua xác thực
Unauthenticated Code = 16
_maxCode = 17
)Gói grpc/status cung cấp khá nhiều hàm để chuyển đổi qua lại giữa status và error. Chúng ta có thể trực tiếp sử dụng status.New để tạo một Status, hoặc Newf
func New(c codes.Code, msg string) *Status
func Newf(c codes.Code, format string, a ...interface{}) *StatusVí dụ code dưới đây
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)Thông qua phương thức err của status có thể lấy được error trong đó, khi trạng thái là ok thì error là nil.
func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return &Error{s: s}
}Cũng có thể trực tiếp tạo error
func Err(c codes.Code, msg string) error
func Errorf(c codes.Code, format string, a ...interface{}) errorsuccess := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)Vì vậy chúng ta có thể sửa code server như sau
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
}
personInfo := value.(*person.PersonInfo)
return personInfo, status.Errorf(codes.OK, "request success")
}Trước đó, tất cả code mà server trả về đều là unknown, bây giờ sau khi sửa đổi có ngữ nghĩa rõ ràng hơn. Vì vậy trong client có thể thông qua status.FromError hoặc sử dụng các hàm dưới đây để lấy status từ error, từ đó đưa ra xử lý tương ứng dựa trên code khác nhau
func FromError(err error) (s *Status, ok bool)
func Convert(err error) *Status
func Code(err error) codes.CodeVí dụ như sau
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
...
}Tuy nhiên mặc dù code của grpc đã cố gắng bao quát một số trường hợp chung, nhưng đôi khi vẫn không thể đáp ứng nhu cầu của nhà phát triển, lúc này có thể sử dụng trường Details trong Status, và nó còn là một slice, có thể chứa nhiều thông tin. Thông qua Status.WithDetails để truyền vào một số thông tin tùy chỉnh
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)Thông qua Status.Details để lấy thông tin
func (s *Status) Details() []interface{}Cần lưu ý, thông tin truyền vào tốt nhất là được định nghĩa bởi protobuf, như vậy mới tiện cho cả server và client đều có thể phân tích, chính thức đưa ra một vài ví dụ
message ErrorInfo {
// Nguyên nhân lỗi
string reason = 1;
// Chủ thể định nghĩa dịch vụ
string domain = 2;
// Thông tin khác
map<string, string> metadata = 3;
}
// Thông tin thử lại
message RetryInfo {
// Thời gian chờ giữa các lần của cùng một yêu cầu
google.protobuf.Duration retry_delay = 1;
}
// Thông tin gỡ lỗi
message DebugInfo {
// Stack
repeated string stack_entries = 1;
// Một số thông tin chi tiết
string detail = 2;
}
...
...Thêm ví dụ có thể đến googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) để xem. Nếu cần có thể引入 thông qua code dưới đây.
import "google.golang.org/genproto/googleapis/rpc/errdetails"Sử dụng ErrorInfo làm details
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
notFound.WithDetails(&errdetails.ErrorInfo{
Reason: "person not found",
Domain: "xxx",
Metadata: nil,
})Trong client có thể lấy dữ liệu để xử lý, tuy nhiên trên đây chỉ là một vài ví dụ mà gRPC khuyến nghị sử dụng, ngoài ra cũng có thể tự định nghĩa message để đáp ứng tốt hơn nhu cầu business tương ứng, nếu muốn làm một số xử lý lỗi thống nhất, cũng có thể放到 trong interceptor để thao tác.
Kiểm soát thời gian chờ

Trong hầu hết trường hợp, thường sẽ không chỉ có một dịch vụ, và có thể có rất nhiều dịch vụ ở upstream và cũng có rất nhiều dịch vụ ở downstream. Client khởi tạo một yêu cầu, từ dịch vụ upstream nhất đến dịch vụ downstream nhất, tạo thành một chuỗi gọi dịch vụ, giống như trong hình, có thể còn dài hơn trong hình.
Một chuỗi gọi dài như vậy, nếu logic xử lý của một trong các dịch vụ cần花费 thời gian dài, sẽ dẫn đến upstream luôn ở trạng thái chờ đợi. Để giảm lãng phí tài nguyên không cần thiết, do đó cần thiết引入 cơ chế timeout, như vậy thời gian timeout được truyền vào khi gọi ở upstream nhất便是 thời gian thực thi tối đa cho phép của toàn bộ chuỗi gọi. Và gRPC có thể truyền timeout qua tiến trình và ngôn ngữ, nó đặt một số dữ liệu cần truyền qua tiến trình vào khung HEADERS Frame của HTTP2, như hình dưới

Dữ liệu timeout trong yêu cầu gRPC tương ứng với trường grpc-timeout trong HEADERS Frame. Cần lưu ý, không phải tất cả các thư viện gRPC đều triển khai cơ chế truyền timeout này, nhưng gRPC-go chắc chắn hỗ trợ, nếu sử dụng thư viện ngôn ngữ khác và sử dụng tính năng này, thì cần lưu ý điểm này.
Timeout kết nối
Khi client gRPC thiết lập kết nối với server, mặc định là bất đồng bộ, nếu thiết lập kết nối thất bại sẽ chỉ trả về một Client rỗng. Nếu muốn kết nối diễn ra đồng bộ, có thể sử dụng grpc.WithBlock() để chặn chờ đợi khi kết nối chưa thiết lập thành công.
dial, err := grpc.Dial("localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)Nếu muốn kiểm soát một thời gian timeout, chỉ cần truyền vào một TimeoutContext, sử dụng grpc.DialContext để thay thế gprc.Dial để truyền context.
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)Như vậy, nếu kết nối thiết lập timeout, sẽ trả về error
context deadline exceededỞ server cũng có thể thiết lập timeout kết nối, thiết lập một thời gian timeout khi thiết lập kết nối mới với client, mặc định là 120 giây, nếu không thiết lập kết nối thành công trong thời gian quy định, server sẽ chủ động ngắt kết nối.
server := grpc.NewServer(
grpc.ConnectionTimeout(time.Second*3),
)TIP
grpc.ConnectionTimeout vẫn đang trong giai đoạn thử nghiệm, API trong tương lai có thể bị sửa đổi hoặc xóa.
Timeout yêu cầu
Khi client gRPC khởi tạo yêu cầu, tham số đầu tiên là kiểu Context, tương tự, nếu muốn thêm một thời gian timeout cho yêu cầu RPC, chỉ cần truyền vào một TimeoutContext là được
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
// Xử lý logic timeout
}Thông qua xử lý của gRPC, thời gian timeout được truyền đến server, trong quá trình truyền nó tồn tại dưới dạng trường trong khung, trong go nó tồn tại dưới dạng context, cứ như vậy truyền đi trong toàn bộ liên kết. Trong quá trình truyền liên kết, không khuyến nghị sửa đổi thời gian timeout, cụ thể nên thiết lập thời gian timeout bao lâu khi yêu cầu, đây nên là vấn đề mà upstream nhất cần cân nhắc.
Xác thực ủy quyền
Trong lĩnh vực microservices, mỗi dịch vụ đều cần xác thực danh tính người dùng và quyền cho yêu cầu, nếu giống như ứng dụng monolithic, mỗi dịch vụ đều phải tự triển khai một bộ logic xác thực, điều này rõ ràng là không thực tế. Vì vậy cần một dịch vụ xác thực và ủy quyền thống nhất, và các giải pháp phổ biến là sử dụng OAuth2, Distributed Session, và JWT, trong đó OAuth2 được sử dụng rộng rãi nhất,一度 đã trở thành tiêu chuẩn ngành, loại token phổ biến nhất của OAuth2 là JWT. Dưới đây là sơ đồ luồng chế độ mã ủy quyền OAuth2, quy trình cơ bản như hình.

Truyền tải an toàn
Đăng ký và khám phá dịch vụ
Trước khi client gọi dịch vụ chỉ định của server, cần biết ip và port của server, trong các trường hợp trước đó địa chỉ server đều được viết cứng. Trong môi trường mạng thực tế không phải lúc nào cũng ổn định như vậy, một số dịch vụ có thể offline do lỗi và không thể truy cập, cũng có thể do phát triển business tiến hành di chuyển máy dẫn đến địa chỉ thay đổi, trong những trường hợp này không thể sử dụng địa chỉ tĩnh để truy cập dịch vụ, và những vấn đề động này là những gì đăng ký và khám phá dịch vụ cần giải quyết, khám phá dịch vụ chịu trách nhiệm giám sát thay đổi địa chỉ dịch vụ và cập nhật, đăng ký dịch vụ chịu trách nhiệm告诉外界 địa chỉ của mình. Trong gRPC, cung cấp chức năng khám phá dịch vụ cơ bản, và hỗ trợ mở rộng và tùy chỉnh.
Không thể sử dụng địa chỉ tĩnh, có thể sử dụng một số tên cụ thể để thay thế, ví dụ như trình duyệt thông qua phân giải DNS để lấy địa chỉ từ tên miền, tương tự, khám phá dịch vụ mặc định của gRPC là thông qua DNS để thực hiện, sửa tệp host local, thêm ánh xạ sau
127.0.0.1 example.grpc.comSau đó sửa địa chỉ Dial trong ví dụ helloworld thành tên miền tương ứng
func main() {
// Thiết lập kết nối, không có xác minh mã hóa
conn, err := grpc.Dial("example.grpc.com:8080",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// Tạo client
client := hello2.NewSayHelloClient(conn)
// Gọi từ xa
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}Cũng có thể thấy đầu ra bình thường
2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"Trong gRPC, loại tên này phải tuân thủ cú pháp URI được định nghĩa trong RFC 3986, định dạng là
hierarchical part
┌───────────────────┴─────────────────────┐
authority path
┌───────────────┴───────────────┐┌───┴────┐
abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
└┬┘ └───────┬───────┘ └────┬────┘ └┬┘ └─────────┬─────────┘ └──┬──┘
scheme user information host port query fragmentURI trong ví dụ trên là dạng sau, vì mặc định hỗ trợ dns nên bỏ qua scheme ở prefix.
dns:example.grpc.com:8080Ngoài ra gRPC còn mặc định hỗ trợ Unix domain sockets, còn đối với các cách khác, cần chúng ta tùy chỉnh thực hiện theo mở rộng của gRPC, vì vậy cần triển khai một bộ phân tích cú pháp tùy chỉnh resolver.Resolver, resolver chịu trách nhiệm giám sát cập nhật địa chỉ đích và cấu hình dịch vụ.
type Resolver interface {
// gRPC sẽ gọi ResolveNow để thử phân giải lại tên đích. Đây chỉ là một gợi ý, nếu không cần resolver có thể bỏ qua nó, phương thức này có thể được gọi đồng thời
ResolveNow(ResolveNowOptions)
Close()
}gRPC yêu cầu chúng ta truyền vào một bộ tạo Resolver, tức là resolver.Builder, nó chịu trách nhiệm tạo phiên bản Resolver.
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}Phương thức Scheme của Builder trả về loại Scheme mà nó chịu trách nhiệm phân tích, ví dụ như dnsBuilder mặc định nó trả về là dns, bộ tạo khi khởi tạo nên sử dụng resolver.Register đăng ký vào Builder toàn cục, hoặc làm options, sử dụng grpc.WithResolver truyền vào ClientConn nội bộ, ưu tiên của cái sau cao hơn cái trước.

Hình trên mô tả đơn giản quy trình làm việc của resolver, tiếp theo sẽ demo cách tùy chỉnh resolver
Phân tích dịch vụ tùy chỉnh
Dưới đây viết một bộ phân tích cú pháp tùy chỉnh, cần một bộ tạo phân tích cú pháp tùy chỉnh để tạo.
package myresolver
import (
"fmt"
"google.golang.org/grpc/resolver"
)
func NewBuilder(ads map[string][]string) *MyBuilder {
return &MyBuilder{ads: ads}
}
type MyBuilder struct {
ads map[string][]string
}
func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.URL.Scheme != c.Scheme() {
return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
}
m := &MyResolver{ads: c.ads, t: target, cc: cc}
// Ở đây bắt buộc phải updatestate, nếu không sẽ deadlock
m.start()
return m, nil
}
func (c *MyBuilder) Scheme() string {
return "hello"
}
type MyResolver struct {
t resolver.Target
cc resolver.ClientConn
ads map[string][]string
}
func (m *MyResolver) start() {
addres := make([]resolver.Address, 0)
for _, ad := range m.ads[m.t.URL.Opaque] {
addres = append(addres, resolver.Address{Addr: ad})
}
err := m.cc.UpdateState(resolver.State{
Addresses: addres,
// Cấu hình, loadBalancingPolicy chỉ chiến lược cân bằng tải
ServiceConfig: m.cc.ParseServiceConfig(
`{"loadBalancingPolicy":"round_robin"}`),
})
if err != nil {
m.cc.ReportError(err)
}
}
func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (m *MyResolver) Close() {}Bộ phân tích cú pháp tùy chỉnh là truyền địa chỉ khớp trong map vào updatestate, đồng thời chỉ định chiến lược cân bằng tải, round_robin có nghĩa là luân phiên.
// Cấu trúc service config như sau
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *internalserviceconfig.BalancerConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}Code client như sau
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"grpc_learn/helloworld/client/myresolver"
hello2 "grpc_learn/helloworld/hello"
"log"
"time"
)
func init() {
// Đăng ký builder
resolver.Register(myresolver.NewBuilder(map[string][]string{
"myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
}))
}
func main() {
// Thiết lập kết nối, không có xác minh mã hóa
conn, err := grpc.Dial("hello:myworld",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// Tạo client
client := hello2.NewSayHelloClient(conn)
// Gọi một lần mỗi giây
for range time.Tick(time.Second) {
// Gọi từ xa
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}
}Bình thường, quy trình nên là server đăng ký dịch vụ của chính mình với trung tâm đăng ký, sau đó client lấy danh sách dịch vụ từ trung tâm đăng ký rồi khớp, ở đây map truyền vào là một trung tâm đăng ký mô phỏng, dữ liệu là tĩnh nên bỏ qua phần đăng ký dịch vụ, chỉ còn lại khám phá dịch vụ. Target mà client sử dụng là hello:myworld, hello là scheme tùy chỉnh, myworld là tên dịch vụ, sau khi được bộ phân tích cú pháp tùy chỉnh phân tích, sẽ được địa chỉ thực tế 127.0.0.1:8080, trong trường hợp thực tế, để làm cân bằng tải, một tên dịch vụ có thể khớp nhiều địa chỉ thực tế, vì vậy đây là lý do tại sao tên dịch vụ tương ứng là một slice, ở đây mở hai server, chiếm các cổng khác nhau, chiến lược cân bằng tải là luân phiên, đầu ra server lần lượt như sau, thông qua thời gian yêu cầu có thể thấy chiến lược cân bằng tải thực sự đang hoạt động, nếu không chỉ định chiến lược thì mặc định chỉ chọn dịch vụ đầu tiên.
// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"
// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"Trung tâm đăng ký thực chất là lưu trữ tập hợp ánh xạ giữa tên dịch vụ đăng ký và địa chỉ dịch vụ thực tế, chỉ cần là middleware lưu trữ dữ liệu đều có thể đáp ứng điều kiện, thậm chí lấy mysql làm trung tâm đăng ký cũng không phải không được (chắc không ai làm vậy). Nói chung trung tâm đăng ký đều là lưu trữ KV, redis là một lựa chọn rất tốt, nhưng nếu sử dụng redis làm trung tâm đăng ký thì chúng ta cần tự triển khai nhiều logic, như kiểm tra nhịp tim dịch vụ, dịch vụ offline, điều phối dịch vụ等等, vẫn khá phức tạp, mặc dù redis có một số ứng dụng trong khía cạnh này nhưng ít. Đúng như câu nói việc chuyên môn giao cho người chuyên môn làm, có nhiều bên làm nổi tiếng trong khía cạnh này: Zookeeper, Consul, Eureka, ETCD, Nacos等等.
Có thể đến 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) để tìm hiểu một số khác biệt của các middleware này.
Kết hợp consul
Trường hợp kết hợp sử dụng consul có thể đến consul
