cond
sync.Cond is Go's standard library condition variable, and it is the only synchronization primitive in the sync package that requires explicit initialization: it is created with sync.NewCond, which takes a `sync.Locker` (usually a `*sync.Mutex`) to protect access to the shared state. It lets goroutines block until a condition holds, and be woken once another goroutine reports that the condition has changed.
Example Code
```go
package main

import (
	"fmt"
	"sync"
	"time"
)

var i = 0

func main() {
	var mu sync.Mutex
	var wg sync.WaitGroup
	// Create a condition variable, passing in the mutex
	cd := sync.NewCond(&mu)
	// Track all 4 goroutines
	wg.Add(4)
	// Create 3 goroutines, each waiting for the condition to be met
	for j := range 3 {
		go func() {
			defer wg.Done()
			mu.Lock()
			for i <= 100 {
				// While the condition does not hold, block here
				cd.Wait()
			}
			fmt.Printf("%d wake up\n", j)
			mu.Unlock()
		}()
	}
	// Create a goroutine that updates the condition and wakes the others
	go func() {
		defer wg.Done()
		for {
			mu.Lock()
			i++             // Update the shared variable
			done := i > 100 // Check the condition while still holding the lock
			mu.Unlock()
			if done {
				cd.Broadcast() // Wake all waiting goroutines once the condition holds
				break
			}
			time.Sleep(time.Millisecond * 10) // Simulate workload
		}
	}()
	// Wait for all goroutines to complete
	wg.Wait()
}
```

In the example above, the shared variable `i` is accessed and modified by multiple goroutines concurrently. The mutex `mu` makes access to `i` safe under concurrency; `sync.NewCond(&mu)` then creates a condition variable `cd` that relies on `mu` to keep access to the shared state synchronized around the wait.
- Three waiting goroutines: each blocks itself via `cd.Wait()` until the condition is met (`i > 100`). These goroutines remain blocked until the value of the shared variable `i` is updated.
- One goroutine updates the condition and wakes the others: once the condition holds (i.e. `i > 100`), it wakes all waiting goroutines via `cd.Broadcast()`, allowing them to continue execution.
Structure
```go
type Cond struct {
	// L is held while observing or changing the condition
	L Locker

	notify notifyList
}

type notifyList struct {
	// wait is the ticket number of the next waiter. It is atomically
	// incremented outside the lock.
	wait atomic.Uint32

	notify uint32

	// List of parked waiters.
	lock mutex
	head *sudog
	tail *sudog
}
```

Its structure is not complex:

- `L`, the mutex; note its type is the `Locker` interface, not a concrete lock type
- `notify`, the notification list of waiting goroutines
The more important part is the `runtime.notifyList` structure:

- `wait`, an atomic counter that hands out tickets; it records how many goroutines have ever waited
- `notify`, the ticket of the next goroutine to be woken, starting from 0 and incrementing
- `lock`, a mutex; not the lock we passed in, but one implemented internally by the `runtime`
- `head`, `tail`, pointers of the linked list of parked waiters
It has only three methods:

- `Wait`, block and wait
- `Signal`, wake one waiting goroutine
- `Broadcast`, wake all waiting goroutines
Most of its implementation is hidden in the runtime, in the runtime/sema.go file, so its code in the standard library is very brief. Its basic principle is a locked blocking queue.
Wait
The Wait method causes the goroutine itself to fall into blocking wait until awakened.
```go
func (c *Cond) Wait() {
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	runtime_notifyListWait(&c.notify, t)
	c.L.Lock()
}
```

It first adds itself to the notifyList, but this actually just increments `notifyList.wait` by one. The returned value is equivalent to `len(notifyList) - 1`, i.e. the ticket (index) of the newly added waiter.
```go
func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}
```

The actual enqueue operation is completed in the `notifyListWait` function:
```go
func notifyListWait(l *notifyList, t uint32) {
	...
}
```

In this function, it first locks the list, then quickly checks whether the current goroutine has already been notified. If so, it returns immediately without blocking.
```go
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
	unlock(&l.lock)
	return
}
```

If not yet notified, it constructs a `sudog`, appends it to the queue, then parks itself through `gopark`:
```go
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
	l.head = s
} else {
	l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)
```

After being woken, it releases the `sudog` structure:

```go
releaseSudog(s)
```

Signal
Signal wakes one blocked goroutine per call, in FIFO (ticket) order.
```go
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}
```

Its flow is as follows:
1. Without taking the lock, check whether `l.wait` equals `l.notify`. If they are equal, every waiter has already been notified and there is nothing to do.

```go
if l.wait.Load() == atomic.Load(&l.notify) {
	return
}
```

2. After locking, check again whether all waiters have been notified.

```go
lockWithRank(&l.lock, lockRankNotifyList)
t := l.notify
if t == l.wait.Load() {
	unlock(&l.lock)
	return
}
```

3. Increment `l.notify` by one.

```go
atomic.Store(&l.notify, t+1)
```

4. Traverse the linked list, find the goroutine holding ticket `t`, unlink it, and finally wake it through `runtime.goready`.

```go
for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
	if s.ticket == t {
		n := s.next
		if p != nil {
			p.next = n
		} else {
			l.head = n
		}
		if n == nil {
			l.tail = p
		}
		unlock(&l.lock)
		s.next = nil
		readyWithTime(s, 4)
		return
	}
}
unlock(&l.lock)
```
Broadcast
Broadcast wakes up all blocked goroutines.
```go
func (c *Cond) Broadcast() {
	runtime_notifyListNotifyAll(&c.notify)
}
```

Its flow is basically the same:
1. Lock-free check: if no new waiters have arrived since the last notification, return immediately.

```go
// Fast-path: if there are no new waiters since the last notification
// we don't need to acquire the lock.
if l.wait.Load() == atomic.Load(&l.notify) {
	return
}
```

2. After locking, detach the entire list, set `notify` to `wait`, then release the lock. Goroutines that arrive later will start a fresh list, the first newcomer becoming its head.

```go
lockWithRank(&l.lock, lockRankNotifyList)
s := l.head
l.head = nil
l.tail = nil
atomic.Store(&l.notify, l.wait.Load())
unlock(&l.lock)
```

3. Traverse the detached list and wake every goroutine.

```go
for s != nil {
	next := s.next
	s.next = nil
	readyWithTime(s, 4)
	s = next
}
```
Summary
sync.Cond's most common use cases are scenarios where multiple goroutines must synchronize on some condition, typically producer-consumer models and task scheduling. In these scenarios, goroutines need to wait until a condition holds before continuing, or need to notify several waiting goroutines when the condition changes. Paired with a mutex, sync.Cond guarantees safe access to shared state and controls when goroutines resume once a specific condition is met. Understanding its internal implementation helps us master concurrent programming techniques, especially when dealing with complex condition synchronization.
