sync.Mutex Mutex
Il lock è una primitiva di sincronizzazione importante nei sistemi operativi. Il linguaggio Go fornisce implementazioni di mutex e read-write lock nella libreria standard, corrispondenti rispettivamente a:
sync.Mutex, mutex, lettura-lettura reciproca, lettura-scrittura reciproca, scrittura-scrittura reciprocasync.RWMutex, read-write lock, lettura condivisa, lettura-scrittura reciproca, scrittura-scrittura reciproca
I loro scenari d'uso sono molto comuni, utilizzati per proteggere l'accesso e la modifica ordinata di una certa memoria condivisa in caso di concorrenza, come nell'esempio seguente:
import (
"fmt"
"sync"
)
func main() {
var i int
var wg sync.WaitGroup
var mu sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
viewI := i
mu.Unlock()
viewI++
mu.Lock()
i = viewI
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(i)
}Senza la protezione del lock, il risultato di output di questa funzione potrebbe essere diverso ogni volta, imprevedibile. Ovviamente, nella maggior parte degli scenari non vogliamo che si verifichi una tale situazione. Questo caso è molto semplice per la maggior parte delle persone, e potresti già essere esperto nell'uso dei lock, ma potresti non comprendere come sono implementati internamente i lock del linguaggio Go. Il codice stesso non è complesso, e questo articolo ne fornirà una spiegazione dettagliata.
Locker
Prima di iniziare, diamo un'occhiata al tipo sync.Locker, che è un'interfaccia definita da Go:
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}I metodi forniti sono molto semplici e facili da capire: lock e unlock. Grazie alla caratteristica di Go per cui l'implementazione delle interfacce è preferita alla convenzione, molte persone potrebbero non averlo mai visto. Viene menzionato qui solo brevemente perché non è così importante. Entrambi i lock discussi di seguito implementano questa interfaccia.
Mutex
La definizione del tipo di mutex Mutex si trova nel file sync/mutex.go. È un tipo struct, come segue:
type Mutex struct {
state int32
sema uint32
}Descrizione dei campi:
state, il campo rappresenta lo stato del locksema, cioè semaphore (semaforo), la cui spiegazione verrà fornita più avanti
Parliamo prima di questo state:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)state è un tipo intero a 32 bit. I 3 bit meno significativi sono utilizzati per rappresentare i tre stati sopra indicati. Ci sono tre stati totali, e questi stati non sono indipendenti; possono coesistere.
mutexLocked=1, bloccatomutexWoken=2, risvegliatomutexStarving=4, modalità fame
I 29 bit più significativi sono utilizzati per rappresentare quante goroutine sono in attesa del lock. Quindi, teoricamente, un mutex può essere utilizzato contemporaneamente da un massimo di 2^29+1 goroutine. Tuttavia, nella realtà è improbabile avere così tante goroutine. Anche se ognuna occupa solo 2KB (dimensione iniziale dello stack), lo spazio di memoria richiesto per creare così tante goroutine sarebbe di circa 1TB.
+-----------------------------------+---------------+------------+-------------+
| waiter | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
| 29 bits | 1 bit | 1 bit | 1 bit |
+-----------------------------------+---------------+------------+-------------+Il mutex ha due modalità di esecuzione: modalità normale e modalità fame. La modalità normale segue l'ordine FIFO (First In First Out) in cui le goroutine ottengono il lock secondo l'ordine in cui arrivano nella coda di attesa bloccata. Questo è il caso generale e offre le migliori prestazioni, poiché non ci sono problemi quando tutti ottengono il lock nell'ordine di accesso. La modalità fame è il caso non normale. La "fame" qui si riferisce a una goroutine in attesa che non riesce a ottenere il lock per un lungo periodo e rimane bloccata. Non significa che il mutex stesso sia in stato di fame. Allora, quando una goroutine entra in stato di fame? Secondo l'esempio ufficiale, c'è una goroutine arrivata per prima che si blocca perché non può ottenere il mutex. Quando il lock viene rilasciato e viene risvegliata, arriva un'altra goroutine che tenta di ottenere il lock (quella che ama "saltare la fila"). Poiché quest'ultima è in esecuzione (occupa lo slice di tempo della CPU), ha un'alta probabilità di competere con successo per il lock. In casi estremi, potrebbero esserci molte goroutine come questa, e la goroutine appena risvegliata non potrà mai ottenere il lock (salta sempre la fila senza fine). È arrivata per prima, ma non riesce mai a ottenere il lock.
const (
starvationThresholdNs = 1e6
)Per evitare questa situazione, Go imposta una soglia di attesa starvationThresholdNs. Se una goroutine non riesce a ottenere il lock per più di 1ms, il mutex entra in modalità fame. In modalità fame, la proprietà del lock viene trasferita direttamente alla prima goroutine nella coda di attesa. Le nuove goroutine non tentano di ottenere il lock e si mettono in coda. In questo modo, in modalità fame, la proprietà del mutex viene mantenuta sequenzialmente dalle goroutine nella coda di attesa (chi è in fila ottiene prima il lock, chi salta la fila va dietro). Quando una goroutine ottiene con successo il lock, se è l'ultima goroutine in attesa o se il tempo di attesa è inferiore a 1ms, il mutex torna alla modalità normale. Questo design della modalità fame evita che alcune goroutine non possano ottenere il lock per un lungo periodo e "muoiano di fame".
TryLock
Il mutex fornisce due metodi per acquisire il lock:
Lock(), acquisisce il lock in modo bloccanteTryLock(), acquisisce il lock in modo non bloccante
Vediamo prima il codice di TryLock, poiché la sua implementazione è più semplice:
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}All'inizio esegue un controllo: se il lock è già stato acquisito o se è in modalità fame (cioè molte goroutine sono in attesa del lock), allora la goroutine corrente non può ottenere il lock. Altrimenti, tenta di aggiornare lo stato a mutexLocked tramite un'operazione CAS. Se l'operazione CAS restituisce false, significa che durante questo periodo un'altra goroutine ha ottenuto con successo il lock, quindi la goroutine corrente non può ottenerlo. Altrimenti, ottiene con successo il lock. Da questo codice si può vedere che il chiamante di TryLock() è quello che tenta di "saltare la fila", perché cerca di prendere il lock indipendentemente dal fatto che ci siano goroutine in attesa (old potrebbe non essere uguale a 0).
Lock
Di seguito è riportato il codice di Lock. Utilizza anche un'operazione CAS per tentare di acquisire direttamente il lock, ma è più "onesto": tenta di acquisire direttamente il lock solo quando non ci sono goroutine in attesa bloccata (old=0).
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}Se scopre che ci sono goroutine in attesa bloccata, si "mette onestamente" in coda ed entra nel flusso di auto-rotazione lockslow per attendere il lock (il cuore del mutex). Per prima cosa prepara alcune variabili:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.statewaitStartTime: utilizzato per registrare il tempo di inizio dell'attesa, per verificare se entrare in modalità fame.starving: indica se la goroutine corrente non ha ottenuto il lock per più di 1ms.awoke: contrassegna se la goroutine corrente è stata risvegliata.iter: contatore, registra il numero di volte di auto-rotazione.old: ottiene lo stato corrente del mutex.
Poi entra in un ciclo for per verificare se la goroutine corrente può entrare nello stato di auto-rotazione.
L'auto-rotazione è un meccanismo di sincronizzazione tra thread, noto anche come busy-waiting. Quando un thread non detiene il lock, non si sospende direttamente cambiando il contesto del thread, ma entra in uno stato di rotazione a vuoto, occupando lo slice di tempo della CPU durante questo processo. Se la competizione per il lock non è elevata o il tempo di mantenimento del lock è breve, questo approccio può effettivamente migliorare le prestazioni evitando frequenti cambi di contesto del thread. Tuttavia, non è onnipotente. In Go, l'abuso dell'auto-rotazione può portare alle seguenti conseguenze pericolose:
- Elevato utilizzo della CPU: troppe goroutine in auto-rotazione consumano molte risorse CPU, specialmente quando il lock è occupato per un lungo periodo.
- Influenza sulla schedulazione delle goroutine: il numero totale di processori P è limitato. Se molte goroutine in auto-rotazione occupano P, altre goroutine che eseguono codice utente non possono essere schedulate tempestivamente.
- Problemi di coerenza della cache: la natura di busy-waiting dei lock di auto-rotazione porta i thread a leggere ripetutamente lo stato del lock nella cache (cache). Se le goroutine in auto-rotazione sono eseguite su core diversi e lo stato del lock non viene aggiornato tempestivamente nella memoria globale, le goroutine leggono uno stato del lock inaccurato. Inoltre, la frequente sincronizzazione della coerenza della cache può ridurre significativamente le prestazioni.
Quindi, non tutte le goroutine possono entrare nello stato di auto-rotazione. Devono superare i seguenti giudizi rigorosi:
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}Le condizioni sono le seguenti:
Il lock corrente deve essere già acquisito e non deve essere in modalità fame. Altrimenti, significa che ci sono già goroutine che non possono ottenere il lock da molto tempo, quindi si passa direttamente al flusso di blocco.
Entra nel flusso di giudizio
runtime.sync_runtime_canSpin:goconst ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }Il numero di auto-rotazioni deve essere inferiore a
runtime.active_spin, di default è 4 volte. Troppe rotazioni sprecherebbero risorse.Il numero di core CPU deve essere maggiore di 1. L'auto-rotazione su sistemi single-core non ha alcun significato.
Il
gomaxprocscorrente deve essere maggiore della somma dei P inattivi e dei P in auto-rotazione più 1. Questo indica che non ci sono abbastanza processori disponibili per l'auto-rotazione.La coda locale del P corrente deve essere vuota. Altrimenti, significa che ci sono altri task utente da eseguire e non si può eseguire l'auto-rotazione.
Se il giudizio consente l'auto-rotazione, viene chiamata runtime.sync_runtime_doSpin per entrare nell'auto-rotazione. In pratica, esegue semplicemente 30 istruzioni PAUSE.
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RETSe non può eseguire l'auto-rotazione, ci sono solo due possibili esiti: ottenere con successo il lock o entrare nella coda di attesa e bloccarsi. Tuttavia, prima di questo ci sono molte cose da gestire:
- Se non è in modalità fame, tenta di acquisire il lock:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}- Se il lock è già occupato o se è in modalità fame, il numero di goroutine in attesa aumenta di 1:
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}- Se la goroutine corrente è già in stato di fame e il lock è ancora occupato, entra in modalità fame:
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}- Se la goroutine corrente è stata risvegliata dall'auto-rotazione, aggiunge il flag
mutexWoken:
if awoke {
new &^= mutexWoken
}Poi tenta di aggiornare lo stato del lock tramite CAS. Se l'aggiornamento fallisce, passa direttamente al ciclo successivo:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}Se l'aggiornamento ha successo, inizia il giudizio seguente:
Se lo stato originale non era in modalità fame e nessun lock è occupato da goroutine, la goroutine corrente può acquisire direttamente il lock, uscire dal flusso e continuare a eseguire il codice utente.
goif old&(mutexLocked|mutexStarving) == 0 { break }Se il tentativo di acquisire il lock fallisce, registra il tempo di attesa. Se LIFO è true, indica che la coda è last-in-first-out, altrimenti è first-in-first-out (FIFO).
goqueueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }Tenta di acquisire il semaforo, entrando nella funzione
runtime.semacquire1. Se può acquisire il semaforo, restituisce direttamente senza bloccarsi. Altrimenti, chiamaruntime.goparkper sospendere la goroutine corrente e attendere il rilascio del semaforo.goruntime_SemacquireMutex(&m.sema, queueLifo, 1)Arrivati a questo punto, ci sono due possibilità: o si è acquisito con successo il semaforo direttamente, o si è stati appena risvegliati dal blocco e si è acquisito con successo il semaforo. In entrambi i casi, si è acquisito con successo il semaforo. Se si è in modalità fame, si può acquisire direttamente il lock.
gostarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }Se non è in modalità fame, reimposta
itere ricomincia il flusso di auto-rotazione.goawoke = true iter = 0
A questo punto, il flusso di acquisizione del lock termina. L'intero processo è piuttosto complesso, utilizzando sia l'auto-rotazione che l'attesa bloccante con semaforo, bilanciando prestazioni ed equità, adatto alla maggior parte degli scenari di competizione per il lock.
Unlock
Il flusso di rilascio del lock è relativamente più semplice. Per prima cosa tenta il rilascio rapido. Se new è 0, significa che non ci sono goroutine in attesa e non è in modalità fame, quindi il rilascio del lock ha successo e può restituire direttamente.
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}Altrimenti, deve entrare nel flusso di unlockSlow:
Verifica prima se il lock è già stato rilasciato:
goif (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }Se è in modalità fame, rilascia direttamente il semaforo, completando il rilascio del lock. In modalità fame, la goroutine che rilascia il lock trasferisce direttamente la proprietà del lock alla prossima goroutine in attesa.
goif new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }Se non è in modalità fame, entra nel flusso di rilascio normale:
Se non ci sono goroutine in attesa, o se c'è un'altra goroutine risvegliata che ha già ottenuto il lock, o se il lock è entrato in modalità fame:
goif old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }Altrimenti, rilascia il semaforo per risvegliare la prossima goroutine in attesa, impostando lo stato corrente del lock su
mutexWoken:gonew = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
Infine, il flusso di rilascio del lock termina.
RWMutex
La definizione del tipo di read-write mutex RWMutex si trova nel file sync/rwmutex.go. La sua implementazione è basata sul mutex.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}Di seguito è riportata la descrizione di ciascun campo:
w, un mutex. Quando una goroutine writer detiene questo mutex, altre goroutine writer e reader vengono bloccate.writerSem, semaforo per writer, utilizzato per bloccare le goroutine writer in attesa delle goroutine reader. Le goroutine writer acquisiscono il semaforo, le goroutine reader rilasciano il semaforo.readerSem, semaforo per reader, utilizzato per bloccare le goroutine reader in attesa delle goroutine writer. Le goroutine reader acquisiscono il semaforo, le goroutine writer rilasciano il semaforo.readerCount, campo centrale, l'intero read-write lock dipende dal suo stato.readerWait, indica il numero di goroutine reader che devono attendere quando una goroutine writer è bloccata.
Il principio generale del read-write lock è rendere le goroutine writer mutualmente esclusive attraverso il mutex, e rendere writer e reader mutualmente esclusivi attraverso i due semafori writerSem e readerSem, consentendo la condivisione tra reader.
readerCount
Poiché questo readerCount cambia frequentemente ed è molto importante, viene trattato separatamente. Riassume approssimativamente i seguenti stati:
- 0, nessuna goroutine reader o writer attiva, stato idle
-rwmutexMaxReaders, una goroutine writer ha acquisito il mutex, nessuna goroutine reader attiva-rwmutexMaxReaders+N, una goroutine writer ha acquisito il write lock, le goroutine reader correnti devono bloccarsi in attesa che la goroutine writer rilasci il write lockN-rwmutexMaxReaders, una goroutine writer ha acquisito il mutex, deve attendere che le restanti goroutine reader rilascino il read lockN, ci sono N goroutine reader attive, cioè sono stati acquisitiNread lock, nessuna goroutine writer attiva
Dove rwmutexMaxReaders è una costante, il cui valore è il doppio del numero di goroutine che possono attendere bloccate nel mutex, perché metà sono goroutine reader e metà sono goroutine writer.
const rwmutexMaxReaders = 1 << 30L'intero read-write lock ha questa parte complessa di readerCount. Comprendendo le sue variazioni, si comprende il flusso di lavoro del read-write lock.
TryLock
Come al solito, vediamo prima il più semplice TryLock():
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}All'inizio, tenta di chiamare TryLock() del mutex. Se fallisce, restituisce direttamente. Poi utilizza un'operazione CAS per tentare di aggiornare il valore di readerCount da 0 a -rwmutexMaxReaders. 0 rappresenta che non ci sono goroutine reader al lavoro, -rwmutexMaxReaders indica che una goroutine writer ha acquisito il mutex. Se l'operazione CAS di aggiornamento fallisce, sblocca il mutex e restituisce false. Altrimenti, restituisce true.
Lock
Poi c'è Lock(), la cui implementazione è molto semplice:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}Per prima cosa compete con altre goroutine writer fino a ottenere il mutex, poi esegue questa operazione: prima sottrae atomicamente -rwmutexMaxReaders, poi aggiunge non atomicamente rwmutexMaxReaders al nuovo valore ottenuto:
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReadersScomposto in due passaggi:
Questo serve per notificare alle altre goroutine reader che ora c'è una goroutine writer che tenta di acquisire il lock, come già discusso nella parte di
TryLock.gorw.readerCount.Add(-rwmutexMaxReaders)Poi aggiunge
rwmutexMaxReadersper ottenere r, che rappresenta il numero di goroutine reader attualmente al lavoro.gor = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
Poi verifica se ci sono goroutine reader al lavoro, e se sì, aggiunge r al valore di readerWait. Se il risultato finale non è 0, indica che deve attendere che queste goroutine reader completino il lavoro, quindi entra nel flusso runtime_SemacquireRWMutex per tentare di acquisire il semaforo writerSem. Questo semaforo viene rilasciato dalle goroutine reader. Se può ottenere il semaforo, indica che le goroutine reader hanno completato il lavoro. Altrimenti, deve entrare nella coda di attesa bloccata (la logica del semaforo è fondamentalmente coerente con quella del mutex).
UnLock
Poi c'è UnLock(), che rilascia il write lock:
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}Il suo flusso è il seguente:
Come menzionato prima, durante l'acquisizione del lock,
readerCountviene aggiornato a un valore negativo. Qui aggiungerwmutexMaxReaders, indicando che non ci sono goroutine writer al lavoro. Il valore ottenuto rappresenta il numero di goroutine reader in attesa bloccata.gor := rw.readerCount.Add(rwmutexMaxReaders)Se è già 0 o maggiore di 0, rappresenta che il write lock è già stato rilasciato:
goif r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }Rilascia il semaforo
readerSem, risvegliando le goroutine reader in attesa:gofor i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }Infine rilascia il mutex, risvegliando le goroutine writer in attesa:
gorw.w.Unlock()
Il rilascio del write lock è completato.
TryRLock
Ora vediamo la parte del read lock. Questo è il codice di TryRLock:
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}Fa solo due cose:
Verifica se ci sono goroutine writer al lavoro. Se sì, l'acquisizione del lock fallisce.
goc := rw.readerCount.Load() if c < 0 { return false }Tenta di aggiungere 1 a
readerCount. Se l'aggiornamento ha successo, l'acquisizione del lock ha successo:goif rw.readerCount.CompareAndSwap(c, c+1) { return true }Altrimenti, continua il ciclo di giudizio fino all'uscita.
Si può vedere che qui si dipende da readerCount, che è mantenuto nella parte del write lock. Questo è anche il motivo per cui si parla prima del write lock: perché la parte complessa e centrale è mantenuta nella parte del write lock.
RLock
La logica di RLock è ancora più semplice:
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}Tenta di aggiungere 1 al valore di readerCount. Se il nuovo valore ottenuto è ancora inferiore a 0, indica che una goroutine writer è al lavoro, quindi entra nel flusso di blocco del semaforo readerSem. La goroutine corrente entra nella coda di attesa bloccata.
RUnLock
Anche RUnLock è altrettanto semplice da comprendere:
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}Per prima cosa tenta di sottrarre 1 a readerCount, indicando che il numero di goroutine reader attive diminuisce di 1. Se il valore ottenuto è maggiore di 0, indica che può essere rilasciato direttamente, perché ora nessuna goroutine writer detiene il mutex. Se è inferiore a 0, indica che una goroutine writer ha già acquisito il mutex e sta attendendo che tutte le goroutine reader correnti completino il lavoro. Poi entra nel flusso di runlockSlow:
Se il valore originale di
readerCountè 0 (il lock è idle) o-rwmutexMaxReaders(la goroutine writer non ha goroutine reader in attesa, cioè tutti i read lock sono stati rilasciati), indica che non ci sono goroutine reader attive e non è necessario il rilascio:goif r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }Se ci sono goroutine reader attive, sottrae 1 a
readerWait. Se la goroutine reader corrente è l'ultima goroutine reader attiva, rilascia il semaforowriterSem, risvegliando la goroutine writer in attesa:goif rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
Il flusso di rilascio del read lock termina.
Semaforo
Il semaforo all'interno del mutex è un semplice intero uint32. Attraverso la sottrazione atomica di 1 e l'aggiunta atomica di 1, rappresenta l'acquisizione e il rilascio del semaforo. Nel runtime, la struttura responsabile della gestione dei semafori è runtime.semaRoot. La sua definizione di tipo si trova nel file runtime/sema.go. semaRoot utilizza un albero bilanciato (treap) per organizzare e gestire i semafori. Ogni nodo nell'albero rappresenta un semaforo. Il tipo di nodo è *sudog, che è una lista doppiamente collegata che mantiene la coda di attesa del semaforo corrispondente. Il nodo mantiene l'unicità attraverso *sudog.elem (indirizzo del semaforo) e garantisce l'equilibrio dell'albero attraverso il campo *sudog.ticket.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}L'albero dei semafori semaRoot dipende da un mutex di livello inferiore runtime.mutex per garantire la sua sicurezza concorrente.
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// 用于内存对齐,提高性能
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}semaRoot è memorizzato nel runtime in un semaTable globale. Sembra un array di lunghezza fissa utilizzato per memorizzare più insiemi di radici di alberi di semafori, ma in realtà, dal punto di vista del funzionamento, è effettivamente una tabella hash. Ogni elemento nella tabella contiene un semaRoot e alcuni byte di riempimento (pad), utilizzati per l'allineamento della memoria ed evitare la competizione delle cache line. semTabSize è la costante per la dimensione della tabella dei semafori, che specifica la lunghezza della tabella come 251. Generalmente si sceglie un numero primo per ridurre le collisioni hash e migliorare l'efficienza dell'hashing.
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}Il metodo rootFor equivale a una funzione hash. Accetta un puntatore di tipo uint32 addr (cioè l'indirizzo del semaforo) e restituisce il puntatore alla struttura semaRoot corrispondente a quell'indirizzo. Questa riga di codice converte prima addr in un uintptr, poi lo sposta a destra di 3 bit, equivalente a dividere per 8 (poiché un byte occupa 8 bit, dividendo l'indirizzo del puntatore per 8 si può mappare in un indice dell'array). Prendendo il modulo per semTabSize, assicura che l'indice sia nell'intervallo della dimensione della tabella dei semafori. Ottenendo il semaRoot tramite l'indice, si cerca poi il *sudog corrispondente al semaforo nell'albero bilanciato.
Acquire
L'acquisizione del semaforo è implementata dalla funzione runtime.semacquire1:
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)Accetta i seguenti parametri:
addr, l'indirizzo del semaforolifo, influenza l'ordine di dequeue dell'albero bilanciato. Di default è FIFO. LIFO è last-in-first-out. Quando il tempo di attesa del lock della goroutine non è 0 (almeno si è bloccata una volta), ètrueprofile, flag per l'analisi delle prestazioni del lockskipframes, il numero di stack frame da saltarereason, il motivo del blocco
Di seguito viene descritto brevemente l'intero flusso di acquisizione del semaforo:
Verifica lo stato della goroutine. Se la goroutine corrente non è la goroutine in schedulazione, lancia un'eccezione:
gogp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") }Verifica se può acquisire il semaforo e tenta di acquisirlo con un metodo non bloccante. Se può acquisirlo, può restituire direttamente:
gofor { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } }Se non può acquisirlo in modo non bloccante, entra in un ciclo per acquisire il semaforo con mezzi normali. Per prima cosa ottiene un
*sudogdalla cache tramiteacquireSudog(). Questa struttura rappresenta una goroutine bloccata in attesa:gos := acquireSudog()Poi ottiene l'albero dei semafori dalla tabella globale:
goroot := semtable.rootFor(addr)Entra in un ciclo, acquisisce il lock dell'albero dei semafori, verifica nuovamente se può acquisire il semaforo. Se non può, lo aggiunge all'albero dei semafori, poi chiama
goparkper sospenderlo in attesa. Fino a quando non viene risvegliato, continua a ripetere questo processo, in un ciclo fino a ottenere il semaforo:gofor { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }Infine, quando viene risvegliato, rilascia
*sudog, restituendolo alla cache:goreleaseSudog(s)
Release
Il rilascio del semaforo risveglia le goroutine bloccate in attesa. Questa funzionalità è implementata dalla funzione runtime.semrelease1:
func semrelease1(addr *uint32, handoff bool, skipframes int)Accetta i seguenti parametri:
addr, l'indirizzo del semaforohandoff, indica se passare direttamente la G in schedulazione dal P corrente alla G risvegliata. Ètruesolo in modalità fameskipframes, il numero di stack frame da saltare
Di seguito viene descritto brevemente l'intero processo di rilascio:
Ottiene l'albero dei semafori, poi incrementa il semaforo di 1, indicando il rilascio di un semaforo:
goroot := semtable.rootFor(addr) atomic.Xadd(addr, 1)Se il numero di goroutine in attesa è 0, restituisce direttamente:
goif root.nwait.Load() == 0 { return }Acquisisce il lock dell'albero dei semafori, verifica nuovamente se ci sono goroutine in attesa:
golockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }Ottiene una goroutine bloccata in attesa dall'albero dei semafori, decrementa
nwaitdi 1, poi rilascia il lock del semaforo:gos, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)Verifica se può acquisire il semaforo:
goif handoff && cansemacquire(addr) { s.ticket = 1 }La funzione
readyWithTimepassa direttamente la G risvegliata come prossima goroutine da eseguire dal P, cioè modifica*p.runnext=g:goreadyWithTime(s, 5+skipframes)Se
handoffètrue, alloragoyielddisaccoppia la goroutine G che rilascia il semaforo dal M corrente e la riaggiunge alla coda di esecuzione locale del P, quindi inizia un nuovo ciclo di schedulazione per consentire alla goroutine G risvegliata di essere schedulata immediatamente:goif s.ticket == 1 && getg().m.locks == 0 { goyield() }
Il flusso di acquisizione e rilascio del semaforo è questo. In Go, i semafori sono utilizzati non solo nei mutex. Sono collocati qui perché i semafori sono più strettamente correlati ai mutex. Anche i commenti ufficiali lo indicano:
// Asynchronous semaphore for sync.Mutex.Dopo aver compreso i semafori, tornare a guardare i mutex diventa più chiaro.
TIP
Per quanto riguarda l'albero dei semafori semaRoot, le sue operazioni di enqueue e dequeue coinvolgono implementazioni di auto-bilanciamento piuttosto complesse. Approfondire questi aspetti non è correlato al tema di questo articolo e non ha significato, quindi sono stati omessi. Chi è interessato può esplorare il codice sorgente autonomamente.
Riepilogo
Il mutex sync.Mutex implementa l'attesa delle goroutine attraverso due meccanismi: auto-rotazione e semaforo. L'auto-rotazione è non bloccante, ma deve essere strettamente limitata nel suo utilizzo perché consuma risorse CPU; mentre il semaforo è bloccante e può evitare efficacemente sprechi di risorse non necessari. Per realizzare un meccanismo di competizione più equo, Go garantisce una competizione più equilibrata tra le goroutine distinguendo tra modalità normale e modalità fame. Rispetto a lock di basso livello come runtime.mutex, sync.Mutex, essendo un lock orientato all'utente, considera più scenari d'uso effettivi durante la progettazione.
Il read-write lock sync.RWMutex implementa la mutua esclusione writer-writer attraverso il mutex sync.Mutex, e su questa base aggiunge due semafori aggiuntivi per implementare la mutua esclusione reader-writer e la condivisione reader-reader, supportando così vari scenari di concorrenza.
Sebbene l'implementazione dei lock possa sembrare complessa, una volta compreso il principio di Mutex, imparare altri strumenti di sincronizzazione nella libreria standard sync diventa molto più facile.
