chan
I channel sono una struttura dati speciale che rappresenta l'implementazione del paradigma CSP in Go. Il cuore del paradigma CSP è che i processi si scambiano dati tramite messaggi. Con i channel possiamo facilmente comunicare tra goroutine.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Oltre alla comunicazione, i channel possono anche essere usati per sincronizzare le goroutine. In un sistema che richiede concorrenza, i channel sono quasi onnipresenti. Per comprendere meglio come funzionano i channel, di seguito ne illustreremo i principi.
Struttura
I channel sono rappresentati a runtime dalla struttura runtime.hchan, che contiene i seguenti campi:
type hchan struct {
qcount uint // totale dati nella coda
dataqsiz uint // dimensione della coda circolare
buf unsafe.Pointer // punta a un array di elementi dataqsiz
elemsize uint16
closed uint32
elemtype *_type // tipo di elemento
sendx uint // indice di invio
recvx uint // indice di ricezione
recvq waitq // lista di attesa per ricezione
sendq waitq // lista di attesa per invio
lock mutex
}Come si può vedere chiaramente, c'è il campo lock. I channel sono effettivamente una coda circolare sincronizzata con lock. Gli altri campi sono spiegati di seguito:
qcount: numero totale di datidataqsize: dimensione della coda circolarebuf: puntatore a un array di dimensionedataqsize, ovvero la coda circolareclosed: indica se il channel è chiusosendx,recvx: indici di invio e ricezionesendq,recvq: liste di attesa per le goroutine di invio e ricezione, composte da elementiruntime.sudoggotype waitq struct { first *sudog last *sudog }
Con il diagramma seguente si può comprendere chiaramente la struttura di un channel:

