Skip to content

gRPC

Remote Procedure Call (RPC) adalah topik penting dalam microservices. Selama proses pembelajaran, Anda akan menjumpai berbagai kerangka kerja RPC. Namun, dalam ekosistem Go, hampir semua kerangka kerja RPC berbasis gRPC, dan ini telah menjadi protokol fundamental di bidang cloud-native. Mengapa memilihnya? Berikut jawaban resmi:

gRPC adalah kerangka kerja pemanggilan prosedur jarak jauh (RPC) modern, open-source, dan berkinerja tinggi yang dapat berjalan di mana saja. Ini dapat menghubungkan layanan secara efisien di dalam dan di seluruh pusat data dengan dukungan pluggable untuk load balancing, tracing, health checking, dan autentikasi. Ini juga berlaku dalam komputasi terdistribusi last-mile untuk menghubungkan perangkat, aplikasi mobile, dan browser ke layanan backend.

Situs Web Resmi: gRPC

Dokumentasi Resmi: Documentation | gRPC

Tutorial gRPC: Basics tutorial | Go | gRPC

Situs Web Resmi Protobuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

Ini juga merupakan proyek open-source di bawah Yayasan CNCF. CNCF adalah singkatan dari Cloud Native Computing Foundation.

Fitur

Definisi Layanan Sederhana

Definisikan layanan menggunakan Protocol Buffers, seperangkat alat dan bahasa serialisasi biner yang kuat.

Startup dan Scaling Cepat

Instal runtime dan lingkungan pengembangan hanya dengan satu baris kode, dan scale ke jutaan RPC per detik dalam hitungan detik.

Lintas Bahasa, Lintas Platform

Secara otomatis menghasilkan stub klien dan server untuk platform dan bahasa yang berbeda.

Streaming Dua Arah dan Otorisasi Terintegrasi

Streaming dua arah berbasis HTTP/2 dengan autentikasi dan otorisasi pluggable.

Meskipun gRPC tidak bergantung pada bahasa, sebagian besar konten di situs ini terkait Go, jadi artikel ini juga akan menggunakan Go sebagai bahasa utama untuk penjelasan. Untuk pengguna bahasa lain, compiler pb dan generator yang digunakan nanti dapat ditemukan di situs web resmi Protobuf. Untuk kenyamanan, proses pembuatan proyek akan dihilangkan di bawah ini.

Instalasi Dependensi

Pertama, unduh compiler Protocol Buffer dari: Releases · protocolbuffers/protobuf (github.com)

Pilih sistem dan versi sesuai situasi Anda. Setelah mengunduh, tambahkan direktori bin ke environment variables Anda.

Kemudian unduh generator kode. Compiler menghasilkan kode serialisasi untuk bahasa yang sesuai dari file proto, sedangkan generator digunakan untuk menghasilkan kode bisnis.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

Buat proyek kosong bernama grpc_learn, lalu impor dependensi berikut:

sh
$ go get google.golang.org/grpc

Terakhir, periksa versi untuk memastikan instalasi berhasil:

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Struktur Proyek

Berikut ini mendemonstrasikan contoh Hello World. Buat struktur proyek berikut:

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

Definisikan File Protobuf

Di pb/hello.proto, tulis konten berikut. Ini adalah contoh yang sangat sederhana. Jika Anda tidak familiar dengan sintaks protoc, silakan merujuk ke dokumentasi terkait.

protobuf
syntax = "proto3";

// . berarti generate langsung di path output, hello adalah nama package
option go_package = ".;hello";

