Skip to content

select

select est une structure qui permet de surveiller simultanément l'état de plusieurs canaux. Sa syntaxe est similaire à celle de switch.

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Ce code combine context, canal et select pour implémenter une logique simple d'arrêt progressif du programme. Le select surveille simultanément deux canaux : ctx.Done et finished. Les conditions de sortie sont au nombre de deux : premièrement, le système d'exploitation envoie un signal de sortie, et deuxièmement, le canal finished a un message à lire, c'est-à-dire que la tâche du code utilisateur est terminée. Ainsi, nous pouvons effectuer un travail de nettoyage lors de la sortie du programme.

Comme on le sait, select a deux caractéristiques très importantes. Premièrement, il est non bloquant. Dans le code source de l'envoi et de la réception sur les canaux, on peut voir des traitements spécifiques pour select, permettant de déterminer si un canal est disponible sans bloquer. Deuxièmement, il est aléatoire. Si plusieurs canaux sont disponibles, il en choisit un au hasard à exécuter. Ne pas respecter un ordre prédéfini permet à chaque canal d'être traité de manière relativement équitable, sinon dans des cas extrêmes certains canaux pourraient ne jamais être traités. Puisque son travail est entièrement lié aux canaux, il est recommandé de lire d'abord l'article chan, puis de comprendre select sera beaucoup plus fluide.

Structure

À l'exécution, seule la structure runtime.scase représente une branche de select. Chaque case est représenté à l'exécution par un scase.

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

Le champ c fait référence au canal, et elem représente le pointeur vers l'élément à recevoir ou à envoyer. En fait, le mot-clé select fait référence à la fonction runtime.selectgo.

Principe

L'utilisation de select est divisée par Go en quatre cas pour optimisation, comme on peut le voir dans la logique de traitement de ces quatre cas dans la fonction cmd/compile/internal/walk.walkSelectCases.

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimization: zero-case select
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimization: one-case select: single op.
  if ncas == 1 {
    ...
  }

  // optimization: two-case select but one is default: single non-blocking op.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Optimisation

Le compilateur optimise les trois premiers cas. Le premier cas est lorsque le nombre de cases est 0, c'est-à-dire un select vide. Nous savons tous qu'une instruction select vide provoque un blocage permanent de la goroutine courante.

go
select{}

Le blocage se produit parce que le compilateur le traduit en un appel direct à la fonction runtime.block.

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

La fonction block appelle ensuite runtime.gopark, qui met la goroutine courante dans l'état _Gwaitting et entre dans un blocage permanent, ne sera plus jamais ordonnancée.

Le deuxième cas, un seul case et ce n'est pas default. Dans ce cas, le compilateur le traduit directement en une opération d'envoi ou de réception sur le canal, et de manière bloquante. Par exemple, le code suivant :

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

Il sera traduit en un appel direct à la fonction runtime.chanrecv1, comme on peut le voir dans le code assembleur :

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

Dans le cas d'un seul case, l'envoi de données sur un canal est similaire, il sera traduit en un appel direct à la fonction runtime.chansend1, également de manière bloquante.

Le troisième cas, deux cases dont l'un est default :

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

Ce cas sera traduit en une instruction if avec un appel à runtime.selectnbsend :

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

S'il s'agit de recevoir des données d'un canal, il sera traduit en un appel à runtime.selectnbrecv :

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

Il est important de noter que dans ce cas, l'envoi ou la réception sur le canal est non bloquant. On peut clairement voir que le paramètre block est false.

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

Que ce soit pour envoyer ou recevoir des données sur un canal, lorsque block est false, il existe un chemin rapide permettant de déterminer sans verrouillage si l'envoi ou la réception est possible, comme montré ci-dessous :

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Pour la lecture d'un canal, si le canal est vide, on retourne directement. Pour l'écriture dans un canal, si le canal n'est pas fermé et est déjà plein, on retourne également directement. En général, ces opérations provoqueraient un blocage de la goroutine, mais combinées avec select, ce n'est pas le cas.

