chan
channel est une structure de données spéciale, représentant typiquement la philosophie CSP en Go. Le cœur de la pensée CSP est que les processus communiquent entre eux par échange de messages pour le transfert de données. De manière correspondante, via channel, nous pouvons facilement communiquer entre goroutines.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Au-delà de la communication, channel permet également d'implémenter des opérations comme la synchronisation de goroutines. Dans les systèmes nécessitant de la concurrence, on retrouve channel presque partout. Pour mieux comprendre le fonctionnement de channel, nous allons expliquer ses principes ci-dessous.
Structure
À l'exécution, channel est représenté par la structure runtime.hchan, qui ne contient pas beaucoup de champs, comme suit :
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}On peut clairement voir le champ lock ci-dessus. channel est en fait une file circulaire synchronisée avec verrou. Les autres champs sont décrits ci-dessous :
qcount, représente le nombre total de donnéesdataqsize, la taille de la file circulairebuf, pointeur vers un tableau de tailledataqsize, c'est-à-dire la file circulaireclosed, si le channel est fermésendx,recvx, représentent les index d'envoi et de réceptionsendq,recvq, représentent les listes chaînées de goroutines d'envoi et de réception, composées d'élémentsruntime.sudoggotype waitq struct { first *sudog last *sudog }
Le diagramme suivant permet de comprendre clairement la structure de channel :

