Skip to content

select

select 는 여러 채널 상태를 동시에 모니터링할 수 있는 구조로, 문법은 switch 와 유사합니다.

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

이 코드는 context, 채널, select 세 가지를 결합하여 프로그램이 원활하게 종료되는 간단한 로직을 구현합니다. 코드에서 select 는 ctx.Donefinished 두 채널을 동시에 모니터링하며, 종료 조건은 두 가지입니다. 첫째는 운영체제가 종료 신호를 보내는 것이고, 둘째는 finished 채널에서 읽을 메시지가 있는 경우, 즉 사용자 코드 작업이 완료된 경우입니다. 이를 통해 프로그램 종료 시 정리 작업을 수행할 수 있습니다.

select 는 두 가지 매우 중요한 특성을 가지고 있습니다. 첫째는 논블록으로, 채널 송수신 소스 코드에서 select 에 대한 일부 처리를 볼 수 있으며, 논블록 상태에서 채널 사용 가능 여부를 판단할 수 있습니다. 둘째는 무작위화로, 사용 가능한 채널이 여러 개 있으면 무작위로 하나를 선택하여 실행합니다. 정해진 순서를 따르지 않으면 각 채널이 상대적으로 공정하게 실행될 수 있으며, 그렇지 않으면 극단적인 상황에서 일부 채널은 영원히 처리되지 않을 수 있습니다. select 의 작업은 모두 채널과 관련되므로, 먼저 chan 문서를 읽은 후 select 를 이해하는 것이 좋습니다.

구조

런타임에는 runtime.scase 구조체 하나만 있어 select 의 분기를 나타내며, 각 case의 런타임 표현는 scase입니다.

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

여기서 c는 채널을 의미하고, elem은 수신 또는 송신 요소의 포인터입니다. 실제로 select 키워드는 runtime.selectgo 함수를 의미합니다.

원리

select 의 사용 방식은 go 가 네 가지 경우로 나누어 최적화하며, 이는 cmd/compile/internal/walk.walkSelectCases 함수에서 이 네 가지 경우의 처리 로직을 볼 수 있습니다.

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimization: zero-case select
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimization: one-case select: single op.
  if ncas == 1 {
    ...
  }

  // optimization: two-case select but one is default: single non-blocking op.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

최적화

컴파일러는 앞의 세 가지 경우에 대해 최적화를 수행합니다. 첫 번째 경우는 case 수가 0 일 때, 즉 빈 select 입니다. 빈 select 문은 현재 고루틴이 영구적으로 블록되도록 만듭니다.

go
select{}

블록되는 이유는 컴파일러가 이를 runtime.block 함수의 직접 호출로 변환하기 때문입니다.

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

block 함수는 runtime.gopark 함수를 호출하여 현재 고루틴을 _Gwaitting 상태로 만들고 영구 블록에 들어가 다시는 스케줄링되지 않도록 합니다.

두 번째 경우는 case 가 하나이고 default 가 아닌 경우입니다. 이 경우 컴파일러는 이를 채널의 송수신 작업으로 직접 변환하며, 블록 방식입니다. 예를 들어 다음 코드와 같습니다.

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

이는 runtime.chanrecv1 함수의 직접 호출로 변환됩니다. 어셈블리 코드에서 확인할 수 있습니다.

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

case 가 하나인 경우 채널에 데이터를 보내는 것도 같은 원리로, runtime.chansend1 함수의 직접 호출로 변환되며 역시 블록 방식입니다.

세 번째 경우는 case 가 두 개이고 그중 하나가 default 인 경우입니다.

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

이 경우 runtime.selectnbsend 호출에 대한 if 문으로 변환됩니다.

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

채널 데이터를 수신하는 경우 runtime.selectnbrecv 호출로 변환됩니다.

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

주의할 점은 이 경우 채널 수신 또는 송신이 논블록 방식이라는 것입니다. block 매개변수가 false인 것을 명확하게 볼 수 있습니다.

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

채널 데이터 송수신 모두 blockfalse일 때 잠금 없이도 송신 또는 수신 가능한지 판단할 수 있는 빠른 경로가 있습니다.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

채널 읽기 시 채널이 비어 있으면 직접 반환하고, 채널 쓰기 시 채널이 닫히지 않고 이미 가득 차 있으면 직접 반환합니다. 일반적인 경우 이들은 고루틴 블록을 유발하지만 select 와 결합하여 사용하면 그렇지 않습니다.

처리

위의 세 가지 경우는 특수한 경우에 대한 최적화이며, 일반적으로 사용되는 select 키워드는 runtime.selectgo 함수 호출로 변환됩니다. 처리 로직은 400 여 줄에 달합니다.

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

