waitgroup
WaitGroup, fourni par la bibliothèque standard Go, a pour fonction d'attendre la fin de l'exécution d'un groupe de coroutines.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}C'est un code très simple, sa fonction est de lancer 10 coroutines pour imprimer 0-9, et d'attendre qu'elles terminent leur exécution. Son utilisation ne sera pas détaillée, nous allons ensuite comprendre son principe de fonctionnement de base, qui n'est pas du tout complexe.
Structure
Sa définition de type se trouve dans le fichier sync/waitgroup.go
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}Description des champs :
state, représente l'état du WaitGroup, les 32 bits de poids fort sont utilisés pour compter le nombre de coroutines attendues, les 32 bits de poids faible sont utilisés pour compter le nombre de coroutines en attente de la fin du wg.sema, sémaphore, il est presque omniprésent dans la bibliothèque standardsync.
Son cœur réside dans les deux méthodes Add() et Wait(), le principe de fonctionnement de base est le sémaphore, la méthode Wait() essaie d'acquérir le sémaphore, la méthode Add() libère le sémaphore, pour réaliser que M coroutines attendent la fin de l'exécution d'un groupe de N coroutines.
Add
La méthode Add permet d'augmenter le nombre de coroutines à attendre.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}Le processus est le suivant :
Elle effectue d'abord une opération de décalage sur
wg.state, obtenant respectivement les 32 bits de poids fort et les 32 bits de poids faible, correspondant aux variablesvetwgostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)Ensuite commence les vérifications,
vreprésente le compteur wg,wreprésente le nombre de coroutines en attente de la fin du wgSi
vest inférieur à 0,panicdirect, un nombre négatif n'a aucun sensgoif v < 0 { panic("sync: negative WaitGroup counter") }wn'est pas 0, etdeltaest égal àv, cela signifie que les méthodesWait()etAdd()sont appelées de manière concurrente, c'est une utilisation incorrectegoif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }Si
vest supérieur à 0, ouwest égal à 0, cela signifie qu'il n'y a pas de coroutine en attente de la fin du wg, on peut retourner directementgoif v > 0 || w == 0 { return }
Arrivé à cette étape, cela signifie que
vest égal à 0, etwest supérieur à 0, c'est-à-dire qu'il n'y a actuellement aucune coroutine en cours d'exécution, mais il y a des coroutines en attente de la fin du wg, donc il faut libérer le sémaphore pour réveiller ces coroutines.goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
La méthode Done() est en fait Add(-1), rien de particulier à dire.
Wait
Si des coroutines doivent attendre la fin de l'exécution d'autres, l'appel à la méthode Wait bloquera la coroutine actuelle.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}Son processus est une boucle for
Lit les 32 bits de poids fort et les 32 bits de poids faible, obtient le nombre de coroutines à attendre et le nombre de coroutines en attente, s'il n'y a pas de coroutine à attendre, retourne directement
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }Sinon, utilise une opération CAS pour incrémenter de un le nombre de coroutines en attente, puis essaie d'acquérir le sémaphore, entre dans la file d'attente bloquante
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }Quand la coroutine en attente est réveillée (parce que toutes les coroutines attendues ont terminé leur exécution, le sémaphore a été libéré), vérifie
state, s'il n'est pas 0, cela signifie queWait()etAdd()ont été utilisés de manière concurrente à nouveaugoif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnSi la mise à jour CAS n'a pas réussi, continue la boucle
Résumé
Enfin, un rappel important : lors de l'utilisation de WaitGroup, les méthodes Add et Wait ne doivent pas être appelées de manière concurrente.
