select
select 是一種可以同時監聽多個管道狀態的結構,它的語法跟 switch 類似
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
)
func main() {
finished := make(chan struct{})
ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer stop()
slog.Info("running")
go func() {
time.Sleep(time.Second * 2)
finished <- struct{}{}
}()
select {
case <-ctx.Done():
slog.Info("shutting down")
case <-finished:
slog.Info("finished")
}
}這段代碼通過將 context,管道,select 三者結合使用實現了一個簡單的程序平滑退出的邏輯,代碼中 select 同時監聽著ctx.Done和finished兩個管道,它退出的條件有兩個,一是操作系統發送退出信號,二是finished管道有消息可以讀取即用戶代碼任務完成,這樣我們就可以在程序退出時做收尾工作。
眾所周知,select 有兩個非常重要的特性,一是非阻塞,在管道的發送與接收的源代碼中可以看到對於 select 做了一些處理,可以在非阻塞的情況下判斷管道是否可用,二是隨機化,如果有多個管道可用的話它會隨機選一個來執行,不遵守既定的順序可以讓每個管道都相對公平地得到執行,否則在極端情況下一些管道可能永遠也不會被處理。因為它的工作全部都跟管道有關,所以先建議閱讀chan這篇文章,了解了管道後再來了解 select 會暢通很多。
結構
運行時只有一個runtime.scase結構體表示 select 的分支,每一個case的運行時表示就是scase。
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}其中的c指的是管道,elem表示接收或發送元素的指針,實際上 select 關鍵字指的是runtime.selectgo函數。
原理
select 的使用方式被 go 分成了四種情況來進行優化,這一點可以在cmd/compile/internal/walk.walkSelectCases函數中看到對於這四種情況的處理邏輯。
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// optimization: zero-case select
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// optimization: one-case select: single op.
if ncas == 1 {
...
}
// optimization: two-case select but one is default: single non-blocking op.
if ncas == 2 && dflt != nil {
...
}
...
return init
}優化
編譯器會對前三種情況進行優化,第一種情況是 case 數量為 0 時即一個空的 select,我們都知道空的 select 語句會造成當前協程永久阻塞。
select{}之所以會阻塞是因為編譯器將其翻譯成了對runtime.block函數的直接調用
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}而block函數又調用了runtime.gopark函數,使得當前協程變為_Gwaitting狀態,並進入永久阻塞,再也不會得到調度。
第二種情況,只有一個 case 且不是 default,這種情況編譯器會直接將其翻譯成對管道的收發操作,並且還是阻塞式的,例如下面的這種代碼
func main() {
ch := make(chan int)
select {
case <-ch:
// do something
}
}它會被翻譯成對runtime.chanrecv1函數的直接調用,從匯編代碼中就可以看出來
TEXT main.main(SB), ABIInternal, $2
...
LEAQ type:chan int(SB), AX
XORL BX, BX
PCDATA $1, $0
CALL runtime.makechan(SB)
XORL BX, BX
NOP
CALL runtime.chanrecv1(SB)
ADDQ $16, SP
POPQ BP
...在只有一個 case 的情況下對管道進行發送數據也是同樣的道理,它會被翻譯成對runtime.chansend1函數的直接調用,同樣也是阻塞式的。
第三種情況,有兩個 case 且其中一個是 default
func main() {
ch := make(chan int)
select {
case ch <- 1:
// do something
default:
// do something
}
}這種情況會將其翻譯成一個對runtime.selectnbsend調用的if語句,如下
if selectnbsend(ch, 1) {
// do something
} else {
// do something
}如果是接收管道數據就會翻譯成對runtime.selectnbrecv的調用
ch := make(chan int)
select {
case x, ok := <-ch:
// do something
default:
// do something
}if selected, ok = selectnbrecv(&v, c); selected {
// do something
} else {
// do something
}指的是注意的是,這種情況下對管道的接收或發送是非阻塞式的,我們可以很明顯的看到其中的blcok參數為false。
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}而不論是對管道發送或接收數據,在blcok為false時都有一個快速路徑可以在不加鎖的情況下判斷是否可以發送或接收數據,正如下所示
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
return true, false
}
...
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if !block && c.closed == 0 && full(c) {
return false
}
...
}對於讀取管道時,如果管道是空的就會直接返回,對於寫管道時,如果管道未關閉且已經滿了也會直接返回,在一般情況下它們是會造成協程阻塞的,但是與 select 結合使用就不會。
處理
上面三種只是對特殊情況的優化,正常使用的 select 關鍵字會被翻譯成對runtime.selectgo函數的調用,它的處理邏輯長達 400 多行。
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)編譯器會將所有的 case 語句收集一個scase數組,然後傳遞給selectgo函數,處理完成後返回兩個返回值
- 第一個是隨機選取的管道下標,表示哪一個管道被處理了,沒有的話返回-1
- 第二個表示對於讀管道操作而言是否成功讀取
這裡簡單解釋下其參數
cas0,scase數組的頭部指針,前半部分存放的是寫管道 case,後半部分存放的讀管道 case,以nsends來區分order0,它的長度是scase數組的兩倍,前半部分分配給pollorder數組,後半部分分配給lockorder數組nsends和nrecvs表示讀/寫管道 case 的數量,兩者之和就是 case 的總數block表示是否阻塞,如果有defaultcase 就代表非阻塞,其值為true,否則為true。pc0,指向一個[ncases]uintptr的數組頭部,用於競態分析,後面可以忽略它,對於理解 select 而言沒什麼幫助
假設有下面的代碼
func main() {
ch := make(chan int)
select {
case ch <- 1:
println(1)
case ch <- 2:
println(2)
case ch <- 3:
println(3)
case ch <- 4:
println(4)
default:
println("default")
}
}查看其匯編形式,這裡為了方便理解省去了部分代碼
0x0000 00000 TEXT main.main(SB), ABIInterna
...
0x0023 00035 CALL runtime.makechan(SB)
0x0028 00040 MOVQ $1, main..autotmp_2+72(SP) // 1 2 3 4幾個臨時變量
0x0031 00049 MOVQ $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL runtime.selectgo(SB) // 調用runtime.selectgo函數
0x00cd 00205 TESTQ AX, AX
0x00d0 00208 JLT 352 // 跳轉到default分支
0x00d6 00214 PCDATA $1, $-1
0x00d6 00214 JEQ 320 // 跳轉到分支4
0x00d8 00216 CMPQ AX, $1
0x00dc 00220 JEQ 288 // 跳轉到分支3
0x00de 00222 NOP
0x00e0 00224 CMPQ AX, $2
0x00e4 00228 JNE 258 // 跳轉到分支2
0x00e6 00230 PCDATA $1, $0
0x00e6 00230 CALL runtime.printlock(SB)
0x00eb 00235 MOVL $3, AX
0x00f0 00240 CALL runtime.printint(SB)
0x00f5 00245 CALL runtime.printnl(SB)
0x00fa 00250 CALL runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP 379
0x0102 00258 CALL runtime.printlock(SB)
0x0107 00263 MOVL $4, AX
0x010c 00268 CALL runtime.printint(SB)
0x0111 00273 CALL runtime.printnl(SB)
0x0116 00278 CALL runtime.printunlock(SB)
0x011b 00283 JMP 379
0x011d 00285 NOP
0x0120 00288 CALL runtime.printlock(SB)
0x0125 00293 MOVL $2, AX
0x012a 00298 CALL runtime.printint(SB)
0x012f 00303 CALL runtime.printnl(SB)
0x0134 00308 CALL runtime.printunlock(SB)
0x0139 00313 JMP 379
0x013b 00315 NOP
0x0140 00320 CALL runtime.printlock(SB)
0x0145 00325 MOVL $1, AX
0x014a 00330 CALL runtime.printint(SB)
0x014f 00335 CALL runtime.printnl(SB)
0x0154 00340 CALL runtime.printunlock(SB)
0x0159 00345 JMP 379
0x015b 00347 NOP
0x0160 00352 CALL runtime.printlock(SB)
0x0165 00357 LEAQ go:string."default\n"(SB)
0x016c 00364 MOVL $8, BX
0x0171 00369 CALL runtime.printstring(SB)
0x0176 00374 CALL runtime.printunlock(SB)
0x017b 00379 PCDATA $1, $-1
0x017b 00379 ADDQ $160, SP
0x0182 00386 POPQ BP
0x0183 00387 RET可以看到在調用selectgo函數後是有一個判斷+跳轉邏輯存在的,通過這些我們不難反推出其原來的樣子
casei, ok := runtime.selectgo()
if casei == -1 {
println("default")
} else if casei == 3 {
println(4)
} else if casei == 2 {
println(3)
} else if casei == 1 {
println(2)
} else {
println(1)
}編譯器生成的實際代碼可能和這個有出入,但大致意思是差不多的。所以編譯器會在調用完selectgo函數後同時使用if語句來判斷輪到哪一個管道被執行,並且在調用之前,編譯器還會生成一個 for 循環來收集scase數組不過這裡省略掉了。
在知曉了外部是如何使用selectgo函數以後,下面就來了解selectgo函數內部是如何工作的。它首先會初始化幾個數組,nsends+nrecvs表示 case 的總數,從下面的代碼也可以看出 case 數量的最大值也就是1 << 16,pollorder決定了管道的執行順序,lockorder決定了管道的鎖定順序。
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// 它的長度是scase數組的兩倍,前半部分分配給pollorder數組,後半部分分配給lockorder數組。
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]接下來初始化pollorder數組,它存放的是待執行管道的sacses數組下標
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]它會遍歷整個scases數組,然後通過runtime.fastrandn生成[0, i]之間的隨機數,再將它與i交換,過程中會跳過管道為nil的 case,遍歷完成後就得到了一個元素被打亂了的pollorder數組,如下圖所示

