Skip to content

gRPC

การเรียกใช้กระบวนการระยะไกล rpc น่าจะเป็นจุดที่ต้องเรียนรู้ในไมโครเซอร์วิส ในระหว่างการเรียนรู้จะพบเฟรมเวิร์ก rpc หลากหลายประเภท แต่ในโดเมนของ go เกือบทุกเฟรมเวิร์ก rpc ล้วนอยู่บนพื้นฐานของ gRPC และมันยังกลายเป็นโปรโตคอลพื้นฐานในโดเมนคลาวด์เนทีฟ ทำไมถึงเลือกมัน ทางการตอบดังนี้

gRPC เป็นเฟรมเวิร์กการเรียกใช้กระบวนการระยะไกล (Remote Process Call, RPC) แบบโอเพนซอร์สประสิทธิภาพสูงสมัยใหม่ สามารถรันได้ในทุกสภาพแวดล้อม มันสามารถเชื่อมต่อบริการภายในศูนย์ข้อมูลและระหว่างศูนย์ข้อมูลได้อย่างมีประสิทธิภาพผ่านการรองรับการโหลดบาลานซ์แบบปลั๊กอินได้ การติดตาม การตรวจสอบสุขภาพ และการตรวจสอบสิทธิ์ นอกจากนี้ยังเหมาะสำหรับการคำนวณแบบกระจายในระยะสุดท้ายสำหรับการเชื่อมต่ออุปกรณ์ แอปพลิเคชันมือถือ และเบราว์เซอร์กับบริการแบ็กเอนด์

เว็บไซต์ทางการ: gRPC

เอกสารทางการ: Documentation | gRPC

บทช่วยสอนทางเทคนิค gRPC: Basics tutorial | Go | gRPC

เว็บไซต์ ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

มันยังเป็นโครงการโอเพนซอร์สภายใต้มูลนิธิ CNCF CNCF ย่อมาจาก CLOUD NATIVE COMPUTING FOUNDATION

คุณสมบัติ

การกำหนดบริการอย่างง่าย

ใช้ Protocol Buffers กำหนดบริการ นี่คือชุดเครื่องมือไบนารีซีเรียลไลเซชันและภาษาที่ทรงพลัง

การเริ่มต้นและการขยายขนาดรวดเร็วมาก

เพียงบรรทัดเดียวก็สามารถติดตั้งรันไทม์และสภาพแวดล้อมการพัฒนา ใช้เวลาเพียงไม่กี่วินาทีก็สามารถขยายเป็นล้าน RPC ต่อวินาที

ข้ามภาษา ข้ามแพลตฟอร์ม

สร้างสตับบริการไคลเอนต์และเซิร์ฟเวอร์โดยอัตโนมัติตามแพลตฟอร์มและภาษาต่างกัน

การไหลสองทางและการอนุญาตแบบบูรณาการ

การไหลสองทางบนพื้นฐานของ HTTP/2 และการตรวจสอบสิทธิ์แบบปลั๊กอินได้

แม้ว่า GRPC จะไม่ขึ้นกับภาษา แต่เนื้อหาส่วนใหญ่ของเว็บไซต์นี้เกี่ยวข้องกับ go ดังนั้นบทความนี้จะใช้ go เป็นภาษาหลักในการอธิบาย ต่อไปนี้ตัวคอมไพเลอร์ pb และตัวสร้างที่ใช้หากเป็นผู้ใช้ภาษาอื่นสามารถค้นหาด้วยตนเองที่เว็บไซต์ Protobuf เพื่อความสะดวก ต่อไปนี้จะละเว้นกระบวนการสร้างโครงการโดยตรง

การติดตั้ง dependencies

ดาวน์โหลดตัวคอมไพเลอร์ Protocol Buffer ก่อน ที่อยู่ดาวน์โหลด: Releases · protocolbuffers/protobuf (github.com)

เลือกระบบและเวอร์ชันตามสถานการณ์ของคุณ หลังจากดาวน์โหลดเสร็จต้องเพิ่มไดเรกทอรี bin ลงในตัวแปรสภาพแวดล้อม

จากนั้นต้องดาวน์โหลดตัวสร้างโค้ด ตัวคอมไพเลอร์ใช้สร้างโค้ดซีเรียลไลเซชันของภาษาที่สอดคล้องจากไฟล์ proto ตัวสร้างใช้สร้างโค้ดธุรกิจ

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

สร้างโครงการเปล่า ชื่อที่นี่คือ grpc_learn แล้วนำเข้า dependencies ดังนี้

sh
$ go get google.golang.org/grpc

สุดท้ายดูเวอร์ชันว่าติดตั้งสำเร็จจริงหรือไม่

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

โครงสร้างโครงการ

ต่อไปนี้จะสาธิตด้วยตัวอย่าง Hello World สร้างโครงสร้างโครงการดังนี้

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

กำหนดไฟล์ protobuf

ใน pb/hello.proto ใส่เนื้อหาดังต่อไปนี้ นี่เป็นตัวอย่างที่ง่ายมาก หากไม่เข้าใจไวยากรณ์ protoc โปรดดูเอกสารที่เกี่ยวข้อง

protobuf
syntax = "proto3";

// . หมายถึงสร้างโดยตรงในพาธเอาต์พุต hello คือชื่อแพ็กเกจ
option go_package = ".;hello";