// Request
message HelloReq {
  string name = 1;


  // Response
  message HelloRep {
    string msg = 1;
  }

  // Definisikan layanan
  service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Generate Kode

Setelah menulis, gunakan compiler protoc untuk menghasilkan kode terkait serialisasi data, dan gunakan generator untuk menghasilkan kode terkait RPC:

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

Pada titik ini, Anda dapat menemukan file hello.pb.go dan hello_grpc.pb.go yang dihasilkan di folder hello. Menjelajahi hello.pb.go, Anda dapat menemukan message yang kami definisikan:

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Field yang didefinisikan
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Field yang didefinisikan
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

Di hello_grpc.pb.go, Anda dapat menemukan layanan yang kami definisikan:

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// Nanti jika kita implementasikan interface layanan sendiri, kita harus embed struct ini, jadi kita tidak perlu implementasikan method mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}

// Mengembalikan nil secara default
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Constraint interface
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Tulis Server

Tulis kode berikut di server/main.go:

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // Listen pada port
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // Buat server gRPC
  server := grpc.NewServer()
  // Daftarkan layanan
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Jalankan
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

Tulis Klien

Tulis kode berikut di client/main.go:

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Bangun koneksi, tanpa verifikasi enkripsi
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Buat klien
  client := pb.NewSayHelloClient(conn)
  // Panggilan jarak jauh
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Jalankan

Jalankan server terlebih dahulu, lalu jalankan klien. Output server:

2023/07/16 16:26:51 received grpc req: name:"client"

Output klien:

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

Dalam contoh ini, setelah klien membangun koneksi, memanggil method jarak jauh sama seperti memanggil method lokal - langsung akses method Hello dari client dan dapatkan hasilnya. Ini adalah contoh gRPC paling sederhana, dan banyak kerangka kerja open-source juga mengenkapsulasi proses ini.

bufbuild

Dalam contoh di atas, kode dihasilkan langsung menggunakan perintah. Jika ada banyak plugin nanti, perintah akan menjadi cukup merepotkan. Dalam hal ini, Anda dapat menggunakan alat untuk mengelola file protobuf. Kebetulan ada alat manajemen open-source seperti itu: bufbuild/buf.

Repositori Open Source: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Dokumentasi: Buf - Install the Buf CLI

Fitur

  • Manajemen BSR
  • Linter
  • Generate Kode
  • Formatting
  • Manajemen Dependensi

Dengan alat ini, Anda dapat dengan mudah mengelola file protobuf.

Dokumentasi menyediakan banyak metode instalasi; Anda dapat memilih sendiri. Jika Anda memiliki lingkungan Go yang terinstal secara lokal, Anda dapat menginstal langsung menggunakan go install:

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

Setelah instalasi, periksa versi:

sh
$ buf --version
1.24.0

Pergi ke folder helloworld/pb dan jalankan perintah berikut untuk membuat modul untuk mengelola file pb:

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

Konten file buf.yaml default:

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Kemudian pergi ke direktori helloworld/ dan buat buf.gen.yaml dengan konten berikut:

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Kemudian jalankan perintah untuk menghasilkan kode:

sh
$ buf generate

Setelah selesai, Anda dapat melihat file yang dihasilkan. Tentu saja, buf memiliki lebih banyak fitur; Anda dapat mempelajarinya dari dokumentasi.

Streaming RPC

Ada dua jenis panggilan gRPC utama: Unary RPC dan Stream RPC. Contoh Hello World adalah Unary RPC tipikal.

Unary RPC (atau RPC biasa - tidak yakin bagaimana menerjemahkan unary dengan lebih baik) bekerja seperti HTTP biasa: klien meminta, server mengembalikan data, pola tanya-jawab. Dalam Stream RPC, baik permintaan maupun respons dapat berupa streaming, seperti yang ditunjukkan di bawah ini:

Saat menggunakan permintaan streaming, hanya satu respons yang dikembalikan. Klien dapat mengirim parameter ke server beberapa kali melalui stream. Server tidak perlu menunggu sampai semua parameter diterima sebelum memproses seperti di Unary RPC; logika pemrosesan spesifik dapat ditentukan oleh server. Biasanya, hanya klien yang dapat secara aktif menutup permintaan streaming. Setelah stream ditutup, permintaan RPC saat ini berakhir.

Saat menggunakan respons streaming, hanya satu parameter yang dikirim. Server dapat mengirim data ke klien beberapa kali melalui stream. Klien tidak perlu menunggu sampai semua data diterima sebelum memproses seperti di Unary RPC; logika pemrosesan spesifik dapat ditentukan oleh klien. Dalam permintaan normal, hanya server yang dapat secara aktif menutup respons streaming. Setelah stream ditutup, permintaan RPC saat ini berakhir.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Ini juga bisa hanya respons streaming (Server-Streaming RPC):

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Atau baik permintaan dan respons streaming (Bi-directional-Streaming RPC):

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Streaming Satu Arah

Berikut ini mendemonstrasikan operasi streaming satu arah. Pertama, buat struktur proyek berikut:

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

Konten message.proto:

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

Generate kode menggunakan buf:

sh
$ buf generate

Ini mendemonstrasikan layanan pesan. receiveMessage menerima nama pengguna yang ditentukan (tipe string) dan mengembalikan stream pesan. sendMessage menerima stream pesan dan mengembalikan jumlah pesan yang berhasil dikirim (tipe integer 64-bit). Selanjutnya, buat server/message_service.go untuk mengimplementasikan layanan yang dihasilkan default:

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

Anda dapat melihat bahwa parameter receive message dan send message memiliki interface wrapper stream:

go
type MessageService_ReceiveMessageServer interface {
    // Kirim pesan
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Kirim nilai kembali dan tutup koneksi
  SendAndClose(*wrapperspb.StringValue) error
    // Terima pesan
  Recv() (*Message, error)
  grpc.ServerStream
}

Keduanya embed interface grpc.ServerStream:

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

Seperti yang Anda lihat, streaming RPC tidak memiliki parameter input dan tipe return yang eksplisit dalam signature fungsi seperti Unary RPC. Method-method ini tidak langsung mengungkapkan apa tipe parameter input dan nilai return. Anda perlu menggunakan tipe Stream yang diteruskan untuk menyelesaikan transmisi streaming. Selanjutnya, mulai menulis logika spesifik server. Saat menulis logika server, sync.map digunakan untuk mensimulasikan antrian pesan. Ketika klien mengirim permintaan ReceiveMessage, server terus mengembalikan pesan yang diinginkan klien melalui respons streaming hingga timeout memutuskan koneksi. Ketika klien meminta SendMessage, itu terus mengirim pesan melalui permintaan streaming, dan server terus memasukkan pesan ke antrian hingga klien secara aktif memutuskan koneksi, mengembalikan jumlah pesan yang dikirim ke klien.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Antrian pesan yang disimulasikan
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Terima pesan untuk pengguna yang ditentukan
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("Tidak ada pesan dari %s selama 5 detik, menutup koneksi", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Dapatkan pesan
      msg := queue[0]
      // Kirim pesan ke klien melalui streaming
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Kirim pesan ke pengguna yang ditentukan
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // Terima pesan dari klien
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Masukkan pesan ke antrian pesan
    messageQueue.Store(msg.From, queue)
    count++
  }
}