然後對pollorder數組根據管道的地址大小使用堆排序就得到了lockorder數組,再調用runtime.sellock按照順序將其上鎖
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}這裡值得注意的是,對管道按照地址大小排序是為了避免死鎖,因為 select 操作本身不需要鎖允許並發。假設按照pollorder隨機順序加鎖,那麼考慮下面代碼的情況
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
poll := func() {
select {
case ch1 <- 1:
println(1)
case ch2 <- 2:
println(2)
case ch3 <- 3:
println(3)
case ch4 <- 4:
println(4)
default:
println("default")
}
}
// A
go poll()
// B
go poll()
// C
go poll()三個協程 ABC 都走到了加鎖這一步驟,並且它們彼此加鎖順序都是隨機的互不相同,有可能造成這樣一種情況,如下圖所示

假設 ABC 加鎖順序跟上圖一樣,那麼造成死鎖的可能性就非常大,比如 A 會先持有ch2的鎖,然後去嘗試獲取ch1的鎖,但假設ch1已經被協程 B 鎖住了,協程 B 又會去嘗試獲取ch2的鎖,那麼這樣就造成了死鎖。

如果所有協程都按照同樣的順序加鎖,就不會發送死鎖問題,這也是lockorder要按照地址大小來進行排序的根本原因。
上完鎖之後,就開始了真正的處理階段,首先遍歷pollorder數組,按照之前打亂的順序訪問管道,逐個遍歷找到一個可用的管道
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends { // 讀管道
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else { // 寫管道
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}可以看到這裡對讀/寫管道做了 6 種情況的處理,下面分別進行講解。第一種情況,讀取管道且有發送方正在等待發送,這裡會走到runtime.recv函數,其作用已經講過了,它最終會喚醒發送方協程,再喚醒之前回調函數會將全部管道解鎖。
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retc第二種情況,讀取管道,沒有發送方正在等待,緩沖區元素數量大於 0,這裡會直接從緩沖區中讀取數據,其邏輯跟runtime.chanrecv中完全一致,然後解鎖。
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retc第三種情況,讀取管道,但管道已經關閉了,且緩沖區中沒有剩余元素,這裡會先解鎖然後直接返回。
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retc第四種情況,向已關閉的管道發送數據,這裡會先解鎖然後panic,
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))第五種情況,有接收方正在阻塞等待,這裡會調用runitme.send函數,並最終喚醒接收方協程,在喚醒之前回調函數會將全部管道解鎖。
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retc第六種情況,沒有接收方協程等待,將要發送的數據放入緩沖區,然後解鎖。
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retc然後上面所有情況最後都會走入retc這個分支,而它要做的只有返回選中的管道下標casi以及代表著是否讀取成功的recvOk。
retc:
return casi, recvOK第七種情況,沒有找到可用的管道,且代碼中包含default 分支,則解鎖管道然後直接返回,這裡返回的casi為-1 即表示沒有可用的管道。
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}最後一種情況,沒有找到可用的管道,且代碼中不包含default分支,那麼當前協程會陷入阻塞狀態,在這之前selectgo會將當前協程加入所有監聽管道的recvq/sendq隊列中
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
sg.c = c
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}這裡會將創建若干個sudog並將其和對應的管道鏈接起來,如下圖所示

然後由runtime.gopark阻塞,在阻塞前會將管道解鎖,解鎖的工作由runtime.selparkcommit函數完成,它被作為回調函數傳入了gopark中。
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false被喚醒後的第一件事情就是解除sudog與管道的鏈接
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nil然後將sudog從之前管道的等待隊列中移除
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}在上面的過程中一定會找到一個喚醒方協程所處理的管道,然後根據caseSuccess來做出最後的處理。對於寫操作而言,sg.success為false代表管道已經關閉了,而且整個 go 運行時也只有close函數會主動將該字段設置為false,這表明當前協程是喚醒方通過close函數喚醒的。對於讀操作而言,如果是被發送方喚醒的,數據讀取操作也早在被喚醒前由發送方通過runtime.send函數完成了,其值為true,如果是被close函數喚醒的,跟前面一樣都是直接返回。
c = cas.c
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
selunlock(scases, lockorder)
goto retc到此整個 select 的邏輯都大致理清楚了,上面分了好幾種情況,可見 select 處理起來還是比較復雜的。
