cond
sync.Cond 是 Go 標准庫中的條件變量,它是唯一一個需要手動初始化的同步工具。與其他同步原語不同,sync.Cond 需要傳入一個互斥鎖 (sync.Mutex) 來保護共享資源的訪問。它允許協程在某個條件滿足之前進入等待狀態,並在條件滿足時被喚醒。
示例代碼
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// 創建一個條件變量,並傳入互斥鎖
cd := sync.NewCond(&mu)
// 添加 4 個待處理的協程
wg.Add(4)
// 創建 3 個協程,每個協程都會等待條件滿足
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// 條件不滿足時,協程會被阻塞在此
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// 創建一個協程,用來更新條件並喚醒其他協程
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // 更新共享變量
mu.Unlock()
if i > 100 {
cd.Broadcast() // 條件滿足時喚醒所有等待的協程
break
}
time.Sleep(time.Millisecond * 10) // 模擬工作負載
}
}()
// 等待所有協程完成
wg.Wait()
}在上面的示例中,共享變量 i 被多個協程並發訪問和修改。通過互斥鎖 mu 來確保在並發條件下,訪問 i 的操作是安全的。然後,通過 sync.NewCond(&mu) 創建了一個條件變量 cd,它依賴於 mu 鎖來保證在等待時對共享資源的訪問是同步的。
- 三個等待的協程:每個協程通過
cd.Wait()阻塞自己,直到條件滿足(i > 100)。這些協程會在共享資源i的值更新之前一直處於阻塞狀態。 - 一個更新條件並喚醒其他協程的協程:當條件滿足時(即
i > 100),這個協程通過cd.Broadcast()喚醒所有等待的協程,讓它們繼續執行。
結構
type Cond struct {
// L is held while observing or changing the condition
L Locker
notify notifyList
}
type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait atomic.Uint32
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}其結構並不復雜:
L,互斥鎖,這裡的類型是Locker接口,而不是具體的鎖類型notify,等待協程的通知鏈表
比較重要的是runtime.notifyList結構
wait,原子值,記錄了有多少個等待協程notify,指向下一個將要被喚醒的協程,從 0 開始遞增lock,互斥鎖,並不是我們傳入的鎖,而是runtime內部實現的一個鎖head,tail,鏈表指針
它總共就三個方法
Wait, 阻塞等待Signal,喚醒一個等待協程Broadcast,喚醒所有等待協程
它的大部分實現都被隱藏在了runtime庫下,這些實現位於runtime/sema.go文件中,以至於在標准庫中它的代碼非常簡短,其基本原理就是一個加了鎖的阻塞隊列。
Wait
Wait方法會讓協程自身陷入阻塞等待,直到被喚醒。
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}它首先會將自身加入notifyList中,但其實只是將notifyList.wait加一而已,這裡的操作就相當於len(notifyList)-1 ,得到了最後一個元素的下標
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}真正的加入操作是在notifyListWait函數中完成
func notifyListWait(l *notifyList, t uint32) {
...
}在該函數中,它首先會對鏈表進行上鎖,然後快速判斷當前協程是否已經被喚醒了,如果已經喚醒了就直接返回,不需要阻塞等待。
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}如果沒有被喚醒,則構造成sudog加入隊列,然後通過gopark掛起。
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)被喚醒後釋放sudog結構
releaseSudog(s)Signal
Signal會按照隊列先入先出的順序喚醒阻塞的協程
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}它的流程如下
不加鎖直接判斷,
l.wait是否等於l.notify,相等則表示所有協程都已經喚醒goif l.wait.Load() == atomic.Load(&l.notify) { return }加鎖後,再判斷一次是否都已經被喚醒
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }l.notify加一goatomic.Store(&l.notify, t+1)循環遍歷鏈表,找到需要被喚醒的協程,最後通過
runtime.goready來喚醒協程。gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast會喚醒所有阻塞的協程
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}它的流程基本上是一致的
無鎖檢查,是否都已經被喚醒了
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }加鎖,清空鏈表,然後釋放鎖,後續新到達的協程會被添加到鏈表頭部
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)遍歷鏈表,喚醒所有協程
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
小結
sync.Cond 最常見的使用場景是需要在多個協程之間同步某些條件,通常應用於生產者-消費者模型、任務調度等場景。在這些場景中,多個協程需要等待某些條件滿足才能繼續執行,或者需要在條件改變時通知多個協程。它提供了一種靈活且高效的方式來管理協程間的同步。通過與互斥鎖配合使用, sync.Cond 可以確保共享資源的訪問安全,並且可以在特定條件滿足時控制協程的執行順序。理解其內部實現原理有助於我們更好地掌握並發編程的技巧,尤其是在涉及復雜條件同步時。
