select
select là một cấu trúc có thể đồng thời监听 nhiều trạng thái channel, cú pháp của nó tương tự như switch
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
)
func main() {
finished := make(chan struct{})
ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer stop()
slog.Info("running")
go func() {
time.Sleep(time.Second * 2)
finished <- struct{}{}
}()
select {
case <-ctx.Done():
slog.Info("shutting down")
case <-finished:
slog.Info("finished")
}
}Đoạn code này thông qua kết hợp sử dụng context, channel, select để thực hiện logic thoát mượt mà đơn giản của một chương trình, code trong select đồng thời监听 hai channel ctx.Done và finished, điều kiện thoát của nó có hai, một là hệ điều hành gửi tín hiệu thoát, hai là channel finished có tin nhắn có thể đọc tức là nhiệm vụ code người dùng hoàn thành, như vậy chúng ta có thể làm công việc tổng kết khi chương trình thoát.
Ai cũng biết, select có hai đặc tính vô cùng quan trọng, một là không chặn, trong code nguồn gửi và nhận channel có thể thấy đã xử lý một số thứ cho select, có thể phán đoán channel có khả dụng không trong tình huống không chặn, hai là ngẫu nhiên hóa, nếu có nhiều channel khả dụng thì nó sẽ ngẫu nhiên chọn một để thực thi, không tuân theo thứ tự đã định có thể khiến mỗi channel đều tương đối công bằng được thực thi, nếu không trong tình huống cực đoan một số channel có thể永远 không được xử lý. Vì công việc của nó đều liên quan đến channel, nên trước tiên đề nghị đọc bài viết chan, sau khi hiểu channel rồi再来 hiểu select sẽ thông suốt rất nhiều.
Cấu trúc
Runtime chỉ có một cấu trúc runtime.scase biểu diễn nhánh select, biểu diễn runtime của mỗi case chính là scase.
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}Trong đó c chỉ là channel, elem biểu thị con trỏ phần tử nhận hoặc gửi, thực tế từ khoá select chỉ là hàm runtime.selectgo.
Nguyên lý
Cách sử dụng select được go chia thành bốn tình huống để tối ưu hóa, điểm này có thể thấy trong hàm cmd/compile/internal/walk.walkSelectCases logic xử lý cho bốn tình huống này.
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// optimization: zero-case select
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// optimization: one-case select: single op.
if ncas == 1 {
...
}
// optimization: two-case select but one is default: single non-blocking op.
if ncas == 2 && dflt != nil {
...
}
...
return init
}Tối ưu hóa
Compiler sẽ tối ưu hóa cho ba tình huống đầu, tình huống đầu tiên là khi số lượng case bằng 0 tức một select rỗng, chúng ta đều biết select rỗng sẽ gây ra goroutine hiện tại bị chặn vĩnh viễn.
select{}Sở dĩ bị chặn là vì compiler dịch nó thành lời gọi trực tiếp đến hàm runtime.block
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}Mà hàm block lại gọi hàm runtime.gopark, khiến goroutine hiện tại biến thành trạng thái _Gwaitting, và đi vào chặn vĩnh viễn,永远 không được điều độ.
Tình huống thứ hai, chỉ có một case và không phải default, tình huống này compiler sẽ trực tiếp dịch nó thành thao tác gửi nhận channel, hơn nữa còn là chặn, ví dụ như code dưới đây
func main() {
ch := make(chan int)
select {
case <-ch:
// do something
}
}Nó sẽ được dịch thành lời gọi trực tiếp đến hàm runtime.chanrecv1, có thể nhìn ra từ code assembly
TEXT main.main(SB), ABIInternal, $2
...
LEAQ type:chan int(SB), AX
XORL BX, BX
PCDATA $1, $0
CALL runtime.makechan(SB)
XORL BX, BX
NOP
CALL runtime.chanrecv1(SB)
ADDQ $16, SP
POPQ BP
...Trong tình huống chỉ có một case thì gửi dữ liệu đến channel cũng là đạo lý tương tự, nó sẽ được dịch thành lời gọi trực tiếp đến hàm runtime.chansend1, cũng là chặn.
Tình huống thứ ba, có hai case và trong đó một cái là default
func main() {
ch := make(chan int)
select {
case ch <- 1:
// do something
default:
// do something
}
}Tình huống này sẽ dịch nó thành một câu lệnh if gọi hàm runtime.selectnbsend, như dưới đây
if selectnbsend(ch, 1) {
// do something
} else {
// do something
}Nếu là nhận dữ liệu channel sẽ dịch thành lời gọi đến runtime.selectnbrecv
ch := make(chan int)
select {
case x, ok := <-ch:
// do something
default:
// do something
}if selected, ok = selectnbrecv(&v, c); selected {
// do something
} else {
// do something
}Điều đáng chú ý là, trong tình huống này việc nhận hoặc gửi channel là không chặn, chúng ta có thể nhìn thấy rõ ràng tham số blcok là false.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}Mà bất kể là gửi hoặc nhận dữ liệu channel, khi blcok là false đều có một đường nhanh có thể phán đoán có thể gửi hoặc nhận không trong tình huống không khóa, như dưới đây
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
return true, false
}
...
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if !block && c.closed == 0 && full(c) {
return false
}
...
}Khi đọc channel, nếu channel rỗng sẽ trực tiếp trả về, khi viết channel, nếu channel chưa đóng và đã đầy cũng sẽ trực tiếp trả về, trong tình huống thông thường chúng sẽ gây chặn goroutine, nhưng kết hợp với select thì không.
Xử lý
Ba tình huống trên chỉ là tối ưu hóa cho tình huống đặc biệt, từ khoá select được sử dụng bình thường sẽ được dịch thành lời gọi đến hàm runtime.selectgo, logic xử lý của nó dài đến hơn 400 dòng.
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)Compiler sẽ thu thập tất cả câu lệnh case thành một mảng scase, sau đó truyền cho hàm selectgo, sau khi xử lý xong trả về hai giá trị trả về
- Cái đầu tiên là chỉ số channel được chọn ngẫu nhiên, biểu thị channel nào được xử lý, không có thì trả về -1
- Cái thứ hai biểu thị đối với thao tác đọc channel而言 có đọc thành công không
Ở đây giải thích đơn giản các tham số của nó
cas0, con trỏ头部 của mảngscase, nửa phần đầu chứa case viết channel, nửa phần sau chứa case đọc channel, phân biệt bằngnsendsorder0, độ dài của nó là hai lần mảngscase, nửa phần đầu phân phối cho mảngpollorder, nửa phần sau phân phối cho mảnglockordernsendsvànrecvsbiểu thị số lượng case đọc/viết channel, tổng của hai cái là tổng số caseblockbiểu thị có chặn không, nếu có casedefaultthì biểu thị không chặn, giá trị của nó làfalse, ngược lại làtrue.pc0, trỏ đến头部 của một mảng[ncases]uintptr, dùng để phân tích竞态, sau này có thể bỏ qua nó, không có giúp ích gì cho việc hiểu select
Giả sử có code dưới đây
func main() {
ch := make(chan int)
select {
case ch <- 1:
println(1)
case ch <- 2:
println(2)
case ch <- 3:
println(3)
case ch <- 4:
println(4)
default:
println("default")
}
}Xem hình thức assembly của nó, ở đây để tiện hiểu đã bỏ qua một phần code
0x0000 00000 TEXT main.main(SB), ABIInterna
...
0x0023 00035 CALL runtime.makechan(SB)
0x0028 00040 MOVQ $1, main..autotmp_2+72(SP) // 1 2 3 4几个临时变量
0x0031 00049 MOVQ $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL runtime.selectgo(SB) // 调用 runtime.selectgo 函数
0x00cd 00205 TESTQ AX, AX
0x00d0 00208 JLT 352 // 跳转到 default 分支
0x00d6 00214 PCDATA $1, $-1
0x00d6 00214 JEQ 320 // 跳转到分支 4
0x00d8 00216 CMPQ AX, $1
0x00dc 00220 JEQ 288 // 跳转到分支 3
0x00de 00222 NOP
0x00e0 00224 CMPQ AX, $2
0x00e4 00228 JNE 258 // 跳转到分支 2
0x00e6 00230 PCDATA $1, $0
0x00e6 00230 CALL runtime.printlock(SB)
0x00eb 00235 MOVL $3, AX
0x00f0 00240 CALL runtime.printint(SB)
0x00f5 00245 CALL runtime.printnl(SB)
0x00fa 00250 CALL runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP 379
0x0102 00258 CALL runtime.printlock(SB)
0x0107 00263 MOVL $4, AX
0x010c 00268 CALL runtime.printint(SB)
0x0111 00273 CALL runtime.printnl(SB)
0x0116 00278 CALL runtime.printunlock(SB)
0x011b 00283 JMP 379
0x011d 00285 NOP
0x0120 00288 CALL runtime.printlock(SB)
0x0125 00293 MOVL $2, AX
0x012a 00298 CALL runtime.printint(SB)
0x012f 00303 CALL runtime.printnl(SB)
0x0134 00308 CALL runtime.printunlock(SB)
0x0139 00313 JMP 379
0x013b 00315 NOP
0x0140 00320 CALL runtime.printlock(SB)
0x0145 00325 MOVL $1, AX
0x014a 00330 CALL runtime.printint(SB)
0x014f 00335 CALL runtime.printnl(SB)
0x0154 00340 CALL runtime.printunlock(SB)
0x0159 00345 JMP 379
0x015b 00347 NOP
0x0160 00352 CALL runtime.printlock(SB)
0x0165 00357 LEAQ go:string."default\n"(SB)
0x016c 00364 MOVL $8, BX
0x0171 00369 CALL runtime.printstring(SB)
0x0176 00374 CALL runtime.printunlock(SB)
0x017b 00379 PCDATA $1, $-1
0x017b 00379 ADDQ $160, SP
0x0182 00386 POPQ BP
0x0183 00387 RETCó thể thấy sau khi gọi hàm selectgo có tồn tại logic phán đoán+nhảy, thông qua những điều này chúng ta không khó suy ra hình thức ban đầu của nó
casei, ok := runtime.selectgo()
if casei == -1 {
println("default")
} else if casei == 3 {
println(4)
} else if casei == 2 {
println(3)
} else if casei == 1 {
println(2)
} else {
println(1)
}Code thực tế compiler sinh ra có thể có khác biệt với cái này, nhưng ý nghĩa đại khái là tương tự. Nên compiler sẽ sau khi gọi xong hàm selectgo đồng thời sử dụng câu lệnh if để phán đoán đến channel nào được thực thi, hơn nữa trước khi gọi, compiler còn sẽ sinh ra một vòng lặp for để thu thập mảng scase nhưng ở đây đã bỏ qua.
Sau khi biết bên ngoài sử dụng hàm selectgo như thế nào, dưới đây就来 hiểu bên trong hàm selectgo làm việc ra sao. Đầu tiên nó sẽ khởi tạo một vài mảng, nsends+nrecvs biểu thị tổng số case, từ code dưới đây cũng có thể thấy giá trị tối đa của số lượng case chính là 1 << 16, pollorder quyết định thứ tự thực thi của channel, lockorder quyết định thứ tự khóa của channel.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// 它的长度是 scase 数组的两倍,前半部分分配给 pollorder 数组,后半部分分配给 lockorder 数组。
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]Tiếp theo khởi tạo mảng pollorder, nó chứa chỉ số mảng sacses của channel待 thực thi
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]Nó sẽ duyệt toàn bộ mảng scases, sau đó thông qua runtime.fastrandn sinh ra số ngẫu nhiên trong khoảng [0, i], rồi đổi chỗ nó với i, trong quá trình sẽ bỏ qua case mà channel là nil, sau khi duyệt xong thì được một mảng pollorder mà các phần tử đã bị打乱, như hình dưới đây

