waitgroup
WaitGroup wird von der Go-Standardbibliothek bereitgestellt. Seine Funktion besteht darin, auf die Beendigung einer Gruppe von Goroutinen zu warten.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}Dies ist ein sehr einfacher Code. Seine Funktion besteht darin, 10 Goroutinen zu starten, die 0-9 drucken, und auf ihre Beendigung zu warten. Die Verwendung wird nicht weiter erläutert. Als Nächstes werden wir die grundlegenden Arbeitsprinzipien verstehen, die überhaupt nicht komplex sind.
Struktur
Die Typdefinition befindet sich in der Datei sync/waitgroup.go:
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}Die Felder bedeuten:
state, repräsentiert den Zustand des WaitGroup. Die oberen 32 Bits werden zum Zählen der Anzahl der erwarteten Goroutinen verwendet, die unteren 32 Bits zum Zählen der Anzahl der auf die Fertigstellung wartenden Goroutinen.sema, Semaphor. In dersync-Standardbibliothek ist es fast überall vorhanden.
Der Kern liegt in den zwei Methoden Add() und Wait(). Das grundlegende Arbeitsprinzip ist das Semaphor. Die Wait()-Methode versucht, das Semaphor zu erhalten, die Add()-Methode gibt das Semaphor frei. So wird erreicht, dass M Goroutinen auf die Beendigung einer Gruppe von N Goroutinen warten.
Add
Die Add-Methode erhöht die Anzahl der zu erwartenden Goroutinen.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}Der Ablauf ist wie folgt:
Zuerst wird eine Schiebeoperation auf
wg.statedurchgeführt, um die oberen 32 Bits und die unteren 32 Bits separat zu erhalten. Diese entsprechen den Variablenvundw:gostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)Dann beginnt die Beurteilung.
vrepräsentiert den WaitGroup-Zähler,wrepräsentiert die Anzahl der auf die Fertigstellung wartenden Goroutinen:Wenn
vkleiner als 0 ist, wird direktpanicausgelöst. Negative Zahlen haben keine Bedeutung:goif v < 0 { panic("sync: negative WaitGroup counter") }Wenn
wungleich 0 ist unddeltagleichvist, bedeutet dies, dass dieWait()-Methode und dieAdd()-Methode gleichzeitig aufgerufen wurden. Dies ist eine falsche Verwendung:goif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }Wenn
vgrößer als 0 ist oderwgleich 0 ist, bedeutet dies, dass derzeit keine Goroutinen auf die Fertigstellung warten und direkt zurückgekehrt werden kann:goif v > 0 || w == 0 { return }
Wenn dieser Punkt erreicht wird, bedeutet dies, dass
vgleich 0 ist undwgrößer als 0. Das heißt, derzeit werden keine Goroutinen ausgeführt, aber es gibt Goroutinen, die auf die Fertigstellung des WaitGroup warten. Daher muss das Semaphor freigegeben werden, um diese Goroutinen aufzuwecken:goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
Die Done()-Methode ist eigentlich Add(-1). Es gibt nichts weiter zu erklären.
Wait
Wenn derzeit andere Goroutinen auf die Beendigung der Ausführung warten müssen, wird der Aufruf der Wait-Methode die aktuelle Goroutine blockieren.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}Der Ablauf ist eine for-Schleife:
Die oberen 32 Bits und die unteren 32 Bits werden gelesen, um die Anzahl der zu erwartenden Goroutinen und die Anzahl der wartenden Goroutinen zu erhalten. Wenn keine Goroutinen erwartet werden müssen, wird direkt zurückgekehrt:
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }Andernfalls wird durch eine CAS-Operation die Anzahl der wartenden Goroutinen um eins erhöht. Dann wird versucht, das Semaphor zu erhalten und in die Blockierungswarteschlange einzutreten:
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }Wenn die wartende Goroutine aufgeweckt wird (weil alle erwarteten Goroutinen ihre Ausführung beendet haben und das Semaphor freigegeben wurde), wird
stateüberprüft. Wenn es nicht 0 ist, bedeutet dies, dassWait()undAdd()wieder gleichzeitig verwendet wurden:goif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnWenn die CAS-Aktualisierung nicht erfolgreich war, wird die Schleife fortgesetzt.
Zusammenfassung
Abschließend sei daran erinnert, dass bei der Verwendung von WaitGroup die Methoden Add und Wait nicht gleichzeitig aufgerufen werden sollten.
