Skip to content

waitgroup

WaitGroup, fourni par la bibliothèque standard Go, a pour fonction d'attendre la fin de l'exécution d'un groupe de coroutines.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

C'est un code très simple, sa fonction est de lancer 10 coroutines pour imprimer 0-9, et d'attendre qu'elles terminent leur exécution. Son utilisation ne sera pas détaillée, nous allons ensuite comprendre son principe de fonctionnement de base, qui n'est pas du tout complexe.

Structure

Sa définition de type se trouve dans le fichier sync/waitgroup.go

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Description des champs :

  • state, représente l'état du WaitGroup, les 32 bits de poids fort sont utilisés pour compter le nombre de coroutines attendues, les 32 bits de poids faible sont utilisés pour compter le nombre de coroutines en attente de la fin du wg.
  • sema, sémaphore, il est presque omniprésent dans la bibliothèque standard sync.

Son cœur réside dans les deux méthodes Add() et Wait(), le principe de fonctionnement de base est le sémaphore, la méthode Wait() essaie d'acquérir le sémaphore, la méthode Add() libère le sémaphore, pour réaliser que M coroutines attendent la fin de l'exécution d'un groupe de N coroutines.

Add

La méthode Add permet d'augmenter le nombre de coroutines à attendre.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

Le processus est le suivant :

  1. Elle effectue d'abord une opération de décalage sur wg.state, obtenant respectivement les 32 bits de poids fort et les 32 bits de poids faible, correspondant aux variables v et w

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Ensuite commence les vérifications, v représente le compteur wg, w représente le nombre de coroutines en attente de la fin du wg

    1. Si v est inférieur à 0, panic direct, un nombre négatif n'a aucun sens

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. w n'est pas 0, et delta est égal à v, cela signifie que les méthodes Wait() et Add() sont appelées de manière concurrente, c'est une utilisation incorrecte

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Si v est supérieur à 0, ou w est égal à 0, cela signifie qu'il n'y a pas de coroutine en attente de la fin du wg, on peut retourner directement

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Arrivé à cette étape, cela signifie que v est égal à 0, et w est supérieur à 0, c'est-à-dire qu'il n'y a actuellement aucune coroutine en cours d'exécution, mais il y a des coroutines en attente de la fin du wg, donc il faut libérer le sémaphore pour réveiller ces coroutines.

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

La méthode Done() est en fait Add(-1), rien de particulier à dire.

Wait

Si des coroutines doivent attendre la fin de l'exécution d'autres, l'appel à la méthode Wait bloquera la coroutine actuelle.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Son processus est une boucle for

  1. Lit les 32 bits de poids fort et les 32 bits de poids faible, obtient le nombre de coroutines à attendre et le nombre de coroutines en attente, s'il n'y a pas de coroutine à attendre, retourne directement

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. Sinon, utilise une opération CAS pour incrémenter de un le nombre de coroutines en attente, puis essaie d'acquérir le sémaphore, entre dans la file d'attente bloquante

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Quand la coroutine en attente est réveillée (parce que toutes les coroutines attendues ont terminé leur exécution, le sémaphore a été libéré), vérifie state, s'il n'est pas 0, cela signifie que Wait() et Add() ont été utilisés de manière concurrente à nouveau

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Si la mise à jour CAS n'a pas réussi, continue la boucle

Résumé

Enfin, un rappel important : lors de l'utilisation de WaitGroup, les méthodes Add et Wait ne doivent pas être appelées de manière concurrente.

Golang by www.golangdev.cn edit