// คำขอ
message HelloReq {
  string name = 1;


  // การตอบกลับ
  message HelloRep {
    string msg = 1;
  }

  // กำหนดบริการ
  service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

สร้างโค้ด

หลังจากเขียนเสร็จ ใช้ตัวคอมไพเลอร์ protoc สร้างโค้ดที่เกี่ยวข้องกับการซีเรียลไลเซชันข้อมูล ใช้ตัวสร้างสร้างโค้ด rpc ที่เกี่ยวข้อง

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

ตอนนี้จะพบว่าโฟลเดอร์ hello สร้างไฟล์ hello.pb.go และ hello_grpc.pb.go เมื่อดู hello.pb.go จะพบ message ที่เรากำหนด

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // ฟิลด์ที่กำหนด
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // ฟิลด์ที่กำหนด
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

ใน hello_grpc.pb.go จะพบบริการที่กำหนด

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// ต่อมาหากเราใช้งานอินเทอร์เฟซบริการเอง ต้องฝังโครงสร้างนี้ ไม่ต้องใช้งาน method mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// คืนค่า nil เป็นค่าเริ่มต้น
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// ข้อจำกัดอินเทอร์เฟซ
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

เขียน服务端

ใน server/main.go เขียนโค้ดดังนี้

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // ฟังพอร์ต
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // สร้างเซิร์ฟเวอร์ gprc
  server := grpc.NewServer()
  // ลงทะเบียนบริการ
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // รัน
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

เขียนไคลเอนต์

ใน client/main.go ใส่โค้ดดังนี้

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // สร้างการเชื่อมต่อ ไม่มีการตรวจสอบการเข้ารหัส
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // สร้างไคลเอนต์
  client := pb.NewSayHelloClient(conn)
  // เรียกใช้ระยะไกล
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

รัน

รันเซิร์ฟเวอร์ก่อน แล้วรันไคลเอนต์ เซิร์ฟเวอร์เอาต์พุตดังนี้

2023/07/16 16:26:51 received grpc req: name:"client"

ไคลเอนต์เอาต์พุตดังนี้

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

ในตัวอย่างนี้ หลังจากไคลเอนต์สร้างการเชื่อมต่อแล้ว เมื่อเรียก method ระยะไกลก็เหมือนเรียก method ท้องถิ่น โดยตรงเข้าถึง method Hello ของ client และรับผลลัพธ์ นี่คือตัวอย่าง GRPC ที่ง่ายที่สุด เฟรมเวิร์กโอเพนซอร์สจำนวนมากก็ทำการ encapsulate กระบวนการนี้

bufbuild

ในตัวอย่างข้างต้น ใช้คำสั่งสร้างโค้ดโดยตรง หากภายหลังมีปลั๊กอินมาก คำสั่งจะดูยุ่งยากมาก这时สามารถใช้เครื่องมือจัดการไฟล์ protobuf ได้ พอดีมีเครื่องมือจัดการโอเพนซอร์ส bufbuild/buf

ที่อยู่โอเพนซอร์ส: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

ที่อยู่เอกสาร: Buf - Install the Buf CLI

คุณสมบัติ

  • การจัดการ BSR
  • Linter
  • การสร้างโค้ด
  • การจัดรูปแบบ
  • การจัดการ dependencies

มีเครื่องมือนี้สามารถจัดการไฟล์ protobuf ได้อย่างสะดวก

เอกสารมีวิธีการติดตั้งมากมายให้เลือก หากติดตั้งสภาพแวดล้อม go ในท้องถิ่นแล้ว ใช้ go install ติดตั้งได้เลย

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

หลังติดตั้งเสร็จดูเวอร์ชัน

sh
$ buf --version
1.24.0

ไปที่โฟลเดอร์ helloworld/pb รันคำสั่งด้านล่างเพื่อสร้าง module จัดการไฟล์ pb

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

เนื้อหาไฟล์ buf.yaml โดยค่าเริ่มต้นดังนี้

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

จากนั้นไปที่ไดเรกทอรี helloworld/ สร้าง buf.gen.yaml ใส่เนื้อหาดังนี้

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

แล้วรันคำสั่งสร้างโค้ด

sh
$ buf generate

หลังจากเสร็จก็จะเห็นไฟล์ที่สร้าง แน่นอน buf มีฟังก์ชันมากกว่านี้ ฟังก์ชันอื่น ๆ สามารถเรียนรู้ด้วยตนเองจากเอกสาร

Stream RPC

วิธีการเรียกใช้ grpc มีสองประเภทใหญ่ ๆ คือ Unary RPC และ Stream RPC ตัวอย่างใน Hello World เป็น Unary RPC แบบทั่วไป

Unary rpc (หรือเรียก普通 rpc ก็เข้าใจได้ง่ายกว่า จริงๆ ไม่รู้จะแปล unary อย่างไร) ใช้งานเหมือน http ทั่วไป ไคลเอนต์ขอ เซิร์ฟเวอร์ตอบกลับข้อมูล แบบถามตอบหนึ่งต่อหนึ่ง ส่วน Stream RPC คำขอและการตอบกลับสามารถเป็นแบบ stream ได้ ดังภาพ

เมื่อใช้ stream request จะตอบกลับเพียงครั้งเดียว ไคลเอนต์สามารถส่งพารามิเตอร์ให้เซิร์ฟเวอร์หลายครั้งผ่าน stream เซิร์ฟเวอร์ไม่จำเป็นต้องรอรับพารามิเตอร์ทั้งหมดก่อนแล้วค่อยประมวลผลเหมือน Unary RPC ตรรกะการประมวลผลเฉพาะสามารถตัดสินใจโดยเซิร์ฟเวอร์ โดยปกติแล้ว มีเพียงไคลเอนต์เท่านั้นที่สามารถปิด stream request ได้อย่างกระตือรือร้น เมื่อ stream ถูกปิด คำขอ RPC ปัจจุบันก็จะสิ้นสุด

เมื่อใช้ stream response จะส่งพารามิเตอร์เพียงครั้งเดียว เซิร์ฟเวอร์สามารถส่งข้อมูลให้ไคลเอนต์หลายครั้งผ่าน stream ไคลเอนต์ไม่จำเป็นต้องรอรับข้อมูลทั้งหมดก่อนแล้วค่อยประมวลผลเหมือน Unary RPC ตรรกะการประมวลผลเฉพาะสามารถตัดสินใจโดยไคลเอนต์เอง โดยปกติแล้ว มีเพียงเซิร์ฟเวอร์เท่านั้นที่สามารถปิด stream response ได้อย่างกระตือรือร้น เมื่อ stream ถูกปิด คำขอ RPC ปัจจุบันก็จะสิ้นสุด

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

หรือมีเพียง response ที่เป็นแบบ stream (Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

หรือทั้ง request และ response เป็นแบบ stream (Bi-directional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

###单向 Stream

ต่อไปนี้จะใช้ตัวอย่างสาธิตการดำเนินการ单向 stream ก่อนอื่นสร้างโครงสร้างโครงการดังนี้

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

เนื้อหา message.proto ดังนี้

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

สร้างโค้ดผ่าน buf

sh
$ buf generate

ที่นี่สาธิตเป็นบริการข้อความ receiveMessage รับชื่อผู้ใช้ที่ระบุ ประเภทเป็นสตริง ตอบกลับ stream ข้อความ sendMessage รับ stream ข้อความ ตอบกลับจำนวนข้อความที่ส่งสำเร็จ ประเภทเป็นจำนวนเต็ม 64 บิต ต่อไปสร้าง server/message_service.go ใช้งานบริการที่สร้างโดยค่าเริ่มต้นเอง

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

จะเห็นว่าในพารามิเตอร์รับข้อความและส่งข้อความมีอินเทอร์เฟซ stream wrapper

go
type MessageService_ReceiveMessageServer interface {
    // ส่งข้อความ
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // ส่งค่าตอบกลับและปิดการเชื่อมต่อ
  SendAndClose(*wrapperspb.StringValue) error
    // รับข้อความ
  Recv() (*Message, error)
  grpc.ServerStream
}

ทั้งคู่ฝังอินเทอร์เฟซ gprc.ServerStream

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

จะเห็นว่า Stream RPC ไม่เหมือน Unary RPC ที่พารามิเตอร์เข้าและค่าตอบกลับสามารถสะท้อนในฟังก์ชัน签名ได้อย่างชัดเจน method เหล่านี้ดูเผินๆ ไม่สามารถบอกได้ว่าพารามิเตอร์เข้าและค่าตอบกลับเป็นประเภทอะไร ต้องเรียกใช้ Stream ที่ส่งเข้ามาเพื่อทำการส่งข้อมูลแบบ stream ต่อไปเริ่มเขียนตรรกะเซิร์ฟเวอร์เฉพาะ ในการเขียนตรรกะเซิร์ฟเวอร์ ใช้ sync.map จำลองคิวข้อความ เมื่อไคลเอนต์ส่งคำขอ ReceiveMessage เซิร์ฟเวอร์ตอบกลับข้อความที่ไคลเอนต์ต้องการผ่าน stream response อย่างต่อเนื่อง จนกว่าจะ超时แล้วตัดการเชื่อมต่อ เมื่อไคลเอนต์ขอ SendMessage ส่งข้อความผ่าน stream request อย่างต่อเนื่อง เซิร์ฟเวอร์ใส่ข้อความลงในคิวอย่างต่อเนื่อง จนกว่าไคลเอนต์จะตัดการขออย่างกระตือรือร้น และตอบกลับจำนวนข้อความที่ส่งให้ไคลเอนต์

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// คิวข้อความจำลอง
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// รับข้อความของผู้ใช้ที่ระบุ
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("5秒钟内没有收到%s的消息,关闭连接", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // รับข้อความ
      msg := queue[0]
      // ส่งข้อความให้ไคลเอนต์ผ่านการส่งข้อมูลแบบ stream
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// ส่งข้อความให้ผู้ใช้ที่ระบุ
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // รับข้อความจากไคลเอนต์
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // ใส่ข้อความลงในคิวข้อความ
    messageQueue.Store(msg.From, queue)
    count++
  }
}

ไคลเอนต์เปิดสอง goroutine goroutine หนึ่งใช้ส่งข้อความ อีก goroutine ใช้รับข้อความ แน่นอนสามารถส่งและรับพร้อมกันได้ โค้ดดังนี้

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // คำขอรับข้อความ
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("暂无消息,关闭连接")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "在吗",
      "下午有没有时间一起打游戏",
      "那行吧,以后有时间一起约",
      "就这个周末应该可以吧",
      "那就这么定了",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // ส่งข้อความเสร็จแล้ว ปิดคำขออย่างกระตือรือร้นและรับค่าตอบกลับ
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

หลังจากดำเนินการ เซิร์ฟเวอร์เอาต์พุตดังนี้

server  2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:33 5秒钟内没有收到 jack 的消息,关闭连接

ไคลเอนต์เอาต์พุตดังนี้

client  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client  2023/07/18 16:28:28 发送完毕,总共发送了 5 条消息
client  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client  2023/07/18 16:28:33 暂无消息,关闭连接

ผ่านตัวอย่างนี้จะพบว่า单向 Stream RPC request การประมวลผลไม่ว่าจะเป็นไคลเอนต์หรือเซิร์ฟเวอร์ล้วนซับซ้อนกว่า Unary rpc แต่双向 Stream RPC ซับซ้อนกว่าพวกมันอีก

###双向 Stream

双向 Stream PRC คือทั้ง request และ response เป็นแบบ stream ก็เหมือนรวมสองบริการในตัวอย่างข้างต้นเป็นหนึ่ง สำหรับ Stream RPC แล้ว คำขอแรกต้องเริ่มโดยไคลเอนต์ จากนั้นไคลเอนต์สามารถส่งพารามิเตอร์คำขอผ่าน stream ได้ตลอดเวลา เซิร์ฟเวอร์ก็สามารถตอบกลับข้อมูลผ่าน stream ได้ตลอดเวลา ไม่ว่าฝ่ายใดปิด stream อย่างกระตือรือร้น คำขอปัจจุบันจะสิ้นสุด

TIP

เนื้อหาต่อไปนี้จะละเว้นคำอธิบายโค้ดการสร้างโค้ด pb และการสร้าง rpc ไคลเอนต์เซิร์ฟเวอร์โดยตรง除非จำเป็น

ก่อนอื่นสร้างโครงสร้างโครงการดังนี้

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

เนื้อหา message.proto ดังนี้

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

ในตรรกะเซิร์ฟเวอร์ หลังจากสร้างการเชื่อมต่อแล้ว เปิดสอง goroutine goroutine หนึ่งรับผิดชอบรับข้อความ หนึ่งรับผิดชอบส่งข้อความ ตรรกะการประมวลผลเฉพาะคล้ายกับตัวอย่างก่อนหน้า แต่ครั้งนี้ลบตรรกะการตัดสิน超时ออก

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue จำลองคิวข้อความ
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// บริการแชท ตรรกะเซิร์ฟเวอร์เราใช้หลาย goroutine ในการประมวลผล
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "end chat")

  var chatErr error
  chatCh := make(chan error)

  // สร้างสอง goroutine หนึ่งรับข้อความ หนึ่งส่งข้อความ
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // goroutine รับข้อความ
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // goroutine ส่งข้อความ
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

ในตรรกะไคลเอนต์ เปิดสอง sub-goroutine จำลองกระบวนการแชทของสองคน ในสอง sub-goroutine分别又有สอง grand-goroutine รับผิดชอบส่งและรับข้อความ (ในตรรกะไคลเอนต์ไม่ได้รักษาลำดับการส่งและรับข้อความของการแชทสองคนให้ถูกต้อง เป็นเพียงตัวอย่างการส่งและรับของทั้งสองฝ่ายอย่างง่าย)

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("end chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // ส่งข้อความเสร็จแล้ว ปิดการเชื่อมต่อ
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // goroutine รับข้อความ
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

โดยปกติ เซิร์ฟเวอร์เอาต์พุต

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

โดยปกติ ไคลเอนต์เอาต์พุต (จะเห็นว่าลำดับตรรกะของข้อความสับสน)

client 2023/07/19 17:26:24 receive from:"jack"  content:"你好"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"你好"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"没有"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"有没有时间一起打游戏?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"好吧"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"没时间,你找别人吧"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

ผ่านตัวอย่างจะเห็นว่าตรรกะการประมวลผลของ双向 Stream ไม่ว่าไคลเอนต์หรือเซิร์ฟเวอร์ล้วนซับซ้อนกว่า单向 Stream ต้องรวมหลาย goroutine เปิด asynchronous task จึงจะประมวลผลตรรกะได้ดีกว่า

metadata

metadata โดยพื้นฐานแล้วเป็น map ค่าของมันคือ string slice คล้ายกับ header ของ http1 และมีบทบาทใน gRPC คล้ายกับ http header ให้ข้อมูลบางอย่างของการเรียก RPC ครั้งนี้ และ lifecycle ของ metadata ติดตาม整个过程ของการเรียก rpc ครั้งหนึ่ง เมื่อการเรียกสิ้นสุด lifecycle ของมันก็สิ้นสุด

ใน gRPC มัน主要通过 context ในการส่งและเก็บ แต่ gRPC ให้แพ็กเกจ metadata里面有ฟังก์ชันสะดวกมากมายเพื่อลดความซับซ้อน操作 ไม่ต้องให้เรา操作 context ด้วยตนเอง metadata ใน gRPC ตรงกับประเภท metadata.MD ดังนี้

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

เราสามารถ直接使用 metadata.New ฟังก์ชันในการสร้าง แต่ก่อนสร้าง มี几点需要注意

go
func New(m map[string]string) MD

metadata มีข้อจำกัดเกี่ยวกับชื่อคีย์ ได้เฉพาะตัวอักษรที่ถูกจำกัดโดยกฎต่อไปนี้:

  • ตัวอักษร ASCII
  • ตัวเลข: 0-9
  • ตัวพิมพ์เล็ก: a-z
  • ตัวพิมพ์ใหญ่: A-Z
  • ตัวอักษรพิเศษ: -_

TIP

ใน metadata ตัวพิมพ์ใหญ่จะถูกแปลงเป็นตัวพิมพ์เล็กทั้งหมด นั่นคือจะใช้ key เดียวกัน ค่าจะถูกทับ

TIP

key ที่ขึ้นต้นด้วย grpc- เป็น key ภายในที่ grpc สำรองไว้ หากใช้ key ประเภทนี้อาจทำให้เกิดข้อผิดพลาดบางอย่าง

สร้างด้วยตนเอง

มีวิธีสร้าง metadata มากมาย ที่นี่แนะนำสองวิธีที่ใช้บ่อยที่สุดในการสร้าง metadata ด้วยตนเอง วิธีแรกคือใช้ฟังก์ชัน metadata.New ส่ง map เข้าไปโดยตรง

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

วิธีที่สองคือ metadata.Pairs ส่ง string slice ความยาวคู่เข้าไป จะ解析เป็น key-value pair อัตโนมัติ

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

还可以ใช้ metadata.join เพื่อรวม metadata หลายตัว

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)

การใช้งานเซิร์ฟเวอร์

รับ metadata

เซิร์ฟเวอร์รับ metadata สามารถใช้ฟังก์ชัน metadata.FromIncomingContext เพื่อรับ

go
func FromIncomingContext(ctx context.Context) (MD, bool)

สำหรับ unary rpc ในพารามิเตอร์ของ service จะมีพารามิเตอร์ context รับ metadata จาก dalamnya ได้เลย

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

สำหรับ stream rpc ในพารามิเตอร์ของ service จะมี stream object ผ่านมันสามารถรับ context ของ stream ได้

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

ส่ง metadata

ส่ง metadata สามารถใช้ฟังก์ชัน grpc.sendHeader

go
func SendHeader(ctx context.Context, md metadata.MD) error

ฟังก์ชันนี้เรียกใช้ได้มากที่สุดหนึ่งครั้ง ในบางเหตุการณ์ที่ทำให้ header ถูกส่งอัตโนมัติเกิดขึ้นแล้วใช้จะไม่生效 ในบางกรณี หากไม่ต้องการส่ง header โดยตรง这时สามารถใช้ฟังก์ชัน grpc.SetHeader

go
func SetHeader(ctx context.Context, md metadata.MD) error

ฟังก์ชันนี้หากเรียกหลายครั้ง จะรวม metadata ที่ส่งเข้าไปในแต่ละครั้ง และส่งให้ไคลเอนต์ในสถานการณ์ต่อไปนี้

  • เมื่อเรียก gprc.SendHeader และ Servertream.SendHeader
  • เมื่อ handler ของ unary rpc ตอบกลับ
  • เมื่อเรียก Stream.SendMsg ของ stream object ใน stream rpc
  • เมื่อสถานะคำขอ rpc เปลี่ยนเป็น send out สถานการณ์นี้要么คำขอ rpc สำเร็จ要么เกิดข้อผิดพลาด

สำหรับ stream rpc แนะนำให้ใช้ method SendHeader และ SetHeader ของ stream object

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

ในระหว่างการใช้งานจะพบว่าฟังก์ชัน Header และ Trailer คล้ายกันมาก แต่ความแตกต่างหลักอยู่ที่时机ในการส่ง ใน unary rpc อาจไม่รู้สึก แต่ความแตกต่างนี้ชัดเจนมากใน stream RPC เพราะ Header ใน stream RPC สามารถส่ง Header ได้โดยไม่ต้องรอให้การขอสิ้นสุด ด้านหน้าได้กล่าวไปแล้วว่า Header จะถูกส่งในสถานการณ์เฉพาะ ส่วน Trailer จะถูกส่งก็ต่อเมื่อคำขอ RPC ทั้งหมดสิ้นสุดเท่านั้น ก่อนหน้านี้ trailer ที่รับได้เป็นค่าว่าง

การใช้งานไคลเอนต์

รับ metadata

ไคลเอนต์ต้องการรับ response header สามารถใช้ grpc.Header และ grpc.Trailer เพื่อ實現

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

แต่需要注意的是 ไม่สามารถรับได้โดยตรง จะเห็นว่าฟังก์ชันข้างต้นคืนค่าเป็น CallOption นั่นคือส่งเป็นพารามิเตอร์ option เมื่อเริ่มคำขอ RPC

go
// ประกาศ md สำหรับรับค่า
var header, trailer metadata.MD

// ส่งคำขอ rpc โดยส่ง option เข้าไป
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

หลังจากคำขอเสร็จสิ้น จะเขียนค่าลงใน md ที่ส่งเข้าไป สำหรับ stream rpc สามารถรับได้โดยตรงผ่าน stream object ที่ตอบกลับเมื่อเริ่มคำขอ

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

ส่ง metadata

ไคลเอนต์ต้องการส่ง metadata ง่ายมาก ก่อนหน้านี้ได้กล่าวไปแล้วว่า表现形式ของ metadata คือ valueContext รวม metadata เข้ากับ context แล้วส่ง context เข้าไปเมื่อขอ metadataแพ็กเกจให้ฟังก์ชันสองฟังก์ชันเพื่อสร้าง context ได้สะดวก

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// stream rpc
stream,err := client.StreamRPC(outgoingContext)

หาก ctx เดิมมี metadata อยู่แล้ว再用 NewOutgoingContext จะทับข้อมูลก่อนหน้าโดยตรง เพื่อหลีกเลี่ยงสถานการณ์นี้ สามารถใช้ฟังก์ชันด้านล่าง มันไม่ทับ แต่จะรวมข้อมูล

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// unary rpc
res,err := client.SomeRPC(appendContext,data)
// stream rpc
stream,err := client.StreamRPC(appendContext)

Interceptor

Interceptor ของ gRPC คล้ายกับ Middleware ใน gin都是为了在请求前或者请求后做一些特殊的工作并且不影响到本身的业务逻辑 ใน gRPC มี Interceptor สองประเภทใหญ่ ๆ คือ Interceptor เซิร์ฟเวอร์และ Interceptor ไคลเอนต์ แบ่งตามประเภทคำขอ则有 Interceptor Unary RPC และ Interceptor Stream RPC ดังภาพ

为了能更好的理解拦截器,下面会根据一个非常简单的示例来进行描述。

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

person.proto内容如下

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

服务端代码如下,逻辑全是之前的内容,比较简单不再赘述。

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// 存放数据
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

เซิร์ฟเวอร์ Interceptor

拦截服务端 rpc 请求的有UnaryServerInterceptorStreamServerInterceptor,具体类型如下所示

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

Unary RPC

创建一元 RPC 拦截器,只需要实现UnaryserverInterceptor类型即可,下面是一个简单的一元 RPC 拦截器的例子,功能是输出每一次 rpc 的请求和响应。

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc的请求数据
// param info *grpc.UnaryServerInfo 本次一元RPC的一些请求信息
// param unaryHandler grpc.UnaryHandler 具体的handler
// return resp interface{} rpc的响应数据
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

对于一元 RPC 而言,拦截器拦截的是每一个 RPC 的请求和响应,即拦截的是 RPC 的请求阶段和响应阶段,如果拦截器返回 error,那么本次请求就会结束。

Stream RPC

创建流式 RPC 拦截器,只需要实现StreamServerInterceptor类型即可,下面是一个简单的流式 RPC 拦截器的例子。

go
// StreamPersonLogInterceptor
// param srv interface{} 对应服务端实现的 server
// param stream grpc.ServerStream 流对象
// param info *grpc.StreamServerInfo 流信息
// param streamHandler grpc.StreamHandler 处理器
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

对于流式 RPC 而言,拦截器拦截的是每一个流对象的SendRecve方法被调用的时机,如果拦截器返回 error,并不会导致本次 RPC 请求的结束,仅仅只是代表着本次send recv出现了错误。

使用拦截器

要想使创建的拦截器生效,需要在创建 gRPC 服务器的时候作为 option 传入,官方也提供了相关的函数以供使用。如下所示,有添加单个拦截器的函数,也有添加链式拦截器的函数。

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

重复使用UnaryInterceptor会抛出如下 panic

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor也是同理,而链式拦截器重复调用则会 append 到同一个链上。

使用示例如下

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // 添加链式拦截器
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

ไคลเอนต์ Interceptor

客户端拦截器跟服务端差不多,一个一元拦截器 UnaryClientInterceptor,一个流式拦截器StreamClientInterceptor,具体类型如下所示。

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

Unary RPC

创建一元 RPC 客户端拦截器,实现UnaryClientInterceptor即可,下面就是一个简单的例子。

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string 方法名
// param req interface{} 请求数据
// param reply interface{} 响应数据
// param cc *grpc.ClientConn 客户端连接对象
// param invoker grpc.UnaryInvoker 被拦截的具体客户端方法
// param opts ...grpc.CallOption 本次请求的配置项
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

通过客户端的一元 RPC 拦截器,可以获取到本地请求的请求数据和响应数据以及一些其他的请求信息。

Stream RPC

创建一个流式 RPC 客户端拦截器,实现StreamClientInterceptor即可,下面就是一个例子。

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流对象的描述信息
// param cc *grpc.ClientConn 连接对象
// param method string 方法名
// param streamer grpc.Streamer 用于创建流对象的对象
// param opts ...grpc.CallOption 连接配置项
// return grpc.ClientStream 创建好的客户端流对象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

通过流式 RPC 客户端拦截器,只能拦截到客户端与服务端建立连接的时候也就是创建流的时机,并不能拦截到客户端流对象每一次收发消息的时候,不过我们把拦截器中创建好的流对象包装一下就可以实现拦截收发消息了,就像下面这样

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 流对象的描述信息
// param cc *grpc.ClientConn 连接对象
// param method string 方法名
// param streamer grpc.Streamer 用于创建流对象的对象
// param opts ...grpc.CallOption 连接配置项
// return grpc.ClientStream 创建好的客户端流对象
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // 消息发送前
  err := c.ClientStream.SendMsg(m)
  // 消息发送后
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // 消息接收前
  err := c.ClientStream.RecvMsg(m)
  // 消息接收后
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

使用拦截器

使用时,与服务端类似也是四个工具函数通过 option 来添加拦截器,分为单个拦截器和链式拦截器。

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

客户端重复使用 WithUnaryInterceptor 不会抛出 panic,但是仅最后一个会生效。

下面是一个使用案例

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

到目前为止,整个案例已经编写完毕,是时候来运行一下看看结果是什么样的。服务端输出如下

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

客户端输出如下

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

可以看到两边的输出都符合预期,起到了拦截的效果,这个案例只是一个很简单的示例,利用 gRPC 的拦截器可以做很多事情比如授权,日志,监控等等其他功能,可以选择自己造轮子,也可以选择使用开源社区现成的轮子,gRPC Ecosystem 专门收集了一系列开源的 gRPC 拦截器中间件,地址:grpc-ecosystem/go-grpc-middleware

การจัดการข้อผิดพลาด

ก่อนเริ่มต้นมาดูตัวอย่างกันก่อน ในกรณี interceptor ก่อนหน้า หากผู้ใช้ค้นหาไม่พบ จะส่งคืนข้อผิดพลาด person not found ให้ไคลเอนต์ แล้ว问题来了 ไคลเอนต์สามารถทำการประมวลผลพิเศษตามข้อผิดพลาดที่ส่งคืนได้หรือไม่ ต่อไปนี้ลองทดสอบดู ในโค้ดไคลเอนต์ พยายามใช้ errors.Is เพื่อตัดสินข้อผิดพลาด

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

ผลลัพธ์เอาต์พุตดังนี้

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

จะเห็นว่า error ที่ไคลเอนต์รับได้เป็นแบบนี้ จะพบว่า error ที่เซิร์ฟเวอร์ส่งคืนอยู่ในฟิลด์ desc

rpc error: code = Unknown desc = person not found

แน่นอนตรรกะ这段 errors.Is ก็ไม่ได้ดำเนินการ แม้จะ换成 errors.As ก็ได้ผลลัพธ์เดียวกัน

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

为此 gRPC ให้แพ็กเกจ status เพื่อแก้ปัญหาประเภทนี้ นี่คือเหตุผลว่าทำไม error ที่ไคลเอนต์รับได้มีฟิลด์ code และ desc เพราะที่จริงแล้ว gRPC ส่งคืน Status ให้ไคลเอนต์ ประเภทเฉพาะดังนี้ จะเห็นว่าเป็น message ที่กำหนดโดย protobuf เช่นกัน

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // The status code, which should be an enum value of
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // A developer-facing error message, which should be in English. Any
  // user-facing error message should be localized and sent in the
  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
  // by the client.
  string message = 2;

  // A list of messages that carry the error details.  There is a common set of
  // message types for APIs to use.
  repeated google.protobuf.Any details = 3;
}

