cond
sync.Cond è la variabile di condizione nella libreria standard di Go, ed è l'unico strumento di sincronizzazione che richiede un'inizializzazione manuale. A differenza di altri primitivi di sincronizzazione, sync.Cond richiede di passare un mutex (sync.Mutex) per proteggere l'accesso alle risorse condivise. Consente alle coroutine di entrare in stato di attesa fino a quando una certa condizione non è soddisfatta, e di essere risvegliate quando la condizione è soddisfatta.
Codice di Esempio
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// Crea una variabile di condizione, passando il mutex
cd := sync.NewCond(&mu)
// Aggiunge 4 coroutine da gestire
wg.Add(4)
// Crea 3 coroutine, ognuna attende che la condizione sia soddisfatta
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// Quando la condizione non è soddisfatta, la coroutine si blocca qui
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// Crea una coroutine per aggiornare la condizione e risvegliare le altre coroutine
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // Aggiorna la variabile condivisa
mu.Unlock()
if i > 100 {
cd.Broadcast() // Risveglia tutte le coroutine in attesa quando la condizione è soddisfatta
break
}
time.Sleep(time.Millisecond * 10) // Simula il carico di lavoro
}
}()
// Attende che tutte le coroutine completino
wg.Wait()
}Nell'esempio sopra, la variabile condivisa i viene accessibile e modificata concorrentemente da più coroutine. Attraverso il mutex mu si garantisce che, in condizioni di concorrenza, l'accesso a i sia sicuro. Poi, attraverso sync.NewCond(&mu) viene creata una variabile di condizione cd, che dipende dal lock mu per garantire che l'accesso alle risorse condivise sia sincronizzato durante l'attesa.
- Tre coroutine in attesa: Ogni coroutine si blocca attraverso
cd.Wait(), fino a quando la condizione è soddisfatta (i > 100). Queste coroutine rimangono in stato di blocco fino a quando il valore della risorsa condivisainon viene aggiornato. - Una coroutine che aggiorna la condizione e risveglia le altre: Quando la condizione è soddisfatta (cioè
i > 100), questa coroutine risveglia tutte le coroutine in attesa attraversocd.Broadcast(), permettendo loro di continuare l'esecuzione.
Struttura
type Cond struct {
// L è mantenuto durante l'osservazione o la modifica della condizione
L Locker
notify notifyList
}
type notifyList struct {
// wait è il numero di ticket del prossimo waiter. Viene incrementato
// atomicamente al di fuori del lock.
wait atomic.Uint32
notify uint32
// Lista di waiter parcheggiati.
lock mutex
head *sudog
tail *sudog
}La sua struttura non è complessa:
L, mutex, il cui tipo qui è l'interfacciaLocker, non un tipo di lock specificonotify, lista di notifica delle coroutine in attesa
La struttura più importante è runtime.notifyList
wait, valore atomico, registra quante coroutine sono in attesanotify, punta alla prossima coroutine che sarà risvegliata, inizia da 0 e incrementalock, mutex, non è il lock che passiamo, ma un lock implementato internamente inruntimehead,tail, puntatori alla lista
Ha solo tre metodi:
Wait, attesa bloccanteSignal, risveglia una coroutine in attesaBroadcast, risveglia tutte le coroutine in attesa
La maggior parte della sua implementazione è nascosta sotto la libreria runtime, queste implementazioni si trovano nel file runtime/sema.go, tanto che il suo codice nella libreria standard è molto breve, il suo principio di base è essenzialmente una coda bloccante con lock.
Wait
Il metodo Wait fa sì che la coroutine stessa entri in stato di attesa bloccante, fino a quando non viene risvegliata.
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}Per prima cosa aggiunge se stessa a notifyList, ma in realtà incrementa solo notifyList.wait di uno, qui l'operazione è equivalente a len(notifyList)-1, ottenendo l'indice dell'ultimo elemento.
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}La vera operazione di aggiunta viene completata nella funzione notifyListWait.
func notifyListWait(l *notifyList, t uint32) {
...
}In questa funzione, per prima cosa blocca la lista, poi giudica rapidamente se la coroutine corrente è già stata risvegliata, se è già risvegliata ritorna direttamente, non c'è bisogno di attesa bloccante.
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}Se non è stata risvegliata, viene costruita come sudog e aggiunta alla coda, poi sospesa attraverso gopark.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)Dopo essere stata risvegliata, rilascia la struttura sudog.
releaseSudog(s)Signal
Signal risveglia le coroutine bloccate secondo l'ordine FIFO della coda.
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}Il suo flusso è il seguente:
Giudica senza lock se
l.waitè uguale al.notify, se sono uguali indica che tutte le coroutine sono state risvegliate.goif l.wait.Load() == atomic.Load(&l.notify) { return }Dopo aver acquisito il lock, giudica di nuovo se tutte sono state risvegliate.
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }Incrementa
l.notifydi uno.goatomic.Store(&l.notify, t+1)Attraversa la lista, trova la coroutine che deve essere risvegliata, infine risveglia la coroutine attraverso
runtime.goready.gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast risveglia tutte le coroutine bloccate.
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}Il suo flusso è fondamentalmente lo stesso:
Controllo senza lock, se tutte sono state risvegliate.
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }Acquisisce il lock, svuota la lista, poi rilascia il lock, le nuove coroutine che arrivano successivamente verranno aggiunte alla testa della lista.
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)Attraversa la lista, risveglia tutte le coroutine.
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
Riepilogo
sync.Cond Lo scenario d'uso più comune è quando è necessario sincronizzare alcune condizioni tra più coroutine, tipicamente applicato in modelli produttore-consumatore, schedulazione di task e altri scenari. In questi scenari, più coroutine devono attendere che alcune condizioni siano soddisfatte prima di continuare l'esecuzione, o devono notificare più coroutine quando le condizioni cambiano. Fornisce un modo flessibile ed efficiente per gestire la sincronizzazione tra coroutine. Utilizzandolo insieme ai mutex, sync.Cond può garantire l'accesso sicuro alle risorse condivise e può controllare l'ordine di esecuzione delle coroutine quando condizioni specifiche sono soddisfatte. Comprendere i principi della sua implementazione interna aiuta a padroneggiare meglio le tecniche di programmazione concorrente, specialmente quando si tratta di sincronizzazione complessa delle condizioni.