Klien membuka dua goroutine: satu untuk mengirim pesan dan satu lagi untuk menerima pesan. Tentu saja, Anda juga dapat mengirim dan menerima secara bersamaan. Kode sebagai berikut:

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Terima permintaan pesan
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("Tidak ada pesan, menutup koneksi")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "Apakah kamu di sana",
      "Apakah kamu punya waktu untuk bermain game sore ini",
      "Baiklah, mari bermain bersama kapan-kapan",
      "Akhir pekan ini seharusnya bisa",
      "Sudah diputuskan kalau begitu",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Pesan terkirim, aktif tutup permintaan dan dapatkan nilai kembali
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("Pengiriman selesai, total %d pesan terkirim\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

Setelah eksekusi, output server:

server  2023/07/18 16:28:24 send from:"jack" content:"Apakah kamu di sana" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"Apakah kamu di sana" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"Apakah kamu punya waktu untuk bermain game sore ini" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"Apakah kamu punya waktu untuk bermain game sore ini" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"Baiklah, mari bermain bersama kapan-kapan" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"Baiklah, mari bermain bersama kapan-kapan" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"Akhir pekan ini seharusnya bisa" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"Akhir pekan ini seharusnya bisa" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"Sudah diputuskan kalau begitu" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"Sudah diputuskan kalau begitu" to:"mike"
server  2023/07/18 16:28:33 Tidak ada pesan dari jack selama 5 detik, menutup koneksi

Output klien:

client  2023/07/18 16:28:24 receive from:"jack" content:"Apakah kamu di sana" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"Apakah kamu punya waktu untuk bermain game sore ini" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"Baiklah, mari bermain bersama kapan-kapan" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"Akhir pekan ini seharusnya bisa" to:"mike"
client  2023/07/18 16:28:28 Pengiriman selesai, total 5 pesan terkirim
client  2023/07/18 16:28:28 receive from:"jack" content:"Sudah diputuskan kalau begitu" to:"mike"
client  2023/07/18 16:28:33 Tidak ada pesan, menutup koneksi

Melalui contoh ini, Anda dapat menemukan bahwa menangani permintaan Streaming RPC satu arah lebih kompleks untuk klien dan server dibandingkan dengan Unary RPC. Namun, Streaming RPC dua arah bahkan lebih kompleks.

Streaming Dua Arah

Streaming RPC dua arah berarti baik permintaan maupun respons adalah streaming, pada dasarnya menggabungkan dua layanan dari contoh sebelumnya menjadi satu. Untuk streaming RPC, permintaan pertama selalu diinisiasi oleh klien. Kemudian klien dapat mengirim parameter permintaan melalui stream kapan saja, dan server dapat mengembalikan data melalui stream kapan saja. Terlepas dari pihak mana yang secara aktif menutup stream, permintaan saat ini akan berakhir.

TIP

Konten selanjutnya akan langsung menghilangkan deskripsi kode untuk generate kode pb dan membuat langkah-langkah klien/server RPC kecuali jika diperlukan.

Pertama, buat struktur proyek berikut:

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

Konten message.proto:

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

Dalam logika server, setelah membangun koneksi, buka dua goroutine: satu untuk menerima pesan dan satu untuk mengirim pesan. Logika pemrosesan spesifik mirip dengan contoh sebelumnya, tetapi kali ini logika penilaian timeout dihilangkan.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue antrian pesan yang disimulasikan
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Layanan obrolan, kami menggunakan multi-goroutine untuk logika server
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "akhir obrolan")

  var chatErr error
  chatCh := make(chan error)

  // Buat dua goroutine, satu untuk menerima pesan, satu untuk mengirim
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Goroutine untuk menerima pesan
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Goroutine untuk mengirim pesan
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "tutup kirim")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

Dalam logika klien, dua goroutine anak dibuka untuk mensimulasikan proses obrolan antara dua orang. Setiap goroutine anak memiliki dua goroutine cucu yang bertanggung jawab untuk mengirim dan menerima pesan (logika klien tidak menjamin urutan kirim/terima pesan yang benar antara dua orang; ini hanya contoh sederhana pengiriman dan penerimaan dua arah):

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "Halo", "Apakah kamu punya waktu untuk bermain game?", "Baiklah")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "Halo", "Tidak", "Tidak ada waktu, cari orang lain")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("akhir obrolan", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Pesan terkirim, tutup koneksi
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Goroutine untuk menerima pesan
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

Dalam keadaan normal, output server:

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"Halo" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"Halo" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"Halo" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"Halo" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"Apakah kamu punya waktu untuk bermain game?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"Tidak" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"Tidak" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"Apakah kamu punya waktu untuk bermain game?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"Baiklah" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"Tidak ada waktu, cari orang lain" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"Baiklah" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"Tidak ada waktu, cari orang lain" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

Dalam keadaan normal, output klien (Anda dapat melihat urutan logika pesan kacau):

client 2023/07/19 17:26:24 receive from:"jack"  content:"Halo"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"Halo"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"Tidak"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"Apakah kamu punya waktu untuk bermain game?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"Baiklah"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"Tidak ada waktu, cari orang lain"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

Melalui contoh ini, Anda dapat melihat bahwa logika pemrosesan streaming dua arah lebih kompleks untuk klien dan server dibandingkan dengan streaming satu arah, memerlukan tugas asinkron multi-goroutine untuk penanganan yang lebih baik.

Metadata

Metadata pada dasarnya adalah map di mana nilainya adalah slice string, mirip dengan header HTTP/1, dan ini memainkan peran serupa di gRPC seperti header HTTP, menyediakan informasi tentang panggilan RPC. Siklus hidup metadata mengikuti seluruh proses panggilan RPC; ketika panggilan berakhir, siklus hidupnya juga berakhir.

Di gRPC, ini terutama ditransmisikan dan disimpan melalui context. Namun, gRPC menyediakan paket metadata dengan banyak fungsi kenyamanan untuk menyederhanakan operasi, jadi kami tidak perlu secara manual mengoperasikan context. Tipe yang sesuai dengan metadata di gRPC adalah metadata.MD, seperti yang ditunjukkan di bawah ini:

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

Kami dapat langsung menggunakan fungsi metadata.New untuk membuatnya, tetapi sebelum membuat, ada beberapa hal yang perlu diperhatikan:

go
func New(m map[string]string) MD

Metadata memiliki batasan pada nama key, hanya mengizinkan karakter yang dibatasi oleh aturan berikut:

  • Karakter ASCII
  • Angka: 0-9
  • Huruf kecil: a-z
  • Huruf besar: A-Z
  • Karakter khusus: - _

TIP

Dalam metadata, huruf besar akan dikonversi ke huruf kecil, artinya mereka akan menggunakan key yang sama, dan nilai akan ditimpa.

TIP

Key yang dimulai dengan grpc- adalah key internal yang dicadangkan oleh gRPC. Menggunakan key seperti itu dapat menyebabkan error.

Pembuatan Manual

Ada banyak cara untuk membuat metadata. Berikut adalah dua metode paling umum untuk membuat metadata secara manual. Yang pertama adalah menggunakan fungsi metadata.New, langsung memasukkan map:

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

Yang kedua adalah metadata.Pairs, memasukkan slice string dengan panjang genap, yang akan secara otomatis diparsing menjadi pasangan key-value:

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

Anda juga dapat menggunakan metadata.Join untuk menggabungkan beberapa metadata:

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)

