select
select è una struttura che può monitorare simultaneamente più stati di channel. La sua sintassi è simile a switch.
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
)
func main() {
finished := make(chan struct{})
ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer stop()
slog.Info("running")
go func() {
time.Sleep(time.Second * 2)
finished <- struct{}{}
}()
select {
case <-ctx.Done():
slog.Info("shutting down")
case <-finished:
slog.Info("finished")
}
}Questo codice, combinando context, channel e select, implementa una logica semplice per l'arresto graduale del programma. Nel codice, select monitora simultaneamente due channel ctx.Done e finished. Le condizioni di uscita sono due: primo, il sistema operativo invia un segnale di uscita; secondo, il channel finished ha messaggi da leggere, ovvero il compito del codice utente è completato. In questo modo possiamo fare lavoro di收尾 quando il programma esce.
Come è noto, select ha due caratteristiche molto importanti: primo, non bloccante, come si può vedere nel codice sorgente di invio e ricezione dei channel, è stato fatto qualche trattamento per select, permettendo di判断 se un channel è disponibile in condizioni non bloccanti; secondo, randomizzazione, se ci sono più channel disponibili, ne seleziona uno casualmente da eseguire, non seguendo l'ordine stabilito. Questo permette a ogni channel di essere eseguito in modo relativamente equo, altrimenti in casi estremi alcuni channel potrebbero non essere mai elaborati. Poiché tutto il suo lavoro è correlato ai channel, si consiglia prima di leggere l'articolo chan. Dopo aver compreso i channel, capire select sarà molto più fluido.
Struttura
A runtime c'è solo una struttura runtime.scase che rappresenta un分支 select. Ogni case a runtime è rappresentato da scase.
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}Dove c si riferisce al channel, elem è il puntatore all'elemento ricevuto o inviato. In realtà, la parola chiave select si riferisce alla funzione runtime.selectgo.
Principio
Go divide l'uso di select in quattro situazioni per l'ottimizzazione. Questo si può vedere nella funzione cmd/compile/internal/walk.walkSelectCases, dove c'è la logica di trattamento per queste quattro situazioni.
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// optimization: zero-case select
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// optimization: one-case select: single op.
if ncas == 1 {
...
}
// optimization: two-case select but one is default: single non-blocking op.
if ncas == 2 && dflt != nil {
...
}
...
return init
}Ottimizzazione
Il compilatore ottimizza le prime tre situazioni. La prima situazione è quando il numero di case è 0, ovvero un select vuoto. Sappiamo tutti che un'istruzione select vuota causa il blocco permanente della goroutine corrente.
select{}Il motivo per cui si blocca è che il compilatore lo traduce in una chiamata diretta alla funzione runtime.block:
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}E la funzione block chiama la funzione runtime.gopark, portando la goroutine corrente nello stato _Gwaiting ed entrando in blocco permanente, senza mai più essere schedulata.
La seconda situazione, c'è solo un case e non è default. In questo caso il compilatore lo traduce direttamente in un'operazione di invio/ricezione sul channel, ed è anche bloccante. Ad esempio, il seguente codice:
func main() {
ch := make(chan int)
select {
case <-ch:
// do something
}
}Viene tradotto in una chiamata diretta alla funzione runtime.chanrecv1, come si può vedere dal codice assembly:
TEXT main.main(SB), ABIInternal, $2
...
LEAQ type:chan int(SB), AX
XORL BX, BX
PCDATA $1, $0
CALL runtime.makechan(SB)
XORL BX, BX
NOP
CALL runtime.chanrecv1(SB)
ADDQ $16, SP
POPQ BP
...Nel caso di un solo case, anche l'invio di dati al channel segue lo stesso principio. Viene tradotto in una chiamata diretta alla funzione runtime.chansend1, ed è anche bloccante.
La terza situazione, ci sono due case e uno di essi è default:
func main() {
ch := make(chan int)
select {
case ch <- 1:
// do something
default:
// do something
}
}In questo caso viene tradotto in un'istruzione if che chiama la funzione runtime.selectnbsend, come segue:
if selectnbsend(ch, 1) {
// do something
} else {
// do something
}Se si ricevono dati dal channel, viene tradotto in una chiamata a runtime.selectnbrecv:
ch := make(chan int)
select {
case x, ok := <-ch:
// do something
default:
// do something
}if selected, ok = selectnbrecv(&v, c); selected {
// do something
} else {
// do something
}Vale la pena notare che in questo caso l'invio o la ricezione del channel è non bloccante. Possiamo vedere chiaramente che il parametro block è false.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}Sia per l'invio che per la ricezione di dati dal channel, quando block è false, c'è un percorso rapido che può判断 se si può inviare o ricevere dati senza acquisire il lock, come mostrato di seguito:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
return true, false
}
...
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if !block && c.closed == 0 && full(c) {
return false
}
...
}Durante la lettura dal channel, se il channel è vuoto restituisce direttamente. Durante la scrittura sul channel, se il channel non è chiuso ed è pieno, restituisce direttamente. In condizioni normali causerebbero il blocco della goroutine, ma combinati con select non lo fanno.
Trattamento
Le tre situazioni sopra sono solo ottimizzazioni per casi speciali. La parola chiave select usata normalmente viene tradotta in una chiamata alla funzione runtime.selectgo, la cui logica di trattamento è lunga oltre 400 righe.
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)Il compilatore raccoglie tutte le istruzioni case in un array scase, poi lo passa alla funzione selectgo. Dopo il trattamento, restituisce due valori di ritorno:
- Il primo è l'indice del channel selezionato casualmente, indicando quale channel è stato elaborato. Se nessuno, restituisce -1.
- Il secondo indica se l'operazione di lettura dal channel è avvenuta con successo.
Qui spiego brevemente i suoi parametri:
cas0: puntatore alla testa dell'arrayscase. La prima metà contiene i case di scrittura sul channel, la seconda metà i case di lettura dal channel, distinti dansends.order0: la sua lunghezza è il doppio dell'arrayscase. La prima metà è allocata all'arraypollorder, la seconda metà all'arraylockorder.nsendsenrecvs: numero di case di lettura/scrittura sul channel. La loro somma è il numero totale di case.block: indica se è bloccante. Se c'è un casedefault, rappresenta non bloccante, il suo valore èfalse, altrimenti ètrue.pc0: punta alla testa di un array[ncases]uintptr, usato per l'analisi race condition. Può essere ignorato in seguito, non aiuta a comprendere select.
Supponiamo di avere il seguente codice:
func main() {
ch := make(chan int)
select {
case ch <- 1:
println(1)
case ch <- 2:
println(2)
case ch <- 3:
println(3)
case ch <- 4:
println(4)
default:
println("default")
}
}Visualizzando la sua forma assembly, qui per comodità di comprensione ho omesso parte del codice:
0x0000 00000 TEXT main.main(SB), ABIInterna
...
0x0023 00035 CALL runtime.makechan(SB)
0x0028 00040 MOVQ $1, main..autotmp_2+72(SP) // 1 2 3 4 alcune variabili temporanee
0x0031 00049 MOVQ $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL runtime.selectgo(SB) // chiama la funzione runtime.selectgo
0x00cd 00205 TESTQ AX, AX
0x00d0 00208 JLT 352 // salta al分支 default
0x00d6 00214 PCDATA $1, $-1
0x00d6 00214 JEQ 320 // salta al分支 4
0x00d8 00216 CMPQ AX, $1
0x00dc 00220 JEQ 288 // salta al分支 3
0x00de 00222 NOP
0x00e0 00224 CMPQ AX, $2
0x00e4 00228 JNE 258 // salta al分支 2
0x00e6 00230 PCDATA $1, $0
0x00e6 00230 CALL runtime.printlock(SB)
0x00eb 00235 MOVL $3, AX
0x00f0 00240 CALL runtime.printint(SB)
0x00f5 00245 CALL runtime.printnl(SB)
0x00fa 00250 CALL runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP 379
0x0102 00258 CALL runtime.printlock(SB)
0x0107 00263 MOVL $4, AX
0x010c 00268 CALL runtime.printint(SB)
0x0111 00273 CALL runtime.printnl(SB)
0x0116 00278 CALL runtime.printunlock(SB)
0x011b 00283 JMP 379
0x011d 00285 NOP
0x0120 00288 CALL runtime.printlock(SB)
0x0125 00293 MOVL $2, AX
0x012a 00298 CALL runtime.printint(SB)
0x012f 00303 CALL runtime.printnl(SB)
0x0134 00308 CALL runtime.printunlock(SB)
0x0139 00313 JMP 379
0x013b 00315 NOP
0x0140 00320 CALL runtime.printlock(SB)
0x0145 00325 MOVL $1, AX
0x014a 00330 CALL runtime.printint(SB)
0x014f 00335 CALL runtime.printnl(SB)
0x0154 00340 CALL runtime.printunlock(SB)
0x0159 00345 JMP 379
0x015b 00347 NOP
0x0160 00352 CALL runtime.printlock(SB)
0x0165 00357 LEAQ go:string."default\n"(SB)
0x016c 00364 MOVL $8, BX
0x0171 00369 CALL runtime.printstring(SB)
0x0176 00374 CALL runtime.printunlock(SB)
0x017b 00379 PCDATA $1, $-1
0x017b 00379 ADDQ $160, SP
0x0182 00386 POPQ BP
0x0183 00387 RETCome si può vedere, dopo aver chiamato la funzione selectgo, c'è una logica di判断+salto. Attraverso questo non è difficile dedurre la sua forma originale:
casei, ok := runtime.selectgo()
if casei == -1 {
println("default")
} else if casei == 3 {
println(4)
} else if casei == 2 {
println(3)
} else if casei == 1 {
println(2)
} else {
println(1)
}Il codice effettivo generato dal compilatore potrebbe essere diverso da questo, ma il significato generale è più o meno lo stesso. Quindi il compilatore, dopo aver chiamato la funzione selectgo, usa un'istruzione if per判断 quale channel deve essere eseguito. Prima della chiamata, il compilatore genera anche un ciclo for per raccogliere l'array scase, ma qui l'ho omesso.
Dopo aver compreso come selectgo viene usato esternamente, ora vediamo come funziona internamente la funzione selectgo. Per prima cosa inizializza alcuni array. nsends+nrecvs rappresenta il numero totale di case. Dal codice sottostante si può anche vedere che il numero massimo di case è 1 << 16. pollorder determina l'ordine di esecuzione dei channel, lockorder determina l'ordine di lock dei channel.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// La sua lunghezza è il doppio dell'array scase, la prima metà è allocata all'array pollorder, la seconda metà all'array lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]Poi inizializza l'array pollorder, che contiene gli indici dell'array sacses dei channel in attesa di esecuzione:
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]Traversa l'intero array scases, poi genera un numero casuale tra [0, i] tramite runtime.fastrandn, quindi lo scambia con i. Durante il processo salta i case con channel nil. Dopo il completamento della traversata, si ottiene un array pollorder con elementi mescolati, come mostrato nella figura seguente:

