Skip to content

waitgroup

WaitGroup, fornecido pela biblioteca padrão do Go, tem a função de esperar um conjunto de goroutines terminar sua execução.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

Este é um código muito simples, sua função é iniciar 10 goroutines para imprimir 0-9 e esperá-las terminar. Seu uso não será repetido aqui. A seguir, vamos entender seu princípio básico de funcionamento, que não é complexo.

Estrutura

Sua definição de tipo está localizada no arquivo sync/waitgroup.go:

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Definição dos campos:

  • state, representa o estado do WaitGroup. Os 32 bits mais altos são usados para contar o número de goroutines sendo esperadas, e os 32 bits mais baixos são usados para contar o número de goroutines esperando o wg completar.
  • sema, semáforo, que é quase onipresente na biblioteca padrão sync.

Seu núcleo está nos dois métodos Add() e Wait(). O princípio básico de funcionamento é o semáforo: o método Wait() tenta obter o semáforo, e o método Add() libera o semáforo para implementar M goroutines esperando um conjunto de N goroutines terminar sua execução.

Add

O método Add é usado para aumentar o número de goroutines a serem esperadas.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

O fluxo é o seguinte:

  1. Primeiro faz uma operação de deslocamento em wg.state para obter os 32 bits mais altos e os 32 bits mais baixos, correspondendo às variáveis v e w:

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Depois começa a verificar, onde v representa a contagem do wg e w representa o número de goroutines esperando o wg completar:

    1. Se v for menor que 0, entra em panic diretamente. Números negativos não têm significado:

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. Se w não for 0, e delta for igual a v, indica que os métodos Wait() e Add() foram chamados concorrentemente, o que é uma forma de uso incorreta:

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Se v for maior que 0, ou w for igual a 0, indica que não há goroutines esperando o wg completar, pode retornar diretamente:

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Chegar a este passo indica que v é igual a 0 e w é maior que 0, ou seja, não há goroutines em execução no momento, mas há goroutines esperando o wg completar. Portanto, precisa liberar o semáforo para despertar estas goroutines:

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

O método Done() é na verdade Add(-1), não há nada a explicar.

Wait

Se houver outras goroutines que precisam esperar a execução completar, a chamada do método Wait fará com que a goroutine atual entre em estado de bloqueio.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Seu fluxo é um loop for:

  1. Lê os 32 bits mais altos e os 32 bits mais baixos para obter o número de goroutines a serem esperadas e o número de goroutines em espera. Se não houver goroutines para esperar, retorna diretamente:

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. Caso contrário, usa operação CAS para incrementar o número de goroutines em espera em um, depois tenta obter o semáforo e entra na fila de espera bloqueada:

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Quando a goroutine em espera é despertada (porque todas as goroutines sendo esperadas terminaram sua execução e liberaram o semáforo), verifica state. Se não for 0, indica que Wait() e Add() foram usados concorrentemente novamente:

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Se a operação CAS não foi bem-sucedida, continua o loop

Resumo

Finalmente, vale lembrar que ao usar WaitGroup, não se deve chamar Add e Wait concorrentemente.

Golang por www.golangdev.cn edit