Penggunaan Server

Mendapatkan Metadata

Server dapat menggunakan fungsi metadata.FromIncomingContext untuk mendapatkan metadata:

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Untuk Unary RPC, parameter layanan akan memiliki parameter context, dari mana Anda dapat langsung mendapatkan metadata:

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Untuk streaming RPC, parameter layanan akan memiliki objek stream, dari mana Anda dapat mendapatkan context stream:

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

Mengirim Metadata

Anda dapat menggunakan fungsi grpc.SendHeader untuk mengirim metadata:

go
func SendHeader(ctx context.Context, md metadata.MD) error

Fungsi ini dapat dipanggil paling banyak sekali dan tidak akan berlaku setelah beberapa peristiwa yang menyebabkan header dikirim secara otomatis. Dalam beberapa kasus, jika Anda tidak ingin mengirim header secara langsung, Anda dapat menggunakan fungsi grpc.SetHeader:

go
func SetHeader(ctx context.Context, md metadata.MD) error

Jika fungsi ini dipanggil beberapa kali, metadata yang diteruskan setiap kali akan digabungkan dan dikirim ke klien dalam situasi berikut:

  • Ketika grpc.SendHeader dan ServerStream.SendHeader dipanggil
  • Ketika handler Unary RPC kembali
  • Ketika memanggil Stream.SendMsg objek stream dalam streaming RPC
  • Ketika status permintaan RPC menjadi send out - baik permintaan RPC berhasil atau terjadi error.

Untuk streaming RPC, disarankan untuk menggunakan method SendHeader dan SetHeader objek stream:

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

Selama penggunaan, Anda akan menemukan fungsi Header dan Trailer hampir sama. Namun, perbedaan utama mereka terletak pada waktu pengiriman. Anda mungkin tidak merasakan ini di Unary RPC, tetapi perbedaan ini sangat jelas dalam streaming RPC karena Header dalam streaming RPC dapat dikirim sebelum permintaan berakhir. Seperti yang disebutkan sebelumnya, Header akan dikirim dalam situasi tertentu, sedangkan Trailer hanya akan dikirim setelah seluruh permintaan RPC berakhir. Sebelum itu, trailer yang diperoleh kosong.

Penggunaan Klien

Mendapatkan Metadata

Jika klien ingin mendapatkan header respons, ini dapat dicapai melalui grpc.Header dan grpc.Trailer:

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Namun, perhatikan bahwa Anda tidak bisa mendapatkannya secara langsung. Seperti yang Anda lihat, nilai return dari dua fungsi di atas adalah CallOption, artinya mereka diteruskan sebagai parameter opsi saat memulai permintaan RPC:

go
// Deklarasikan md untuk menerima nilai
var header, trailer metadata.MD

// Teruskan opsi saat memanggil permintaan RPC
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

Setelah permintaan selesai, nilai akan ditulis ke md yang diteruskan. Untuk streaming RPC, Anda dapat mendapatkannya langsung melalui objek stream yang dikembalikan saat memulai permintaan:

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

Mengirim Metadata

Sederhana bagi klien untuk mengirim metadata. Seperti yang disebutkan sebelumnya, metadata termanifestasi sebagai valueContext. Gabungkan metadata ke context, lalu teruskan context saat meminta. Paket metadata menyediakan dua fungsi untuk memudahkan konstruksi context:

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// Unary RPC
res,err := client.SomeRPC(outgoingContext,data)
// Streaming RPC
stream,err := client.StreamRPC(outgoingContext)

Jika ctx asli sudah memiliki metadata, menggunakan NewOutgoingContext akan langsung menimpa data sebelumnya. Untuk menghindari ini, Anda dapat menggunakan fungsi berikut, yang tidak akan menimpa tetapi menggabungkan data:

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// Unary RPC
res,err := client.SomeRPC(appendContext,data)
// Streaming RPC
stream,err := client.StreamRPC(appendContext)

Interceptor

Interceptor gRPC mirip dengan Middleware di gin, keduanya untuk melakukan pekerjaan khusus sebelum atau setelah permintaan tanpa mempengaruhi logika bisnis itu sendiri. Di gRPC, ada dua jenis interceptor utama: interceptor server dan interceptor klien. Menurut jenis permintaan, ada interceptor Unary RPC dan interceptor Streaming RPC, seperti yang ditunjukkan di bawah ini:

