select
select adalah struktur yang dapat memantau status beberapa saluran secara bersamaan, sintaksnya mirip dengan switch
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
)
func main() {
finished := make(chan struct{})
ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer stop()
slog.Info("running")
go func() {
time.Sleep(time.Second * 2)
finished <- struct{}{}
}()
select {
case <-ctx.Done():
slog.Info("shutting down")
case <-finished:
slog.Info("finished")
}
}Kode ini menggunakan kombinasi context, saluran, dan select untuk mengimplementasikan logika penghentian program yang mulus. Select dalam kode ini secara bersamaan memantau dua saluran ctx.Done dan finished, ada dua kondisi penghentian, pertama adalah sistem operasi mengirim sinyal keluar, kedua adalah saluran finished memiliki pesan yang dapat dibaca yaitu tugas kode pengguna selesai, sehingga kita dapat melakukan pekerjaan penyelesaian saat program keluar.
Seperti yang kita ketahui, select memiliki dua fitur yang sangat penting, pertama adalah non-blocking, dalam kode sumber pengiriman dan penerimaan saluran dapat dilihat bahwa select telah ditangani, dapat menentukan apakah saluran tersedia dalam kondisi non-blocking, kedua adalah randomisasi, jika ada beberapa saluran yang tersedia, ia akan memilih satu secara acak untuk dieksekusi, tidak mengikuti urutan yang telah ditentukan dapat membuat setiap saluran dieksekusi secara relatif adil, jika tidak dalam kondisi ekstrem beberapa saluran mungkin tidak akan pernah diproses. Karena semua pekerjaannya terkait dengan saluran, disarankan untuk membaca artikel chan terlebih dahulu, setelah memahami saluran kemudian memahami select akan lebih lancar.
Struktur
Runtime hanya memiliki satu struktur runtime.scase yang merepresentasikan cabang select, setiap case dalam runtime direpresentasikan sebagai scase.
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}Di antaranya c mengacu pada saluran, elem menunjukkan pointer elemen yang diterima atau dikirim, sebenarnya kata kunci select mengacu pada fungsi runtime.selectgo.
Prinsip
Cara penggunaan select dibagi oleh Go menjadi empat situasi untuk dioptimalkan, hal ini dapat dilihat di fungsi cmd/compile/internal/walk.walkSelectCases untuk logika penanganan keempat situasi ini.
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// optimasi: select tanpa case
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// optimasi: select satu case: operasi tunggal.
if ncas == 1 {
...
}
// optimasi: select dua case tetapi satu adalah default: operasi non-blocking tunggal.
if ncas == 2 && dflt != nil {
...
}
...
return init
}Optimasi
Compiler akan mengoptimalkan tiga situasi pertama, situasi pertama adalah ketika jumlah case adalah 0 yaitu select kosong, kita semua tahu bahwa select kosong akan menyebabkan goroutine saat ini tersumbat secara permanen.
select{}Alasan tersumbat adalah karena compiler menerjemahkannya menjadi pemanggilan langsung ke fungsi runtime.block
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}Dan fungsi block memanggil fungsi runtime.gopark, membuat goroutine saat ini berubah menjadi status _Gwaiting, dan masuk ke blocking permanen, tidak akan pernah mendapatkan penjadwalan lagi.
Situasi kedua, hanya ada satu case dan bukan default, dalam situasi ini compiler akan langsung menerjemahkannya menjadi operasi pengiriman/penerimaan saluran, dan masih blocking, misalnya kode berikut
func main() {
ch := make(chan int)
select {
case <-ch:
// lakukan sesuatu
}
}Ia akan diterjemahkan menjadi pemanggilan langsung ke fungsi runtime.chanrecv1, dapat dilihat dari kode assembly
TEXT main.main(SB), ABIInternal, $2
...
LEAQ type:chan int(SB), AX
XORL BX, BX
PCDATA $1, $0
CALL runtime.makechan(SB)
XORL BX, BX
NOP
CALL runtime.chanrecv1(SB)
ADDQ $16, SP
POPQ BP
...Dalam kasus satu case, mengirim data ke saluran juga sama, ia akan diterjemahkan menjadi pemanggilan langsung ke fungsi runtime.chansend1, juga blocking.
Situasi ketiga, ada dua case dan salah satunya adalah default
func main() {
ch := make(chan int)
select {
case ch <- 1:
// lakukan sesuatu
default:
// lakukan sesuatu
}
}Situasi ini akan diterjemahkan menjadi pernyataan if yang memanggil runtime.selectnbsend, sebagai berikut
if selectnbsend(ch, 1) {
// lakukan sesuatu
} else {
// lakukan sesuatu
}Jika menerima data saluran akan diterjemahkan menjadi pemanggilan runtime.selectnbrecv
ch := make(chan int)
select {
case x, ok := <-ch:
// lakukan sesuatu
default:
// lakukan sesuatu
}if selected, ok = selectnbrecv(&v, c); selected {
// lakukan sesuatu
} else {
// lakukan sesuatu
}Yang perlu diperhatikan adalah, dalam situasi ini penerimaan atau pengiriman saluran adalah non-blocking, kita dapat dengan jelas melihat bahwa parameter block adalah false.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}Dan baik untuk mengirim atau menerima data saluran, ketika block adalah false ada jalur cepat yang dapat menentukan apakah dapat mengirim atau menerima data tanpa mengunci, seperti yang ditunjukkan berikut
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
return true, false
}
...
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if !block && c.closed == 0 && full(c) {
return false
}
...
}Saat membaca saluran, jika saluran kosong akan langsung return, saat menulis saluran, jika saluran tidak ditutup dan sudah penuh juga akan langsung return, dalam kondisi umum ini akan menyebabkan goroutine blocking, tetapi ketika dikombinasikan dengan select tidak akan.
Penanganan
Tiga situasi di atas hanyalah optimasi untuk kasus khusus, kata kunci select yang digunakan secara normal akan diterjemahkan menjadi pemanggilan fungsi runtime.selectgo, logika penanganannya mencapai lebih dari 400 baris.
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)Compiler akan mengumpulkan semua pernyataan case menjadi array scase, kemudian meneruskannya ke fungsi selectgo, setelah penanganan selesai mengembalikan dua nilai return
- Yang pertama adalah indeks saluran yang dipilih secara acak, menunjukkan saluran mana yang diproses, jika tidak ada return -1
- Yang kedua menunjukkan apakah operasi baca saluran berhasil membaca
Berikut penjelasan singkat untuk parameternya
cas0, pointer kepala arrayscase, bagian pertama存放的是写管道 case,后半部分存放读管道 case,以nsends来区分order0, panjangnya dua kali arrayscase, bagian pertama dialokasikan untuk arraypollorder, bagian kedua dialokasikan untuk arraylockordernsendsdannrecvsmenunjukkan jumlah case baca/tulis saluran, jumlah keduanya adalah total caseblockmenunjukkan apakah blocking, jika ada casedefaultmewakili non-blocking, nilainyafalse, jika tidaktrue.pc0, menunjuk ke kepala array[ncases]uintptr, digunakan untuk analisis race, nanti dapat diabaikan, tidak membantu untuk memahami select
Misalkan ada kode berikut
func main() {
ch := make(chan int)
select {
case ch <- 1:
println(1)
case ch <- 2:
println(2)
case ch <- 3:
println(3)
case ch <- 4:
println(4)
default:
println("default")
}
}Lihat bentuk assembly-nya, di sini untuk memudahkan pemahaman menghilangkan sebagian kode
0x0000 00000 TEXT main.main(SB), ABIInterna
...
0x0023 00035 CALL runtime.makechan(SB)
0x0028 00040 MOVQ $1, main..autotmp_2+72(SP) // 1 2 3 4 beberapa variabel sementara
0x0031 00049 MOVQ $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL runtime.selectgo(SB) // panggil fungsi runtime.selectgo
0x00cd 00205 TESTQ AX, AX
0x00d0 00208 JLT 352 // lompat ke cabang default
0x00d6 00214 PCDATA $1, $-1
0x00d6 00214 JEQ 320 // lompat ke cabang 4
0x00d8 00216 CMPQ AX, $1
0x00dc 00220 JEQ 288 // lompat ke cabang 3
0x00de 00222 NOP
0x00e0 00224 CMPQ AX, $2
0x00e4 00228 JNE 258 // lompat ke cabang 2
0x00e6 00230 PCDATA $1, $0
0x00e6 00230 CALL runtime.printlock(SB)
0x00eb 00235 MOVL $3, AX
0x00f0 00240 CALL runtime.printint(SB)
0x00f5 00245 CALL runtime.printnl(SB)
0x00fa 00250 CALL runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP 379
0x0102 00258 CALL runtime.printlock(SB)
0x0107 00263 MOVL $4, AX
0x010c 00268 CALL runtime.printint(SB)
0x0111 00273 CALL runtime.printnl(SB)
0x0116 00278 CALL runtime.printunlock(SB)
0x011b 00283 JMP 379
0x011d 00285 NOP
0x0120 00288 CALL runtime.printlock(SB)
0x0125 00293 MOVL $2, AX
0x012a 00298 CALL runtime.printint(SB)
0x012f 00303 CALL runtime.printnl(SB)
0x0134 00308 CALL runtime.printunlock(SB)
0x0139 00313 JMP 379
0x013b 00315 NOP
0x0140 00320 CALL runtime.printlock(SB)
0x0145 00325 MOVL $1, AX
0x014a 00330 CALL runtime.printint(SB)
0x014f 00335 CALL runtime.printnl(SB)
0x0154 00340 CALL runtime.printunlock(SB)
0x0159 00345 JMP 379
0x015b 00347 NOP
0x0160 00352 CALL runtime.printlock(SB)
0x0165 00357 LEAQ go:string."default\n"(SB)
0x016c 00364 MOVL $8, BX
0x0171 00369 CALL runtime.printstring(SB)
0x0176 00374 CALL runtime.printunlock(SB)
0x017b 00379 PCDATA $1, $-1
0x017b 00379 ADDQ $160, SP
0x0182 00386 POPQ BP
0x0183 00387 RETDapat dilihat bahwa setelah memanggil fungsi selectgo ada logika判断+lonjakan, melalui ini kita tidak sulit untuk menyimpulkan bentuk aslinya
casei, ok := runtime.selectgo()
if casei == -1 {
println("default")
} else if casei == 3 {
println(4)
} else if casei == 2 {
println(3)
} else if casei == 1 {
println(2)
} else {
println(1)
}Kode yang dihasilkan compiler mungkin berbeda dengan ini, tetapi maksudnya kurang lebih sama. Jadi compiler setelah memanggil fungsi selectgo akan menggunakan pernyataan if untuk menentukan saluran mana yang akan dieksekusi, dan sebelum memanggil, compiler juga akan menghasilkan loop for untuk mengumpulkan array scase tetapi di sini dihilangkan.
Setelah mengetahui bagaimana eksternal menggunakan fungsi selectgo, di bawah ini mari pahami bagaimana fungsi selectgo bekerja di internal. Pertama akan menginisialisasi beberapa array, nsends+nrecvs menunjukkan total case, dari kode di bawah juga dapat dilihat bahwa nilai maksimum jumlah case adalah 1 << 16, pollorder menentukan urutan eksekusi saluran, lockorder menentukan urutan penguncian saluran.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// panjangnya dua kali array scase, bagian pertama dialokasikan untuk array pollorder, bagian kedua dialokasikan untuk array lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]Selanjutnya inisialisasi array pollorder, ia存放的是待执行管道的sacses数组下标
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]Ia akan iterasi seluruh array scases, kemudian melalui runtime.fastrandn menghasilkan angka acak antara [0, i], kemudian menukarnya dengan i, selama proses akan melewatkan case yang salurannya nil, setelah iterasi selesai mendapatkan array pollorder yang elemennya diacak, seperti gambar berikut