รหัสข้อผิดพลาด

Code ในโครงสร้าง Status เป็นสิ่งที่คล้ายกับ Http Status ใช้สำหรับแสดงสถานะของคำขอ rpc ปัจจุบัน gRPC กำหนด code 16 ตัวอยู่ใน grpc/codesครอบคลุมสถานการณ์ส่วนใหญ่分别如下所示

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // เรียกสำเร็จ
  OK Code = 0

  // คำขอถูกยกเลิก
  Canceled Code = 1

  // ข้อผิดพลาดไม่ทราบสาเหตุ
  Unknown Code = 2

  // พารามิเตอร์ไม่ถูกต้อง
  InvalidArgument Code = 3

    // คำขอ超时
  DeadlineExceeded Code = 4

  // ทรัพยากรไม่มีอยู่
  NotFound Code = 5

    // มีทรัพยากรเดียวกันอยู่แล้ว (ไม่คิดว่าอันนี้จะปรากฏ)
  AlreadyExists Code = 6

  // สิทธิ์ไม่เพียงพอถูกปฏิเสธการเข้าถึง
  PermissionDenied Code = 7

  // ทรัพยากรหมด สภาพแวดล้อมที่เหลือไม่เพียงพอสำหรับใช้งาน เช่น พื้นที่ดิสก์ไม่พอเป็นต้น
  ResourceExhausted Code = 8

  // สภาพการดำเนินการไม่เพียงพอ เช่น ใช้ rm ลบไดเรกทอรีที่ไม่ได้ว่าง สภาพการลบคือไดเรกทอรีต้องว่าง แต่สภาพไม่เป็นไปตาม
  FailedPrecondition Code = 9

  // คำขอถูกขัดจังหวะ
  Aborted Code = 10

  // การดำเนินการเข้าถึงเกินขอบเขตที่จำกัด
  OutOfRange Code = 11

  // แสดงว่าบริการปัจจุบันไม่ได้ใช้งาน
  Unimplemented Code = 12

  // ข้อผิดพลาดภายในระบบ
  Internal Code = 13

  // บริการไม่สามารถใช้งานได้
  Unavailable Code = 14

  // ข้อมูลสูญหาย
  DataLoss Code = 15

  // ไม่ผ่านการตรวจสอบสิทธิ์
  Unauthenticated Code = 16

  _maxCode = 17
)

