sync.Mutex Verrou mutuel
Les verrous sont une primitive de synchronisation importante dans les systèmes d'exploitation. Le langage Go fournit deux implémentations de verrous dans sa bibliothèque standard :
sync.Mutex, verrou mutuel : lecture-lecture exclusif, lecture-écriture exclusif, écriture-écriture exclusifsync.RWMutex, verrou lecture-écriture : lecture-lecture partagé, lecture-écriture exclusif, écriture-écriture exclusif
Leurs cas d'utilisation sont très courants, pour protéger l'accès et la modification séquentielle d'une zone de mémoire partagée en situation de concurrence.
Locker
Avant de commencer, regardons le type sync.Locker, c'est un ensemble d'interfaces définies par Go.
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}Mutex
Le verrou mutuel Mutex est défini dans le fichier sync/mutex.go, c'est un type struct.
type Mutex struct {
state int32
sema uint32
}Description des champs :
state, représente l'état du verrousema, sémaphore
state est un entier 32 bits, les 3 bits de poids faible représentent trois états :
mutexLocked=1, verrouillémutexWoken=2, réveillémutexStarving=4, mode famine
Les 29 bits de poids fort représentent le nombre de coroutines en attente du verrou.
Le verrou mutuel a deux modes de fonctionnement : mode normal et mode famine. Le mode normal suit l'ordre FIFO de la file d'attente. Le mode famine est activé quand une coroutine attend plus de 1ms sans obtenir le verrou.
TryLock
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}Lock
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}Le verrouillage utilise deux mécanismes : spin et sémaphore. Le spin est non bloquant mais consomme des ressources CPU. Le sémaphore est bloquant.
Les conditions pour entrer en spin sont strictes :
- Le verrou est déjà détenu et pas en mode famine
- Nombre de spins inférieur à 4
- CPU multi-cœurs
- La file locale de P est vide
Unlock
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}Le déverrouillage est relativement simple. En mode famine, le verrou est directement transmis à la coroutine en tête de file.
RWMutex
Le verrou lecture-écriture RWMutex est basé sur le verrou mutuel.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}w, un verrou mutuel pour les writerswriterSem, sémaphore d'écriturereaderSem, sémaphore de lecturereaderCount, champ central maintenant l'étatreaderWait, nombre de lecteurs à attendre
readerCount
readerCount peut avoir plusieurs états :
- 0 : ni lecteur ni writer actif
-rwmutexMaxReaders: un writer détient le verrou- N : N lecteurs actifs
Sémaphore
Le sémaphore dans le verrou mutuel est un simple uint32, incrémenté et décrémenté atomiquement. La structure responsable de maintenir les sémaphores dans le runtime est runtime.semaRoot, qui utilise un arbre binaire équilibré (treap).
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}Acquire
L'acquisition du sémaphore est implémentée par runtime.semacquire1. Si le sémaphore ne peut pas être obtenu de manière non bloquante, la coroutine est ajoutée à l'arbre des sémaphores et suspendue via gopark.
Release
La libération du sémaphore est implémentée par runtime.semrelease1. Elle réveille une coroutine en attente.
Résumé
Le verrou mutuel sync.Mutex implémente l'attente des coroutines via deux mécanismes : spin et sémaphore. Le spin est non bloquant mais consomme des ressources CPU, tandis que le sémaphore est bloquant et évite la consommation inutile de ressources.
Voici le code de Lock, qui utilise également l'opération CAS pour tenter d'acquérir le verrou directement. Cependant, il est plus "prudent" : il n'acquerra le verrou directement que lorsqu'aucune coroutine n'est en attente (old=0).
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}S'il détecte qu'une coroutine est en attente, il "prudemment" se met en file d'attente et entre dans le processus de spin lockSlow pour attendre le verrou (le cœur du verrou mutuel). D'abord, il prépare quelques variables :
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.statewaitStartTime: utilisé pour enregistrer le temps de début d'attente, vérifier si le mode famine doit être activé.starving: indique si la coroutine courante n'a pas obtenu le verrou depuis plus de 1ms.awoke: marque si la coroutine courante a été réveillée.iter: compteur, enregistre le nombre de spins.old: obtient l'état actuel du verrou mutuel.
Ensuite, on entre dans une boucle for pour déterminer si la coroutine courante peut entrer en état de spin.
Le spin est un mécanisme de synchronisation entre threads, également appelé attente active (busy-waiting). Lorsqu'un thread ne détient pas le verrou, il ne se suspend pas directement pour changer de contexte thread mais entre en boucle inactive, occupant constamment le time-slice CPU. Dans des situations de faible contention de verrou ou lorsque le verrou est détenu pour une courte durée, cela peut effectivement éviter les changements de contexte fréquents et améliorer les performances. Cependant, ce n'est pas une solution universelle. En Go, l'abus de spin peut entraîner les conséquences dangereuses suivantes :
- Utilisation élevée du CPU : trop de coroutines en spin consomment beaucoup de ressources CPU, surtout lorsque le verrou est occupé longtemps.
- Impact sur l'ordonnancement des coroutines : le nombre total de processeurs P est limité. Si beaucoup de coroutines en spin occupent des P, les autres coroutines exécutant du code utilisateur ne pourront pas être ordonnancées en temps opportun.
- Problèmes de cohérence du cache : la caractéristique d'attente active des verrous spin entraîne une lecture répétée de l'état du verrou dans le cache. Si les coroutines en spin s'exécutent sur différents cœurs et que l'état du verrou n'est pas mis à jour à temps dans la mémoire globale, les coroutines lisent un état de verrou inexact, et les synchronisations fréquentes de cohérence du cache réduisent également significativement les performances.
Ce n'est donc pas toutes les coroutines qui peuvent entrer en état de spin, elles doivent passer par les jugements stricts suivants :
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}Les conditions sont les suivantes :
Le verrou actuel est déjà détenu et ne peut pas être en mode famine, sinon cela signifie qu'une coroutine attend déjà le verrou depuis longtemps, auquel cas on entre directement dans le processus de blocage.
Entrer dans le processus de jugement
runtime.sync_runtime_canSpingoconst ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }Le nombre de spins est inférieur à
runtime.active_spin, qui est de 4 par défaut. Plus de spins gaspilleraient des ressources.Le nombre de cœurs CPU est supérieur à 1. Le spin n'a aucun sens sur un système monocœur.
Le
gomaxprocsactuel est supérieur à la somme du nombre de P inactifs et de P en spin plus 1, ce qui indique qu'il n'y a pas assez de processeurs disponibles pour le spin.La file locale du P actuel doit être vide, sinon cela signifie qu'il y a d'autres tâches utilisateur à exécuter et le spin ne doit pas avoir lieu.
Si le spin est autorisé, runtime.sync_runtime_doSpin est appelé pour entrer en spin, ce qui exécute en fait 30 instructions PAUSE.
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RETSi le spin n'est pas possible, il n'y a que deux issues : obtenir le verrou avec succès ou entrer dans la file d'attente et se bloquer. Cependant, avant cela, il y a encore plusieurs choses à traiter :
- Si ce n'est pas le mode famine, tenter d'acquérir le verrou
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}- Si le verrou est déjà occupé ou en mode famine, incrémenter le nombre de coroutines en attente de 1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}- Si la coroutine courante est déjà en état de famine et que le verrou est toujours occupé, activer le mode famine
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}- Si la coroutine courante a été réveillée par un spin, ajouter le drapeau
mutexWoken
if awoke {
new &^= mutexWoken
}Ensuite, on tente de mettre à jour l'état du verrou via CAS. Si la mise à jour échoue, on commence directement une nouvelle itération de la boucle
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}Si la mise à jour réussit, on procède aux jugements suivants.
L'état précédent n'était pas en mode famine et aucune coroutine ne détenait le verrou, donc la coroutine courante peut directement détenir le verrou, quitter le processus et continuer l'exécution du code utilisateur.
goif old&(mutexLocked|mutexStarving) == 0 { break }Échec de la tentative d'acquisition du verrou, enregistrement du temps d'attente. Si LIFO est true, cela signifie une file dernier entré premier sorti, sinon c'est FIFO premier entré premier sorti.
goqueueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }Tenter d'obtenir le sémaphore, entrer dans la fonction
runtime.semacquire1. Si le sémaphore peut être obtenu, on retourne directement sans blocage. Sinon,runtime.goparkest appelé pour suspendre la coroutine courante en attente de la libération du sémaphore.goruntime_SemacquireMutex(&m.sema, queueLifo, 1)À cette étape, il y a deux possibilités : soit le sémaphore a été obtenu directement avec succès, soit la coroutine qui était bloquée vient d'être réveillée et a obtenu le sémaphore. Dans les deux cas, le sémaphore a été obtenu avec succès. Si le mode famine est actif, le verrou peut être obtenu directement.
gostarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }Si ce n'est pas le mode famine, réinitialiser
iteret recommencer le processus de spin.goawoke = true iter = 0
À ce stade, le processus de verrouillage est terminé. L'ensemble du processus est assez complexe, utilisant à la fois l'attente par spin et l'attente bloquante par sémaphore, équilibrant performance et équité, adapté à la plupart des situations de contention de verrou.
Unlock
Le processus de déverrouillage est relativement simple. Il tente d'abord un déverrouillage rapide. Si new est 0, cela signifie qu'il n'y a pas de coroutine en attente et que ce n'est pas le mode famine, donc le déverrouillage réussit et on peut retourner directement.
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}Sinon, il faut entrer dans le processus unlockSlow
D'abord vérifier si le verrou est déjà déverrouillé
goif (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }Si c'est le mode famine, libérer directement le sémaphore pour compléter le déverrouillage. En mode famine, la coroutine qui déverrouille transmet directement la propriété du verrou à la coroutine suivante en attente.
goif new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }Si ce n'est pas le mode famine, entrer dans le processus de déverrouillage normal
S'il n'y a pas de coroutine en attente, ou si une autre coroutine réveillée a déjà obtenu le verrou, ou si le verrou est entré en mode famine
goif old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }Sinon, libérer le sémaphore pour réveiller la coroutine suivante en attente, et définir l'état actuel du verrou à
mutexWokengonew = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
Le processus de déverrouillage est terminé.
RWMutex
Le verrou lecture-écriture RWMutex est défini dans le fichier sync/rwmutex.go, son implémentation est également basée sur le verrou mutuel.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}Voici la description de chaque champ :
w, un verrou mutuel, lorsque la coroutine writer détient ce verrou mutuel, les autres coroutines writers et readers seront bloquées.writerSem, sémaphore d'écriture, utilisé pour bloquer les coroutines writers en attente des coroutines readers. Les coroutines writers acquièrent le sémaphore, les coroutines readers libèrent le sémaphore.readerSem, sémaphore de lecture, utilisé pour bloquer les coroutines readers en attente des coroutines writers. Les coroutines readers acquièrent le sémaphore, les coroutines writers libèrent le sémaphore.readerCount, champ central, tout le verrou lecture-écriture s'appuie sur lui pour maintenir l'état.readerWait, représente le nombre de coroutines readers à attendre lorsque la coroutine writer est bloquée.
Le principe général du verrou lecture-écriture est d'utiliser le verrou mutuel pour rendre les coroutines writers mutuellement exclusives, et d'utiliser deux sémaphores writerSem et readerSem pour rendre l'écriture et la lecture mutuellement exclusives, tout en permettant le partage entre lectures.
readerCount
Comme ce readerCount change beaucoup et est très important, nous l'abordons séparément. Il peut être résumé dans les états suivants :
- 0, actuellement ni reader ni writer actif, état inactif
-rwmutexMaxReaders, une coroutine writer détient déjà le verrou mutuel, aucun reader actif actuellement-rwmutexMaxReaders+N, une coroutine writer détient déjà le verrou d'écriture, les readers actuels doivent attendre que le writer libère le verrou d'écritureN-rwmutexMaxReaders, une coroutine writer détient déjà le verrou mutuel, doit attendre que les readers restants libèrent le verrou de lectureN, actuellement N readers actifs, c'est-à-dire N verrous de lecture ajoutés, aucun writer actif
Parmi eux, rwmutexMaxReaders est une valeur constante, sa valeur est 2 fois le nombre de coroutines que le verrou mutuel peut bloquer en attente, car la moitié est pour les readers, l'autre moitié pour les writers.
const rwmutexMaxReaders = 1 << 30Pour toute la partie verrou lecture-écriture, seul ce readerCount est plus complexe, comprendre ses changements permet de comprendre le flux de travail du verrou lecture-écriture.
TryLock
Comme d'habitude, commençons par le plus simple TryLock()
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}Au début, il tente d'appeler le TryLock() du verrou mutuel, s'il échoue il retourne directement. Ensuite, il utilise l'opération CAS pour tenter de mettre à jour la valeur de readerCount de 0 à -rwmutexMaxReaders. 0 représente qu'il n'y a pas de reader travaillant actuellement, -rwmutexMaxReaders indique qu'une coroutine writer détient déjà le verrou mutuel. Si l'opération CAS échoue, le verrou mutuel est déverrouillé, si elle réussit, true est retourné.
Lock
Ensuite, Lock(), son implémentation est également très simple.
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}D'abord, il entre en compétition avec les autres writers jusqu'à obtenir le verrou mutuel, puis effectue cette opération : soustraire atomiquement -rwmutexMaxReaders, puis ajouter non atomiquement rwmutexMaxReaders à la nouvelle valeur obtenue
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReadersDécomposons en deux étapes :
Ceci est pour notifier aux autres readers qu'un writer tente de détenir le verrou, comme expliqué dans la section
TryLock.gorw.readerCount.Add(-rwmutexMaxReaders)En ajoutant
rwmutexMaxReaderson obtient r, ce r représente le nombre de readers actuellement en travail.gor = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
Ensuite, on vérifie s'il y a des readers en travail, puis on ajoute r à la valeur de readerWait, si le résultat final n'est toujours pas 0, cela signifie qu'il faut attendre que ces readers terminent leur travail, on entre alors dans le processus runtime_SemacquireRWMutex pour tenter d'obtenir le sémaphore writerSem, ce sémaphore est libéré par les readers. Si le sémaphore peut être obtenu, cela signifie que les readers ont terminé leur travail, sinon il faut entrer dans la file d'attente de blocage (cette logique de sémaphore est fondamentalement la même que pour le verrou mutuel).
Unlock
Ensuite, Unlock(), libération du verrou d'écriture.
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}Son processus est le suivant :
Comme mentionné précédemment, lors du verrouillage,
readerCountest mis à jour avec une valeur négative, ici en ajoutantrwmutexMaxReaders, cela indique qu'il n'y a plus de writer en travail, et la valeur obtenue est le nombre de readers en attente de blocage.gor := rw.readerCount.Add(rwmutexMaxReaders)S'il est initialement 0 ou supérieur à 0, cela signifie que le verrou d'écriture a déjà été libéré
goif r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }Libérer le sémaphore
readerSem, réveiller les readers en attentegofor i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }Enfin, libérer le verrou mutuel, réveiller les writers en attente.
gorw.w.Unlock()
La libération du verrou d'écriture est terminée.
TryRLock
Passons maintenant à la partie verrou de lecture, voici le code de TryRLock
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}Il ne fait que deux choses au total :
Vérifier s'il y a un writer en travail, si oui le verrouillage échoue.
goc := rw.readerCount.Load() if c < 0 { return false }Tenter d'incrémenter
readerCountde 1, si la mise à jour réussit, le verrouillage réussitgoif rw.readerCount.CompareAndSwap(c, c+1) { return true }Sinon, continuer la boucle de vérification jusqu'à la sortie
On peut voir que le readerCount utilisé ici est maintenu dans la partie verrou d'écriture, c'est pourquoi il faut d'abord expliquer le verrou d'écriture, car les parties complexes et essentielles sont maintenues dans la partie verrou d'écriture.
RLock
La logique de RLock est encore plus simple
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}Il tente d'incrémenter la valeur de readerCount de 1, si la nouvelle valeur obtenue est toujours inférieure à 0, cela signifie qu'un writer est en travail, on entre alors dans le processus de blocage du sémaphore readerSem, la coroutine courante entrera dans la file d'attente de blocage.
RUnlock
RUnlock est également simple et facile à comprendre
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}Il tente d'abord de décrémenter readerCount de 1, indiquant que le nombre de readers actifs diminue de un. Si la valeur obtenue est supérieure à 0, cela signifie qu'on peut libérer directement car il n'y a pas de writer détenant le verrou mutuel. Si elle est inférieure à 0, cela signifie qu'un writer détient déjà le verrou mutuel et attend que tous les readers actuels terminent leur travail. Ensuite, on entre dans le processus rUnlockSlow :
Si la valeur originale de
readerCountest 0 (le verrou est inactif) ou-rwmutexMaxReaders(le writer n'a pas de readers à attendre, c'est-à-dire que tous les verrous de lecture ont été libérés), cela signifie qu'il n'y a pas de readers actifs, pas besoin de déverrouillergoif r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }S'il y a des readers actifs, décrémenter
readerWaitde 1. Si le reader courant est le dernier reader actif, libérer le sémaphorewriterSem, réveiller le writer en attente.goif rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
Le processus de libération du verrou de lecture est terminé.
Semaphore
Le sémaphore dans le verrou mutuel est simplement un entier uint32, l'acquisition et la libération du sémaphore sont représentées par des décrémentations et incrémentations atomiques. La structure responsable du maintien des sémaphores dans le runtime est runtime.semaRoot, sa définition de type se trouve dans le fichier runtime/sema.go. semaRoot utilise un arbre binaire équilibré (treap) pour organiser et gérer les sémaphores. Chaque nœud de l'arbre représente un sémaphore, le type de nœud est *sudog, c'est une liste doublement chaînée qui maintient la file d'attente des sémaphores correspondants. Les nœuds restent uniques via *sudog.elem (adresse du sémaphore), et l'équilibrage de l'arbre est assuré par le champ *sudog.ticket.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}L'arbre de sémaphores semaRoot dépend d'un verrou mutuel de plus bas niveau runtime.mutex pour garantir sa sécurité concurrentielle.
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// utilisé pour l'alignement mémoire, améliore les performances
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}semaRoot est stocké dans le runtime dans une semaTable globale. Elle ressemble à un tableau de longueur fixe, utilisé pour stocker une collection de racines d'arbres de sémaphores multiples, mais en termes de fonctionnement, c'est en fait une table de hachage. Chaque élément du tableau contient un semaRoot et quelques octets de remplissage (pad), utilisés pour aligner la mémoire et éviter la contention de ligne de cache. semTabSize est une constante de taille de table de sémaphores, spécifiant une longueur de table de 251, généralement un nombre premier est choisi pour réduire les collisions de hachage et améliorer l'efficacité de dispersion.
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}La méthode rootFor équivaut à une fonction de hachage. Elle accepte un pointeur addr de type uint32 (l'adresse du sémaphore), et retourne un pointeur vers la structure semaRoot correspondant à cette adresse. Cette ligne de code convertit d'abord addr en un uintptr, puis décale de 3 bits vers la droite, ce qui équivaut à diviser par 8 (puisqu'un octet occupe 8 bits, diviser l'adresse du pointeur par 8 permet de la mapper en indice de tableau). En prenant le modulo semTabSize, on s'assure que l'indice reste dans les limites de la taille de la table de sémaphores. Une fois le semaRoot obtenu par l'indice, on peut chercher dans l'arbre équilibré la file d'attente *sudog correspondant au sémaphore.
Acquire
L'acquisition du sémaphore est implémentée par la fonction runtime.semacquire1
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)Elle reçoit les paramètres suivants :
addr, l'adresse du sémaphorelifo, affecte l'ordre de retrait de l'arbre équilibré, par défaut FIFO, LIFO signifie dernier entré premier sorti. Lorsque le temps d'attente du verrou par la coroutine n'est pas 0 (elle a été bloquée au moins une fois), c'esttrueprofile, indicateurs pour l'analyse de performance des verrousskipframes, nombre de frames de pile à sauterreason, la raison du blocage
Voici une brève description du processus d'acquisition du sémaphore :
Vérifier l'état de la coroutine, si la coroutine courante n'est pas celle en cours d'ordonnancement, lancer une exception directement
gogp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") }Vérifier si le sémaphore peut être obtenu, et tenter d'obtenir le sémaphore par une méthode non bloquante, si possible on peut retourner directement.
gofor { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } }Si l'acquisition non bloquante n'est pas possible, on entre dans une boucle pour obtenir le sémaphore par les moyens normaux. D'abord, on obtient un
*sudogdu cache viaacquireSudog(), cette structure représente une coroutine en attente bloquées := acquireSudog()Ensuite, obtenir l'arbre de sémaphores depuis la table globale
goroot := semtable.rootFor(addr)Entrer dans la boucle, verrouiller l'arbre de sémaphores, vérifier à nouveau si le sémaphore peut être obtenu. Si non, l'ajouter à l'arbre de sémaphores, puis appeler
goparkpour le suspendre en attente, jusqu'à être réveillé et répéter ce processus, en bouclant jusqu'à obtenir le sémaphore.gofor { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }Enfin, lors du réveil, libérer le
*sudog, le remettre dans le cache.goreleaseSudog(s)
Release
La libération du sémaphore, réveiller les coroutines bloquées en attente, cette fonctionnalité est implémentée par la fonction runtime.semrelease1
func semrelease1(addr *uint32, handoff bool, skipframes int)Elle reçoit les paramètres suivants :
addr, l'adresse du sémaphorehandoff, indique s'il faut basculer directement le G en cours d'ordonnancement sur le P actuel vers le G réveillé, uniquementtrueen mode famineskipframes, nombre de frames de pile à sauter
Brève description du processus de libération :
Obtenir l'arbre de sémaphores, puis incrémenter le sémaphore de un, indiquant la libération d'un sémaphore
goroot := semtable.rootFor(addr) atomic.Xadd(addr, 1)Si le nombre de coroutines en attente est 0, retourner directement
goif root.nwait.Load() == 0 { return }Verrouiller l'arbre de sémaphores, vérifier à nouveau s'il y a des coroutines en attente
golockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }Obtenir une coroutine bloquée en attente de l'arbre de sémaphores, décrémenter
nwaitde un, puis libérer le verrou du sémaphoregos, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)Vérifier si le sémaphore peut être obtenu
goif handoff && cansemacquire(addr) { s.ticket = 1 }La fonction
readyWithTimeplace directement la coroutine réveillée G comme prochaine coroutine à exécuter pour P, c'est-à-dire modifier*p.runnext=g.goreadyWithTime(s, 5+skipframes)Si
handoffesttrue, alorsgoyielddétache la coroutine G libérant le sémaphore du M actuel, et la remet en fin de file d'exécution locale de P, puis commence un nouveau cycle d'ordonnancement, pour permettre à la coroutine G réveillée d'être ordonnancée immédiatementgoif s.ticket == 1 && getg().m.locks == 0 { goyield() }
Le processus d'acquisition et de libération du sémaphore est complet. En Go, le sémaphore n'est pas utilisé uniquement pour les verrous mutuels, il est présenté ici parce que le sémaphore est le plus fortement associé aux verrous mutuels, le commentaire officiel le mentionne même explicitement
// Asynchronous semaphore for sync.Mutex.Après avoir compris les sémaphores, revoir les verrous mutuels sera plus clair.
TIP
Concernant l'arbre de sémaphores semaRoot, ses opérations d'enqueue et dequeue impliquent des opérations d'auto-équilibrage dont l'implémentation est assez fastidieuse. Approfondir ces détails n'est pas pertinent ni significatif par rapport au sujet de cet article, donc ils ont été omis. Les lecteurs intéressés peuvent consulter le code source eux-mêmes.
Résumé
Le verrou mutuel sync.Mutex implémente l'attente des coroutines via deux mécanismes : spin et sémaphore. Le spin est non bloquant mais nécessite une utilisation strictement limitée car il consomme des ressources CPU ; le sémaphore est bloquant et permet d'éviter efficacement la consommation inutile de ressources. Pour implémenter un mécanisme de concurrence plus équitable, Go garantit un meilleur équilibre dans la concurrence pour les verrous en distinguant le mode normal et le mode famine. Comparé au verrou de bas niveau runtime.mutex, sync.Mutex en tant que verrou orienté utilisateur est conçu en tenant compte de davantage de scénarios d'utilisation réels.
Le verrou lecture-écriture sync.RWMutex implémente l'exclusion mutuelle écriture-écriture via le verrou mutuel sync.Mutex, et ajoute sur cette base deux sémaphores supplémentaires pour implémenter l'exclusion mutuelle lecture-écriture et le partage lecture-lecture, supportant ainsi divers scénarios de concurrence.
Bien que l'implémentation des verrous semble assez complexe, une fois le principe de Mutex compris, l'apprentissage des autres outils de synchronisation de la bibliothèque standard sync devient beaucoup plus facile.