Poi ordina l'array pollorder in base all'indirizzo dei channel usando heap sort, ottenendo l'array lockorder. Quindi chiama runtime.sellock per acquisire i lock in ordine:
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}Vale la pena notare che ordinare i channel per indirizzo serve a evitare deadlock, poiché l'operazione select stessa non richiede lock per consentire la concorrenza. Supponiamo di acquisire i lock in ordine casuale secondo pollorder. Consideriamo il seguente caso di codice:
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
poll := func() {
select {
case ch1 <- 1:
println(1)
case ch2 <- 2:
println(2)
case ch3 <- 3:
println(3)
case ch4 <- 4:
println(4)
default:
println("default")
}
}
// A
go poll()
// B
go poll()
// C
go poll()Tre goroutine ABC arrivano tutte a questo passo di acquisizione del lock, e il loro ordine di acquisizione è casuale e diverso tra loro. Potrebbe verificarsi la seguente situazione, come mostrato nella figura:

Supponiamo che l'ordine di acquisizione del lock di ABC sia come nella figura sopra. Allora la possibilità di deadlock è molto alta. Ad esempio, A acquisisce prima il lock di ch2, poi prova ad acquisire il lock di ch1. Ma supponiamo che ch1 sia già stato acquisito dalla goroutine B. La goroutine B prova ad acquisire il lock di ch2, causando così un deadlock.