แพ็กเกจ grpc/status ให้ฟังก์ชันมากมายเพื่อแปลงระหว่าง status กับ error เราสามารถใช้ status.New โดยตรงเพื่อสร้าง Status หรือ Newf

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

例如โค้ดด้านล่าง

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

ผ่าน method err ของ status สามารถรับ error ในนั้นได้ เมื่อสถานะเป็น ok error จะเป็น nil

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

还可以สร้าง error โดยตรง

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

ดังนั้นเราสามารถแก้ไขโค้ดเซิร์ฟเวอร์เป็นดังนี้

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

ก่อนนี้ code ทั้งหมดที่เซิร์ฟเวอร์ส่งคืนเป็น unknown ตอนนี้经过แก้ไขแล้วมีความหมายชัดเจนมากขึ้น ดังนั้นในไคลเอนต์就可以ผ่าน status.FromError หรือใช้ฟังก์ชันด้านล่างเพื่อรับ status จาก error เพื่อทำการประมวลผลที่สอดคล้องกันตาม code ที่ต่างกัน

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

ตัวอย่าง如下

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

แม้ว่า code ของ grpc จะครอบคลุมสถานการณ์通用บางอย่างแล้ว แต่บางครั้งยังไม่สามารถตอบสนองความต้องการของนักพัฒนาได้这时就可以ใช้ฟิลด์ Details ใน Status และมันเป็น slice สามารถ容纳ข้อมูลหลายรายการ ผ่าน Status.WithDetails เพื่อส่งข้อมูล自定义บางอย่าง

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

