Skip to content

select

select は複数のパイプ状態を同時に監視できる構造で、構文は switch に似ています。

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

このコードは、context、パイプ、select の 3 つを組み合わせることで、プログラムのスムーズな終了を実現するシンプルなロジックを実装しています。コード内の select は ctx.Donefinished の 2 つのパイプを同時に監視しており、終了条件は 2 つあります。1 つ目はオペレーティングシステムから終了シグナルが送信されること、2 つ目は finished パイプに読み取り可能なメッセージがあること、つまりユーザーコードタスクが完了することです。これにより、プログラム終了時に後処理を行うことができます。

周知の通り、select には 2 つの非常に重要な特性があります。1 つ目はノンブロッキングで、パイプの送受信ソースコード内で select に対する処理が行われており、ノンブロッキングでパイプが利用可能かどうかを判断できます。2 つ目はランダム化で、複数のパイプが利用可能な場合、ランダムに 1 つを選択して実行し、既定の順序に従わないことで、各パイプが比較的公平に実行されるようになります。否则、極端な状況で一部のパイプが永遠に処理されない可能性があります。select の動作はすべてパイプに関連しているため、先に chan の記事を読んでパイプを理解した後に select を理解すると、非常にスムーズに進みます。

構造

実行時において、runtime.scase 構造体のみが select のブランチを表し、各 case の実行時表現が scase です。

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

ここで c はパイプを指し、elem は受信または送信要素のポインタを表します。実際には、select キーワードは runtime.selectgo 関数を指します。

原理

select の使用方法は Go によって 4 つの状況に分けて最適化されています。これは cmd/compile/internal/walk.walkSelectCases 関数内で、これら 4 つの状況に対する処理ロジックが見られます。

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimization: zero-case select
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimization: one-case select: single op.
  if ncas == 1 {
    ...
  }

  // optimization: two-case select but one is default: single non-blocking op.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

最適化

コンパイラは最初の 3 つの状況に対して最適化を行います。1 つ目の状況は case 数が 0 の場合、つまり空の select です。空の select 文が現在のゴルーチンの永久ブロッキングを引き起こすことは皆知っています。

go
select{}

ブロッキングが発生する理由は、コンパイラがこれを runtime.block 関数の直接呼び出しに変換するためです。

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

block 関数は runtime.gopark 関数を呼び出し、現在のゴルーチンを _Gwaiting 状態に変更し、永久ブロッキングに入り、二度とスケジューリングされなくなります。

2 つ目の状況は、case が 1 つだけで default ではない場合です。この場合、コンパイラはこれを直接パイプの送受信操作に変換し、ブロッキング式になります。例えば以下のコードです。

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

これは runtime.chanrecv1 関数の直接呼び出しに変換され、アセンブリコードからもわかります。

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

case が 1 つの場合にパイプへデータを送信するのも同様で、runtime.chansend1 関数の直接呼び出しに変換され、これもブロッキング式です。

3 つ目の状況は、case が 2 つあり、そのうち 1 つが default の場合です。

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

この場合、runtime.selectnbsend 呼び出しの if 文に変換されます。

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

パイプデータを受信する場合、runtime.selectnbrecv の呼び出しに変換されます。

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

值得注意的是、この状況ではパイプの受信または送信はノンブロッキングです。block パラメータが false であることが明確に見えます。

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

パイプへのデータ送受信いずれにおいても、blockfalse の場合、ロックなしで送受信可能かどうかを判断できるクイックパスがあります。

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

パイプ読み取り時、パイプが空の場合は直接返します。パイプ書き込み時、パイプが閉じられておらず満杯の場合も直接返します。一般的にはこれらはゴルーチンのブロッキングを引き起こしますが、select と組み合わせることでそうなりません。

処理

上記 3 つは特殊な状況に対する最適化であり、通常使用される select キーワードは runtime.selectgo 関数の呼び出しに変換されます。その処理ロジックは 400 行以上あります。

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

コンパイラはすべての case 文を収集して scase 配列を作成し、selectgo 関数に渡します。処理完了後、2 つの戻り値を返します。

  1. 1 つ目はランダムに選択されたパイプのインデックスで、どのパイプが処理されたかを示します。ない場合は -1 を返します
  2. 2 つ目はパイプ読み取り操作に対して正常に読み取れたかどうかを示します

ここでパラメータを簡単に説明します。

  • cas0scase 配列のヘッダーポインタ。前半部分は書き込みパイプ case を格納し、後半部分は読み取りパイプ case を格納します。nsends で区別します
  • order0:長さは scase 配列の 2 倍で、前半部分は pollorder 配列に割り当てられ、後半部分は lockorder 配列に割り当てられます
  • nsendsnrecvs:読み取り/書き込みパイプ case の数を表し、両者の和が case の総数です
  • block:ブロッキングかどうかを示します。default case があればノンブロッキングを表し、値は false です。否则 true です
  • pc0[ncases]uintptr 配列のヘッダーを指し、競合分析に使用されます。後で無視できます。select を理解する上では役立ちません

以下のコードがあるとします。

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

アセンブリ形式を確認すると、理解を容易にするため一部のコードを省略しています。

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // 1 2 3 4 いくつかの一時変数
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // runtime.selectgo 関数を呼び出し
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // default ブランチにジャンプ
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // ブランチ 4 にジャンプ
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // ブランチ 3 にジャンプ
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // ブランチ 2 にジャンプ
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

selectgo 関数呼び出し後に判断 + ジャンプロジックが存在することがわかります。これらの判断から元の形を推測するのは難しくありません。

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

コンパイラが生成する実際のコードはこれと異なる可能性がありますが、大まかな意味はほぼ同じです。したがって、コンパイラは selectgo 関数呼び出し後に if 文を使用してどのパイプが実行されるかを判断し、呼び出し前にコンパイラは scase 配列を収集するための for ループを生成しますが、ここでは省略されています。

