gRPC

Uzaktan prosedür çağrısı rpc, mikroservislerde öğrenilmesi gereken noktalardan biridir. Öğrenme sürecinde çeşitli rpc çerçeveleriyle karşılaşılabilir, ancak go alanında neredeyse tüm rpc çerçeveleri gRPC üzerine kuruludur ve ayrıca bulut bilişim alanında temel bir protokol haline gelmiştir. Neden bunu seçiyoruz, resmi cevap şu şekildedir:
gRPC, herhangi bir ortamda çalışabilen modern, açık kaynaklı, yüksek performanslı bir uzaktan prosedür çağrısı (Remote Process Call, RPC) çerçevesidir. Veri merkezi içinde ve veri merkezleri arasındaki servisleri etkili bir şekilde bağlar; takılabilir yük dengeleme, izleme, sağlık kontrolü ve kimlik doğrulama desteği sunar. Ayrıca cihazların, mobil uygulamaların ve tarayıcıların arka uç servislerine bağlanması için son mil dağıtık bilişim olarak da uygundur.
Resmi web sitesi: gRPC
Resmi dokümantasyon: Documentation | gRPC
gRPC teknik öğretici: Basics tutorial | Go | gRPC
ProtocBuf resmi web sitesi: Reference Guides | Protocol Buffers Documentation (protobuf.dev)
Ayrıca CNCF vakfının açık kaynak projesidir, CNCF tam adı CLOUD NATIVE COMPUTING FOUNDATION, çeviri adı Bulut Yerel Bilişim Vakfı

Özellikler
Basit servis tanımı
Protocol Buffers kullanarak servisleri tanımlayın, bu güçlü bir binary serileştirme araç seti ve dilidir.
Başlatma ve ölçeklendirme çok hızlıdır
Çalışma zamanı ve geliştirme ortamını yüklemek için sadece bir satır kod yeterlidir, saniyeler içinde milyonlarca RPC'ye ölçeklenebilir
Çapraz dil, çapraz platform
Farklı platformlar ve diller için otomatik olarak istemci ve sunucu servis taslakları oluşturur
Çift yönlü akış ve entegre yetkilendirme
HTTP/2 tabanlı çift yönlü akış ve takılabilir kimlik doğrulama yetkilendirmesi
GRPC dilden bağımsız olmasına rağmen, bu sitenin içeriğinin çoğu go ile ilgilidir, bu nedenle bu makale de go'yu ana dil olarak kullanacaktır,后续 kullanılan pb derleyicisi ve oluşturucusu diğer dillerin kullanıcıları için Protobuf resmi web sitesinde kendileri bulabilirler. Kolaylık olması için projenin oluşturma süreci doğrudan atlanacaktır.
TIP
Bu makale aşağıdaki makalelerin içeriğine referans vermiştir:
写给 go 开发者的 gRPC 教程-protobuf 基础 - 掘金 (juejin.cn)
gRPC 中的 Metadata - 熊喵君的博客 | PANDAYCHEN
Bağımlılıkların Kurulumu
Önce Protocol Buffer derleyicisini indirin, indirme adresi: Releases · protocolbuffers/protobuf (github.com)

