Skip to content

chan

channel est une structure de données spéciale, représentant typiquement la philosophie CSP en Go. Le cœur de la pensée CSP est que les processus communiquent entre eux par échange de messages pour le transfert de données. De manière correspondante, via channel, nous pouvons facilement communiquer entre goroutines.

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // do something
    done <- struct{}{}
  }()
  <-done
  fmt.Println("finished")
}

Au-delà de la communication, channel permet également d'implémenter des opérations comme la synchronisation de goroutines. Dans les systèmes nécessitant de la concurrence, on retrouve channel presque partout. Pour mieux comprendre le fonctionnement de channel, nous allons expliquer ses principes ci-dessous.

Structure

À l'exécution, channel est représenté par la structure runtime.hchan, qui ne contient pas beaucoup de champs, comme suit :

go
type hchan struct {
  qcount   uint           // total data in the queue
  dataqsiz uint           // size of the circular queue
  buf      unsafe.Pointer // points to an array of dataqsiz elements
  elemsize uint16
  closed   uint32
  elemtype *_type // element type
  sendx    uint   // send index
  recvx    uint   // receive index
  recvq    waitq  // list of recv waiters
  sendq    waitq  // list of send waiters

  lock mutex
}

On peut clairement voir le champ lock ci-dessus. channel est en fait une file circulaire synchronisée avec verrou. Les autres champs sont décrits ci-dessous :

  • qcount, représente le nombre total de données

  • dataqsize, la taille de la file circulaire

  • buf, pointeur vers un tableau de taille dataqsize, c'est-à-dire la file circulaire

  • closed, si le channel est fermé

  • sendx, recvx, représentent les index d'envoi et de réception

  • sendq, recvq, représentent les listes chaînées de goroutines d'envoi et de réception, composées d'éléments runtime.sudog

    go
    type waitq struct {
      first *sudog
      last  *sudog
    }

Le diagramme suivant permet de comprendre clairement la structure de channel :

Lorsqu'on utilise les fonctions len et cap sur un channel, elles retournent respectivement les champs hchan.qcount et hchan.dataqsiz.

Création

Normalement, il n'y a qu'une seule façon de créer un channel, utiliser la fonction make :

go
ch := make(chan int, size)

Le compilateur traduit cela en un appel à la fonction runtime.makechan, qui est responsable de la création effective du channel. Son code est montré ci-dessous.

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

Cette logique est relativement simple, principalement pour allouer de la mémoire au channel. Elle calcule d'abord la taille de mémoire attendue en fonction du size et de la taille du type d'élément elem.size, puis traite selon trois cas :

  1. size est 0, allouer uniquement hchanSize
  2. L'élément ne contient pas de pointeurs, allouer un espace mémoire de la taille correspondante, et la mémoire de la file circulaire est continue avec celle du channel
  3. L'élément contient des pointeurs, la mémoire du channel et de la file circulaire est allouée séparément

Si la file circulaire stocke des éléments de type pointeur, comme ils référencent des éléments externes, le GC peut scanner ces pointeurs lors de la phase de marquage-nettoyage. Lorsque des éléments non-pointeurs sont stockés dans une mémoire continue, cela évite des scans inutiles. Après l'allocation mémoire, les autres champs d'information sont mis à jour.

À noter, lorsque la capacité du channel est un entier 64 bits, la fonction runtime.makechan64 est utilisée pour la création, qui est essentiellement un appel à runtime.makechan avec une vérification de type supplémentaire.

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

En général, il est préférable que size ne dépasse pas math.MaxInt32.

Envoi

Pour envoyer des données dans un channel, nous plaçons les données à envoyer à droite de la flèche :

go
ch <- struct{}{}

Le compilateur traduit cela en runtime.chansend1. La fonction réellement responsable de l'envoi est runtime.chansend, chansend1 lui passe le pointeur elem qui pointe vers l'élément à envoyer.

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

Elle vérifie d'abord si le channel est nil, block indique si l'opération d'envoi actuelle est bloquante (la valeur de block est liée à la structure select). Si l'envoi est bloquant et que le channel est nil, le programme plante directement. Dans le cas d'un envoi non bloquant, elle vérifie sans verrouillage si le channel est plein, et si c'est le cas, retourne directement.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Ensuite, elle acquiert le verrou et vérifie si le channel est fermé. S'il est déjà fermé, elle provoque une panic :

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

Ensuite, elle retire un sudog de la file recvq, puis la fonction runtime.send effectue l'envoi.

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