Se tutte le goroutine acquisiscono i lock nello stesso ordine, non si verificheranno problemi di deadlock. Questo è il motivo fondamentale per cui lockorder deve essere ordinato per indirizzo.
Dopo aver acquisito i lock, inizia la vera fase di trattamento. Per prima cosa traversa l'array pollorder, accedendo ai channel nell'ordine mescolato precedente, traversando uno per uno per trovare un channel disponibile:
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends { // lettura channel
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else { // scrittura channel
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}Come si può vedere, qui sono stati fatti 6 tipi di trattamento per i channel di lettura/scrittura. Di seguito li spiego separatamente. Primo caso, lettura channel con mittente in attesa di invio. Qui si arriva alla funzione runtime.recv, il cui ruolo è già stato spiegato. Alla fine risveglia la goroutine mittente. Prima di risvegliare, la funzione di callback sblocca tutti i channel.
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retcSecondo caso, lettura channel, nessun mittente in attesa, numero di elementi nel buffer maggiore di 0. Qui si legge direttamente dal buffer. La logica è completamente identica a runtime.chanrecv, poi si sblocca.
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retcTerzo caso, lettura channel, ma il channel è chiuso e non ci sono elementi rimanenti nel buffer. Qui si sblocca prima e poi si restituisce direttamente.
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retcQuarto caso, invio dati a un channel chiuso. Qui si sblocca prima e poi si va in panic.
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))Quinto caso, c'è un ricevitore in attesa di blocco. Qui si chiama la funzione runitme.send, e alla fine si risveglia la goroutine ricevente. Prima di risvegliare, la funzione di callback sblocca tutti i channel.
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retcSesto caso, nessuna goroutine ricevente in attesa. Si mettono i dati da inviare nel buffer, poi si sblocca.
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retcPoi tutte le situazioni sopra alla fine arrivano al分支 retc, che deve solo restituire l'indice del channel selezionato casi e recvOk che rappresenta se la lettura è avvenuta con successo.
retc:
return casi, recvOKSettimo caso, nessun channel disponibile trovato e il codice contiene un分支 default. Allora si sbloccano i channel e si restituisce direttamente. Qui casi restituito è -1, indicando che non ci sono channel disponibili.
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}Ultimo caso, nessun channel disponibile trovato e il codice non contiene un分支 default. Allora la goroutine corrente entra in stato di blocco. Prima di questo, selectgo aggiunge la goroutine corrente a tutte le code recvq/sendq dei channel monitorati:
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
sg.c = c
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}Qui vengono creati diversi sudog e collegati ai channel corrispondenti, come mostrato nella figura seguente:

