sync.Mutex Mutex
Sperren sind ein wichtiges Synchronisationsprimitiv in Betriebssystemen. Die Go-Sprache bietet in der Standardbibliothek zwei Implementierungen: Mutex und Lese-Schreib-Sperre, die entsprechen:
sync.Mutex, Mutex, Lesen-Lesen exklusiv, Lesen-Schreiben exklusiv, Schreiben-Schreiben exklusivsync.RWMutex, Lese-Schreib-Sperre, Lesen-Lesen gemeinsam, Lesen-Schreiben exklusiv, Schreiben-Schreiben exklusiv
Ihre Einsatzmöglichkeiten im Geschäftsumfeld sind sehr häufig. Sie dienen dazu, bei Nebenläufigkeit einen bestimmten gemeinsam genutzten Speicherbereich sequentiell zu schützen, wie im folgenden Beispiel:
import (
"fmt"
"sync"
)
func main() {
var i int
var wg sync.WaitGroup
var mu sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
viewI := i
mu.Unlock()
viewI++
mu.Lock()
i = viewI
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(i)
}Ohne den Schutz einer Sperre wäre das Ergebnis dieser Funktion bei jeder Ausführung unterschiedlich und unvorhersehbar. In den meisten Szenarien möchten wir solche Situationen nicht. Dieses Beispiel ist für die meisten Menschen sehr einfach. Vielleicht beherrschen Sie bereits die Verwendung von Sperren meisterhaft, aber Sie verstehen vielleicht nicht, wie die interne Implementierung von Sperren in Go funktioniert. Der Code selbst ist nicht komplex und wird im Folgenden ausführlich erläutert.
Locker
Bevor wir beginnen, schauen wir uns den Typ sync.Locker an. Es ist ein von Go definiertes Interface:
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}Die bereitgestellten Methoden sind sehr einfach zu verstehen: Sperren und Entsperren. Aufgrund der Eigenschaft der Go-Interface-Implementierung, die vor der Konvention steht, haben die meisten Menschen es vielleicht nie gesehen. Hier wird es nur kurz erwähnt, da es wirklich nicht so wichtig ist. Die beiden später behandelten Sperren implementieren beide dieses Interface.
Mutex
Der Typ des Mutex Mutex ist in der Datei sync/mutex.go definiert. Es ist ein Strukturtyp, wie folgt:
type Mutex struct {
state int32
sema uint32
}Die Felder bedeuten:
state, dieses Feld repräsentiert den Zustand der Sperresema, das ist das Semaphor semaphore, dessen Einführung später erfolgt
Schauen wir uns zuerst state an:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)state ist ein 32-Bit-Ganzzahltyp. Die unteren 3 Bits werden verwendet, um die oben genannten drei Zustände darzustellen. Es gibt insgesamt drei Zustände, die nicht unabhängig sind, sondern koexistieren können.
mutexLocked=1, gesperrtmutexWoken=2, aufgewecktmutexStarving=4, Hungermodus
Die oberen 29 Bits werden verwendet, um anzuzeigen, wie viele Goroutinen auf die Sperre warten. Theoretisch kann ein Mutex von bis zu 2^29+1 Goroutinen gleichzeitig verwendet werden. In der Realität wird es jedoch kaum so viele Goroutinen geben. Selbst wenn jede nur 2KB belegt (initiale Stack-Größe), würde der Speicherbedarf für diese Anzahl von Goroutinen etwa 1TB betragen.
+-----------------------------------+---------------+------------+-------------+
| waiter | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
| 29 bits | 1 bit | 1 bit | 1 bit |
+-----------------------------------+---------------+------------+-------------+Der Mutex hat insgesamt zwei Betriebsmodi: den Normalmodus und den Hungermodus. Der Normalmodus bedeutet, dass Goroutinen die Sperre in der Reihenfolge halten, in der sie in der blockierenden Warteschlange angekommen sind, also FIFO. Dies ist der Normalfall und auch der mit der besten Leistung, da alle die Sperre in Zugriffsreihenfolge halten, gibt es keine Probleme. Der Hungermodus ist der ungewöhnliche Fall. Dieser Hunger bezieht sich darauf, dass wartende Goroutinen lange Zeit die Sperre nicht halten können und sich ständig im blockierten Zustand befinden. Es bedeutet nicht, dass der Mutex im Hungerzustand ist. Wann befinden sich Goroutinen also im Hungerzustand? Offiziell wird folgendes Beispiel gegeben: Es gibt eine zuerst angekommene Goroutine, die blockiert, weil sie den Mutex nicht halten kann. Nachdem die Sperre freigegeben wurde, wird sie aufgeweckt. Genau in diesem Moment kommt eine andere Goroutine, deren Code gerade an dieser Stelle ausgeführt wird und versucht, die Sperre zu halten (die gerne vordrängt). Da letztere sich gerade im laufenden Zustand befindet (CPU-Zeitscheibe belegt), ist die Wahrscheinlichkeit sehr hoch, dass sie erfolgreich um die Sperre konkurriert. In extremen Fällen kann es viele solche Goroutinen geben. Dann kann die gerade aufgeweckte Goroutine die Sperre ständig nicht halten (ständiges Vordrängen ohne Ende), obwohl sie zuerst da war, aber die Sperre nie erhalten kann.
const (
starvationThresholdNs = 1e6
)Um diese Situation zu vermeiden, hat Go einen Warteschwellenwert starvationThresholdNs festgelegt. Wenn eine Goroutine länger als 1ms die Sperre nicht halten konnte, wechselt der Mutex in den Hungermodus. Im Hungermodus wird der Besitz des Mutex direkt an die vorderste Goroutine in der Warteschlange übergeben. Neu angekommene Goroutinen versuchen nicht, die Sperre zu halten, sondern reihen sich am Ende der Warteschlange ein. So wird im Hungermodus der Besitz des Mutex vollständig von den Goroutinen in der Warteschlange nacheinander gehalten (die in der Reihe Stehenden bekommen die Sperre zuerst, die Vordrängenden gehen nach hinten). Wenn eine Goroutine erfolgreich die Sperre gehalten hat und sie die letzte wartende Goroutine ist oder die Wartezeit weniger als 1ms beträgt, wird der Mutex zurück in den Normalmodus geschaltet. Dieses Design des Hungermodus verhindert, dass einige Goroutinen lange Zeit die Sperre nicht halten können und "verhungern".
TryLock
Der Mutex bietet zwei Methoden zum Sperren:
Lock(), sperrt auf blockierende WeiseTryLock(), sperrt auf nicht-blockierende Weise
Schauen wir uns zuerst den Code von TryLock an, da seine Implementierung einfacher ist:
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}Zu Beginn wird geprüft, ob die Sperre bereits gehalten wird oder sich im Hungerzustand befindet (d.h. viele Goroutinen warten auf die Sperre). In diesem Fall kann die aktuelle Goroutine die Sperre nicht erhalten. Andernfalls wird durch eine CAS-Operation versucht, den Status auf mutexLocked zu aktualisieren. Wenn die CAS-Operation false zurückgibt, bedeutet dies, dass in dieser Zeit eine andere Goroutine erfolgreich die Sperre erhalten hat und die aktuelle Goroutine die Sperre nicht erhalten kann. Andernfalls wurde die Sperre erfolgreich erhalten. Aus diesem Code ist ersichtlich, dass der Aufrufer von TryLock() die Person ist, die versucht vorzudrängen, da sie die Sperre direkt an sich reißt, unabhängig davon, ob Goroutinen warten (old könnte ungleich 0 sein).
Lock
Hier ist der Code von Lock. Er verwendet ebenfalls eine CAS-Operation, um zu versuchen, die Sperre direkt zu halten, aber er ist "ehrlicher". Er wird nur dann versuchen, die Sperre direkt zu halten, wenn keine Goroutine blockiert wartet (old=0).
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}Wenn er feststellt, dass Goroutinen blockiert warten, wird er "ehrenhaft" hinten anstehen und in den lockslow-Spin-Prozess eintreten, um auf die Sperre zu warten (der Kern des Mutex). Zuerst werden einige Variablen vorbereitet:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.statewaitStartTime: dient zum Aufzeichnen der Startzeit des Wartens, um zu prüfen, ob in den Hungermodus gewechselt werden soll.starving: gibt an, ob die aktuelle Goroutine bereits länger als 1ms die Sperre nicht erhalten hat.awoke: markiert, ob die aktuelle Goroutine bereits aufgeweckt wurde.iter: Zähler, der die Anzahl der Spins aufzeichnet.old: erfasst den aktuellen Zustand des Mutex
Dann wird eine for-Schleife betreten, um zu beurteilen, ob die aktuelle Goroutine in den Spin-Zustand eintreten kann.
Spin ist ein Synchronisationsmechanismus zwischen mehreren Threads, auch Busy-Waiting genannt. Ein Thread wird nicht direkt angehalten und der Thread-Kontext gewechselt, wenn er die Sperre nicht hält, sondern er geht in einen Leelauf über. Dabei wird ständig die CPU-Zeitscheibe belegt. Wenn die Sperrenkonkurrenz nicht groß ist oder die Sperre nur sehr kurz gehalten wird, kann dies tatsächlich häufige Thread-Kontextwechsel vermeiden und die Leistung effektiv verbessern. Es ist jedoch nicht universell einsetzbar. In Go kann missbräuchliche Verwendung von Spin zu folgenden gefährlichen Konsequenzen führen:
- Zu hohe CPU-Auslastung: Zu viele spinnende Goroutinen verbrauchen viele CPU-Ressourcen, besonders wenn die Sperre lange gehalten wird
- Beeinträchtigung der Goroutine-Scheduling: Die Gesamtzahl der Prozessoren P ist begrenzt. Wenn viele spinnende Goroutinen P belegen, können andere Goroutinen, die Benutzercode ausführen, nicht rechtzeitig gescheduled werden
- Cache-Kohärenzprobleme: Die Busy-Waiting-Eigenschaft von Spin-Sperren führt dazu, dass Threads den Sperrstatus wiederholt im Cache lesen. Wenn spinnende Goroutinen auf verschiedenen Kernen laufen und der Sperrstatus nicht rechtzeitig in den globalen Speicher aktualisiert wurde, lesen Goroutinen ungenaue Sperrzustände. Auch häufige Cache-Kohärenz-Synchronisation kann die Leistung erheblich beeinträchtigen.
Daher können nicht alle Goroutinen in den Spin-Zustand eintreten. Sie müssen folgende strikte Bedingungen erfüllen:
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}Die Bedingungen sind:
Die aktuelle Sperre wurde bereits gehalten und darf sich nicht im Hungerzustand befinden. Andernfalls bedeutet dies, dass bereits Goroutinen lange Zeit die Sperre nicht erhalten haben. In diesem Fall wird direkt in den Blockierungsprozess eingetreten.
Eintritt in die
runtime.sync_runtime_canSpin-Beurteilungsprozess:goconst ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }Die Anzahl der Spins ist kleiner als
runtime.active_spin, standardmäßig 4. Mehr Spins verschwenden Ressourcen.Die Anzahl der CPU-Kerne ist größer als 1. Auf einem Single-Core-System hat Spin keinen Sinn.
Der aktuelle
gomaxprocsist größer als die Summe aus inaktiven P und spinnenden P plus 1, was bedeutet, dass derzeit nicht genügend verfügbare Prozessoren zum Spinnen vorhanden sind.Die lokale Warteschlange des aktuellen P muss leer sein. Andernfalls bedeutet dies, dass andere Benutzeraufgaben ausgeführt werden müssen und kein Spin durchgeführt werden kann.
Wenn beurteilt wird, dass gespinnt werden kann, wird runtime.sync_runtime_doSpin aufgerufen, um in den Spin einzutreten. Tatsächlich führt es 30 PAUSE-Befehle aus.
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RETWenn kein Spin durchgeführt werden kann, gibt es nur zwei Möglichkeiten: Erfolgreiches Erhalten der Sperre oder Eintritt in die Warteschlange und Blockieren. Zuvor gibt es jedoch noch viele Dinge zu erledigen:
- Wenn nicht im Hungermodus, wird versucht, die Sperre zu erhalten:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}- Wenn die Sperre bereits belegt ist oder sich im Hungermodus befindet, wird die Anzahl der wartenden Goroutinen um 1 erhöht:
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}- Wenn die aktuelle Goroutine bereits im Hungerzustand ist und die Sperre noch belegt ist, wird in den Hungermodus gewechselt:
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}- Wenn die aktuelle Goroutine durch Spin aufgeweckt wurde, wird das
mutexWoken-Flag gesetzt:
if awoke {
new &^= mutexWoken
}Dann wird versucht, den Sperrzustand durch CAS zu aktualisieren. Bei Misserfolg wird direkt die nächste Runde der Schleife begonnen:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}Bei erfolgreicher Aktualisierung werden folgende Beurteilungen durchgeführt:
Der ursprüngliche Zustand war nicht im Hungermodus und keine Goroutine hat die Sperre belegt. Dann kann die aktuelle Goroutine die Sperre direkt halten, den Prozess beenden und den Benutzercode weiter ausführen.
goif old&(mutexLocked|mutexStarving) == 0 { break }Der Versuch, die Sperre zu halten, ist fehlgeschlagen. Die Wartezeit wird aufgezeichnet. Dabei bedeutet LIFO=true, dass die Warteschlange Last-In-First-Out ist, andernfalls FIFO First-In-First-Out.
goqueueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }Es wird versucht, das Semaphor zu erhalten. Die Funktion
runtime.semacquire1wird aufgerufen. Wenn das Semaphor erhalten werden kann, wird direkt zurückgekehrt ohne zu blockieren. Andernfalls wirdruntime.goparkaufgerufen, um die aktuelle Goroutine anzuhalten und auf die Freigabe des Semaphors zu warten.goruntime_SemacquireMutex(&m.sema, queueLifo, 1)An diesem Punkt gibt es zwei Möglichkeiten: Entweder wurde das Semaphor direkt erfolgreich erhalten, oder die blockierte Goroutine wurde gerade aufgeweckt und hat das Semaphor erfolgreich erhalten. In beiden Fällen wurde das Semaphor erfolgreich erhalten. Wenn jetzt der Hungermodus aktiv ist, kann die Sperre direkt erhalten werden.
gostarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }Wenn nicht im Hungermodus, werden
iterzurückgesetzt und der Spin-Prozess neu gestartet.goawoke = true iter = 0
Damit ist der Sperrprozess beendet. Der gesamte Prozess ist relativ komplex und verwendet sowohl Spin-Warten als auch Semaphor-Blockierungswarten, um Leistung und Fairness auszubalancieren. Er ist für die meisten Sperrkonkurrenzsituationen geeignet.
Unlock
Der Entsperrungsprozess ist relativ viel einfacher. Zuerst wird versucht, schnell zu entsperren. Wenn new gleich 0 ist, bedeutet dies, dass derzeit keine wartenden Goroutinen vorhanden sind und es sich nicht im Hungermodus befindet, also erfolgreich entsperrt wurde und direkt zurückgekehrt werden kann.
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}Andernfalls muss in den unlockslow-Prozess eingetreten werden:
Zuerst wird geprüft, ob bereits entsperrt wurde:
goif (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }Wenn im Hungermodus, wird direkt das Semaphor freigegeben und die Entsperrung abgeschlossen. Im Hungermodus übergibt die aktuell entsperrnde Goroutine den Besitz der Sperre direkt an die nächste wartende Goroutine.
goif new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }Wenn nicht im Hungermodus, wird in den normalen Entsperrungsprozess eingetreten:
Wenn keine Goroutinen warten oder eine andere aufgeweckte Goroutine bereits die Sperre erhalten hat oder die Sperre in den Hungermodus gewechselt ist:
goif old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }Andernfalls wird das Semaphor freigegeben, um die nächste wartende Goroutine aufzuwecken, und der aktuelle Sperrzustand wird auf
mutexWokengesetzt:gonew = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
Damit ist der Entsperrungsprozess beendet.
RWMutex
Der Lese-Schreib-Mutex RWMutex ist in der Datei sync/rwmutex.go definiert. Seine Implementierung basiert ebenfalls auf dem Mutex.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}Hier sind die Bedeutungen der einzelnen Felder:
w, ein Mutex. Wenn eine Schreib-Goroutine diesen Mutex hält, werden andere Schreib-Goroutinen und Lese-Goroutinen blockiert.writerSem, Schreib-Semaphor, dient zum Blockieren von Schreib-Goroutinen, um auf Lese-Goroutinen zu warten. Schreib-Goroutinen erhalten das Semaphor, Lese-Goroutinen geben das Semaphor frei.readerSem, Lese-Semaphor, dient zum Blockieren von Lese-Goroutinen, um auf Schreib-Goroutinen zu warten. Lese-Goroutinen erhalten das Semaphor, Schreib-Goroutinen geben das Semaphor frei.readerCount, Kernfeld. Der gesamte Lese-Schreib-Sperre verlässt sich darauf, den Zustand zu verwalten.readerWait, gibt an, wie viele Lese-Goroutinen eine blockierte Schreib-Goroutine warten muss.
Das Grundprinzip der Lese-Schreib-Sperre ist: Durch den Mutex werden Schreib-Goroutinen gegenseitig ausgeschlossen. Durch die zwei Semaphoren writerSem und readerSem werden Lesen-Schreiben gegenseitig ausgeschlossen und Lesen-Lesen gemeinsam genutzt.
readerCount
Da sich readerCount häufig ändert und sehr wichtig ist, wird es gesondert behandelt. Es lässt sich grob in folgende Zustände unterteilen:
- 0, derzeit sind weder Lese-Goroutinen noch Schreib-Goroutinen aktiv, der Zustand ist inaktiv
-rwmutexMaxReaders, eine Schreib-Goroutine hält bereits den Mutex, derzeit sind keine aktiven Lese-Goroutinen vorhanden-rwmutexMaxReaders+N, eine Schreib-Goroutine hält bereits die Schreibsperre, die aktuellen Lese-Goroutinen müssen blockieren und warten, bis die Schreib-Goroutine die Schreibsperre freigibtN-rwmutexMaxReaders, eine Schreib-Goroutine hat bereits den Mutex gehalten und muss blockieren und warten, bis die verbleibenden Lese-Goroutinen die Lesesperre freigebenN, derzeit sind N aktive Lese-Goroutinen vorhanden, d.h. N Lesesperren wurden hinzugefügt, keine aktiven Schreib-Goroutinen
Dabei ist rwmutexMaxReaders ein konstanter Wert. Sein Wert ist das Doppelte der Anzahl der blockierbaren Goroutinen des Mutex, da die Hälfte Lese-Goroutinen und die Hälfte Schreib-Goroutinen sind.
const rwmutexMaxReaders = 1 << 30Der gesamte Lese-Schreib-Sperre-Teil ist nur bei readerCount komplexer. Wenn man seine Änderungen versteht, versteht man im Wesentlichen den Arbeitsablauf der Lese-Schreib-Sperre.
TryLock
Wie üblich, schauen wir uns zuerst das einfachste TryLock() an:
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}Zunächst versucht es, TryLock() des Mutex aufzurufen. Bei Misserfolg wird direkt zurückgekehrt. Dann wird durch eine CAS-Operation versucht, den Wert von readerCount von 0 auf -rwmutexMaxReaders zu aktualisieren. 0 repräsentiert, dass keine gerade arbeitenden Lese-Goroutinen vorhanden sind. -rwmutexMaxReaders bedeutet, dass die Schreib-Goroutine bereits den Mutex hält. Wenn die CAS-Operation fehlschlägt, wird der Mutex entsperrt. Bei Erfolg wird true zurückgegeben.
Lock
Als Nächstes kommt Lock(). Seine Implementierung ist ebenfalls sehr einfach:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}Zunächst konkurriert es mit anderen Schreib-Goroutinen, bis es den Mutex hält. Dann führt es folgende Operation durch: Erst atomares Abziehen von -rwmutexMaxReaders, dann nicht-atomares Addieren von rwmutexMaxReaders:
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReadersIn zwei Schritten betrachtet:
Dies dient dazu, anderen Lese-Goroutinen mitzuteilen, dass gerade eine Schreib-Goroutine versucht, die Sperre zu halten. Dies wurde bereits im
TryLock-Teil erklärt.gorw.readerCount.Add(-rwmutexMaxReaders)Dann wird
rwmutexMaxReadersaddiert, um r zu erhalten. Dieses r repräsentiert die Anzahl der gerade arbeitenden Lese-Goroutinen.gor = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
Dann wird beurteilt, ob Lese-Goroutinen gerade arbeiten. Der Wert von readerWait wird um r erhöht. Wenn er am Ende immer noch nicht 0 ist, bedeutet dies, dass auf diese Lese-Goroutinen gewartet werden muss, bis sie ihre Arbeit beendet haben. Dann wird in den runtime_SemacquireRWMutex-Prozess eingetreten, um zu versuchen, das Semaphor writerSem zu erhalten. Dieses Semaphor wird von Lese-Goroutinen freigegeben. Wenn das Semaphor erhalten werden kann, bedeutet dies, dass die Lese-Goroutinen ihre Arbeit beendet haben. Andernfalls muss in die Blockierungswarteschlange eingetreten werden, um zu warten (dieser Semaphor-Teil ist im Wesentlichen identisch mit dem Mutex-Teil).
UnLock
Als Nächstes kommt UnLock(), die Freigabe der Schreibsperre:
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}Der Ablauf ist wie folgt:
Zuvor wurde erwähnt, dass beim Sperren
readerCountauf einen negativen Wert aktualisiert wird. Hier wirdrwmutexMaxReadersaddiert, was bedeutet, dass derzeit keine Schreib-Goroutine arbeitet. Der erhaltene Wert ist die Anzahl der blockiert wartenden Lese-Goroutinen.gor := rw.readerCount.Add(rwmutexMaxReaders)Wenn der Wert selbst 0 oder größer als 0 ist, bedeutet dies, dass die Schreibsperre bereits freigegeben wurde:
goif r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }Das Semaphor
readerSemwird freigegeben, um wartende Lese-Goroutinen aufzuwecken:gofor i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }Schließlich wird der Mutex freigegeben, um wartende Schreib-Goroutinen aufzuwecken:
gorw.w.Unlock()
Die Freigabe der Schreibsperre ist abgeschlossen.
TryRLock
Nun zum Lesesperre-Teil. Hier ist der Code für TryRLock:
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}Es macht insgesamt nur zwei Dinge:
Beurteilen, ob eine Schreib-Goroutine gerade arbeitet. Falls ja, schlägt die Sperrung fehl:
goc := rw.readerCount.Load() if c < 0 { return false }Versuchen,
readerCountum 1 zu erhöhen. Bei erfolgreicher Aktualisierung war die Sperrung erfolgreich:goif rw.readerCount.CompareAndSwap(c, c+1) { return true }Andernfalls wird die Schleife fortgesetzt, bis sie beendet wird:
Man sieht, dass sich readerCount hierauf verlässt, was im Schreibsperre-Teil verwaltet wird. Deshalb wurde der Schreibsperre-Teil zuerst erklärt, da die komplexen Kernstellen im Schreibsperre-Teil verwaltet werden.
RLock
Die Logik von RLock ist noch einfacher:
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}Es versucht, den Wert von readerCount um 1 zu erhöhen. Wenn der erhaltene neue Wert immer noch kleiner als 0 ist, bedeutet dies, dass eine Schreib-Goroutine gerade arbeitet. Dann wird in den readerSem-Semaphor-Blockierungsprozess eingetreten. Die aktuelle Goroutine wird in die Blockierungswarteschlange eintreten und warten.
RUnLock
Auch RUnLock ist genauso einfach und verständlich:
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}Zuerst wird versucht, readerCount um eins zu verringern, was bedeutet, dass die Anzahl der aktiven Lese-Goroutinen um eins reduziert wird. Wenn der erhaltene Wert größer als 0 ist, bedeutet dies, dass direkt freigegeben werden kann, da derzeit keine Schreib-Goroutine den Mutex hält. Wenn der Wert kleiner als 0 ist, bedeutet dies, dass eine Schreib-Goroutine den Mutex bereits hält und darauf wartet, dass alle aktuellen Lese-Goroutinen ihre Arbeit beenden. Als Nächstes wird in den runlockSlow-Prozess eingetreten:
Wenn der ursprüngliche
readerCount-Wert 0 war (die Sperre ist inaktiv) oder-rwmutexMaxReaderswar (die Schreib-Goroutine hat keine wartenden Lese-Goroutinen, d.h. alle Lesesperren wurden bereits freigegeben), bedeutet dies, dass derzeit keine aktiven Lese-Goroutinen vorhanden sind und nicht entsperrt werden muss:goif r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }Wenn es aktive Lese-Goroutinen gibt, wird
readerWaitum eins verringert. Wenn die aktuelle Lese-Goroutine die letzte aktive Lesende ist, wird daswriterSem-Semaphor freigegeben, um die wartende Schreib-Goroutine aufzuwecken:goif rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
Der Prozess der Freigabe der Lesesperre ist abgeschlossen.
Semaphore
Das Semaphor im Mutex ist ein einfacher uint32-Ganzzahltyp. Durch atomares Dekrementieren und Inkrementieren wird das Erhalten und Freigeben des Semaphors dargestellt. Die Struktur, die zur Laufzeit für die Verwaltung des Semaphors zuständig ist, ist runtime.semaRoot. Ihre Typdefinition befindet sich in der Datei runtime/sema.go. semaRoot verwendet einen balancierten binären Baum (Treap), um Semaphoren zu organisieren und zu verwalten. Jeder Knoten im Baum repräsentiert ein Semaphor. Der Knotentyp ist *sudog, eine doppelt verkettete Liste, die die Warteschlange für das entsprechende Semaphor verwaltet. Knoten sind durch *sudog.elem (Semaphor-Adresse) eindeutig und werden durch das *sudog.ticket-Feld balanciert.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}Der Semaphor-Baum semaRoot verlässt sich auf einen niedrigeren Mutex runtime.mutex, um seine Nebenläufigkeitssicherheit zu gewährleisten.
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// 用于内存对齐,提高性能
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}semaRoot wird zur Laufzeit in einer globalen semaTable gespeichert. Sie sieht wie ein Array fester Länge aus, das mehrere Wurzelknoten von Semaphorbäumen speichert. Tatsächlich funktioniert sie wie eine Hash-Tabelle. Jedes Element in der Tabelle enthält ein semaRoot und einige Füllbytes (pad) zur Speicherausrichtung und Vermeidung von Cache-Line-Konflikten. semTabSize ist eine Konstante für die Größe der Semaphortabelle. Sie gibt die Länge der Tabelle mit 251 an. Normalerweise wird eine Primzahl gewählt, um Hash-Kollisionen zu reduzieren und die Streuungseffizienz zu verbessern.
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}Die rootFor-Methode entspricht einer Hash-Funktion. Sie nimmt einen Zeiger addr vom Typ uint32 (die Adresse des Semaphors) und gibt einen Zeiger auf die entsprechende semaRoot-Struktur zurück. Diese Codezeile konvertiert zuerst addr in ein uintptr, schiebt es dann um 3 Bits nach rechts (entspricht Division durch 8, da ein Byte 8 Bits hat und die Zeigeradresse durch 8 geteilt werden kann, um sie als Array-Index abzubilden). Durch Modulo semTabSize wird sichergestellt, dass der Index im Größenbereich der Semaphortabelle liegt. Nach Erhalt des semaRoot durch den Index wird im balancierten Baum nach der zum Semaphor gehörenden *sudog-Warteschlange gesucht.
Acquire
Das Erhalten des Semaphors wird durch die Funktion runtime.semacquire1 implementiert:
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)Sie empfängt folgende Parameter:
addr, die Adresse des Semaphorslifo, beeinflusst die Ausreihenfolge des balancierten Baums. Standardmäßig FIFO. LIFO bedeutet Last-In-First-Out. Wenn die Wartezeit der Goroutine auf die Sperre ungleich 0 ist (mindestens einmal blockiert), ist sietrue.profile, Flags zur Leistungsanalyse von Sperrenskipframes, Anzahl der zu überspringenden Stack-Framesreason, Grund für die Blockierung
Im Folgenden wird der gesamte Prozess des Semaphor-Erhalts kurz beschrieben:
Beurteilung des Goroutine-Zustands. Wenn die aktuelle Goroutine nicht die gerade geschedulte Goroutine ist, wird direkt eine Ausnahme geworfen:
gogp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") }Beurteilung, ob das Semaphor erhalten werden kann, und Versuch, das Semaphor auf nicht-blockierende Weise zu erhalten. Wenn es erhalten werden kann, kann direkt zurückgekehrt werden:
gofor { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } }Wenn es nicht nicht-blockierend erhalten werden kann, wird in einer Schleife auf normalem Wege versucht, das Semaphor zu erhalten. Zuerst wird durch
acquireSudog()ein*sudogaus dem Cache geholt. Diese Struktur repräsentiert eine blockiert wartende Goroutine:s := acquireSudog()Dann wird der Semaphor-Baum aus der globalen Tabelle geholt:
goroot := semtable.rootFor(addr)In die Schleife eintreten, den Semaphor-Baum sperren, erneut beurteilen, ob das Semaphor erhalten werden kann. Wenn nicht, wird er dem Semaphor-Baum hinzugefügt. Dann wird
goparkaufgerufen, um ihn anzuhalten und zu warten, bis er aufgeweckt wird und dieser Prozess wiederholt wird. Die Schleife wird fortgesetzt, bis das Semaphor erhalten wurde:gofor { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }Schließlich wird beim Aufwachen
*sudogfreigegeben und an den Cache zurückgegeben:goreleaseSudog(s)
Release
Das Freigeben des Semaphors, Aufwecken der blockiert wartenden Goroutinen, wird durch die Funktion runtime.semrelease1 implementiert:
func semrelease1(addr *uint32, handoff bool, skipframes int)Sie empfängt folgende Parameter:
addr, Semaphor-Adressehandoff, gibt an, ob das aktuell von P geschedulte G direkt auf das aufgeweckte G umgeschaltet werden soll. Nur im Hungermodustrue.skipframes, Anzahl der zu überspringenden Stack-Frames
Im Folgenden wird der gesamte Freigabeprozess kurz beschrieben:
Den Semaphor-Baum holen, dann das Semaphor um eins erhöhen, was die Freigabe eines Semaphors bedeutet:
goroot := semtable.rootFor(addr) atomic.Xadd(addr, 1)Wenn die Anzahl der wartenden Goroutinen 0 ist, direkt zurückkehren:
goif root.nwait.Load() == 0 { return }Den Semaphor-Baum sperren, erneut beurteilen, ob wartende Goroutinen vorhanden sind:
golockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }Eine blockiert wartende Goroutine aus dem Semaphor-Baum holen,
nwaitum eins verringern, dann die Sperre des Semaphors freigeben:gos, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)Beurteilen, ob das Semaphor erhalten werden kann:
goif handoff && cansemacquire(addr) { s.ticket = 1 }Die Funktion
readyWithTimesetzt die aufgeweckte Goroutine G direkt als die nächste auszuführende Goroutine von P, d.h. sie modifiziert*p.runnext=g:goreadyWithTime(s, 5+skipframes)Wenn
handofftrueist, wirdgoyielddie aktuell semaphorfreigebende Goroutine G vom aktuellen M entbinden und wieder am Ende der lokalen Runqueue von P hinzufügen. Dann beginnt ein neuer Scheduling-Zyklus, damit die aufgeweckte Goroutine G sofort gescheduled werden kann:goif s.ticket == 1 && getg().m.locks == 0 { goyield() }
Dies sind die Prozesse für das Erhalten und Freigeben des Semaphors. In Go wird das Semaphor nicht nur beim Mutex verwendet. Es wird hier platziert, weil das Semaphor die größte Verbindung zum Mutex hat. Offiziell ist dies sogar in den Kommentaren so vermerkt:
// Asynchronous semaphore for sync.Mutex.Nachdem man das Semaphor verstanden hat, wird der Blick auf den Mutex viel klarer.
TIP
Bezüglich des semaRoot-Semaphor-Baums: Sein Ein- und Ausreihen beinhaltet Selbstausgleichsoperationen, deren Implementierung relativ aufwendig ist. Diese im Detail zu verfolgen hat nichts mit dem Thema dieses Artikels zu tun und ist ohne Bedeutung, daher wurde dies ausgeblendet. Interessierte können sich selbst den Quellcode ansehen.
Zusammenfassung
Der Mutex sync.Mutex implementiert das Warten von Goroutinen durch zwei Mechanismen: Spin und Semaphor. Spin ist nicht-blockierend, muss aber streng begrenzt werden, da es CPU-Ressourcen verbraucht. Das Semaphor ist blockierend und kann unnötigen Ressourcenverbrauch effektiv vermeiden. Um einen faireren Konkurrenzmechanismus zu implementieren, garantiert Go durch die Unterscheidung zwischen Normalmodus und Hungermodus, dass Goroutinen im Prozess der Konkurrenz um die Sperre ausgeglichener sein können. Im Vergleich zu runtime.mutex als Low-Level-Sperre wurde sync.Mutex als benutzerorientierte Sperre unter Berücksichtigung mehrerer tatsächlicher Verwendungsszenarien entworfen.
Die Lese-Schreib-Sperre sync.RWMutex implementiert Schreiben-Schreiben-Mutex durch den Mutex sync.Mutex und fügt auf dieser Basis zwei zusätzliche Semaphoren hinzu, um Lesen-Schreiben-Mutex und Lesen-Lesen gemeinsam zu implementieren und so verschiedene Nebenläufigkeitsszenarien zu unterstützen.
Obwohl die Implementierung von Sperren relativ komplex erscheint, wird das Verständnis anderer Synchronisationswerkzeuge in der sync-Standardbibliothek viel einfacher, sobald man das Prinzip von Mutex verstanden hat.
