Skip to content

gRPC

Uzaktan prosedür çağrısı rpc, mikroservislerde öğrenilmesi gereken noktalardan biridir. Öğrenme sürecinde çeşitli rpc çerçeveleriyle karşılaşılabilir, ancak go alanında neredeyse tüm rpc çerçeveleri gRPC üzerine kuruludur ve ayrıca bulut bilişim alanında temel bir protokol haline gelmiştir. Neden bunu seçiyoruz, resmi cevap şu şekildedir:

gRPC, herhangi bir ortamda çalışabilen modern, açık kaynaklı, yüksek performanslı bir uzaktan prosedür çağrısı (Remote Process Call, RPC) çerçevesidir. Veri merkezi içinde ve veri merkezleri arasındaki servisleri etkili bir şekilde bağlar; takılabilir yük dengeleme, izleme, sağlık kontrolü ve kimlik doğrulama desteği sunar. Ayrıca cihazların, mobil uygulamaların ve tarayıcıların arka uç servislerine bağlanması için son mil dağıtık bilişim olarak da uygundur.

Resmi web sitesi: gRPC

Resmi dokümantasyon: Documentation | gRPC

gRPC teknik öğretici: Basics tutorial | Go | gRPC

ProtocBuf resmi web sitesi: Reference Guides | Protocol Buffers Documentation (protobuf.dev)

Ayrıca CNCF vakfının açık kaynak projesidir, CNCF tam adı CLOUD NATIVE COMPUTING FOUNDATION, çeviri adı Bulut Yerel Bilişim Vakfı

Özellikler

Basit servis tanımı

Protocol Buffers kullanarak servisleri tanımlayın, bu güçlü bir binary serileştirme araç seti ve dilidir.

Başlatma ve ölçeklendirme çok hızlıdır

Çalışma zamanı ve geliştirme ortamını yüklemek için sadece bir satır kod yeterlidir, saniyeler içinde milyonlarca RPC'ye ölçeklenebilir

Çapraz dil, çapraz platform

Farklı platformlar ve diller için otomatik olarak istemci ve sunucu servis taslakları oluşturur

Çift yönlü akış ve entegre yetkilendirme

HTTP/2 tabanlı çift yönlü akış ve takılabilir kimlik doğrulama yetkilendirmesi

GRPC dilden bağımsız olmasına rağmen, bu sitenin içeriğinin çoğu go ile ilgilidir, bu nedenle bu makale de go'yu ana dil olarak kullanacaktır,后续 kullanılan pb derleyicisi ve oluşturucusu diğer dillerin kullanıcıları için Protobuf resmi web sitesinde kendileri bulabilirler. Kolaylık olması için projenin oluşturma süreci doğrudan atlanacaktır.

Bağımlılıkların Kurulumu

Önce Protocol Buffer derleyicisini indirin, indirme adresi: Releases · protocolbuffers/protobuf (github.com)

Kendi durumunuza göre sistem ve versiyon seçin, indirdikten sonra bin dizinini ortam değişkenlerine eklemeniz gerekir.

Ardından kod oluşturucuyu indirmeniz gerekir, derleyici proto dosyalarını ilgili dilin serileştirme kodlarına dönüştürür, oluşturucu iş kodunu oluşturmak için kullanılır.

sh
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest

grpc_learn adında boş bir proje oluşturun, ardından aşağıdaki bağımlılığı ekleyin

sh
$ go get google.golang.org/grpc

Son olarak versiyonu kontrol edin, gerçekten kurulup kurulmadığını görün

sh
$ protoc --version
libprotoc 23.4

$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1

$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0

Hello World

Proje Yapısı

Aşağıda bir Hello World örneği ile gösterim yapılacaktır, aşağıdaki proje yapısını oluşturun.

grpc_learn\helloworld
|
+---client
|       main.go
|
+---hello
|
|
+---pb
|       hello.proto
|
\---server
        main.go

protobuf Dosyasını Tanımlama

pb/hello.proto içinde aşağıdaki içeriği yazın, bu oldukça basit bir örnektir, eğer protoc sözdizimini bilmiyorsanız ilgili dokümantasyona bakın.

protobuf
syntax = "proto3";

// . doğrudan çıktı yolunda oluşturulacağını gösterir, hello paket adıdır
option go_package = ".;hello";