Untuk memahami interceptor dengan lebih baik, berikut ini akan mendeskripsikan berdasarkan contoh yang sangat sederhana:

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

Konten person.proto:

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

Kode server, logika semuanya dari konten sebelumnya, relatif sederhana dan tidak akan diuraikan:

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Simpan data
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Interceptor Server

Interceptor untuk permintaan RPC server adalah UnaryServerInterceptor dan StreamServerInterceptor, dengan tipe spesifik seperti yang ditunjukkan di bawah ini:

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

Unary RPC

Untuk membuat interceptor Unary RPC, Anda hanya perlu mengimplementasikan tipe UnaryServerInterceptor. Berikut adalah contoh interceptor Unary RPC sederhana yang mengeluarkan setiap permintaan dan respons RPC:

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} Data permintaan RPC
// param info *grpc.UnaryServerInfo Informasi permintaan untuk Unary RPC ini
// param unaryHandler grpc.UnaryHandler Handler spesifik
// return resp interface{} Data respons RPC
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Untuk Unary RPC, interceptor mengintersepsi setiap permintaan dan respons RPC, yaitu mengintersepsi fase permintaan RPC dan fase respons. Jika interceptor mengembalikan error, permintaan akan berakhir.

Streaming RPC

Untuk membuat interceptor Streaming RPC, Anda hanya perlu mengimplementasikan tipe StreamServerInterceptor. Berikut adalah contoh interceptor Streaming RPC sederhana:

go
// StreamPersonLogInterceptor
// param srv interface{} Sesuai dengan server yang diimplementasikan oleh server
// param stream grpc.ServerStream Objek stream
// param info *grpc.StreamServerInfo Informasi stream
// param streamHandler grpc.StreamHandler Handler
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Untuk Streaming RPC, interceptor mengintersepsi ketika method Send dan Recv setiap objek stream dipanggil. Jika interceptor mengembalikan error, ini tidak akan menyebabkan permintaan RPC berakhir; ini hanya menunjukkan bahwa send atau recv ini mengalami error.

Menggunakan Interceptor

Untuk membuat interceptor yang dibuat berlaku, mereka perlu diteruskan sebagai opsi saat membuat server gRPC. Resmi juga menyediakan fungsi terkait untuk digunakan. Seperti yang ditunjukkan di bawah ini, ada fungsi untuk menambahkan interceptor tunggal dan fungsi untuk menambahkan interceptor berantai:

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

Penggunaan berulang UnaryInterceptor akan melempar panic berikut:

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor juga sama. Interceptor berantai, jika dipanggil berulang kali, akan menambahkan ke rantai yang sama.

Contoh penggunaan:

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Tambahkan interceptor berantai
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

Interceptor Klien

Interceptor klien mirip dengan interceptor server: satu interceptor Unary UnaryClientInterceptor dan satu interceptor Streaming StreamClientInterceptor, dengan tipe spesifik seperti yang ditunjukkan di bawah ini:

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

Unary RPC

Untuk membuat interceptor klien Unary RPC, implementasikan UnaryClientInterceptor. Berikut adalah contoh sederhana:

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string Nama method
// param req interface{} Data permintaan
// param reply interface{} Data respons
// param cc *grpc.ClientConn Objek koneksi klien
// param invoker grpc.UnaryInvoker Method klien spesifik yang diintersepsi
// param opts ...grpc.CallOption Item konfigurasi untuk permintaan ini
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

Melalui interceptor Unary RPC klien, Anda bisa mendapatkan data permintaan lokal, data respons, dan informasi permintaan lainnya.

Streaming RPC

Untuk membuat interceptor klien Streaming RPC, implementasikan StreamClientInterceptor. Berikut adalah contoh:

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Deskripsi objek stream
// param cc *grpc.ClientConn Objek koneksi
// param method string Nama method
// param streamer grpc.Streamer Objek untuk membuat objek stream
// param opts ...grpc.CallOption Item konfigurasi koneksi
// return grpc.ClientStream Objek stream klien yang dibuat
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Melalui interceptor klien Streaming RPC, Anda hanya dapat mengintersepsi ketika klien membangun koneksi dengan server, yaitu saat membuat stream. Anda tidak dapat mengintersepsi setiap kali objek stream klien mengirim atau menerima pesan. Namun, jika kita membungkus objek stream yang dibuat di interceptor, kita dapat mencapai intersepsi pengiriman dan penerimaan pesan, seperti ini:

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc Deskripsi objek stream
// param cc *grpc.ClientConn Objek koneksi
// param method string Nama method
// param streamer grpc.Streamer Objek untuk membuat objek stream
// param opts ...grpc.CallOption Item konfigurasi koneksi
// return grpc.ClientStream Objek stream klien yang dibuat
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // Sebelum pesan dikirim
  err := c.ClientStream.SendMsg(m)
  // Setelah pesan dikirim
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // Sebelum pesan diterima
  err := c.ClientStream.RecvMsg(m)
  // Setelah pesan diterima
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

Menggunakan Interceptor

Saat menggunakan, mirip dengan server, ada empat fungsi utilitas untuk menambahkan interceptor melalui opsi, dibagi menjadi interceptor tunggal dan interceptor berantai:

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

Penggunaan berulang WithUnaryInterceptor pada klien tidak akan melempar panic, tetapi hanya yang terakhir yang akan berlaku.

Berikut adalah kasus penggunaan:

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

Sejauh ini, seluruh kasus telah ditulis. Saatnya menjalankannya dan melihat hasilnya. Output server:

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