ผ่าน Status.Details เพื่อรับข้อมูล

go
func (s *Status) Details() []interface{}

需要注意的是 ข้อมูลที่ส่งเข้าไป最好是由 protobuf กำหนด เพื่อให้ทั้งสองฝั่งเซิร์ฟเวอร์และไคลเอนต์สามารถ解析ได้สะดวก ทางการให้ตัวอย่างหลายตัวอย่าง

protobuf
message ErrorInfo {
  // เหตุผลของข้อผิดพลาด
  string reason = 1;

  // 主体ที่กำหนดบริการ
  string domain = 2;

  // ข้อมูลอื่น ๆ
  map<string, string> metadata = 3;
}

// ข้อมูลการลองใหม่
message RetryInfo {
  // ช่วงเวลารอของคำขอเดียวกัน
  google.protobuf.Duration retry_delay = 1;
}

// ข้อมูลการดีบัก
message DebugInfo {
  // สแต็ก
  repeated string stack_entries = 1;

  // ข้อมูลรายละเอียดบางอย่าง
  string detail = 2;
}

    ...
    ...

ตัวอย่างเพิ่มเติมสามารถไปที่ googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) เพื่อดู หากต้องการสามารถนำเข้าผ่านโค้ดด้านล่าง

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

ใช้ ErrorInfo เป็น details

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

