waitgroup
WaitGroup, fornecido pela biblioteca padrão do Go, tem a função de esperar um conjunto de goroutines terminar sua execução.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}Este é um código muito simples, sua função é iniciar 10 goroutines para imprimir 0-9 e esperá-las terminar. Seu uso não será repetido aqui. A seguir, vamos entender seu princípio básico de funcionamento, que não é complexo.
Estrutura
Sua definição de tipo está localizada no arquivo sync/waitgroup.go:
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}Definição dos campos:
state, representa o estado do WaitGroup. Os 32 bits mais altos são usados para contar o número de goroutines sendo esperadas, e os 32 bits mais baixos são usados para contar o número de goroutines esperando o wg completar.sema, semáforo, que é quase onipresente na biblioteca padrãosync.
Seu núcleo está nos dois métodos Add() e Wait(). O princípio básico de funcionamento é o semáforo: o método Wait() tenta obter o semáforo, e o método Add() libera o semáforo para implementar M goroutines esperando um conjunto de N goroutines terminar sua execução.
Add
O método Add é usado para aumentar o número de goroutines a serem esperadas.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}O fluxo é o seguinte:
Primeiro faz uma operação de deslocamento em
wg.statepara obter os 32 bits mais altos e os 32 bits mais baixos, correspondendo às variáveisvew:gostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)Depois começa a verificar, onde
vrepresenta a contagem do wg ewrepresenta o número de goroutines esperando o wg completar:Se
vfor menor que 0, entra empanicdiretamente. Números negativos não têm significado:goif v < 0 { panic("sync: negative WaitGroup counter") }Se
wnão for 0, edeltafor igual av, indica que os métodosWait()eAdd()foram chamados concorrentemente, o que é uma forma de uso incorreta:goif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }Se
vfor maior que 0, ouwfor igual a 0, indica que não há goroutines esperando o wg completar, pode retornar diretamente:goif v > 0 || w == 0 { return }
Chegar a este passo indica que
vé igual a 0 ewé maior que 0, ou seja, não há goroutines em execução no momento, mas há goroutines esperando o wg completar. Portanto, precisa liberar o semáforo para despertar estas goroutines:goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
O método Done() é na verdade Add(-1), não há nada a explicar.
Wait
Se houver outras goroutines que precisam esperar a execução completar, a chamada do método Wait fará com que a goroutine atual entre em estado de bloqueio.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}Seu fluxo é um loop for:
Lê os 32 bits mais altos e os 32 bits mais baixos para obter o número de goroutines a serem esperadas e o número de goroutines em espera. Se não houver goroutines para esperar, retorna diretamente:
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }Caso contrário, usa operação CAS para incrementar o número de goroutines em espera em um, depois tenta obter o semáforo e entra na fila de espera bloqueada:
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }Quando a goroutine em espera é despertada (porque todas as goroutines sendo esperadas terminaram sua execução e liberaram o semáforo), verifica
state. Se não for 0, indica queWait()eAdd()foram usados concorrentemente novamente:goif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnSe a operação CAS não foi bem-sucedida, continua o loop
Resumo
Finalmente, vale lembrar que ao usar WaitGroup, não se deve chamar Add e Wait concorrentemente.