Traitement

Les trois cas ci-dessus ne sont que des optimisations pour des cas particuliers. Un select utilisé normalement sera traduit en un appel à la fonction runtime.selectgo, dont la logique de traitement fait plus de 400 lignes.

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

Le compilateur collecte tous les case dans un tableau scase, puis le passe à la fonction selectgo. Après le traitement, deux valeurs sont retournées :

  1. La première est l'index du canal choisi aléatoirement, indiquant quel canal a été traité. Si aucun, retourne -1.
  2. La deuxième indique, pour une opération de lecture sur un canal, si la lecture a réussi.

Voici une explication simple de ses paramètres :

  • cas0, pointeur vers le début du tableau scase. La première partie contient les case d'écriture sur canal, la seconde partie les case de lecture sur canal, distingués par nsends.
  • order0, sa longueur est le double du tableau scase. La première partie est allouée au tableau pollorder, la seconde partie au tableau lockorder.
  • nsends et nrecvs représentent le nombre de case de lecture/écriture sur canal. Leur somme est le nombre total de case.
  • block indique si l'opération est bloquante. S'il y a un case default, cela signifie non bloquant, la valeur est false, sinon true.
  • pc0, pointeur vers le début d'un tableau [ncases]uintptr, utilisé pour l'analyse de concurrence. On peut l'ignorer pour comprendre select.

Supposons le code suivant :

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Voyons sa forme assembleur, ici simplifiée pour faciliter la compréhension :

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // variables temporaires 1 2 3 4
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // appel à runtime.selectgo
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // saut vers la branche default
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // saut vers la branche 4
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // saut vers la branche 3
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // saut vers la branche 2
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

On peut voir qu'après l'appel à la fonction selectgo, il y a une logique de jugement + saut. À travers cela, on peut facilement déduire son apparence originale :

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

Le code réel généré par le compilateur peut différer, mais le sens général est similaire. Donc après avoir appelé la fonction selectgo, le compilateur utilise une instruction if pour déterminer quel canal doit être exécuté. Et avant l'appel, le compilateur génère également une boucle for pour collecter le tableau scase (ici omis).

Maintenant que nous savons comment la fonction selectgo est utilisée en externe, voyons comment elle fonctionne en interne. Elle commence par initialiser plusieurs tableaux. nsends+nrecvs représente le nombre total de case. Le code ci-dessous montre également que le nombre maximum de case est 1 << 16. pollorder détermine l'ordre d'exécution des canaux, lockorder détermine l'ordre de verrouillage des canaux.

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// Sa longueur est le double du tableau scase, la première partie allouée au tableau pollorder, la seconde au tableau lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Ensuite, on initialise le tableau pollorder, qui contient les indices du tableau scases des canaux à exécuter :

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Elle parcourt tout le tableau scases, puis génère un nombre aléatoire dans [0, i] via runtime.fastrandn, et l'échange avec i. Les case avec un canal nil sont ignorés. Après le parcours, on obtient un tableau pollorder dont les éléments ont été mélangés, comme illustré ci-dessous :

Ensuite, on trie le tableau pollorder selon l'adresse des canaux en utilisant un tri par tas pour obtenir le tableau lockorder, puis on appelle runtime.sellock pour les verrouiller dans l'ordre :

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Il est important de noter que le tri des canaux par adresse vise à éviter les interblocages, car l'opération select elle-même ne nécessite pas de verrou pour permettre la concurrence. Supposons qu'on verrouille dans l'ordre aléatoire de pollorder, alors considérons la situation suivante :

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Trois goroutines A, B, C arrivent toutes à l'étape de verrouillage, et leurs ordres de verrouillage respectifs sont aléatoires et différents. Cela peut créer une situation comme illustré ci-dessous :

Supposons que les ordres de verrouillage de A, B, C soient comme dans l'image ci-dessus, alors la possibilité d'un interblocage est très grande. Par exemple, A détient d'abord le verrou de ch2, puis essaie d'obtenir le verrou de ch1, mais si ch1 est déjà verrouillé par la goroutine B, et que B essaie d'obtenir le verrou de ch2, cela crée un interblocage.