Sau đó đối với mảng pollorder dựa trên kích thước địa chỉ channel sử dụng sắp xếp heap thì được mảng lockorder, rồi gọi runtime.sellock khóa theo thứ tự
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}Ở đây đáng chú ý là, đối với channel dựa trên kích thước địa chỉ sắp xếp là để tránh deadlock, vì bản thân thao tác select không cần khóa cho phép đồng thời. Giả sử khóa theo thứ tự ngẫu nhiên của pollorder, vậy xem xét tình huống code dưới đây
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
poll := func() {
select {
case ch1 <- 1:
println(1)
case ch2 <- 2:
println(2)
case ch3 <- 3:
println(3)
case ch4 <- 4:
println(4)
default:
println("default")
}
}
// A
go poll()
// B
go poll()
// C
go poll()Ba goroutine ABC đều đi đến bước khóa này, và thứ tự khóa của chúng彼此 đều là ngẫu nhiên không giống nhau, có thể gây ra tình huống như dưới đây, như hình dưới đây

Giả sử thứ tự khóa của ABC giống như hình trên, vậy khả năng gây deadlock rất lớn, ví dụ A sẽ先持有 khóa của ch2, rồi đi thử lấy khóa của ch1, nhưng giả sử ch1 đã bị goroutine B khóa, goroutine B lại đi thử lấy khóa của ch2, vậy như vậy là gây ra deadlock.

