waitgroup
WaitGroup は Go 標準ライブラリによって提供され、その機能は一組のコルーチンの実行完了を待機するために使用されます。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}これは非常にシンプルなコードで、その機能は 0-9 を印刷する 10 個のコルーチンを開始し、それらの実行完了を待機することです。その使用方法についてはここで詳しく説明しませんが、次に基本的な動作原理について理解しましょう。少しも複雑ではありません。
構造
そのタイプ定義は sync/waitgroup.go ファイルに位置しています。
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}フィールドの説明は以下の通りです。
state、WaitGroup の状態を表します。上位 32 ビットは待機されるコルーチンの数を統計し、下位 32 ビットは wg 完了を待機するコルーチンの数を統計します。sema、セマフォで、sync標準ライブラリではほぼ至る所に存在します。
その核心は Add() と Wait() という 2 つのメソッドにあり、基本的な動作原理はセマフォです。Wait() メソッドはセマフォの取得を試み、Add() メソッドはセマフォを解放して、M 個のコルーチンが一組の N 個のコルーチンの実行完了を待機するように実装します。
Add
Add メソッドは待機する必要があるコルーチンの数を増加します。
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}フローは以下の通りです。
まず
wg.stateにシフト操作を行い、それぞれ上位 32 ビットと下位 32 ビットを取得します。対応する変数はvとwです。gostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)その後判断を開始します。
vは wg カウンターを表し、wは wg 完了を待機するコルーチンの数を表します。vが 0 より小さい場合、直接panicします。負数には何の意味もありません。goif v < 0 { panic("sync: negative WaitGroup counter") }wが 0 でなく、deltaとvが等しい場合、Wait()メソッドとAdd()メソッドが同時に呼び出されたことを示します。これは誤った使用方法です。goif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }vが 0 より大きいか、またはwが 0 の場合、現在 wg 完了を待機するコルーチンがないことを示し、直接戻れます。goif v > 0 || w == 0 { return }
ここに到達したということは、
vが 0 で、wが 0 より大きいことを示します。つまり現在実行中のコルーチンはありませんが、wg 完了を待機するコルーチンがあります。したがってセマフォを解放して、これらのコルーチンをウェイクアップする必要があります。goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
Done() メソッドは実際には Add(-1) で、説明する必要はありません。
Wait
現在他のコルーチンが実行完了を待機する必要がある場合、Wait メソッドの呼び出しは現在のコルーチンをブロッキング状態にします。
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}そのフローは for ループです。
上位 32 ビットと下位 32 ビットを読み取り、待機される必要があるコルーチンの数と、待機するコルーチンの数を取得します。待機する必要があるコルーチンがない場合は直接戻ります。
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }それ以外の場合は、CAS 操作を通じて待機コルーチン数を 1 つ増加し、その後セマフォの取得を試み、ブロッキング待機キューに入ります。
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }待機コルーチンがウェイクアップされた後(すべての待機コルーチンの実行が完了し、セマフォが解放されたため)、
stateをチェックします。0 でない場合、Wait()とAdd()が再度同時に使用されたことを示します。goif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnCAS が正常に更新されなかった場合は、ループを継続します。
まとめ
最後に、WaitGroup を使用する際に、Add と Wait を同時に呼び出さないように注意してください。