Output klien:

client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfo req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfo req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfo req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfo req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

Anda dapat melihat bahwa output kedua sisi memenuhi harapan, mencapai efek intersepsi. Kasus ini hanyalah contoh sederhana. Menggunakan interceptor gRPC, Anda dapat melakukan banyak hal seperti otorisasi, logging, monitoring, dan fungsi lainnya. Anda dapat memilih untuk membuat sendiri atau menggunakan roda yang ada dari komunitas open-source. gRPC Ecosystem mengumpulkan serangkaian middleware interceptor gRPC open-source. Alamat: grpc-ecosystem/go-grpc-middleware.

Penanganan Error

Sebelum memulai, mari kita lihat contoh. Dalam kasus interceptor sebelumnya, jika kueri pengguna gagal, error person not found dikembalikan ke klien. Pertanyaannya adalah: dapatkah klien membuat penanganan khusus berdasarkan error yang dikembalikan? Mari kita coba. Dalam kode klien, coba gunakan errors.Is untuk menilai error:

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

Output hasil:

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfo req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfo req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

Anda dapat melihat error yang diterima klien seperti ini. Anda akan menemukan error yang dikembalikan server ada di field desc:

rpc error: code = Unknown desc = person not found

Tentu saja, logika errors.Is tidak dieksekusi. Bahkan menggunakan errors.As akan menghasilkan hasil yang sama:

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Untuk ini, gRPC menyediakan paket status untuk menyelesaikan masalah seperti ini. Ini juga mengapa error yang diterima klien memiliki field code dan desc - karena gRPC sebenarnya mengembalikan Status ke klien. Tipe spesifiknya adalah sebagai berikut, yang juga merupakan message yang didefinisikan oleh protobuf:

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // Kode status, yang seharusnya merupakan nilai enum dari
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // Pesan error yang面向开发者,应该是英语。任何面向用户的错误消息应该本地化并发送到
  // [google.rpc.Status.details][google.rpc.Status.details] field, 或由客户端本地化。
  string message = 2;

  // 携带错误详情的消息列表。API 有一套通用的消息类型。
  repeated google.protobuf.Any details = 3;
}

Kode Error

Code dalam struktur Status mirip dengan HTTP Status, digunakan untuk menunjukkan status permintaan RPC saat ini. gRPC mendefinisikan 16 code yang terletak di grpc/codes, mencakup sebagian besar skenario, seperti yang ditunjukkan di bawah ini:

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // Panggilan berhasil
  OK Code = 0

  // Permintaan dibatalkan
  Canceled Code = 1

  // Error tidak diketahui
  Unknown Code = 2

  // Argumen tidak valid
  InvalidArgument Code = 3

    // Permintaan timeout
  DeadlineExceeded Code = 4

  // Sumber daya tidak ditemukan
  NotFound Code = 5

    // Sumber daya sudah ada (tidak menyangka yang ini)
  AlreadyExists Code = 6

  // Izin ditolak karena izin tidak cukup
  PermissionDenied Code = 7

  // Sumber daya habis, kapasitas tersisa tidak cukup, seperti ruang disk habis
  ResourceExhausted Code = 8

  // Prasyarat tidak terpenuhi, seperti menggunakan rm untuk menghapus direktori non-kosong di mana kondisi penghapusan adalah direktori kosong
  FailedPrecondition Code = 9

  // Permintaan dibatalkan
  Aborted Code = 10

  // Operasi di luar jangkauan
  OutOfRange Code = 11

  // Layanan tidak diimplementasikan
  Unimplemented Code = 12

  // Error sistem internal
  Internal Code = 13

  // Layanan tidak tersedia
  Unavailable Code = 14

  // Kehilangan data
  DataLoss Code = 15

  // Autentikasi gagal
  Unauthenticated Code = 16

  _maxCode = 17
)

Paket grpc/status menyediakan banyak fungsi untuk mengonversi antara status dan error. Kami dapat langsung menggunakan status.New untuk membuat Status, atau Newf:

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Misalnya:

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

Melalui method err status, Anda bisa mendapatkan error. Ketika status OK, error adalah nil:

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

Anda juga dapat langsung membuat error:

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

Jadi kami dapat memodifikasi kode server sebagai berikut:

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

Sebelum ini, semua code yang dikembalikan server adalah unknown. Sekarang setelah dimodifikasi, mereka memiliki semantik yang lebih jelas. Jadi di klien, Anda dapat menggunakan status.FromError atau fungsi berikut untuk mendapatkan status dari error, sehingga membuat penanganan yang sesuai berdasarkan code yang berbeda:

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Contoh:

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Namun, meskipun code gRPC telah sebisa mungkin mencakup beberapa skenario umum, terkadang masih tidak dapat memenuhi kebutuhan developer. Pada saat ini, Anda dapat menggunakan field Details di Status, yang juga merupakan slice yang dapat menampung beberapa informasi. Teruskan beberapa informasi kustom melalui Status.WithDetails:

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Dapatkan informasi melalui Status.Details:

go
func (s *Status) Details() []interface{}

Perhatikan bahwa informasi yang diteruskan sebaiknya didefinisikan oleh protobuf sehingga server dan klien dapat menguraikannya dengan mudah. Resmi menyediakan beberapa contoh:

protobuf
message ErrorInfo {
  // Alasan error
  string reason = 1;

  // Definisi subjek layanan
  string domain = 2;

  // Informasi lainnya
  map<string, string> metadata = 3;
}

// Informasi retry
message RetryInfo {
  // Interval tunggu untuk permintaan yang sama
  google.protobuf.Duration retry_delay = 1;
}

