gRPC

การเรียกใช้กระบวนการระยะไกล rpc น่าจะเป็นจุดที่ต้องเรียนรู้ในไมโครเซอร์วิส ในระหว่างการเรียนรู้จะพบเฟรมเวิร์ก rpc หลากหลายประเภท แต่ในโดเมนของ go เกือบทุกเฟรมเวิร์ก rpc ล้วนอยู่บนพื้นฐานของ gRPC และมันยังกลายเป็นโปรโตคอลพื้นฐานในโดเมนคลาวด์เนทีฟ ทำไมถึงเลือกมัน ทางการตอบดังนี้
gRPC เป็นเฟรมเวิร์กการเรียกใช้กระบวนการระยะไกล (Remote Process Call, RPC) แบบโอเพนซอร์สประสิทธิภาพสูงสมัยใหม่ สามารถรันได้ในทุกสภาพแวดล้อม มันสามารถเชื่อมต่อบริการภายในศูนย์ข้อมูลและระหว่างศูนย์ข้อมูลได้อย่างมีประสิทธิภาพผ่านการรองรับการโหลดบาลานซ์แบบปลั๊กอินได้ การติดตาม การตรวจสอบสุขภาพ และการตรวจสอบสิทธิ์ นอกจากนี้ยังเหมาะสำหรับการคำนวณแบบกระจายในระยะสุดท้ายสำหรับการเชื่อมต่ออุปกรณ์ แอปพลิเคชันมือถือ และเบราว์เซอร์กับบริการแบ็กเอนด์
เว็บไซต์ทางการ: gRPC
เอกสารทางการ: Documentation | gRPC
บทช่วยสอนทางเทคนิค gRPC: Basics tutorial | Go | gRPC
เว็บไซต์ ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)
มันยังเป็นโครงการโอเพนซอร์สภายใต้มูลนิธิ CNCF CNCF ย่อมาจาก CLOUD NATIVE COMPUTING FOUNDATION

คุณสมบัติ
การกำหนดบริการอย่างง่าย
ใช้ Protocol Buffers กำหนดบริการ นี่คือชุดเครื่องมือไบนารีซีเรียลไลเซชันและภาษาที่ทรงพลัง
การเริ่มต้นและการขยายขนาดรวดเร็วมาก
เพียงบรรทัดเดียวก็สามารถติดตั้งรันไทม์และสภาพแวดล้อมการพัฒนา ใช้เวลาเพียงไม่กี่วินาทีก็สามารถขยายเป็นล้าน RPC ต่อวินาที
ข้ามภาษา ข้ามแพลตฟอร์ม
สร้างสตับบริการไคลเอนต์และเซิร์ฟเวอร์โดยอัตโนมัติตามแพลตฟอร์มและภาษาต่างกัน
การไหลสองทางและการอนุญาตแบบบูรณาการ
การไหลสองทางบนพื้นฐานของ HTTP/2 และการตรวจสอบสิทธิ์แบบปลั๊กอินได้
แม้ว่า GRPC จะไม่ขึ้นกับภาษา แต่เนื้อหาส่วนใหญ่ของเว็บไซต์นี้เกี่ยวข้องกับ go ดังนั้นบทความนี้จะใช้ go เป็นภาษาหลักในการอธิบาย ต่อไปนี้ตัวคอมไพเลอร์ pb และตัวสร้างที่ใช้หากเป็นผู้ใช้ภาษาอื่นสามารถค้นหาด้วยตนเองที่เว็บไซต์ Protobuf เพื่อความสะดวก ต่อไปนี้จะละเว้นกระบวนการสร้างโครงการโดยตรง
TIP
บทความนี้อ้างอิงเนื้อหาจากบทความต่อไปนี้
写给 go 开发者的 gRPC 教程-protobuf 基础 - 掘金 (juejin.cn)
gRPC 中的 Metadata - 熊喵君的博客 | PANDAYCHEN
การติดตั้ง dependencies
ดาวน์โหลดตัวคอมไพเลอร์ Protocol Buffer ก่อน ที่อยู่ดาวน์โหลด: Releases · protocolbuffers/protobuf (github.com)

