Skip to content

waitgroup

WaitGroup wird von der Go-Standardbibliothek bereitgestellt. Seine Funktion besteht darin, auf die Beendigung einer Gruppe von Goroutinen zu warten.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

Dies ist ein sehr einfacher Code. Seine Funktion besteht darin, 10 Goroutinen zu starten, die 0-9 drucken, und auf ihre Beendigung zu warten. Die Verwendung wird nicht weiter erläutert. Als Nächstes werden wir die grundlegenden Arbeitsprinzipien verstehen, die überhaupt nicht komplex sind.

Struktur

Die Typdefinition befindet sich in der Datei sync/waitgroup.go:

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Die Felder bedeuten:

  • state, repräsentiert den Zustand des WaitGroup. Die oberen 32 Bits werden zum Zählen der Anzahl der erwarteten Goroutinen verwendet, die unteren 32 Bits zum Zählen der Anzahl der auf die Fertigstellung wartenden Goroutinen.
  • sema, Semaphor. In der sync-Standardbibliothek ist es fast überall vorhanden.

Der Kern liegt in den zwei Methoden Add() und Wait(). Das grundlegende Arbeitsprinzip ist das Semaphor. Die Wait()-Methode versucht, das Semaphor zu erhalten, die Add()-Methode gibt das Semaphor frei. So wird erreicht, dass M Goroutinen auf die Beendigung einer Gruppe von N Goroutinen warten.

Add

Die Add-Methode erhöht die Anzahl der zu erwartenden Goroutinen.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

Der Ablauf ist wie folgt:

  1. Zuerst wird eine Schiebeoperation auf wg.state durchgeführt, um die oberen 32 Bits und die unteren 32 Bits separat zu erhalten. Diese entsprechen den Variablen v und w:

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Dann beginnt die Beurteilung. v repräsentiert den WaitGroup-Zähler, w repräsentiert die Anzahl der auf die Fertigstellung wartenden Goroutinen:

    1. Wenn v kleiner als 0 ist, wird direkt panic ausgelöst. Negative Zahlen haben keine Bedeutung:

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. Wenn w ungleich 0 ist und delta gleich v ist, bedeutet dies, dass die Wait()-Methode und die Add()-Methode gleichzeitig aufgerufen wurden. Dies ist eine falsche Verwendung:

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Wenn v größer als 0 ist oder w gleich 0 ist, bedeutet dies, dass derzeit keine Goroutinen auf die Fertigstellung warten und direkt zurückgekehrt werden kann:

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Wenn dieser Punkt erreicht wird, bedeutet dies, dass v gleich 0 ist und w größer als 0. Das heißt, derzeit werden keine Goroutinen ausgeführt, aber es gibt Goroutinen, die auf die Fertigstellung des WaitGroup warten. Daher muss das Semaphor freigegeben werden, um diese Goroutinen aufzuwecken:

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

Die Done()-Methode ist eigentlich Add(-1). Es gibt nichts weiter zu erklären.

Wait

Wenn derzeit andere Goroutinen auf die Beendigung der Ausführung warten müssen, wird der Aufruf der Wait-Methode die aktuelle Goroutine blockieren.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Der Ablauf ist eine for-Schleife:

  1. Die oberen 32 Bits und die unteren 32 Bits werden gelesen, um die Anzahl der zu erwartenden Goroutinen und die Anzahl der wartenden Goroutinen zu erhalten. Wenn keine Goroutinen erwartet werden müssen, wird direkt zurückgekehrt:

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. Andernfalls wird durch eine CAS-Operation die Anzahl der wartenden Goroutinen um eins erhöht. Dann wird versucht, das Semaphor zu erhalten und in die Blockierungswarteschlange einzutreten:

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Wenn die wartende Goroutine aufgeweckt wird (weil alle erwarteten Goroutinen ihre Ausführung beendet haben und das Semaphor freigegeben wurde), wird state überprüft. Wenn es nicht 0 ist, bedeutet dies, dass Wait() und Add() wieder gleichzeitig verwendet wurden:

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Wenn die CAS-Aktualisierung nicht erfolgreich war, wird die Schleife fortgesetzt.

Zusammenfassung

Abschließend sei daran erinnert, dass bei der Verwendung von WaitGroup die Methoden Add und Wait nicht gleichzeitig aufgerufen werden sollten.

Golang by www.golangdev.cn edit