cond
sync.Cond es la variable de condición en la biblioteca estándar de Go. Es la única herramienta de sincronización que requiere inicialización manual. A diferencia de otras primitivas de sincronización, sync.Cond requiere pasar un mutex (sync.Mutex) para proteger el acceso a recursos compartidos. Permite que las goroutines entren en estado de espera hasta que se cumpla una condición, y sean despertadas cuando la condición se cumpla.
Código de Ejemplo
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// Crear una variable de condición, pasando el mutex
cd := sync.NewCond(&mu)
// Agregar 4 goroutines para procesar
wg.Add(4)
// Crear 3 goroutines, cada una esperará a que se cumpla la condición
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// Cuando la condición no se cumple, la goroutine se bloquea aquí
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// Crear una goroutine para actualizar la condición y despertar otras goroutines
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // Actualizar variable compartida
mu.Unlock()
if i > 100 {
cd.Broadcast() // Despertar todas las goroutines en espera cuando se cumple la condición
break
}
time.Sleep(time.Millisecond * 10) // Simular carga de trabajo
}
}()
// Esperar a que todas las goroutines completen
wg.Wait()
}En el ejemplo anterior, la variable compartida i es accedida y modificada concurrentemente por múltiples goroutines. El mutex mu asegura que el acceso a i sea seguro bajo condiciones de concurrencia. Luego, a través de sync.NewCond(&mu) se crea una variable de condición cd, que depende del mutex mu para garantizar que el acceso a los recursos compartidos sea sincronizado durante la espera.
- Tres goroutines en espera: Cada goroutine se bloquea a través de
cd.Wait()hasta que se cumple la condición (i > 100). Estas goroutines permanecen en estado bloqueado hasta que el valor del recurso compartidoise actualiza. - Una goroutine que actualiza la condición y despierta otras goroutines: Cuando se cumple la condición (es decir,
i > 100), esta goroutine despierta todas las goroutines en espera a través decd.Broadcast(), permitiéndoles continuar la ejecución.
Estructura
type Cond struct {
// L se mantiene mientras se observa o cambia la condición
L Locker
notify notifyList
}
type notifyList struct {
// wait es el número de ticket del próximo waiter. Se incrementa
// atómicamente fuera del lock.
wait atomic.Uint32
notify uint32
// Lista de waiters estacionados.
lock mutex
head *sudog
tail *sudog
}Su estructura no es compleja:
L, mutex, aquí el tipo es la interfazLocker, no un tipo de mutex concretonotify, lista de notificación de goroutines en espera
Lo más importante es la estructura runtime.notifyList:
wait, valor atómico, registra cuántas goroutines están en esperanotify, apunta a la próxima goroutine que será despertada, comienza desde 0 e incrementalock, mutex, no es el mutex que pasamos, sino un mutex implementado internamente enruntimehead,tail, punteros de lista enlazada
Solo tiene tres métodos:
Wait, bloqueo en esperaSignal, despertar una goroutine en esperaBroadcast, despertar todas las goroutines en espera
La mayor parte de su implementación está oculta bajo la biblioteca runtime. Estas implementaciones están ubicadas en el archivo runtime/sema.go, por lo que su código en la biblioteca estándar es muy breve. Su principio básico es una cola de bloqueo con mutex.
Wait
El método Wait hace que la goroutine entre en estado de espera hasta que sea despertada.
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}Primero se agrega a sí misma a notifyList, pero en realidad solo incrementa notifyList.wait en uno. La operación aquí es equivalente a len(notifyList)-1, obteniendo el índice del último elemento.
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}La operación real de agregar se completa en la función notifyListWait:
func notifyListWait(l *notifyList, t uint32) {
...
}En esta función, primero bloquea la lista enlazada, luego verifica rápidamente si la goroutine actual ya ha sido despertada. Si ya fue despertada, retorna directamente sin necesidad de bloquear en espera.
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}Si no ha sido despertada, se construye como sudog y se agrega a la cola, luego se suspende a través de gopark.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)Después de ser despertada, libera la estructura sudog:
releaseSudog(s)Signal
Signal despierta las goroutines bloqueadas en orden FIFO (primero en entrar, primero en salir).
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}Su flujo es el siguiente:
Verifica sin lock si
l.waites igual al.notify. Si son iguales, significa que todas las goroutines han sido despertadas.goif l.wait.Load() == atomic.Load(&l.notify) { return }Después de adquirir el lock, verifica nuevamente si todas han sido despertadas.
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }Incrementa
l.notifyen uno.goatomic.Store(&l.notify, t+1)Itera sobre la lista enlazada, encuentra la goroutine que necesita ser despertada, y finalmente despierta la goroutine a través de
runtime.goready.gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast despierta todas las goroutines bloqueadas.
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}Su flujo es básicamente el mismo:
Verificación sin lock, si todas han sido despertadas.
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }Adquiere el lock, vacía la lista enlazada, luego libera el lock. Las nuevas goroutines que lleguen serán agregadas a la cabeza de la lista.
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)Itera sobre la lista, despierta todas las goroutines.
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
Resumen
Los escenarios de uso más comunes de sync.Cond son sincronizar ciertas condiciones entre múltiples goroutines, típicamente aplicados en modelos productor-consumidor, planificación de tareas, etc. En estos escenarios, múltiples goroutines necesitan esperar a que se cumplan ciertas condiciones para continuar ejecutando, o necesitan notificar a múltiples goroutines cuando las condiciones cambian. Proporciona una manera flexible y eficiente de gestionar la sincronización entre goroutines. Al usarse junto con mutex, sync.Cond puede garantizar el acceso seguro a recursos compartidos, y puede controlar el orden de ejecución de las goroutines cuando se cumplen condiciones específicas. Comprender sus principios de implementación interna ayuda a dominar mejor las técnicas de programación concurrente, especialmente cuando se trata de sincronización de condiciones complejas.