เลือกระบบและเวอร์ชันตามสถานการณ์ของคุณ หลังจากดาวน์โหลดเสร็จต้องเพิ่มไดเรกทอรี bin ลงในตัวแปรสภาพแวดล้อม
จากนั้นต้องดาวน์โหลดตัวสร้างโค้ด ตัวคอมไพเลอร์ใช้สร้างโค้ดซีเรียลไลเซชันของภาษาที่สอดคล้องจากไฟล์ proto ตัวสร้างใช้สร้างโค้ดธุรกิจ
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latestสร้างโครงการเปล่า ชื่อที่นี่คือ grpc_learn แล้วนำเข้า dependencies ดังนี้
$ go get google.golang.org/grpcสุดท้ายดูเวอร์ชันว่าติดตั้งสำเร็จจริงหรือไม่
$ protoc --version
libprotoc 23.4
$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1
$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0Hello World
โครงสร้างโครงการ
ต่อไปนี้จะสาธิตด้วยตัวอย่าง Hello World สร้างโครงสร้างโครงการดังนี้
grpc_learn\helloworld
|
+---client
| main.go
|
+---hello
|
|
+---pb
| hello.proto
|
\---server
main.goกำหนดไฟล์ protobuf
ใน pb/hello.proto ใส่เนื้อหาดังต่อไปนี้ นี่เป็นตัวอย่างที่ง่ายมาก หากไม่เข้าใจไวยากรณ์ protoc โปรดดูเอกสารที่เกี่ยวข้อง
syntax = "proto3";
// . หมายถึงสร้างโดยตรงในพาธเอาต์พุต hello คือชื่อแพ็กเกจ
option go_package = ".;hello";
// คำขอ
message HelloReq {
string name = 1;
// การตอบกลับ
message HelloRep {
string msg = 1;
}
// กำหนดบริการ
service SayHello {
rpc Hello(HelloReq) returns (HelloRep) {}
}สร้างโค้ด
หลังจากเขียนเสร็จ ใช้ตัวคอมไพเลอร์ protoc สร้างโค้ดที่เกี่ยวข้องกับการซีเรียลไลเซชันข้อมูล ใช้ตัวสร้างสร้างโค้ด rpc ที่เกี่ยวข้อง
$ protoc -I ./pb \
--go_out=./hello ./pb/*.proto\
--go-grpc_out=./hello ./pb/*.protoตอนนี้จะพบว่าโฟลเดอร์ hello สร้างไฟล์ hello.pb.go และ hello_grpc.pb.go เมื่อดู hello.pb.go จะพบ message ที่เรากำหนด
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// ฟิลด์ที่กำหนด
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
type HelloRep struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// ฟิลด์ที่กำหนด
Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}ใน hello_grpc.pb.go จะพบบริการที่กำหนด
type SayHelloServer interface {
Hello(context.Context, *HelloReq) (*HelloRep, error)
mustEmbedUnimplementedSayHelloServer()
}
// ต่อมาหากเราใช้งานอินเทอร์เฟซบริการเอง ต้องฝังโครงสร้างนี้ ไม่ต้องใช้งาน method mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}
// คืนค่า nil เป็นค่าเริ่มต้น
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}
// ข้อจำกัดอินเทอร์เฟซ
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}
type UnsafeSayHelloServer interface {
mustEmbedUnimplementedSayHelloServer()
}เขียน服务端
ใน server/main.go เขียนโค้ดดังนี้
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_learn/server/protoc"
"log"
"net"
)
type GrpcServer struct {
pb.UnimplementedSayHelloServer
}
func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
log.Printf("received grpc req: %+v", req.String())
return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}
func main() {
// ฟังพอร์ต
listen, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
// สร้างเซิร์ฟเวอร์ gprc
server := grpc.NewServer()
// ลงทะเบียนบริการ
pb.RegisterSayHelloServer(server, &GrpcServer{})
// รัน
err = server.Serve(listen)
if err != nil {
panic(err)
}
}เขียนไคลเอนต์
ใน client/main.go ใส่โค้ดดังนี้
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc_learn/server/protoc"
"log"
)
func main() {
// สร้างการเชื่อมต่อ ไม่มีการตรวจสอบการเข้ารหัส
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer conn.Close()
// สร้างไคลเอนต์
client := pb.NewSayHelloClient(conn)
// เรียกใช้ระยะไกล
helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}รัน
รันเซิร์ฟเวอร์ก่อน แล้วรันไคลเอนต์ เซิร์ฟเวอร์เอาต์พุตดังนี้
2023/07/16 16:26:51 received grpc req: name:"client"ไคลเอนต์เอาต์พุตดังนี้
2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"ในตัวอย่างนี้ หลังจากไคลเอนต์สร้างการเชื่อมต่อแล้ว เมื่อเรียก method ระยะไกลก็เหมือนเรียก method ท้องถิ่น โดยตรงเข้าถึง method Hello ของ client และรับผลลัพธ์ นี่คือตัวอย่าง GRPC ที่ง่ายที่สุด เฟรมเวิร์กโอเพนซอร์สจำนวนมากก็ทำการ encapsulate กระบวนการนี้
bufbuild
ในตัวอย่างข้างต้น ใช้คำสั่งสร้างโค้ดโดยตรง หากภายหลังมีปลั๊กอินมาก คำสั่งจะดูยุ่งยากมาก这时สามารถใช้เครื่องมือจัดการไฟล์ protobuf ได้ พอดีมีเครื่องมือจัดการโอเพนซอร์ส bufbuild/buf
ที่อยู่โอเพนซอร์ส: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)
ที่อยู่เอกสาร: Buf - Install the Buf CLI
คุณสมบัติ
- การจัดการ BSR
- Linter
- การสร้างโค้ด
- การจัดรูปแบบ
- การจัดการ dependencies
มีเครื่องมือนี้สามารถจัดการไฟล์ protobuf ได้อย่างสะดวก
เอกสารมีวิธีการติดตั้งมากมายให้เลือก หากติดตั้งสภาพแวดล้อม go ในท้องถิ่นแล้ว ใช้ go install ติดตั้งได้เลย
$ go install github.com/bufbuild/buf/cmd/buf@latestหลังติดตั้งเสร็จดูเวอร์ชัน
$ buf --version
1.24.0ไปที่โฟลเดอร์ helloworld/pb รันคำสั่งด้านล่างเพื่อสร้าง module จัดการไฟล์ pb
$ buf mod init
$ ls
buf.yaml hello.protoเนื้อหาไฟล์ buf.yaml โดยค่าเริ่มต้นดังนี้
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULTจากนั้นไปที่ไดเรกทอรี helloworld/ สร้าง buf.gen.yaml ใส่เนื้อหาดังนี้
version: v1
plugins:
- plugin: go
out: hello
opt:
- plugin: go-grpc
out: hello
opt:แล้วรันคำสั่งสร้างโค้ด
$ buf generateหลังจากเสร็จก็จะเห็นไฟล์ที่สร้าง แน่นอน buf มีฟังก์ชันมากกว่านี้ ฟังก์ชันอื่น ๆ สามารถเรียนรู้ด้วยตนเองจากเอกสาร
Stream RPC
วิธีการเรียกใช้ grpc มีสองประเภทใหญ่ ๆ คือ Unary RPC และ Stream RPC ตัวอย่างใน Hello World เป็น Unary RPC แบบทั่วไป

Unary rpc (หรือเรียก普通 rpc ก็เข้าใจได้ง่ายกว่า จริงๆ ไม่รู้จะแปล unary อย่างไร) ใช้งานเหมือน http ทั่วไป ไคลเอนต์ขอ เซิร์ฟเวอร์ตอบกลับข้อมูล แบบถามตอบหนึ่งต่อหนึ่ง ส่วน Stream RPC คำขอและการตอบกลับสามารถเป็นแบบ stream ได้ ดังภาพ

เมื่อใช้ stream request จะตอบกลับเพียงครั้งเดียว ไคลเอนต์สามารถส่งพารามิเตอร์ให้เซิร์ฟเวอร์หลายครั้งผ่าน stream เซิร์ฟเวอร์ไม่จำเป็นต้องรอรับพารามิเตอร์ทั้งหมดก่อนแล้วค่อยประมวลผลเหมือน Unary RPC ตรรกะการประมวลผลเฉพาะสามารถตัดสินใจโดยเซิร์ฟเวอร์ โดยปกติแล้ว มีเพียงไคลเอนต์เท่านั้นที่สามารถปิด stream request ได้อย่างกระตือรือร้น เมื่อ stream ถูกปิด คำขอ RPC ปัจจุบันก็จะสิ้นสุด
เมื่อใช้ stream response จะส่งพารามิเตอร์เพียงครั้งเดียว เซิร์ฟเวอร์สามารถส่งข้อมูลให้ไคลเอนต์หลายครั้งผ่าน stream ไคลเอนต์ไม่จำเป็นต้องรอรับข้อมูลทั้งหมดก่อนแล้วค่อยประมวลผลเหมือน Unary RPC ตรรกะการประมวลผลเฉพาะสามารถตัดสินใจโดยไคลเอนต์เอง โดยปกติแล้ว มีเพียงเซิร์ฟเวอร์เท่านั้นที่สามารถปิด stream response ได้อย่างกระตือรือร้น เมื่อ stream ถูกปิด คำขอ RPC ปัจจุบันก็จะสิ้นสุด
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}หรือมีเพียง response ที่เป็นแบบ stream (Server-Streaming RPC)
service MessageService {
rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}หรือทั้ง request และ response เป็นแบบ stream (Bi-directional-Streaming RPC)
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}###单向 Stream
ต่อไปนี้จะใช้ตัวอย่างสาธิตการดำเนินการ单向 stream ก่อนอื่นสร้างโครงสร้างโครงการดังนี้
grpc_learn\server_client_stream
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.goเนื้อหา message.proto ดังนี้
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service MessageService {
rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}สร้างโค้ดผ่าน buf
$ buf generateที่นี่สาธิตเป็นบริการข้อความ receiveMessage รับชื่อผู้ใช้ที่ระบุ ประเภทเป็นสตริง ตอบกลับ stream ข้อความ sendMessage รับ stream ข้อความ ตอบกลับจำนวนข้อความที่ส่งสำเร็จ ประเภทเป็นจำนวนเต็ม 64 บิต ต่อไปสร้าง server/message_service.go ใช้งานบริการที่สร้างโดยค่าเริ่มต้นเอง
package main
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
)
type MessageService struct {
message.UnimplementedMessageServiceServer
}
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}จะเห็นว่าในพารามิเตอร์รับข้อความและส่งข้อความมีอินเทอร์เฟซ stream wrapper
type MessageService_ReceiveMessageServer interface {
// ส่งข้อความ
Send(*Message) error
grpc.ServerStream
}
type MessageService_SendMessageServer interface {
// ส่งค่าตอบกลับและปิดการเชื่อมต่อ
SendAndClose(*wrapperspb.StringValue) error
// รับข้อความ
Recv() (*Message, error)
grpc.ServerStream
}ทั้งคู่ฝังอินเทอร์เฟซ gprc.ServerStream
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}จะเห็นว่า Stream RPC ไม่เหมือน Unary RPC ที่พารามิเตอร์เข้าและค่าตอบกลับสามารถสะท้อนในฟังก์ชัน签名ได้อย่างชัดเจน method เหล่านี้ดูเผินๆ ไม่สามารถบอกได้ว่าพารามิเตอร์เข้าและค่าตอบกลับเป็นประเภทอะไร ต้องเรียกใช้ Stream ที่ส่งเข้ามาเพื่อทำการส่งข้อมูลแบบ stream ต่อไปเริ่มเขียนตรรกะเซิร์ฟเวอร์เฉพาะ ในการเขียนตรรกะเซิร์ฟเวอร์ ใช้ sync.map จำลองคิวข้อความ เมื่อไคลเอนต์ส่งคำขอ ReceiveMessage เซิร์ฟเวอร์ตอบกลับข้อความที่ไคลเอนต์ต้องการผ่าน stream response อย่างต่อเนื่อง จนกว่าจะ超时แล้วตัดการเชื่อมต่อ เมื่อไคลเอนต์ขอ SendMessage ส่งข้อความผ่าน stream request อย่างต่อเนื่อง เซิร์ฟเวอร์ใส่ข้อความลงในคิวอย่างต่อเนื่อง จนกว่าไคลเอนต์จะตัดการขออย่างกระตือรือร้น และตอบกลับจำนวนข้อความที่ส่งให้ไคลเอนต์
package main
import (
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"sync"
"time"
)
// คิวข้อความจำลอง
var messageQueue sync.Map
type MessageService struct {
message.UnimplementedMessageServiceServer
}
// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// รับข้อความของผู้ใช้ที่ระบุ
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
timer := time.NewTimer(time.Second * 5)
for {
time.Sleep(time.Millisecond * 100)
select {
case <-timer.C:
log.Printf("5秒钟内没有收到%s的消息,关闭连接", user.GetValue())
return nil
default:
value, ok := messageQueue.Load(user.GetValue())
if !ok {
messageQueue.Store(user.GetValue(), []*message.Message{})
continue
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue
}
// รับข้อความ
msg := queue[0]
// ส่งข้อความให้ไคลเอนต์ผ่านการส่งข้อมูลแบบ stream
err := recvServer.Send(msg)
log.Printf("receive %+v\n", msg)
if err != nil {
return err
}
queue = queue[1:]
messageQueue.Store(user.GetValue(), queue)
timer.Reset(time.Second * 5)
}
}
}
// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// ส่งข้อความให้ผู้ใช้ที่ระบุ
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
count := 0
for {
// รับข้อความจากไคลเอนต์
msg, err := sendServer.Recv()
if errors.Is(err, io.EOF) {
return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
}
if err != nil {
return err
}
log.Printf("send %+v\n", msg)
value, ok := messageQueue.Load(msg.From)
if !ok {
messageQueue.Store(msg.From, []*message.Message{msg})
continue
}
queue := value.([]*message.Message)
queue = append(queue, msg)
// ใส่ข้อความลงในคิวข้อความ
messageQueue.Store(msg.From, queue)
count++
}
}ไคลเอนต์เปิดสอง goroutine goroutine หนึ่งใช้ส่งข้อความ อีก goroutine ใช้รับข้อความ แน่นอนสามารถส่งและรับพร้อมกันได้ โค้ดดังนี้
package main
import (
"context"
"errors"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"time"
)
var Client message.MessageServiceClient
func main() {
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panicln(err)
}
defer dial.Close()
Client = message.NewMessageServiceClient(dial)
log.SetPrefix("client\t")
msgTask := task.NewTask(func(err error) {
log.Panicln(err)
})
ctx := context.Background()
// คำขอรับข้อความ
msgTask.AddJobs(func() {
receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
if err != nil {
log.Panicln(err)
}
for {
recv, err := receiveMessageStream.Recv()
if errors.Is(err, io.EOF) {
log.Println("暂无消息,关闭连接")
break
} else if err != nil {
break
}
log.Printf("receive %+v", recv)
}
})
msgTask.AddJobs(func() {
from := "jack"
to := "mike"
sendMessageStream, err := Client.SendMessage(ctx)
if err != nil {
log.Panicln(err)
}
msgs := []string{
"在吗",
"下午有没有时间一起打游戏",
"那行吧,以后有时间一起约",
"就这个周末应该可以吧",
"那就这么定了",
}
for _, msg := range msgs {
time.Sleep(time.Second)
sendMessageStream.Send(&message.Message{
From: from,
Content: msg,
To: to,
})
}
// ส่งข้อความเสร็จแล้ว ปิดคำขออย่างกระตือรือร้นและรับค่าตอบกลับ
recv, err := sendMessageStream.CloseAndRecv()
if err != nil {
log.Println(err)
} else {
log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
}
})
msgTask.Run()
}หลังจากดำเนินการ เซิร์ฟเวอร์เอาต์พุตดังนี้
server 2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server 2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server 2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server 2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server 2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server 2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server 2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server 2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server 2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server 2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server 2023/07/18 16:28:33 5秒钟内没有收到 jack 的消息,关闭连接ไคลเอนต์เอาต์พุตดังนี้
client 2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client 2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client 2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client 2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client 2023/07/18 16:28:28 发送完毕,总共发送了 5 条消息
client 2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client 2023/07/18 16:28:33 暂无消息,关闭连接ผ่านตัวอย่างนี้จะพบว่า单向 Stream RPC request การประมวลผลไม่ว่าจะเป็นไคลเอนต์หรือเซิร์ฟเวอร์ล้วนซับซ้อนกว่า Unary rpc แต่双向 Stream RPC ซับซ้อนกว่าพวกมันอีก
###双向 Stream
双向 Stream PRC คือทั้ง request และ response เป็นแบบ stream ก็เหมือนรวมสองบริการในตัวอย่างข้างต้นเป็นหนึ่ง สำหรับ Stream RPC แล้ว คำขอแรกต้องเริ่มโดยไคลเอนต์ จากนั้นไคลเอนต์สามารถส่งพารามิเตอร์คำขอผ่าน stream ได้ตลอดเวลา เซิร์ฟเวอร์ก็สามารถตอบกลับข้อมูลผ่าน stream ได้ตลอดเวลา ไม่ว่าฝ่ายใดปิด stream อย่างกระตือรือร้น คำขอปัจจุบันจะสิ้นสุด
TIP
เนื้อหาต่อไปนี้จะละเว้นคำอธิบายโค้ดการสร้างโค้ด pb และการสร้าง rpc ไคลเอนต์เซิร์ฟเวอร์โดยตรง除非จำเป็น
ก่อนอื่นสร้างโครงสร้างโครงการดังนี้
bi_stream\
| buf.gen.yaml
|
+---client
| main.go
|
+---message
| message.pb.go
| message_grpc.pb.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.go
message_service.goเนื้อหา message.proto ดังนี้
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service ChatService {
rpc chat(stream Message) returns (stream Message);
}ในตรรกะเซิร์ฟเวอร์ หลังจากสร้างการเชื่อมต่อแล้ว เปิดสอง goroutine goroutine หนึ่งรับผิดชอบรับข้อความ หนึ่งรับผิดชอบส่งข้อความ ตรรกะการประมวลผลเฉพาะคล้ายกับตัวอย่างก่อนหน้า แต่ครั้งนี้ลบตรรกะการตัดสิน超时ออก
package main
import (
"github.com/dstgo/task"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"sync"
"time"
)
// MessageQueue จำลองคิวข้อความ
var MessageQueue sync.Map
type ChatService struct {
message.UnimplementedChatServiceServer
}
// Chat
// param chatServer message.ChatService_ChatServer
// return error
// บริการแชท ตรรกะเซิร์ฟเวอร์เราใช้หลาย goroutine ในการประมวลผล
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, _ := metadata.FromIncomingContext(chatServer.Context())
from := md.Get("from")[0]
defer log.Println(from, "end chat")
var chatErr error
chatCh := make(chan error)
// สร้างสอง goroutine หนึ่งรับข้อความ หนึ่งส่งข้อความ
chatTask := task.NewTask(func(err error) {
chatErr = err
})
// goroutine รับข้อความ
chatTask.AddJobs(func() {
for {
msg, err := chatServer.Recv()
log.Printf("receive %+v err %+v\n", msg, err)
if err != nil {
chatErr = err
chatCh <- err
break
}
value, ok := MessageQueue.Load(msg.To)
if !ok {
MessageQueue.Store(msg.To, []*message.Message{msg})
} else {
queue := value.([]*message.Message)
queue = append(queue, msg)
MessageQueue.Store(msg.To, queue)
}
}
})
// goroutine ส่งข้อความ
chatTask.AddJobs(func() {
Send:
for {
time.Sleep(time.Millisecond * 100)
select {
case <-chatCh:
log.Println(from, "close send")
break Send
default:
value, ok := MessageQueue.Load(from)
if !ok {
value = []*message.Message{}
MessageQueue.Store(from, value)
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue Send
}
msg := queue[0]
queue = queue[1:]
MessageQueue.Store(from, queue)
err := chatServer.Send(msg)
log.Printf("send %+v\n", msg)
if err != nil {
chatErr = err
break Send
}
}
}
})
chatTask.Run()
return chatErr
}ในตรรกะไคลเอนต์ เปิดสอง sub-goroutine จำลองกระบวนการแชทของสองคน ในสอง sub-goroutine分别又有สอง grand-goroutine รับผิดชอบส่งและรับข้อความ (ในตรรกะไคลเอนต์ไม่ได้รักษาลำดับการส่งและรับข้อความของการแชทสองคนให้ถูกต้อง เป็นเพียงตัวอย่างการส่งและรับของทั้งสองฝ่ายอย่างง่าย)
package main
import (
"context"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"time"
)
var Client message.ChatServiceClient
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer dial.Close()
if err != nil {
log.Panicln(err)
}
Client = message.NewChatServiceClient(dial)
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
})
chatTask.AddJobs(func() {
NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
})
chatTask.Run()
}
func NewChat(from string, to string, contents ...string) {
ctx := context.Background()
mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
chat, err := Client.Chat(mdCtx)
defer log.Println("end chat", from)
if err != nil {
log.Panicln(err)
}
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
for _, content := range contents {
time.Sleep(time.Second)
chat.Send(&message.Message{
From: from,
Content: content,
To: to,
})
}
// ส่งข้อความเสร็จแล้ว ปิดการเชื่อมต่อ
time.Sleep(time.Second * 5)
chat.CloseSend()
})
// goroutine รับข้อความ
chatTask.AddJobs(func() {
for {
msg, err := chat.Recv()
log.Printf("receive %+v\n", msg)
if err != nil {
log.Println(err)
break
}
}
})
chatTask.Run()
}โดยปกติ เซิร์ฟเวอร์เอาต์พุต
server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chatโดยปกติ ไคลเอนต์เอาต์พุต (จะเห็นว่าลำดับตรรกะของข้อความสับสน)
client 2023/07/19 17:26:24 receive from:"jack" content:"你好" to:"mike"
client 2023/07/19 17:26:24 receive from:"mike" content:"你好" to:"jack"
client 2023/07/19 17:26:25 receive from:"mike" content:"没有" to:"jack"
client 2023/07/19 17:26:25 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike"
client 2023/07/19 17:26:26 receive from:"jack" content:"好吧" to:"mike"
client 2023/07/19 17:26:26 receive from:"mike" content:"没时间,你找别人吧" to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mikeผ่านตัวอย่างจะเห็นว่าตรรกะการประมวลผลของ双向 Stream ไม่ว่าไคลเอนต์หรือเซิร์ฟเวอร์ล้วนซับซ้อนกว่า单向 Stream ต้องรวมหลาย goroutine เปิด asynchronous task จึงจะประมวลผลตรรกะได้ดีกว่า
metadata
metadata โดยพื้นฐานแล้วเป็น map ค่าของมันคือ string slice คล้ายกับ header ของ http1 และมีบทบาทใน gRPC คล้ายกับ http header ให้ข้อมูลบางอย่างของการเรียก RPC ครั้งนี้ และ lifecycle ของ metadata ติดตาม整个过程ของการเรียก rpc ครั้งหนึ่ง เมื่อการเรียกสิ้นสุด lifecycle ของมันก็สิ้นสุด
ใน gRPC มัน主要通过 context ในการส่งและเก็บ แต่ gRPC ให้แพ็กเกจ metadata里面有ฟังก์ชันสะดวกมากมายเพื่อลดความซับซ้อน操作 ไม่ต้องให้เรา操作 context ด้วยตนเอง metadata ใน gRPC ตรงกับประเภท metadata.MD ดังนี้
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]stringเราสามารถ直接使用 metadata.New ฟังก์ชันในการสร้าง แต่ก่อนสร้าง มี几点需要注意
func New(m map[string]string) MDmetadata มีข้อจำกัดเกี่ยวกับชื่อคีย์ ได้เฉพาะตัวอักษรที่ถูกจำกัดโดยกฎต่อไปนี้:
- ตัวอักษร ASCII
- ตัวเลข: 0-9
- ตัวพิมพ์เล็ก: a-z
- ตัวพิมพ์ใหญ่: A-Z
- ตัวอักษรพิเศษ: -_
TIP
ใน metadata ตัวพิมพ์ใหญ่จะถูกแปลงเป็นตัวพิมพ์เล็กทั้งหมด นั่นคือจะใช้ key เดียวกัน ค่าจะถูกทับ
TIP
key ที่ขึ้นต้นด้วย grpc- เป็น key ภายในที่ grpc สำรองไว้ หากใช้ key ประเภทนี้อาจทำให้เกิดข้อผิดพลาดบางอย่าง
สร้างด้วยตนเอง
มีวิธีสร้าง metadata มากมาย ที่นี่แนะนำสองวิธีที่ใช้บ่อยที่สุดในการสร้าง metadata ด้วยตนเอง วิธีแรกคือใช้ฟังก์ชัน metadata.New ส่ง map เข้าไปโดยตรง
func New(m map[string]string) MDmd := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})วิธีที่สองคือ metadata.Pairs ส่ง string slice ความยาวคู่เข้าไป จะ解析เป็น key-value pair อัตโนมัติ
func Pairs(kv ...string) MDmd := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")还可以ใช้ metadata.join เพื่อรวม metadata หลายตัว
func Join(mds ...MD) MDmd1 := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)การใช้งานเซิร์ฟเวอร์
รับ metadata
เซิร์ฟเวอร์รับ metadata สามารถใช้ฟังก์ชัน metadata.FromIncomingContext เพื่อรับ
func FromIncomingContext(ctx context.Context) (MD, bool)สำหรับ unary rpc ในพารามิเตอร์ของ service จะมีพารามิเตอร์ context รับ metadata จาก dalamnya ได้เลย
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
md, b := metadata.FromIncomingContext(ctx)
...
}สำหรับ stream rpc ในพารามิเตอร์ของ service จะมี stream object ผ่านมันสามารถรับ context ของ stream ได้
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, b := metadata.FromIncomingContext(chatServer.Context())
...
}ส่ง metadata
ส่ง metadata สามารถใช้ฟังก์ชัน grpc.sendHeader
func SendHeader(ctx context.Context, md metadata.MD) errorฟังก์ชันนี้เรียกใช้ได้มากที่สุดหนึ่งครั้ง ในบางเหตุการณ์ที่ทำให้ header ถูกส่งอัตโนมัติเกิดขึ้นแล้วใช้จะไม่生效 ในบางกรณี หากไม่ต้องการส่ง header โดยตรง这时สามารถใช้ฟังก์ชัน grpc.SetHeader
func SetHeader(ctx context.Context, md metadata.MD) errorฟังก์ชันนี้หากเรียกหลายครั้ง จะรวม metadata ที่ส่งเข้าไปในแต่ละครั้ง และส่งให้ไคลเอนต์ในสถานการณ์ต่อไปนี้
- เมื่อเรียก
gprc.SendHeaderและServertream.SendHeader - เมื่อ handler ของ unary rpc ตอบกลับ
- เมื่อเรียก
Stream.SendMsgของ stream object ใน stream rpc - เมื่อสถานะคำขอ rpc เปลี่ยนเป็น
send outสถานการณ์นี้要么คำขอ rpc สำเร็จ要么เกิดข้อผิดพลาด
สำหรับ stream rpc แนะนำให้ใช้ method SendHeader และ SetHeader ของ stream object
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
...
}TIP
ในระหว่างการใช้งานจะพบว่าฟังก์ชัน Header และ Trailer คล้ายกันมาก แต่ความแตกต่างหลักอยู่ที่时机ในการส่ง ใน unary rpc อาจไม่รู้สึก แต่ความแตกต่างนี้ชัดเจนมากใน stream RPC เพราะ Header ใน stream RPC สามารถส่ง Header ได้โดยไม่ต้องรอให้การขอสิ้นสุด ด้านหน้าได้กล่าวไปแล้วว่า Header จะถูกส่งในสถานการณ์เฉพาะ ส่วน Trailer จะถูกส่งก็ต่อเมื่อคำขอ RPC ทั้งหมดสิ้นสุดเท่านั้น ก่อนหน้านี้ trailer ที่รับได้เป็นค่าว่าง
การใช้งานไคลเอนต์
รับ metadata
ไคลเอนต์ต้องการรับ response header สามารถใช้ grpc.Header และ grpc.Trailer เพื่อ實現
func Header(md *metadata.MD) CallOptionfunc Trailer(md *metadata.MD) CallOptionแต่需要注意的是 ไม่สามารถรับได้โดยตรง จะเห็นว่าฟังก์ชันข้างต้นคืนค่าเป็น CallOption นั่นคือส่งเป็นพารามิเตอร์ option เมื่อเริ่มคำขอ RPC
// ประกาศ md สำหรับรับค่า
var header, trailer metadata.MD
// ส่งคำขอ rpc โดยส่ง option เข้าไป
res, err := client.SomeRPC(
ctx,
data,
grpc.Header(&header),
grpc.Trailer(&trailer)
)หลังจากคำขอเสร็จสิ้น จะเขียนค่าลงใน md ที่ส่งเข้าไป สำหรับ stream rpc สามารถรับได้โดยตรงผ่าน stream object ที่ตอบกลับเมื่อเริ่มคำขอ
type ClientStream interface {
Header() (metadata.MD, error)
Trailer() metadata.MD
...
}stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()ส่ง metadata
ไคลเอนต์ต้องการส่ง metadata ง่ายมาก ก่อนหน้านี้ได้กล่าวไปแล้วว่า表现形式ของ metadata คือ valueContext รวม metadata เข้ากับ context แล้วส่ง context เข้าไปเมื่อขอ metadataแพ็กเกจให้ฟังก์ชันสองฟังก์ชันเพื่อสร้าง context ได้สะดวก
func NewOutgoingContext(ctx context.Context, md MD) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
// unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// stream rpc
stream,err := client.StreamRPC(outgoingContext)หาก ctx เดิมมี metadata อยู่แล้ว再用 NewOutgoingContext จะทับข้อมูลก่อนหน้าโดยตรง เพื่อหลีกเลี่ยงสถานการณ์นี้ สามารถใช้ฟังก์ชันด้านล่าง มันไม่ทับ แต่จะรวมข้อมูล
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")
// unary rpc
res,err := client.SomeRPC(appendContext,data)
// stream rpc
stream,err := client.StreamRPC(appendContext)Interceptor
Interceptor ของ gRPC คล้ายกับ Middleware ใน gin都是为了在请求前或者请求后做一些特殊的工作并且不影响到本身的业务逻辑 ใน gRPC มี Interceptor สองประเภทใหญ่ ๆ คือ Interceptor เซิร์ฟเวอร์และ Interceptor ไคลเอนต์ แบ่งตามประเภทคำขอ则有 Interceptor Unary RPC และ Interceptor Stream RPC ดังภาพ

为了能更好的理解拦截器,下面会根据一个非常简单的示例来进行描述。
grpc_learn\interceptor
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| person.proto
|
+---person
| person.pb.go
| person_grpc.pb.go
|
\---server
main.goperson.proto内容如下
syntax = "proto3";
option go_package = ".;person";
import "google/protobuf/wrappers.proto";
message personInfo {
string name = 1;
int64 age = 2;
string address = 3;
}
service person {
rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}服务端代码如下,逻辑全是之前的内容,比较简单不再赘述。
package main
import (
"context"
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"io"
"sync"
)
// 存放数据
var personData sync.Map
type PersonService struct {
person.UnimplementedPersonServer
}
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, person.PersonNotFoundErr
}
personInfo := value.(*person.PersonInfo)
return personInfo, nil
}
func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
count := 0
for {
personInfo, err := personStream.Recv()
if errors.Is(err, io.EOF) {
return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
} else if err != nil {
return err
}
personData.Store(personInfo.Name, personInfo)
count++
}
}เซิร์ฟเวอร์ Interceptor
拦截服务端 rpc 请求的有UnaryServerInterceptor和StreamServerInterceptor,具体类型如下所示
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) errorUnary RPC
创建一元 RPC 拦截器,只需要实现UnaryserverInterceptor类型即可,下面是一个简单的一元 RPC 拦截器的例子,功能是输出每一次 rpc 的请求和响应。
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc的请求数据
// param info *grpc.UnaryServerInfo 本次一元RPC的一些请求信息
// param unaryHandler grpc.UnaryHandler 具体的handler
// return resp interface{} rpc的响应数据
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
resp, err = unaryHandler(ctx, req)
log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
return resp, err
}对于一元 RPC 而言,拦截器拦截的是每一个 RPC 的请求和响应,即拦截的是 RPC 的请求阶段和响应阶段,如果拦截器返回 error,那么本次请求就会结束。
Stream RPC
创建流式 RPC 拦截器,只需要实现StreamServerInterceptor类型即可,下面是一个简单的流式 RPC 拦截器的例子。
// StreamPersonLogInterceptor
// param srv interface{} 对应服务端实现的 server
// param stream grpc.ServerStream 流对象
// param info *grpc.StreamServerInfo 流信息
// param streamHandler grpc.StreamHandler 处理器
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
err := streamHandler(srv, stream)
log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
return err
}对于流式 RPC 而言,拦截器拦截的是每一个流对象的Send和Recve方法被调用的时机,如果拦截器返回 error,并不会导致本次 RPC 请求的结束,仅仅只是代表着本次send 或recv出现了错误。
使用拦截器
要想使创建的拦截器生效,需要在创建 gRPC 服务器的时候作为 option 传入,官方也提供了相关的函数以供使用。如下所示,有添加单个拦截器的函数,也有添加链式拦截器的函数。
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
func StreamInterceptor(i StreamServerInterceptor) ServerOption
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptionTIP
重复使用UnaryInterceptor会抛出如下 panic
panic: The unary server interceptor was already set and may not be reset.StreamInterceptor也是同理,而链式拦截器重复调用则会 append 到同一个链上。
使用示例如下
package main
import (
"google.golang.org/grpc"
"grpc_learn/interceptor/person"
"log"
"net"
)
func main() {
log.SetPrefix("server ")
listen, err := net.Listen("tcp", "9090")
if err != nil {
log.Panicln(err)
}
server := grpc.NewServer(
// 添加链式拦截器
grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
)
person.RegisterPersonServer(server, &PersonService{})
server.Serve(listen)
}ไคลเอนต์ Interceptor
客户端拦截器跟服务端差不多,一个一元拦截器 UnaryClientInterceptor,一个流式拦截器StreamClientInterceptor,具体类型如下所示。
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)Unary RPC
创建一元 RPC 客户端拦截器,实现UnaryClientInterceptor即可,下面就是一个简单的例子。
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string 方法名
// param req interface{} 请求数据
// param reply interface{} 响应数据
// param cc *grpc.ClientConn 客户端连接对象
// param invoker grpc.UnaryInvoker 被拦截的具体客户端方法
// param opts ...grpc.CallOption 本次请求的配置项
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
err := invoker(ctx, method, req, reply, cc, opts...)
log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
return err
}通过客户端的一元 RPC 拦截器,可以获取到本地请求的请求数据和响应数据以及一些其他的请求信息。
Stream RPC
创建一个流式 RPC 客户端拦截器,实现StreamClientInterceptor即可,下面就是一个例子。
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流对象的描述信息
// param cc *grpc.ClientConn 连接对象
// param method string 方法名
// param streamer grpc.Streamer 用于创建流对象的对象
// param opts ...grpc.CallOption 连接配置项
// return grpc.ClientStream 创建好的客户端流对象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return stream, err
}通过流式 RPC 客户端拦截器,只能拦截到客户端与服务端建立连接的时候也就是创建流的时机,并不能拦截到客户端流对象每一次收发消息的时候,不过我们把拦截器中创建好的流对象包装一下就可以实现拦截收发消息了,就像下面这样
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流对象的描述信息
// param cc *grpc.ClientConn 连接对象
// param method string 方法名
// param streamer grpc.Streamer 用于创建流对象的对象
// param opts ...grpc.CallOption 连接配置项
// return grpc.ClientStream 创建好的客户端流对象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}
type ClientStreamInterceptorWrapper struct {
method string
desc *grpc.StreamDesc
grpc.ClientStream
}
func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
// 消息发送前
err := c.ClientStream.SendMsg(m)
// 消息发送后
log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
return err
}
func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
// 消息接收前
err := c.ClientStream.RecvMsg(m)
// 消息接收后
log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
return err
}使用拦截器
使用时,与服务端类似也是四个工具函数通过 option 来添加拦截器,分为单个拦截器和链式拦截器。
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
func WithStreamInterceptor(f StreamClientInterceptor) DialOption
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOptionTIP
客户端重复使用 WithUnaryInterceptor 不会抛出 panic,但是仅最后一个会生效。
下面是一个使用案例
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"log"
)
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}到目前为止,整个案例已经编写完毕,是时候来运行一下看看结果是什么样的。服务端输出如下
server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found客户端输出如下
C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found可以看到两边的输出都符合预期,起到了拦截的效果,这个案例只是一个很简单的示例,利用 gRPC 的拦截器可以做很多事情比如授权,日志,监控等等其他功能,可以选择自己造轮子,也可以选择使用开源社区现成的轮子,gRPC Ecosystem 专门收集了一系列开源的 gRPC 拦截器中间件,地址:grpc-ecosystem/go-grpc-middleware。
การจัดการข้อผิดพลาด
ก่อนเริ่มต้นมาดูตัวอย่างกันก่อน ในกรณี interceptor ก่อนหน้า หากผู้ใช้ค้นหาไม่พบ จะส่งคืนข้อผิดพลาด person not found ให้ไคลเอนต์ แล้ว问题来了 ไคลเอนต์สามารถทำการประมวลผลพิเศษตามข้อผิดพลาดที่ส่งคืนได้หรือไม่ ต่อไปนี้ลองทดสอบดู ในโค้ดไคลเอนต์ พยายามใช้ errors.Is เพื่อตัดสินข้อผิดพลาด
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
log.Println(info, err)
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}
}ผลลัพธ์เอาต์พุตดังนี้
client 2023/07/21 16:46:10 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not foundจะเห็นว่า error ที่ไคลเอนต์รับได้เป็นแบบนี้ จะพบว่า error ที่เซิร์ฟเวอร์ส่งคืนอยู่ในฟิลด์ desc
rpc error: code = Unknown desc = person not foundแน่นอนตรรกะ这段 errors.Is ก็ไม่ได้ดำเนินการ แม้จะ换成 errors.As ก็ได้ผลลัพธ์เดียวกัน
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}为此 gRPC ให้แพ็กเกจ status เพื่อแก้ปัญหาประเภทนี้ นี่คือเหตุผลว่าทำไม error ที่ไคลเอนต์รับได้มีฟิลด์ code และ desc เพราะที่จริงแล้ว gRPC ส่งคืน Status ให้ไคลเอนต์ ประเภทเฉพาะดังนี้ จะเห็นว่าเป็น message ที่กำหนดโดย protobuf เช่นกัน
type Status struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}message Status {
// The status code, which should be an enum value of
// [google.rpc.Code][google.rpc.Code].
int32 code = 1;
// A developer-facing error message, which should be in English. Any
// user-facing error message should be localized and sent in the
// [google.rpc.Status.details][google.rpc.Status.details] field, or localized
// by the client.
string message = 2;
// A list of messages that carry the error details. There is a common set of
// message types for APIs to use.
repeated google.protobuf.Any details = 3;
}รหัสข้อผิดพลาด
Code ในโครงสร้าง Status เป็นสิ่งที่คล้ายกับ Http Status ใช้สำหรับแสดงสถานะของคำขอ rpc ปัจจุบัน gRPC กำหนด code 16 ตัวอยู่ใน grpc/codesครอบคลุมสถานการณ์ส่วนใหญ่分别如下所示
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32
const (
// เรียกสำเร็จ
OK Code = 0
// คำขอถูกยกเลิก
Canceled Code = 1
// ข้อผิดพลาดไม่ทราบสาเหตุ
Unknown Code = 2
// พารามิเตอร์ไม่ถูกต้อง
InvalidArgument Code = 3
// คำขอ超时
DeadlineExceeded Code = 4
// ทรัพยากรไม่มีอยู่
NotFound Code = 5
// มีทรัพยากรเดียวกันอยู่แล้ว (ไม่คิดว่าอันนี้จะปรากฏ)
AlreadyExists Code = 6
// สิทธิ์ไม่เพียงพอถูกปฏิเสธการเข้าถึง
PermissionDenied Code = 7
// ทรัพยากรหมด สภาพแวดล้อมที่เหลือไม่เพียงพอสำหรับใช้งาน เช่น พื้นที่ดิสก์ไม่พอเป็นต้น
ResourceExhausted Code = 8
// สภาพการดำเนินการไม่เพียงพอ เช่น ใช้ rm ลบไดเรกทอรีที่ไม่ได้ว่าง สภาพการลบคือไดเรกทอรีต้องว่าง แต่สภาพไม่เป็นไปตาม
FailedPrecondition Code = 9
// คำขอถูกขัดจังหวะ
Aborted Code = 10
// การดำเนินการเข้าถึงเกินขอบเขตที่จำกัด
OutOfRange Code = 11
// แสดงว่าบริการปัจจุบันไม่ได้ใช้งาน
Unimplemented Code = 12
// ข้อผิดพลาดภายในระบบ
Internal Code = 13
// บริการไม่สามารถใช้งานได้
Unavailable Code = 14
// ข้อมูลสูญหาย
DataLoss Code = 15
// ไม่ผ่านการตรวจสอบสิทธิ์
Unauthenticated Code = 16
_maxCode = 17
)แพ็กเกจ grpc/status ให้ฟังก์ชันมากมายเพื่อแปลงระหว่าง status กับ error เราสามารถใช้ status.New โดยตรงเพื่อสร้าง Status หรือ Newf
func New(c codes.Code, msg string) *Status
func Newf(c codes.Code, format string, a ...interface{}) *Status例如โค้ดด้านล่าง
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)ผ่าน method err ของ status สามารถรับ error ในนั้นได้ เมื่อสถานะเป็น ok error จะเป็น nil
func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return &Error{s: s}
}还可以สร้าง error โดยตรง
func Err(c codes.Code, msg string) error
func Errorf(c codes.Code, format string, a ...interface{}) errorsuccess := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)ดังนั้นเราสามารถแก้ไขโค้ดเซิร์ฟเวอร์เป็นดังนี้
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
}
personInfo := value.(*person.PersonInfo)
return personInfo, status.Errorf(codes.OK, "request success")
}ก่อนนี้ code ทั้งหมดที่เซิร์ฟเวอร์ส่งคืนเป็น unknown ตอนนี้经过แก้ไขแล้วมีความหมายชัดเจนมากขึ้น ดังนั้นในไคลเอนต์就可以ผ่าน status.FromError หรือใช้ฟังก์ชันด้านล่างเพื่อรับ status จาก error เพื่อทำการประมวลผลที่สอดคล้องกันตาม code ที่ต่างกัน
func FromError(err error) (s *Status, ok bool)
func Convert(err error) *Status
func Code(err error) codes.Codeตัวอย่าง如下
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
...
}แม้ว่า code ของ grpc จะครอบคลุมสถานการณ์通用บางอย่างแล้ว แต่บางครั้งยังไม่สามารถตอบสนองความต้องการของนักพัฒนาได้这时就可以ใช้ฟิลด์ Details ใน Status และมันเป็น slice สามารถ容纳ข้อมูลหลายรายการ ผ่าน Status.WithDetails เพื่อส่งข้อมูล自定义บางอย่าง
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)ผ่าน Status.Details เพื่อรับข้อมูล
func (s *Status) Details() []interface{}需要注意的是 ข้อมูลที่ส่งเข้าไป最好是由 protobuf กำหนด เพื่อให้ทั้งสองฝั่งเซิร์ฟเวอร์และไคลเอนต์สามารถ解析ได้สะดวก ทางการให้ตัวอย่างหลายตัวอย่าง
message ErrorInfo {
// เหตุผลของข้อผิดพลาด
string reason = 1;
// 主体ที่กำหนดบริการ
string domain = 2;
// ข้อมูลอื่น ๆ
map<string, string> metadata = 3;
}
// ข้อมูลการลองใหม่
message RetryInfo {
// ช่วงเวลารอของคำขอเดียวกัน
google.protobuf.Duration retry_delay = 1;
}
// ข้อมูลการดีบัก
message DebugInfo {
// สแต็ก
repeated string stack_entries = 1;
// ข้อมูลรายละเอียดบางอย่าง
string detail = 2;
}
...
...ตัวอย่างเพิ่มเติมสามารถไปที่ googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) เพื่อดู หากต้องการสามารถนำเข้าผ่านโค้ดด้านล่าง
import "google.golang.org/genproto/googleapis/rpc/errdetails"ใช้ ErrorInfo เป็น details
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
notFound.WithDetails(&errdetails.ErrorInfo{
Reason: "person not found",
Domain: "xxx",
Metadata: nil,
})ในไคลเอนต์就可以拿ข้อมูลมาทำการประมวลผล แต่นี่เป็นเพียงตัวอย่างที่ gRPC แนะนำใช้ นอกจากนั้น同样可以自己定义 message เพื่อตอบสนองความต้องการทางธุรกิจที่ดีกว่า หากต้องการทำการประมวลผลข้อผิดพลาดแบบรวม也可以ใส่ใน interceptor ดำเนินการ
การควบคุม超时

ในสถานการณ์ส่วนใหญ่通常不会只有一个บริการ并且可能上游有很多บริการ下游也有很多บริการ ไคลเอนต์发起一次请求从最上游的服务到最下游就形成了一个服务调用链就像图中那样或许可能比图中的还要长
如此长的一个调用链如果其中一个服务的逻辑处理需要花费很长时间就会导致上游一直处于等待状态为了减少不必要的资源浪费因此有必要引入超时这一机制这样一来最上游调用时传入的超时时间便是整个调用链所允许的执行花费最大时间而 gRPC 可以跨进程跨语言传递超时它把一些需要跨进程传递的数据放在了 HTTP2 的HEADERS Frame帧中如下图

ข้อมูล超时ในคำขอ gRPC ตรงกับฟิลด์ grpc-timeout ในHEADERS Frame需要注意的是并不是所有的 gRPC 库都实现了这一超时传递机制不过gRPC-go肯定是支持的如果使用其他语言的库并且使用了这一特性则需要额外留意这一点
การเชื่อมต่อ超时
gRPC ไคลเอนต์在向服务端建立连接时默认是异步建立的如果连接建立失败只会返回一个空的 Client หากต้องการ使连接同步进行则可以使用grpc.WithBlock()来使连接未建立成功时阻塞等待
dial, err := grpc.Dial("localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)若想要控制一个超时时间则只需要传入一个 TimeoutContext ใช้grpc.DialContext来替代gprc.Dial以传入 context
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)如此一来如果连接建立超时就会返回 error
context deadline exceeded在服务端同样也可以设置连接超时在与客户端建立新连接的时候设置一个超时时间默认是 120 秒如果在规定时间内没有成功建立连接服务端会主动断开连接
server := grpc.NewServer(
grpc.ConnectionTimeout(time.Second*3),
)TIP
grpc.ConnectionTimeout仍处于实验阶段未来的 API 可能会被修改或删除
คำขอ超时
gRPC ไคลเอนต์在发起请求的时候第一个参数就是Context类型同样的要想给 RPC 请求加上一个超时时间只需要传入一个 TimeoutContext 即可
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
// 超时逻辑处理
}经过 gRPC 的处理超时时间被传递到了服务端在传输过程中它以在帧字段的形式存在中在 go 里面它以 context 的形式存在就此在整个链路中进行传递在链路传递过程中不建议去修改超时时间具体在请求时设置多长的超时时间这应该是最上游应该考虑的问题
การตรวจสอบสิทธิ์
在微服务领域中每一个服务都需要对请求验证用户身份和权限如果和单体应用一样每个服务都要自己实现一套认证逻辑这显然是不太现实的所以需要一个统一的认证与授权服务而常见的解决方案是使用 OAuth2分布式 Session 和 JWT这其中 OAuth2 使用最为广泛一度已经成为了业界标准 OAuth2 最常用的令牌类型就是是 JWT下面是一张 OAuth2 授权码模式的流程图基本流程如图所示

การส่งผ่านที่ปลอดภัย
การลงทะเบียนและการค้นพบบริการ
ก่อนที่ไคลเอนต์จะเรียกบริการที่ระบุของเซิร์ฟเวอร์ต้องทราบ ip และ port ของเซิร์ฟเวอร์ในกรณีศึกษาก่อนหน้าที่อยู่เซิร์ฟเวอร์ล้วนเขียนตายในสภาพแวดล้อมเครือข่ายจริงไม่总是那么稳定一些服务可能会因故障下线而无法访问也有可能会因为业务发展进行机器迁移而导致地址变化在这些情况下就不能使用静态地址访问服务了而这些动态的问题就是服务发现与注册要解决的服务发现负责监视服务地址的变化并更新服务注册负责告诉外界自己的地址 gRPC 中提供了基础的服务发现功能并且支持拓展和自定义
不能用静态地址可以用一些特定的名称来进行代替比如浏览器通过 DNS 解析域名来获取地址同样的 gRPC 默认的服务发现就是通过 DNS 来进行的修改本地的 host 文件添加如下映射
127.0.0.1 example.grpc.com然后将 helloworld 示例中客户端 Dial 的地址改为对应的域名
func main() {
// สร้างการเชื่อมต่อ ไม่มีการตรวจสอบการเข้ารหัส
conn, err := grpc.Dial("example.grpc.com:8080",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// สร้างไคลเอนต์
client := hello2.NewSayHelloClient(conn)
// เรียกจากระยะไกล
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}同样能看到正常的输出
2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"ใน gRPC ชื่อประเภทนี้ต้อง遵守 RFC 3986中定义的 URI 语法格式为
hierarchical part
┌───────────────────┴─────────────────────┐
authority path
┌───────────────┴───────────────┐┌───┴────┐
abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
└┬┘ └───────┬───────┘ └────┬────┘ └┬┘ └─────────┬─────────┘ └──┬──┘
scheme user information host port query fragment上例中的 URI 就是如下形式由于默认支持 dns 所以省略掉了前缀的 scheme
dns:example.grpc.com:8080除此之外 gRPC 还默认支持 Unix domain sockets而对于其他的方式需要我们根据 gRPC 的拓展来进行自定义实现为此需要实现一个自定义的解析器resolver.Resovlerresolver 负责监控目标地址和服务配置的更新
type Resolver interface {
// gRPC 将调用 ResolveNow 来尝试再次解析目标名称这只是一个提示如果不需要解析器可以忽略它该方法可能被并发的调用
ResolveNow(ResolveNowOptions)
Close()
}gRPC 要求我们传入一个 Resolver 构造器也就是resolver.Builder它负责构造 Resolver 实例
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}Scheme method ของ Builder คืนค่าประเภท Scheme ที่它负责解析例如默认的 dnsBuilder它คืนค่า的就是dns构造器在初始化时应该使用resolver.Register注册到全局 Builder 中又或者作为 optionsใช้grpc.WithResolver传入 ClientConn 内部后者的优先级高于前者

上图简单描述了一下 resolver 的工作流程接下来就演示如何自定义 resolver
###自定义服务解析
下面编写一个自定义解析器需要一个自定义的解析构造器来进行构造
package myresolver
import (
"fmt"
"google.golang.org/grpc/resolver"
)
func NewBuilder(ads map[string][]string) *MyBuilder {
return &MyBuilder{ads: ads}
}
type MyBuilder struct {
ads map[string][]string
}
func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.URL.Scheme != c.Scheme() {
return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
}
m := &MyResolver{ads: c.ads, t: target, cc: cc}
// 这里必须要 updatestate 否则会死锁
m.start()
return m, nil
}
func (c *MyBuilder) Scheme() string {
return "hello"
}
type MyResolver struct {
t resolver.Target
cc resolver.ClientConn
ads map[string][]string
}
func (m *MyResolver) start() {
addres := make([]resolver.Address, 0)
for _, ad := range m.ads[m.t.URL.Opaque] {
addres = append(addres, resolver.Address{Addr: ad})
}
err := m.cc.UpdateState(resolver.State{
Addresses: addres,
// 配置 loadBalancingPolicy指的是负载均衡策略
ServiceConfig: m.cc.ParseServiceConfig(
`{"loadBalancingPolicy":"round_robin"}`),
})
if err != nil {
m.cc.ReportError(err)
}
}
func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (m *MyResolver) Close() {}自定义解析器就是把 map 里面的匹配的地址传入到 updatestate同时还指定了负载均衡的策略round_robin指的是轮询的意思
// service config 结构如下
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *internalserviceconfig.BalancerConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}客户端代码如下
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"grpc_learn/helloworld/client/myresolver"
hello2 "grpc_learn/helloworld/hello"
"log"
"time"
)
func init() {
// ลงทะเบียน builder
resolver.Register(myresolver.NewBuilder(map[string][]string{
"myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
}))
}
func main() {
// สร้างการเชื่อมต่อ ไม่มีการตรวจสอบการเข้ารหัส
conn, err := grpc.Dial("hello:myworld",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// สร้างไคลเอนต์
client := hello2.NewSayHelloClient(conn)
// เรียก每秒一次
for range time.Tick(time.Second) {
// เรียกจากระยะไกล
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}
}ปกติแล้ว流程应该是服务端向注册中心注册自身服务然后客户端从注册中心中获取服务列表然后进行匹配这里传入的 map 就是一个模拟的注册中心数据是静态的就省略掉了服务注册这一环节只剩下服务发现客户端使用的 target 为hello:myworldhello 是自定义的 scheme myworld 就是服务名经过自定义的解析器解析后就得到了 127.0.0.1:8080 的真实地址在实际情况中为了做负载均衡一个服务名可能会匹配多个真实地址所以这就是为什么服务名对应的是一个切片这里开两个服务端占用不同的端口负载均衡策略为轮询服务端输出分别如下通过请求时间可以看到负载均衡策略确实是在起作用的如果不指定策略的话默认只选取第一个服务
// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"
// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"注册中心其实就是存放着的就是服务注册名与真实服务地址的映射集合只要是能够进行数据存储的中间件都可以满足条件甚至拿 mysql 来做注册中心也不是不可以(应该没有人会这么做)一般来说注册中心都是 KV 存储的 redis 就是一个很不错的选择但如果使用 redis 来做注册中心的话我们就需要自行实现很多逻辑比如服务的心跳检查服务下线等服务调度等等还是相当麻烦的虽然 redis 在这方面有一定的应用但是较少正所谓专业的事情交给专业的人做这方面做的比较出名的有很多:ZookeeperConsulEurekaETCDNacos 等
可以前往 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) 来了解这几个中间件的一些区别
结合 consul
结合 consul 使用的案例可以前往 consul