Si toutes les goroutines verrouillent dans le même ordre, il n'y aura pas de problème d'interblocage. C'est la raison fondamentale pour laquelle lockorder doit être trié par adresse.

Après le verrouillage, la véritable phase de traitement commence. On parcourt d'abord le tableau pollorder, accédant aux canaux dans l'ordre mélangé précédent, pour trouver un canal disponible :

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // lecture de canal
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // écriture de canal
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

On peut voir ici six cas de traitement pour la lecture/écriture de canaux, expliqués ci-dessous.

Premier cas, lecture de canal et un expéditeur attend pour envoyer. Ici, on arrive à la fonction runtime.recv, dont le rôle a déjà été expliqué. Elle finira par réveiller la goroutine de l'expéditeur. Avant le réveil, la fonction de rappel déverrouillera tous les canaux.

go
recv:
  // can receive from sleeping sender (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

Deuxième cas, lecture de canal, pas d'expéditeur en attente, nombre d'éléments dans le tampon supérieur à 0. Ici, on lit directement les données du tampon. La logique est identique à celle de runtime.chanrecv, puis on déverrouille.

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Troisième cas, lecture de canal, mais le canal est déjà fermé et il n'y a pas d'éléments restants dans le tampon. Ici, on déverrouille d'abord puis on retourne directement.

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Quatrième cas, envoi de données vers un canal déjà fermé. Ici, on déverrouille d'abord puis on provoque une panic.

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Cinquième cas, un récepteur attend bloqué. Ici, on appelle la fonction runtime.send, qui finira par réveiller la goroutine du récepteur. Avant le réveil, la fonction de rappel déverrouillera tous les canaux.

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Sixième cas, pas de goroutine réceptrice en attente, on place les données à envoyer dans le tampon, puis on déverrouille.

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Tous les cas ci-dessus finissent par entrer dans la branche retc, qui ne fait que retourner l'index du canal sélectionné casi et le booléen recvOk indiquant si la lecture a réussi.

go
retc:
    return casi, recvOK

Septième cas, aucun canal disponible trouvé, et le code contient une branche default. Alors on déverrouille les canaux et on retourne directement. Ici, casi retourné est -1, indiquant qu'aucun canal n'est disponible.

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Dernier cas, aucun canal disponible trouvé, et le code ne contient pas de branche default. Alors la goroutine courante entre dans un état bloqué. Avant cela, selectgo ajoute la goroutine courante aux files d'attente recvq/sendq de tous les canaux surveillés :

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Ici, plusieurs sudog sont créés et liés aux canaux correspondants, comme illustré ci-dessous :

Ensuite, runtime.gopark bloque. Avant le blocage, les canaux sont déverrouillés. Ce travail est effectué par la fonction runtime.selparkcommit, passée comme fonction de rappel à gopark.

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

La première chose après le réveil est de dissocier les sudog des canaux :

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Puis on retire les sudog des files d'attente des canaux :

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg has already been dequeued by the G that woke us up.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

Dans le processus ci-dessus, on trouvera certainement un canal traité par la goroutine qui a fait le réveil, puis on fera le traitement final selon caseSuccess. Pour une opération d'écriture, sg.success à false signifie que le canal est fermé. Et dans tout le runtime Go, seule la fonction close définit activement ce champ à false, ce qui indique que la goroutine courante a été réveillée par la fonction close. Pour une opération de lecture, si c'est l'expéditeur qui a fait le réveil, l'opération de lecture des données a déjà été effectuée par l'expéditeur via la fonction runtime.send avant le réveil, et la valeur est true. Si c'est la fonction close qui a fait le réveil, comme précédemment, on retourne directement.

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

Jusqu'ici, toute la logique de select a été clarifiée. Avec les plusieurs cas ci-dessus, on voit que le traitement de select est assez complexe.

Golang by www.golangdev.cn edit