Skip to content

cond

sync.Cond 是 Go 標准庫中的條件變量,它是唯一一個需要手動初始化的同步工具。與其他同步原語不同,sync.Cond 需要傳入一個互斥鎖 (sync.Mutex) 來保護共享資源的訪問。它允許協程在某個條件滿足之前進入等待狀態,並在條件滿足時被喚醒。

示例代碼

go
package main

import (
    "fmt"
    "sync"
    "time"
)

var i = 0

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup

    // 創建一個條件變量,並傳入互斥鎖
    cd := sync.NewCond(&mu)

    // 添加 4 個待處理的協程
    wg.Add(4)

    // 創建 3 個協程,每個協程都會等待條件滿足
   	for j := range 3 {
		go func() {
			defer wg.Done()

			mu.Lock()
			for i <= 100 {
                 // 條件不滿足時,協程會被阻塞在此
				cd.Wait()
			}
			fmt.Printf("%d wake up\n", j)
			mu.Unlock()
		}()
	}

    // 創建一個協程,用來更新條件並喚醒其他協程
    go func() {
        defer wg.Done()
        for {
            mu.Lock()
            i++ // 更新共享變量
            mu.Unlock()
            if i > 100 {
                cd.Broadcast() // 條件滿足時喚醒所有等待的協程
                break
            }
            time.Sleep(time.Millisecond * 10) // 模擬工作負載
        }
    }()

    // 等待所有協程完成
    wg.Wait()
}

在上面的示例中,共享變量 i 被多個協程並發訪問和修改。通過互斥鎖 mu 來確保在並發條件下,訪問 i 的操作是安全的。然後,通過 sync.NewCond(&mu) 創建了一個條件變量 cd,它依賴於 mu 鎖來保證在等待時對共享資源的訪問是同步的。

  • 三個等待的協程:每個協程通過 cd.Wait() 阻塞自己,直到條件滿足(i > 100)。這些協程會在共享資源 i 的值更新之前一直處於阻塞狀態。
  • 一個更新條件並喚醒其他協程的協程:當條件滿足時(即 i > 100),這個協程通過 cd.Broadcast() 喚醒所有等待的協程,讓它們繼續執行。

結構

go
type Cond struct {
	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
}

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait atomic.Uint32

	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

其結構並不復雜:

  • L,互斥鎖,這裡的類型是Locker接口,而不是具體的鎖類型
  • notify,等待協程的通知鏈表

比較重要的是runtime.notifyList結構

  • wait,原子值,記錄了有多少個等待協程
  • notify,指向下一個將要被喚醒的協程,從 0 開始遞增
  • lock,互斥鎖,並不是我們傳入的鎖,而是runtime內部實現的一個鎖
  • headtail,鏈表指針

它總共就三個方法

  • Wait, 阻塞等待
  • Signal ,喚醒一個等待協程
  • Broadcast,喚醒所有等待協程

它的大部分實現都被隱藏在了runtime庫下,這些實現位於runtime/sema.go文件中,以至於在標准庫中它的代碼非常簡短,其基本原理就是一個加了鎖的阻塞隊列。

Wait

Wait方法會讓協程自身陷入阻塞等待,直到被喚醒。

go
func (c *Cond) Wait() {
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

它首先會將自身加入notifyList中,但其實只是將notifyList.wait加一而已,這裡的操作就相當於len(notifyList)-1 ,得到了最後一個元素的下標

go
func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

真正的加入操作是在notifyListWait函數中完成

go
func notifyListWait(l *notifyList, t uint32) {
	...
}

在該函數中,它首先會對鏈表進行上鎖,然後快速判斷當前協程是否已經被喚醒了,如果已經喚醒了就直接返回,不需要阻塞等待。

go
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
	unlock(&l.lock)
	return
}

如果沒有被喚醒,則構造成sudog加入隊列,然後通過gopark掛起。

go
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
	l.head = s
} else {
	l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)

被喚醒後釋放sudog結構

go
releaseSudog(s)

Signal

Signal會按照隊列先入先出的順序喚醒阻塞的協程

go
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}

它的流程如下

  1. 不加鎖直接判斷,l.wait是否等於l.notify,相等則表示所有協程都已經喚醒

    go
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. 加鎖後,再判斷一次是否都已經被喚醒

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    t := l.notify
    if t == l.wait.Load() {
    	unlock(&l.lock)
    	return
    }
  3. l.notify加一

    go
    atomic.Store(&l.notify, t+1)
  4. 循環遍歷鏈表,找到需要被喚醒的協程,最後通過runtime.goready來喚醒協程。

    go
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    	if s.ticket == t {
    		n := s.next
    		if p != nil {
    			p.next = n
    		} else {
    			l.head = n
    		}
    		if n == nil {
    			l.tail = p
    		}
    		unlock(&l.lock)
    		s.next = nil
    		readyWithTime(s, 4)
    		return
    	}
    }
    unlock(&l.lock)

Broadcast

Broadcast會喚醒所有阻塞的協程

go
func (c *Cond) Broadcast() {
    runtime_notifyListNotifyAll(&c.notify)
}

它的流程基本上是一致的

  1. 無鎖檢查,是否都已經被喚醒了

    go
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. 加鎖,清空鏈表,然後釋放鎖,後續新到達的協程會被添加到鏈表頭部

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    atomic.Store(&l.notify, l.wait.Load())
    unlock(&l.lock)
  3. 遍歷鏈表,喚醒所有協程

    go
    for s != nil {
    	next := s.next
    	s.next = nil
    	readyWithTime(s, 4)
    	s = next
    }

小結

sync.Cond 最常見的使用場景是需要在多個協程之間同步某些條件,通常應用於生產者-消費者模型、任務調度等場景。在這些場景中,多個協程需要等待某些條件滿足才能繼續執行,或者需要在條件改變時通知多個協程。它提供了一種靈活且高效的方式來管理協程間的同步。通過與互斥鎖配合使用, sync.Cond 可以確保共享資源的訪問安全,並且可以在特定條件滿足時控制協程的執行順序。理解其內部實現原理有助於我們更好地掌握並發編程的技巧,尤其是在涉及復雜條件同步時。

Golang學習網由www.golangdev.cn整理維護