Skip to content

cond

sync.Cond — это условная переменная стандартной библиотеки Go, и это единственный инструмент синхронизации, требующий ручной инициализации. В отличие от других примитивов синхронизации, sync.Cond требует передачи блокировки мьютекса (sync.Mutex) для защиты доступа к общим ресурсам. Она позволяет goroutine переходить в состояние ожидания до тех пор, пока не будет выполнено определённое условие, и быть пробуждёнными, когда условие выполнено.

Пример кода

go
package main

import (
    "fmt"
    "sync"
    "time"
)

var i = 0

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup

    // Создаём условную переменную, передавая мьютекс
    cd := sync.NewCond(&mu)

    // Добавляем 4 goroutine для обработки
    wg.Add(4)

    // Создаём 3 goroutine, каждая ждёт выполнения условия
   	for j := range 3 {
		go func() {
			defer wg.Done()

			mu.Lock()
			for i <= 100 {
                 // Когда условие не выполнено, goroutine блокируется здесь
				cd.Wait()
			}
			fmt.Printf("%d wake up\n", j)
			mu.Unlock()
		}()
	}

    // Создаём goroutine для обновления условия и пробуждения других goroutine
    go func() {
        defer wg.Done()
        for {
            mu.Lock()
            i++ // Обновляем общую переменную
            mu.Unlock()
            if i > 100 {
                cd.Broadcast() // Пробуждаем все ожидающие goroutine, когда условие выполнено
                break
            }
            time.Sleep(time.Millisecond * 10) // Симулируем нагрузку
        }
    }()

    // Ждём завершения всех goroutine
    wg.Wait()
}

В приведённом примере общая переменная i одновременно访问ается и модифицируется несколькими goroutine. Через блокировку мьютекса mu обеспечивается безопасность доступа к i в условиях конкурентности. Затем через sync.NewCond(&mu) создаётся условная переменная cd, которая полагается на блокировку mu для обеспечения синхронизации доступа к общим ресурсам во время ожидания.

  • Три ожидающие goroutine: Каждая goroutine блокирует себя через cd.Wait() до тех пор, пока условие не будет выполнено (i > 100). Эти goroutine остаются заблокированными до тех пор, пока значение общего ресурса i не будет обновлено.
  • Одна goroutine, обновляющая условие и пробуждающая другие: Когда условие выполнено (т.е. i > 100), эта goroutine пробуждает все ожидающие goroutine через cd.Broadcast(), позволяя им продолжить выполнение.

Структура

go
type Cond struct {
	// L удерживается при наблюдении или изменении условия
	L Locker

	notify  notifyList
}

type notifyList struct {
	// wait — номер билета следующего ожидающего. Атомарно
	// инкрементируется вне блокировки.
	wait atomic.Uint32

	notify uint32

	// Список припаркованных ожидающих.
	lock mutex
	head *sudog
	tail *sudog
}

Её структура не сложная:

  • L, блокировка мьютекса, здесь тип — интерфейс Locker, а не конкретный тип блокировки
  • notify, список уведомлений для ожидающих goroutine

Более важная часть — структура runtime.notifyList:

  • wait, атомарное значение, записывает, сколько ожидающих goroutine
  • notify, указывает на следующую goroutine для пробуждения, начиная с 0 и увеличиваясь
  • lock, блокировка мьютекса, не та блокировка, которую мы передали, а блокировка, реализованная внутренне runtime
  • head, tail, указатели связного списка

У неё есть только три метода:

  • Wait, блокировать и ждать
  • Signal, пробудить одну ожидающую goroutine
  • Broadcast, пробудить все ожидающие goroutine

Большая часть её реализации скрыта под библиотекой runtime. Эти реализации находятся в файле runtime/sema.go, поэтому её код в стандартной библиотеке очень краток. Её базовый принцип — блокирующая очередь с блокировкой.

Wait

Метод Wait заставляет саму goroutine перейти в состояние блокирующего ожидания до пробуждения.

go
func (c *Cond) Wait() {
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

Сначала добавляет себя в notifyList, но на самом деле просто инкрементирует notifyList.wait на единицу. Операция здесь эквивалентна len(notifyList)-1, получая индекс последнего элемента.

go
func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

Фактическая операция добавления завершается в функции notifyListWait:

go
func notifyListWait(l *notifyList, t uint32) {
	...
}

В этой функции сначала блокирует связный список, затем быстро проверяет, была ли текущая goroutine уже пробуждена. Если уже пробуждена, возвращается напрямую без блокировки и ожидания.

go
lockWithRank(&l.lock, lockRankNotifyList)
// Возвращаемся сразу, если этот билет уже был уведомлён.
if less(t, l.notify) {
	unlock(&l.lock)
	return
}

Если не пробуждена, конструирует sudog для присоединения к очереди, затем приостанавливает через gopark.

go
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
	l.head = s
} else {
	l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)

После пробуждения освобождает структуру sudog:

go
releaseSudog(s)

Signal

Signal пробуждает заблокированные goroutine в порядке очереди FIFO.

go
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}

Её поток следующий:

  1. Проверяет без блокировки, равен ли l.wait l.notify. Если равны, это означает, что все goroutine были пробуждены.

    go
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. После блокировки проверяет ещё раз, все ли были пробуждены.

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    t := l.notify
    if t == l.wait.Load() {
    	unlock(&l.lock)
    	return
    }
  3. Инкрементирует l.notify на единицу.

    go
    atomic.Store(&l.notify, t+1)
  4. Обходит связный список, находит goroutine для пробуждения, и наконец пробуждает goroutine через runtime.goready.

    go
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    	if s.ticket == t {
    		n := s.next
    		if p != nil {
    			p.next = n
    		} else {
    			l.head = n
    		}
    		if n == nil {
    			l.tail = p
    		}
    		unlock(&l.lock)
    		s.next = nil
    		readyWithTime(s, 4)
    		return
    	}
    }
    unlock(&l.lock)

Broadcast

Broadcast пробуждает все заблокированные goroutine.

go
func (c *Cond) Broadcast() {
    runtime_notifyListNotifyAll(&c.notify)
}

Её поток в основном тот же:

  1. Проверка без блокировки, все ли были пробуждены.

    go
    // Быстрый путь: если нет новых ожидающих с последнего уведомления,
    // нам не нужно приобретать блокировку.
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. После блокировки очищает связный список, затем освобождает блокировку. Вновь прибывшие goroutine будут добавлены в голову списка.

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    atomic.Store(&l.notify, l.wait.Load())
    unlock(&l.lock)
  3. Обходит связный список, пробуждает все goroutine.

    go
    for s != nil {
    	next := s.next
    	s.next = nil
    	readyWithTime(s, 4)
    	s = next
    }

Итоги

Наиболее распространённые случаи использования sync.Cond — это сценарии, требующие синхронизации определённых условий между несколькими goroutine, обычно применяемые к моделям производитель-потребитель, планированию задач и другим сценариям. В этих сценариях нескольким goroutine нужно ждать выполнения определённых условий перед продолжением выполнения, или нужно уведомлять несколько goroutine при изменении условий. Она предоставляет гибкий и эффективный способ управления синхронизацией между goroutine. Работая с блокировками мьютексов, sync.Cond может обеспечить безопасный доступ к общим ресурсам и может контролировать порядок выполнения goroutine при выполнении определённых условий. Понимание её внутренних принципов реализации помогает нам лучше освоить техники конкурентного программирования, особенно при работе со сложной синхронизацией условий.

Golang by www.golangdev.cn edit