Le contenu de la fonction send est le suivant. Elle met à jour recvx et sendx, puis utilise la fonction runtime.memmove pour copier directement la mémoire des données de communication vers l'adresse de l'élément cible de la goroutine réceptrice, puis utilise runtime.goready pour mettre la goroutine réceptrice dans l'état _Grunnable afin qu'elle puisse participer à nouveau à l'ordonnancement.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

Dans ce processus, comme une goroutine en attente de réception peut être trouvée, les données sont directement envoyées au récepteur sans être stockées dans la file circulaire. Si aucune goroutine réceptrice n'est disponible et que la capacité est suffisante, les données sont placées dans le tampon de la file circulaire, puis on retourne directement.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

Si le tampon est plein, dans le cas d'un envoi non bloquant, on retourne directement :

go
if !block {
    unlock(&c.lock)
    return false
}

Si c'est un envoi bloquant, on entre dans le flux de code suivant :

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

D'abord, elle construit la goroutine courante en sudog et l'ajoute à la file d'attente hchan.sendq des goroutines en attente d'envoi, puis utilise runtime.gopark pour bloquer la goroutine courante, la mettant dans l'état _Gwaitting jusqu'à ce qu'elle soit réveillée par un récepteur. De plus, elle utilise runtime.KeepAlive pour maintenir les données à envoyer en vie afin d'assurer que le récepteur puisse les copier avec succès. Après avoir été réveillée, on entre dans le flux de finalisation :

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

On peut voir que pour l'envoi de données dans un channel, il y a plusieurs cas :

  1. Le channel est nil, le programme plante
  2. Le channel est fermé, une panic se produit
  3. La file recvq n'est pas vide, envoi direct au récepteur
  4. Aucune goroutine en attente, ajout au tampon
  5. Le tampon est plein, la goroutine émettrice entre en état bloquant, en attendant qu'une autre goroutine reçoive les données

Il est à noter que dans la logique d'envoi ci-dessus, on ne voit pas le traitement des données qui dépassent le tampon. Ces données ne peuvent pas être abandonnées, elles sont stockées dans sudog.elem, et seront traitées par le récepteur.

Réception

En Go, il y a deux syntaxes pour recevoir des données d'un channel. La première est de lire uniquement les données :

go
data <- ch

La seconde est de vérifier si la lecture a réussi :

go
data, ok <- ch

Ces deux syntaxes sont traduites par le compilateur en appels à runtime.chanrecv1 et runtime.chanrecv2, mais elles ne sont en fait que des appels à runtime.chanrecv. La logique de réception au début est similaire à celle de l'envoi, vérifiant d'abord si le channel est nil.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

Ensuite, dans le cas d'une lecture non bloquante, on vérifie sans verrouillage si le channel est vide. Si le channel n'est pas fermé, on retourne directement. S'il est fermé, on efface la mémoire de l'élément reçu.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

Puis on acquiert le verrou pour accéder à la file hchan.sendq. À travers la branche if c.closed != 0 ci-dessous, on peut voir que même si le channel est déjà fermé, s'il y a encore des éléments dans le channel, on ne retourne pas directement mais on continue à exécuter le code de consommation des éléments. C'est pourquoi il est permis de lire après la fermeture du channel.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

Si le channel n'est pas fermé, on vérifie si la file sendq contient des goroutines en attente d'envoi. Si oui, runtime.recv traite cette goroutine émettrice.

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

Dans le premier cas, la capacité du channel est 0 (channel sans tampon), le récepteur copie directement les données de l'émetteur via runtime.recvDirect. Dans le second cas, le tampon est plein. Bien qu'on n'ait pas vu de logique vérifiant si le tampon est plein, en fait, quand la capacité du tampon n'est pas 0 et qu'il y a un émetteur en attente, cela signifie déjà que le tampon est plein, car seuls les tampons pleins font que les émetteurs attendent. Cette logique est gérée par l'émetteur. Ensuite, le récepteur retire l'élément de tête du tampon et copie sa mémoire vers le pointeur de l'élément cible du récepteur, puis copie les données de la goroutine émettrice et les met dans la file. Enfin, runtime.goready réveille la goroutine émettrice, la mettant dans l'état _Grunnable pour qu'elle puisse rejoindre l'ordonnancement.

S'il n'y a pas de goroutine en attente d'envoi, on vérifie si le tampon contient des éléments en attente de consommation, on retire l'élément de tête et on copie sa mémoire vers l'élément cible du récepteur, puis on retourne.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

Enfin, s'il n'y a pas d'éléments consommables dans le channel, runtime.gopark met la goroutine courante dans l'état _Gwaiting, bloquant en attendant d'être réveillée par une goroutine émettrice.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

