Skip to content

select

select è una struttura che può monitorare simultaneamente più stati di channel. La sua sintassi è simile a switch.

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Questo codice, combinando context, channel e select, implementa una logica semplice per l'arresto graduale del programma. Nel codice, select monitora simultaneamente due channel ctx.Done e finished. Le condizioni di uscita sono due: primo, il sistema operativo invia un segnale di uscita; secondo, il channel finished ha messaggi da leggere, ovvero il compito del codice utente è completato. In questo modo possiamo fare lavoro di收尾 quando il programma esce.

Come è noto, select ha due caratteristiche molto importanti: primo, non bloccante, come si può vedere nel codice sorgente di invio e ricezione dei channel, è stato fatto qualche trattamento per select, permettendo di判断 se un channel è disponibile in condizioni non bloccanti; secondo, randomizzazione, se ci sono più channel disponibili, ne seleziona uno casualmente da eseguire, non seguendo l'ordine stabilito. Questo permette a ogni channel di essere eseguito in modo relativamente equo, altrimenti in casi estremi alcuni channel potrebbero non essere mai elaborati. Poiché tutto il suo lavoro è correlato ai channel, si consiglia prima di leggere l'articolo chan. Dopo aver compreso i channel, capire select sarà molto più fluido.

Struttura

A runtime c'è solo una struttura runtime.scase che rappresenta un分支 select. Ogni case a runtime è rappresentato da scase.

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

Dove c si riferisce al channel, elem è il puntatore all'elemento ricevuto o inviato. In realtà, la parola chiave select si riferisce alla funzione runtime.selectgo.

Principio

Go divide l'uso di select in quattro situazioni per l'ottimizzazione. Questo si può vedere nella funzione cmd/compile/internal/walk.walkSelectCases, dove c'è la logica di trattamento per queste quattro situazioni.

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimization: zero-case select
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimization: one-case select: single op.
  if ncas == 1 {
    ...
  }

  // optimization: two-case select but one is default: single non-blocking op.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Ottimizzazione

Il compilatore ottimizza le prime tre situazioni. La prima situazione è quando il numero di case è 0, ovvero un select vuoto. Sappiamo tutti che un'istruzione select vuota causa il blocco permanente della goroutine corrente.

go
select{}

Il motivo per cui si blocca è che il compilatore lo traduce in una chiamata diretta alla funzione runtime.block:

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

E la funzione block chiama la funzione runtime.gopark, portando la goroutine corrente nello stato _Gwaiting ed entrando in blocco permanente, senza mai più essere schedulata.

La seconda situazione, c'è solo un case e non è default. In questo caso il compilatore lo traduce direttamente in un'operazione di invio/ricezione sul channel, ed è anche bloccante. Ad esempio, il seguente codice:

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

Viene tradotto in una chiamata diretta alla funzione runtime.chanrecv1, come si può vedere dal codice assembly:

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

Nel caso di un solo case, anche l'invio di dati al channel segue lo stesso principio. Viene tradotto in una chiamata diretta alla funzione runtime.chansend1, ed è anche bloccante.

La terza situazione, ci sono due case e uno di essi è default:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

In questo caso viene tradotto in un'istruzione if che chiama la funzione runtime.selectnbsend, come segue:

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

Se si ricevono dati dal channel, viene tradotto in una chiamata a runtime.selectnbrecv:

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

Vale la pena notare che in questo caso l'invio o la ricezione del channel è non bloccante. Possiamo vedere chiaramente che il parametro block è false.

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

Sia per l'invio che per la ricezione di dati dal channel, quando block è false, c'è un percorso rapido che può判断 se si può inviare o ricevere dati senza acquisire il lock, come mostrato di seguito:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Durante la lettura dal channel, se il channel è vuoto restituisce direttamente. Durante la scrittura sul channel, se il channel non è chiuso ed è pieno, restituisce direttamente. In condizioni normali causerebbero il blocco della goroutine, ma combinati con select non lo fanno.

Trattamento