Quando si usano le funzioni len e cap su un channel, restituiscono rispettivamente i campi hchan.qcount e hchan.dataqsiz.
Creazione
Normalmente c'è un solo modo per creare un channel, usando la funzione make:
ch := make(chan int, size)Il compilatore lo tradurrà in una chiamata alla funzione runtime.makechan, che si occupa della creazione effettiva del channel. Il codice è il seguente:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Questa logica è piuttosto semplice: si occupa principalmente di allocare memoria per il channel. Per prima cosa calcola la memoria necessaria in base al size e al tipo di elemento elem.Size_, quindi gestisce tre casi:
sizeper 0: alloca solohchanSize- L'elemento non contiene puntatori: alloca lo spazio di memoria corrispondente, e la memoria della coda circolare è continua con quella del channel
- L'elemento contiene puntatori: la memoria del channel e della coda circolare viene allocata separatamente
Se gli elementi nella coda circolare sono puntatori, poiché referenziano elementi esterni, il GC potrebbe scansionare questi puntatori durante la fase di mark-and-sweep. Quando gli elementi non sono puntatori e sono allocati in memoria continua, si evitano scansioni non necessarie. Dopo l'allocazione della memoria, vengono aggiornati gli altri campi informativi.
A proposito, quando la capacità del channel è un intero a 64 bit, viene usata la funzione runtime.makechan64 per la creazione. Essa è essenzialmente una chiamata a runtime.makechan, con un controllo di tipo aggiuntivo:
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}In generale, è meglio non superare math.MaxInt32 per size.
Invio
Quando si inviano dati a un channel, si posizionano i dati da inviare a destra della freccia:
ch <- struct{}{}Il compilatore lo tradurrà in runtime.chansend1. La funzione che si occupa effettivamente dell'invio dei dati è runtime.chansend, mentre chansend1 le passa un puntatore elem che punta all'elemento da inviare:
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Per prima cosa verifica se il channel è nil. block indica se l'operazione di invio corrente è bloccante (il valore di block è correlato alla struttura select). Se l'invio è bloccante e il channel è nil, si verifica un panic. In caso di invio non bloccante, verifica direttamente senza lock se il channel è pieno; se lo è, restituisce immediatamente:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Successivamente acquisisce il lock e verifica se il channel è chiuso. Se è chiuso, va in panic:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}Poi dequeue un sudog dalla coda recvq e lo invia tramite la funzione runtime.send:
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}Il contenuto della funzione send è il seguente: aggiorna recvx e sendx, quindi usa la funzione runtime.memmove per copiare direttamente la memoria dei dati di comunicazione all'indirizzo dell'elemento di destinazione della goroutine ricevente. Poi usa la funzione runtime.goready per portare la goroutine ricevente nello stato _Grunnable, in modo che possa partecipare nuovamente alla schedulazione:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}In questo processo, poiché è possibile trovare una goroutine in attesa di ricezione, i dati vengono inviati direttamente al ricevente senza essere memorizzati nella coda circolare. Se non ci sono ricevitori disponibili e la capacità è sufficiente, i dati vengono inseriti nel buffer della coda circolare e si restituisce immediatamente:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Se il buffer è pieno e l'invio è non bloccante, restituisce immediatamente:
if !block {
unlock(&c.lock)
return false
}Se l'invio è bloccante, prosegue con il seguente flusso di codice:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}Per prima cosa costruisce la goroutine corrente come sudog e la aggiunge alla coda di attesa hchan.sendq. Poi runtime.gopark blocca la goroutine corrente, portandola nello stato _Gwaiting fino a quando non viene nuovamente svegliata dal ricevente. Inoltre, runtime.KeepAlive mantiene vivi i dati da inviare per assicurarsi che il ricevente li copi con successo. Quando viene svegliata, prosegue con la fase finale:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}Come si può vedere, per l'invio di dati tramite channel ci sono i seguenti casi:
- Il channel è
nil: il programma va in crash - Il channel è chiuso: si verifica un panic
- La coda
recvqnon è vuota: invia direttamente al ricevente - Nessuna goroutine in attesa: aggiunge al buffer
- Il buffer è pieno: la goroutine di invio si blocca in attesa che altre goroutine ricevano i dati
Vale la pena notare che nella logica di invio sopra non è stato mostrato il trattamento dei dati in overflow del buffer. Questi dati non possono essere scartati; sono salvati in sudog.elem e gestiti dal ricevente.
Ricezione
In Go ci sono due sintassi per ricevere dati da un channel. La prima è solo leggere i dati:
data <- chLa seconda è verificare se la lettura è avvenuta con successo:
data, ok <- chQueste due sintassi vengono tradotte dal compilatore in chiamate a runtime.chanrecv1 e runtime.chanrecv2, che a loro volta sono chiamate a runtime.chanrecv. La logica di ricezione è simile a quella di invio nella parte iniziale: verifica prima se il channel è nil:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Poi, in caso di lettura non bloccante, verifica senza lock se il channel è vuoto. Se il channel non è chiuso, restituisce immediatamente. Se è chiuso, pulisce la memoria dell'elemento ricevuto:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Poi accede con lock alla coda hchan.sendq. Come si può vedere dal分支 if c.closed != 0, anche se il channel è chiuso, se ci sono ancora elementi nel channel, non restituisce immediatamente ma prosegue con il codice di consumo degli elementi. Questo è il motivo per cui è ancora consentito leggere dopo la chiusura di un channel:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Se il channel non è chiuso, verifica se ci sono goroutine in attesa di invio nella coda sendq. In caso affermativo, la funzione runtime.recv gestisce la goroutine mittente:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copia i dati dalla coda al ricevitore
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copia i dati dal mittente alla coda
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Nel primo caso, il channel ha capacità 0 (channel senza buffer). Il ricevente copia direttamente i dati dal mittente tramite la funzione runtime.recvDirect. Nel secondo caso, il buffer è pieno. Anche se prima non è stata mostrata la logica per verificare se il buffer è pieno, quando la capacità del buffer non è 0 e c'è un mittente in attesa, significa che il buffer è pieno, perché solo quando il buffer è pieno i mittenti si bloccano in attesa. Questa logica è gestita dal mittente. Poi il ricevente dequeue l'elemento dalla testa del buffer e ne copia la memoria nel puntatore dell'elemento ricevente di destinazione, quindi copia i dati che la goroutine mittente deve inviare e li enqueue. Infine, runtime.goready sveglia la goroutine mittente, portandola nello stato _Grunnable per partecipare nuovamente alla schedulazione.
Se non ci sono goroutine in attesa di invio, verifica se ci sono elementi in attesa di consumo nel buffer. Dequeue l'elemento dalla testa e ne copia la memoria nell'elemento di destinazione del ricevente, quindi restituisce:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Ricevi direttamente dalla coda
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Infine, se non ci sono elementi da consumare nel channel, runtime.gopark porta la goroutine corrente nello stato _Gwaiting, bloccandola in attesa fino a quando non viene svegliata dalla goroutine mittente:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}Dopo essere stata svegliata, restituisce. Il valore success restituito proviene da sudog.success. Se il mittente ha inviato i dati con successo, questo campo dovrebbe essere impostato a true dal mittente. Questa logica può essere vista nella funzione runtime.send:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}Corrispondentemente, nel判断 di sudog.success alla fine di runtime.chansend del mittente, la sua origine è l'impostazione nel ricevente nella funzione runtime.recv. Come si può vedere, mittente e ricevente devono collaborare per far funzionare correttamente il channel. In generale, ricevere dati è leggermente più complesso che inviare dati. Ci sono i seguenti casi:
- Il channel è
nil: il programma va in crash - Il channel è chiuso: se il channel è vuoto restituisce immediatamente, altrimenti passa al caso 5
- Capacità del buffer 0, c'è una goroutine in attesa in
sendq: copia direttamente i dati dal mittente, poi sveglia il mittente - Buffer pieno, c'è una goroutine in attesa in
sendq: dequeue l'elemento dalla testa del buffer, enqueue i dati del mittente, poi sveglia il mittente - Buffer non pieno e numero di elementi non zero: dequeue l'elemento dalla testa del buffer, poi restituisce
- Buffer vuoto: entra in stato di blocco, in attesa di essere svegliato dal mittente
Chiusura
Per chiudere un channel, si usa la funzione built-in close:
close(ch)Il compilatore lo tradurrà in una chiamata a runtime.closechan. Se il channel passato è nil o già chiuso, andrà in panic:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Poi dequeue tutte le goroutine bloccate nelle code sendq e recvq di questo channel e le sveglia tutte tramite runtime.goready:
func closechan(c *hchan) {
...
var glist gList
// rilascia tutti i lettori
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// rilascia tutti gli scrittori (andranno in panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready tutte le G ora che abbiamo rilasciato il lock del channel.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
A proposito, Go permette channel unidirezionali, con le seguenti regole:
- Un channel di sola lettura non può inviare dati
- Un channel di sola lettura non può essere chiuso
- Un channel di sola scrittura non può leggere dati
Questi errori vengono individuati già nella fase di controllo dei tipi in compilazione, non a runtime. Se interessati, potete leggere il codice correlato in questi pacchetti:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Verifica della chiusura
Molto tempo fa (prima di go1), esisteva una funzione built-in closed per verificare se un channel era chiuso, ma è stata rapidamente rimossa. Questo perché i channel sono solitamente usati in contesti multi-goroutine. Anche se restituisce true, significa che il channel è chiuso, ma se restituisce false, non significa necessariamente che il channel non sia chiuso, perché nessuno sa chi potrebbe chiuderlo subito dopo. Quindi questo valore di ritorno non è affidabile. Usarlo come base per decidere se inviare dati a un channel è ancora più pericoloso, perché inviare dati a un channel chiuso causa un panic.
Se necessario, si può implementare una propria soluzione. Un approccio è provare a scrivere nel channel per verificare se è chiuso:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Tuttavia, questo ha effetti collaterali: se non è chiuso, scrive dati ridondanti e entra nel processo di gestione defer-recover, causando una perdita di prestazioni aggiuntiva. Quindi questo approccio di scrittura può essere scartato. Per l'approccio di lettura, si può considerare, ma non si può leggere direttamente dal channel perché leggere direttamente con block impostato a true bloccherà la goroutine. Dovrebbe essere usato con select: quando un channel è combinato con select, è non bloccante.
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Questo sembra solo leggermente migliore del precedente. Funziona solo quando il channel è chiuso e non ci sono elementi nel buffer. Se ci sono elementi, consumerà inutilmente quell'elemento. Quindi non c'è ancora una buona implementazione.
In realtà, non abbiamo affatto bisogno di verificare se un channel è chiuso. Come già detto all'inizio, il valore di ritorno non è affidabile. Ciò che dovremmo fare è usare correttamente i channel e chiuderli correttamente. Quindi:
- Non chiudere mai un channel dal lato ricevente. Il fatto che chiudere un channel di sola lettura non sia consentito in compilazione indica chiaramente di non farlo. Lasciate che sia il mittente a farlo.
- Se ci sono più mittenti, designate una singola goroutine per eseguire l'operazione di chiusura, assicurandovi che
closevenga chiamato da una sola parte e una sola volta. - Quando si passa un channel, è meglio limitarlo a sola lettura o sola scrittura
Seguendo questi principi, si può garantire di non incorrere in grandi problemi.