Kendi durumunuza göre sistem ve versiyon seçin, indirdikten sonra bin dizinini ortam değişkenlerine eklemeniz gerekir.
Ardından kod oluşturucuyu indirmeniz gerekir, derleyici proto dosyalarını ilgili dilin serileştirme kodlarına dönüştürür, oluşturucu iş kodunu oluşturmak için kullanılır.
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latestgrpc_learn adında boş bir proje oluşturun, ardından aşağıdaki bağımlılığı ekleyin
$ go get google.golang.org/grpcSon olarak versiyonu kontrol edin, gerçekten kurulup kurulmadığını görün
$ protoc --version
libprotoc 23.4
$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1
$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0Hello World
Proje Yapısı
Aşağıda bir Hello World örneği ile gösterim yapılacaktır, aşağıdaki proje yapısını oluşturun.
grpc_learn\helloworld
|
+---client
| main.go
|
+---hello
|
|
+---pb
| hello.proto
|
\---server
main.goprotobuf Dosyasını Tanımlama
pb/hello.proto içinde aşağıdaki içeriği yazın, bu oldukça basit bir örnektir, eğer protoc sözdizimini bilmiyorsanız ilgili dokümantasyona bakın.
syntax = "proto3";
// . doğrudan çıktı yolunda oluşturulacağını gösterir, hello paket adıdır
option go_package = ".;hello";
// İstek
message HelloReq {
string name = 1;
// Yanıt
message HelloRep {
string msg = 1;
}
// Servis tanımla
service SayHello {
rpc Hello(HelloReq) returns (HelloRep) {}
}Kod Oluşturma
Yazım tamamlandıktan sonra protoc derleyicisini kullanarak veri serileştirme ile ilgili kodları oluşturun, oluşturucuyu kullanarak rpc ile ilgili kodları oluşturun
$ protoc -I ./pb \
--go_out=./hello ./pb/*.proto\
--go-grpc_out=./hello ./pb/*.protoBu noktada hello klasöründe hello.pb.go ve hello_grpc.pb.go dosyalarının oluşturulduğunu görebilirsiniz, hello.pb.go dosyasına göz atarak tanımladığımız message'ı görebilirsiniz
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Tanımlanan alan
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
type HelloRep struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Tanımlanan alan
Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}hello_grpc.pb.go dosyasında tanımladığımız servisi bulabilirsiniz
type SayHelloServer interface {
Hello(context.Context, *HelloReq) (*HelloRep, error)
mustEmbedUnimplementedSayHelloServer()
}
// 后续如果我们自己实现服务接口,必须要嵌入该结构体,就不用实现 mustEmbedUnimplementedSayHelloServer 方法
type UnimplementedSayHelloServer struct {
}
// Varsayılan olarak nil döner
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}
// Arayüz kısıtlaması
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}
type UnsafeSayHelloServer interface {
mustEmbedUnimplementedSayHelloServer()
}Sunucu Tarafını Yazma
server/main.go içinde aşağıdaki kodu yazın
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_learn/server/protoc"
"log"
"net"
)
type GrpcServer struct {
pb.UnimplementedSayHelloServer
}
func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
log.Printf("received grpc req: %+v", req.String())
return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}
func main() {
// Portu dinle
listen, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
// gprc sunucusu oluştur
server := grpc.NewServer()
// Servis kaydı
pb.RegisterSayHelloServer(server, &GrpcServer{})
// Çalıştır
err = server.Serve(listen)
if err != nil {
panic(err)
}
}İstemci Tarafını Yazma
client/main.go içinde aşağıdaki kodu yazın
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc_learn/server/protoc"
"log"
)
func main() {
// Bağlantı kur, şifreleme doğrulaması yok
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer conn.Close()
// İstemci oluştur
client := pb.NewSayHelloClient(conn)
// Uzaktan çağrı
helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}Çalıştırma
Önce sunucu tarafını çalıştırın, ardından istemci tarafını çalıştırın, sunucu çıktısı şu şekildedir
2023/07/16 16:26:51 received grpc req: name:"client"İstemci çıktısı şu şekildedir
2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"Bu örnekte istemci bağlantıyı kurduktan sonra uzaktan metodu çağırırken yerel metodu çağırmakla aynıdır, doğrudan client'ın Hello metoduna erişip sonucu alır, bu en basit GRPC örneğidir, birçok açık kaynak çerçeve de bu süreci paketlemiştir.
bufbuild
Yukarıdaki örnekte kod doğrudan komutla oluşturuldu, eğer daha sonra plugin sayısı artarsa komut oldukça karmaşık görünebilir, bu durumda protobuf dosyalarını yönetmek için araç kullanılabilir, tam da böyle bir açık kaynak yönetim aracı var bufbuild/buf.
Açık kaynak adresi: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)
Dokümantasyon adresi: Buf - Install the Buf CLI
Özellikler
- BSR yönetimi
- Linter
- Kod oluşturma
- Biçimlendirme
- Bağımlılık yönetimi
Bu araçla protobuf dosyalarını oldukça kolay bir şekilde yönetebilirsiniz.
Dokümantasyonda oldukça fazla kurulum yöntemi sunulmuştur, kendiniz seçebilirsiniz. Eğer yerel olarak go ortamı kuruluysa doğrudan go install ile kurulum yapabilirsiniz
$ go install github.com/bufbuild/buf/cmd/buf@latestKurulum tamamlandıktan sonra versiyonu kontrol edin
$ buf --version
1.24.0helloworld/pb klasörüne gelin, pb dosyalarını yönetmek için bir module oluşturmak için aşağıdaki komutu çalıştırın.
$ buf mod init
$ ls
buf.yaml hello.protobuf.yaml dosya içeriği varsayılan olarak şu şekildedir
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULTArdından helloworld/ dizinine gelin, buf.gen.yaml oluşturun, aşağıdaki içeriği yazın
version: v1
plugins:
- plugin: go
out: hello
opt:
- plugin: go-grpc
out: hello
opt:Ardından kodu oluşturmak için komutu çalıştırın
$ buf generateTamamlandıktan sonra oluşturulan dosyaları görebilirsiniz, elbette buf'un sadece bu kadar işlevi yok, diğer işlevleri kendiniz dokümantasyondan öğrenebilirsiniz.
Akış RPC
grpc çağrı yöntemi iki kategoriye ayrılır, Unary RPC (Unary RPC) ve Stream RPC (Stream RPC). Hello World örneği tipik bir Unary RPC'dir.

Unary rpc (veya normal rpc daha anlaşılır olabilir, gerçekten unary kelimesini nasıl çevireceğimi bilmiyorum) kullanımı normal http gibidir, istemci istek gönderir, sunucu veri döner, soru-cevap şeklindedir. Stream RPC isteği ve yanıtı akış şeklinde olabilir, aşağıdaki şekil gibi

Stream isteği kullanıldığında sadece bir yanıt döner, istemci stream aracılığıyla sunucuya birden fazla kez parametre gönderebilir, sunucu Unary RPC gibi tüm parametreleri alana kadar beklemek zorunda değildir, belirli işleme mantığı sunucu tarafından belirlenebilir. Normal情况下 sadece istemci stream isteğini aktif olarak kapatabilir, stream kapatıldığında mevcut RPC isteği de sona erer.
Stream yanıtı kullanıldığında sadece bir parametre gönderilir, sunucu stream aracılığıyla istemciye birden fazla kez veri gönderebilir, istemci Unary RPC gibi tüm veriyi alana kadar beklemek zorunda değildir, belirli işleme mantığı istemci tarafından belirlenebilir. Normal istekte sadece sunucu stream yanıtını aktif olarak kapatabilir, stream kapatıldığında mevcut RPC isteği de sona erer.
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}Veya sadece yanıtın stream şeklinde olması da olabilir (Server-Streaming RPC)
service MessageService {
rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}Veya istek ve yanıtın her ikisinin de stream şeklinde olması (Bi-directional-Streaming RPC)
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}Tek Yönlü Stream
Aşağıda tek yönlü stream işlemini göstermek için bir örnek kullanılacaktır, önce aşağıdaki proje yapısını oluşturun
grpc_learn\server_client_stream
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.gomessage.proto içeriği şu şekildedir
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service MessageService {
rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}buf aracılığıyla kod oluşturun
$ buf generateBurada gösterim mesaj servisidir, receiveMessage belirtilen kullanıcı adını alır, türü string'dir, mesaj stream'i döner, sendMessage mesaj stream'i alır, başarılı gönderilen mesaj sayısını döner, türü 64 bit tamsayıdır. Ardından server/message_service.go oluşturun, varsayılan olarak oluşturulan servisi kendiniz implement edin
package main
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
)
type MessageService struct {
message.UnimplementedMessageServiceServer
}
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}Mesaj alma ve gönderme parametrelerinde bir stream wrapper arayüzü olduğunu görebilirsiniz
type MessageService_ReceiveMessageServer interface {
// Mesaj gönder
Send(*Message) error
grpc.ServerStream
}
type MessageService_SendMessageServer interface {
// Dönüş değerini gönder ve bağlantıyı kapat
SendAndClose(*wrapperspb.StringValue) error
// Mesaj al
Recv() (*Message, error)
grpc.ServerStream
}Her ikisi de gprc.ServerStream arayüzünü embed eder
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}Stream RPC'nin Unary RPC gibi parametre ve dönüş değerlerinin fonksiyon imzasında açıkça görülemediğini, bu yöntemlere ilk bakışta parametre ve dönüş değerlerinin ne tür olduğunu anlayamadığınızı,传入 edilen Stream türünü kullanarak stream iletişimini tamamlamanız gerektiğini görebilirsiniz, ardından sunucu tarafının belirli mantığını yazmaya başlayın. Sunucu tarafı mantığını yazarken bir sync.map kullanarak mesaj kuyruğunu simüle ettik, istemci ReceiveMessage isteği gönderdiğinde sunucu stream yanıtı aracılığıyla istemcinin istediği mesajları sürekli döner, zaman aşımı süresi geçtikten sonra isteği kesene kadar. İstemci SendMessage istediğinde stream isteği aracılığıyla sürekli mesaj gönderir, sunucu sürekli mesajları kuyruğa koyar, istemci aktif olarak isteği kesene kadar ve istemciye mesaj gönderme sayısını döner.
package main
import (
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"sync"
"time"
)
// Simüle edilmiş bir mesaj kuyruğu
var messageQueue sync.Map
type MessageService struct {
message.UnimplementedMessageServiceServer
}
// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Belirtilen kullanıcının mesajlarını al
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
timer := time.NewTimer(time.Second * 5)
for {
time.Sleep(time.Millisecond * 100)
select {
case <-timer.C:
log.Printf("5 秒钟内没有收到%s的消息,关闭连接", user.GetValue())
return nil
default:
value, ok := messageQueue.Load(user.GetValue())
if !ok {
messageQueue.Store(user.GetValue(), []*message.Message{})
continue
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue
}
// Mesajı al
msg := queue[0]
// Stream iletişimi kullanarak mesajı istemciye gönder
err := recvServer.Send(msg)
log.Printf("receive %+v\n", msg)
if err != nil {
return err
}
queue = queue[1:]
messageQueue.Store(user.GetValue(), queue)
timer.Reset(time.Second * 5)
}
}
}
// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Belirtilen kullanıcıya mesaj gönder
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
count := 0
for {
// İstemciden mesaj al
msg, err := sendServer.Recv()
if errors.Is(err, io.EOF) {
return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
}
if err != nil {
return err
}
log.Printf("send %+v\n", msg)
value, ok := messageQueue.Load(msg.From)
if !ok {
messageQueue.Store(msg.From, []*message.Message{msg})
continue
}
queue := value.([]*message.Message)
queue = append(queue, msg)
// Mesajı mesaj kuyruğuna koy
messageQueue.Store(msg.From, queue)
count++
}
}İstemci iki goroutine açtı, bir goroutine mesaj göndermek için, diğer goroutine mesaj almak için kullanıldı, elbette aynı anda gönderip alabilirsiniz, kod şu şekildedir.
package main
import (
"context"
"errors"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"time"
)
var Client message.MessageServiceClient
func main() {
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panicln(err)
}
defer dial.Close()
Client = message.NewMessageServiceClient(dial)
log.SetPrefix("client\t")
msgTask := task.NewTask(func(err error) {
log.Panicln(err)
})
ctx := context.Background()
// Mesaj alma isteği
msgTask.AddJobs(func() {
receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
if err != nil {
log.Panicln(err)
}
for {
recv, err := receiveMessageStream.Recv()
if errors.Is(err, io.EOF) {
log.Println("暂无消息,关闭连接")
break
} else if err != nil {
break
}
log.Printf("receive %+v", recv)
}
})
msgTask.AddJobs(func() {
from := "jack"
to := "mike"
sendMessageStream, err := Client.SendMessage(ctx)
if err != nil {
log.Panicln(err)
}
msgs := []string{
"在吗",
"下午有没有时间一起打游戏",
"那行吧,以后有时间一起约",
"就这个周末应该可以吧",
"那就这么定了",
}
for _, msg := range msgs {
time.Sleep(time.Second)
sendMessageStream.Send(&message.Message{
From: from,
Content: msg,
To: to,
})
}
// Mesajlar gönderildi, aktif olarak isteği kapat ve dönüş değerini al
recv, err := sendMessageStream.CloseAndRecv()
if err != nil {
log.Println(err)
} else {
log.Printf("发送完毕,总共发送了%d条消息\n", recv.GetValue())
}
})
msgTask.Run()
}Çalıştırdıktan sonra sunucu çıktısı şu şekildedir
server 2023/07/18 16:28:24 send from:"jack" content:"在吗" to:"mike"
server 2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
server 2023/07/18 16:28:25 send from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server 2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
server 2023/07/18 16:28:26 send from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server 2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
server 2023/07/18 16:28:27 send from:"jack" content:"就这个周末应该可以吧" to:"mike"
server 2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
server 2023/07/18 16:28:28 send from:"jack" content:"那就这么定了" to:"mike"
server 2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
server 2023/07/18 16:28:33 5 秒钟内没有收到 jack 的消息,关闭连接İstemci çıktısı şu şekildedir
client 2023/07/18 16:28:24 receive from:"jack" content:"在吗" to:"mike"
client 2023/07/18 16:28:25 receive from:"jack" content:"下午有没有时间一起打游戏" to:"mike"
client 2023/07/18 16:28:26 receive from:"jack" content:"那行吧,以后有时间一起约" to:"mike"
client 2023/07/18 16:28:27 receive from:"jack" content:"就这个周末应该可以吧" to:"mike"
client 2023/07/18 16:28:28 发送完毕,总共发送了 5 条消息
client 2023/07/18 16:28:28 receive from:"jack" content:"那就这么定了" to:"mike"
client 2023/07/18 16:28:33 暂无消息,关闭连接Bu örnekle tek yönlü stream RPC isteği işlemenin hem istemci hem de sunucu açısından unary rpc'den daha karmaşık olduğunu görebilirsiniz, ancak çift yönlü stream RPC bunlardan daha da karmaşıktır.
Çift Yönlü Stream
Çift yönlü PRC, yani istek ve yanıtın her ikisi de stream şeklindedir, yukarıdaki örnekteki iki servisi birleştirmeye benzer. Stream RPC için ilk istek kesinlikle istemci tarafından başlatılır, ardından istemci istediği zaman stream aracılığıyla istek parametreleri gönderebilir, sunucu da istediği zaman stream aracılığıyla veri döndürebilir, her iki taraftan biri stream'i kapattığında mevcut istek sona erer.
TIP
Sonraki içerik除非 gerekli, pb kod oluşturma ve rpc istemci sunucu oluşturma adımlarının kod açıklamalarını doğrudan atlayacaktır
Önce aşağıdaki proje yapısını oluşturun
bi_stream\
| buf.gen.yaml
|
+---client
| main.go
|
+---message
| message.pb.go
| message_grpc.pb.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.go
message_service.gomessage.proto içeriği şu şekildedir
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service ChatService {
rpc chat(stream Message) returns (stream Message);
}Sunucu mantığında bağlantı kurulduktan sonra iki goroutine açılır, bir goroutine mesaj almaktan, diğeri mesaj göndermekten sorumludur, belirli işleme mantığı önceki örnekle benzerdir, ancak bu sefer zaman aşımı判定 mantığı kaldırılmıştır.
package main
import (
"github.com/dstgo/task"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"sync"
"time"
)
// MessageQueue Simüle edilmiş mesaj kuyruğu
var MessageQueue sync.Map
type ChatService struct {
message.UnimplementedChatServiceServer
}
// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Sohbet servisi, sunucu mantığını çoklu goroutine ile işleyeceğiz
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, _ := metadata.FromIncomingContext(chatServer.Context())
from := md.Get("from")[0]
defer log.Println(from, "end chat")
var chatErr error
chatCh := make(chan error)
// İki goroutine oluştur, biri mesaj alır, diğeri mesaj gönderir
chatTask := task.NewTask(func(err error) {
chatErr = err
})
// Mesaj alan goroutine
chatTask.AddJobs(func() {
for {
msg, err := chatServer.Recv()
log.Printf("receive %+v err %+v\n", msg, err)
if err != nil {
chatErr = err
chatCh <- err
break
}
value, ok := MessageQueue.Load(msg.To)
if !ok {
MessageQueue.Store(msg.To, []*message.Message{msg})
} else {
queue := value.([]*message.Message)
queue = append(queue, msg)
MessageQueue.Store(msg.To, queue)
}
}
})
// Mesaj gönderen goroutine
chatTask.AddJobs(func() {
Send:
for {
time.Sleep(time.Millisecond * 100)
select {
case <-chatCh:
log.Println(from, "close send")
break Send
default:
value, ok := MessageQueue.Load(from)
if !ok {
value = []*message.Message{}
MessageQueue.Store(from, value)
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue Send
}
msg := queue[0]
queue = queue[1:]
MessageQueue.Store(from, queue)
err := chatServer.Send(msg)
log.Printf("send %+v\n", msg)
if err != nil {
chatErr = err
break Send
}
}
}
})
chatTask.Run()
return chatErr
}İstemci mantığında iki alt goroutine açılarak iki kişinin sohbet süreci simüle edilir, iki alt goroutine'de her biri iki torun goroutine açarak mesaj alıp göndermekten sorumludur (istemci mantığında iki kişinin sohbet mesajlarının alıp gönderme sırasının doğru olması garanti edilmemiştir, sadece basit bir çift taraf gönderme ve alma örneğidir)
package main
import (
"context"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"time"
)
var Client message.ChatServiceClient
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer dial.Close()
if err != nil {
log.Panicln(err)
}
Client = message.NewChatServiceClient(dial)
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
NewChat("jack", "mike", "你好", "有没有时间一起打游戏?", "好吧")
})
chatTask.AddJobs(func() {
NewChat("mike", "jack", "你好", "没有", "没时间,你找别人吧")
})
chatTask.Run()
}
func NewChat(from string, to string, contents ...string) {
ctx := context.Background()
mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
chat, err := Client.Chat(mdCtx)
defer log.Println("end chat", from)
if err != nil {
log.Panicln(err)
}
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
for _, content := range contents {
time.Sleep(time.Second)
chat.Send(&message.Message{
From: from,
Content: content,
To: to,
})
}
// Mesajlar gönderildi, bağlantıyı kapat
time.Sleep(time.Second * 5)
chat.CloseSend()
})
// Mesaj alan goroutine
chatTask.AddJobs(func() {
for {
msg, err := chat.Recv()
log.Printf("receive %+v\n", msg)
if err != nil {
log.Println(err)
break
}
}
})
chatTask.Run()
}Normal durumda sunucu çıktısı
server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"你好" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"你好" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"你好" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"你好" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"没有" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"没有" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"有没有时间一起打游戏?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"好吧" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"没时间,你找别人吧" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"好吧" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"没时间,你找别人吧" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chatNormal durumda istemci çıktısı (mesajların sıra mantığının karışık olduğunu görebilirsiniz)
client 2023/07/19 17:26:24 receive from:"jack" content:"你好" to:"mike"
client 2023/07/19 17:26:24 receive from:"mike" content:"你好" to:"jack"
client 2023/07/19 17:26:25 receive from:"mike" content:"没有" to:"jack"
client 2023/07/19 17:26:25 receive from:"jack" content:"有没有时间一起打游戏?" to:"mike"
client 2023/07/19 17:26:26 receive from:"jack" content:"好吧" to:"mike"
client 2023/07/19 17:26:26 receive from:"mike" content:"没时间,你找别人吧" to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mikeÖrnekten çift yönlü stream işleme mantığının hem istemci hem de sunucu açısından tek yönlü stream'den daha karmaşık olduğunu, çoklu goroutine ile asenkron görevler açarak mantığı daha iyi işleyebileceğini görebilirsiniz.
metadata
metadata özünde bir map'tir, değeri bir string dilimidir, http1 header'ına benzer ve gRPC'de oynadığı rol de http header'a benzer, bu RPC çağrısı hakkında bazı bilgiler sağlar, aynı zamanda metadata'nın yaşam döngüsü bir rpc çağrısının tüm sürecini takip eder, çağrı bittiğinde yaşam döngüsü de sona erer.
gRPC'de esas olarak context aracılığıyla iletilir ve depolanır, ancak gRPC metadata paketini sağlar, içinde操作leri basitleştirmek için oldukça fazla kolaylaştırıcı fonksiyon vardır, context'i manuel olarak操作 etmemize gerek yoktur. metadata gRPC'de metadata.MD türüne karşılık gelir, aşağıdaki gibi gösterilir.
// MD is a mapping from metadata keys to values. Users should use the following
// two convenience functions New and Pairs to generate MD.
type MD map[string][]stringDoğrudan metadata.New fonksiyonunu kullanarak oluşturabiliriz, ancak oluşturmadan önce dikkat edilmesi gereken birkaç nokta var
func New(m map[string]string) MDmetadata anahtar adı için bazı kısıtlamalar vardır, sadece aşağıdaki kurallarla sınırlı karakterler olabilir:
- ASCII karakterleri
- Rakamlar: 0-9
- Küçük harfler: a-z
- Büyük harfler: A-Z
- Özel karakterler: -_.
TIP
metadata'da büyük harfler küçük harflere dönüştürülür, yani aynı key'i işgal eder, değer de üzerine yazılır.
TIP
grpc- ile başlayan key'ler grpc tarafından ayrılmış dahili key'lerdir, bu tür key'leri kullanmak bazı hatalara neden olabilir.
Manuel Oluşturma
metadata oluşturmanın birçok yolu vardır, burada manuel olarak metadata oluşturmanın en yaygın iki yöntemini tanıtıyorum, birincisi metadata.New fonksiyonunu kullanmak, doğrudan bir map传入.
func New(m map[string]string) MDmd := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})İkincisi metadata.Pairs, çift uzunlukta string dilimi传入, otomatik olarak anahtar-değer çiftlerine ayrıştırılır.
func Pairs(kv ...string) MDmd := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")Birden fazla metadata'yı birleştirmek için metadata.join kullanılabilir
func Join(mds ...MD) MDmd1 := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.join(md1,md2)Sunucu Tarafı Kullanımı
metadata alma
Sunucu tarafı metadata almak için metadata.FromIncomingContext fonksiyonunu kullanabilir
func FromIncomingContext(ctx context.Context) (MD, bool)Unary rpc için service parametresinde bir context parametresi vardır, doğrudan içinden metadata alın
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
md, b := metadata.FromIncomingContext(ctx)
...
}Stream rpc için service parametresinde bir stream nesnesi vardır, onun aracılığıyla stream'in context'ini alabilirsiniz
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, b := metadata.FromIncomingContext(chatServer.Context())
...
}metadata gönderme
metadata göndermek için grpc.sendHeader fonksiyonu kullanılabilir
func SendHeader(ctx context.Context, md metadata.MD) errorBu fonksiyon en fazla bir kez çağrılabilir, header'ın otomatik olarak gönderilmesine neden olan bazı olaylar发生后 kullanıldığında etkili olmaz. Bazı durumlarda header'ı doğrudan göndermek istemiyorsanız grpc.SetHeader fonksiyonunu kullanabilirsiniz.
func SetHeader(ctx context.Context, md metadata.MD) errorBu fonksiyon birden fazla kez çağrıldığında her seferinde传入 edilen metadata birleştirilir ve aşağıdaki durumlarda istemciye gönderilir
gprc.SendHeaderveServertream.SendHeaderçağrıldığında- Unary rpc handler döndüğünde
- Stream rpc'de stream nesnesinin
Stream.SendMsgçağrıldığında - rpc isteğinin durumu
send outolduğunda, bu durum ya rpc isteğinin başarılı olduğu ya da hata oluştuğu anlamına gelir.
Stream rpc için stream nesnesinin SendHeader ve SetHeader yöntemlerini kullanmanız önerilir.
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
...
}TIP
Kullanım sürecinde Header ve Trailer işlevlerinin neredeyse aynı olduğunu göreceksiniz, ancak ana farkları gönderme zamanıdır, unary rpc'de bunu hissedemeyebilirsiniz, ancak bu fark stream RPC'de özellikle belirgindir, çünkü stream RPC'de Header isteğin bitmesini beklemeden gönderilebilir. Header'ın belirli durumlarda gönderileceği daha önce belirtilmiştir, Trailer ise sadece tüm RPC isteği bittikten sonra gönderilir, bundan önce alınan trailer boştur.
İstemci Tarafı Kullanımı
metadata alma
İstemci yanıt header'ını almak istiyorsa grpc.Header ve grpc.Trailer aracılığıyla实现 edebilir
func Header(md *metadata.MD) CallOptionfunc Trailer(md *metadata.MD) CallOptionAncak dikkat edilmesi gereken, doğrudan alınamaz, yukarıdaki iki fonksiyonun dönüş değeri CallOption'dır, yani RPC isteği başlatıldığında option parametresi olarak传入 edilir.
// Değer almak için md bildir
var header, trailer metadata.MD
// RPC isteği çağrıldığında option传入
res, err := client.SomeRPC(
ctx,
data,
grpc.Header(&header),
grpc.Trailer(&trailer)
)İstek tamamlandıktan sonra değer传入 edilen md'ye yazılır. Stream rpc için istek başlatıldığında dönen stream nesnesi aracılığıyla doğrudan alınabilir
type ClientStream interface {
Header() (metadata.MD, error)
Trailer() metadata.MD
...
}stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()metadata gönderme
İstemci metadata göndermek istiyorsa çok basittir, daha önce metadata'nın表现形式 valueContext olduğunu belirtmiştik, metadata'yı context ile birleştirin, ardından istek yaparken context'i传入 edin, metadata paketi context oluşturmayı kolaylaştırmak için iki fonksiyon sağlar.
func NewOutgoingContext(ctx context.Context, md MD) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
// Unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// Stream rpc
stream,err := client.StreamRPC(outgoingContext)Eğer orijinal ctx'de zaten metadata varsa NewOutgoingContext kullanmak önceki verileri doğrudan üzerine yazar, bu durumu önlemek için aşağıdaki fonksiyonu kullanabilirsiniz, üzerine yazmaz, bunun yerine verileri birleştirir.
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")
// Unary rpc
res,err := client.SomeRPC(appendContext,data)
// Stream rpc
stream,err := client.StreamRPC(appendContext)Interceptor
gRPC interceptor'ları gin'deki Middleware'e benzer, her ikisi de istek öncesi veya sonrası bazı özel işler yapmak ve iş mantığını etkilememek içindir. gRPC'de interceptor'lar iki kategoriye ayrılır, sunucu interceptor'ları ve istemci interceptor'ları, istek türüne göre Unary RPC interceptor'ları ve Stream RPC interceptor'ları olarak ayrılır, aşağıdaki şekil

Interceptor'ları daha iyi anlayabilmek için aşağıda çok basit bir örnek ile açıklama yapılacaktır.
grpc_learn\interceptor
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| person.proto
|
+---person
| person.pb.go
| person_grpc.pb.go
|
\---server
main.goperson.proto içeriği şu şekildedir
syntax = "proto3";
option go_package = ".;person";
import "google/protobuf/wrappers.proto";
message personInfo {
string name = 1;
int64 age = 2;
string address = 3;
}
service person {
rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}Sunucu kodu şu şekildedir, mantık önceki içeriklerle aynı, oldukça basit, tekrar açıklanmayacaktır.
package main
import (
"context"
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"io"
"sync"
)
// Veri depola
var personData sync.Map
type PersonService struct {
person.UnimplementedPersonServer
}
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, person.PersonNotFoundErr
}
personInfo := value.(*person.PersonInfo)
return personInfo, nil
}
func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
count := 0
for {
personInfo, err := personStream.Recv()
if errors.Is(err, io.EOF) {
return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
} else if err != nil {
return err
}
personData.Store(personInfo.Name, personInfo)
count++
}
}Sunucu Interceptor'ı
Sunucu rpc isteğini intercept eden UnaryServerInterceptor ve StreamServerInterceptor vardır, belirli türler aşağıda gösterilmiştir
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) errorUnary RPC
Unary RPC interceptor'ı oluşturmak için sadece UnaryserverInterceptor türünü implement etmeniz gerekir, aşağıda basit bir Unary RPC interceptor örneği vardır, işlevi her rpc isteğini ve yanıtını çıktılamaktır.
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc istek verisi
// param info *grpc.UnaryServerInfo bu Unary RPC'nin bazı istek bilgileri
// param unaryHandler grpc.UnaryHandler belirli handler
// return resp interface{} rpc yanıt verisi
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
resp, err = unaryHandler(ctx, req)
log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
return resp, err
}Unary RPC için interceptor her RPC isteğini ve yanıtını intercept eder, yani RPC istek aşamasını ve yanıt aşamasını intercept eder, eğer interceptor error dönerse bu istek sona erer.
Stream rpc
Stream RPC interceptor'ı oluşturmak için sadece StreamServerInterceptor türünü implement etmeniz gerekir, aşağıda basit bir Stream RPC interceptor örneği vardır.
// StreamPersonLogInterceptor
// param srv interface{} sunucu implementasyonuna karşılık gelen server
// param stream grpc.ServerStream stream nesnesi
// param info *grpc.StreamServerInfo stream bilgisi
// param streamHandler grpc.StreamHandler işlemci
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
err := streamHandler(srv, stream)
log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
return err
}Stream RPC için interceptor her stream nesnesinin Send ve Recve yöntemi çağrıldığında intercept eder, eğer interceptor error dönerse bu RPC isteğinin sona ermesine neden olmaz, sadece bu send veya recv'de hata oluştuğu anlamına gelir.
Interceptor Kullanımı
Oluşturulan interceptor'ların etkili olması için gRPC sunucusu oluşturulurken option olarak传入 edilmelidir, resmi de kullanım için ilgili fonksiyonları sağlar. Aşağıda gösterildiği gibi tek interceptor ekleyen fonksiyonlar ve zincir interceptor ekleyen fonksiyonlar vardır.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
func StreamInterceptor(i StreamServerInterceptor) ServerOption
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptionTIP
UnaryInterceptor tekrar kullanıldığında aşağıdaki panic fırlatılır
panic: The unary server interceptor was already set and may not be reset.StreamInterceptor da aynı şekilde, zincir interceptor tekrar çağrıldığında aynı zincire append edilir.
Kullanım örneği aşağıda gösterilmiştir
package main
import (
"google.golang.org/grpc"
"grpc_learn/interceptor/person"
"log"
"net"
)
func main() {
log.SetPrefix("server ")
listen, err := net.Listen("tcp", "9090")
if err != nil {
log.Panicln(err)
}
server := grpc.NewServer(
// Zincir interceptor ekle
grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
)
person.RegisterPersonServer(server, &PersonService{})
server.Serve(listen)
}İstemci Interceptor'ı
İstemci interceptor'ı sunucu ile neredeyse aynı, bir unary interceptor UnaryClientInterceptor, bir stream interceptor StreamClientInterceptor, belirli türler aşağıda gösterilmiştir.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)Unary RPC
Unary RPC istemci interceptor'ı oluşturmak için UnaryClientInterceptor implement edilmesi yeterlidir, aşağıda basit bir örnek vardır.
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string yöntem adı
// param req interface{} istek verisi
// param reply interface{} yanıt verisi
// param cc *grpc.ClientConn istemci bağlantı nesnesi
// param invoker grpc.UnaryInvoker intercept edilen belirli istemci yöntemi
// param opts ...grpc.CallOption bu isteğin yapılandırma öğeleri
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
err := invoker(ctx, method, req, reply, cc, opts...)
log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
return err
}İstemci unary RPC interceptor'ı aracılığıyla yerel isteğin istek verisi ve yanıt verisi ile bazı diğer istek bilgileri alınabilir.
Stream RPC
Stream RPC istemci interceptor'ı oluşturmak için StreamClientInterceptor implement edilmesi yeterlidir, aşağıda bir örnek vardır.
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc stream nesnesinin açıklama bilgisi
// param cc *grpc.ClientConn bağlantı nesnesi
// param method string yöntem adı
// param streamer grpc.Streamer stream nesnesi oluşturmak için kullanılan nesne
// param opts ...grpc.CallOption bağlantı yapılandırma öğeleri
// return grpc.ClientStream oluşturulan istemci stream nesnesi
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return stream, err
}Stream RPC istemci interceptor'ı aracılığıyla sadece istemci ile sunucu bağlantı kurduğunda yani stream oluşturma zamanında intercept edilebilir, istemci stream nesnesinin her mesaj alıp gönderdiğinde intercept edilemez, ancak interceptor'da oluşturulan stream nesnesini paketleyerek mesaj alıp göndermeyi intercept实现 edebiliriz, aşağıdaki gibi
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc stream nesnesinin açıklama bilgisi
// param cc *grpc.ClientConn bağlantı nesnesi
// param method string yöntem adı
// param streamer grpc.Streamer stream nesnesi oluşturmak için kullanılan nesne
// param opts ...grpc.CallOption bağlantı yapılandırma öğeleri
// return grpc.ClientStream oluşturulan istemci stream nesnesi
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %stream name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}
type ClientStreamInterceptorWrapper struct {
method string
desc *grpc.StreamDesc
grpc.ClientStream
}
func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
// Mesaj göndermeden önce
err := c.ClientStream.SendMsg(m)
// Mesaj gönderdikten sonra
log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
return err
}
func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
// Mesaj almadan önce
err := c.ClientStream.RecvMsg(m)
// Mesaj aldıktan sonra
log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
return err
}Interceptor Kullanımı
Kullanırken sunucu ile benzer şekilde dört araç fonksiyonu option aracılığıyla interceptor eklemek için kullanılır, tek interceptor ve zincir interceptor olarak ayrılır.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
func WithStreamInterceptor(f StreamClientInterceptor) DialOption
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOptionTIP
İstemci WithUnaryInterceptor tekrar kullandığında panic fırlatılmaz, ancak sadece sonuncusu etkili olur.
Aşağıda bir kullanım örneği vardır
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"log"
)
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}Şu ana kadar tüm örnek kodu yazıldı, çalıştırıp sonucun nasıl olduğunu görmek zamanı geldi. Sunucu çıktısı şu şekildedir
server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not foundİstemci çıktısı şu şekildedir
C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not foundHer iki tarafın çıktısının beklentilere uygun olduğunu, intercept etkisini sağladığını görebilirsiniz, bu örnek çok basit bir örnektir, gRPC interceptor'larını kullanarak yetkilendirme, log, izleme ve diğer işlevler gibi birçok şey yapılabilir, kendi tekerleğinizi yapmayı seçebilir veya açık kaynak topluluğunun hazır tekerleklerini kullanmayı seçebilirsiniz, gRPC Ecosystem bir dizi açık kaynak gRPC interceptor middleware toplar, adres: grpc-ecosystem/go-grpc-middleware.
Hata İşleme
Başlamadan önce bir örneğe bakalım, önceki interceptor örneğinde kullanıcı sorgulama yapamadığında istemciye person not found hatası döndürülür, soru şu, istemci dönen hataya göre özel işlem yapabilir mi? Sonra deneyelim, istemci kodunda errors.Is kullanarak hatayı判断 etmeye çalışın.
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
log.Println(info, err)
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}
}Sonuç çıktısı şu şekildedir
client 2023/07/21 16:46:10 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not foundİstemcinin aldığı error'un şu şekilde olduğunu görebilirsiniz, sunucunun döndürdüğü error'un desc alanında olduğunu fark edersiniz
rpc error: code = Unknown desc = person not foundDoğal olarak errors.Is bu mantığı执行 etmez, errors.As ile değiştirseniz bile aynı sonuç olur.
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}Bu nedenle gRPC bu tür sorunları çözmek için status paketini sağlar, bu da istemcinin aldığı hatanın neden code ve desc alanları olduğunu gösterir, çünkü gRPC aslında istemciye bir Status döndürür, belirli türü aşağıda gösterilmiştir, protobuf tanımlı bir message olduğu görülebilir.
type Status struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}message Status {
// The status code, which should be an enum value of
// [google.rpc.Code][google.rpc.Code].
int32 code = 1;
// A developer-facing error message, which should be in English. Any
// user-facing error message should be localized and sent in the
// [google.rpc.Status.details][google.rpc.Status.details] field, or localized
// by the client.
string message = 2;
// A list of messages that carry the error details. There is a common set of
// message types for APIs to use.
repeated google.protobuf.Any details = 3;
}Hata Kodları
Status yapısındaki Code, Http Status'a benzer bir varlıktır, mevcut rpc isteğinin durumunu göstermek için kullanılır, gRPC grpc/codes içinde 16 code tanımlar, çoğu senaryoyu kapsar, sırasıyla aşağıda gösterilmiştir
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32
const (
// Çağrı başarılı
OK Code = 0
// İstek iptal edildi
Canceled Code = 1
// Bilinmeyen hata
Unknown Code = 2
// Parametre geçersiz
InvalidArgument Code = 3
// İstek zaman aşımı
DeadlineExceeded Code = 4
// Kaynak mevcut değil
NotFound Code = 5
// Aynı kaynak zaten mevcut (bunun出现masını beklemiyordum)
AlreadyExists Code = 6
// Yetki yetersiz, erişim reddedildi
PermissionDenied Code = 7
// Kaynak tükendi, kalan kapasite kullanılamıyor, örneğin disk kapasitesi yetersiz
ResourceExhausted Code = 8
// 执行 koşulları yetersiz, örneğin rm ile boş olmayan bir dizini silmek, silme koşulu dizinin boş olması ama koşul karşılanmıyor
FailedPrecondition Code = 9
// İstek kesintiye uğradı
Aborted Code = 10
// İşlem erişim sınırın ötesine geçti
OutOfRange Code = 11
// Mevcut servisin implement edilmediğini gösterir
Unimplemented Code = 12
// Sistem iç hatası
Internal Code = 13
// Servis kullanılamıyor
Unavailable Code = 14
// Veri kayboldu
DataLoss Code = 15
// Kimlik doğrulama başarısız
Unauthenticated Code = 16
_maxCode = 17
)grpc/status paketi status ile error arasında dönüşüm için oldukça fazla fonksiyon sağlar. Doğrudan status.New kullanarak bir Status oluşturabiliriz veya Newf
func New(c codes.Code, msg string) *Status
func Newf(c codes.Code, format string, a ...interface{}) *StatusÖrneğin aşağıdaki kod
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)status'un err yöntemi aracılığıyla其中的 error alınabilir, durum ok olduğunda error nil'dir.
func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return &Error{s: s}
}Doğrudan error da oluşturabilirsiniz
func Err(c codes.Code, msg string) error
func Errorf(c codes.Code, format string, a ...interface{}) errorsuccess := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)Bu nedenle sunucu kodunu aşağıdaki gibi değiştirebiliriz
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
}
personInfo := value.(*person.PersonInfo)
return personInfo, status.Errorf(codes.OK, "request success")
}Bundan önce sunucunun döndürdüğü tüm code'lar unknown'du, şimdi değişiklikten sonra daha açık bir anlam kazandı. Bu nedenle istemci status.FromError veya aşağıdaki fonksiyonu kullanarak error'dan status alabilir, böylece farklı code'lara göre ilgili işlemi yapabilir
func FromError(err error) (s *Status, ok bool)
func Convert(err error) *Status
func Code(err error) codes.CodeÖrnek aşağıda gösterilmiştir
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
...
}Ancak grpc code'u mümkün olduğunca bazı genel senaryoları kapsasa da bazen geliştiricilerin ihtiyaçlarını karşılayamaz, bu durumda Status'taki Details alanı kullanılabilir ve ayrıca bir dilimdir, birden fazla bilgi barındırabilir. Status.WithDetails aracılığıyla bazı özel bilgiler传入 edilebilir
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)Status.Details aracılığıyla bilgi alın
func (s *Status) Details() []interface{}Dikkat edilmesi gereken,传入 edilen bilginin protobuf tarafından tanımlanmış olması en iyisidir, böylece sunucu ve istemci iki ucunun da ayrıştırması kolaylaşır, resmi birkaç örnek verir
message ErrorInfo {
// Hatanın nedeni
string reason = 1;
// Servisi tanımlayan ana主体
string domain = 2;
// Diğer bilgiler
map<string, string> metadata = 3;
}
// Yeniden deneme bilgisi
message RetryInfo {
// Aynı isteğin bekleme aralığı
google.protobuf.Duration retry_delay = 1;
}
// Hata ayıklama bilgisi
message DebugInfo {
// Yığın
repeated string stack_entries = 1;
// Bazı detay bilgileri
string detail = 2;
}
...
...Daha fazla örnek için googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) adresini ziyaret edebilirsiniz. Gerekirse aşağıdaki kodu kullanarak引入 edebilirsiniz.
import "google.golang.org/genproto/googleapis/rpc/errdetails"ErrorInfo'yu details olarak kullan
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
notFound.WithDetails(&errdetails.ErrorInfo{
Reason: "person not found",
Domain: "xxx",
Metadata: nil,
})İstemci tarafında veri alıp işlem yapılabilir, ancak yukarıdaki sadece gRPC'nin önerdiği bazı örneklerdir, bunun dışında业务 ihtiyaçlarını daha iyi karşılamak için kendi message'ınızı da tanımlayabilirsiniz, eğer bazı birleşik hata işlemleri yapmak istiyorsanız interceptor'a da koyabilirsiniz.
Zaman Aşımı Kontrolü

Çoğu durumda genellikle sadece bir servis olmaz ve upstream'de birçok servis olabilir, downstream'de de birçok servis olabilir. İstemci bir istek başlattığında en upstream servisten en downstream servise bir servis çağrı zinciri oluşur, şekildaki gibi, belki de şekildakinden daha uzun olabilir.
Bu kadar uzun bir çağrı zincirinde eğer其中一个 servisin mantık işleme uzun zaman alırsa upstream'in sürekli bekleme durumunda kalmasına neden olur. Gereksiz kaynak israfını azaltmak için bu nedenle zaman aşımı mekanizması引入 edilmelidir, böylece en upstream çağrıda传入 edilen zaman aşımı süresi tüm çağrı zincirinin izin verilen maksimum执行 süresi olur. gRPC prosesler arası ve diller arası zaman aşımı iletebilir, iletilmesi gereken bazı verileri HTTP2'nin HEADERS Frame帧ine koyar, aşağıdaki şekil gibi

gRPC isteğindeki zaman aşımı verisi HEADERS Frame'deki grpc-timeout alanına karşılık gelir. Dikkat edilmesi gereken, tüm gRPC kütüphanelerinin bu zaman aşımı iletim mekanizmasını implement etmediği, ancak gRPC-go kesinlikle destekler, eğer diğer dillerin kütüphanelerini kullanıyorsanız ve bu özelliği kullanıyorsanız bu noktaya ekstra dikkat etmeniz gerekir.
Bağlantı Zaman Aşımı
gRPC istemcisi sunucuya bağlantı kurarken varsayılan olarak asenkron kurulur, eğer bağlantı başarısız olursa boş bir Client döner. Bağlantının senkron olmasını istiyorsanız bağlantı başarılı olana kadar beklemek için grpc.WithBlock() kullanabilirsiniz.
dial, err := grpc.Dial("localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)Bir zaman aşımı süresi kontrol etmek istiyorsanız sadece bir TimeoutContext传入 etmeniz yeterlidir, context传入 etmek için grpc.Dial yerine grpc.DialContext kullanın.
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)Bu şekilde eğer bağlantı zaman aşımına uğrarsa error döner
context deadline exceededSunucu tarafında da bağlantı zaman aşımı ayarlanabilir, istemci ile yeni bağlantı kurulurken bir zaman aşımı süresi ayarlanır, varsayılan 120 saniyedir, eğer belirtilen sürede bağlantı başarılı bir şekilde kurulamazsa sunucu aktif olarak bağlantıyı keser.
server := grpc.NewServer(
grpc.ConnectionTimeout(time.Second*3),
)TIP
grpc.ConnectionTimeout hala deneysel aşamadadır, gelecekte API değiştirilebilir veya silinebilir.
İstek Zaman Aşımı
gRPC istemcisi istek başlattığında ilk parametre Context türüdür, aynı şekilde RPC isteğine bir zaman aşımı süresi eklemek istiyorsanız sadece bir TimeoutContext传入 etmeniz yeterlidir
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
// Zaman aşımı mantık işleme
}gRPC işlemden sonra zaman aşımı süresi sunucuya iletilir, iletim sürecinde帧 alanı şeklinde存在ir, go içinde context şeklinde存在ir, böylece tüm linkte iletilir. Link iletim sürecinde zaman aşımı süresini değiştirmek önerilmez, istek sırasında ne kadar zaman aşımı süresi ayarlanacağı en upstream'in düşünmesi gereken bir sorundur.
Kimlik Doğrulama Yetkilendirme
Mikroservis alanında her servis istek için kullanıcı kimliği ve yetkisini doğrulamalıdır, eğer monolitik uygulama gibi her servis kendi kimlik doğrulama mantığını implement ederse bu gerçekçi değildir. Bu nedenle birleşik bir kimlik doğrulama ve yetkilendirme servisine ihtiyaç vardır, yaygın çözüm OAuth2, dağıtık Session ve JWT kullanmaktır, bunlar arasında OAuth2 en yaygın kullanılır, bir dönem endüstri standardı haline gelmiştir, OAuth2'nin en yaygın token türü JWT'dir. Aşağıda OAuth2 yetkilendirme kod modunun akış şeması vardır, temel akış şekildaki gibidir.

Güvenli İletim
Servis Kaydı ve Keşfi
İstemci sunucunun belirli servisini çağırmadan önce sunucunun ip ve portunu bilmesi gerekir, önceki örneklerde sunucu adresi sabit yazılmıştı. Gerçek ağ ortamı her zaman o kadar stabil değildir, bazı servisler arıza nedeniyle çevrimdışı olabilir ve erişilemez, ayrıca iş gelişimi nedeniyle makine taşınması nedeniyle adres değişebilir, bu durumlarda statik adres kullanarak servise erişilemez, bu dinamik sorunlar servis keşfi ve kaydının çözmesi gereken sorunlardır, servis keşfi servis adresi değişikliklerini izler ve günceller, servis kaydı dış dünyaya kendi adresini bildirir. gRPC temel servis keşfi işlevi sağlar ve genişletme ve özelleştirmeyi destekler.
Statik adres kullanılamaz, bazı özel isimler kullanılabilir, örneğin tarayıcı DNS aracılığıyla alan adı çözümleyerek adres alır, aynı şekilde gRPC'nin varsayılan servis keşfi DNS aracılığıyla yapılır, yerel host dosyasını değiştirin, aşağıdaki eşlemeyi ekleyin
127.0.0.1 example.grpc.comArdından helloworld örneğinde istemci Dial adresini ilgili alan adına değiştirin
func main() {
// Bağlantı kur, şifreleme doğrulaması yok
conn, err := grpc.Dial("example.grpc.com:8080",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// İstemci oluştur
client := hello2.NewSayHelloClient(conn)
// Uzaktan çağrı
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}Aynı şekilde normal çıktıyı görebilirsiniz
2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"gRPC'de bu tür isimler RFC 3986'da tanımlanan URI sözdizimine uymalıdır, format şu şekildedir
hierarchical part
┌───────────────────┴─────────────────────┐
authority path
┌───────────────┴───────────────┐┌───┴────┐
abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
└┬┘ └───────┬───────┘ └────┬────┘ └┬┘ └─────────┬─────────┘ └──┬──┘
scheme user information host port query fragmentYukarıdaki örnek URI aşağıdaki formdadır, dns varsayılan olarak desteklendiği için öndeki scheme atlanır.
dns:example.grpc.com:8080Bunun dışında gRPC varsayılan olarak Unix domain sockets'ı destekler, diğer yöntemler için gRPC genişletmesine göre özelleştirilmiş implementasyon yapmamız gerekir, bu nedenle özel bir çözümleyici resolver.Resovler implement etmemiz gerekir, resolver hedef adres ve servis yapılandırması güncellemelerini izlemekten sorumludur.
type Resolver interface {
// gRPC ResolveNow'u tekrar hedef adını çözümlemeyi denemek için çağırır. Bu sadece bir ipucudur, gerekirse çözümleyici bunu görmezden gelebilir, yöntem eşzamanlı olarak çağrılabilir
ResolveNow(ResolveNowOptions)
Close()
}gRPC bir Resolver oluşturucu传入 etmemizi ister, yani resolver.Builder, Resolver örnekleri oluşturmaktan sorumludur.
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}Builder'ın Scheme yöntemi sorumlu olduğu Scheme türünü döner, örneğin varsayılan dnsBuilder dns döner, oluşturucu başlatıldığında resolver.Register kullanarak global Builder'a kaydedilmeli veya options olarak grpc.WithResolver kullanarak ClientConn içine传入 edilmelidir, ikincinin önceliği前者'den yüksektir.

Yukarıdaki şekil resolver'ın çalışma akışını basitçe açıklar, ardından nasıl özelleştirilmiş resolver oluşturulacağını göstereceğim
Özelleştirilmiş Servis Çözümleyici
Aşağıda özelleştirilmiş bir çözümleyici yazın, oluşturmak için özelleştirilmiş bir çözümleyici oluşturucuya ihtiyaç vardır.
package myresolver
import (
"fmt"
"google.golang.org/grpc/resolver"
)
func NewBuilder(ads map[string][]string) *MyBuilder {
return &MyBuilder{ads: ads}
}
type MyBuilder struct {
ads map[string][]string
}
func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.URL.Scheme != c.Scheme() {
return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
}
m := &MyResolver{ads: c.ads, t: target, cc: cc}
// Burada updatestate zorunludur, aksi takdirde deadlock oluşur
m.start()
return m, nil
}
func (c *MyBuilder) Scheme() string {
return "hello"
}
type MyResolver struct {
t resolver.Target
cc resolver.ClientConn
ads map[string][]string
}
func (m *MyResolver) start() {
addres := make([]resolver.Address, 0)
for _, ad := range m.ads[m.t.URL.Opaque] {
addres = append(addres, resolver.Address{Addr: ad})
}
err := m.cc.UpdateState(resolver.State{
Addresses: addres,
// Yapılandırma, loadBalancingPolicy yük dengeleme stratejisidir
ServiceConfig: m.cc.ParseServiceConfig(
`{"loadBalancingPolicy":"round_robin"}`),
})
if err != nil {
m.cc.ReportError(err)
}
}
func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (m *MyResolver) Close() {}Özelleştirilmiş çözümleyici map içindeki eşleşen adresleri updatestate'e传入 eder, aynı zamanda yük dengeleme stratejisini belirtir, round_robin round-robin anlamına gelir.
// service config yapısı şu şekildedir
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *internalserviceconfig.BalancerConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}İstemci kodu şu şekildedir
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"grpc_learn/helloworld/client/myresolver"
hello2 "grpc_learn/helloworld/hello"
"log"
"time"
)
func init() {
// builder kaydı
resolver.Register(myresolver.NewBuilder(map[string][]string{
"myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
}))
}
func main() {
// Bağlantı kur, şifreleme doğrulaması yok
conn, err := grpc.Dial("hello:myworld",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// İstemci oluştur
client := hello2.NewSayHelloClient(conn)
// Her saniye bir çağrı
for range time.Tick(time.Second) {
// Uzaktan çağrı
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}
}Normal olarak akış sunucunun kayıt merkezine kendi servisini kaydetmesi, ardından istemcinin kayıt merkezinden servis listesini alıp eşleştirmesi gerekir, burada传入 edilen map simüle edilmiş bir kayıt merkezidir, veri statik olduğu için servis kaydı aşaması atlanır, sadece servis keşfi kalır. İstemci tarafından kullanılan target hello:myworld'dür, hello özelleştirilmiş scheme'dir, myworld servis adıdır, özelleştirilmiş çözümleyici tarafından çözüldükten sonra 127.0.0.1:8080 gerçek adresi alınır, gerçek durumda yük dengeleme yapmak için bir servis adı birden fazla gerçek adresle eşleşebilir, bu nedenle servis adının bir dilime karşılık gelmesinin nedeni budur, burada iki sunucu açılır, farklı portları işgal eder, yük dengeleme stratejisi round-robin'dir, sunucu çıktıları sırasıyla aşağıdadır, istek zamanından yük dengeleme stratejisinin gerçekten etkili olduğu görülebilir, eğer strateji belirtilmezse varsayılan olarak sadece ilk servis seçilir.
// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"
// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"Kayıt merkezi aslında servis kayıt adı ile gerçek servis adresi eşleme koleksiyonudur, veri depolayabilen herhangi bir middleware koşulu karşılayabilir, hatta mysql'i kayıt merkezi olarak kullanmak da imkansız değildir (bunu yapan biri olacağını sanmıyorum). Genel olarak kayıt merkezi KV depolamadır, redis oldukça iyi bir seçimdir, ancak redis'i kayıt merkezi olarak kullanırsak birçok mantığı kendimiz implement etmemiz gerekir, örneğin servisin kalp atışı kontrolü, servis çevrimdışı olması, servis zamanlaması vb., oldukça zahmetlidir, redis'in bu konuda belirli bir uygulaması vardır ancak azdır. Profesyonel işleri profesyonellere bırakmak gerekir, bu konuda oldukça ünlü birçok şey vardır: Zookeeper, Consul, Eureka, ETCD, Nacos vb.
Bu middleware'lerden bazı farkları öğrenmek için 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) adresini ziyaret edebilirsiniz.
Consul ile Birleştirme
consul ile birleştirilmiş kullanım örneği için consul adresini ziyaret edebilirsiniz
