Skip to content

waitgroup

WaitGroup は Go 標準ライブラリによって提供され、その機能は一組のコルーチンの実行完了を待機するために使用されます。

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

これは非常にシンプルなコードで、その機能は 0-9 を印刷する 10 個のコルーチンを開始し、それらの実行完了を待機することです。その使用方法についてはここで詳しく説明しませんが、次に基本的な動作原理について理解しましょう。少しも複雑ではありません。

構造

そのタイプ定義は sync/waitgroup.go ファイルに位置しています。

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

フィールドの説明は以下の通りです。

  • state、WaitGroup の状態を表します。上位 32 ビットは待機されるコルーチンの数を統計し、下位 32 ビットは wg 完了を待機するコルーチンの数を統計します。
  • sema、セマフォで、sync 標準ライブラリではほぼ至る所に存在します。

その核心は Add()Wait() という 2 つのメソッドにあり、基本的な動作原理はセマフォです。Wait() メソッドはセマフォの取得を試み、Add() メソッドはセマフォを解放して、M 個のコルーチンが一組の N 個のコルーチンの実行完了を待機するように実装します。

Add

Add メソッドは待機する必要があるコルーチンの数を増加します。

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

フローは以下の通りです。

  1. まず wg.state にシフト操作を行い、それぞれ上位 32 ビットと下位 32 ビットを取得します。対応する変数は vw です。

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. その後判断を開始します。v は wg カウンターを表し、w は wg 完了を待機するコルーチンの数を表します。

    1. v が 0 より小さい場合、直接 panic します。負数には何の意味もありません。

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. w が 0 でなく、deltav が等しい場合、Wait() メソッドと Add() メソッドが同時に呼び出されたことを示します。これは誤った使用方法です。

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. v が 0 より大きいか、または w が 0 の場合、現在 wg 完了を待機するコルーチンがないことを示し、直接戻れます。

      go
      if v > 0 || w == 0 {
      	return
      }
  3. ここに到達したということは、v が 0 で、w が 0 より大きいことを示します。つまり現在実行中のコルーチンはありませんが、wg 完了を待機するコルーチンがあります。したがってセマフォを解放して、これらのコルーチンをウェイクアップする必要があります。

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

Done() メソッドは実際には Add(-1) で、説明する必要はありません。

Wait

現在他のコルーチンが実行完了を待機する必要がある場合、Wait メソッドの呼び出しは現在のコルーチンをブロッキング状態にします。

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

そのフローは for ループです。

  1. 上位 32 ビットと下位 32 ビットを読み取り、待機される必要があるコルーチンの数と、待機するコルーチンの数を取得します。待機する必要があるコルーチンがない場合は直接戻ります。

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. それ以外の場合は、CAS 操作を通じて待機コルーチン数を 1 つ増加し、その後セマフォの取得を試み、ブロッキング待機キューに入ります。

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. 待機コルーチンがウェイクアップされた後(すべての待機コルーチンの実行が完了し、セマフォが解放されたため)、state をチェックします。0 でない場合、Wait()Add() が再度同時に使用されたことを示します。

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. CAS が正常に更新されなかった場合は、ループを継続します。

まとめ

最後に、WaitGroup を使用する際に、AddWait を同時に呼び出さないように注意してください。

Golang学习网由www.golangdev.cn整理维护