// Informasi debug
message DebugInfo {
  // Stack trace
  repeated string stack_entries = 1;

  // Beberapa informasi detail
  string detail = 2;
}

    ...
    ...

Lebih banyak contoh dapat ditemukan di googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com). Jika diperlukan, Anda dapat mengimpor menggunakan kode berikut:

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

Gunakan ErrorInfo sebagai details:

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

Di klien, Anda bisa mendapatkan data dan membuat penanganan. Namun, di atas hanyalah beberapa contoh yang direkomendasikan oleh gRPC. Selain itu, Anda juga dapat mendefinisikan message Anda sendiri untuk lebih memenuhi kebutuhan bisnis yang sesuai. Jika Anda ingin melakukan beberapa penanganan error terpadu, Anda juga dapat menempatkannya di interceptor.

Kontrol Timeout

Dalam kebanyakan kasus, biasanya tidak hanya ada satu layanan, dan mungkin ada banyak layanan hulu dan banyak layanan hilir. Ketika klien memulai permintaan, dari layanan paling hulu ke layanan paling hilir, rantai panggilan layanan terbentuk, seperti dalam diagram, mungkin bahkan lebih panjang dari yang ditunjukkan.

Dengan rantai panggilan yang begitu panjang, jika logika pemrosesan satu layanan membutuhkan waktu lama, ini akan menyebabkan hulu一直处于等待状态。Untuk mengurangi pemborosan sumber daya yang tidak perlu, perlu memperkenalkan mekanisme timeout. Dengan cara ini, timeout yang diteruskan oleh panggilan paling hulu adalah waktu eksekusi maksimum yang diizinkan untuk seluruh rantai panggilan. gRPC dapat meneruskan timeout lintas proses dan bahasa. Ini menempatkan beberapa data yang perlu diteruskan lintas proses di frame HEADERS Frame HTTP/2, seperti yang ditunjukkan di bawah ini:

Data timeout dalam permintaan gRPC sesuai dengan field grpc-timeout di HEADERS Frame. Perhatikan bahwa tidak semua pustaka gRPC mengimplementasikan mekanisme penerusan timeout ini, tetapi gRPC-go pasti mendukungnya. Jika menggunakan pustaka dalam bahasa lain dan menggunakan fitur ini, Anda perlu memberi perhatian ekstra pada hal ini.

Timeout Koneksi

Ketika klien gRPC membangun koneksi ke server, ini default secara asinkron. Jika koneksi gagal, ini hanya mengembalikan Client kosong. Jika Anda ingin koneksi bersifat sinkron, Anda dapat menggunakan grpc.WithBlock() untuk memblokir sambil menunggu koneksi dibangun:

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Jika Anda ingin mengatur timeout, Anda hanya perlu memasukkan TimeoutContext, menggunakan grpc.DialContext alih-alih grpc.Dial untuk memasukkan context:

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Dengan cara ini, jika pembangunan koneksi timeout, error akan dikembalikan:

context deadline exceeded

Di sisi server, Anda juga dapat mengatur timeout koneksi, mengatur timeout saat membangun koneksi baru dengan klien. Default adalah 120 detik. Jika koneksi tidak berhasil dibangun dalam waktu yang ditentukan, server akan secara aktif memutuskan.

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout masih dalam tahap eksperimental, dan API dapat dimodifikasi atau dihapus di masa depan.

Timeout Permintaan

Ketika klien gRPC memulai permintaan, parameter pertama adalah tipe Context. Demikian pula, jika Anda ingin menambahkan timeout ke permintaan RPC, Anda hanya perlu memasukkan TimeoutContext:

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Penanganan logika timeout
}

Melalui pemrosesan gRPC, timeout diteruskan ke server. Selama transmisi, ini ada di field frame. Di Go, ini ada dalam bentuk context, sehingga meneruskan seluruh链路。Selama transmisi链路,tidak disarankan untuk memodifikasi timeout. Berapa lama timeout diatur selama permintaan seharusnya menjadi pertimbangan untuk tingkat paling hulu.

Autentikasi dan Otorisasi

Di bidang microservices, setiap layanan perlu memverifikasi identitas dan izin pengguna untuk permintaan. Jika setiap layanan mengimplementasikan logika autentikasi sendiri seperti dalam aplikasi monolitik, ini jelas tidak realistis. Oleh karena itu, layanan autentikasi dan otorisasi terpadu diperlukan. Solusi umum meliputi OAuth2, sesi terdistribusi, dan JWT. Di antaranya, OAuth2 adalah yang paling banyak digunakan dan telah menjadi standar industri. Tipe token paling umum untuk OAuth2 adalah JWT. Berikut adalah diagram alur mode kode otorisasi OAuth2, dengan proses dasar seperti yang ditunjukkan.

Transmisi Aman

Pendaftaran dan Penemuan Layanan

Sebelum klien dapat memanggil layanan spesifik di server, ini perlu mengetahui IP dan port server. Dalam kasus sebelumnya, alamat server hardcoded. Dalam lingkungan jaringan aktual, tidak selalu stabil. Beberapa layanan mungkin offline karena kegagalan dan menjadi tidak dapat diakses, atau alamat dapat berubah karena pengembangan bisnis dan migrasi mesin. Dalam kasus ini, alamat statis tidak dapat digunakan untuk mengakses layanan. Masalah dinamis inilah yang diselesaikan oleh penemuan dan pendaftaran layanan. Penemuan layanan bertanggung jawab untuk memantau perubahan alamat layanan dan memperbarui, sedangkan pendaftaran layanan bertanggung jawab untuk memberi tahu dunia luar alamatnya. Di gRPC, fungsi penemuan layanan dasar disediakan, dan ini mendukung ekstensi dan kustomisasi.