ในไคลเอนต์就可以拿ข้อมูลมาทำการประมวลผล แต่นี่เป็นเพียงตัวอย่างที่ gRPC แนะนำใช้ นอกจากนั้น同样可以自己定义 message เพื่อตอบสนองความต้องการทางธุรกิจที่ดีกว่า หากต้องการทำการประมวลผลข้อผิดพลาดแบบรวม也可以ใส่ใน interceptor ดำเนินการ

การควบคุม超时

ในสถานการณ์ส่วนใหญ่通常不会只有一个บริการ并且可能上游有很多บริการ下游也有很多บริการ ไคลเอนต์发起一次请求从最上游的服务到最下游就形成了一个服务调用链就像图中那样或许可能比图中的还要长

如此长的一个调用链如果其中一个服务的逻辑处理需要花费很长时间就会导致上游一直处于等待状态为了减少不必要的资源浪费因此有必要引入超时这一机制这样一来最上游调用时传入的超时时间便是整个调用链所允许的执行花费最大时间而 gRPC 可以跨进程跨语言传递超时它把一些需要跨进程传递的数据放在了 HTTP2 的HEADERS Frame帧中如下图

ข้อมูล超时ในคำขอ gRPC ตรงกับฟิลด์ grpc-timeout ในHEADERS Frame需要注意的是并不是所有的 gRPC 库都实现了这一超时传递机制不过gRPC-go肯定是支持的如果使用其他语言的库并且使用了这一特性则需要额外留意这一点