Kemudian terhadap array pollorder berdasarkan ukuran alamat saluran menggunakan heap sort mendapatkan array lockorder, kemudian panggil runtime.sellock untuk menguncinya sesuai urutan
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}Yang perlu diperhatikan di sini adalah, mengunci saluran berdasarkan ukuran alamat adalah untuk menghindari deadlock, karena operasi select itu sendiri tidak mengizinkan konkurensi. Asumsikan mengunci sesuai urutan acak pollorder, maka pertimbangkan situasi kode berikut
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
poll := func() {
select {
case ch1 <- 1:
println(1)
case ch2 <- 2:
println(2)
case ch3 <- 3:
println(3)
case ch4 <- 4:
println(4)
default:
println("default")
}
}
// A
go poll()
// B
go poll()
// C
go poll()Tiga goroutine ABC semua sampai ke langkah penguncian ini, dan urutan penguncian mereka彼此 acak tidak sama, mungkin menyebabkan situasi seperti ini, seperti gambar berikut

Asumsikan urutan penguncian ABC sama dengan gambar di atas, maka kemungkinan menyebabkan deadlock sangat besar, misalnya A akan pertama memegang kunci ch2, kemudian mencoba mendapatkan kunci ch1, tetapi asumsikan ch1 sudah dikunci oleh goroutine B, goroutine B akan mencoba mendapatkan kunci ch2, maka ini menyebabkan deadlock.

