Skip to content

waitgroup

WaitGroup, proporcionado por la biblioteca estándar de Go, su función es esperar a que un grupo de goroutines se ejecute.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

Este es un código muy simple, su función es iniciar 10 goroutines para imprimir 0-9, y esperar a que terminen de ejecutarse. Su uso no se explicará más aquí. A continuación, entenderemos su principio básico de funcionamiento, que no es complejo en absoluto.

Estructura

Su definición de tipo se encuentra en el archivo sync/waitgroup.go:

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Descripción de los campos:

  • state, representa el estado de WaitGroup. Los 32 bits altos se usan para contar el número de goroutines que se esperan, y los 32 bits bajos se usan para contar el número de goroutines que esperan a que wg se complete.
  • sema, semáforo, que está casi en todas partes en la biblioteca estándar sync.

Su núcleo radica en los dos métodos Add() y Wait(). Su principio básico de funcionamiento es el semáforo: el método Wait() intenta adquirir el semáforo, y el método Add() libera el semáforo para implementar que M goroutines esperen a que un grupo de N goroutines se ejecute.

Add

El método Add aumenta el número de goroutines que necesitan ser esperadas.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

El flujo es el siguiente:

  1. Primero realiza una operación de desplazamiento en wg.state para obtener los 32 bits altos y los 32 bits bajos, correspondientes a las variables v y w:

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Luego comienza a verificar, v representa el contador de wg, w representa el número de goroutines que esperan a que wg se complete:

    1. Si v es menor que 0, directamente hace panic. Los números negativos no tienen significado:

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. Si w no es 0, y delta es igual a v, indica que los métodos Wait() y Add() se llaman concurrentemente, lo cual es un uso incorrecto:

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Si v es mayor que 0, o w es igual a 0, indica que actualmente no hay goroutines esperando a que wg se complete, puede retornar directamente:

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Llegar a este paso indica que v es igual a 0 y w es mayor que 0, es decir, actualmente no hay goroutines ejecutándose, pero hay goroutines esperando a que wg se complete, por lo que necesita liberar el semáforo para despertar estas goroutines:

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

El método Done() es en realidad Add(-1), no hay nada más que explicar.

Wait

Si actualmente hay otras goroutines que necesitan esperar a que se complete la ejecución, la llamada al método Wait hará que la goroutine actual se bloquee.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Su flujo es un bucle for:

  1. Lee los 32 bits altos y los 32 bits bajos para obtener el número de goroutines que se esperan y el número de goroutines que esperan. Si no hay goroutines que esperar, retorna directamente:

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. De lo contrario, usa una operación CAS para incrementar el número de goroutines que esperan en uno, luego intenta adquirir el semáforo y entra en la cola de espera bloqueada:

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Cuando la goroutine en espera es despertada (porque todas las goroutines esperadas se han ejecutado y liberaron el semáforo), verifica state. Si no es 0, indica que Wait() y Add() se usaron concurrentemente:

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Si la operación CAS no se actualizó exitosamente, continúa el bucle.

Resumen

Finalmente, se debe recordar que al usar WaitGroup, no se deben llamar concurrentemente a Add y Wait.

Golang editado por www.golangdev.cn