waitgroup
WaitGroup, proporcionado por la biblioteca estándar de Go, su función es esperar a que un grupo de goroutines se ejecute.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}Este es un código muy simple, su función es iniciar 10 goroutines para imprimir 0-9, y esperar a que terminen de ejecutarse. Su uso no se explicará más aquí. A continuación, entenderemos su principio básico de funcionamiento, que no es complejo en absoluto.
Estructura
Su definición de tipo se encuentra en el archivo sync/waitgroup.go:
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}Descripción de los campos:
state, representa el estado de WaitGroup. Los 32 bits altos se usan para contar el número de goroutines que se esperan, y los 32 bits bajos se usan para contar el número de goroutines que esperan a que wg se complete.sema, semáforo, que está casi en todas partes en la biblioteca estándarsync.
Su núcleo radica en los dos métodos Add() y Wait(). Su principio básico de funcionamiento es el semáforo: el método Wait() intenta adquirir el semáforo, y el método Add() libera el semáforo para implementar que M goroutines esperen a que un grupo de N goroutines se ejecute.
Add
El método Add aumenta el número de goroutines que necesitan ser esperadas.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}El flujo es el siguiente:
Primero realiza una operación de desplazamiento en
wg.statepara obtener los 32 bits altos y los 32 bits bajos, correspondientes a las variablesvyw:gostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)Luego comienza a verificar,
vrepresenta el contador de wg,wrepresenta el número de goroutines que esperan a que wg se complete:Si
ves menor que 0, directamente hacepanic. Los números negativos no tienen significado:goif v < 0 { panic("sync: negative WaitGroup counter") }Si
wno es 0, ydeltaes igual av, indica que los métodosWait()yAdd()se llaman concurrentemente, lo cual es un uso incorrecto:goif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }Si
ves mayor que 0, owes igual a 0, indica que actualmente no hay goroutines esperando a que wg se complete, puede retornar directamente:goif v > 0 || w == 0 { return }
Llegar a este paso indica que
ves igual a 0 ywes mayor que 0, es decir, actualmente no hay goroutines ejecutándose, pero hay goroutines esperando a que wg se complete, por lo que necesita liberar el semáforo para despertar estas goroutines:goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
El método Done() es en realidad Add(-1), no hay nada más que explicar.
Wait
Si actualmente hay otras goroutines que necesitan esperar a que se complete la ejecución, la llamada al método Wait hará que la goroutine actual se bloquee.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}Su flujo es un bucle for:
Lee los 32 bits altos y los 32 bits bajos para obtener el número de goroutines que se esperan y el número de goroutines que esperan. Si no hay goroutines que esperar, retorna directamente:
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }De lo contrario, usa una operación CAS para incrementar el número de goroutines que esperan en uno, luego intenta adquirir el semáforo y entra en la cola de espera bloqueada:
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }Cuando la goroutine en espera es despertada (porque todas las goroutines esperadas se han ejecutado y liberaron el semáforo), verifica
state. Si no es 0, indica queWait()yAdd()se usaron concurrentemente:goif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnSi la operación CAS no se actualizó exitosamente, continúa el bucle.
Resumen
Finalmente, se debe recordar que al usar WaitGroup, no se deben llamar concurrentemente a Add y Wait.
