gRPC

원격 프로시저 호출 rpc 는 마이크로서비스에서 반드시 배워야 할 부분입니다. 학습 과정에서 다양한 rpc 프레임워크를 접하게 되지만, Go 분야에서는 거의 모든 rpc 프레임워크가 gRPC 를 기반으로 하며, 이는 클라우드 네이티브 분야의 기본 프로토콜이 되었습니다. 왜 gRPC 를 선택하는지에 대해 공식적으로는 다음과 같이 답변합니다:
gRPC 는 어떤 환경에서도 실행될 수 있는 현대적인 오픈소스 고성능 원격 프로시저 호출 (Remote Process Call, RPC) 프레임워크입니다. 플러그 가능한 로드 밸런싱, 추적, 헬스체크 및 인증 지원을 통해 데이터센터 내 및 데이터센터 간 서비스를 효과적으로 연결할 수 있습니다. 또한 장치, 모바일 애플리케이션 및 브라우저를 백엔드 서비스에 연결하는 라스트마일 분산 컴퓨팅에도 적합합니다.
공식 웹사이트: gRPC
공식 문서: Documentation | gRPC
gRPC 기술 튜토리얼: Basics tutorial | Go | gRPC
ProtocBuf 공식 웹사이트: Reference Guides | Protocol Buffers Documentation (protobuf.dev)
이것은 CNCF 재단의 오픈소스 프로젝트이기도 합니다. CNCF 는 CLOUD NATIVE COMPUTING FOUNDATION 의 약자입니다.

특징
간단한 서비스 정의
Protocol Buffers 를 사용하여 서비스를 정의합니다. 이는 강력한 바이너리 직렬화 도구 세트 및 언어입니다.
시작 및 확장이 매우 빠름
단 한 줄의 코드로 런타임 및 개발 환경을 설치할 수 있으며, 몇 초 만에 초당 수백만 개의 RPC 로 확장할 수 있습니다.
크로스랭귀어, 크로스플랫폼
다양한 플랫폼과 언어에 따라 클라이언트 및 서버 서비스 스텁을 자동으로 생성합니다.
양방향 스트리밍 및 통합 인증
HTTP/2 기반의 양방향 스트리밍 및 플러그 가능한 인증/인가
GRPC 는 언어와 무관하지만, 이 사이트의 내용은 대부분 Go 와 관련이 있으므로 이 문서에서는 주로 Go 를 사용하여 설명합니다. 이후에 사용되는 pb 컴파일러와 생성기는 다른 언어 사용자의 경우 Protobuf 공식 웹사이트에서 직접 찾을 수 있습니다. 편의를 위해 프로젝트 생성 과정은 생략합니다.
TIP
이 문서는 다음 문서들을 참고했습니다:
写给 go 开发者的 gRPC 教程 -protobuf 基础 - 掘金 (juejin.cn)
gRPC 中的 Metadata - 熊喵君的博客 | PANDAYCHEN
의존성 설치
먼저 Protocol Buffer 컴파일러를 다운로드합니다. 다운로드 주소: Releases · protocolbuffers/protobuf (github.com)