Après avoir été réveillée, elle retourne. À ce moment, la valeur success provient de sudog.success. Si l'émetteur a envoyé les données avec succès, ce champ doit avoir été défini à true par l'émetteur. Cette logique peut être vue dans la fonction runtime.send.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

De manière correspondante, à la fin de runtime.chansend côté émetteur, le jugement de sudog.success provient également du récepteur défini dans la fonction runtime.recv. À travers tout cela, on peut voir que le récepteur et l'émetteur se complètent pour permettre au channel de fonctionner normalement. En résumé, la réception de données est un peu plus complexe que l'envoi, avec les cas suivants :

  1. Le channel est nil, le programme plante
  2. Le channel est fermé, s'il est vide on retourne directement, s'il n'est pas vide on passe au cas 5
  3. La capacité du tampon est 0, sendq a des goroutines en attente d'envoi, on copie directement les données de l'émetteur, puis on réveille l'émetteur
  4. Le tampon est plein, sendq a des goroutines en attente d'envoi, on retire l'élément de tête du tampon, on met les données de l'émetteur dans la file, puis on réveille l'émetteur
  5. Le tampon n'est pas plein et contient des éléments, on retire l'élément de tête du tampon, puis on retourne
  6. Le tampon est vide, on entre en état bloquant, en attente d'être réveillé par un émetteur

Fermeture

Pour fermer un channel, nous utilisons la fonction intégrée close :

go
close(ch)

Le compilateur traduit cela en un appel à runtime.closechan. Si le channel passé est nil ou déjà fermé, une panic se produit directement :

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

Ensuite, toutes les goroutines bloquées dans sendq et recvq de ce channel sont retirées et réveillées via runtime.goready :

go
func closechan(c *hchan) {
    ...
  var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

À noter, Go permet les channels unidirectionnels, avec les règles suivantes :

  1. Un channel en lecture seule ne peut pas envoyer de données
  2. Un channel en lecture seule ne peut pas être fermé
  3. Un channel en écriture seule ne peut pas lire de données

Ces erreurs sont détectées lors de la vérification de types à la compilation, pas à l'exécution. Pour en savoir plus, vous pouvez consulter le code dans les packages suivants :

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

Vérifier si un channel est fermé

Très tôt (avant Go 1), il existait une fonction intégrée closed pour vérifier si un channel était fermé, mais elle a été rapidement supprimée. En effet, l'utilisation d'un channel se fait généralement dans un contexte multi-goroutines. Supposons qu'elle retourne true, cela peut effectivement signifier que le channel est fermé. Mais si elle retourne false, cela ne signifie pas que le channel n'est pas fermé, car personne ne sait qui fermera le channel au moment suivant. Donc cette valeur de retour n'est pas fiable. Si on se base sur cette valeur pour décider d'envoyer ou non des données dans le channel, c'est dangereux car envoyer des données dans un channel fermé provoque une panic.

Si vraiment nécessaire, vous pouvez implémenter votre propre fonction. Une solution consiste à vérifier si le channel est fermé en écrivant dedans :

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

Cependant, cela a des effets secondaires. Si le channel n'est pas fermé, des données redondantes y seront écrites, et le traitement defer-recover entraîne une perte de performance supplémentaire. Donc cette solution d'écriture peut être abandonnée. Pour une solution de lecture, on peut envisager de ne pas lire directement le channel, car une lecture directe avec block à true bloquera la goroutine. Il faut plutôt utiliser select combiné avec le channel, car dans ce cas le channel est non bloquant.

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

Cela semble un peu mieux que la solution précédente, mais cela ne fonctionne que si le channel est fermé et que le tampon du channel est vide. S'il y a des éléments, cela consommera inutilement un élément. Il n'y a pas vraiment de bonne implémentation.

En fait, nous n'avons pas besoin de vérifier si un channel est fermé, pour les raisons expliquées au début : la valeur de retour n'est pas fiable. Utiliser correctement un channel et le fermer correctement est ce que nous devons faire. Donc :

  1. Ne jamais fermer un channel du côté récepteur. Le fait que fermer un channel en lecture seule ne passe pas la compilation indique clairement qu'il ne faut pas le faire. Laissez l'émetteur faire ce travail.
  2. S'il y a plusieurs émetteurs, une goroutine distincte doit être responsable de la fermeture, en s'assurant que close n'est appelé que par une seule partie et une seule fois.
  3. Lors du passage d'un channel, il est préférable de le limiter à la lecture seule ou à l'écriture seule.

En suivant ces principes, vous pouvez éviter la plupart des problèmes.

Golang by www.golangdev.cn edit