Poi si blocca tramite runtime.gopark. Prima di bloccare, si sbloccano i channel. Il lavoro di sblocco è completato dalla funzione runtime.selparkcommit, che viene passata come funzione di callback a gopark.
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = falseLa prima cosa dopo essere stati risvegliati è解除 il collegamento tra sudog e channel:
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nilPoi si rimuove sudog dalle code di attesa dei channel precedenti:
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}Nel processo sopra si deve trovare un channel elaborato dalla goroutine che ha risvegliato. Poi si fa l'ultimo trattamento in base a caseSuccess. Per le operazioni di scrittura, sg.success uguale a false rappresenta che il channel è chiuso. Inoltre, in tutto il runtime go, solo la funzione close imposta attivamente questo campo a false, il che indica che la goroutine corrente è stata risvegliata dal risvegliante tramite la funzione close. Per le operazioni di lettura, se si è stati risvegliati dal mittente, l'operazione di lettura dei dati è già stata completata prima di essere risvegliati dal mittente tramite la funzione runtime.send, il cui valore è true. Se si è stati risvegliati dalla funzione close, come prima, si restituisce direttamente.
c = cas.c
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
selunlock(scases, lockorder)
goto retcA questo punto l'intera logica di select è stata chiarita. Sopra sono state divise diverse situazioni, come si può vedere select è piuttosto complesso da trattare.