Alih-alih alamat statis, Anda dapat menggunakan beberapa nama spesifik untuk menggantinya. Misalnya, browser mendapatkan alamat melalui resolusi DNS nama domain. Demikian pula, penemuan layanan default gRPC adalah melalui DNS. Modifikasi file host lokal Anda dan tambahkan pemetaan berikut:

127.0.0.1 example.grpc.com

Kemudian ubah alamat Dial klien dalam contoh helloworld ke nama domain yang sesuai:

go
func main() {
  // Bangun koneksi, tanpa verifikasi enkripsi
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Buat klien
  client := hello2.NewSayHelloClient(conn)
  // Panggilan jarak jauh
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Anda masih dapat melihat output normal:

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

Di gRPC, nama-nama seperti itu harus mematuhi sintaks URI yang didefinisikan dalam RFC 3986, dengan format:

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

URI dalam contoh di atas adalah dalam bentuk berikut. Karena DNS didukung secara default, prefix scheme dihilangkan:

dns:example.grpc.com:8080

Selain itu, gRPC juga mendukung Unix domain sockets secara default. Untuk metode lain, kami perlu mengimplementasikan ekstensi kustom sesuai dengan gRPC. Untuk ini, kami perlu mengimplementasikan resolver kustom resolver.Resolver. Resolver bertanggung jawab untuk memantau pembaruan alamat target dan konfigurasi layanan:

go
type Resolver interface {
    // gRPC akan memanggil ResolveNow untuk mencoba menyelesaikan nama target lagi. Ini hanya petunjuk, dan resolver dapat mengabaikannya jika tidak diperlukan. Method dapat dipanggil secara konkuren.
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC mengharuskan kami memasukkan builder Resolver, yaitu resolver.Builder, bertanggung jawab untuk membuat instance Resolver:

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Method Scheme Builder mengembalikan tipe Scheme yang bertanggung jawab untuk diuraikan. Misalnya, dnsBuilder default mengembalikan dns. Builder harus didaftarkan ke Builder global menggunakan resolver.Register selama inisialisasi, atau diteruskan sebagai opsi menggunakan grpc.WithResolver secara internal ke ClientConn. Yang terakhir memiliki prioritas lebih tinggi daripada yang pertama.

Diagram di atas secara sederhana menggambarkan alur kerja resolver. Selanjutnya, mari kita demonstrasikan cara mengkustomisasi resolver.

Resolusi Layanan Kustom

Berikut menulis resolver kustom, yang memerlukan builder resolver kustom untuk konstruksi:

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Harus updateState di sini, jika tidak deadlock akan terjadi
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Konfigurasi, loadBalancingPolicy mengacu pada strategi load balancing
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

Resolver kustom meneruskan alamat yang cocok dari map ke updateState dan juga menentukan strategi load balancing. round_robin mengacu pada round-robin.

go
// struktur service config adalah sebagai berikut
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

Kode klien adalah sebagai berikut:

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // Daftarkan builder
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Bangun koneksi, tanpa verifikasi enkripsi
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // Buat klien
  client := hello2.NewSayHelloClient(conn)
     // Panggil sekali per detik
  for range time.Tick(time.Second) {
    // Panggilan jarak jauh
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

Biasanya, prosesnya adalah: server mendaftarkan layanannya ke registry, kemudian klien mendapatkan daftar layanan dari registry dan melakukan pencocokan. Map yang dimasukkan di sini adalah registry yang disimulasikan. Karena data statis, langkah pendaftaran layanan dihilangkan, hanya menyisakan penemuan layanan. Target yang digunakan oleh klien adalah hello:myworld, di mana hello adalah scheme kustom dan myworld adalah nama layanan. Setelah diuraikan oleh resolver kustom, alamat nyata 127.0.0.1:8080 diperoleh. Dalam situasi aktual, untuk mencapai load balancing, nama layanan dapat mencocokkan beberapa alamat nyata, itulah sebabnya nama layanan sesuai dengan slice. Di sini, dua server dimulai, menempati port yang berbeda. Strategi load balancing adalah round-robin. Output server adalah sebagai berikut. Dari waktu permintaan, Anda dapat melihat strategi load balancing memang berfungsi. Jika tidak ada strategi yang ditentukan, hanya layanan pertama yang dipilih secara default:

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

Registry pada dasarnya menyimpan koleksi pemetaan nama pendaftaran layanan dan alamat layanan nyata. Middleware apa pun yang mampu menyimpan data dapat memenuhi persyaratan. Bahkan menggunakan MySQL sebagai registry dimungkinkan (meskipun mungkin tidak ada yang akan melakukannya). Umumnya, registry menggunakan penyimpanan KV. Redis adalah pilihan yang baik, tetapi jika menggunakan Redis sebagai registry, kami perlu mengimplementasikan banyak logika sendiri, seperti pemeriksaan heartbeat layanan, penanganan offline layanan, penjadwalan layanan, dll., yang cukup merepotkan. Meskipun Redis memiliki aplikasi tertentu di bidang ini, mereka relatif sedikit. Seperti kata pepatah, biarkan profesional menangani tugas profesional. Ada banyak solusi terkenal di bidang ini: ZooKeeper, Consul, Eureka, ETCD, Nacos, dll.

Anda dapat mengunjungi 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) untuk mempelajari perbedaan antara middleware ini.

Bergabung dengan Consul

Untuk kasus yang bergabung dengan Consul, kunjungi consul

Load Balancing

Golang by www.golangdev.cn edit