cond
sync.Cond ist eine Bedingungsvariable in der Go-Standardbibliothek. Sie ist das einzige Synchronisationswerkzeug, das manuell initialisiert werden muss. Im Gegensatz zu anderen Synchronisationsprimitiven benötigt sync.Cond die Übergabe eines Mutex (sync.Mutex), um den Zugriff auf gemeinsam genutzte Ressourcen zu schützen. Sie ermöglicht es Goroutinen, in einen Wartezustand zu versetzten, bis eine Bedingung erfüllt ist, und dann aufgeweckt zu werden.
Beispielcode
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// Erstellt eine Bedingungsvariable und übergibt den Mutex
cd := sync.NewCond(&mu)
// Fügt 4 wartende Goroutinen hinzu
wg.Add(4)
// Erstellt 3 Goroutinen, die jeweils auf die Bedingung warten
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// Wenn die Bedingung nicht erfüllt ist, wird die Goroutine hier blockiert
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// Erstellt eine Goroutine, die die Bedingung aktualisiert und andere Goroutinen aufweckt
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // Aktualisiert die gemeinsame Variable
mu.Unlock()
if i > 100 {
cd.Broadcast() // Weckt alle wartenden Goroutinen, wenn die Bedingung erfüllt ist
break
}
time.Sleep(time.Millisecond * 10) // Simuliert Arbeitslast
}
}()
// Wartet auf den Abschluss aller Goroutinen
wg.Wait()
}Im obigen Beispiel wird die gemeinsame Variable i von mehreren Goroutinen gleichzeitig zugegriffen und geändert. Durch den Mutex mu wird sichergestellt, dass der Zugriff auf i unter Nebenläufigkeitsbedingungen sicher ist. Dann wird durch sync.NewCond(&mu) eine Bedingungsvariable cd erstellt, die vom Mutex mu abhängt, um den synchronisierten Zugriff auf die gemeinsam genutzten Ressourcen während des Wartens zu gewährleisten.
- Drei wartende Goroutinen: Jede Goroutine blockiert sich selbst durch
cd.Wait(), bis die Bedingung erfüllt ist (i > 100). Diese Goroutinen bleiben blockiert, bis der Wert der gemeinsam genutzten Ressourceiaktualisiert wird. - Eine Goroutine zum Aktualisieren der Bedingung und Aufwecken anderer Goroutinen: Wenn die Bedingung erfüllt ist (d.h.
i > 100), weckt diese Goroutine alle wartenden Goroutinen durchcd.Broadcast(), damit sie ihre Ausführung fortsetzen können.
Struktur
type Cond struct {
// L is held while observing or changing the condition
L Locker
notify notifyList
}
type notifyList struct {
// wait is the ticket number of the next waiter. It is atomically
// incremented outside the lock.
wait atomic.Uint32
notify uint32
// List of parked waiters.
lock mutex
head *sudog
tail *sudog
}Die Struktur ist nicht komplex:
L: Mutex, hier ist der Typ dasLocker-Interface, nicht ein konkreter Sperrtypnotify: Benachrichtigungsliste der wartenden Goroutinen
Besonders wichtig ist die Struktur runtime.notifyList:
wait: Atomarer Wert, der aufzeichnet, wie viele Goroutinen wartennotify: Zeigt auf die nächste aufzuweckende Goroutine, beginnend bei 0lock: Mutex, nicht der von uns übergebene Lock, sondern ein intern implementierter Lock inruntimehead,tail: Listenzeiger
Sie hat insgesamt nur drei Methoden:
Wait: Blockierendes WartenSignal: Weckt eine wartende Goroutine aufBroadcast: Weckt alle wartenden Goroutinen auf
Der Großteil ihrer Implementierung ist in der runtime-Bibliothek verborgen. Diese Implementierungen befinden sich in der Datei runtime/sema.go, sodass der Code in der Standardbibliothek sehr kurz ist. Das Grundprinzip ist eine gesperrte blockierende Warteschlange.
Wait
Die Wait-Methode lässt die Goroutine in einen blockierten Wartezustand versetzen, bis sie aufgeweckt wird.
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}Zuerst fügt sie sich selbst zur notifyList hinzu. Tatsächlich wird nur notifyList.wait um eins erhöht. Diese Operation entspricht len(notifyList)-1, also dem Index des letzten Elements.
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}Das tatsächliche Hinzufügen erfolgt in der Funktion notifyListWait:
func notifyListWait(l *notifyList, t uint32) {
...
}In dieser Funktion wird zuerst die Liste gesperrt. Dann wird schnell geprüft, ob die aktuelle Goroutine bereits aufgeweckt wurde. Wenn ja, wird direkt zurückgekehrt, ohne blockieren zu müssen.
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}Wenn sie nicht aufgeweckt wurde, wird sie als sudog konstruiert und zur Warteschlange hinzugefügt. Dann wird sie durch gopark angehalten.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)Nach dem Aufwecken wird die sudog-Struktur freigegeben:
releaseSudog(s)Signal
Signal weckt blockierte Goroutinen in der Reihenfolge First-In-First-Out der Warteschlange auf:
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}Der Ablauf ist wie folgt:
Ohne Sperre wird direkt geprüft, ob
l.waitgleichl.notifyist. Gleichheit bedeutet, dass alle Goroutinen bereits aufgeweckt sind:goif l.wait.Load() == atomic.Load(&l.notify) { return }Nach dem Sperren wird erneut geprüft, ob alle bereits aufgeweckt wurden:
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }l.notifywird um eins erhöht:goatomic.Store(&l.notify, t+1)Die Liste wird durchlaufen, um die aufzuweckende Goroutine zu finden. Schließlich wird die Goroutine durch
runtime.goreadyaufgeweckt:gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast weckt alle blockierten Goroutinen auf:
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}Der Ablauf ist im Wesentlichen derselbe:
Sperrenlose Prüfung, ob alle bereits aufgeweckt wurden:
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }Sperren, Liste leeren, dann Sperre freigeben. Neu eintreffende Goroutinen werden am Listenkopf hinzugefügt:
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)Liste durchlaufen und alle Goroutinen aufwecken:
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
Zusammenfassung
sync.Cond wird am häufigsten in Szenarien verwendet, in denen bestimmte Bedingungen zwischen mehreren Goroutinen synchronisiert werden müssen. Typische Anwendungen sind das Erzeuger-Verbraucher-Modell, Aufgabenplanung und ähnliche Szenarien. In diesen Situationen müssen mehrere Goroutinen darauf warten, dass bestimmte Bedingungen erfüllt sind, um fortzufahren, oder sie müssen benachrichtigt werden, wenn sich Bedingungen ändern. Es bietet eine flexible und effiziente Möglichkeit, die Synchronisation zwischen Goroutinen zu verwalten. Durch die Verwendung zusammen mit einem Mutex kann sync.Cond den sicheren Zugriff auf gemeinsam genutzte Ressourcen gewährleisten und die Ausführungsreihenfolge von Goroutinen steuern, wenn bestimmte Bedingungen erfüllt sind. Das Verständnis der internen Implementierungsprinzipien hilft uns, die Techniken der nebenläufigen Programmierung besser zu beherrschen, insbesondere bei komplexer Bedingungssynchronisation.