การเชื่อมต่อ超时

gRPC ไคลเอนต์在向服务端建立连接时默认是异步建立的如果连接建立失败只会返回一个空的 Client หากต้องการ使连接同步进行则可以使用grpc.WithBlock()来使连接未建立成功时阻塞等待

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

若想要控制一个超时时间则只需要传入一个 TimeoutContext ใช้grpc.DialContext来替代gprc.Dial以传入 context

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

如此一来如果连接建立超时就会返回 error

context deadline exceeded

在服务端同样也可以设置连接超时在与客户端建立新连接的时候设置一个超时时间默认是 120 秒如果在规定时间内没有成功建立连接服务端会主动断开连接

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout仍处于实验阶段未来的 API 可能会被修改或删除

คำขอ超时

gRPC ไคลเอนต์在发起请求的时候第一个参数就是Context类型同样的要想给 RPC 请求加上一个超时时间只需要传入一个 TimeoutContext 即可

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // 超时逻辑处理
}

经过 gRPC 的处理超时时间被传递到了服务端在传输过程中它以在帧字段的形式存在中在 go 里面它以 context 的形式存在就此在整个链路中进行传递在链路传递过程中不建议去修改超时时间具体在请求时设置多长的超时时间这应该是最上游应该考虑的问题

การตรวจสอบสิทธิ์

