Skip to content

cond

sync.Cond 是 Go 标准库中的条件变量,它是唯一一个需要手动初始化的同步工具。与其他同步原语不同,sync.Cond 需要传入一个互斥锁 (sync.Mutex) 来保护共享资源的访问。它允许协程在某个条件满足之前进入等待状态,并在条件满足时被唤醒。

示例代码

go
package main

import (
    "fmt"
    "sync"
    "time"
)

var i = 0

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup

    // 创建一个条件变量,并传入互斥锁
    cd := sync.NewCond(&mu)

    // 添加 4 个待处理的协程
    wg.Add(4)

    // 创建 3 个协程,每个协程都会等待条件满足
   	for j := range 3 {
		go func() {
			defer wg.Done()

			mu.Lock()
			for i <= 100 {
                 // 条件不满足时,协程会被阻塞在此
				cd.Wait()
			}
			fmt.Printf("%d wake up\n", j)
			mu.Unlock()
		}()
	}

    // 创建一个协程,用来更新条件并唤醒其他协程
    go func() {
        defer wg.Done()
        for {
            mu.Lock()
            i++ // 更新共享变量
            mu.Unlock()
            if i > 100 {
                cd.Broadcast() // 条件满足时唤醒所有等待的协程
                break
            }
            time.Sleep(time.Millisecond * 10) // 模拟工作负载
        }
    }()

    // 等待所有协程完成
    wg.Wait()
}

在上面的示例中,共享变量 i 被多个协程并发访问和修改。通过互斥锁 mu 来确保在并发条件下,访问 i 的操作是安全的。然后,通过 sync.NewCond(&mu) 创建了一个条件变量 cd,它依赖于 mu 锁来保证在等待时对共享资源的访问是同步的。

  • 三个等待的协程:每个协程通过 cd.Wait() 阻塞自己,直到条件满足(i > 100)。这些协程会在共享资源 i 的值更新之前一直处于阻塞状态。
  • 一个更新条件并唤醒其他协程的协程:当条件满足时(即 i > 100),这个协程通过 cd.Broadcast() 唤醒所有等待的协程,让它们继续执行。

结构

go
type Cond struct {
	// L is held while observing or changing the condition
	L Locker

	notify  notifyList
}

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait atomic.Uint32

	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}

其结构并不复杂:

  • L,互斥锁,这里的类型是Locker接口,而不是具体的锁类型
  • notify,等待协程的通知链表

比较重要的是runtime.notifyList结构

  • wait,原子值,记录了有多少个等待协程
  • notify,指向下一个将要被唤醒的协程,从 0 开始递增
  • lock,互斥锁,并不是我们传入的锁,而是runtime内部实现的一个锁
  • headtail,链表指针

它总共就三个方法

  • Wait, 阻塞等待
  • Signal ,唤醒一个等待协程
  • Broadcast,唤醒所有等待协程

它的大部分实现都被隐藏在了runtime库下,这些实现位于runtime/sema.go文件中,以至于在标准库中它的代码非常简短,其基本原理就是一个加了锁的阻塞队列。

Wait

Wait方法会让协程自身陷入阻塞等待,直到被唤醒。

go
func (c *Cond) Wait() {
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

它首先会将自身加入notifyList中,但其实只是将notifyList.wait加一而已,这里的操作就相当于len(notifyList)-1 ,得到了最后一个元素的下标

go
func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

真正的加入操作是在notifyListWait函数中完成

go
func notifyListWait(l *notifyList, t uint32) {
	...
}

在该函数中,它首先会对链表进行上锁,然后快速判断当前协程是否已经被唤醒了,如果已经唤醒了就直接返回,不需要阻塞等待。

go
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
	unlock(&l.lock)
	return
}

如果没有被唤醒,则构造成sudog加入队列,然后通过gopark挂起。

go
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
	l.head = s
} else {
	l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)

被唤醒后释放sudog结构

go
releaseSudog(s)

Signal

Signal会按照队列先入先出的顺序唤醒阻塞的协程

go
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}

它的流程如下

  1. 不加锁直接判断,l.wait是否等于l.notify,相等则表示所有协程都已经唤醒

    go
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. 加锁后,再判断一次是否都已经被唤醒

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    t := l.notify
    if t == l.wait.Load() {
    	unlock(&l.lock)
    	return
    }
  3. l.notify加一

    go
    atomic.Store(&l.notify, t+1)
  4. 循环遍历链表,找到需要被唤醒的协程,最后通过runtime.goready来唤醒协程。

    go
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    	if s.ticket == t {
    		n := s.next
    		if p != nil {
    			p.next = n
    		} else {
    			l.head = n
    		}
    		if n == nil {
    			l.tail = p
    		}
    		unlock(&l.lock)
    		s.next = nil
    		readyWithTime(s, 4)
    		return
    	}
    }
    unlock(&l.lock)

Broadcast

Broadcast会唤醒所有阻塞的协程

go
func (c *Cond) Broadcast() {
    runtime_notifyListNotifyAll(&c.notify)
}

它的流程基本上是一致的

  1. 无锁检查,是否都已经被唤醒了

    go
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. 加锁,清空链表,然后释放锁,后续新到达的协程会被添加到链表头部

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    atomic.Store(&l.notify, l.wait.Load())
    unlock(&l.lock)
  3. 遍历链表,唤醒所有协程

    go
    for s != nil {
    	next := s.next
    	s.next = nil
    	readyWithTime(s, 4)
    	s = next
    }

小结

sync.Cond 最常见的使用场景是需要在多个协程之间同步某些条件,通常应用于生产者-消费者模型、任务调度等场景。在这些场景中,多个协程需要等待某些条件满足才能继续执行,或者需要在条件改变时通知多个协程。它提供了一种灵活且高效的方式来管理协程间的同步。通过与互斥锁配合使用, sync.Cond 可以确保共享资源的访问安全,并且可以在特定条件满足时控制协程的执行顺序。理解其内部实现原理有助于我们更好地掌握并发编程的技巧,尤其是在涉及复杂条件同步时。

Golang学习网由www.golangdev.cn整理维护