Nếu tất cả goroutine đều khóa theo thứ tự tương tự, sẽ không xảy ra vấn đề deadlock, đây cũng là nguyên nhân căn bản mà lockorder phải dựa trên kích thước địa chỉ để sắp xếp.
Sau khi khóa xong, thì bắt đầu giai đoạn xử lý thực sự, đầu tiên duyệt mảng pollorder, dựa theo thứ tự đã打乱之前 để truy cập channel,逐个 duyệt tìm một channel khả dụng
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends { // 读管道
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else { // 写管道
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}Có thể thấy ở đây đã xử lý 6 tình huống cho channel đọc/viết, dưới đây lần lượt讲解. Tình huống đầu tiên, đọc channel và có bên gửi đang chờ gửi, ở đây sẽ đi đến hàm runtime.recv, tác dụng của nó đã nói rồi, nó cuối cùng sẽ đánh thức goroutine bên gửi, trước khi đánh thức thì hàm callback sẽ khóa toàn bộ channel.
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retcTình huống thứ hai, đọc channel, không có bên gửi đang chờ, số lượng phần tử bộ đệm lớn hơn 0, ở đây sẽ trực tiếp đọc dữ liệu từ bộ đệm, logic của nó hoàn toàn tương tự như trong runtime.chanrecv, rồi mở khóa.
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retcTình huống thứ ba, đọc channel, nhưng channel đã đóng, và trong bộ đệm không có phần tử còn lại, ở đây sẽ先 mở khóa rồi trực tiếp trả về.
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retcTình huống thứ tư, gửi dữ liệu đến channel đã đóng, ở đây sẽ先 mở khóa rồi panic,
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))Tình huống thứ năm, có bên nhận đang chặn chờ, ở đây sẽ gọi hàm runitme.send, và cuối cùng đánh thức goroutine bên nhận, trước khi đánh thức thì hàm callback sẽ khóa toàn bộ channel.
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retcTình huống thứ sáu, không có goroutine bên nhận chờ, sẽ đặt dữ liệu cần gửi vào bộ đệm, rồi mở khóa.
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retcRồi tất cả tình huống trên cuối cùng đều đi vào nhánh retc này, mà việc nó cần làm chỉ là trả về chỉ số channel được chọn casi và recvOk biểu thị có đọc thành công không.
retc:
return casi, recvOKTình huống thứ bảy, không tìm thấy channel khả dụng, và code中包含 nhánh default, thì mở khóa channel rồi trực tiếp trả về, ở đây trả về casi là -1 tức biểu thị không có channel khả dụng.
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}Tình huống cuối cùng, không tìm thấy channel khả dụng, và code中不包含 nhánh default, vậy goroutine hiện tại sẽ rơi vào trạng thái chặn, trước đó selectgo sẽ thêm goroutine hiện tại vào hàng đợi recvq/sendq của tất cả channel监听
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
sg.c = c
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}Ở đây sẽ tạo若干个 sudog và liên kết chúng với channel tương ứng, như hình dưới đây