자신의 상황에 따라 시스템과 버전을 선택하면 되며, 다운로드 완료 후 bin 디렉토리를 환경 변수에 추가해야 합니다.
그런 다음 코드 생성기를 다운로드해야 합니다. 컴파일러는 proto 파일을 해당 언어의 직렬화 코드로 생성하고, 생성기는 비즈니스 코드를 생성하는 데 사용됩니다.
$ go install google.golang.org/protobuf/cmd/protoc-gen-go@latest
$ go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@latest빈 프로젝트를 생성합니다. 이름은 grpc_learn 으로 하고, 다음과 같은 의존성을 추가합니다.
$ go get google.golang.org/grpc마지막으로 버전이 실제로 설치되었는지 확인합니다.
$ protoc --version
libprotoc 23.4
$ protoc-gen-go --version
protoc-gen-go.exe v1.28.1
$ protoc-gen-go-grpc --version
protoc-gen-go-grpc 1.3.0Hello World
프로젝트 구조
다음은 Hello World 예제로 설명하겠습니다. 다음과 같은 프로젝트 구조를 생성합니다.
grpc_learn\helloworld
|
+---client
| main.go
|
+---hello
|
|
+---pb
| hello.proto
|
\---server
main.goprotobuf 파일 정의
pb/hello.proto 에 다음과 같은 내용을 작성합니다. 이것은 매우 간단한 예제이며, protoc 문법을 모르는 경우 관련 문서를 참조하세요.
syntax = "proto3";
// .는 출력 경로에 직접 생성함을 의미하며, hello 는 패키지 이름입니다.
option go_package = ".;hello";
// 요청
message HelloReq {
string name = 1;
}
// 응답
message HelloRep {
string msg = 1;
}
// 서비스 정의
service SayHello {
rpc Hello(HelloReq) returns (HelloRep) {}
}코드 생성
작성 완료 후 protoc 컴파일러를 사용하여 데이터 직렬화 관련 코드를 생성하고, 생성기를 사용하여 rpc 관련 코드를 생성합니다.
$ protoc -I ./pb \
--go_out=./hello ./pb/*.proto\
--go-grpc_out=./hello ./pb/*.proto이때 hello 폴더에 hello.pb.go 와 hello_grpc.pb.go 파일이 생성된 것을 확인할 수 있습니다. hello.pb.go 를 살펴보면 우리가 정의한 message 를 확인할 수 있습니다.
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// 정의된 필드
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
}
type HelloRep struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// 정의된 필드
Msg string `protobuf:"bytes,1,opt,name=msg,proto3" json:"msg,omitempty"`
}hello_grpc.pb.go 에서 우리가 정의한 서비스를 확인할 수 있습니다.
type SayHelloServer interface {
Hello(context.Context, *HelloReq) (*HelloRep, error)
mustEmbedUnimplementedSayHelloServer()
}
// 이후 우리가 서비스 인터페이스를 구현할 때 이 구조체를 임베드해야 하며, mustEmbedUnimplementedSayHelloServer 메서드를 구현할 필요가 없습니다.
type UnimplementedSayHelloServer struct {
}
// 기본적으로 nil 을 반환합니다.
func (UnimplementedSayHelloServer) Hello(context.Context, *HelloReq) (*HelloRep, error) {
return nil, status.Errorf(codes.Unimplemented, "method Hello not implemented")
}
// 인터페이스 제약
func (UnimplementedSayHelloServer) mustEmbedUnimplementedSayHelloServer() {}
type UnsafeSayHelloServer interface {
mustEmbedUnimplementedSayHelloServer()
}서버 측 작성
server/main.go 에 다음과 같은 코드를 작성합니다.
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
pb "grpc_learn/server/protoc"
"log"
"net"
)
type GrpcServer struct {
pb.UnimplementedSayHelloServer
}
func (g *GrpcServer) Hello(ctx context.Context, req *pb.HelloReq) (*pb.HelloRep, error) {
log.Printf("received grpc req: %+v", req.String())
return &pb.HelloRep{Msg: fmt.Sprintf("hello world! %s", req.Name)}, nil
}
func main() {
// 포트 리스닝
listen, err := net.Listen("tcp", ":8080")
if err != nil {
panic(err)
}
// gprc 서버 생성
server := grpc.NewServer()
// 서비스 등록
pb.RegisterSayHelloServer(server, &GrpcServer{})
// 실행
err = server.Serve(listen)
if err != nil {
panic(err)
}
}클라이언트 작성
client/main.go 에 다음과 같은 코드를 작성합니다.
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
pb "grpc_learn/server/protoc"
"log"
)
func main() {
// 연결 설정, 암호화 검증 없음
conn, err := grpc.Dial("localhost:8080", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
panic(err)
}
defer conn.Close()
// 클라이언트 생성
client := pb.NewSayHelloClient(conn)
// 원격 호출
helloRep, err := client.Hello(context.Background(), &pb.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}실행
서버를 먼저 실행하고, 그 다음 클라이언트를 실행합니다. 서버 출력은 다음과 같습니다.
2023/07/16 16:26:51 received grpc req: name:"client"클라이언트 출력은 다음과 같습니다.
2023/07/16 16:26:51 received grpc resp: msg:"hello world! client"이 예제에서 클라이언트는 연결을 설정한 후 원격 메서드를 호출할 때 로컬 메서드를 호출하는 것과 마찬가지로 직접 client 의 Hello 메서드에 접근하여 결과를 얻을 수 있습니다. 이것이 가장 간단한 GRPC 예제이며, 많은 오픈소스 프레임워크도 이 전체 프로세스를 캡슐화한 것입니다.
bufbuild
위의 예제에서는 명령어를 사용하여 직접 코드를 생성했지만, 나중에 플러그인이 많아지면 명령어가 매우 번거로워질 수 있습니다. 이때 protobuf 파일을 관리하는 도구를 사용할 수 있으며, 바로 bufbuild/buf 라는 오픈소스 관리 도구가 있습니다.
오픈소스 주소: bufbuild/buf: A new way of working with Protocol Buffers. (github.com)
문서 주소: Buf - Install the Buf CLI
특징
- BSR 관리
- Linter
- 코드 생성
- 포맷팅
- 의존성 관리
이 도구를 사용하면 protobuf 파일을 매우 편리하게 관리할 수 있습니다.
문서에는 매우 많은 설치 방법이 제공되므로 자신에게 맞는 방법을 선택할 수 있습니다. 로컬에 go 환경이 설치되어 있다면 go install 을 사용하여 직접 설치할 수 있습니다.
$ go install github.com/bufbuild/buf/cmd/buf@latest설치 완료 후 버전을 확인합니다.
$ buf --version
1.24.0helloworld/pb 폴더로 이동하여 다음 명령어를 실행하여 pb 파일을 관리할 module 을 생성합니다.
$ buf mod init
$ ls
buf.yaml hello.protobuf.yaml 파일의 기본 내용은 다음과 같습니다.
version: v1
breaking:
use:
- FILE
lint:
use:
- DEFAULT그런 다음 helloworld/ 디렉토리로 이동하여 buf.gen.yaml 을 생성하고 다음과 같은 내용을 작성합니다.
version: v1
plugins:
- plugin: go
out: hello
opt:
- plugin: go-grpc
out: hello
opt:그런 다음 명령어를 실행하여 코드를 생성합니다.
$ buf generate완료 후 생성된 파일을 확인할 수 있습니다. 물론 buf 의 기능은 이것뿐만이 아니며, 다른 기능은 문서에서 직접 학습할 수 있습니다.
스트리밍 RPC
gRPC 의 호출 방식에는 두 가지 범주가 있습니다. Unary RPC 와 Stream RPC 입니다. Hello World 의 예제는 전형적인 Unary RPC 입니다.

Unary RPC(또는 일반 RPC 라고 하는 것이 더 이해하기 쉬울 수 있습니다. unary 를 어떻게 번역해야 할지 모르겠습니다) 는 일반 http 와 마찬가지로 사용되며, 클라이언트가 요청하고 서버가 데이터를 반환하는一问一答 방식입니다. 반면 Stream RPC 의 요청과 응답은 스트리밍일 수 있으며, 아래 그림과 같습니다.

스트림 요청을 사용할 때는 한 번만 응답을 반환하며, 클라이언트는 스트림을 통해 여러 번 매개변수를 서버에 보낼 수 있습니다. 서버는 Unary RPC 와 같이 모든 매개변수를 받은 후에 처리할 필요가 없으며, 구체적인 처리 로직은 서버에서 결정할 수 있습니다. 일반적으로 클라이언트만 스트림 요청을 능동적으로 닫을 수 있으며, 일단 스트림이 닫히면 현재 RPC 요청도 종료됩니다.
스트림 응답을 사용할 때는 한 번만 매개변수를 보내며, 서버는 스트림을 통해 여러 번 데이터를 클라이언트에 보낼 수 있습니다. 클라이언트는 Unary RPC 와 같이 모든 데이터를 받은 후에 처리할 필요가 없으며, 구체적인 처리 로직은 클라이언트에서 직접 결정할 수 있습니다. 일반적인 요청에서는 서버만 스트림 응답을 능동적으로 닫을 수 있으며, 일단 스트림이 닫히면 현재 RPC 요청도 종료됩니다.
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (Message);
}응답만 스트리밍일 수도 있습니다 (Server-Streaming RPC).
service MessageService {
rpc getMessage(google.protobuf.StringValue) returns (stream Message);
}또는 요청과 응답 모두 스트리밍일 수도 있습니다 (Bi-directional-Streaming RPC).
service MessageService {
rpc getMessage(stream google.protobuf.StringValue) returns (stream Message);
}단방향 스트리밍
다음은 단방향 스트리밍 작업을 보여주는 예제입니다. 먼저 다음과 같은 프로젝트 구조를 생성합니다.
grpc_learn\server_client_stream
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.gomessage.proto 의 내용은 다음과 같습니다.
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service MessageService {
rpc receiveMessage(google.protobuf.StringValue) returns (stream Message);
rpc sendMessage(stream Message) returns (google.protobuf.Int64Value);
}buf 를 사용하여 코드를 생성합니다.
$ buf generate여기서는 메시지 서비스를 예제로演示합니다. receiveMessage 은 지정된 사용자 이름을 수신하며, 유형은 문자열이고 메시지 스트림을 반환합니다. sendMessage 는 메시지 스트림을 수신하고 성공적으로 전송된 메시지 수를 반환하며, 유형은 64 비트 정수입니다. 다음으로 server/message_service.go 를 생성하여 기본적으로 생성된 서비스를 직접 구현합니다.
package main
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
)
type MessageService struct {
message.UnimplementedMessageServiceServer
}
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
return status.Errorf(codes.Unimplemented, "method ReceiveMessage not implemented")
}
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
return status.Errorf(codes.Unimplemented, "method SendMessage not implemented")
}수신 메시지와发送 메시지의 매개변수에는 모두 스트림 래핑 인터페이스가 있음을 확인할 수 있습니다.
type MessageService_ReceiveMessageServer interface {
// 메시지发送
Send(*Message) error
grpc.ServerStream
}
type MessageService_SendMessageServer interface {
// 반환값发送 및 연결 종료
SendAndClose(*wrapperspb.StringValue) error
// 메시지 수신
Recv() (*Message, error)
grpc.ServerStream
}둘 다 gprc.ServerStream 인터페이스를 임베드합니다.
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
Context() context.Context
SendMsg(m interface{}) error
RecvMsg(m interface{}) error
}스트림 RPC 는 Unary RPC 와 달리 매개변수와 반환값이 함수 시그니처에 명확하게 반영되지 않습니다. 이러한 메서드는 처음에는 매개변수와 반환값의 유형을 알 수 없으며,传入된 Stream 유형을 호출하여 스트림 전송을 완료해야 합니다. 다음으로 서버 측의 구체적인 로직을 작성합니다. 서버 측 로직을 작성할 때는 sync.map 을 사용하여 메시지 큐를 시뮬레이션합니다. 클라이언트가 ReceiveMessage 요청을 할 때 서버는 스트림 응답을 통해 클라이언트가 원하는 메시지를 계속 반환하다가 타임아웃 후 요청을 끊습니다. 클라이언트가 SendMessage 를 요청할 때는 스트림 요청을 통해 메시지를 계속 보내고, 서버는 메시지를 계속 큐에 넣다가 클라이언트가 능동적으로 요청을 끊으면 클라이언트에게 메시지 전송 수를 반환합니다.
package main
import (
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"sync"
"time"
)
// 시뮬레이션된 메시지 큐
var messageQueue sync.Map
type MessageService struct {
message.UnimplementedMessageServiceServer
}
// ReceiveMessage
// param user *wrapperspb.StringValue
// param recvServer message.MessageService_ReceiveMessageServer
// return error
// 지정된 사용자의 메시지 수신
func (m *MessageService) ReceiveMessage(user *wrapperspb.StringValue, recvServer message.MessageService_ReceiveMessageServer) error {
timer := time.NewTimer(time.Second * 5)
for {
time.Sleep(time.Millisecond * 100)
select {
case <-timer.C:
log.Printf("5 초 동안 %s 의 메시지가 없으므로 연결을 종료합니다", user.GetValue())
return nil
default:
value, ok := messageQueue.Load(user.GetValue())
if !ok {
messageQueue.Store(user.GetValue(), []*message.Message{})
continue
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue
}
// 메시지 가져오기
msg := queue[0]
// 스트림 전송을 통해 클라이언트에게 메시지发送
err := recvServer.Send(msg)
log.Printf("receive %+v\n", msg)
if err != nil {
return err
}
queue = queue[1:]
messageQueue.Store(user.GetValue(), queue)
timer.Reset(time.Second * 5)
}
}
}
// SendMessage
// param sendServer message.MessageService_SendMessageServer
// return error
// 지정된 사용자에게 메시지发送
func (m *MessageService) SendMessage(sendServer message.MessageService_SendMessageServer) error {
count := 0
for {
// 클라이언트로부터 메시지 수신
msg, err := sendServer.Recv()
if errors.Is(err, io.EOF) {
return sendServer.SendAndClose(wrapperspb.Int64(int64(count)))
}
if err != nil {
return err
}
log.Printf("send %+v\n", msg)
value, ok := messageQueue.Load(msg.From)
if !ok {
messageQueue.Store(msg.From, []*message.Message{msg})
continue
}
queue := value.([]*message.Message)
queue = append(queue, msg)
// 메시지를 메시지 큐에 넣기
messageQueue.Store(msg.From, queue)
count++
}
}클라이언트는 두 개의 고루틴을 시작합니다. 하나의 고루틴은 메시지를发送하고, 다른 고루틴은 메시지를 수신합니다. 물론 동시에发送하고 수신할 수도 있습니다. 코드는 다음과 같습니다.
package main
import (
"context"
"errors"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/server_client_stream/message"
"io"
"log"
"time"
)
var Client message.MessageServiceClient
func main() {
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
log.Panicln(err)
}
defer dial.Close()
Client = message.NewMessageServiceClient(dial)
log.SetPrefix("client\t")
msgTask := task.NewTask(func(err error) {
log.Panicln(err)
})
ctx := context.Background()
// 메시지 수신 요청
msgTask.AddJobs(func() {
receiveMessageStream, err := Client.ReceiveMessage(ctx, wrapperspb.String("jack"))
if err != nil {
log.Panicln(err)
}
for {
recv, err := receiveMessageStream.Recv()
if errors.Is(err, io.EOF) {
log.Println("메시지가 없어 연결을 종료합니다")
break
} else if err != nil {
break
}
log.Printf("receive %+v", recv)
}
})
msgTask.AddJobs(func() {
from := "jack"
to := "mike"
sendMessageStream, err := Client.SendMessage(ctx)
if err != nil {
log.Panicln(err)
}
msgs := []string{
"있나요",
"오후에 함께 게임할 시간 있나요",
"그럼 좋아요, 나중에 시간 될 때 함께 해요",
"이번 주말에는 괜찮을 것 같아요",
"그럼 그렇게 정해요",
}
for _, msg := range msgs {
time.Sleep(time.Second)
sendMessageStream.Send(&message.Message{
From: from,
Content: msg,
To: to,
})
}
// 메시지发送 완료 후 능동적으로 요청을 종료하고 반환값 가져오기
recv, err := sendMessageStream.CloseAndRecv()
if err != nil {
log.Println(err)
} else {
log.Printf("发送 완료, 총 %d 개의 메시지发送\n", recv.GetValue())
}
})
msgTask.Run()
}실행 후 서버 출력은 다음과 같습니다.
server 2023/07/18 16:28:24 send from:"jack" content:"있나요" to:"mike"
server 2023/07/18 16:28:24 receive from:"jack" content:"있나요" to:"mike"
server 2023/07/18 16:28:25 send from:"jack" content:"오후에 함께 게임할 시간 있나요" to:"mike"
server 2023/07/18 16:28:25 receive from:"jack" content:"오후에 함께 게임할 시간 있나요" to:"mike"
server 2023/07/18 16:28:26 send from:"jack" content:"그럼 좋아요, 나중에 시간 될 때 함께 해요" to:"mike"
server 2023/07/18 16:28:26 receive from:"jack" content:"그럼 좋아요, 나중에 시간 될 때 함께 해요" to:"mike"
server 2023/07/18 16:28:27 send from:"jack" content:"이번 주말에는 괜찮을 것 같아요" to:"mike"
server 2023/07/18 16:28:27 receive from:"jack" content:"이번 주말에는 괜찮을 것 같아요" to:"mike"
server 2023/07/18 16:28:28 send from:"jack" content:"그럼 그렇게 정해요" to:"mike"
server 2023/07/18 16:28:28 receive from:"jack" content:"그럼 그렇게 정해요" to:"mike"
server 2023/07/18 16:28:33 5 초 동안 jack 의 메시지가 없으므로 연결을 종료합니다클라이언트 출력은 다음과 같습니다.
client 2023/07/18 16:28:24 receive from:"jack" content:"있나요" to:"mike"
client 2023/07/18 16:28:25 receive from:"jack" content:"오후에 함께 게임할 시간 있나요" to:"mike"
client 2023/07/18 16:28:26 receive from:"jack" content:"그럼 좋아요, 나중에 시간 될 때 함께 해요" to:"mike"
client 2023/07/18 16:28:27 receive from:"jack" content:"이번 주말에는 괜찮을 것 같아요" to:"mike"
client 2023/07/18 16:28:28 发送 완료, 총 5 개의 메시지发送
client 2023/07/18 16:28:28 receive from:"jack" content:"그럼 그렇게 정해요" to:"mike"
client 2023/07/18 16:28:33 메시지가 없어 연결을 종료합니다이 예제를 통해 단방향 스트림 RPC 요청 처리는 클라이언트와 서버 모두 Unary RPC 보다 복잡하다는 것을 알 수 있습니다. 양방향 스트림 RPC 는 이보다 더 복잡합니다.
양방향 스트리밍
양방향 스트림 PRC 는 요청과 응답 모두 스트리밍이라는 점에서 위의 예제에서 두 서비스를 하나로 결합한 것과 같습니다. 스트림 RPC 의 경우 첫 번째 요청은 반드시 클라이언트에서 시작하며, 이후 클라이언트는 언제든지 스트림을 통해 요청 매개변수를 보낼 수 있고, 서버도 언제든지 스트림을 통해 데이터를 반환할 수 있습니다. 어느 쪽이든 스트림을 닫으면 현재 요청이 종료됩니다.
TIP
이후의 내용은 특별한 경우가 아니면 pb 코드 생성 및 rpc 클라이언트/서버 생성 단계의 코드 설명은 생략합니다.
먼저 다음과 같은 프로젝트 구조를 생성합니다.
bi_stream\
| buf.gen.yaml
|
+---client
| main.go
|
+---message
| message.pb.go
| message_grpc.pb.go
|
+---pb
| buf.yaml
| message.proto
|
\---server
main.go
message_service.gomessage.proto 의 내용은 다음과 같습니다.
syntax = "proto3";
option go_package = ".;message";
import "google/protobuf/wrappers.proto";
message Message {
string from = 1;
string content = 2;
string to = 3;
}
service ChatService {
rpc chat(stream Message) returns (stream Message);
}서버 측 로직에서는 연결을 설정한 후 두 개의 고루틴을 시작합니다. 하나는 메시지를 수신하고, 다른 하나는 메시지를发送합니다. 구체적인 처리 로직은 이전 예제와 유사하지만, 이번에는 타임아웃 판정 로직을 제거했습니다.
package main
import (
"github.com/dstgo/task"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"sync"
"time"
)
// MessageQueue 시뮬레이션된 메시지 큐
var MessageQueue sync.Map
type ChatService struct {
message.UnimplementedChatServiceServer
}
// Chat
// param chatServer message.ChatService_ChatServer
// return error
// 채팅 서비스, 서버 측 로직은 멀티 고루틴으로 처리합니다.
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, _ := metadata.FromIncomingContext(chatServer.Context())
from := md.Get("from")[0]
defer log.Println(from, "채팅 종료")
var chatErr error
chatCh := make(chan error)
// 두 개의 고루틴 생성, 하나는 메시지 수신, 하나는 메시지发送
chatTask := task.NewTask(func(err error) {
chatErr = err
})
// 메시지 수신 고루틴
chatTask.AddJobs(func() {
for {
msg, err := chatServer.Recv()
log.Printf("receive %+v err %+v\n", msg, err)
if err != nil {
chatErr = err
chatCh <- err
break
}
value, ok := MessageQueue.Load(msg.To)
if !ok {
MessageQueue.Store(msg.To, []*message.Message{msg})
} else {
queue := value.([]*message.Message)
queue = append(queue, msg)
MessageQueue.Store(msg.To, queue)
}
}
})
// 메시지发送 고루틴
chatTask.AddJobs(func() {
Send:
for {
time.Sleep(time.Millisecond * 100)
select {
case <-chatCh:
log.Println(from, "send close")
break Send
default:
value, ok := MessageQueue.Load(from)
if !ok {
value = []*message.Message{}
MessageQueue.Store(from, value)
}
queue := value.([]*message.Message)
if len(queue) < 1 {
continue Send
}
msg := queue[0]
queue = queue[1:]
MessageQueue.Store(from, queue)
err := chatServer.Send(msg)
log.Printf("send %+v\n", msg)
if err != nil {
chatErr = err
break Send
}
}
}
})
chatTask.Run()
return chatErr
}클라이언트 로직에서는 두 개의 자식 고루틴을 시작하여 두 사람의 채팅 과정을 시뮬레이션합니다. 두 자식 고루틴에는 각각 손자 고루틴이 두 개씩 있어 메시지를 주고받습니다 (클라이언트 로직에서는 두 사람의 채팅 메시지 송수신 순서가 올바르게 보장되지는 않으며, 단순히 양쪽发送과 수신의 예제일 뿐입니다).
package main
import (
"context"
"github.com/dstgo/task"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"grpc_learn/bi_stream/message"
"log"
"time"
)
var Client message.ChatServiceClient
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090", grpc.WithTransportCredentials(insecure.NewCredentials()))
defer dial.Close()
if err != nil {
log.Panicln(err)
}
Client = message.NewChatServiceClient(dial)
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
NewChat("jack", "mike", "안녕하세요", "함께 게임할 시간 있나요?", "알아요")
})
chatTask.AddJobs(func() {
NewChat("mike", "jack", "안녕하세요", "없어요", "시간 없어요, 다른 사람 찾아보세요")
})
chatTask.Run()
}
func NewChat(from string, to string, contents ...string) {
ctx := context.Background()
mdCtx := metadata.AppendToOutgoingContext(ctx, "from", from)
chat, err := Client.Chat(mdCtx)
defer log.Println("채팅 종료", from)
if err != nil {
log.Panicln(err)
}
chatTask := task.NewTask(func(err error) {
log.Panicln(err)
})
chatTask.AddJobs(func() {
for _, content := range contents {
time.Sleep(time.Second)
chat.Send(&message.Message{
From: from,
Content: content,
To: to,
})
}
// 메시지发送 완료 후 연결 종료
time.Sleep(time.Second * 5)
chat.CloseSend()
})
// 메시지 수신 고루틴
chatTask.AddJobs(func() {
for {
msg, err := chat.Recv()
log.Printf("receive %+v\n", msg)
if err != nil {
log.Println(err)
break
}
}
})
chatTask.Run()
}정상적인情况下 서버 출력은 다음과 같습니다.
server 2023/07/19 17:18:44 server listening on [::]:9090
server 2023/07/19 17:18:49 receive from:"mike" content:"안녕하세요" to:"jack" err <nil>
server 2023/07/19 17:18:49 receive from:"jack" content:"안녕하세요" to:"mike" err <nil>
server 2023/07/19 17:18:49 send from:"jack" content:"안녕하세요" to:"mike"
server 2023/07/19 17:18:49 send from:"mike" content:"안녕하세요" to:"jack"
server 2023/07/19 17:18:50 receive from:"jack" content:"함께 게임할 시간 있나요?" to:"mike" err <nil>
server 2023/07/19 17:18:50 receive from:"mike" content:"없어요" to:"jack" err <nil>
server 2023/07/19 17:18:50 send from:"mike" content:"없어요" to:"jack"
server 2023/07/19 17:18:50 send from:"jack" content:"함께 게임할 시간 있나요?" to:"mike"
server 2023/07/19 17:18:51 receive from:"jack" content:"알아요" to:"mike" err <nil>
server 2023/07/19 17:18:51 receive from:"mike" content:"시간 없어요, 다른 사람 찾아보세요" to:"jack" err <nil>
server 2023/07/19 17:18:51 send from:"jack" content:"알아요" to:"mike"
server 2023/07/19 17:18:51 send from:"mike" content:"시간 없어요, 다른 사람 찾아보세요" to:"jack"
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 receive <nil> err EOF
server 2023/07/19 17:18:56 jack close send
server 2023/07/19 17:18:56 jack end chat
server 2023/07/19 17:18:56 mike close send
server 2023/07/19 17:18:56 mike end chat정상적인情况下 클라이언트 출력은 다음과 같습니다 (메시지 순서 로직이 혼란스러운 것을 확인할 수 있습니다).
client 2023/07/19 17:26:24 receive from:"jack" content:"안녕하세요" to:"mike"
client 2023/07/19 17:26:24 receive from:"mike" content:"안녕하세요" to:"jack"
client 2023/07/19 17:26:25 receive from:"mike" content:"없어요" to:"jack"
client 2023/07/19 17:26:25 receive from:"jack" content:"함께 게임할 시간 있나요?" to:"mike"
client 2023/07/19 17:26:26 receive from:"jack" content:"알아요" to:"mike"
client 2023/07/19 17:26:26 receive from:"mike" content:"시간 없어요, 다른 사람 찾아보세요" to:"jack"
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat jack
client 2023/07/19 17:26:32 receive <nil>
client 2023/07/19 17:26:32 rpc error: code = Unknown desc = EOF
client 2023/07/19 17:26:32 end chat mike예제를 통해 양방향 스트림의 처리 로직은 클라이언트와 서버 모두 단방향 스트림보다 더 복잡하며, 멀티 고루틴을 결합하여 비동기 작업을 시작해야만 로직을 더 잘 처리할 수 있다는 것을 알 수 있습니다.
metadata
metadata 는 본질적으로 map 이며, value 는 문자열 슬라이스입니다. http1 의 header 와 유사하며, gRPC 에서의 역할도 http header 와 유사하여 이번 RPC 호출에 대한 정보를 제공하며, metadata 의 수명은 rpc 호출 전체 과정과 함께하며, 호출이 끝나면 수명도 종료됩니다.
gRPC 에서 metadata 는 주로 context 를 통해 전송 및 저장되지만, gRPC 는 metadata 패키지를 제공하여 수동으로 context 를 조작하지 않아도 되도록 많은 편리한 함수를 제공합니다. metadata 는 gRPC 에서 metadata.MD 유형에 해당하며, 다음과 같습니다.
// MD 는 metadata 키에서 값으로의 매핑입니다. 사용자는 다음 두 가지 편의 함수인 New 와 Pairs 를 사용하여 MD 를 생성해야 합니다.
type MD map[string][]stringmetadata.New 함수를 사용하여 직접 생성할 수 있지만, 생성 전에 몇 가지 주의할 점이 있습니다.
func New(m map[string]string) MDmetadata 는 키 이름에 제한이 있으며, 다음 규칙으로 제한된 문자만 사용할 수 있습니다:
- ASCII 문자
- 숫자: 0-9
- 소문자: a-z
- 대문자: A-Z
- 특수 문자: -_
TIP
metadata 에서 대문자는 소문자로 변환되며, 이는 동일한 key 를 차지하고 값도 덮어쓰게 됩니다.
TIP
grpc-로 시작하는 key 는 grpc 에서 내부적으로 사용하는 예약된 key 이며, 이러한 key 를 사용하면 일부 오류가 발생할 수 있습니다.
수동 생성
metadata 를 생성하는 방법에는 여러 가지가 있으며, 여기서는 수동으로 metadata 를 생성하는 가장 일반적인 두 가지 방법을 소개합니다. 첫 번째는 metadata.New 함수를 사용하여 직접 map 을传入하는 것입니다.
func New(m map[string]string) MDmd := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})두 번째는 metadata.Pairs 로, 짝수 길이의 문자열 슬라이스를传入하면 자동으로 키 - 값 쌍으로解析됩니다.
func Pairs(kv ...string) MDmd := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")metadata.Join 을 사용하여 여러 metadata 를 병합할 수도 있습니다.
func Join(mds ...MD) MDmd1 := metadata.New(map[string]string{
"key": "value",
"key1": "value1",
"key2": "value2",
})
md2 := metadata.Pairs("k", "v", "k1", "v1", "k2", "v2")
union := metadata.Join(md1,md2)서버 측 사용
metadata 가져오기
서버 측에서 metadata 를 가져올 때는 metadata.FromIncomingContext 함수를 사용할 수 있습니다.
func FromIncomingContext(ctx context.Context) (MD, bool)Unary RPC 의 경우 service 매개변수에 context 매개변수가 있으므로 직접 가져올 수 있습니다.
func (h *HelloWorld) Hello(ctx context.Context, name *wrapperspb.StringValue) (*wrapperspb.StringValue, error) {
md, b := metadata.FromIncomingContext(ctx)
...
}스트림 RPC 의 경우 service 매개변수에 스트림 객체가 있으며, 이를 통해 스트림의 context 를 가져올 수 있습니다.
func (m *ChatService) Chat(chatServer message.ChatService_ChatServer) error {
md, b := metadata.FromIncomingContext(chatServer.Context())
...
}metadata发送
metadata 를发送할 때는 grpc.SendHeader 함수를 사용할 수 있습니다.
func SendHeader(ctx context.Context, md metadata.MD) error이 함수는 최대 한 번만 호출할 수 있으며, header 가 자동으로发送되는 일부 이벤트가 발생한 후에는 효과가 없습니다. 일부 경우 header 를 직접发送하고 싶지 않을 때는 grpc.SetHeader 함수를 사용할 수 있습니다.
func SetHeader(ctx context.Context, md metadata.MD) error이 함수를 여러 번 호출하면每次传入된 metadata 가 병합되며, 다음 경우에 클라이언트에게发送됩니다.
gprc.SendHeader와ServerStream.SendHeader가 호출될 때- Unary RPC 의 handler 가 반환될 때
- 스트림 RPC 에서 스트림 객체의
Stream.SendMsg가 호출될 때 - rpc 요청의 상태가
send out로 변경될 때. 이 경우 rpc 요청이 성공했거나 오류가 발생한 것입니다.
스트림 RPC 의 경우 스트림 객체의 SendHeader 메서드와 SetHeader 메서드를 사용하는 것이 좋습니다.
type ServerStream interface {
SetHeader(metadata.MD) error
SendHeader(metadata.MD) error
SetTrailer(metadata.MD)
...
}TIP
사용 과정에서 Header 와 Trailer 의 기능이 거의 비슷하다는 것을 알 수 있지만, 주요 차이점은发送 시점에 있습니다. Unary RPC 에서는 이를 체감하기 어렵지만, 스트림 RPC 에서는 특히 두드러집니다. 스트림 RPC 에서 Header 는 요청이 끝나기 전에发送할 수 있기 때문입니다. 앞에서 언급했듯이 Header 는 특정 경우에发送되며, Trailer 는 RPC 요청이 완전히 끝난 후에만发送됩니다. 그 전에는 가져오는 trailer 가 비어 있습니다.
클라이언트 사용
metadata 가져오기
클라이언트에서 응답 header 를 가져오려면 grpc.Header 와 grpc.Trailer 를 사용할 수 있습니다.
func Header(md *metadata.MD) CallOptionfunc Trailer(md *metadata.MD) CallOption주의할 점은 직접 가져올 수 없다는 것입니다. 위의 두 함수의 반환값이 CallOption 이므로 RPC 요청을 할 때 option 매개변수로传入해야 합니다.
// 값을 수신하기 위한 md 선언
var header, trailer metadata.MD
// rpc 요청 시 option传入
res, err := client.SomeRPC(
ctx,
data,
grpc.Header(&header),
grpc.Trailer(&trailer)
)요청 완료 후 값이传入된 md 에 기록됩니다. 스트림 RPC 의 경우 요청 시 반환된 스트림 객체를 통해 직접 가져올 수 있습니다.
type ClientStream interface {
Header() (metadata.MD, error)
Trailer() metadata.MD
...
}stream, err := client.StreamRPC(ctx)
header, err := stream.Header()
trailer := Stream.Trailer()metadata发送
클라이언트에서 metadata 를发送하는 것은 매우 간단합니다. 앞서 언급했듯이 metadata 의 표현 형태는 valueContext 이므로 metadata 를 context 에 결합한 후 요청할 때 context 를传入하면 됩니다. metadata 패키지는 context 를 쉽게 생성할 수 있는 두 가지 함수를 제공합니다.
func NewOutgoingContext(ctx context.Context, md MD) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
// Unary rpc
res,err := client.SomeRPC(outgoingContext,data)
// 스트림 rpc
stream,err := client.StreamRPC(outgoingContext)기존 ctx 에 이미 metadata 가 있는 경우 NewOutgoingContext 를 사용하면 이전 데이터가 직접 덮어쓰이게 됩니다. 이러한 상황을 피하기 위해 다음 함수를 사용할 수 있습니다. 이 함수는 덮어쓰지 않고 데이터를 병합합니다.
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Contextmd := metadata.Pairs("k1", "v1")
ctx := context.Background()
outgoingContext := metadata.NewOutgoingContext(ctx, md)
appendContext := metadata.AppendToOutgoingContext(outgoingContext, "k2","v2")
// Unary rpc
res,err := client.SomeRPC(appendContext,data)
// 스트림 rpc
stream,err := client.StreamRPC(appendContext)인터셉터
gRPC 의 인터셉터는 gin 의 Middleware 와 유사하여 요청 전후에 특별한 작업을 수행하면서도 비즈니스 로직에 영향을 주지 않습니다. gRPC 에서 인터셉터에는 두 가지 범주가 있습니다: 서버 측 인터셉터와 클라이언트 측 인터셉터. 요청 유형에 따라 Unary RPC 인터셉터와 스트림 RPC 인터셉터로 나뉩니다. 아래 그림을 참조하세요.

인터셉터를 더 잘 이해하기 위해 매우 간단한 예제를 통해 설명하겠습니다.
grpc_learn\interceptor
| buf.gen.yaml
|
+---client
| main.go
|
+---pb
| buf.yaml
| person.proto
|
+---person
| person.pb.go
| person_grpc.pb.go
|
\---server
main.goperson.proto 의 내용은 다음과 같습니다.
syntax = "proto3";
option go_package = ".;person";
import "google/protobuf/wrappers.proto";
message personInfo {
string name = 1;
int64 age = 2;
string address = 3;
}
service person {
rpc getPersonInfo(google.protobuf.StringValue) returns (personInfo);
rpc createPersonInfo(stream personInfo) returns (google.protobuf.Int64Value);
}서버 측 코드는 다음과 같습니다. 로직은 이전 내용과 동일하여 간단하므로 더 이상 설명하지 않습니다.
package main
import (
"context"
"errors"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"io"
"sync"
)
// 데이터 저장
var personData sync.Map
type PersonService struct {
person.UnimplementedPersonServer
}
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, person.PersonNotFoundErr
}
personInfo := value.(*person.PersonInfo)
return personInfo, nil
}
func (p *PersonService) CreatePersonInfo(personStream person.Person_CreatePersonInfoServer) error {
count := 0
for {
personInfo, err := personStream.Recv()
if errors.Is(err, io.EOF) {
return personStream.SendAndClose(wrapperspb.Int64(int64(count)))
} else if err != nil {
return err
}
personData.Store(personInfo.Name, personInfo)
count++
}
}서버 측 인터셉트
서버 측 rpc 요청을 인터셉트하는 것은 UnaryServerInterceptor 와 StreamServerInterceptor 가 있으며, 구체적인 유형은 다음과 같습니다.
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
type StreamServerInterceptor func(srv interface{}, ss ServerStream, info *StreamServerInfo, handler StreamHandler) errorUnary RPC
Unary RPC 인터셉터를 생성하려면 UnaryServerInterceptor 유형을 구현하면 됩니다. 다음은 각 rpc 요청과 응답을 출력하는 간단한 Unary RPC 인터셉터 예제입니다.
// UnaryPersonLogInterceptor
// param ctx context.Context
// param req interface{} rpc 의 요청 데이터
// param info *grpc.UnaryServerInfo 이번 UnaryRPC 의 일부 요청 정보
// param unaryHandler grpc.UnaryHandler 구체적인 handler
// return resp interface{} rpc 의 응답 데이터
// return err error
func UnaryPersonLogInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, unaryHandler grpc.UnaryHandler) (resp interface{}, err error) {
log.Println(fmt.Sprintf("before unary rpc intercept path: %s req: %+v", info.FullMethod, req))
resp, err = unaryHandler(ctx, req)
log.Println(fmt.Sprintf("after unary rpc intercept path: %s resp: %+v err: %+v", info.FullMethod, resp, err))
return resp, err
}Unary RPC 의 경우 인터셉터는 모든 RPC 의 요청과 응답, 즉 RPC 의 요청 단계와 응답 단계를 인터셉트합니다. 인터셉터가 error 를 반환하면 이번 요청이 종료됩니다.
스트림 rpc
스트림 RPC 인터셉터를 생성하려면 StreamServerInterceptor 유형을 구현하면 됩니다. 다음은 간단한 스트림 RPC 인터셉터 예제입니다.
// StreamPersonLogInterceptor
// param srv interface{} 서버 측 구현에 해당하는 server
// param stream grpc.ServerStream 스트림 객체
// param info *grpc.StreamServerInfo 스트림 정보
// param streamHandler grpc.StreamHandler 프로세서
// return error
func StreamPersonLogInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, streamHandler grpc.StreamHandler) error {
log.Println(fmt.Sprintf("before stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t", info.FullMethod, srv, info.IsClientStream, info.IsServerStream))
err := streamHandler(srv, stream)
log.Println(fmt.Sprintf("after stream rpc interceptor path: %s srv: %+v clientStream: %t serverStream: %t err: %+v", info.FullMethod, srv, info.IsClientStream, info.IsServerStream, err))
return err
}스트림 RPC 의 경우 인터셉터는 모든 스트림 객체의 Send 와 Recv 메서드가 호출되는 시점을 인터셉트합니다. 인터셉터가 error 를 반환하더라도 이번 RPC 요청이 종료되는 것은 아니며, 단지 이번 send 또는 recv 에 오류가 발생했음을 의미할 뿐입니다.
인터셉터 사용
생성된 인터셉터를生效시키려면 gRPC 서버를 생성할 때 option 으로传入해야 합니다. 공식에서도 관련 함수를 제공합니다. 다음과 같이 단일 인터셉터를 추가하는 함수와 체인 인터셉터를 추가하는 함수가 있습니다.
func UnaryInterceptor(i UnaryServerInterceptor) ServerOption
func ChainUnaryInterceptor(interceptors ...UnaryServerInterceptor) ServerOption
func StreamInterceptor(i StreamServerInterceptor) ServerOption
func ChainStreamInterceptor(interceptors ...StreamServerInterceptor) ServerOptionTIP
UnaryInterceptor 를 반복해서 사용하면 다음과 같은 panic 이 발생합니다.
panic: The unary server interceptor was already set and may not be reset.StreamInterceptor 도 마찬가지이며, 체인 인터셉터를 반복해서 호출하면 동일한 체인에 append 됩니다.
사용 예제는 다음과 같습니다.
package main
import (
"google.golang.org/grpc"
"grpc_learn/interceptor/person"
"log"
"net"
)
func main() {
log.SetPrefix("server ")
listen, err := net.Listen("tcp", "9090")
if err != nil {
log.Panicln(err)
}
server := grpc.NewServer(
// 체인 인터셉터 추가
grpc.ChainUnaryInterceptor(UnaryPersonLogInterceptor),
grpc.ChainStreamInterceptor(StreamPersonLogInterceptor),
)
person.RegisterPersonServer(server, &PersonService{})
server.Serve(listen)
}클라이언트 인터셉트
클라이언트 인터셉터는 서버 측과 거의 비슷합니다. 하나의 Unary 인터셉터 UnaryClientInterceptor 와 하나의 스트림 인터셉터 StreamClientInterceptor 가 있으며, 구체적인 유형은 다음과 같습니다.
type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
type StreamClientInterceptor func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)Unary RPC
Unary RPC 클라이언트 인터셉터를 생성하려면 UnaryClientInterceptor 를 구현하면 됩니다. 다음은 간단한 예제입니다.
// UnaryPersonClientInterceptor
// param ctx context.Context
// param method string 메서드 이름
// param req interface{} 요청 데이터
// param reply interface{} 응답 데이터
// param cc *grpc.ClientConn 클라이언트 연결 객체
// param invoker grpc.UnaryInvoker 인터셉트될 구체적인 클라이언트 메서드
// param opts ...grpc.CallOption 이번 요청의 구성 항목
// return error
func UnaryPersonClientInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
log.Println(fmt.Sprintf("before unary request path: %s req: %+v", method, req))
err := invoker(ctx, method, req, reply, cc, opts...)
log.Println(fmt.Sprintf("after unary request path: %s req: %+v rep: %+v", method, req, reply))
return err
}클라이언트의 Unary RPC 인터셉터를 통해 로컬 요청의 요청 데이터와 응답 데이터 및 기타 요청 정보를 가져올 수 있습니다.
스트림 RPC
스트림 RPC 클라이언트 인터셉터를 생성하려면 StreamClientInterceptor 를 구현하면 됩니다. 다음은 예제입니다.
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 스트림 객체의 설명 정보
// param cc *grpc.ClientConn 연결 객체
// param method string 메서드 이름
// param streamer grpc.Streamer 스트림 객체를 생성하는 객체
// param opts ...grpc.CallOption 연결 구성 항목
// return grpc.ClientStream 생성된 클라이언트 스트림 객체
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return stream, err
}스트림 RPC 클라이언트 인터셉터를 통해 클라이언트와 서버가 연결을 설정할 때, 즉 스트림을 생성하는 시점만 인터셉트할 수 있으며, 클라이언트 스트림 객체가 메시지를 주고받을 때마다 인터셉트할 수는 없습니다. 하지만 인터셉터에서 생성된 스트림 객체를 래핑하면 메시지 송수신을 인터셉트할 수 있습니다. 아래와 같이 합니다.
// StreamPersonClientInterceptor
// param ctx context.Context
// param desc *grpc.StreamDesc 스트림 객체의 설명 정보
// param cc *grpc.ClientConn 연결 객체
// param method string 메서드 이름
// param streamer grpc.Streamer 스트림 객체를 생성하는 객체
// param opts ...grpc.CallOption 연결 구성 항목
// return grpc.ClientStream 생성된 클라이언트 스트림 객체
// return error
func StreamPersonClientInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
log.Println(fmt.Sprintf("before create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
stream, err := streamer(ctx, desc, cc, method, opts...)
log.Println(fmt.Sprintf("after create stream path: %s name: %+v client: %t server: %t", method, desc.StreamName, desc.ClientStreams, desc.ServerStreams))
return &ClientStreamInterceptorWrapper{method, desc, stream}, err
}
type ClientStreamInterceptorWrapper struct {
method string
desc *grpc.StreamDesc
grpc.ClientStream
}
func (c *ClientStreamInterceptorWrapper) SendMsg(m interface{}) error {
// 메시지发送 전
err := c.ClientStream.SendMsg(m)
// 메시지发送 후
log.Println(fmt.Sprintf("%s send %+v err: %+v", c.method, m, err))
return err
}
func (c *ClientStreamInterceptorWrapper) RecvMsg(m interface{}) error {
// 메시지 수신 전
err := c.ClientStream.RecvMsg(m)
// 메시지 수신 후
log.Println(fmt.Sprintf("%s recv %+v err: %+v", c.method, m, err))
return err
}인터셉터 사용
사용 시 서버 측과 마찬가지로 네 가지 도구 함수를 통해 option 으로 인터셉터를 추가하며, 단일 인터셉터와 체인 인터셉터로 나뉩니다.
func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption
func WithChainUnaryInterceptor(interceptors ...UnaryClientInterceptor) DialOption
func WithStreamInterceptor(f StreamClientInterceptor) DialOption
func WithChainStreamInterceptor(interceptors ...StreamClientInterceptor) DialOptionTIP
클라이언트에서 WithUnaryInterceptor 를 반복해서 사용해도 panic 이 발생하지 않지만, 마지막 것만生效합니다.
다음은 사용 사례입니다.
package main
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/types/known/wrapperspb"
"grpc_learn/interceptor/person"
"log"
)
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jack")))
log.Println(client.GetPersonInfo(ctx, wrapperspb.String("jenny")))
}지금까지 전체 사례가 작성되었으므로 실행하여 결과를 확인해 보겠습니다. 서버 출력은 다음과 같습니다.
server 2023/07/20 17:27:57 before stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false
server 2023/07/20 17:27:57 after stream rpc interceptor path: /person/createPersonInfo srv: &{UnimplementedPersonServer:{}} clientStream: true serverStream: false err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jack"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: name:"jack" age:18 address:"usa" err: <nil>
server 2023/07/20 17:27:57 before unary rpc intercept path: /person/getPersonInfo req: value:"jenny"
server 2023/07/20 17:27:57 after unary rpc intercept path: /person/getPersonInfo resp: <nil> err: person not found클라이언트 출력은 다음과 같습니다.
client 2023/07/20 17:27:57 before create stream path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 after create stream path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/20 17:27:57 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/20 17:27:57 value:2 <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfo req: value:"jack"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfo req: value:"jack" rep: name:"jack" age:18 address:"usa"
client 2023/07/20 17:27:57 name:"jack" age:18 address:"usa" <nil>
client 2023/07/20 17:27:57 before unary request path: /person/getPersonInfo req: value:"jenny"
client 2023/07/20 17:27:57 after unary request path: /person/getPersonInfo req: value:"jenny" rep:
client 2023/07/20 17:27:57 <nil> rpc error: code = Unknown desc = person not found양쪽 출력이 예상과 일치하여 인터셉트 효과를 발휘했음을 알 수 있습니다. 이 사례는 매우 간단한 예제이며, gRPC 의 인터셉터를 사용하면 인증, 로그, 모니터링 등 많은 작업을 수행할 수 있습니다. 직접 구현할 수도 있고, 오픈소스 커뮤니티의 기존 라이브러리를 사용할 수도 있습니다. gRPC Ecosystem 은 일련의 오픈소스 gRPC 인터셉터 미들웨어를 수집하며, 주소는 grpc-ecosystem/go-grpc-middleware 입니다.
오류 처리
시작하기 전에 먼저 예제를 하나 살펴보겠습니다. 이전 인터셉터 사례에서 사용자가 조회되지 않으면 클라이언트에 person not found 오류를 반환합니다. 여기서 질문이 생깁니다. 클라이언트가 반환된 오류를 기반으로 특별한 처리를 할 수 있을까요? 다음으로 시도해 보겠습니다. 클라이언트 코드에서 errors.Is 를 사용하여 오류를 판별해 봅니다.
func main() {
log.SetPrefix("client ")
dial, err := grpc.Dial("localhost:9090",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)
if err != nil {
log.Panicln(err)
}
ctx := context.Background()
client := person.NewPersonClient(dial)
personStream, err := client.CreatePersonInfo(ctx)
personStream.Send(&person.PersonInfo{
Name: "jack",
Age: 18,
Address: "usa",
})
personStream.Send(&person.PersonInfo{
Name: "mike",
Age: 20,
Address: "cn",
})
recv, err := personStream.CloseAndRecv()
log.Println(recv, err)
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
log.Println(info, err)
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}
}결과 출력은 다음과 같습니다.
client 2023/07/21 16:46:10 before create stream path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 after create stream path: /person/createPersonInfo name: createPersonInfo client: true server: false
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"jack" age:18 address:"usa" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo send name:"mike" age:20 address:"cn" err: <nil>
client 2023/07/21 16:46:10 /person/createPersonInfo recv value:2 err: <nil>
client 2023/07/21 16:46:10 value:2 <nil>
client 2023/07/21 16:46:10 before unary request path: /person/getPersonInfo req: value:"john"
client 2023/07/21 16:46:10 after unary request path: /person/getPersonInfo req: value:"john" rep:
client 2023/07/21 16:46:10 <nil> rpc error: code = Unknown desc = person not found클라이언트가 수신한 error 는 다음과 같으며, 서버가 반환한 error 가 desc 필드에 있음을 알 수 있습니다.
rpc error: code = Unknown desc = person not found따라서 errors.Is 로직이 실행되지 않으며, errors.As 로 바꿔도 동일한 결과가 나옵니다.
if errors.Is(err, person.PersonNotFoundErr) {
log.Println("person not found err")
}이를 위해 gRPC 는 status 패키지를 제공하여 이러한 문제를 해결합니다. 이것이 클라이언트가 수신한 오류에 code 와 desc 필드가 있는 이유입니다. gRPC 가 실제로 클라이언트에 반환하는 것은 Status 이며, 구체적인 유형은 다음과 같으며, 이 또한 protobuf 로 정의된 message 입니다.
type Status struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Code int32 `protobuf:"varint,1,opt,name=code,proto3" json:"code,omitempty"`
Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
Details []*anypb.Any `protobuf:"bytes,3,rep,name=details,proto3" json:"details,omitempty"`
}message Status {
// 상태 코드, [google.rpc.Code][google.rpc.Code] 의 enum 값이어야 합니다.
int32 code = 1;
// 개발자 지향 오류 메시지, 영어여야 합니다. 모든 사용자 지향 오류 메시지는 현지화되어
// [google.rpc.Status.details][google.rpc.Status.details] 필드로发送되거나
// 클라이언트에 의해 현지화되어야 합니다.
string message = 2;
// 오류 세부 정보를 담고 있는 메시지 목록. API 가 사용할 수 있는 공통 메시지 유형 집합이 있습니다.
repeated google.protobuf.Any details = 3;
}오류 코드
Status 구조체의 Code 는 Http Status 와 유사한 존재로, 현재 rpc 요청의 상태를 나타내는 데 사용됩니다. gRPC 는 grpc/codes 에 16 개의 code 를 정의하며, 대부분의 시나리오를 포괄합니다. 각각은 다음과 같습니다.
// Code 는 gRPC spec 에 정의된 부호 없는 32 비트 오류 코드입니다.
type Code uint32
const (
// 호출 성공
OK Code = 0
// 요청 취소됨
Canceled Code = 1
// 알 수 없는 오류
Unknown Code = 2
// 매개변수 불법
InvalidArgument Code = 3
// 요청 타임아웃
DeadlineExceeded Code = 4
// 리소스 존재하지 않음
NotFound Code = 5
// 이미 동일한 리소스 존재 (이것이 나올 수 있다는 것이 놀랍습니다)
AlreadyExists Code = 6
// 권한 부족으로 접근 거부됨
PermissionDenied Code = 7
// 리소스 고갈, 남은 용량이 부족하여 사용할 수 없음. 예를 들어 디스크 용량이 부족한 경우 등
ResourceExhausted Code = 8
// 실행 조건 부족. 예를 들어 rm 으로 비어 있지 않은 디렉토리를 삭제할 때, 삭제 조건은 디렉토리가 비어 있어야 하지만 조건을 만족하지 못하는 경우
FailedPrecondition Code = 9
// 요청 중단됨
Aborted Code = 10
// 작업访问이 제한 범위를 초과함
OutOfRange Code = 11
// 현재 서비스가 구현되지 않았음을 나타냄
Unimplemented Code = 12
// 시스템 내부 오류
Internal Code = 13
// 서비스 사용 불가
Unavailable Code = 14
// 데이터 손실
DataLoss Code = 15
// 인증 통과하지 못함
Unauthenticated Code = 16
_maxCode = 17
)grpc/status 패키지는 status 와 error 간의 상호 변환을 위해 많은 함수를 제공합니다. status.New 를 사용하여 Status 를 직접 생성하거나 Newf 를 사용할 수 있습니다.
func New(c codes.Code, msg string) *Status
func Newf(c codes.Code, format string, a ...interface{}) *Status예를 들어 다음과 같은 코드입니다.
success := status.New(codes.OK, "request success")
notFound := status.Newf(codes.NotFound, "person not found: %s", name)status 의 err 메서드를 통해 그 중 error 를 가져올 수 있으며, 상태가 ok 일 때 error 는 nil 입니다.
func (s *Status) Err() error {
if s.Code() == codes.OK {
return nil
}
return &Error{s: s}
}error 를 직접 생성할 수도 있습니다.
func Err(c codes.Code, msg string) error
func Errorf(c codes.Code, format string, a ...interface{}) errorsuccess := status.Error(codes.OK, "request success")
notFound := status.Errorf(codes.InvalidArgument, "person not found: %s", name)따라서 서버 코드를 다음과 같이 수정할 수 있습니다.
func (p *PersonService) GetPersonInfo(ctx context.Context, name *wrapperspb.StringValue) (*person.PersonInfo, error) {
value, ok := personData.Load(name.Value)
if !ok {
return nil, status.Errorf(codes.NotFound, "person not found: %s", name.String())
}
personInfo := value.(*person.PersonInfo)
return personInfo, status.Errorf(codes.OK, "request success")
}이전에는 서버가 반환한 모든 code 가 unknown 이었지만, 수정 후 더 명확한 의미를 갖게 되었습니다. 따라서 클라이언트에서는 status.FromError 를 사용하거나 다음 함수를 사용하여 error 에서 status 를 가져와 서로 다른 code 에 따라 해당하는 처리를 할 수 있습니다.
func FromError(err error) (s *Status, ok bool)
func Convert(err error) *Status
func Code(err error) codes.Code예제는 다음과 같습니다.
info, err := client.GetPersonInfo(ctx, wrapperspb.String("john"))
s, ok := status.FromError(err)
switch s.Code() {
case codes.OK:
case codes.InvalidArgument:
...
}grpc 의 code 가 가능한 한 일반적인 시나리오를 포괄하려고 했지만, 때로는 개발자의 요구를 충족시키지 못할 수 있습니다. 이때 Status 의 Details 필드를 사용할 수 있으며, 이는 슬라이스이므로 여러 정보를 수용할 수 있습니다. Status.WithDetails 를 통해 일부 사용자 정의 정보를传入할 수 있습니다.
func (s *Status) WithDetails(details ...proto.Message) (*Status, error)Status.Details 를 통해 정보를 가져옵니다.
func (s *Status) Details() []interface{}주의할 점은传入된 정보는 protobuf 로 정의된 것이 가장 좋으며, 이렇게 해야 서버와 클라이언트 양쪽에서 쉽게解析할 수 있습니다. 공식에서는 몇 가지 예제를 제공합니다.
message ErrorInfo {
// 오류의 원인
string reason = 1;
// 서비스를 정의하는 주체
string domain = 2;
// 기타 정보
map<string, string> metadata = 3;
}
// 재시도 정보
message RetryInfo {
// 동일한 요청의 대기 간격 시간
google.protobuf.Duration retry_delay = 1;
}
// 디버그 정보
message DebugInfo {
// 스택
repeated string stack_entries = 1;
// 일부 세부 정보
string detail = 2;
}
...
...더 많은 예제는 googleapis/google/rpc/error_details.proto at master · googleapis/googleapis (github.com) 에서 확인할 수 있습니다. 필요하면 다음 코드를 통해引入할 수 있습니다.
import "google.golang.org/genproto/googleapis/rpc/errdetails"ErrorInfo 를 details 로 사용합니다.
notFound := status.Newf(codes.NotFound, "person not found: %s", name)
notFound.WithDetails(&errdetails.ErrorInfo{
Reason: "person not found",
Domain: "xxx",
Metadata: nil,
})클라이언트에서 데이터를 가져와 처리할 수 있습니다. 위에서 언급한 것은 gRPC 가 권장하는 일부 예제일 뿐이며, 그 외에도 message 를 직접 정의하여 해당 비즈니스 요구를 더 잘 충족시킬 수 있습니다. 통일된 오류 처리를 하고 싶다면 인터셉터에서 작업할 수도 있습니다.
타임아웃 제어

대부분의 경우 보통 하나의 서비스만 있는 것은 아니며, 상류에 많은 서비스가 있고 하류에도 많은 서비스가 있을 수 있습니다. 클라이언트가 한 번 요청을 시작하면 가장 상류의 서비스에서 가장 하류까지 서비스 호출 체인이 형성되며, 그림과 같거나 그보다 더 길 수 있습니다.
이렇게 긴 호출 체인에서 하나의 서비스 로직 처리에 오랜 시간이 걸리면 상류가 계속 대기 상태가 됩니다. 불필요한 리소스 낭비를 줄이기 위해 타임아웃 메커니즘을 도입할 필요가 있습니다. 이렇게 하면 가장 상류 호출 시传入된 타임아웃 시간이 전체 호출 체인이 허용하는 최대 실행 시간이 됩니다. gRPC 는 프로세스 간 및 언어 간 타임아웃을 전달할 수 있으며, 일부 프로세스 간에 전달해야 하는 데이터를 HTTP2 의 HEADERS Frame 프레임에 넣습니다. 아래 그림과 같습니다.

gRPC 요청의 타임아웃 데이터는 HEADERS Frame 의 grpc-timeout 필드에 해당합니다. 주의할 점은 모든 gRPC 라이브러리가 이 타임아웃 전달 메커니즘을 구현한 것은 아니지만, gRPC-go는 확실히 지원합니다. 다른 언어의 라이브러리를 사용하고 이 기능을 사용했다면 이 점을 추가로 주의해야 합니다.
연결 타임아웃
gRPC 클라이언트가 서버에 연결을 설정할 때 기본적으로 비동기로 설정됩니다. 연결 설정에 실패하면 빈 Client 만 반환됩니다. 연결을 동기적으로 설정하려면 grpc.WithBlock() 을 사용하여 연결이 성공적으로 설정될 때까지 블럭할 수 있습니다.
dial, err := grpc.Dial("localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)타임아웃 시간을 제어하려면 TimeoutContext 를传入하면 되며, grpc.Dial 대신 grpc.DialContext 를 사용하여 context 를传入합니다.
timeout, cancelFunc := context.WithTimeout(context.Background(), time.Second)
defer cancelFunc()
dial, err := grpc.DialContext(timeout, "localhost:9091",
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithChainUnaryInterceptor(UnaryPersonClientInterceptor),
grpc.WithChainStreamInterceptor(StreamPersonClientInterceptor),
)이렇게 하면 연결 설정 타임아웃 시 error 가 반환됩니다.
context deadline exceeded서버 측에서도 연결 타임아웃을 설정할 수 있습니다. 클라이언트와 새 연결을 설정할 때 타임아웃 시간을 설정하며, 기본값은 120 초입니다. 규정 시간 내에 연결이 성공적으로 설정되지 않으면 서버가 능동적으로 연결을 끊습니다.
server := grpc.NewServer(
grpc.ConnectionTimeout(time.Second*3),
)TIP
grpc.ConnectionTimeout 는 아직 실험 단계이며, 미래에 API 가 수정되거나 삭제될 수 있습니다.
요청 타임아웃
gRPC 클라이언트가 요청을 시작할 때 첫 번째 매개변수는 Context 유형입니다. 마찬가지로 RPC 요청에 타임아웃 시간을 추가하려면 TimeoutContext 를传入하면 됩니다.
timeout, cancel := context.WithTimeout(ctx, time.Second*3)
defer cancel()
info, err := client.GetPersonInfo(timeout, wrapperspb.String("john"))
switch status.Code(err) {
case codes.DeadlineExceeded:
// 타임아웃 로직 처리
}gRPC 의 처리를 통해 타임아웃 시간이 서버로 전달되며, 전송 과정에서 프레임 필드 형태로 존재하며, Go 안에서는 context 형태로 존재하며, 이렇게 전체 링크에서 전달됩니다. 링크 전달 과정에서 타임아웃 시간을 수정하는 것은 권장되지 않습니다. 요청 시 얼마나 긴 타임아웃 시간을 설정할지는 가장 상류에서 고려해야 할 문제입니다.
인증/인가
마이크로서비스 분야에서 각 서비스는 요청에 대해 사용자 신원과 권한을 검증해야 합니다. 모노리식 애플리케이션과 같이 각 서비스가 자체 인증 로직을 구현하는 것은 현실적이지 않습니다. 따라서 통일된 인증 및 권한 부여 서비스가 필요하며, 일반적인 해결책은 OAuth2, 분산 세션 및 JWT 를 사용하는 것입니다. 이 중 OAuth2 가 가장 널리 사용되며 업계 표준이 되었습니다. OAuth2 에서 가장 일반적으로 사용되는 토큰 유형은 JWT 입니다. 아래는 OAuth2 인증 코드 모드의 흐름도로, 기본 프로세스는 그림과 같습니다.

보안 전송
서비스 등록 및 발견
클라이언트가 서버의 특정 서비스를 호출하기 전에 서버의 ip 와 port 를 알아야 합니다. 이전 사례에서는 서버 주소가 하드코딩되어 있었습니다. 실제 네트워크 환경에서는 항상 안정적이지 않으며, 일부 서비스는 장애로 인해 오프라인이 되어访问할 수 없거나, 비즈니스 발전으로 인해 머신 마이그레이션이 이루어져 주소가 변경될 수 있습니다. 이러한 경우 정적 주소를 사용하여 서비스에访问할 수 없으며, 이러한 동적 문제는 서비스 발견 및 등록이 해결해야 할 문제입니다. 서비스 발견은 서비스 주소의 변경을 모니터링하고 업데이트하는 것을 담당하며, 서비스 등록은 외부에 자신의 주소를 알리는 것을 담당합니다. gRPC 는 기본 서비스 발견 기능을 제공하며, 확장 및 사용자 정의도 지원합니다.
정적 주소를 사용할 수 없으므로 일부 특정 이름을 사용하여 대체할 수 있습니다. 예를 들어 브라우저는 DNS解析를 통해 도메인 이름에서 주소를 가져오는 것과 마찬가지로 gRPC 의 기본 서비스 발견은 DNS 를 통해 이루어집니다. 로컬 host 파일을 수정하고 다음 매핑을 추가합니다.
127.0.0.1 example.grpc.com그런 다음 helloworld 예제의 클라이언트 Dial 주소를 해당 도메인 이름으로 변경합니다.
func main() {
// 연결 설정, 암호화 검증 없음
conn, err := grpc.Dial("example.grpc.com:8080",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// 클라이언트 생성
client := hello2.NewSayHelloClient(conn)
// 원격 호출
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}역시 정상적인 출력을 확인할 수 있습니다.
2023/08/26 15:52:52 received grpc resp: msg:"hello world! client"gRPC 에서 이러한 이름은 RFC 3986 에 정의된 URI 문법을 준수해야 하며, 형식은 다음과 같습니다.
hierarchical part
┌───────────────────┴─────────────────────┐
authority path
┌───────────────┴───────────────┐┌───┴────┐
abc://username:password@example.com:123/path/data?key=value&key2=value2#fragid1
└┬┘ └───────┬───────┘ └────┬────┘ └┬┘ └─────────┬─────────┘ └──┬──┘
scheme user information host port query fragment위 예제의 URI 는 다음과 같은 형식이며, 기본적으로 dns 를 지원하므로 접두사 scheme 이 생략되었습니다.
dns:example.grpc.com:8080그 외에도 gRPC 는 기본적으로 Unix domain sockets 를 지원합니다. 다른 방식의 경우 gRPC 확장에 따라 사용자 정의 구현이 필요하며, 이를 위해 사용자 정의解析器 resolver.Resolver 를 구현해야 합니다. resolver 는 대상 주소와 서비스 구성의 업데이트를 모니터링하는 역할을 합니다.
type Resolver interface {
// gRPC 는 ResolveNow 를 호출하여 대상 이름의 재解析을 시도합니다. 이는 단지 힌트이며, 필요하지 않으면解析器가 이를 무시할 수 있으며, 이 메서드는 병렬로 호출될 수 있습니다.
ResolveNow(ResolveNowOptions)
Close()
}gRPC 는 Resolver 생성기, 즉 resolver.Builder 를传入하도록 요구합니다. 이는 Resolver 인스턴스를 생성하는 역할을 합니다.
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}Builder 의 Scheme 메서드는负责解析하는 Scheme 유형을 반환합니다. 예를 들어 기본 dnsBuilder 는 dns 를 반환합니다. 생성기는 초기화 시 resolver.Register 를 사용하여 전역 Builder 에 등록하거나, options 로 사용하여 grpc.WithResolver 를 통해 ClientConn 내부에传入해야 합니다. 후자의 우선순위가 전자보다 높습니다.

위 그림은 resolver 의 작업 흐름을 간단히 설명하며, 다음으로 사용자 정의 resolver 를演示합니다.
사용자 정의 서비스解析
다음은 사용자 정의解析器를 작성하며, 이를 생성하기 위한 사용자 정의解析생성기가 필요합니다.
package myresolver
import (
"fmt"
"google.golang.org/grpc/resolver"
)
func NewBuilder(ads map[string][]string) *MyBuilder {
return &MyBuilder{ads: ads}
}
type MyBuilder struct {
ads map[string][]string
}
func (c *MyBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
if target.URL.Scheme != c.Scheme() {
return nil, fmt.Errorf("unsupported scheme: %s", target.URL.Scheme)
}
m := &MyResolver{ads: c.ads, t: target, cc: cc}
// 여기서는 반드시 updatestate 해야 하며, 그렇지 않으면 데드락이 발생합니다.
m.start()
return m, nil
}
func (c *MyBuilder) Scheme() string {
return "hello"
}
type MyResolver struct {
t resolver.Target
cc resolver.ClientConn
ads map[string][]string
}
func (m *MyResolver) start() {
addres := make([]resolver.Address, 0)
for _, ad := range m.ads[m.t.URL.Opaque] {
addres = append(addres, resolver.Address{Addr: ad})
}
err := m.cc.UpdateState(resolver.State{
Addresses: addres,
// 구성, loadBalancingPolicy 는 로드밸런싱 전략을 의미합니다.
ServiceConfig: m.cc.ParseServiceConfig(
`{"loadBalancingPolicy":"round_robin"}`),
})
if err != nil {
m.cc.ReportError(err)
}
}
func (m *MyResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
func (m *MyResolver) Close() {}사용자 정의解析器는 map 에서 일치하는 주소를 updatestate 에传入하며, 동시에 로드밸런싱 전략을 지정합니다. round_robin은 라운드 로빈을 의미합니다.
// service config 구조는 다음과 같습니다.
type jsonSC struct {
LoadBalancingPolicy *string
LoadBalancingConfig *internalserviceconfig.BalancerConfig
MethodConfig *[]jsonMC
RetryThrottling *retryThrottlingPolicy
HealthCheckConfig *healthCheckConfig
}클라이언트 코드는 다음과 같습니다.
package main
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/resolver"
"grpc_learn/helloworld/client/myresolver"
hello2 "grpc_learn/helloworld/hello"
"log"
"time"
)
func init() {
// builder 등록
resolver.Register(myresolver.NewBuilder(map[string][]string{
"myworld": {"127.0.0.1:8080", "127.0.0.1:8081"},
}))
}
func main() {
// 연결 설정, 암호화 검증 없음
conn, err := grpc.Dial("hello:myworld",
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
panic(err)
}
defer conn.Close()
// 클라이언트 생성
client := hello2.NewSayHelloClient(conn)
// 초당 한 번 호출
for range time.Tick(time.Second) {
// 원격 호출
helloRep, err := client.Hello(context.Background(), &hello2.HelloReq{Name: "client"})
if err != nil {
panic(err)
}
log.Printf("received grpc resp: %+v", helloRep.String())
}
}정상적인 경우 프로세스는 서버가 레지스트리에 자체 서비스를 등록하고, 클라이언트가 레지스트리에서 서비스 목록을 가져와 매칭하는 것입니다. 여기에서传入된 map 은 시뮬레이션된 레지스트리이며, 데이터가 정적이므로 서비스 등록 단계는 생략되고 서비스 발견만 남습니다. 클라이언트에서 사용하는 target 은 hello:myworld이며, hello 는 사용자 정의 scheme 이고 myworld 는 서비스 이름입니다. 사용자 정의解析器를 통해解析한 후 127.0.0.1:8080 의 실제 주소를 얻습니다. 실제 상황에서는 로드밸런싱을 위해 하나의 서비스 이름이 여러 실제 주소와 매칭될 수 있으므로 서비스 이름이 슬라이스에 해당하는 이유입니다. 여기서는 두 개의 서버를 시작하여 서로 다른 포트를 사용하며, 로드밸런싱 전략은 라운드 로빈입니다. 서버 출력은 다음과 같으며, 요청 시간을 통해 로드밸런싱 전략이 실제로 작동하고 있음을 알 수 있습니다. 전략을 지정하지 않으면 기본적으로 첫 번째 서비스만 선택합니다.
// server01
2023/08/29 17:32:21 received grpc req: name:"client"
2023/08/29 17:32:23 received grpc req: name:"client"
2023/08/29 17:32:25 received grpc req: name:"client"
2023/08/29 17:32:27 received grpc req: name:"client"
2023/08/29 17:32:29 received grpc req: name:"client"
// server02
2023/08/29 17:32:20 received grpc req: name:"client"
2023/08/29 17:32:22 received grpc req: name:"client"
2023/08/29 17:32:24 received grpc req: name:"client"
2023/08/29 17:32:26 received grpc req: name:"client"
2023/08/29 17:32:28 received grpc req: name:"client"레지스트리는 서비스 등록 이름과 실제 서비스 주소의 매핑 컬렉션을 저장하는 곳이며, 데이터 저장이 가능한 미들웨어라면 모두 조건을 충족합니다. 심지어 mysql 을 레지스트리로 사용하는 것도 불가능하지는 않습니다 (이렇게 하는 사람은 없을 것입니다). 일반적으로 레지스트리는 KV 저장 방식이며, redis 는 매우 좋은 선택입니다. 하지만 redis 를 레지스트리로 사용하려면 서비스 하트비트 검사, 서비스 오프라인 등 많은 로직을 직접 구현해야 하므로 상당히 번거롭습니다. redis 가 이 분야에서 일정하게 적용되기는 하지만 적습니다. 전문적인 일은 전문가에게 맡기는 것이 좋습니다. 이 분야에서 잘 알려진 것들은 많습니다: Zookeeper, Consul, Eureka, ETCD, Nacos 등입니다.
등록 센터 비교 및 선택: Zookeeper, Eureka, Nacos, Consul 및 ETCD - 掘金 (juejin.cn) 에서 이 몇 가지 미들웨어의 차이점을了解할 수 있습니다.
consul 과 결합
consul 을 사용한 사례는 [consul](community/mirco/consul.md#서비스 - 등록 - 및 - 디스커버리) 에서 확인할 수 있습니다.