Jika semua goroutine mengunci sesuai urutan yang sama, tidak akan terjadi masalah deadlock, ini juga alasan mendasar mengapa lockorder harus diurutkan berdasarkan ukuran alamat.
Setelah mengunci, dimulai tahap penanganan yang sebenarnya, pertama iterasi array pollorder, sesuai urutan yang diacak sebelumnya mengakses saluran, iterasi satu per satu menemukan saluran yang tersedia
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends { // baca saluran
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else { // tulis saluran
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}Dapat dilihat di sini terhadap baca/tulis saluran dilakukan penanganan 6 situasi, di bawah masing-masing dijelaskan. Situasi pertama, membaca saluran dan ada pengirim sedang menunggu mengirim, di sini akan masuk ke fungsi runtime.recv, fungsinya sudah dijelaskan, pada akhirnya akan membangunkan goroutine pengirim, sebelum membangunkan fungsi callback akan membuka semua kunci saluran.
recv:
// dapat menerima dari pengirim yang tidur (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retcSituasi kedua, membaca saluran, tidak ada pengirim sedang menunggu, jumlah elemen buffer lebih besar dari 0, di sini akan langsung membaca data dari buffer, logikanya sama persis dengan runtime.chanrecv, kemudian membuka kunci.
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retcSituasi ketiga, membaca saluran, tetapi saluran sudah ditutup, dan tidak ada elemen tersisa di buffer, di sini akan pertama membuka kunci kemudian langsung return.
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retcSituasi keempat, mengirim data ke saluran yang sudah ditutup, di sini akan pertama membuka kunci kemudian panic,
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))Situasi kelima, ada penerima sedang blocking menunggu, di sini akan memanggil fungsi runtime.send, dan pada akhirnya membangunkan goroutine penerima, sebelum membangunkan fungsi callback akan membuka semua kunci saluran.
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retcSituasi keenam, tidak ada goroutine penerima menunggu, masukkan data yang akan dikirim ke buffer, kemudian buka kunci.
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retcKemudian semua situasi di atas akhirnya akan masuk ke cabang retc, dan yang harus dilakukannya hanya mengembalikan indeks saluran yang dipilih casi dan recvOk yang menunjukkan apakah baca berhasil.
retc:
return casi, recvOKSituasi ketujuh, tidak menemukan saluran yang tersedia, dan kode mengandung cabang default, maka buka kunci saluran kemudian langsung return, di sini casi yang return adalah -1 yaitu menunjukkan tidak ada saluran yang tersedia.
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}Situasi terakhir, tidak menemukan saluran yang tersedia, dan kode tidak mengandung cabang default, maka goroutine saat ini akan masuk ke status blocking, sebelum ini selectgo akan menambahkan goroutine saat ini ke antrian recvq/sendq semua saluran yang dipantau
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
sg.c = c
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}Di sini akan membuat beberapa sudog dan menghubungkannya dengan saluran yang sesuai, seperti gambar berikut