Rồi bởi runtime.gopark chặn, trước khi chặn sẽ mở khóa channel, công việc mở khóa do hàm runtime.selparkcommit hoàn thành, nó được truyền vào gopark dưới hình thức hàm callback.
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = falseViệc đầu tiên sau khi被 đánh thức là解除 liên kết sudog với channel
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nilRồi移除 sudog từ hàng đợi chờ của channel之前
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}Trong quá trình trên nhất định sẽ tìm thấy channel được xử lý bởi goroutine bên đánh thức, rồi dựa vào caseSuccess để đưa ra xử lý cuối cùng. Đối với thao tác viết而言, sg.success là false biểu thị channel đã đóng, và toàn bộ go runtime cũng chỉ có hàm close sẽ chủ động đặt trường này thành false, điều này biểu thị goroutine hiện tại là bên đánh thức thông qua hàm close đánh thức. Đối với thao tác đọc而言, nếu là被 bên gửi đánh thức, thao tác đọc dữ liệu cũng đã được bên gửi hoàn thành thông qua hàm runtime.send trước khi被 đánh thức, giá trị của nó là true, nếu là被 hàm close đánh thức, cũng giống như前面 đều là trực tiếp trả về.
c = cas.c
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
selunlock(scases, lockorder)
goto retcĐến đây toàn bộ logic của select đều đã rõ ràng, trên đây chia làm mấy tình huống, có thể thấy select xử lý ra vẫn là khá phức tạp.