컴파일러는 모든 case 문을 수집하여 scase 배열로 만든 후 selectgo 함수에 전달하며, 처리 완료 후 두 개의 반환값을 반환합니다.

  1. 첫 번째는 무작위로 선택된 채널 인덱스로, 어떤 채널이 처리되었는지를 나타내며, 없으면 -1 을 반환합니다.
  2. 두 번째는 채널 읽기 작업이 성공적으로 읽었는지 여부를 나타냅니다.

여기서 매개변수를 간단히 설명합니다.

  • cas0, scase 배열의 헤드 포인터로, 전반부는 쓰기 채널 case 를 저장하고 후반부는 읽기 채널 case 를 저장하며, nsends로 구분합니다.
  • order0, 길이는 scase 배열의 두 배로, 전반부는 pollorder 배열에 할당되고 후반부는 lockorder 배열에 할당됩니다.
  • nsendsnrecvs는 읽기/쓰기 채널 case 의 수를 나타내며, 둘의 합이 case 의 총 수입니다.
  • block은 블록 여부를 나타내며, default case 가 있으면 논블록을 의미하고 값은 false이며, 그렇지 않으면 true입니다.
  • pc0, [ncases]uintptr 배열의 헤드를 가리키며, 경합 분석에 사용되며 나중에 무시할 수 있습니다. select 이해에는 도움이 되지 않습니다.

다음 코드가 있다고 가정합니다.

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

어셈블리 형태를 확인하면 이해를 돕기 위해 일부 코드를 생략했습니다.

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // 1 2 3 4 몇 개의 임시 변수
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // runtime.selectgo 함수 호출
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // default 분기로 이동
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // 분기 4 로 이동
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // 분기 3 으로 이동
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // 분기 2 로 이동
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

selectgo 함수 호출 후 판단 + 점프 로직이 존재하는 것을 볼 수 있습니다. 이를 통해 원래 모습을 추론할 수 있습니다.

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

컴파일러가 생성한 실제 코드는 이와 다를 수 있지만 대략적인 의미는 비슷합니다. 따라서 컴파일러는 selectgo 함수 호출 후 if 문을 사용하여 어떤 채널이 실행될지 판단하며, 호출 전에 컴파일러는 scase 배열을 수집하는 for 루프를 생성하지만 여기서는 생략했습니다.

selectgo 함수 사용 방법을 안 후, 이제 selectgo 함수 내부가 어떻게 작동하는지 알아보겠습니다. 먼저 몇 개의 배열을 초기화하며, nsends+nrecvs는 case 의 총 수를 나타냅니다. 아래 코드에서 case 수의 최댓값이 1 << 16임을 알 수 있습니다. pollorder는 채널 실행 순서를 결정하고, lockorder는 채널 잠금 순서를 결정합니다.

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// 길이는 scase 배열의 두 배로, 전반부는 pollorder 배열에, 후반부는 lockorder 배열에 할당됩니다.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

다음으로 pollorder 배열을 초기화하며, 대기 중인 채널의 sacses 배열 인덱스를 저장합니다.

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

이는 전체 scases 배열을 순회하고, runtime.fastrandn을 통해 [0, i] 사이의 난수를 생성한 후 이를 i와 교환합니다. 과정에서 채널이 nil인 case 를 건너뛰며, 순회 완료 후 요소가 뒤섞인 pollorder 배열을 얻습니다.

그런 다음 pollorder 배열을 채널 주소 크기에 따라 힙 정렬하여 lockorder 배열을 얻고, runtime.sellock을 호출하여 순서대로 잠급니다.

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

여기서 주의할 점은 채널을 주소 크기로 정렬하는 것은 데드락을 방지하기 위함입니다. select 작업 자체는 잠금이 병렬성을 허용하지 않기 때문입니다. pollorder의 무작위 순서로 잠근다고 가정하고 다음 코드의 경우를 고려해 보십시오.

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

세 고루틴 ABC 모두 잠금 단계에 도달했으며, 서로의 잠금 순서가 무작위로 서로 다를 수 있습니다. 다음과 같은 상황이 발생할 수 있습니다.

ABC 잠금 순서가 위 그림과 같다고 가정하면 데드락 가능성이 매우 큽니다. 예를 들어 A 는 먼저 ch2의 잠금을 持有하고 ch1의 잠금을 얻으려 하지만, ch1이 이미 고루틴 B 에 의해 잠겼다고 가정하면, 고루틴 B 는 ch2의 잠금을 얻으려 합니다. 이렇게 되면 데드락이 발생합니다.