Le tre situazioni sopra sono solo ottimizzazioni per casi speciali. La parola chiave select usata normalmente viene tradotta in una chiamata alla funzione runtime.selectgo, la cui logica di trattamento è lunga oltre 400 righe.

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

Il compilatore raccoglie tutte le istruzioni case in un array scase, poi lo passa alla funzione selectgo. Dopo il trattamento, restituisce due valori di ritorno:

  1. Il primo è l'indice del channel selezionato casualmente, indicando quale channel è stato elaborato. Se nessuno, restituisce -1.
  2. Il secondo indica se l'operazione di lettura dal channel è avvenuta con successo.

Qui spiego brevemente i suoi parametri:

  • cas0: puntatore alla testa dell'array scase. La prima metà contiene i case di scrittura sul channel, la seconda metà i case di lettura dal channel, distinti da nsends.
  • order0: la sua lunghezza è il doppio dell'array scase. La prima metà è allocata all'array pollorder, la seconda metà all'array lockorder.
  • nsends e nrecvs: numero di case di lettura/scrittura sul channel. La loro somma è il numero totale di case.
  • block: indica se è bloccante. Se c'è un case default, rappresenta non bloccante, il suo valore è false, altrimenti è true.
  • pc0: punta alla testa di un array [ncases]uintptr, usato per l'analisi race condition. Può essere ignorato in seguito, non aiuta a comprendere select.

Supponiamo di avere il seguente codice:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Visualizzando la sua forma assembly, qui per comodità di comprensione ho omesso parte del codice:

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // 1 2 3 4 alcune variabili temporanee
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // chiama la funzione runtime.selectgo
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // salta al分支 default
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // salta al分支 4
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // salta al分支 3
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // salta al分支 2
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

Come si può vedere, dopo aver chiamato la funzione selectgo, c'è una logica di判断+salto. Attraverso questo non è difficile dedurre la sua forma originale:

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

Il codice effettivo generato dal compilatore potrebbe essere diverso da questo, ma il significato generale è più o meno lo stesso. Quindi il compilatore, dopo aver chiamato la funzione selectgo, usa un'istruzione if per判断 quale channel deve essere eseguito. Prima della chiamata, il compilatore genera anche un ciclo for per raccogliere l'array scase, ma qui l'ho omesso.

Dopo aver compreso come selectgo viene usato esternamente, ora vediamo come funziona internamente la funzione selectgo. Per prima cosa inizializza alcuni array. nsends+nrecvs rappresenta il numero totale di case. Dal codice sottostante si può anche vedere che il numero massimo di case è 1 << 16. pollorder determina l'ordine di esecuzione dei channel, lockorder determina l'ordine di lock dei channel.

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// La sua lunghezza è il doppio dell'array scase, la prima metà è allocata all'array pollorder, la seconda metà all'array lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Poi inizializza l'array pollorder, che contiene gli indici dell'array sacses dei channel in attesa di esecuzione:

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Traversa l'intero array scases, poi genera un numero casuale tra [0, i] tramite runtime.fastrandn, quindi lo scambia con i. Durante il processo salta i case con channel nil. Dopo il completamento della traversata, si ottiene un array pollorder con elementi mescolati, come mostrato nella figura seguente:

Poi ordina l'array pollorder in base all'indirizzo dei channel usando heap sort, ottenendo l'array lockorder. Quindi chiama runtime.sellock per acquisire i lock in ordine:

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Vale la pena notare che ordinare i channel per indirizzo serve a evitare deadlock, poiché l'operazione select stessa non richiede lock per consentire la concorrenza. Supponiamo di acquisire i lock in ordine casuale secondo pollorder. Consideriamo il seguente caso di codice:

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Tre goroutine ABC arrivano tutte a questo passo di acquisizione del lock, e il loro ordine di acquisizione è casuale e diverso tra loro. Potrebbe verificarsi la seguente situazione, come mostrato nella figura:

Supponiamo che l'ordine di acquisizione del lock di ABC sia come nella figura sopra. Allora la possibilità di deadlock è molto alta. Ad esempio, A acquisisce prima il lock di ch2, poi prova ad acquisire il lock di ch1. Ma supponiamo che ch1 sia già stato acquisito dalla goroutine B. La goroutine B prova ad acquisire il lock di ch2, causando così un deadlock.

