gRPC

La llamada a procedimiento remoto (RPC) debería ser un punto que deba aprenderse en los microservicios. Durante el proceso de aprendizaje, se encontrarán diversos marcos RPC. Sin embargo, en el ámbito de Go, casi todos los marcos RPC se basan en gRPC, y además se ha convertido en un protocolo básico en el ámbito de la computación en la nube. ¿Por qué elegirlo? Esta es la respuesta oficial:
gRPC es un marco de trabajo de código abierto de alto rendimiento moderno para llamadas a procedimientos remotos (Remote Process Call, RPC) que puede ejecutarse en cualquier entorno. Puede conectar eficazmente servicios dentro y entre centros de datos con soporte de equilibrio de carga enchufable, seguimiento, comprobación de estado y autenticación. También es adecuado para la última milla de computación distribuida que conecta dispositivos, aplicaciones móviles y navegadores a servicios backend.
Sitio web oficial: gRPC
Documentación oficial: Documentation | gRPC
Tutorial técnico de gRPC: Basics tutorial | Go | gRPC
Sitio web de ProtocBuf: Reference Guides | Protocol Buffers Documentation (protobuf.dev)
También es un proyecto de código abierto bajo la fundación CNCF. CNCF significa CLOUD NATIVE COMPUTING FOUNDATION, traducido como Fundación de Computación en la Nube Nativa.

Características
Definición de servicio simple
Utilice Protocol Buffers para definir servicios, que es un potente conjunto de herramientas de serialización binaria y lenguaje.
Inicio y escalado muy rápidos
Solo se necesita una línea de código para instalar el entorno de ejecución y el entorno de desarrollo. Puede escalar a millones de RPC por segundo en solo unos segundos.
Multi-lenguaje, multi-plataforma
Genera automáticamente stubs de servicio del lado del cliente y del servidor para diferentes plataformas y lenguajes.
Flujo bidireccional y autorización integrada
Flujo bidireccional basado en HTTP/2 y autenticación y autorización enchufables.
Aunque GRPC es independiente del lenguaje, la mayor parte del contenido de este sitio está relacionado con Go, por lo que este artículo también utilizará Go como lenguaje principal para la explicación. Los compiladores y generadores de pb utilizados posteriormente pueden ser buscados por usuarios de otros lenguajes en el sitio web oficial de Protobuf según sea necesario. Por conveniencia, el proceso de creación del proyecto se omitirá directamente a continuación.
TIP
Este artículo hace referencia al contenido de los siguientes artículos:
写给 go 开发者的 gRPC 教程-protobuf 基础 - 掘金 (juejin.cn)
gRPC 中的 Metadata - 熊喵君的博客 | PANDAYCHEN
Instalación de dependencias
Primero descargue el compilador de Protocol Buffer. Dirección de descarga: Releases · protocolbuffers/protobuf (github.com)