外部が selectgo 関数をどのように使用するかがわかった後、次に selectgo 関数内部がどのように動作するかを理解しましょう。まずいくつかの配列を初期化します。nsends+nrecvs は case の総数を表し、以下のコードから case 数の最大値が 1 << 16 であることもわかります。pollorder はパイプの実行順序を決定し、lockorder はパイプのロック順序を決定します。

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// 長さは scase 配列の 2 倍で、前半部分は pollorder 配列に割り当てられ、後半部分は lockorder 配列に割り当てられます。
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

次に pollorder 配列を初期化します。これは処理待ちパイプの sacses 配列インデックスを格納します。

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

scases 配列全体を走査し、runtime.fastrandn を通じて [0, i] の間の乱数を生成し、それを i と交換します。プロセス中にパイプが nil の case をスキップし、走査完了後に要素がかき混ぜられた pollorder 配列が得られます。

その後、pollorder 配列をパイプアドレスサイズに基づいてヒープソートし、lockorder 配列を取得し、runtime.sellock を呼び出して順序通りにロックします。

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

ここで值得注意的是、パイプをアドレスサイズでソートするのはデッドロックを回避するためです。select 操作自体がロックを許可して並行を許可しないためです。pollorder ランダム順序でロックすると、以下のコードの状況を考慮してください。

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

3 つのゴルーチン ABC がすべてロックステップに進み、彼此のロック順序がランダムで相互に異なる可能性があります。以下のような状況が発生する可能性があります。

ABC のロック順序が上図と同じだと仮定すると、デッドロックが発生する可能性が非常に大きくなります。例えば A はまず ch2 のロックを保持し、その後 ch1 のロックを取得しようとしますが、ch1 が既にゴルーチン B によってロックされていると仮定すると、ゴルーチン B は ch2 のロックを取得しようとします。これでデッドロックが発生します。

すべてのゴルーチンが同じ順序でロックすれば、デッドロック問題は発生しません。这也是 lockorder がアドレスサイズでソートされる根本的な理由です。

ロック後、実際の処理段階に入ります。まず pollorder 配列を走査し、之前にかき混ぜられた順序でパイプにアクセスし、逐一走査して利用可能なパイプを見つけます。

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // パイプ読み取り
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // パイプ書き込み
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

ここで読み取り/書き込みパイプに対して 6 つの状況の処理が行われています。以下でそれぞれ説明します。

1 つ目の状況、パイプ読み取りで送信側が待機中である場合、runtime.recv 関数に進みます。その役割は既に説明済みで、最終的に送信側ゴルーチンをウェイクアップします。ウェイクアップ前にコールバック関数がすべてのパイプをロック解除します。

go
recv:
  // can receive from sleeping sender (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

2 つ目の状況、パイプ読み取りで送信側が待機しておらず、バッファ要素数が 0 より大きい場合、バッファから直接データを読み取ります。ロジックは runtime.chanrecv と完全に一致し、その後ロック解除します。

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

3 つ目の状況、パイプ読み取りですが、パイプが既に閉じられており、バッファに残り要素がない場合、まずロック解除して直接返します。

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

4 つ目の状況、閉じられたパイプにデータを送信する場合、まずロック解除して panic します。

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

5 つ目の状況、受信側がブロッキング待機中である場合、runtime.send 関数を呼び出し、最終的に受信側ゴルーチンをウェイクアップします。ウェイクアップ前にコールバック関数がすべてのパイプをロック解除します。

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

6 つ目の状況、受信側ゴルーチンが待機しておらず、送信するデータをバッファに格納し、その後ロック解除します。

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

その後、上記のすべての状況は最後に retc ブランチに進みます。そこで行うのは、選択されたパイプインデックス casi と読み取り成功を表す recvOk を返すことだけです。

go
retc:
    return casi, recvOK

7 つ目の状況、利用可能なパイプが見つからず、コードに default ブランチが含まれている場合、パイプをロック解除して直接返します。ここで返される casi は -1 で、利用可能なパイプがないことを表します。

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

最後の状況、利用可能なパイプが見つからず、コードに default ブランチが含まれていない場合、現在のゴルーチンはブロッキング状態に陥ります。その前に selectgo は現在のゴルーチンをすべての監視パイプの recvq/sendq キューに追加します。

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

ここで複数の sudog を作成し、対応するパイプとリンクします。

その後 runtime.gopark でブロッキングし、ブロッキング前にパイプをロック解除します。ロック解除作業は runtime.selparkcommit 関数によって完了し、コールバック関数として gopark に渡されます。

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

ウェイクアップ後の最初の仕事は sudog とパイプのリンクを解除することです。

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

その後、sudog を之前のパイプ待機キューから削除します。

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg has already been dequeued by the G that woke us up.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

上記のプロセスで、必ずウェイクアップ側ゴルーチンによって処理されたパイプが見つかり、caseSuccess に基づいて最後の処理を行います。書き込み操作而言、sg.successfalse の場合、パイプが既に閉じられていることを表します。Go ランタイム全体でも close 関数のみ对该フィールドを false に設定します。これは現在のゴルーチンがウェイクアップ側によって close 関数を通じてウェイクアップされたことを示します。読み取り操作而言、送信側によってウェイクアップされた場合、データ読み取り操作も既にウェイクアップ前に送信側によって runtime.send 関数を通じて完了しており、その値は true です。close 関数によってウェイクアップされた場合、前述と同様に直接返します。

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

これで select のロジック全体がほぼ整理されました。上記でいくつかの状況に分かれており、select の処理が比較的複雑であることがわかります。

Golang学习网由www.golangdev.cn整理维护