Se tutte le goroutine acquisiscono i lock nello stesso ordine, non si verificheranno problemi di deadlock. Questo è il motivo fondamentale per cui lockorder deve essere ordinato per indirizzo.

Dopo aver acquisito i lock, inizia la vera fase di trattamento. Per prima cosa traversa l'array pollorder, accedendo ai channel nell'ordine mescolato precedente, traversando uno per uno per trovare un channel disponibile:

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // lettura channel
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // scrittura channel
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

Come si può vedere, qui sono stati fatti 6 tipi di trattamento per i channel di lettura/scrittura. Di seguito li spiego separatamente. Primo caso, lettura channel con mittente in attesa di invio. Qui si arriva alla funzione runtime.recv, il cui ruolo è già stato spiegato. Alla fine risveglia la goroutine mittente. Prima di risvegliare, la funzione di callback sblocca tutti i channel.

go
recv:
  // can receive from sleeping sender (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

Secondo caso, lettura channel, nessun mittente in attesa, numero di elementi nel buffer maggiore di 0. Qui si legge direttamente dal buffer. La logica è completamente identica a runtime.chanrecv, poi si sblocca.

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Terzo caso, lettura channel, ma il channel è chiuso e non ci sono elementi rimanenti nel buffer. Qui si sblocca prima e poi si restituisce direttamente.

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Quarto caso, invio dati a un channel chiuso. Qui si sblocca prima e poi si va in panic.

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Quinto caso, c'è un ricevitore in attesa di blocco. Qui si chiama la funzione runitme.send, e alla fine si risveglia la goroutine ricevente. Prima di risvegliare, la funzione di callback sblocca tutti i channel.

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Sesto caso, nessuna goroutine ricevente in attesa. Si mettono i dati da inviare nel buffer, poi si sblocca.

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Poi tutte le situazioni sopra alla fine arrivano al分支 retc, che deve solo restituire l'indice del channel selezionato casi e recvOk che rappresenta se la lettura è avvenuta con successo.

go
retc:
    return casi, recvOK

Settimo caso, nessun channel disponibile trovato e il codice contiene un分支 default. Allora si sbloccano i channel e si restituisce direttamente. Qui casi restituito è -1, indicando che non ci sono channel disponibili.

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Ultimo caso, nessun channel disponibile trovato e il codice non contiene un分支 default. Allora la goroutine corrente entra in stato di blocco. Prima di questo, selectgo aggiunge la goroutine corrente a tutte le code recvq/sendq dei channel monitorati:

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Qui vengono creati diversi sudog e collegati ai channel corrispondenti, come mostrato nella figura seguente:

Poi si blocca tramite runtime.gopark. Prima di bloccare, si sbloccano i channel. Il lavoro di sblocco è completato dalla funzione runtime.selparkcommit, che viene passata come funzione di callback a gopark.

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

La prima cosa dopo essere stati risvegliati è解除 il collegamento tra sudog e channel:

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Poi si rimuove sudog dalle code di attesa dei channel precedenti:

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg has already been dequeued by the G that woke us up.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

Nel processo sopra si deve trovare un channel elaborato dalla goroutine che ha risvegliato. Poi si fa l'ultimo trattamento in base a caseSuccess. Per le operazioni di scrittura, sg.success uguale a false rappresenta che il channel è chiuso. Inoltre, in tutto il runtime go, solo la funzione close imposta attivamente questo campo a false, il che indica che la goroutine corrente è stata risvegliata dal risvegliante tramite la funzione close. Per le operazioni di lettura, se si è stati risvegliati dal mittente, l'operazione di lettura dei dati è già stata completata prima di essere risvegliati dal mittente tramite la funzione runtime.send, il cui valore è true. Se si è stati risvegliati dalla funzione close, come prima, si restituisce direttamente.

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

A questo punto l'intera logica di select è stata chiarita. Sopra sono state divise diverse situazioni, come si può vedere select è piuttosto complesso da trattare.

Golang by www.golangdev.cn edit