cond
sync.Cond は Go 標準ライブラリの条件変数で、手動初期化が必要な唯一の同期ツールです。他の同期プリミティブとは異なり、sync.Cond は共有リソースへのアクセスを保護するためにミューテックスロック (sync.Mutex) を渡す必要があります。これはコルーチンが特定の条件が満たされるまで待機状態に入り、条件が満たされた際にウェイクアップされることを可能にします。
サンプルコード
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// 条件変数を作成し、ミューテックスロックを渡す
cd := sync.NewCond(&mu)
// 4 つの待機コルーチンを追加
wg.Add(4)
// 3 つのコルーチンを作成し、各コルーチンは条件が満たされるまで待機
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// 条件が満たされない場合、コルーチンはここでブロック
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// 条件を更新し、他のコルーチンをウェイクアップするコルーチンを作成
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // 共有変数を更新
mu.Unlock()
if i > 100 {
cd.Broadcast() // 条件が満たされた際にすべての待機コルーチンをウェイクアップ
break
}
time.Sleep(time.Millisecond * 10) // 作業負荷をシミュレート
}
}()
// すべてのコルーチンが完了するのを待機
wg.Wait()
}上記の例では、共有変数 i が複数のコルーチンによって同時にアクセスおよび変更されます。ミューテックスロック mu を使用して、同時実行条件下で i へのアクセス操作が安全であることを保証します。その後、sync.NewCond(&mu) を使用して条件変数 cd を作成します。これは mu ロックに依存して、待機時に共有リソースへのアクセスが同期的であることを保証します。
- 3 つの待機コルーチン:各コルーチンは
cd.Wait()で自身をブロックし、条件が満たされる(i > 100)まで待機します。これらのコルーチンは共有リソースiの値が更新されるまでブロック状態のままです。 - 条件を更新し、他のコルーチンをウェイクアップするコルーチン:条件が満たされた場合(つまり
i > 100)、このコルーチンはcd.Broadcast()を使用してすべての待機コルーチンをウェイクアップし、続行できるようにします。
構造
type Cond struct {
// L is held while observing or changing the condition
L Locker
notify notifyList
}
type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait atomic.Uint32
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}その構造は複雑ではありません。
L、ミューテックスロックで、ここでは具体的なロックタイプではなくLockerインターフェースですnotify、待機コルーチンの通知リンクドリスト
重要なのは runtime.notifyList 構造体です。
wait、アトミック値で、何個の待機コルーチンがあるかを記録しますnotify、次にウェイクアップされるコルーチンを指し、0 から開始して増加しますlock、ミューテックスロックで、渡されたロックではなくruntime内部で実装されたロックですhead、tail、リンクドリストポインタ
合計で 3 つのメソッドがあります。
Wait、ブロック待機Signal、1 つの待機コルーチンをウェイクアップBroadcast、すべての待機コルーチンをウェイクアップ
実装の大部分は runtime ライブラリに隠されており、これらの実装は runtime/sema.go ファイルに位置しています。そのため標準ライブラリではコードが非常に簡潔で、基本原理はロック付きのブロッキングキューです。
Wait
Wait メソッドはコルーチン自身をブロック待機状態にし、ウェイクアップされるまで待機します。
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}まず自身を notifyList に追加しますが、実際には notifyList.wait を 1 つ増加するだけです。ここの操作は len(notifyList)-1 と同等で、最後の要素のインデックスを取得します。
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}実際の追加操作は notifyListWait 関数で完了されます。
func notifyListWait(l *notifyList, t uint32) {
...
}この関数では、まずリンクドリストにロックをかけ、その後現在のコルーチンがすでにウェイクアップされているかどうかを素早く判断します。すでにウェイクアップされている場合は直接戻り、ブロック待機する必要はありません。
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}ウェイクアップされていない場合は、sudog を構成してキューに追加し、その後 gopark を通じてサスペンドします。
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)ウェイクアップ後に sudog 構造体を解放します。
releaseSudog(s)Signal
Signal はキューの先入先出順序に従ってブロックされたコルーチンをウェイクアップします。
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}そのフローは以下の通りです。
ロックなしで
l.waitがl.notifyと等しいかどうかを判断します。等しい場合はすべてのコルーチンがすでにウェイクアップされていることを示します。goif l.wait.Load() == atomic.Load(&l.notify) { return }ロックをかけた後、再度すべてがすでにウェイクアップされているかどうかを判断します。
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }l.notifyを 1 つ増加します。goatomic.Store(&l.notify, t+1)リンクドリストを循環して、ウェイクアップが必要なコルーチンを見つけ、最後に
runtime.goreadyを通じてコルーチンをウェイクアップします。gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast はすべてのブロックされたコルーチンをウェイクアップします。
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}そのフローは基本的に同じです。
ロックなしでチェックし、すべてがすでにウェイクアップされているかどうかを確認します。
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }ロックをかけ、リンクドリストをクリアし、その後ロックを解放します。後続で新たに到達するコルーチンはリンクドリストのヘッダーに追加されます。
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)リンクドリストを循環して、すべてのコルーチンをウェイクアップします。
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
まとめ
sync.Cond の最も一般的な使用シーンは、複数のコルーチン間で特定の条件を同期する必要がある場合で、通常は生産者 - 消費者モデル、タスクスケジューリングなどのシーンに適用されます。これらのシーンでは、複数のコルーチンが特定の条件が満たされるまで待機してから続行するか、または条件が変更された際に複数のコルーチンに通知する必要があります。これはコルーチン間の同期を管理するための柔軟かつ効率的な方法を提供します。ミューテックスロックと連携して使用することで、sync.Cond は共有リソースへのアクセス安全を確保し、特定の条件が満たされた際にコルーチンの実行順序を制御できます。その内部実装原理を理解することは、複雑な条件同期に関わる場合特に、並行プログラミングのテクニックをよりよく掌握するのに役立ちます。