// İstek
message HelloReq {
  string name = 1;


  // Yanıt
  message HelloRep {
    string msg = 1;
  }

  // Servis tanımla
  service SayHello {
  rpc Hello(HelloReq) returns (HelloRep) {}
}

Kod Oluşturma

Yazım tamamlandıktan sonra protoc derleyicisini kullanarak veri serileştirme ile ilgili kodları oluşturun, oluşturucuyu kullanarak rpc ile ilgili kodları oluşturun

sh
$ protoc -I ./pb \
    --go_out=./hello ./pb/*.proto\
    --go-grpc_out=./hello ./pb/*.proto

Bu noktada hello klasöründe hello.pb.go ve hello_grpc.pb.go dosyalarının oluşturulduğunu görebilirsiniz, hello.pb.go dosyasına göz atarak tanımladığımız message'ı görebilirsiniz

go
type HelloReq struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

    // Tanımlanan alan
  Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}

type HelloRep struct {
  state         protoimpl.MessageState
  sizeCache     protoimpl.SizeCache
  unknownFields protoimpl.UnknownFields

     // Tanımlanan alan
  Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}

hello_grpc.pb.go dosyasında tanımladığımız servisi bulabilirsiniz

go
type SayHelloServer interface {
  Hello(context.Context, *HelloReq) (*HelloRep, error)
  mustEmbedUnimplementedSayHelloServer()
}

// 后续如果我们自己实现服务接口,必须要嵌入该结构体,就不用实现 mustEmbedUnimplementedSayHelloServer 方法
type UnimplementedSayHelloServer struct {
}

// Varsayılan olarak nil döner
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
  return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}

// Arayüz kısıtlaması
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}

type UnsafeSayHelloServer interface {
  mustEmbedUnimplementedSayHelloServer()
}

Sunucu Tarafını Yazma

server/main.go içinde aşağıdaki kodu yazın

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  pb "grpc_learn/server/protoc"
  "log"
  "net"
)

type GrpcServer struct {
  pb.UnimplementedSayHelloServer
}

func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
  log.Printf("received grpc req: %+v", req.String())
  return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}

func main() {
  // Portu dinle
  listen, err := net.Listen("tcp", ":8080")
  if err != nil {
    panic(err)
  }
  // gprc sunucusu oluştur
  server := grpc.NewServer()
  // Servis kaydı
  pb.RegisterSayHelloServer(server, &GrpcServer{})
  // Çalıştır
  err = server.Serve(listen)
  if err != nil {
    panic(err)
  }
}

İstemci Tarafını Yazma

client/main.go içinde aşağıdaki kodu yazın

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  pb "grpc_learn/server/protoc"
  "log"
)

func main() {
    // Bağlantı kur, şifreleme doğrulaması yok
  conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // İstemci oluştur
  client := pb.NewSayHelloClient(conn)
  // Uzaktan çağrı
  helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Çalıştırma

Önce sunucu tarafını çalıştırın, ardından istemci tarafını çalıştırın, sunucu çıktısı şu şekildedir

2023/07/16 16:26:51 received grpc req: name:"client"

İstemci çıktısı şu şekildedir

2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"

Bu örnekte istemci bağlantıyı kurduktan sonra uzaktan metodu çağırırken yerel metodu çağırmakla aynıdır, doğrudan client'ın Hello metoduna erişip sonucu alır, bu en basit GRPC örneğidir, birçok açık kaynak çerçeve de bu süreci paketlemiştir.

bufbuild

Yukarıdaki örnekte kod doğrudan komutla oluşturuldu, eğer daha sonra plugin sayısı artarsa komut oldukça karmaşık görünebilir, bu durumda protobuf dosyalarını yönetmek için araç kullanılabilir, tam da böyle bir açık kaynak yönetim aracı var bufbuild/buf.

Açık kaynak adresi: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)

Dokümantasyon adresi: Buf - Install the Buf CLI

Özellikler

  • BSR yönetimi
  • Linter
  • Kod oluşturma
  • Biçimlendirme
  • Bağımlılık yönetimi

Bu araçla protobuf dosyalarını oldukça kolay bir şekilde yönetebilirsiniz.

Dokümantasyonda oldukça fazla kurulum yöntemi sunulmuştur, kendiniz seçebilirsiniz. Eğer yerel olarak go ortamı kuruluysa doğrudan go install ile kurulum yapabilirsiniz

sh
$ go install github.com/bufbuild/buf/cmd/buf@latest

Kurulum tamamlandıktan sonra versiyonu kontrol edin

sh
$ buf --version
1.24.0

helloworld/pb klasörüne gelin, pb dosyalarını yönetmek için bir module oluşturmak için aşağıdaki komutu çalıştırın.

sh
$ buf mod init
$ ls
buf.yaml  hello.proto

buf.yaml dosya içeriği varsayılan olarak şu şekildedir

yaml
version: v1
breaking:
  use:
    - FILE
lint:
  use:
    - DEFAULT

Ardından helloworld/ dizinine gelin, buf.gen.yaml oluşturun, aşağıdaki içeriği yazın

yaml
version: v1
plugins:
  - plugin: go
    out: hello
    opt:
  - plugin: go-grpc
    out: hello
    opt:

Ardından kodu oluşturmak için komutu çalıştırın

sh
$ buf generate

Tamamlandıktan sonra oluşturulan dosyaları görebilirsiniz, elbette buf'un sadece bu kadar işlevi yok, diğer işlevleri kendiniz dokümantasyondan öğrenebilirsiniz.

Akış RPC

grpc çağrı yöntemi iki kategoriye ayrılır, Unary RPC (Unary RPC) ve Stream RPC (Stream RPC). Hello World örneği tipik bir Unary RPC'dir.

Unary rpc (veya normal rpc daha anlaşılır olabilir, gerçekten unary kelimesini nasıl çevireceğimi bilmiyorum) kullanımı normal http gibidir, istemci istek gönderir, sunucu veri döner, soru-cevap şeklindedir. Stream RPC isteği ve yanıtı akış şeklinde olabilir, aşağıdaki şekil gibi

Stream isteği kullanıldığında sadece bir yanıt döner, istemci stream aracılığıyla sunucuya birden fazla kez parametre gönderebilir, sunucu Unary RPC gibi tüm parametreleri alana kadar beklemek zorunda değildir, belirli işleme mantığı sunucu tarafından belirlenebilir. Normal情况下 sadece istemci stream isteğini aktif olarak kapatabilir, stream kapatıldığında mevcut RPC isteği de sona erer.

Stream yanıtı kullanıldığında sadece bir parametre gönderilir, sunucu stream aracılığıyla istemciye birden fazla kez veri gönderebilir, istemci Unary RPC gibi tüm veriyi alana kadar beklemek zorunda değildir, belirli işleme mantığı istemci tarafından belirlenebilir. Normal istekte sadece sunucu stream yanıtını aktif olarak kapatabilir, stream kapatıldığında mevcut RPC isteği de sona erer.

protobuf
service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}

Veya sadece yanıtın stream şeklinde olması da olabilir (Server-Streaming RPC)

protobuf
service MessageService {
  rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}

Veya istek ve yanıtın her ikisinin de stream şeklinde olması (Bi-directional-Streaming RPC)

service MessageService {
  rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}

Tek Yönlü Stream

Aşağıda tek yönlü stream işlemini göstermek için bir örnek kullanılacaktır, önce aşağıdaki proje yapısını oluşturun

grpc_learn\server_client_stream
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go

message.proto içeriği şu şekildedir

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service MessageService {
  rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
  rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}

buf aracılığıyla kod oluşturun

sh
$ buf generate

Burada gösterim mesaj servisidir, receiveMessage belirtilen kullanıcı adını alır, türü string'dir, mesaj stream'i döner, sendMessage mesaj stream'i alır, başarılı gönderilen mesaj sayısını döner, türü 64 bit tamsayıdır. Ardından server/message_service.go oluşturun, varsayılan olarak oluşturulan servisi kendiniz implement edin

go
package main

import (
  "google.golang.org/grpc/codes"
  "google.golang.org/grpc/status"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
)

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}

Mesaj alma ve gönderme parametrelerinde bir stream wrapper arayüzü olduğunu görebilirsiniz

go
type MessageService_ReceiveMessageServer interface {
    // Mesaj gönder
  Send(*Message) error
  grpc.ServerStream
}

type MessageService_SendMessageServer interface {
    // Dönüş değerini gönder ve bağlantıyı kapat
  SendAndClose(*wrapperspb.StringValue) error
    // Mesaj al
  Recv() (*Message, error)
  grpc.ServerStream
}

Her ikisi de gprc.ServerStream arayüzünü embed eder

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  Context() context.Context
  SendMsg(m interface{}) error
  RecvMsg(m interface{}) error
}

Stream RPC'nin Unary RPC gibi parametre ve dönüş değerlerinin fonksiyon imzasında açıkça görülemediğini, bu yöntemlere ilk bakışta parametre ve dönüş değerlerinin ne tür olduğunu anlayamadığınızı,传入 edilen Stream türünü kullanarak stream iletişimini tamamlamanız gerektiğini görebilirsiniz, ardından sunucu tarafının belirli mantığını yazmaya başlayın. Sunucu tarafı mantığını yazarken bir sync.map kullanarak mesaj kuyruğunu simüle ettik, istemci ReceiveMessage isteği gönderdiğinde sunucu stream yanıtı aracılığıyla istemcinin istediği mesajları sürekli döner, zaman aşımı süresi geçtikten sonra isteği kesene kadar. İstemci SendMessage istediğinde stream isteği aracılığıyla sürekli mesaj gönderir, sunucu sürekli mesajları kuyruğa koyar, istemci aktif olarak isteği kesene kadar ve istemciye mesaj gönderme sayısını döner.

go
package main

import (
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "sync"
  "time"
)

// Simüle edilmiş bir mesaj kuyruğu
var messageQueue sync.Map

type MessageService struct {
  message.UnimplementedMessageServiceServer
}

// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Belirtilen kullanıcının mesajlarını al
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
  timer := time.NewTimer(time.Second * 5)
  for {
    time.Sleep(time.Millisecond * 100)
    select {
    case <-timer.C:
      log.Printf("5 秒钟内没有收到%s的消息,关闭连接", user.GetValue())
      return nil
    default:
      value, ok := messageQueue.Load(user.GetValue())
      if !ok {
        messageQueue.Store(user.GetValue(), []*message.Message{})
        continue
      }
      queue := value.([]*message.Message)
      if len(queue) < 1 {
        continue
      }

      // Mesajı al
      msg := queue[0]
      // Stream iletişimi kullanarak mesajı istemciye gönder
      err := recvServer.Send(msg)
      log.Printf("receive %+v\n", msg)
      if err != nil {
        return err
      }

      queue = queue[1:]
      messageQueue.Store(user.GetValue(), queue)
      timer.Reset(time.Second * 5)
    }
  }
}

// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Belirtilen kullanıcıya mesaj gönder
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
  count := 0
  for {
    // İstemciden mesaj al
    msg, err := sendServer.Recv()
    if errors.Is(err, io.EOF) {
      return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
    }
    if err != nil {
      return err
    }
    log.Printf("send %+v\n", msg)

    value, ok := messageQueue.Load(msg.From)
    if !ok {
      messageQueue.Store(msg.From, []*message.Message{msg})
      continue
    }
    queue := value.([]*message.Message)
    queue = append(queue, msg)
    // Mesajı mesaj kuyruğuna koy
    messageQueue.Store(msg.From, queue)
    count++
  }
}

İstemci iki goroutine açtı, bir goroutine mesaj göndermek için, diğer goroutine mesaj almak için kullanıldı, elbette aynı anda gönderip alabilirsiniz, kod şu şekildedir.

go
package main

import (
  "context"
  "errors"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/server_client_stream/message"
  "io"
  "log"
  "time"
)

var Client message.MessageServiceClient

func main() {
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  if err != nil {
    log.Panicln(err)
  }
  defer dial.Close()

  Client = message.NewMessageServiceClient(dial)

  log.SetPrefix("client\t")
  msgTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  ctx := context.Background()

  // Mesaj alma isteği
  msgTask.AddJobs(func() {
    receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
    if err != nil {
      log.Panicln(err)
    }
    for {
      recv, err := receiveMessageStream.Recv()
      if errors.Is(err, io.EOF) {
        log.Println("暂无消息,关闭连接")
        break
      } else if err != nil {
        break
      }
      log.Printf("receive %+v", recv)
    }
  })

  msgTask.AddJobs(func() {
    from := "jack"
    to := "mike"

    sendMessageStream, err := Client.SendMessage(ctx)
    if err != nil {
      log.Panicln(err)
    }
    msgs := []string{
      "在吗",
      "下午有没有时间一起打游戏",
      "那行吧,以后有时间一起约",
      "就这个周末应该可以吧",
      "那就这么定了",
    }
    for _, msg := range msgs {
      time.Sleep(time.Second)
      sendMessageStream.Send(&message.Message{
        From:    from,
        Content: msg,
        To:      to,
      })
    }
    // Mesajlar gönderildi, aktif olarak isteği kapat ve dönüş değerini al
    recv, err := sendMessageStream.CloseAndRecv()
    if err != nil {
      log.Println(err)
    } else {
      log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
    }
  })

  msgTask.Run()
}

Çalıştırdıktan sonra sunucu çıktısı şu şekildedir

server  2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server  2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server  2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server  2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server  2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server  2023/07/18 16:28:33 5 秒钟内没有收到 jack 的消息,关闭连接

İstemci çıktısı şu şekildedir

client  2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client  2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client  2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client  2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client  2023/07/18 16:28:28 发送完毕,总共发送了 5 条消息
client  2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client  2023/07/18 16:28:33 暂无消息,关闭连接

Bu örnekle tek yönlü stream RPC isteği işlemenin hem istemci hem de sunucu açısından unary rpc'den daha karmaşık olduğunu görebilirsiniz, ancak çift yönlü stream RPC bunlardan daha da karmaşıktır.

Çift Yönlü Stream

Çift yönlü PRC, yani istek ve yanıtın her ikisi de stream şeklindedir, yukarıdaki örnekteki iki servisi birleştirmeye benzer. Stream RPC için ilk istek kesinlikle istemci tarafından başlatılır, ardından istemci istediği zaman stream aracılığıyla istek parametreleri gönderebilir, sunucu da istediği zaman stream aracılığıyla veri döndürebilir, her iki taraftan biri stream'i kapattığında mevcut istek sona erer.

TIP

Sonraki içerik除非 gerekli, pb kod oluşturma ve rpc istemci sunucu oluşturma adımlarının kod açıklamalarını doğrudan atlayacaktır

Önce aşağıdaki proje yapısını oluşturun

bi_stream\
|   buf.gen.yaml
|
+---client
|       main.go
|
+---message
|       message.pb.go
|       message_grpc.pb.go
|
+---pb
|       buf.yaml
|       message.proto
|
\---server
        main.go
        message_service.go

message.proto içeriği şu şekildedir

protobuf
syntax = "proto3";


option go_package = ".;message";

import "google/protobuf/wrappers.proto";

message Message {
  string from = 1;
  string content = 2;
  string to = 3;
}

service ChatService {
  rpc chat(stream Message) returns (stream Message);
}

Sunucu mantığında bağlantı kurulduktan sonra iki goroutine açılır, bir goroutine mesaj almaktan, diğeri mesaj göndermekten sorumludur, belirli işleme mantığı önceki örnekle benzerdir, ancak bu sefer zaman aşımı判定 mantığı kaldırılmıştır.

go
package main

import (
  "github.com/dstgo/task"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "sync"
  "time"
)

// MessageQueue Simüle edilmiş mesaj kuyruğu
var MessageQueue sync.Map

type ChatService struct {
  message.UnimplementedChatServiceServer
}

// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Sohbet servisi, sunucu mantığını çoklu goroutine ile işleyeceğiz
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, _ := metadata.FromIncomingContext(chatServer.Context())
  from := md.Get("from")[0]
  defer log.Println(from, "end chat")

  var chatErr error
  chatCh := make(chan error)

  // İki goroutine oluştur, biri mesaj alır, diğeri mesaj gönderir
  chatTask := task.NewTask(func(err error) {
    chatErr = err
  })

  // Mesaj alan goroutine
  chatTask.AddJobs(func() {
    for {
      msg, err := chatServer.Recv()
      log.Printf("receive %+v err %+v\n", msg, err)
      if err != nil {
        chatErr = err
        chatCh <- err
        break
      }

      value, ok := MessageQueue.Load(msg.To)
      if !ok {
        MessageQueue.Store(msg.To, []*message.Message{msg})
      } else {
        queue := value.([]*message.Message)
        queue = append(queue, msg)
        MessageQueue.Store(msg.To, queue)
      }
    }
  })

  // Mesaj gönderen goroutine
  chatTask.AddJobs(func() {
  Send:
    for {
      time.Sleep(time.Millisecond * 100)
      select {
      case <-chatCh:
        log.Println(from, "close send")
        break Send
      default:
        value, ok := MessageQueue.Load(from)
        if !ok {
          value = []*message.Message{}
          MessageQueue.Store(from, value)
        }

        queue := value.([]*message.Message)
        if len(queue) < 1 {
          continue Send
        }

        msg := queue[0]
        queue = queue[1:]
        MessageQueue.Store(from, queue)
        err := chatServer.Send(msg)
        log.Printf("send %+v\n", msg)
        if err != nil {
          chatErr = err
          break Send
        }
      }
    }
  })

  chatTask.Run()

  return chatErr
}

İstemci mantığında iki alt goroutine açılarak iki kişinin sohbet süreci simüle edilir, iki alt goroutine'de her biri iki torun goroutine açarak mesaj alıp göndermekten sorumludur (istemci mantığında iki kişinin sohbet mesajlarının alıp gönderme sırasının doğru olması garanti edilmemiştir, sadece basit bir çift taraf gönderme ve alma örneğidir)

go
package main

import (
  "context"
  "github.com/dstgo/task"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/metadata"
  "grpc_learn/bi_stream/message"
  "log"
  "time"
)

var Client message.ChatServiceClient

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
  defer dial.Close()

  if err != nil {
    log.Panicln(err)
  }
  Client = message.NewChatServiceClient(dial)

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
  })

  chatTask.AddJobs(func() {
    NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
  })

  chatTask.Run()
}

func NewChat(from string, to string, contents ...string) {
  ctx := context.Background()
  mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
  chat, err := Client.Chat(mdCtx)
  defer log.Println("end chat", from)

  if err != nil {
    log.Panicln(err)
  }

  chatTask := task.NewTask(func(err error) {
    log.Panicln(err)
  })

  chatTask.AddJobs(func() {
    for _, content := range contents {
      time.Sleep(time.Second)
      chat.Send(&message.Message{
        From:    from,
        Content: content,
        To:      to,
      })
    }
    // Mesajlar gönderildi, bağlantıyı kapat
    time.Sleep(time.Second * 5)
    chat.CloseSend()
  })

  // Mesaj alan goroutine
  chatTask.AddJobs(func() {
    for {
      msg, err := chat.Recv()
      log.Printf("receive %+v\n", msg)
      if err != nil {
        log.Println(err)
        break
      }
    }
  })

  chatTask.Run()
}

Normal durumda sunucu çıktısı

server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat

Normal durumda istemci çıktısı (mesajların sıra mantığının karışık olduğunu görebilirsiniz)

client 2023/07/19 17:26:24 receive from:"jack"  content:"你好"  to:"mike"
client 2023/07/19 17:26:24 receive from:"mike"  content:"你好"  to:"jack"
client 2023/07/19 17:26:25 receive from:"mike"  content:"没有"  to:"jack"
client 2023/07/19 17:26:25 receive from:"jack"  content:"有没有时间一起打游戏?"  to:"mike"
client 2023/07/19 17:26:26 receive from:"jack"  content:"好吧"  to:"mike"
client 2023/07/19 17:26:26 receive from:"mike"  content:"没时间,你找别人吧"  to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike

Örnekten çift yönlü stream işleme mantığının hem istemci hem de sunucu açısından tek yönlü stream'den daha karmaşık olduğunu, çoklu goroutine ile asenkron görevler açarak mantığı daha iyi işleyebileceğini görebilirsiniz.

metadata

metadata özünde bir map'tir, değeri bir string dilimidir, http1 header'ına benzer ve gRPC'de oynadığı rol de http header'a benzer, bu RPC çağrısı hakkında bazı bilgiler sağlar, aynı zamanda metadata'nın yaşam döngüsü bir rpc çağrısının tüm sürecini takip eder, çağrı bittiğinde yaşam döngüsü de sona erer.

gRPC'de esas olarak context aracılığıyla iletilir ve depolanır, ancak gRPC metadata paketini sağlar, içinde操作leri basitleştirmek için oldukça fazla kolaylaştırıcı fonksiyon vardır, context'i manuel olarak操作 etmemize gerek yoktur. metadata gRPC'de metadata.MD türüne karşılık gelir, aşağıdaki gibi gösterilir.

go
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]string

Doğrudan metadata.New fonksiyonunu kullanarak oluşturabiliriz, ancak oluşturmadan önce dikkat edilmesi gereken birkaç nokta var

go
func New(m map[string]string) MD

metadata anahtar adı için bazı kısıtlamalar vardır, sadece aşağıdaki kurallarla sınırlı karakterler olabilir:

  • ASCII karakterleri
  • Rakamlar: 0-9
  • Küçük harfler: a-z
  • Büyük harfler: A-Z
  • Özel karakterler: -_.

TIP

metadata'da büyük harfler küçük harflere dönüştürülür, yani aynı key'i işgal eder, değer de üzerine yazılır.

TIP

grpc- ile başlayan key'ler grpc tarafından ayrılmış dahili key'lerdir, bu tür key'leri kullanmak bazı hatalara neden olabilir.

Manuel Oluşturma

metadata oluşturmanın birçok yolu vardır, burada manuel olarak metadata oluşturmanın en yaygın iki yöntemini tanıtıyorum, birincisi metadata.New fonksiyonunu kullanmak, doğrudan bir map传入.

go
func New(m map[string]string) MD
go
md := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})

İkincisi metadata.Pairs, çift uzunlukta string dilimi传入, otomatik olarak anahtar-değer çiftlerine ayrıştırılır.

go
func Pairs(kv ...string) MD
go
md := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")

Birden fazla metadata'yı birleştirmek için metadata.join kullanılabilir

go
func Join(mds ...MD) MD
go
md1 := metadata.New(map[string]string{
    "key":  "value",
    "key1": "value1",
    "key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)

Sunucu Tarafı Kullanımı

metadata alma

Sunucu tarafı metadata almak için metadata.FromIncomingContext fonksiyonunu kullanabilir

go
func FromIncomingContext(ctx context.Context) (MD, bool)

Unary rpc için service parametresinde bir context parametresi vardır, doğrudan içinden metadata alın

go
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
  md, b := metadata.FromIncomingContext(ctx)
  ...
}

Stream rpc için service parametresinde bir stream nesnesi vardır, onun aracılığıyla stream'in context'ini alabilirsiniz

go
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
  md, b := metadata.FromIncomingContext(chatServer.Context())
    ...
}

metadata gönderme

metadata göndermek için grpc.sendHeader fonksiyonu kullanılabilir

go
func SendHeader(ctx context.Context, md metadata.MD) error

Bu fonksiyon en fazla bir kez çağrılabilir, header'ın otomatik olarak gönderilmesine neden olan bazı olaylar发生后 kullanıldığında etkili olmaz. Bazı durumlarda header'ı doğrudan göndermek istemiyorsanız grpc.SetHeader fonksiyonunu kullanabilirsiniz.

go
func SetHeader(ctx context.Context, md metadata.MD) error

Bu fonksiyon birden fazla kez çağrıldığında her seferinde传入 edilen metadata birleştirilir ve aşağıdaki durumlarda istemciye gönderilir

  • gprc.SendHeader ve Servertream.SendHeader çağrıldığında
  • Unary rpc handler döndüğünde
  • Stream rpc'de stream nesnesinin Stream.SendMsg çağrıldığında
  • rpc isteğinin durumu send out olduğunda, bu durum ya rpc isteğinin başarılı olduğu ya da hata oluştuğu anlamına gelir.

Stream rpc için stream nesnesinin SendHeader ve SetHeader yöntemlerini kullanmanız önerilir.

go
type ServerStream interface {
  SetHeader(metadata.MD) error
  SendHeader(metadata.MD) error
  SetTrailer(metadata.MD)
  ...
}

TIP

Kullanım sürecinde Header ve Trailer işlevlerinin neredeyse aynı olduğunu göreceksiniz, ancak ana farkları gönderme zamanıdır, unary rpc'de bunu hissedemeyebilirsiniz, ancak bu fark stream RPC'de özellikle belirgindir, çünkü stream RPC'de Header isteğin bitmesini beklemeden gönderilebilir. Header'ın belirli durumlarda gönderileceği daha önce belirtilmiştir, Trailer ise sadece tüm RPC isteği bittikten sonra gönderilir, bundan önce alınan trailer boştur.

İstemci Tarafı Kullanımı

metadata alma

İstemci yanıt header'ını almak istiyorsa grpc.Header ve grpc.Trailer aracılığıyla实现 edebilir

go
func Header(md *metadata.MD) CallOption
go
func Trailer(md *metadata.MD) CallOption

Ancak dikkat edilmesi gereken, doğrudan alınamaz, yukarıdaki iki fonksiyonun dönüş değeri CallOption'dır, yani RPC isteği başlatıldığında option parametresi olarak传入 edilir.

go
// Değer almak için md bildir
var header, trailer metadata.MD

// RPC isteği çağrıldığında option传入
res, err := client.SomeRPC(
    ctx,
    data,
    grpc.Header(&header),
    grpc.Trailer(&trailer)
)

İstek tamamlandıktan sonra değer传入 edilen md'ye yazılır. Stream rpc için istek başlatıldığında dönen stream nesnesi aracılığıyla doğrudan alınabilir

go
type ClientStream interface {
  Header() (metadata.MD, error)
  Trailer() metadata.MD
    ...
}
go
stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()

metadata gönderme

İstemci metadata göndermek istiyorsa çok basittir, daha önce metadata'nın表现形式 valueContext olduğunu belirtmiştik, metadata'yı context ile birleştirin, ardından istek yaparken context'i传入 edin, metadata paketi context oluşturmayı kolaylaştırmak için iki fonksiyon sağlar.

go
func NewOutgoingContext(ctx context.Context, md MD) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

// Unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// Stream rpc
stream,err := client.StreamRPC(outgoingContext)

Eğer orijinal ctx'de zaten metadata varsa NewOutgoingContext kullanmak önceki verileri doğrudan üzerine yazar, bu durumu önlemek için aşağıdaki fonksiyonu kullanabilirsiniz, üzerine yazmaz, bunun yerine verileri birleştirir.

go
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context
go
md := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)

appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")

// Unary rpc
res,err := client.SomeRPC(appendContext,data)
// Stream rpc
stream,err := client.StreamRPC(appendContext)

Interceptor

gRPC interceptor'ları gin'deki Middleware'e benzer, her ikisi de istek öncesi veya sonrası bazı özel işler yapmak ve iş mantığını etkilememek içindir. gRPC'de interceptor'lar iki kategoriye ayrılır, sunucu interceptor'ları ve istemci interceptor'ları, istek türüne göre Unary RPC interceptor'ları ve Stream RPC interceptor'ları olarak ayrılır, aşağıdaki şekil

Interceptor'ları daha iyi anlayabilmek için aşağıda çok basit bir örnek ile açıklama yapılacaktır.

go
grpc_learn\interceptor
|   buf.gen.yaml
|
+---client
|       main.go
|
+---pb
|       buf.yaml
|       person.proto
|
+---person
|       person.pb.go
|       person_grpc.pb.go
|
\---server
        main.go

person.proto içeriği şu şekildedir

protobuf
syntax = "proto3";

option go_package = ".;person";

import "google/protobuf/wrappers.proto";

message personInfo {
  string name = 1;
  int64  age = 2;
  string address = 3;
}

service person {
  rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
  rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}

Sunucu kodu şu şekildedir, mantık önceki içeriklerle aynı, oldukça basit, tekrar açıklanmayacaktır.

go
package main

import (
  "context"
  "errors"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "io"
  "sync"
)

// Veri depola
var personData sync.Map

type PersonService struct {
  person.UnimplementedPersonServer
}

func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, person.PersonNotFoundErr
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, nil
}

func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
  count := 0
  for {
    personInfo, err := personStream.Recv()
    if errors.Is(err, io.EOF) {
      return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
    } else if err != nil {
      return err
    }

    personData.Store(personInfo.Name, personInfo)
    count++
  }
}

Sunucu Interceptor'ı

Sunucu rpc isteğini intercept eden UnaryServerInterceptor ve StreamServerInterceptor vardır, belirli türler aşağıda gösterilmiştir

go
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) error

Unary RPC

Unary RPC interceptor'ı oluşturmak için sadece UnaryserverInterceptor türünü implement etmeniz gerekir, aşağıda basit bir Unary RPC interceptor örneği vardır, işlevi her rpc isteğini ve yanıtını çıktılamaktır.

go
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc istek verisi
// param info *grpc.UnaryServerInfo bu Unary RPC'nin bazı istek bilgileri
// param unaryHandler grpc.UnaryHandler belirli handler
// return resp interface{} rpc yanıt verisi
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
  log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
  resp, err = unaryHandler(ctx, req)
  log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
  return resp, err
}

Unary RPC için interceptor her RPC isteğini ve yanıtını intercept eder, yani RPC istek aşamasını ve yanıt aşamasını intercept eder, eğer interceptor error dönerse bu istek sona erer.

Stream rpc

Stream RPC interceptor'ı oluşturmak için sadece StreamServerInterceptor türünü implement etmeniz gerekir, aşağıda basit bir Stream RPC interceptor örneği vardır.

go
// StreamPersonLogInterceptor
// param srv interface{} sunucu implementasyonuna karşılık gelen server
// param stream grpc.ServerStream stream nesnesi
// param info *grpc.StreamServerInfo stream bilgisi
// param streamHandler grpc.StreamHandler işlemci
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
  log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
  err := streamHandler(srv, stream)
  log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
  return err
}

Stream RPC için interceptor her stream nesnesinin Send ve Recve yöntemi çağrıldığında intercept eder, eğer interceptor error dönerse bu RPC isteğinin sona ermesine neden olmaz, sadece bu send veya recv'de hata oluştuğu anlamına gelir.

Interceptor Kullanımı

Oluşturulan interceptor'ların etkili olması için gRPC sunucusu oluşturulurken option olarak传入 edilmelidir, resmi de kullanım için ilgili fonksiyonları sağlar. Aşağıda gösterildiği gibi tek interceptor ekleyen fonksiyonlar ve zincir interceptor ekleyen fonksiyonlar vardır.

go
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption

func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption

func StreamInterceptor(i StreamServerInterceptor) ServerOption

func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOption

TIP

UnaryInterceptor tekrar kullanıldığında aşağıdaki panic fırlatılır

panic: The unary server interceptor was already set and may not be reset.

StreamInterceptor da aynı şekilde, zincir interceptor tekrar çağrıldığında aynı zincire append edilir.

Kullanım örneği aşağıda gösterilmiştir

go
package main

import (
  "google.golang.org/grpc"
  "grpc_learn/interceptor/person"
  "log"
  "net"
)

func main() {
  log.SetPrefix("server ")
  listen, err := net.Listen("tcp", "9090")
  if err != nil {
    log.Panicln(err)
  }
  server := grpc.NewServer(
        // Zincir interceptor ekle
    grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
    grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
  )
  person.RegisterPersonServer(server, &PersonService{})
  server.Serve(listen)
}

İstemci Interceptor'ı

İstemci interceptor'ı sunucu ile neredeyse aynı, bir unary interceptor UnaryClientInterceptor, bir stream interceptor StreamClientInterceptor, belirli türler aşağıda gösterilmiştir.

go
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)

Unary RPC

Unary RPC istemci interceptor'ı oluşturmak için UnaryClientInterceptor implement edilmesi yeterlidir, aşağıda basit bir örnek vardır.

go
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string yöntem adı
// param req interface{} istek verisi
// param reply interface{} yanıt verisi
// param cc *grpc.ClientConn istemci bağlantı nesnesi
// param invoker grpc.UnaryInvoker intercept edilen belirli istemci yöntemi
// param opts ...grpc.CallOption bu isteğin yapılandırma öğeleri
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
  log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
  err := invoker(ctx, method, req, reply, cc, opts...)
  log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
  return err
}

İstemci unary RPC interceptor'ı aracılığıyla yerel isteğin istek verisi ve yanıt verisi ile bazı diğer istek bilgileri alınabilir.

Stream RPC

Stream RPC istemci interceptor'ı oluşturmak için StreamClientInterceptor implement edilmesi yeterlidir, aşağıda bir örnek vardır.

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc stream nesnesinin açıklama bilgisi
// param cc *grpc.ClientConn bağlantı nesnesi
// param method string yöntem adı
// param streamer grpc.Streamer stream nesnesi oluşturmak için kullanılan nesne
// param opts ...grpc.CallOption bağlantı yapılandırma öğeleri
// return grpc.ClientStream oluşturulan istemci stream nesnesi
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return stream, err
}

Stream RPC istemci interceptor'ı aracılığıyla sadece istemci ile sunucu bağlantı kurduğunda yani stream oluşturma zamanında intercept edilebilir, istemci stream nesnesinin her mesaj alıp gönderdiğinde intercept edilemez, ancak interceptor'da oluşturulan stream nesnesini paketleyerek mesaj alıp göndermeyi intercept实现 edebiliriz, aşağıdaki gibi

go
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc stream nesnesinin açıklama bilgisi
// param cc *grpc.ClientConn bağlantı nesnesi
// param method string yöntem adı
// param streamer grpc.Streamer stream nesnesi oluşturmak için kullanılan nesne
// param opts ...grpc.CallOption bağlantı yapılandırma öğeleri
// return grpc.ClientStream oluşturulan istemci stream nesnesi
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
  log.Println(fmt.Sprintf("before create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  stream, err := streamer(ctx, desc, cc, method, opts...)
  log.Println(fmt.Sprintf("after create stream  path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
  return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}

type ClientStreamInterceptorWrapper struct {
  method string
  desc   *grpc.StreamDesc
  grpc.ClientStream
}

func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
  // Mesaj göndermeden önce
  err := c.ClientStream.SendMsg(m)
  // Mesaj gönderdikten sonra
  log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
  return err
}

func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
  // Mesaj almadan önce
  err := c.ClientStream.RecvMsg(m)
  // Mesaj aldıktan sonra
  log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
  return err
}

Interceptor Kullanımı

Kullanırken sunucu ile benzer şekilde dört araç fonksiyonu option aracılığıyla interceptor eklemek için kullanılır, tek interceptor ve zincir interceptor olarak ayrılır.

go
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption

func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption

func WithStreamInterceptor(f StreamClientInterceptor) DialOption

func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOption

TIP

İstemci WithUnaryInterceptor tekrar kullandığında panic fırlatılmaz, ancak sadece sonuncusu etkili olur.

Aşağıda bir kullanım örneği vardır

go
package main

import (
  "context"
  "fmt"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/protobuf/types/known/wrapperspb"
  "grpc_learn/interceptor/person"
  "log"
)

func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
  log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}

Şu ana kadar tüm örnek kodu yazıldı, çalıştırıp sonucun nasıl olduğunu görmek zamanı geldi. Sunucu çıktısı şu şekildedir

server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found

İstemci çıktısı şu şekildedir

C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found

Her iki tarafın çıktısının beklentilere uygun olduğunu, intercept etkisini sağladığını görebilirsiniz, bu örnek çok basit bir örnektir, gRPC interceptor'larını kullanarak yetkilendirme, log, izleme ve diğer işlevler gibi birçok şey yapılabilir, kendi tekerleğinizi yapmayı seçebilir veya açık kaynak topluluğunun hazır tekerleklerini kullanmayı seçebilirsiniz, gRPC Ecosystem bir dizi açık kaynak gRPC interceptor middleware toplar, adres: grpc-ecosystem/go-grpc-middleware.

Hata İşleme

Başlamadan önce bir örneğe bakalım, önceki interceptor örneğinde kullanıcı sorgulama yapamadığında istemciye person not found hatası döndürülür, soru şu, istemci dönen hataya göre özel işlem yapabilir mi? Sonra deneyelim, istemci kodunda errors.Is kullanarak hatayı判断 etmeye çalışın.

go
func main() {
  log.SetPrefix("client ")
  dial, err := grpc.Dial("localhost:9090",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
  )
  if err != nil {
    log.Panicln(err)
  }

  ctx := context.Background()
  client := person.NewPersonClient(dial)

  personStream, err := client.CreatePersonInfo(ctx)
  personStream.Send(&person.PersonInfo{
    Name:    "jack",
    Age:     18,
    Address: "usa",
  })
  personStream.Send(&person.PersonInfo{
    Name:    "mike",
    Age:     20,
    Address: "cn",
  })
  recv, err := personStream.CloseAndRecv()
  log.Println(recv, err)

  info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
  log.Println(info, err)
  if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
  }
}

Sonuç çıktısı şu şekildedir

client 2023/07/21 16:46:10 before create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream  path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack"  age:18  address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike"  age:20  address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found

İstemcinin aldığı error'un şu şekilde olduğunu görebilirsiniz, sunucunun döndürdüğü error'un desc alanında olduğunu fark edersiniz

rpc error: code = Unknown desc = person not found

Doğal olarak errors.Is bu mantığı执行 etmez, errors.As ile değiştirseniz bile aynı sonuç olur.

go
if errors.Is(err, person.PersonNotFoundErr) {
    log.Println("person not found err")
}

Bu nedenle gRPC bu tür sorunları çözmek için status paketini sağlar, bu da istemcinin aldığı hatanın neden code ve desc alanları olduğunu gösterir, çünkü gRPC aslında istemciye bir Status döndürür, belirli türü aşağıda gösterilmiştir, protobuf tanımlı bir message olduğu görülebilir.

go
type Status struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
   Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
   Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}
protobuf
message Status {
  // The status code, which should be an enum value of
  // [google.rpc.Code][google.rpc.Code].
  int32 code = 1;

  // A developer-facing error message, which should be in English. Any
  // user-facing error message should be localized and sent in the
  // [google.rpc.Status.details][google.rpc.Status.details] field, or localized
  // by the client.
  string message = 2;

  // A list of messages that carry the error details.  There is a common set of
  // message types for APIs to use.
  repeated google.protobuf.Any details = 3;
}

Hata Kodları

Status yapısındaki Code, Http Status'a benzer bir varlıktır, mevcut rpc isteğinin durumunu göstermek için kullanılır, gRPC grpc/codes içinde 16 code tanımlar, çoğu senaryoyu kapsar, sırasıyla aşağıda gösterilmiştir

go
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32

const (
  // Çağrı başarılı
  OK Code = 0

  // İstek iptal edildi
  Canceled Code = 1

  // Bilinmeyen hata
  Unknown Code = 2

  // Parametre geçersiz
  InvalidArgument Code = 3

    // İstek zaman aşımı
  DeadlineExceeded Code = 4

  // Kaynak mevcut değil
  NotFound Code = 5

    // Aynı kaynak zaten mevcut (bunun出现masını beklemiyordum)
  AlreadyExists Code = 6

  // Yetki yetersiz, erişim reddedildi
  PermissionDenied Code = 7

  // Kaynak tükendi, kalan kapasite kullanılamıyor, örneğin disk kapasitesi yetersiz
  ResourceExhausted Code = 8

  // 执行 koşulları yetersiz, örneğin rm ile boş olmayan bir dizini silmek, silme koşulu dizinin boş olması ama koşul karşılanmıyor
  FailedPrecondition Code = 9

  // İstek kesintiye uğradı
  Aborted Code = 10

  // İşlem erişim sınırın ötesine geçti
  OutOfRange Code = 11

  // Mevcut servisin implement edilmediğini gösterir
  Unimplemented Code = 12

  // Sistem iç hatası
  Internal Code = 13

  // Servis kullanılamıyor
  Unavailable Code = 14

  // Veri kayboldu
  DataLoss Code = 15

  // Kimlik doğrulama başarısız
  Unauthenticated Code = 16

  _maxCode = 17
)

grpc/status paketi status ile error arasında dönüşüm için oldukça fazla fonksiyon sağlar. Doğrudan status.New kullanarak bir Status oluşturabiliriz veya Newf

go
func New(c codes.Code, msg string) *Status

func Newf(c codes.Code, format string, a ...interface{}) *Status

Örneğin aşağıdaki kod

go
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)

status'un err yöntemi aracılığıyla其中的 error alınabilir, durum ok olduğunda error nil'dir.

go
func (s *Status) Err() error {
  if s.Code() == codes.OK {
    return nil
  }
  return &Error{s: s}
}

Doğrudan error da oluşturabilirsiniz

go
func Err(c codes.Code, msg string) error

func Errorf(c codes.Code, format string, a ...interface{}) error
go
success := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)

Bu nedenle sunucu kodunu aşağıdaki gibi değiştirebiliriz

go
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
  value, ok := personData.Load(name.Value)
  if !ok {
    return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
  }
  personInfo := value.(*person.PersonInfo)
  return personInfo, status.Errorf(codes.OK, "request success")
}

Bundan önce sunucunun döndürdüğü tüm code'lar unknown'du, şimdi değişiklikten sonra daha açık bir anlam kazandı. Bu nedenle istemci status.FromError veya aşağıdaki fonksiyonu kullanarak error'dan status alabilir, böylece farklı code'lara göre ilgili işlemi yapabilir

go
func FromError(err error) (s *Status, ok bool)

func Convert(err error) *Status

func Code(err error) codes.Code

Örnek aşağıda gösterilmiştir

go
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
    ...
}

Ancak grpc code'u mümkün olduğunca bazı genel senaryoları kapsasa da bazen geliştiricilerin ihtiyaçlarını karşılayamaz, bu durumda Status'taki Details alanı kullanılabilir ve ayrıca bir dilimdir, birden fazla bilgi barındırabilir. Status.WithDetails aracılığıyla bazı özel bilgiler传入 edilebilir

go
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)

Status.Details aracılığıyla bilgi alın

go
func (s *Status) Details() []interface{}

Dikkat edilmesi gereken,传入 edilen bilginin protobuf tarafından tanımlanmış olması en iyisidir, böylece sunucu ve istemci iki ucunun da ayrıştırması kolaylaşır, resmi birkaç örnek verir

protobuf
message ErrorInfo {
  // Hatanın nedeni
  string reason = 1;

  // Servisi tanımlayan ana主体
  string domain = 2;

  // Diğer bilgiler
  map<string, string> metadata = 3;
}

// Yeniden deneme bilgisi
message RetryInfo {
  // Aynı isteğin bekleme aralığı
  google.protobuf.Duration retry_delay = 1;
}

// Hata ayıklama bilgisi
message DebugInfo {
  // Yığın
  repeated string stack_entries = 1;

  // Bazı detay bilgileri
  string detail = 2;
}

    ...
    ...

Daha fazla örnek için googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) adresini ziyaret edebilirsiniz. Gerekirse aşağıdaki kodu kullanarak引入 edebilirsiniz.

go
import "google.golang.org/genproto/googleapis/rpc/errdetails"

ErrorInfo'yu details olarak kullan

go
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
  notFound.WithDetails(&errdetails.ErrorInfo{
    Reason:   "person not found",
    Domain:   "xxx",
    Metadata: nil,
  })

İstemci tarafında veri alıp işlem yapılabilir, ancak yukarıdaki sadece gRPC'nin önerdiği bazı örneklerdir, bunun dışında业务 ihtiyaçlarını daha iyi karşılamak için kendi message'ınızı da tanımlayabilirsiniz, eğer bazı birleşik hata işlemleri yapmak istiyorsanız interceptor'a da koyabilirsiniz.

Zaman Aşımı Kontrolü

Çoğu durumda genellikle sadece bir servis olmaz ve upstream'de birçok servis olabilir, downstream'de de birçok servis olabilir. İstemci bir istek başlattığında en upstream servisten en downstream servise bir servis çağrı zinciri oluşur, şekildaki gibi, belki de şekildakinden daha uzun olabilir.

Bu kadar uzun bir çağrı zincirinde eğer其中一个 servisin mantık işleme uzun zaman alırsa upstream'in sürekli bekleme durumunda kalmasına neden olur. Gereksiz kaynak israfını azaltmak için bu nedenle zaman aşımı mekanizması引入 edilmelidir, böylece en upstream çağrıda传入 edilen zaman aşımı süresi tüm çağrı zincirinin izin verilen maksimum执行 süresi olur. gRPC prosesler arası ve diller arası zaman aşımı iletebilir, iletilmesi gereken bazı verileri HTTP2'nin HEADERS Frame帧ine koyar, aşağıdaki şekil gibi

gRPC isteğindeki zaman aşımı verisi HEADERS Frame'deki grpc-timeout alanına karşılık gelir. Dikkat edilmesi gereken, tüm gRPC kütüphanelerinin bu zaman aşımı iletim mekanizmasını implement etmediği, ancak gRPC-go kesinlikle destekler, eğer diğer dillerin kütüphanelerini kullanıyorsanız ve bu özelliği kullanıyorsanız bu noktaya ekstra dikkat etmeniz gerekir.

Bağlantı Zaman Aşımı

gRPC istemcisi sunucuya bağlantı kurarken varsayılan olarak asenkron kurulur, eğer bağlantı başarısız olursa boş bir Client döner. Bağlantının senkron olmasını istiyorsanız bağlantı başarılı olana kadar beklemek için grpc.WithBlock() kullanabilirsiniz.

go
dial, err := grpc.Dial("localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Bir zaman aşımı süresi kontrol etmek istiyorsanız sadece bir TimeoutContext传入 etmeniz yeterlidir, context传入 etmek için grpc.Dial yerine grpc.DialContext kullanın.

go
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
    grpc.WithBlock(),
    grpc.WithTransportCredentials(insecure.NewCredentials()),
    grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
    grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)

Bu şekilde eğer bağlantı zaman aşımına uğrarsa error döner

context deadline exceeded

Sunucu tarafında da bağlantı zaman aşımı ayarlanabilir, istemci ile yeni bağlantı kurulurken bir zaman aşımı süresi ayarlanır, varsayılan 120 saniyedir, eğer belirtilen sürede bağlantı başarılı bir şekilde kurulamazsa sunucu aktif olarak bağlantıyı keser.

go
server := grpc.NewServer(
    grpc.ConnectionTimeout(time.Second*3),
)

TIP

grpc.ConnectionTimeout hala deneysel aşamadadır, gelecekte API değiştirilebilir veya silinebilir.

İstek Zaman Aşımı

gRPC istemcisi istek başlattığında ilk parametre Context türüdür, aynı şekilde RPC isteğine bir zaman aşımı süresi eklemek istiyorsanız sadece bir TimeoutContext传入 etmeniz yeterlidir

go
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
    // Zaman aşımı mantık işleme
}

gRPC işlemden sonra zaman aşımı süresi sunucuya iletilir, iletim sürecinde帧 alanı şeklinde存在ir, go içinde context şeklinde存在ir, böylece tüm linkte iletilir. Link iletim sürecinde zaman aşımı süresini değiştirmek önerilmez, istek sırasında ne kadar zaman aşımı süresi ayarlanacağı en upstream'in düşünmesi gereken bir sorundur.

Kimlik Doğrulama Yetkilendirme

Mikroservis alanında her servis istek için kullanıcı kimliği ve yetkisini doğrulamalıdır, eğer monolitik uygulama gibi her servis kendi kimlik doğrulama mantığını implement ederse bu gerçekçi değildir. Bu nedenle birleşik bir kimlik doğrulama ve yetkilendirme servisine ihtiyaç vardır, yaygın çözüm OAuth2, dağıtık Session ve JWT kullanmaktır, bunlar arasında OAuth2 en yaygın kullanılır, bir dönem endüstri standardı haline gelmiştir, OAuth2'nin en yaygın token türü JWT'dir. Aşağıda OAuth2 yetkilendirme kod modunun akış şeması vardır, temel akış şekildaki gibidir.

Güvenli İletim

Servis Kaydı ve Keşfi

İstemci sunucunun belirli servisini çağırmadan önce sunucunun ip ve portunu bilmesi gerekir, önceki örneklerde sunucu adresi sabit yazılmıştı. Gerçek ağ ortamı her zaman o kadar stabil değildir, bazı servisler arıza nedeniyle çevrimdışı olabilir ve erişilemez, ayrıca iş gelişimi nedeniyle makine taşınması nedeniyle adres değişebilir, bu durumlarda statik adres kullanarak servise erişilemez, bu dinamik sorunlar servis keşfi ve kaydının çözmesi gereken sorunlardır, servis keşfi servis adresi değişikliklerini izler ve günceller, servis kaydı dış dünyaya kendi adresini bildirir. gRPC temel servis keşfi işlevi sağlar ve genişletme ve özelleştirmeyi destekler.

Statik adres kullanılamaz, bazı özel isimler kullanılabilir, örneğin tarayıcı DNS aracılığıyla alan adı çözümleyerek adres alır, aynı şekilde gRPC'nin varsayılan servis keşfi DNS aracılığıyla yapılır, yerel host dosyasını değiştirin, aşağıdaki eşlemeyi ekleyin

127.0.0.1 example.grpc.com

Ardından helloworld örneğinde istemci Dial adresini ilgili alan adına değiştirin

go
func main() {
  // Bağlantı kur, şifreleme doğrulaması yok
  conn, err := grpc.Dial("example.grpc.com:8080",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // İstemci oluştur
  client := hello2.NewSayHelloClient(conn)
  // Uzaktan çağrı
  helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
  if err != nil {
    panic(err)
  }
  log.Printf("received grpc resp: %+v", helloRep.String())
}

Aynı şekilde normal çıktıyı görebilirsiniz

2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"

gRPC'de bu tür isimler RFC 3986'da tanımlanan URI sözdizimine uymalıdır, format şu şekildedir

                   hierarchical part
        ┌───────────────────┴─────────────────────┐
                    authority               path
        ┌───────────────┴───────────────┐┌───┴────┐
  abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
  └┬┘   └───────┬───────┘ └────┬────┘ └┬┘           └─────────┬─────────┘ └──┬──┘
scheme  user information     host     port                  query         fragment

Yukarıdaki örnek URI aşağıdaki formdadır, dns varsayılan olarak desteklendiği için öndeki scheme atlanır.

dns:example.grpc.com:8080

Bunun dışında gRPC varsayılan olarak Unix domain sockets'ı destekler, diğer yöntemler için gRPC genişletmesine göre özelleştirilmiş implementasyon yapmamız gerekir, bu nedenle özel bir çözümleyici resolver.Resovler implement etmemiz gerekir, resolver hedef adres ve servis yapılandırması güncellemelerini izlemekten sorumludur.

go
type Resolver interface {
    // gRPC ResolveNow'u tekrar hedef adını çözümlemeyi denemek için çağırır. Bu sadece bir ipucudur, gerekirse çözümleyici bunu görmezden gelebilir, yöntem eşzamanlı olarak çağrılabilir
  ResolveNow(ResolveNowOptions)
  Close()
}

gRPC bir Resolver oluşturucu传入 etmemizi ister, yani resolver.Builder, Resolver örnekleri oluşturmaktan sorumludur.

go
type Builder interface {
  Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
  Scheme() string
}

Builder'ın Scheme yöntemi sorumlu olduğu Scheme türünü döner, örneğin varsayılan dnsBuilder dns döner, oluşturucu başlatıldığında resolver.Register kullanarak global Builder'a kaydedilmeli veya options olarak grpc.WithResolver kullanarak ClientConn içine传入 edilmelidir, ikincinin önceliği前者'den yüksektir.

Yukarıdaki şekil resolver'ın çalışma akışını basitçe açıklar, ardından nasıl özelleştirilmiş resolver oluşturulacağını göstereceğim

Özelleştirilmiş Servis Çözümleyici

Aşağıda özelleştirilmiş bir çözümleyici yazın, oluşturmak için özelleştirilmiş bir çözümleyici oluşturucuya ihtiyaç vardır.

go
package myresolver

import (
  "fmt"
  "google.golang.org/grpc/resolver"
)

func NewBuilder(ads map[string][]string) *MyBuilder {
  return &MyBuilder{ads: ads}
}

type MyBuilder struct {
  ads map[string][]string
}

func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
  if target.URL.Scheme != c.Scheme() {
    return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
  }
  m := &MyResolver{ads: c.ads, t: target, cc: cc}
    // Burada updatestate zorunludur, aksi takdirde deadlock oluşur
  m.start()
  return m, nil
}

func (c *MyBuilder) Scheme() string {
  return "hello"
}

type MyResolver struct {
  t   resolver.Target
  cc  resolver.ClientConn
  ads map[string][]string
}

func (m *MyResolver) start() {
  addres := make([]resolver.Address, 0)
  for _, ad := range m.ads[m.t.URL.Opaque] {
    addres = append(addres, resolver.Address{Addr: ad})
  }

  err := m.cc.UpdateState(resolver.State{
    Addresses: addres,
        // Yapılandırma, loadBalancingPolicy yük dengeleme stratejisidir
    ServiceConfig: m.cc.ParseServiceConfig(
      `{"loadBalancingPolicy":"round_robin"}`),
  })

  if err != nil {
    m.cc.ReportError(err)
  }
}

func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}

func (m *MyResolver) Close() {}

Özelleştirilmiş çözümleyici map içindeki eşleşen adresleri updatestate'e传入 eder, aynı zamanda yük dengeleme stratejisini belirtir, round_robin round-robin anlamına gelir.

go
// service config yapısı şu şekildedir
type jsonSC struct {
    LoadBalancingPolicy *string
    LoadBalancingConfig *internalserviceconfig.BalancerConfig
    MethodConfig        *[]jsonMC
    RetryThrottling     *retryThrottlingPolicy
    HealthCheckConfig   *healthCheckConfig
}

İstemci kodu şu şekildedir

go
package main

import (
  "context"
  "google.golang.org/grpc"
  "google.golang.org/grpc/credentials/insecure"
  "google.golang.org/grpc/resolver"
  "grpc_learn/helloworld/client/myresolver"
  hello2 "grpc_learn/helloworld/hello"
  "log"
  "time"
)

func init() {
  // builder kaydı
  resolver.Register(myresolver.NewBuilder(map[string][]string{
    "myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
  }))
}

func main() {

  // Bağlantı kur, şifreleme doğrulaması yok
  conn, err := grpc.Dial("hello:myworld",
    grpc.WithTransportCredentials(insecure.NewCredentials()),
  )
  if err != nil {
    panic(err)
  }
  defer conn.Close()
  // İstemci oluştur
  client := hello2.NewSayHelloClient(conn)
     // Her saniye bir çağrı
  for range time.Tick(time.Second) {
    // Uzaktan çağrı
    helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
    if err != nil {
      panic(err)
    }
    log.Printf("received grpc resp: %+v", helloRep.String())
  }

}

Normal olarak akış sunucunun kayıt merkezine kendi servisini kaydetmesi, ardından istemcinin kayıt merkezinden servis listesini alıp eşleştirmesi gerekir, burada传入 edilen map simüle edilmiş bir kayıt merkezidir, veri statik olduğu için servis kaydı aşaması atlanır, sadece servis keşfi kalır. İstemci tarafından kullanılan target hello:myworld'dür, hello özelleştirilmiş scheme'dir, myworld servis adıdır, özelleştirilmiş çözümleyici tarafından çözüldükten sonra 127.0.0.1:8080 gerçek adresi alınır, gerçek durumda yük dengeleme yapmak için bir servis adı birden fazla gerçek adresle eşleşebilir, bu nedenle servis adının bir dilime karşılık gelmesinin nedeni budur, burada iki sunucu açılır, farklı portları işgal eder, yük dengeleme stratejisi round-robin'dir, sunucu çıktıları sırasıyla aşağıdadır, istek zamanından yük dengeleme stratejisinin gerçekten etkili olduğu görülebilir, eğer strateji belirtilmezse varsayılan olarak sadece ilk servis seçilir.

// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"

// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"

Kayıt merkezi aslında servis kayıt adı ile gerçek servis adresi eşleme koleksiyonudur, veri depolayabilen herhangi bir middleware koşulu karşılayabilir, hatta mysql'i kayıt merkezi olarak kullanmak da imkansız değildir (bunu yapan biri olacağını sanmıyorum). Genel olarak kayıt merkezi KV depolamadır, redis oldukça iyi bir seçimdir, ancak redis'i kayıt merkezi olarak kullanırsak birçok mantığı kendimiz implement etmemiz gerekir, örneğin servisin kalp atışı kontrolü, servis çevrimdışı olması, servis zamanlaması vb., oldukça zahmetlidir, redis'in bu konuda belirli bir uygulaması vardır ancak azdır. Profesyonel işleri profesyonellere bırakmak gerekir, bu konuda oldukça ünlü birçok şey vardır: Zookeeper, Consul, Eureka, ETCD, Nacos vb.

Bu middleware'lerden bazı farkları öğrenmek için 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) adresini ziyaret edebilirsiniz.

Consul ile Birleştirme

consul ile birleştirilmiş kullanım örneği için consul adresini ziyaret edebilirsiniz

Yük Dengeleme

Golang by www.golangdev.cn edit