在微服务领域中每一个服务都需要对请求验证用户身份和权限如果和单体应用一样每个服务都要自己实现一套认证逻辑这显然是不太现实的所以需要一个统一的认证与授权服务而常见的解决方案是使用 OAuth2分布式 Session 和 JWT这其中 OAuth2 使用最为广泛一度已经成为了业界标准 OAuth2 最常用的令牌类型就是是 JWT下面是一张 OAuth2 授权码模式的流程图基本流程如图所示

การส่งผ่านที่ปลอดภัย

การลงทะเบียนและการค้นพบบริการ

ก่อนที่ไคลเอนต์จะเรียกบริการที่ระบุของเซิร์ฟเวอร์ต้องทราบ ip และ port ของเซิร์ฟเวอร์ในกรณีศึกษาก่อนหน้าที่อยู่เซิร์ฟเวอร์ล้วนเขียนตายในสภาพแวดล้อมเครือข่ายจริงไม่总是那么稳定一些服务可能会因故障下线而无法访问也有可能会因为业务发展进行机器迁移而导致地址变化在这些情况下就不能使用静态地址访问服务了而这些动态的问题就是服务发现与注册要解决的服务发现负责监视服务地址的变化并更新服务注册负责告诉外界自己的地址 gRPC 中提供了基础的服务发现功能并且支持拓展和自定义

不能用静态地址可以用一些特定的名称来进行代替比如浏览器通过 DNS 解析域名来获取地址同样的 gRPC 默认的服务发现就是通过 DNS 来进行的修改本地的 host 文件添加如下映射

127.0.0.1 example.grpc.com

然后将 helloworld 示例中客户端 Dial 的地址改为对应的域名

go
func main() {
  // สร้างการเชื่อมต่อ ไม่มีการตรวจสอบการเข้ารหัส
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // สร้างไคลเอนต์
  client := hello2.NewSayHelloClient(conn)
  // เรียกจากระยะไกล
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

同样能看到正常的输出

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

ใน gRPC ชื่อประเภทนี้ต้อง遵守 RFC 3986中定义的 URI 语法格式为

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

上例中的 URI 就是如下形式由于默认支持 dns 所以省略掉了前缀的 scheme

dns:example.grpc.com:8080

除此之外 gRPC 还默认支持 Unix domain sockets而对于其他的方式需要我们根据 gRPC 的拓展来进行自定义实现为此需要实现一个自定义的解析器resolver.Resovlerresolver 负责监控目标地址和服务配置的更新

go
type Resolver interface {
    // gRPC 将调用 ResolveNow 来尝试再次解析目标名称这只是一个提示如果不需要解析器可以忽略它该方法可能被并发的调用
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC 要求我们传入一个 Resolver 构造器也就是resolver.Builder它负责构造 Resolver 实例

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Scheme method ของ Builder คืนค่าประเภท Scheme ที่它负责解析例如默认的 dnsBuilder它คืนค่า的就是dns构造器在初始化时应该使用resolver.Register注册到全局 Builder 中又或者作为 optionsใช้grpc.WithResolver传入 ClientConn 内部后者的优先级高于前者

上图简单描述了一下 resolver 的工作流程接下来就演示如何自定义 resolver

###自定义服务解析

下面编写一个自定义解析器需要一个自定义的解析构造器来进行构造

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // 这里必须要 updatestate 否则会死锁
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // 配置 loadBalancingPolicy指的是负载均衡策略
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

自定义解析器就是把 map 里面的匹配的地址传入到 updatestate同时还指定了负载均衡的策略round_robin指的是轮询的意思

go
// service config 结构如下
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

客户端代码如下

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // ลงทะเบียน builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // สร้างการเชื่อมต่อ ไม่มีการตรวจสอบการเข้ารหัส
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // สร้างไคลเอนต์
  client := hello2.NewSayHelloClient(conn)
     // เรียก每秒一次
  for range time.Tick(time.Second) {
    // เรียกจากระยะไกล
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

ปกติแล้ว流程应该是服务端向注册中心注册自身服务然后客户端从注册中心中获取服务列表然后进行匹配这里传入的 map 就是一个模拟的注册中心数据是静态的就省略掉了服务注册这一环节只剩下服务发现客户端使用的 target 为hello:myworldhello 是自定义的 scheme myworld 就是服务名经过自定义的解析器解析后就得到了 127.0.0.1:8080 的真实地址在实际情况中为了做负载均衡一个服务名可能会匹配多个真实地址所以这就是为什么服务名对应的是一个切片这里开两个服务端占用不同的端口负载均衡策略为轮询服务端输出分别如下通过请求时间可以看到负载均衡策略确实是在起作用的如果不指定策略的话默认只选取第一个服务

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

注册中心其实就是存放着的就是服务注册名与真实服务地址的映射集合只要是能够进行数据存储的中间件都可以满足条件甚至拿 mysql 来做注册中心也不是不可以(应该没有人会这么做)一般来说注册中心都是 KV 存储的 redis 就是一个很不错的选择但如果使用 redis 来做注册中心的话我们就需要自行实现很多逻辑比如服务的心跳检查服务下线等服务调度等等还是相当麻烦的虽然 redis 在这方面有一定的应用但是较少正所谓专业的事情交给专业的人做这方面做的比较出名的有很多:ZookeeperConsulEurekaETCDNacos 等

可以前往 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) 来了解这几个中间件的一些区别

结合 consul

结合 consul 使用的案例可以前往 consul

การโหลดบาลานซ์

Golang by www.golangdev.cn edit