cond
sync.Cond — это условная переменная стандартной библиотеки Go, и это единственный инструмент синхронизации, требующий ручной инициализации. В отличие от других примитивов синхронизации, sync.Cond требует передачи блокировки мьютекса (sync.Mutex) для защиты доступа к общим ресурсам. Она позволяет goroutine переходить в состояние ожидания до тех пор, пока не будет выполнено определённое условие, и быть пробуждёнными, когда условие выполнено.
Пример кода
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// Создаём условную переменную, передавая мьютекс
cd := sync.NewCond(&mu)
// Добавляем 4 goroutine для обработки
wg.Add(4)
// Создаём 3 goroutine, каждая ждёт выполнения условия
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// Когда условие не выполнено, goroutine блокируется здесь
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// Создаём goroutine для обновления условия и пробуждения других goroutine
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // Обновляем общую переменную
mu.Unlock()
if i > 100 {
cd.Broadcast() // Пробуждаем все ожидающие goroutine, когда условие выполнено
break
}
time.Sleep(time.Millisecond * 10) // Симулируем нагрузку
}
}()
// Ждём завершения всех goroutine
wg.Wait()
}В приведённом примере общая переменная i одновременно访问ается и модифицируется несколькими goroutine. Через блокировку мьютекса mu обеспечивается безопасность доступа к i в условиях конкурентности. Затем через sync.NewCond(&mu) создаётся условная переменная cd, которая полагается на блокировку mu для обеспечения синхронизации доступа к общим ресурсам во время ожидания.
- Три ожидающие goroutine: Каждая goroutine блокирует себя через
cd.Wait()до тех пор, пока условие не будет выполнено (i > 100). Эти goroutine остаются заблокированными до тех пор, пока значение общего ресурсаiне будет обновлено. - Одна goroutine, обновляющая условие и пробуждающая другие: Когда условие выполнено (т.е.
i > 100), эта goroutine пробуждает все ожидающие goroutine черезcd.Broadcast(), позволяя им продолжить выполнение.
Структура
type Cond struct {
// L удерживается при наблюдении или изменении условия
L Locker
notify notifyList
}
type notifyList struct {
// wait — номер билета следующего ожидающего. Атомарно
// инкрементируется вне блокировки.
wait atomic.Uint32
notify uint32
// Список припаркованных ожидающих.
lock mutex
head *sudog
tail *sudog
}Её структура не сложная:
L, блокировка мьютекса, здесь тип — интерфейсLocker, а не конкретный тип блокировкиnotify, список уведомлений для ожидающих goroutine
Более важная часть — структура runtime.notifyList:
wait, атомарное значение, записывает, сколько ожидающих goroutinenotify, указывает на следующую goroutine для пробуждения, начиная с 0 и увеличиваясьlock, блокировка мьютекса, не та блокировка, которую мы передали, а блокировка, реализованная внутреннеruntimehead,tail, указатели связного списка
У неё есть только три метода:
Wait, блокировать и ждатьSignal, пробудить одну ожидающую goroutineBroadcast, пробудить все ожидающие goroutine
Большая часть её реализации скрыта под библиотекой runtime. Эти реализации находятся в файле runtime/sema.go, поэтому её код в стандартной библиотеке очень краток. Её базовый принцип — блокирующая очередь с блокировкой.
Wait
Метод Wait заставляет саму goroutine перейти в состояние блокирующего ожидания до пробуждения.
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}Сначала добавляет себя в notifyList, но на самом деле просто инкрементирует notifyList.wait на единицу. Операция здесь эквивалентна len(notifyList)-1, получая индекс последнего элемента.
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}Фактическая операция добавления завершается в функции notifyListWait:
func notifyListWait(l *notifyList, t uint32) {
...
}В этой функции сначала блокирует связный список, затем быстро проверяет, была ли текущая goroutine уже пробуждена. Если уже пробуждена, возвращается напрямую без блокировки и ожидания.
lockWithRank(&l.lock, lockRankNotifyList)
// Возвращаемся сразу, если этот билет уже был уведомлён.
if less(t, l.notify) {
unlock(&l.lock)
return
}Если не пробуждена, конструирует sudog для присоединения к очереди, затем приостанавливает через gopark.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)После пробуждения освобождает структуру sudog:
releaseSudog(s)Signal
Signal пробуждает заблокированные goroutine в порядке очереди FIFO.
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}Её поток следующий:
Проверяет без блокировки, равен ли
l.waitl.notify. Если равны, это означает, что все goroutine были пробуждены.goif l.wait.Load() == atomic.Load(&l.notify) { return }После блокировки проверяет ещё раз, все ли были пробуждены.
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }Инкрементирует
l.notifyна единицу.goatomic.Store(&l.notify, t+1)Обходит связный список, находит goroutine для пробуждения, и наконец пробуждает goroutine через
runtime.goready.gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast пробуждает все заблокированные goroutine.
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}Её поток в основном тот же:
Проверка без блокировки, все ли были пробуждены.
go// Быстрый путь: если нет новых ожидающих с последнего уведомления, // нам не нужно приобретать блокировку. if l.wait.Load() == atomic.Load(&l.notify) { return }После блокировки очищает связный список, затем освобождает блокировку. Вновь прибывшие goroutine будут добавлены в голову списка.
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)Обходит связный список, пробуждает все goroutine.
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
Итоги
Наиболее распространённые случаи использования sync.Cond — это сценарии, требующие синхронизации определённых условий между несколькими goroutine, обычно применяемые к моделям производитель-потребитель, планированию задач и другим сценариям. В этих сценариях нескольким goroutine нужно ждать выполнения определённых условий перед продолжением выполнения, или нужно уведомлять несколько goroutine при изменении условий. Она предоставляет гибкий и эффективный способ управления синхронизацией между goroutine. Работая с блокировками мьютексов, sync.Cond может обеспечить безопасный доступ к общим ресурсам и может контролировать порядок выполнения goroutine при выполнении определённых условий. Понимание её внутренних принципов реализации помогает нам лучше освоить техники конкурентного программирования, особенно при работе со сложной синхронизацией условий.