Seleccione el sistema y la versión según su situación. Después de la descarga, debe agregar el directorio bin al PATH.
Luego también debe descargar el generador de código. El compilador genera código de serialización del archivo proto al lenguaje correspondiente, y el generador se usa para generar código de negocio.
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latestCree un proyecto vacío, aquí el nombre es grpc_learn, luego introduzca las siguientes dependencias:
$ go get google.golang.org/grpcFinalmente, verifique la versión para ver si realmente se instaló correctamente:
$ protoc --version
libprotoc 23.4
$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1
$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0Hello World
Estructura del proyecto
A continuación se presentará una demostración con un ejemplo Hello World. Cree la siguiente estructura de proyecto:
grpc_learn\helloworld
|
+---client
| main.go
|
+---hello
|
|
+---pb
| hello.proto
|
\---server
main.goDefinir archivo protobuf
En pb/hello.proto, ingrese el siguiente contenido. Este es un ejemplo bastante simple. Si no conoce la sintaxis de protoc, consulte la documentación relacionada.
syntax = "proto3";
// . significa generar directamente en la ruta de salida, hello es el nombre del paquete
option go_package = ".;hello";
// Solicitud
message HelloReq {
string name = 1;
// Respuesta
message HelloRep {
string msg = 1;
}
// Definir servicio
service SayHello {
rpc Hello(HelloReq) returns (HelloRep) {}
}Generar código
Después de completar la escritura, use el compilador protoc para generar código relacionado con la serialización de datos y use el generador para generar código rpc:
$ protoc -I ./pb \
--go_out=./hello ./pb/*.proto\
--go-grpc_out=./hello ./pb/*.protoEn este momento, puede descubrir que se generaron los archivos hello.pb.go y hello_grpc.pb.go en la carpeta hello. Al navegar por hello.pb.go, puede encontrar el message que definimos:
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Campos definidos
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
type HelloRep struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// Campos definidos
Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}En hello_grpc.pb.go, puede encontrar el servicio que definimos:
type SayHelloServer interface {
Hello(context.Context, *HelloReq) (*HelloRep, error)
mustEmbedUnimplementedSayHelloServer()
}
// Posteriormente, si implementamos la interfaz de servicio nosotros mismos, debemos incrustar esta estructura y no será necesario implementar el método mustEmbedUnimplementedSayHelloServer
type UnimplementedSayHelloServer struct {
}
// Devuelve nil por defecto
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}
// Restricción de interfaz
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}
type UnsafeSayHelloServer interface {
mustEmbedUnimplementedSayHelloServer()
}Escribir el servidor
Escriba el siguiente código en server/main.go:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_learn/server/protoc"
"log"
"net"
)
type GrpcServer struct {
pb.UnimplementedSayHelloServer
}
func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
log.Printf("received grpc req: %+v", req.String())
return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}
func main() {
// Escuchar puerto
listen, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
// Crear servidor gRPC
server := grpc.NewServer()
// Registrar servicio
pb.RegisterSayHelloServer(server, &GrpcServer{})
// Ejecutar
err = server.Serve(listen)
if err != nil {
panic(err)
}
}Escribir el cliente
Escriba el siguiente código en client/main.go:
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc_learn/server/protoc"
"log"
)
func main() {
// Establecer conexión, sin verificación de cifrado
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer conn.Close()
// Crear cliente
client := pb.NewSayHelloClient(conn)
// Llamada remota
helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}Ejecutar
Ejecute primero el servidor, luego ejecute el cliente. La salida del servidor es la siguiente:
2023/07/16 16:26:51 received grpc req: name:"client"La salida del cliente es la siguiente:
2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"En este ejemplo, después de que el cliente establece la conexión, al llamar al método remoto, es como llamar a un método local. Simplemente acceda al método Hello del client y obtenga el resultado. Este es el ejemplo GRPC más simple. Muchos marcos de código abierto también encapsulan este proceso.
bufbuild
En el ejemplo anterior, el código se generó directamente usando comandos. Si hay muchos complementos más adelante, el comando será bastante tedioso. En este momento, puede usar una herramienta para gestionar archivos protobuf. Hay una herramienta de gestión de código abierto llamada bufbuild/buf.
Dirección de código abierto: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)
Dirección de documentación: Buf - Install the Buf CLI
Características
- Gestión BSR
- Linter
- Generación de código
- Formateo
- Gestión de dependencias
Con esta herramienta, puede gestionar archivos protobuf de manera muy conveniente.
La documentación proporciona muchas formas de instalación. Puede elegir la que prefiera. Si tiene un entorno Go instalado localmente, puede usar directamente go install para instalar:
$ go install github.com/bufbuild/buf/cmd/buf@latestDespués de la instalación, verifique la versión:
$ buf --version
1.24.0Vaya a la carpeta helloworld/pb y ejecute el siguiente comando para crear un módulo para gestionar archivos pb:
$ buf mod init
$ ls
buf.yaml hello.protoEl contenido del archivo buf.yaml es el siguiente por defecto:
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULTLuego vaya al directorio helloworld/ y cree buf.gen.yaml con el siguiente contenido:
version: v1
plugins:
- plugin: go
out: hello
opt:
- plugin: go-grpc
out: hello
opt:Luego ejecute el comando para generar código:
$ buf generateDespués de completar, puede ver los archivos generados. Por supuesto, buf tiene más funciones que esta. Puede aprender otras funciones en la documentación.
RPC de flujo
Hay dos categorías principales de llamadas gRPC: RPC unario (Unary RPC) y RPC de flujo (Stream RPC). El ejemplo en Hello World es un típico RPC unario.

El RPC unario (o RPC ordinario sería más fácil de entender, realmente no sé cómo traducir unary) se usa como HTTP ordinario. El cliente solicita y el servidor devuelve datos. Es una forma de pregunta y respuesta. Mientras que las solicitudes y respuestas de RPC de flujo pueden ser en flujo, como se muestra a continuación:

Al usar una solicitud de flujo, solo se devuelve una respuesta. El cliente puede enviar parámetros al servidor varias veces a través del flujo. El servidor no necesita esperar a que se reciban todos los parámetros para procesarlos como en el RPC unario. La lógica de procesamiento específica puede ser decidida por el servidor. Normalmente, solo el cliente puede cerrar activamente la solicitud de flujo. Una vez que el flujo se cierra, la solicitud RPC actual finalizará.
Al usar una respuesta de flujo, solo se envía un parámetro. El servidor puede enviar datos al cliente varias veces a través del flujo. El cliente no necesita esperar a recibir todos los datos para procesarlos como en el RPC unario. La lógica de procesamiento específica puede ser decidida por el cliente. En una solicitud normal, solo el servidor puede cerrar activamente la respuesta de flujo. Una vez que el flujo se cierra, la solicitud RPC actual finalizará.
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}También puede ser solo la respuesta en flujo (Server-Streaming RPC):
service MessageService {
rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}O tanto la solicitud como la respuesta pueden ser en flujo (Bi-directional-Streaming RPC):
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}Flujo unidireccional
A continuación se demuestra la operación de flujo unidireccional a través de un ejemplo. Primero cree la siguiente estructura de proyecto:
grpc_learn\server_client_stream
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.goEl contenido de message.proto es el siguiente:
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service MessageService {
rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}Genere código a través de buf:
$ buf generateAquí se demuestra un servicio de mensajes. receiveMessage recibe un nombre de usuario especificado, tipo cadena, y devuelve un flujo de mensajes. sendMessage recibe un flujo de mensajes y devuelve el número de mensajes enviados exitosamente, tipo entero de 64 bits. A continuación, cree server/message_service.go e implemente el servicio generado por defecto:
package main
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
)
type MessageService struct {
message.UnimplementedMessageServiceServer
}
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}Se puede ver que tanto el parámetro de recepción de mensajes como el de envío de mensajes tienen una interfaz de envoltura de flujo:
type MessageService_ReceiveMessageServer interface {
// Enviar mensaje
Send(*Message) error
grpc.ServerStream
}
type MessageService_SendMessageServer interface {
// Enviar valor de retorno y cerrar conexión
SendAndClose(*wrapperspb.StringValue) error
// Recibir mensaje
Recv() (*Message, error)
grpc.ServerStream
}Ambos incrustan la interfaz gprc.ServerStream:
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}Se puede ver que el RPC de flujo no es como el RPC unario donde los parámetros de entrada y los valores de retorno pueden reflejarse claramente en la firma de la función. Estos métodos a primera vista no pueden mostrar qué tipo son los parámetros de entrada y los valores de retorno. Se necesita llamar al tipo Stream pasado para completar la transmisión de flujo. A continuación, comience a escribir la lógica específica del servidor. Al escribir la lógica del servidor, se usa un sync.map para simular una cola de mensajes. Cuando el cliente solicita ReceiveMessage, el servidor devuelve continuamente los mensajes que el cliente desea a través de la respuesta de flujo hasta que se agote el tiempo y se cierre la solicitud. Cuando el cliente solicita SendMessage, envía continuamente mensajes a través de la solicitud de flujo, y el servidor coloca continuamente los mensajes en la cola hasta que el cliente cierre activamente la solicitud y devuelva el número de mensajes enviados al cliente.
package main
import (
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"sync"
"time"
)
// Una cola de mensajes simulada
var messageQueue sync.Map
type MessageService struct {
message.UnimplementedMessageServiceServer
}
// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// Recibir mensajes de un usuario especificado
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
timer := time.NewTimer(time.Second * 5)
for {
time.Sleep(time.Millisecond * 100)
select {
case <-timer.C:
log.Printf("No se recibieron mensajes de %s en 5 segundos, cerrando conexión", user.GetValue())
return nil
default:
value, ok := messageQueue.Load(user.GetValue())
if !ok {
messageQueue.Store(user.GetValue(), []*message.Message{})
continue
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue
}
// Obtener mensaje
msg := queue[0]
// Enviar mensaje al cliente a través de transmisión de flujo
err := recvServer.Send(msg)
log.Printf("receive %+v\n", msg)
if err != nil {
return err
}
queue = queue[1:]
messageQueue.Store(user.GetValue(), queue)
timer.Reset(time.Second * 5)
}
}
}
// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// Enviar mensaje a un usuario especificado
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
count := 0
for {
// Recibir mensaje del cliente
msg, err := sendServer.Recv()
if errors.Is(err, io.EOF) {
return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
}
if err != nil {
return err
}
log.Printf("send %+v\n", msg)
value, ok := messageQueue.Load(msg.From)
if !ok {
messageQueue.Store(msg.From, []*message.Message{msg})
continue
}
queue := value.([]*message.Message)
queue = append(queue, msg)
// Poner mensaje en la cola de mensajes
messageQueue.Store(msg.From, queue)
count++
}
}El cliente abre dos goroutines, una para enviar mensajes y otra para recibir mensajes. Por supuesto, también puede enviar y recibir al mismo tiempo. El código es el siguiente:
package main
import (
"context"
"errors"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"time"
)
var Client message.MessageServiceClient
func main() {
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panicln(err)
}
defer dial.Close()
Client = message.NewMessageServiceClient(dial)
log.SetPrefix("client\t")
msgTask := task.NewTask(func(err error) {
log.Panicln(err)
})
ctx := context.Background()
// Solicitud de recepción de mensajes
msgTask.AddJobs(func() {
receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
if err != nil {
log.Panicln(err)
}
for {
recv, err := receiveMessageStream.Recv()
if errors.Is(err, io.EOF) {
log.Println("No hay mensajes, cerrando conexión")
break
} else if err != nil {
break
}
log.Printf("receive %+v", recv)
}
})
msgTask.AddJobs(func() {
from := "jack"
to := "mike"
sendMessageStream, err := Client.SendMessage(ctx)
if err != nil {
log.Panicln(err)
}
msgs := []string{
"¿Estás ahí?",
"¿Tienes tiempo para jugar esta tarde?",
"Bueno, jugaremos otro día cuando tengas tiempo",
"Debería poder este fin de semana",
"Entonces está decidido",
}
for _, msg := range msgs {
time.Sleep(time.Second)
sendMessageStream.Send(&message.Message{
From: from,
Content: msg,
To: to,
})
}
// Mensajes enviados, cerrar activamente la solicitud y obtener el valor de retorno
recv, err := sendMessageStream.CloseAndRecv()
if err != nil {
log.Println(err)
} else {
log.Printf("Envío completado, se enviaron un total de %d mensajes\n", recv.GetValue())
}
})
msgTask.Run()
}Después de ejecutar, la salida del servidor es la siguiente:
server 2023/07/18 16:28:24 send from:"jack" content:"¿Estás ahí?" to:"mike"
server 2023/07/18 16:28:24 receive from:"jack" content:"¿Estás ahí?" to:"mike"
server 2023/07/18 16:28:25 send from:"jack" content:"¿Tienes tiempo para jugar esta tarde?" to:"mike"
server 2023/07/18 16:28:25 receive from:"jack" content:"¿Tienes tiempo para jugar esta tarde?" to:"mike"
server 2023/07/18 16:28:26 send from:"jack" content:"Bueno, jugaremos otro día cuando tengas tiempo" to:"mike"
server 2023/07/18 16:28:26 receive from:"jack" content:"Bueno, jugaremos otro día cuando tengas tiempo" to:"mike"
server 2023/07/18 16:28:27 send from:"jack" content:"Debería poder este fin de semana" to:"mike"
server 2023/07/18 16:28:27 receive from:"jack" content:"Debería poder este fin de semana" to:"mike"
server 2023/07/18 16:28:28 send from:"jack" content:"Entonces está decidido" to:"mike"
server 2023/07/18 16:28:28 receive from:"jack" content:"Entonces está decidido" to:"mike"
server 2023/07/18 16:28:33 No se recibieron mensajes de jack en 5 segundos, cerrando conexiónLa salida del cliente es la siguiente:
client 2023/07/18 16:28:24 receive from:"jack" content:"¿Estás ahí?" to:"mike"
client 2023/07/18 16:28:25 receive from:"jack" content:"¿Tienes tiempo para jugar esta tarde?" to:"mike"
client 2023/07/18 16:28:26 receive from:"jack" content:"Bueno, jugaremos otro día cuando tengas tiempo" to:"mike"
client 2023/07/18 16:28:27 receive from:"jack" content:"Debería poder este fin de semana" to:"mike"
client 2023/07/18 16:28:28 Envío completado, se enviaron un total de 5 mensajes
client 2023/07/18 16:28:28 receive from:"jack" content:"Entonces está decidido" to:"mike"
client 2023/07/18 16:28:33 No hay mensajes, cerrando conexiónA través de este ejemplo, se puede descubrir que el procesamiento de solicitudes de RPC de flujo unidireccional es más complejo tanto para el cliente como para el servidor que el RPC unario. Sin embargo, el RPC de flujo bidireccional es aún más complejo que ellos.
Flujo bidireccional
El RPC de flujo bidireccional, es decir, tanto la solicitud como la respuesta son en flujo, es equivalente a combinar los dos servicios del ejemplo anterior en uno. Para el RPC de flujo, la primera solicitud siempre es iniciada por el cliente. Luego, el cliente puede enviar parámetros de solicitud a través del flujo en cualquier momento, y el servidor también puede devolver datos a través del flujo en cualquier momento. Independientemente de qué parte cierre activamente el flujo, la solicitud actual finalizará.
TIP
A menos que sea necesario, el contenido posterior omitirá directamente la descripción del código de generación de código pb y la creación de clientes y servidores rpc.
Primero cree la siguiente estructura de proyecto:
bi_stream\
| buf.gen.yaml
|
+---client
| main.go
|
+---message
| message.pb.go
| message_grpc.pb.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.go
message_service.goEl contenido de message.proto es el siguiente:
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service ChatService {
rpc chat(stream Message) returns (stream Message);
}En la lógica del servidor, después de establecer la conexión, se abren dos goroutines, una responsable de recibir mensajes y otra de enviar mensajes. La lógica de procesamiento específica es similar al ejemplo anterior, pero esta vez se elimina la lógica de determinación de tiempo de espera.
package main
import (
"github.com/dstgo/task"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"sync"
"time"
)
// MessageQueue Cola de mensajes simulada
var MessageQueue sync.Map
type ChatService struct {
message.UnimplementedChatServiceServer
}
// Chat
// param chatServer message.ChatService_ChatServer
// return error
// Servicio de chat, la lógica del servidor la procesaremos con múltiples goroutines
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, _ := metadata.FromIncomingContext(chatServer.Context())
from := md.Get("from")[0]
defer log.Println(from, "finaliza chat")
var chatErr error
chatCh := make(chan error)
// Crear dos goroutines, una para recibir mensajes y otra para enviar mensajes
chatTask := task.NewTask(func(err error) {
chatErr = err
})
// Goroutine para recibir mensajes
chatTask.AddJobs(func() {
for {
msg, err := chatServer.Recv()
log.Printf("receive %+v err %+v\n", msg, err)
if err != nil {
chatErr = err
chatCh <- err
break
}
value, ok := MessageQueue.Load(msg.To)
if !ok {
MessageQueue.Store(msg.To, []*message.Message{msg})
} else {
queue := value.([]*message.Message)
queue = append(queue, msg)
MessageQueue.Store(msg.To, queue)
}
}
})
// Goroutine para enviar mensajes
chatTask.AddJobs(func() {
Send:
for {
time.Sleep(time.Millisecond * 100)
select {
case <-chatCh:
log.Println(from, "cierra envío")
break Send
default:
value, ok := MessageQueue.Load(from)
if !ok {
value = []*message.Message{}
MessageQueue.Store(from, value)
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue Send
}
msg := queue[0]
queue = queue[1:]
MessageQueue.Store(from, queue)
err := chatServer.Send(msg)
log.Printf("send %+v\n", msg)
if err != nil {
chatErr = err
break Send
}
}
}
})
chatTask.Run()
return chatErr
}En la lógica del cliente, se abren dos sub-goroutines para simular el proceso de chat entre dos personas. Cada sub-goroutine tiene dos sub-goroutines adicionales responsables de enviar y recibir mensajes (la lógica del cliente no garantiza que el orden de envío y recepción de mensajes entre las dos personas sea correcto, es solo un ejemplo simple de envío y recepción entre ambas partes):
package main
import (
"context"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"time"
)
var Client message.ChatServiceClient
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer dial.Close()
if err != nil {
log.Panicln(err)
}
Client = message.NewChatServiceClient(dial)
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
NewChat("jack", "mike", "Hola", "¿Tienes tiempo para jugar?", "Bueno")
})
chatTask.AddJobs(func() {
NewChat("mike", "jack", "Hola", "No", "No tengo tiempo, busca a alguien más")
})
chatTask.Run()
}
func NewChat(from string, to string, contents ...string) {
ctx := context.Background()
mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
chat, err := Client.Chat(mdCtx)
defer log.Println("end chat", from)
if err != nil {
log.Panicln(err)
}
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
for _, content := range contents {
time.Sleep(time.Second)
chat.Send(&message.Message{
From: from,
Content: content,
To: to,
})
}
// Mensajes enviados, cerrar conexión
time.Sleep(time.Second * 5)
chat.CloseSend()
})
// Goroutine para recibir mensajes
chatTask.AddJobs(func() {
for {
msg, err := chat.Recv()
log.Printf("receive %+v\n", msg)
if err != nil {
log.Println(err)
break
}
}
})
chatTask.Run()
}Normalmente, la salida del servidor es:
server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"Hola" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"Hola" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"Hola" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"Hola" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"¿Tienes tiempo para jugar?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"No" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"No" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"¿Tienes tiempo para jugar?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"Bueno" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"No tengo tiempo, busca a alguien más" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"Bueno" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"No tengo tiempo, busca a alguien más" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chatNormalmente, la salida del cliente es (se puede ver que el orden lógico de los mensajes está desordenado):
client 2023/07/19 17:26:24 receive from:"jack" content:"Hola" to:"mike"
client 2023/07/19 17:26:24 receive from:"mike" content:"Hola" to:"jack"
client 2023/07/19 17:26:25 receive from:"mike" content:"No" to:"jack"
client 2023/07/19 17:26:25 receive from:"jack" content:"¿Tienes tiempo para jugar?" to:"mike"
client 2023/07/19 17:26:26 receive from:"jack" content:"Bueno" to:"mike"
client 2023/07/19 17:26:26 receive from:"mike" content:"No tengo tiempo, busca a alguien más" to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mikeA través del ejemplo, se puede ver que la lógica de procesamiento de flujo bidireccional, tanto para el cliente como para el servidor, es más compleja que el flujo unidireccional. Se requiere combinar múltiples goroutines para abrir tareas asíncronas para manejar mejor la lógica.
metadata
Esencialmente, metadata es un map, cuyo valor es un slice de cadenas. Es similar al header de HTTP/1, y su papel en gRPC también es similar al header HTTP, proporcionando información sobre la llamada RPC actual. Al mismo tiempo, el ciclo de vida de metadata sigue toda la llamada RPC. Cuando la llamada termina, su ciclo de vida también termina.
En gRPC, se transmite y almacena principalmente a través de context. Sin embargo, gRPC proporciona el paquete metadata, que tiene muchas funciones convenientes para simplificar las operaciones, sin necesidad de operar manualmente context. El tipo correspondiente a metadata en gRPC es metadata.MD, como se muestra a continuación:
// MD es un mapeo de claves de metadata a valores. Los usuarios deben usar las siguientes
// dos funciones de conveniencia New y Pairs para generar MD.
type MD map[string][]stringPodemos usar directamente la función metadata.New para crear, pero antes de crear, hay algunos puntos a tener en cuenta:
func New(m map[string]string) MDmetadata tiene restricciones en los nombres de las claves, solo pueden ser caracteres limitados por las siguientes reglas:
- Caracteres ASCII
- Números: 0-9
- Letras minúsculas: a-z
- Letras mayúsculas: A-Z
- Caracteres especiales: - _
TIP
En metadata, las letras mayúsculas se convertirán a minúsculas, es decir, ocuparán la misma clave y el valor también se sobrescribirá.
TIP
Las claves que comienzan con grpc- son claves internas reservadas para uso de grpc. Si usa este tipo de claves, puede causar algunos errores.
Creación manual
Hay muchas formas de crear metadata. Aquí se presentan los dos métodos más comunes para crear metadata manualmente. El primero es usar la función metadata.New, pasando directamente un map:
func New(m map[string]string) MDmd := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})El segundo es metadata.Pairs, pasando un slice de cadenas de longitud par, que se analizará automáticamente en pares clave-valor:
func Pairs(kv ...string) MDmd := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")También puede usar metadata.Join para combinar múltiples metadata:
func Join(mds ...MD) MDmd1 := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)Uso en el servidor
Obtener metadata
El servidor puede usar la función metadata.FromIncomingContext para obtener metadata:
func FromIncomingContext(ctx context.Context) (MD, bool)Para RPC unario, el parámetro del servicio tendrá un parámetro context, del cual puede obtener metadata directamente:
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
md, b := metadata.FromIncomingContext(ctx)
...
}Para RPC de flujo, el parámetro del servicio tendrá un objeto de flujo, a través del cual puede obtener el context del flujo:
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, b := metadata.FromIncomingContext(chatServer.Context())
...
}Enviar metadata
Puede usar la función grpc.SendHeader para enviar metadata:
func SendHeader(ctx context.Context, md metadata.MD) errorEsta función se puede llamar como máximo una vez. No tendrá efecto si se usa después de que ocurran algunos eventos que causen el envío automático del header. En algunos casos, si no desea enviar el header directamente, puede usar la función grpc.SetHeader:
func SetHeader(ctx context.Context, md metadata.MD) errorSi esta función se llama varias veces, el metadata transmitido cada vez se combinará y se enviará al cliente en las siguientes situaciones:
- Cuando se llaman
gprc.SendHeaderyServerStream.SendHeader - Cuando el handler de RPC unario devuelve
- Cuando se llama al método
Stream.SendMsgdel objeto de flujo en RPC de flujo - Cuando el estado de la solicitud RPC se convierte en
send out. En este caso, la solicitud RPC tuvo éxito o hubo un error.
Para RPC de flujo, se recomienda usar los métodos SendHeader y SetHeader del objeto de flujo:
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
...
}TIP
Durante el uso, descubrirá que las funciones Header y Trailer son casi las mismas. Sin embargo, su principal diferencia radica en el momento del envío. Puede que no se sienta en RPC unario, pero esta diferencia es particularmente obvia en RPC de flujo, porque el Header en RPC de flujo se puede enviar sin esperar a que termine la solicitud. Como se mencionó anteriormente, el Header se enviará en ciertas situaciones específicas, mientras que Trailer solo se enviará después de que termine toda la solicitud RPC. Antes de esto, el trailer obtenido está vacío.
Uso en el cliente
Obtener metadata
El cliente puede obtener el header de respuesta a través de grpc.Header y grpc.Trailer:
func Header(md *metadata.MD) CallOptionfunc Trailer(md *metadata.MD) CallOptionSin embargo, tenga en cuenta que no se puede obtener directamente. Como se puede ver, los valores de retorno de las dos funciones anteriores son CallOption, es decir, se pasan como parámetros de opción al iniciar una solicitud RPC:
// Declarar md para recibir valores
var header, trailer metadata.MD
// Pasar option al llamar a la solicitud RPC
res, err := client.SomeRPC(
ctx,
data,
grpc.Header(&header),
grpc.Trailer(&trailer)
)Después de completar la solicitud, el valor se escribirá en el md pasado. Para RPC de flujo, se puede obtener directamente a través del objeto de flujo devuelto al iniciar la solicitud:
type ClientStream interface {
Header() (metadata.MD, error)
Trailer() metadata.MD
...
}stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()Enviar metadata
Es muy simple para el cliente enviar metadata. Como se mencionó anteriormente, la manifestación de metadata es valueContext. Combine metadata con context y luego pase el context al solicitar. El paquete metadata proporciona dos funciones para facilitar la construcción de context:
func NewOutgoingContext(ctx context.Context, md MD) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
// RPC unario
res,err := client.SomeRPC(outgoingContext,data)
// RPC de flujo
stream,err := client.StreamRPC(outgoingContext)Si el ctx original ya tiene metadata, usar NewOutgoingContext sobrescribirá los datos anteriores. Para evitar esta situación, puede usar la siguiente función, que no sobrescribe, sino que combina los datos:
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")
// RPC unario
res,err := client.SomeRPC(appendContext,data)
// RPC de flujo
stream,err := client.StreamRPC(appendContext)Interceptores
Los interceptores de gRPC son similares al Middleware en gin, ambos se utilizan para realizar algún trabajo especial antes o después de una solicitud sin afectar la lógica de negocio en sí. En gRPC, hay dos categorías principales de interceptores: interceptores del lado del servidor e interceptores del lado del cliente. Según el tipo de solicitud, hay interceptores de RPC unario e interceptores de RPC de flujo, como se muestra en la siguiente figura:

Para comprender mejor los interceptores, a continuación se describirá un ejemplo muy simple.
grpc_learn\interceptor
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| person.proto
|
+---person
| person.pb.go
| person_grpc.pb.go
|
\---server
main.goEl contenido de person.proto es el siguiente:
syntax = "proto3";
option go_package = ".;person";
import "google/protobuf/wrappers.proto";
message personInfo {
string name = 1;
int64 age = 2;
string address = 3;
}
service person {
rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}El código del servidor es el siguiente. La lógica es toda del contenido anterior, es bastante simple y no se explicará más:
package main
import (
"context"
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"io"
"sync"
)
// Almacenar datos
var personData sync.Map
type PersonService struct {
person.UnimplementedPersonServer
}
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, person.PersonNotFoundErr
}
personInfo := value.(*person.PersonInfo)
return personInfo, nil
}
func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
count := 0
for {
personInfo, err := personStream.Recv()
if errors.Is(err, io.EOF) {
return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
} else if err != nil {
return err
}
personData.Store(personInfo.Name, personInfo)
count++
}
}Interceptores del servidor
Los interceptores de solicitudes RPC del servidor son UnaryServerInterceptor y StreamServerInterceptor. Los tipos específicos son los siguientes:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) errorRPC unario
Para crear un interceptor de RPC unario, solo necesita implementar el tipo UnaryServerInterceptor. A continuación se muestra un ejemplo simple de interceptor de RPC unario. La función es registrar cada solicitud y respuesta RPC:
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} datos de solicitud RPC
// param info *grpc.UnaryServerInfo información de solicitud de RPC unario actual
// param unaryHandler grpc.UnaryHandler handler específico
// return resp interface{} datos de respuesta RPC
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
resp, err = unaryHandler(ctx, req)
log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
return resp, err
}Para RPC unario, el interceptor intercepta cada solicitud y respuesta RPC, es decir, intercepta la fase de solicitud y la fase de respuesta de RPC. Si el interceptor devuelve error, la solicitud actual finalizará.
RPC de flujo
Para crear un interceptor de RPC de flujo, solo necesita implementar el tipo StreamServerInterceptor. A continuación se muestra un ejemplo simple de interceptor de RPC de flujo:
// StreamPersonLogInterceptor
// param srv interface{} corresponde al server implementado por el servidor
// param stream grpc.ServerStream objeto de flujo
// param info *grpc.StreamServerInfo información de flujo
// param streamHandler grpc.StreamHandler procesador
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
err := streamHandler(srv, stream)
log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
return err
}Para RPC de flujo, el interceptor intercepta el momento en que se llaman los métodos Send y Recv de cada objeto de flujo. Si el interceptor devuelve error, no causará el final de la solicitud RPC actual, solo representa que hubo un error en el send o recv actual.
Usar interceptores
Para que los interceptores creados surtan efecto, deben pasarse como option al crear el servidor gRPC. Oficialmente también se proporcionan funciones relacionadas para su uso. Como se muestra a continuación, hay funciones para agregar un solo interceptor y funciones para agregar interceptores en cadena:
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
func StreamInterceptor(i StreamServerInterceptor) ServerOption
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptionTIP
El uso repetido de UnaryInterceptor lanzará el siguiente panic:
panic: The unary server interceptor was already set and may not be reset.StreamInterceptor es lo mismo. Mientras que el uso repetido de interceptores en cadena los agregará a la misma cadena.
El ejemplo de uso es el siguiente:
package main
import (
"google.golang.org/grpc"
"grpc_learn/interceptor/person"
"log"
"net"
)
func main() {
log.SetPrefix("server ")
listen, err := net.Listen("tcp", "9090")
if err != nil {
log.Panicln(err)
}
server := grpc.NewServer(
// Agregar interceptores en cadena
grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
)
person.RegisterPersonServer(server, &PersonService{})
server.Serve(listen)
}Interceptores del cliente
Los interceptores del cliente son similares a los del servidor. Hay un interceptor unario UnaryClientInterceptor y un interceptor de flujo StreamClientInterceptor. Los tipos específicos son los siguientes:
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)RPC unario
Para crear un interceptor de cliente de RPC unario, implemente UnaryClientInterceptor. A continuación se muestra un ejemplo simple:
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string nombre del método
// param req interface{} datos de solicitud
// param reply interface{} datos de respuesta
// param cc *grpc.ClientConn objeto de conexión del cliente
// param invoker grpc.UnaryInvoker método específico del cliente interceptado
// param opts ...grpc.CallOption opciones de configuración de la solicitud actual
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
err := invoker(ctx, method, req, reply, cc, opts...)
log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
return err
}A través del interceptor de cliente de RPC unario, puede obtener los datos de solicitud y respuesta locales, así como otra información de solicitud.
RPC de flujo
Para crear un interceptor de cliente de RPC de flujo, implemente StreamClientInterceptor. A continuación se muestra un ejemplo:
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc información de descripción del objeto de flujo
// param cc *grpc.ClientConn objeto de conexión
// param method string nombre del método
// param streamer grpc.Streamer objeto utilizado para crear el objeto de flujo
// param opts ...grpc.CallOption opciones de configuración de conexión
// return grpc.ClientStream objeto de flujo del cliente creado
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return stream, err
}A través del interceptor de cliente de RPC de flujo, solo puede interceptar el momento en que el cliente establece una conexión con el servidor, es decir, el momento de crear el flujo. No puede interceptar cada vez que el objeto de flujo del cliente envía o recibe mensajes. Sin embargo, podemos envolver el objeto de flujo creado en el interceptor para lograr la interceptación de envío y recepción de mensajes, como se muestra a continuación:
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc información de descripción del objeto de flujo
// param cc *grpc.ClientConn objeto de conexión
// param method string nombre del método
// param streamer grpc.Streamer objeto utilizado para crear el objeto de flujo
// param opts ...grpc.CallOption opciones de configuración de conexión
// return grpc.ClientStream objeto de flujo del cliente creado
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}
type ClientStreamInterceptorWrapper struct {
method string
desc *grpc.StreamDesc
grpc.ClientStream
}
func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
// Antes de enviar el mensaje
err := c.ClientStream.SendMsg(m)
// Después de enviar el mensaje
log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
return err
}
func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
// Antes de recibir el mensaje
err := c.ClientStream.RecvMsg(m)
// Después de recibir el mensaje
log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
return err
}Usar interceptores
Al usarlos, es similar al servidor. También hay cuatro funciones de utilidad para agregar interceptores a través de option, divididos en interceptor único e interceptor en cadena:
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
func WithStreamInterceptor(f StreamClientInterceptor) DialOption
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOptionTIP
El uso repetido de WithUnaryInterceptor en el cliente no lanzará panic, pero solo el último tendrá efecto.
A continuación se muestra un caso de uso:
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"log"
)
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}Hasta ahora, todo el caso ha sido escrito. Es hora de ejecutarlo para ver cómo es el resultado. La salida del servidor es la siguiente:
server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not foundLa salida del cliente es la siguiente:
C:\Users\Stranger\AppData\Local\Temp\GoLand\___go_build_grpc_learn_interceptor_client.exe
client 2023/07/20 17:27:57 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfotream req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfotream req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not foundSe puede ver que las salidas de ambos lados cumplen con las expectativas, logrando el efecto de interceptación. Este caso es solo un ejemplo muy simple. Usando los interceptores de gRPC, se pueden hacer muchas cosas como autorización, registro, monitoreo y otras funciones. Puede elegir hacer sus propias herramientas o usar herramientas existentes de la comunidad de código abierto. gRPC Ecosystem recopila una serie de middleware de interceptores de gRPC de código abierto. Dirección: grpc-ecosystem/go-grpc-middleware.
Manejo de errores
Antes de comenzar, veamos un ejemplo. En el caso anterior del interceptor, si el usuario no se encuentra, se devolverá un error al cliente person not found. Entonces surge la pregunta: ¿puede el cliente realizar un tratamiento especial según el error devuelto? A continuación, inténtelo. En el código del cliente, intente usar errors.Is para juzgar el error:
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
log.Println(info, err)
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}
}El resultado de salida es el siguiente:
client 2023/07/21 16:46:10 before create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream path: /person/createPersonInfotream name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfotream req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfotream req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not foundSe puede ver que el error recibido por el cliente es así. Se encontrará que el error devuelto por el servidor está en el campo desc:
rpc error: code = Unknown desc = person not foundNaturalmente, la lógica de errors.Is no se ejecutó. Incluso si se cambia a errors.As, el resultado es el mismo:
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}Para ello, gRPC proporciona el paquete status para resolver este tipo de problemas. Esta es también la razón por la cual el error recibido por el cliente tiene campos code y desc. Porque gRPC en realidad devuelve un Status al cliente. Su tipo específico es el siguiente, también es un message definido por protobuf:
type Status struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}message Status {
// El código de estado, que debería ser un valor enum de
// [google.rpc.Code][google.rpc.Code].
int32 code = 1;
// Un mensaje de error orientado al desarrollador, que debería estar en inglés. Cualquier
// mensaje de error orientado al usuario debería estar localizado y enviarse en el
// campo [google.rpc.Status.details][google.rpc.Status.details], o localizado
// por el cliente.
string message = 2;
// Una lista de mensajes que llevan los detalles del error. Hay un conjunto común de
// tipos de mensajes para que las APIs los usen.
repeated google.protobuf.Any details = 3;
}Códigos de error
El Code en la estructura Status es similar a Http Status, utilizado para indicar el estado de la solicitud rpc actual. gRPC define 16 códigos en grpc/codes, que cubren la mayoría de los escenarios, como se muestra a continuación:
// A Code is an unsigned 32-bit error code as defined in the gRPC spec.
type Code uint32
const (
// Llamada exitosa
OK Code = 0
// Solicitud cancelada
Canceled Code = 1
// Error desconocido
Unknown Code = 2
// Parámetro inválido
InvalidArgument Code = 3
// Solicitud agotada
DeadlineExceeded Code = 4
// Recurso no encontrado
NotFound Code = 5
// Ya existe el mismo recurso (sorprendido de que esto exista)
AlreadyExists Code = 6
// Permiso insuficiente, acceso denegado
PermissionDenied Code = 7
// Recursos agotados, la capacidad restante no es suficiente para usar, como espacio en disco insuficiente, etc.
ResourceExhausted Code = 8
// Condiciones de ejecución insuficientes, como usar rm para eliminar un directorio no vacío, la condición de eliminación es que el directorio esté vacío, pero la condición no se cumple
FailedPrecondition Code = 9
// Solicitud interrumpida
Aborted Code = 10
// Operación fuera del rango de acceso permitido
OutOfRange Code = 11
// Indica que el servicio actual no está implementado
Unimplemented Code = 12
// Error interno del sistema
Internal Code = 13
// Servicio no disponible
Unavailable Code = 14
// Pérdida de datos
DataLoss Code = 15
// No autenticado
Unauthenticated Code = 16
_maxCode = 17
)El paquete grpc/status proporciona muchas funciones para facilitar la conversión mutua entre status y error. Podemos usar directamente status.New para crear un Status, o Newf:
func New(c codes.Code, msg string) *Status
func Newf(c codes.Code, format string, a ...interface{}) *StatusPor ejemplo, el siguiente código:
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)A través del método err de status, puede obtener el error. Cuando el estado es ok, el error es nil:
func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return &Error{s: s}
}También puede crear error directamente:
func Err(c codes.Code, msg string) error
func Errorf(c codes.Code, format string, a ...interface{}) errorsuccess := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)Entonces podemos modificar el código del servidor de la siguiente manera:
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
}
personInfo := value.(*person.PersonInfo)
return personInfo, status.Errorf(codes.OK, "request success")
}Antes de esto, todos los códigos devueltos por el servidor eran unknown. Ahora, después de la modificación, tienen una semántica más clara. Entonces, en el cliente, puede usar status.FromError o usar las siguientes funciones para obtener status del error, y así realizar el procesamiento correspondiente según diferentes códigos:
func FromError(err error) (s *Status, ok bool)
func Convert(err error) *Status
func Code(err error) codes.CodeEl ejemplo es el siguiente:
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
...
}Sin embargo, aunque los códigos de grpc cubren tantos escenarios generales como sea posible, a veces aún no pueden satisfacer las necesidades de los desarrolladores. En este momento, puede usar el campo Details de Status, y es un slice que puede contener múltiples informaciones. Use Status.WithDetails para pasar alguna información personalizada:
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)Use Status.Details para obtener información:
func (s *Status) Details() []interface{}Tenga en cuenta que la información pasada debe ser definida por protobuf para que tanto el servidor como el cliente puedan analizarla fácilmente. Oficialmente se dan algunos ejemplos:
message ErrorInfo {
// La razón del error
string reason = 1;
// El sujeto que define el servicio
string domain = 2;
// Otra información
map<string, string> metadata = 3;
}
// Información de reintento
message RetryInfo {
// El intervalo de espera para la misma solicitud
google.protobuf.Duration retry_delay = 1;
}
// Información de depuración
message DebugInfo {
// Pila
repeated string stack_entries = 1;
// Algunos detalles
string detail = 2;
}
...
...Se pueden ver más ejemplos en googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com). Si es necesario, se puede introducir a través del siguiente código:
import "google.golang.org/genproto/googleapis/rpc/errdetails"Use ErrorInfo como details:
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
notFound.WithDetails(&errdetails.ErrorInfo{
Reason: "person not found",
Domain: "xxx",
Metadata: nil,
})En el cliente, puede obtener los datos y procesarlos. Sin embargo, los anteriores son solo algunos ejemplos recomendados por gRPC. Además de esto, también puede definir sus propios messages para satisfacer mejor las necesidades comerciales correspondientes. Si desea realizar algún procesamiento de errores unificado, también puede operarlo dentro del interceptor.
Control de tiempo de espera

En la mayoría de los casos, generalmente no hay solo un servicio, y puede haber muchos servicios aguas arriba y muchos servicios aguas abajo. El cliente inicia una solicitud, desde el servicio más aguas arriba hasta el más aguas abajo, formando una cadena de llamadas de servicio, como se muestra en la figura. Quizás sea incluso más larga que la de la figura.
En una cadena de llamadas tan larga, si la lógica de procesamiento de uno de los servicios tarda mucho tiempo, hará que el servicio aguas arriba esté siempre en estado de espera. Para reducir el desperdicio innecesario de recursos, es necesario introducir un mecanismo de tiempo de espera. De esta manera, el tiempo de espera pasado cuando la llamada más aguas arriba es el tiempo máximo permitido para toda la cadena de llamadas. Y gRPC puede transmitir el tiempo de espera entre procesos y lenguajes. Coloca algunos datos que deben transmitirse entre procesos en el marco HEADERS Frame de HTTP/2, como se muestra a continuación:

Los datos de tiempo de espera en la solicitud gRPC corresponden al campo grpc-timeout en el marco HEADERS Frame. Tenga en cuenta que no todas las bibliotecas de gRPC han implementado este mecanismo de transmisión de tiempo de espera. Sin embargo, gRPC-go definitivamente lo admite. Si usa bibliotecas en otros idiomas y utiliza esta característica, debe prestar atención adicional a esto.
Tiempo de espera de conexión
Cuando el cliente gRPC establece una conexión con el servidor, se establece de forma asíncrona de forma predeterminada. Si la conexión falla, solo se devolverá un Client vacío. Si desea que la conexión se establezca de forma sincrónica, puede usar grpc.WithBlock() para bloquear la espera hasta que se establezca la conexión:
dial, err := grpc.Dial("localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)Si desea controlar un tiempo de espera, solo necesita pasar un TimeoutContext. Use grpc.DialContext en lugar de gprc.Dial para pasar el context:
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)De esta manera, si la conexión se agota, se devolverá un error:
context deadline exceededEn el servidor, también se puede establecer un tiempo de espera de conexión. Al establecer un nuevo tiempo de espera con el cliente, el valor predeterminado es 120 segundos. Si no se establece una conexión exitosa dentro del tiempo especificado, el servidor cerrará activamente la conexión:
server := grpc.NewServer(
grpc.ConnectionTimeout(time.Second*3),
)TIP
grpc.ConnectionTimeout todavía está en fase experimental. La API puede modificarse o eliminarse en el futuro.
Tiempo de espera de solicitud
Cuando el cliente gRPC inicia una solicitud, el primer parámetro es de tipo Context. De la misma manera, si desea agregar un tiempo de espera a la solicitud RPC, solo necesita pasar un TimeoutContext:
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
// Procesamiento de lógica de tiempo de espera
}A través del procesamiento de gRPC, el tiempo de espera se transmite al servidor. Durante el proceso de transmisión, existe en forma de campo de marco. En Go, existe en forma de context, transmitiéndose así en toda la cadena. Durante el proceso de transmisión en la cadena, no se recomienda modificar el tiempo de espera. Cuánto tiempo de espera debe establecerse en la solicitud, esto debería ser considerado por el servicio más aguas arriba.
Autenticación y autorización
En el ámbito de los microservicios, cada servicio necesita verificar la identidad y los permisos del usuario para cada solicitud. Si es como una aplicación monolítica, donde cada servicio debe implementar su propio conjunto de lógica de autenticación, esto claramente no es realista. Por lo tanto, se necesita un servicio de autenticación y autorización unificado. Las soluciones comunes son OAuth2, sesión distribuida y JWT. Entre ellas, OAuth2 es la más utilizada y se ha convertido en un estándar de la industria. El tipo de token más comúnmente utilizado en OAuth2 es JWT. A continuación se muestra un diagrama de flujo del modo de código de autorización de OAuth2. El proceso básico se muestra en la figura:

Transmisión segura
Registro y descubrimimiento de servicios
Antes de que el cliente llame a un servicio específico del servidor, necesita conocer la IP y el puerto del servidor. En los casos anteriores, la dirección del servidor estaba escrita de forma estática. En un entorno de red real, no siempre es tan estable. Algunos servicios pueden fallar y no estar disponibles debido a fallos, o la dirección puede cambiar debido a la migración de máquinas para el desarrollo del negocio. En estos casos, no se puede acceder al servicio usando una dirección estática. Estos problemas dinámicos son los que el descubrimiento y registro de servicios deben resolver. El descubrimiento de servicios es responsable de monitorear los cambios en la dirección del servicio y actualizarlos. El registro de servicios es responsable de informar a los demás su dirección. En gRPC, se proporciona una función básica de descubrimiento de servicios, y admite extensión y personalización.
No se puede usar una dirección estática, pero se pueden usar algunos nombres específicos para reemplazarla. Por ejemplo, el navegador obtiene la dirección a través de la resolución DNS del nombre de dominio. De la misma manera, el descubrimiento de servicios predeterminado en gRPC se realiza a través de DNS. Modifique el archivo host local y agregue el siguiente mapeo:
127.0.0.1 example.grpc.comLuego cambie la dirección Dial en el ejemplo helloworld al nombre de dominio correspondiente:
func main() {
// Establecer conexión, sin verificación de cifrado
conn, err := grpc.Dial("example.grpc.com:8080",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// Crear cliente
client := hello2.NewSayHelloClient(conn)
// Llamada remota
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}También se puede ver la salida normal:
2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"En gRPC, este tipo de nombres debe seguir la sintaxis URI definida en RFC 3986. El formato es:
hierarchical part
┌───────────────────┴─────────────────────┐
authority path
┌───────────────┴───────────────┐┌───┴────┐
abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
└┬┘ └───────┬───────┘ └────┬────┘ └┬┘ └─────────┬─────────┘ └──┬──┘
scheme user information host port query fragmentEl URI en el ejemplo anterior es de la siguiente forma. Dado que dns es compatible de forma predeterminada, se omite el prefijo scheme:
dns:example.grpc.com:8080Además, gRPC también admite Unix domain sockets de forma predeterminada. Para otros métodos, necesitamos implementar personalizaciones según las extensiones de gRPC. Para ello, necesitamos implementar un resolvedor personalizado resolver.Resolver. Resolver es responsable de monitorear las actualizaciones de la dirección de destino y la configuración del servicio:
type Resolver interface {
// gRPC llamará a ResolveNow para intentar analizar nuevamente el nombre de destino. Esto es solo una sugerencia. Si no es necesario, el resolvedor puede ignorarlo. El método puede llamarse concurrentemente.
ResolveNow(ResolveNowOptions)
Close()
}gRPC requiere que pasemos un constructor de Resolver, es decir, resolver.Builder, que es responsable de construir instancias de Resolver:
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}El método Scheme de Builder devuelve el tipo de Scheme que es responsable de analizar. Por ejemplo, el dnsBuilder predeterminado devuelve dns. El constructor debe registrarse en el Builder global usando resolver.Register durante la inicialización, o pasarse como options usando grpc.WithResolver internamente en ClientConn. Este último tiene prioridad sobre el primero.

La figura anterior describe brevemente el flujo de trabajo del resolver. A continuación se demostrará cómo personalizar el resolver.
Resolver de servicio personalizado
A continuación se escribe un resolvedor personalizado. Se necesita un constructor de resolvedor personalizado para construirlo:
package myresolver
import (
"fmt"
"google.golang.org/grpc/resolver"
)
func NewBuilder(ads map[string][]string) *MyBuilder {
return &MyBuilder{ads: ads}
}
type MyBuilder struct {
ads map[string][]string
}
func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.URL.Scheme != c.Scheme() {
return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
}
m := &MyResolver{ads: c.ads, t: target, cc: cc}
// Aquí es necesario updatestate, de lo contrario se producirá un bloqueo
m.start()
return m, nil
}
func (c *MyBuilder) Scheme() string {
return "hello"
}
type MyResolver struct {
t resolver.Target
cc resolver.ClientConn
ads map[string][]string
}
func (m *MyResolver) start() {
addres := make([]resolver.Address, 0)
for _, ad := range m.ads[m.t.URL.Opaque] {
addres = append(addres, resolver.Address{Addr: ad})
}
err := m.cc.UpdateState(resolver.State{
Addresses: addres,
// Configuración, loadBalancingPolicy se refiere a la estrategia de equilibrio de carga
ServiceConfig: m.cc.ParseServiceConfig(
`{"loadBalancingPolicy":"round_robin"}`),
})
if err != nil {
m.cc.ReportError(err)
}
}
func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (m *MyResolver) Close() {}El resolvedor personalizado pasa las direcciones coincidentes en el map a updatestate, y también especifica la estrategia de equilibrio de carga. round_robin se refiere a round-robin.
// La estructura de service config es la siguiente
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *internalserviceconfig.BalancerConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}El código del cliente es el siguiente:
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"grpc_learn/helloworld/client/myresolver"
hello2 "grpc_learn/helloworld/hello"
"log"
"time"
)
func init() {
// Registrar builder
resolver.Register(myresolver.NewBuilder(map[string][]string{
"myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
}))
}
func main() {
// Establecer conexión, sin verificación de cifrado
conn, err := grpc.Dial("hello:myworld",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// Crear cliente
client := hello2.NewSayHelloClient(conn)
// Llamar una vez por segundo
for range time.Tick(time.Second) {
// Llamada remota
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}
}Normalmente, el flujo debería ser que el servidor se registre en el centro de registro y luego el cliente obtenga la lista de servicios del centro de registro y la coincida. Aquí, el map pasado es un centro de registro simulado. Los datos son estáticos, por lo que se omite el registro de servicios, quedando solo el descubrimiento de servicios. El target utilizado por el cliente es hello:myworld, donde hello es el scheme personalizado y myworld es el nombre del servicio. Después de ser analizado por el resolvedor personalizado, se obtiene la dirección real 127.0.0.1:8080. En la situación real, para hacer equilibrio de carga, un nombre de servicio puede coincidir con múltiples direcciones reales, por lo que es por eso que el nombre del servicio corresponde a un slice. Aquí se abren dos servidores, ocupando diferentes puertos. La estrategia de equilibrio de carga es round-robin. Las salidas del servidor son respectivamente las siguientes. Se puede ver por el tiempo de solicitud que la estrategia de equilibrio de carga realmente está funcionando. Si no se especifica la estrategia, solo se seleccionará el primer servicio de forma predeterminada.
// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"
// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"El centro de registro es básicamente una colección de mapeos entre nombres de registro de servicios y direcciones de servicio reales. Cualquier middleware de almacenamiento de datos puede satisfacer las condiciones. Incluso usar mysql como centro de registro no es imposible (aunque probablemente nadie lo haría). Generalmente, el centro de registro es un almacenamiento KV. Redis es una buena opción, pero si usamos redis como centro de registro, necesitaríamos implementar muchas lógicas nosotros mismos, como la comprobación de latidos del servicio, el cierre del servicio, la programación del servicio, etc., lo cual es bastante problemático. Aunque redis tiene ciertas aplicaciones en este aspecto, son pocas. Como dice el dicho, deja que los profesionales hagan el trabajo profesional. Hay muchos que son famosos en este aspecto: Zookeeper, Consul, Eureka, ETCD, Nacos, etc.
Puede visitar 注册中心对比和选型:Zookeeper、Eureka、Nacos、Consul 和 ETCD - 掘金 (juejin.cn) para conocer algunas diferencias entre estos middlewares.
Combinar con consul
Para el caso de uso combinado con consul, visite consul
