Skip to content

waitgroup

WaitGroup, fornito dalla libreria standard di Go, la sua funzione è attendere il completamento dell'esecuzione di un gruppo di goroutine.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

Questo è un codice molto semplice, la sua funzione è avviare 10 goroutine per stampare 0-9 e attendere il loro completamento. Il suo utilizzo non verrà ulteriormente discusso. Di seguito comprenderemo il suo principio di funzionamento di base, che non è affatto complesso.

Struttura

La sua definizione di tipo si trova nel file sync/waitgroup.go:

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Descrizione dei campi:

  • state, rappresenta lo stato di WaitGroup. I 32 bit più significativi sono utilizzati per contare il numero di goroutine in attesa, i 32 bit meno significativi sono utilizzati per contare il numero di goroutine in attesa del completamento di wg.
  • sema, semaforo, che è quasi onnipresente nella libreria standard sync.

Il suo cuore risiede nei due metodi Add() e Wait(). Il principio di funzionamento di base è il semaforo: il metodo Wait() tenta di acquisire il semaforo, il metodo Add() rilascia il semaforo per realizzare M goroutine in attesa del completamento di un gruppo di N goroutine.

Add

Il metodo Add aumenta il numero di goroutine da attendere.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

Il flusso è il seguente:

  1. Per prima cosa esegue un'operazione di shift su wg.state, ottenendo rispettivamente i 32 bit più significativi e i 32 bit meno significativi, corrispondenti alle variabili v e w:

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Poi inizia il giudizio: v rappresenta il conteggio wg, w rappresenta il numero di goroutine in attesa del completamento di wg:

    1. Se v è inferiore a 0, direttamente panic, i numeri negativi non hanno alcun significato:

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. Se w non è 0, e delta è uguale a v, indica che i metodi Wait() e Add() sono stati chiamati concorrentemente, questo è un modo di utilizzo errato:

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Se v è maggiore di 0, o w è uguale a 0, indica che ora non ci sono goroutine in attesa del completamento di wg, può restituire direttamente:

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Arrivati a questo punto, significa che v è uguale a 0 e w è maggiore di 0, cioè attualmente non ci sono goroutine in esecuzione, ma ci sono goroutine in attesa del completamento di wg, quindi è necessario rilasciare il semaforo per risvegliare queste goroutine:

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

Il metodo Done() è effettivamente Add(-1), non c'è nulla da spiegare.

Wait

Se ci sono altre goroutine che devono attendere il completamento dell'esecuzione, la chiamata al metodo Wait farà sì che la goroutine corrente si blocchi.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Il suo flusso è un ciclo for:

  1. Legge i 32 bit più significativi e i 32 bit meno significativi, ottenendo il numero di goroutine da attendere e il numero di goroutine in attesa. Se non ci sono goroutine da attendere, restituisce direttamente:

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. Altrimenti, attraverso un'operazione CAS incrementa di uno il numero di goroutine in attesa, poi tenta di acquisire il semaforo ed entra nella coda di attesa bloccata:

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Quando la goroutine in attesa viene risvegliata (poiché tutte le goroutine in attesa sono state completate e hanno rilasciato il semaforo), verifica state. Se non è 0, indica che Wait() e Add() sono stati utilizzati nuovamente in modo concorrente:

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Se l'operazione CAS non è stata aggiornata con successo, continua il ciclo.

Infine, si ricorda che quando si utilizza WaitGroup, Add e Wait non devono essere chiamati concorrentemente.

Golang by www.golangdev.cn edit