Kemudian runtime.gopark blocking, sebelum blocking akan membuka kunci saluran, pekerjaan membuka kunci diselesaikan oleh fungsi runtime.selparkcommit, ia diteruskan sebagai fungsi callback ke gopark.
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = falseHal pertama setelah dibangunkan adalah melepaskan tautan sudog dengan saluran
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nilKemudian hapus sudog dari antrian tunggu saluran sebelumnya
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg sudah di-dequeue oleh G yang membangunkan kita.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}Dalam proses di atas pasti akan menemukan saluran yang diproses oleh goroutine yang membangunkan, kemudian做出 penanganan terakhir berdasarkan caseSuccess. Untuk operasi tulis而言,sg.success adalah false mewakili saluran sudah ditutup, dan seluruh runtime go hanya fungsi close yang akan secara aktif mengatur field ini menjadi false, ini menunjukkan bahwa goroutine saat ini dibangunkan oleh peng唤醒方 melalui fungsi close. Untuk operasi baca而言,jika dibangunkan oleh pengirim, operasi baca data juga sudah diselesaikan oleh pengirim melalui fungsi runtime.send sebelum dibangunkan, nilainya true, jika dibangunkan oleh fungsi close, sama seperti sebelumnya langsung return.
c = cas.c
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
selunlock(scases, lockorder)
goto retcSampai di sini seluruh logika select sudah大致 jelas, di atas dibagi menjadi beberapa situasi, dapat dilihat penanganan select masih cukup kompleks.