모든 고루틴이 동일한 순서로 잠금을 얻으면 데드락 문제가 발생하지 않습니다. 이것이 lockorder가 주소 크기로 정렬되는 근본적인 이유입니다.

잠근 후 실제 처리 단계가 시작됩니다. 먼저 pollorder 배열을 순회하여 이전에 섞인 순서로 채널에 액세스하고, 사용 가능한 채널을 찾을 때까지逐个 순회합니다.

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // 채널 읽기
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // 채널 쓰기
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

여기서 읽기/쓰기 채널에 대해 6 가지 경우의 처리를 했음을 볼 수 있습니다. 아래에서 각각 설명합니다. 첫 번째 경우, 읽기 채널이고 송신 측이 대기 중입니다. 여기서는 runtime.recv 함수로 이동하며, 그 역할은 이미 설명했습니다. 최종적으로 송신 측 고루틴을 깨우며, 깨우기 전 콜백 함수가 모든 채널 잠금을 해제합니다.

go
recv:
  // can receive from sleeping sender (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

두 번째 경우, 읽기 채널이고 송신 측이 대기 중이지 않으며, 버퍼 요소 수가 0 보다 큽니다. 여기서는 버퍼에서 직접 데이터를 읽으며, 로직은 runtime.chanrecv와 완전히 동일하며 잠금을 해제합니다.

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

세 번째 경우, 읽기 채널이지만 채널이 이미 닫혔고 버퍼에 남은 요소가 없습니다. 여기서는 먼저 잠금을 해제하고 직접 반환합니다.

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

네 번째 경우, 닫힌 채널에 데이터를 보냅니다. 여기서는 먼저 잠금을 해제하고 panic합니다.

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

다섯 번째 경우, 수신 측이 블록 대기 중입니다. 여기서는 runitme.send 함수를 호출하며, 최종적으로 수신 측 고루틴을 깨우고, 깨우기 전 콜백 함수가 모든 채널 잠금을 해제합니다.

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

여섯 번째 경우, 수신 측 고루틴이 대기하지 않으며, 보낼 데이터를 버퍼에 넣은 후 잠금을 해제합니다.

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

그런 다음 위의 모든 경우는 마지막으로 retc 분기로 이동하며, 여기서 수행할 일은 선택된 채널 인덱스 casi와 읽기 성공 여부를 나타내는 recvOk를 반환하는 것뿐입니다.

go
retc:
    return casi, recvOK

일곱 번째 경우, 사용 가능한 채널을 찾지 못했고 코드에 default 분기가 포함되어 있으면 채널 잠금을 해제하고 직접 반환합니다. 여기서 반환되는 casi는 -1 로, 사용 가능한 채널이 없음을 나타냅니다.

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

마지막 경우, 사용 가능한 채널을 찾지 못했고 코드에 default 분기가 포함되어 있지 않으면 현재 고루틴이 블록 상태에 빠집니다. 그 전 selectgo는 현재 고루틴을 모든 모니터링 채널의 recvq/sendq 큐에 추가합니다.

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

여기서는 여러 sudog를 생성하고 해당 채널과 연결합니다.

그런 다음 runtime.gopark을 통해 블록되며, 블록 전 runtime.selparkcommit 함수가 채널 잠금을 해제합니다. 이는 콜백 함수로 gopark에 전달됩니다.

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

깨어난 후 첫 번째 일은 sudog와 채널의 연결을 해제하는 것입니다.

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

그런 다음 sudog를 이전 채널의 대기 큐에서 제거합니다.

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg has already been dequeued by the G that woke us up.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

위의 과정에서 반드시 깨운 측 고루틴이 처리한 채널을 찾으며, caseSuccess에 따라 마지막 처리를 합니다. 쓰기 작업의 경우 sg.successfalse이면 채널이 이미 닫혔음을 의미하며, go 런타임 전체에서 close 함수만 이 필드를 false로 설정합니다. 이는 현재 고루틴이 깨운 측이 close 함수를 통해 깨웠음을 나타냅니다. 읽기 작업의 경우, 송신 측에 의해 깨어난 경우 데이터 읽기 작업도 깨어나기 전 송신 측이 runtime.send 함수를 통해 완료했으며, 그 값은 true입니다. close 함수에 의해 깨어난 경우, 앞과 마찬가지로 직접 반환합니다.

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

이로써 전체 select 로직이 대략 정리되었습니다. 위에서 여러 경우를 나누어 보았듯이 select 처리는 상당히 복잡합니다.

Golang by www.golangdev.cn edit