Lorsqu'on utilise les fonctions len et cap sur un channel, elles retournent respectivement les champs hchan.qcount et hchan.dataqsiz.
Création
Normalement, il n'y a qu'une seule façon de créer un channel, utiliser la fonction make :
ch := make(chan int, size)Le compilateur traduit cela en un appel à la fonction runtime.makechan, qui est responsable de la création effective du channel. Son code est montré ci-dessous.
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Cette logique est relativement simple, principalement pour allouer de la mémoire au channel. Elle calcule d'abord la taille de mémoire attendue en fonction du size et de la taille du type d'élément elem.size, puis traite selon trois cas :
sizeest 0, allouer uniquementhchanSize- L'élément ne contient pas de pointeurs, allouer un espace mémoire de la taille correspondante, et la mémoire de la file circulaire est continue avec celle du channel
- L'élément contient des pointeurs, la mémoire du channel et de la file circulaire est allouée séparément
Si la file circulaire stocke des éléments de type pointeur, comme ils référencent des éléments externes, le GC peut scanner ces pointeurs lors de la phase de marquage-nettoyage. Lorsque des éléments non-pointeurs sont stockés dans une mémoire continue, cela évite des scans inutiles. Après l'allocation mémoire, les autres champs d'information sont mis à jour.
À noter, lorsque la capacité du channel est un entier 64 bits, la fonction runtime.makechan64 est utilisée pour la création, qui est essentiellement un appel à runtime.makechan avec une vérification de type supplémentaire.
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}En général, il est préférable que size ne dépasse pas math.MaxInt32.
Envoi
Pour envoyer des données dans un channel, nous plaçons les données à envoyer à droite de la flèche :
ch <- struct{}{}Le compilateur traduit cela en runtime.chansend1. La fonction réellement responsable de l'envoi est runtime.chansend, chansend1 lui passe le pointeur elem qui pointe vers l'élément à envoyer.
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Elle vérifie d'abord si le channel est nil, block indique si l'opération d'envoi actuelle est bloquante (la valeur de block est liée à la structure select). Si l'envoi est bloquant et que le channel est nil, le programme plante directement. Dans le cas d'un envoi non bloquant, elle vérifie sans verrouillage si le channel est plein, et si c'est le cas, retourne directement.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Ensuite, elle acquiert le verrou et vérifie si le channel est fermé. S'il est déjà fermé, elle provoque une panic :
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}Ensuite, elle retire un sudog de la file recvq, puis la fonction runtime.send effectue l'envoi.
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}Le contenu de la fonction send est le suivant. Elle met à jour recvx et sendx, puis utilise la fonction runtime.memmove pour copier directement la mémoire des données de communication vers l'adresse de l'élément cible de la goroutine réceptrice, puis utilise runtime.goready pour mettre la goroutine réceptrice dans l'état _Grunnable afin qu'elle puisse participer à nouveau à l'ordonnancement.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}Dans ce processus, comme une goroutine en attente de réception peut être trouvée, les données sont directement envoyées au récepteur sans être stockées dans la file circulaire. Si aucune goroutine réceptrice n'est disponible et que la capacité est suffisante, les données sont placées dans le tampon de la file circulaire, puis on retourne directement.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Si le tampon est plein, dans le cas d'un envoi non bloquant, on retourne directement :
if !block {
unlock(&c.lock)
return false
}Si c'est un envoi bloquant, on entre dans le flux de code suivant :
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}D'abord, elle construit la goroutine courante en sudog et l'ajoute à la file d'attente hchan.sendq des goroutines en attente d'envoi, puis utilise runtime.gopark pour bloquer la goroutine courante, la mettant dans l'état _Gwaitting jusqu'à ce qu'elle soit réveillée par un récepteur. De plus, elle utilise runtime.KeepAlive pour maintenir les données à envoyer en vie afin d'assurer que le récepteur puisse les copier avec succès. Après avoir été réveillée, on entre dans le flux de finalisation :
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}On peut voir que pour l'envoi de données dans un channel, il y a plusieurs cas :
- Le channel est
nil, le programme plante - Le channel est fermé, une
panicse produit - La file
recvqn'est pas vide, envoi direct au récepteur - Aucune goroutine en attente, ajout au tampon
- Le tampon est plein, la goroutine émettrice entre en état bloquant, en attendant qu'une autre goroutine reçoive les données
Il est à noter que dans la logique d'envoi ci-dessus, on ne voit pas le traitement des données qui dépassent le tampon. Ces données ne peuvent pas être abandonnées, elles sont stockées dans sudog.elem, et seront traitées par le récepteur.
Réception
En Go, il y a deux syntaxes pour recevoir des données d'un channel. La première est de lire uniquement les données :
data <- chLa seconde est de vérifier si la lecture a réussi :
data, ok <- chCes deux syntaxes sont traduites par le compilateur en appels à runtime.chanrecv1 et runtime.chanrecv2, mais elles ne sont en fait que des appels à runtime.chanrecv. La logique de réception au début est similaire à celle de l'envoi, vérifiant d'abord si le channel est nil.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Ensuite, dans le cas d'une lecture non bloquante, on vérifie sans verrouillage si le channel est vide. Si le channel n'est pas fermé, on retourne directement. S'il est fermé, on efface la mémoire de l'élément reçu.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Puis on acquiert le verrou pour accéder à la file hchan.sendq. À travers la branche if c.closed != 0 ci-dessous, on peut voir que même si le channel est déjà fermé, s'il y a encore des éléments dans le channel, on ne retourne pas directement mais on continue à exécuter le code de consommation des éléments. C'est pourquoi il est permis de lire après la fermeture du channel.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Si le channel n'est pas fermé, on vérifie si la file sendq contient des goroutines en attente d'envoi. Si oui, runtime.recv traite cette goroutine émettrice.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Dans le premier cas, la capacité du channel est 0 (channel sans tampon), le récepteur copie directement les données de l'émetteur via runtime.recvDirect. Dans le second cas, le tampon est plein. Bien qu'on n'ait pas vu de logique vérifiant si le tampon est plein, en fait, quand la capacité du tampon n'est pas 0 et qu'il y a un émetteur en attente, cela signifie déjà que le tampon est plein, car seuls les tampons pleins font que les émetteurs attendent. Cette logique est gérée par l'émetteur. Ensuite, le récepteur retire l'élément de tête du tampon et copie sa mémoire vers le pointeur de l'élément cible du récepteur, puis copie les données de la goroutine émettrice et les met dans la file. Enfin, runtime.goready réveille la goroutine émettrice, la mettant dans l'état _Grunnable pour qu'elle puisse rejoindre l'ordonnancement.
S'il n'y a pas de goroutine en attente d'envoi, on vérifie si le tampon contient des éléments en attente de consommation, on retire l'élément de tête et on copie sa mémoire vers l'élément cible du récepteur, puis on retourne.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Enfin, s'il n'y a pas d'éléments consommables dans le channel, runtime.gopark met la goroutine courante dans l'état _Gwaiting, bloquant en attendant d'être réveillée par une goroutine émettrice.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}Après avoir été réveillée, elle retourne. À ce moment, la valeur success provient de sudog.success. Si l'émetteur a envoyé les données avec succès, ce champ doit avoir été défini à true par l'émetteur. Cette logique peut être vue dans la fonction runtime.send.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}De manière correspondante, à la fin de runtime.chansend côté émetteur, le jugement de sudog.success provient également du récepteur défini dans la fonction runtime.recv. À travers tout cela, on peut voir que le récepteur et l'émetteur se complètent pour permettre au channel de fonctionner normalement. En résumé, la réception de données est un peu plus complexe que l'envoi, avec les cas suivants :
- Le channel est
nil, le programme plante - Le channel est fermé, s'il est vide on retourne directement, s'il n'est pas vide on passe au cas 5
- La capacité du tampon est 0,
sendqa des goroutines en attente d'envoi, on copie directement les données de l'émetteur, puis on réveille l'émetteur - Le tampon est plein,
sendqa des goroutines en attente d'envoi, on retire l'élément de tête du tampon, on met les données de l'émetteur dans la file, puis on réveille l'émetteur - Le tampon n'est pas plein et contient des éléments, on retire l'élément de tête du tampon, puis on retourne
- Le tampon est vide, on entre en état bloquant, en attente d'être réveillé par un émetteur
Fermeture
Pour fermer un channel, nous utilisons la fonction intégrée close :
close(ch)Le compilateur traduit cela en un appel à runtime.closechan. Si le channel passé est nil ou déjà fermé, une panic se produit directement :
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Ensuite, toutes les goroutines bloquées dans sendq et recvq de ce channel sont retirées et réveillées via runtime.goready :
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
À noter, Go permet les channels unidirectionnels, avec les règles suivantes :
- Un channel en lecture seule ne peut pas envoyer de données
- Un channel en lecture seule ne peut pas être fermé
- Un channel en écriture seule ne peut pas lire de données
Ces erreurs sont détectées lors de la vérification de types à la compilation, pas à l'exécution. Pour en savoir plus, vous pouvez consulter le code dans les packages suivants :
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Vérifier si un channel est fermé
Très tôt (avant Go 1), il existait une fonction intégrée closed pour vérifier si un channel était fermé, mais elle a été rapidement supprimée. En effet, l'utilisation d'un channel se fait généralement dans un contexte multi-goroutines. Supposons qu'elle retourne true, cela peut effectivement signifier que le channel est fermé. Mais si elle retourne false, cela ne signifie pas que le channel n'est pas fermé, car personne ne sait qui fermera le channel au moment suivant. Donc cette valeur de retour n'est pas fiable. Si on se base sur cette valeur pour décider d'envoyer ou non des données dans le channel, c'est dangereux car envoyer des données dans un channel fermé provoque une panic.
Si vraiment nécessaire, vous pouvez implémenter votre propre fonction. Une solution consiste à vérifier si le channel est fermé en écrivant dedans :
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Cependant, cela a des effets secondaires. Si le channel n'est pas fermé, des données redondantes y seront écrites, et le traitement defer-recover entraîne une perte de performance supplémentaire. Donc cette solution d'écriture peut être abandonnée. Pour une solution de lecture, on peut envisager de ne pas lire directement le channel, car une lecture directe avec block à true bloquera la goroutine. Il faut plutôt utiliser select combiné avec le channel, car dans ce cas le channel est non bloquant.
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Cela semble un peu mieux que la solution précédente, mais cela ne fonctionne que si le channel est fermé et que le tampon du channel est vide. S'il y a des éléments, cela consommera inutilement un élément. Il n'y a pas vraiment de bonne implémentation.
En fait, nous n'avons pas besoin de vérifier si un channel est fermé, pour les raisons expliquées au début : la valeur de retour n'est pas fiable. Utiliser correctement un channel et le fermer correctement est ce que nous devons faire. Donc :
- Ne jamais fermer un channel du côté récepteur. Le fait que fermer un channel en lecture seule ne passe pas la compilation indique clairement qu'il ne faut pas le faire. Laissez l'émetteur faire ce travail.
- S'il y a plusieurs émetteurs, une goroutine distincte doit être responsable de la fermeture, en s'assurant que
closen'est appelé que par une seule partie et une seule fois. - Lors du passage d'un channel, il est préférable de le limiter à la lecture seule ou à l'écriture seule.
En suivant ces principes, vous pouvez éviter la plupart des problèmes.
