sync.Mutex Mutex de Exclusión Mutua
Los locks (bloqueos) son una de las primitivas de sincronización más importantes en los sistemas operativos. El lenguaje Go proporciona implementaciones de mutex y locks de lectura-escritura en su biblioteca estándar, que corresponden respectivamente a:
sync.Mutex, mutex de exclusión mutua, lectura-lectura exclusiva, lectura-escritura exclusiva, escritura-escritura exclusivasync.RWMutex, lock de lectura-escritura, lectura compartida, lectura-escritura exclusiva, escritura-escritura exclusiva
Sus escenarios de uso son muy comunes, utilizados para proteger el acceso y modificación ordenada de memoria compartida en situaciones de concurrencia, como en el siguiente ejemplo:
import (
"fmt"
"sync"
)
func main() {
var i int
var wg sync.WaitGroup
var mu sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
viewI := i
mu.Unlock()
viewI++
mu.Lock()
i = viewI
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(i)
}Sin la protección del lock, el resultado de salida de esta función podría ser diferente cada vez que se ejecuta, lo cual es impredecible. Obviamente, en la mayoría de los escenarios no queremos que esto ocurra. Este caso es muy simple para la mayoría de las personas, y quizás ya estás familiarizado con el uso de locks, pero es posible que no entiendas cómo se implementan internamente los locks en Go. El código en sí no es complejo, y este artículo lo explicará en detalle.
Locker
Antes de comenzar, echemos un vistazo al tipo sync.Locker. Es un conjunto de interfaces definidas por Go:
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}Sus métodos son muy simples y fáciles de entender: solo bloqueo y desbloqueo. Sin embargo, debido a que en Go la implementación de interfaces es mejor que los acuerdos, muchas personas pueden nunca haberlo visto. Solo lo mencionamos brevemente aquí porque realmente no es tan importante. Los dos locks que discutiremos a continuación también implementan esta interfaz.
Mutex
La definición del tipo de mutex de exclusión mutua Mutex se encuentra en el archivo sync/mutex.go. Es un tipo de estructura como sigue:
type Mutex struct {
state int32
sema uint32
}Descripción de los campos:
state, representa el estado del locksema, es decir, semáforo (semaphore), se explicará más adelante
Primero hablemos sobre state:
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)state es un tipo entero de 32 bits. Los 3 bits bajos se usan para representar los tres estados anteriores. Estos tres estados no son independientes y pueden coexistir:
mutexLocked=1, bloqueadomutexWoken=2, despertadomutexStarving=4, modo hambriento
Los 29 bits altos se usan para representar cuántas goroutines están esperando el lock. Teóricamente, un mutex puede ser usado simultáneamente por 2^29+1 goroutines. Sin embargo, en la práctica es poco probable tener tantas goroutines. Incluso si cada una ocupa solo 2KB (tamaño inicial del stack), se necesitaría aproximadamente 1TB de memoria para crear tantas goroutines.
+-----------------------------------+---------------+------------+-------------+
| waiter | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
| 29 bits | 1 bit | 1 bit | 1 bit |
+-----------------------------------+---------------+------------+-------------+Hay dos modos de ejecución para el mutex: modo normal y modo hambriento. El modo normal sigue el orden FIFO (primero en entrar, primero en salir) para que las goroutines obtengan el lock según su llegada a la cola de espera bloqueada. Este es el caso general y también cuando el rendimiento es óptimo, ya que seguir el orden de acceso evita problemas.
El modo hambriento es el caso especial. "Hambriento" se refiere a que una goroutine en espera no puede obtener el lock durante mucho tiempo y permanece bloqueada. No significa que el mutex esté en estado hambriento. ¿Cuándo entra una goroutine en estado hambriento? El ejemplo oficial es: hay una goroutine que llegó primero pero se bloquea porque no puede obtener el mutex. Cuando se libera el lock y se despierta, llega otra goroutine que intenta obtener el lock (le gusta "colarse"). Como esta última está en estado de ejecución (ocupando un time slice de CPU), tiene alta probabilidad de competir exitosamente por el lock. En casos extremos, puede haber muchas goroutines así, y la goroutine despertada nunca podrá obtener el lock (siempre se cuelan). Llegó primero, pero nunca puede obtener el lock.
const (
starvationThresholdNs = 1e6
)Para evitar esta situación, Go establece un umbral de espera starvationThresholdNs. Si una goroutine no ha obtenido el lock después de más de 1ms, el mutex entra en modo hambriento. En modo hambriento, la propiedad del lock se transfiere directamente a la primera goroutine en la cola de espera. Las nuevas goroutines no intentan obtener el lock y se colocan al final de la cola. Así, en modo hambriento, la propiedad del lock pasa secuencialmente a las goroutines en la cola de espera (las que hicieron fila primero obtienen el lock, los que se cuelan van al final). Cuando una goroutine obtiene exitosamente el lock, si es la última goroutine en espera o el tiempo de espera es menor a 1ms, el mutex vuelve al modo normal. Este diseño de modo hambriento evita que algunas goroutines no puedan obtener el lock durante mucho tiempo y "mueran de hambre".
TryLock
El mutex proporciona dos métodos para adquirir el lock:
Lock(), adquiere el lock de manera bloqueanteTryLock(), adquiere el lock de manera no bloqueante
Veamos primero el código de TryLock, ya que su implementación es más simple:
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}Al inicio, verifica si el lock ya está adquirido o si está en modo hambriento (muchas goroutines esperando el lock). Si es así, la goroutine actual no puede obtener el lock. De lo contrario, intenta actualizar el estado a mutexLocked mediante una operación CAS. Si CAS devuelve false, significa que otra goroutine obtuvo el lock durante este período, y la goroutine actual no puede obtenerlo. De lo contrario, obtiene el lock exitosamente. Como se puede ver, el llamador de TryLock() es el que intenta "colarse", ya que intenta抢夺 el lock directamente sin importar si hay goroutines esperando (old puede no ser igual a 0).
Lock
A continuación está el código de Lock. También usa una operación CAS para intentar adquirir el lock directamente, pero es más "honesto": solo intenta adquirir el lock directamente cuando no hay goroutines bloqueadas esperando (old=0).
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}Si descubre que hay goroutines bloqueadas esperando, se "coloca honestamente" al final de la fila y entra en el flujo de espera lockSlow (el núcleo del mutex). Primero prepara algunas variables:
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.statewaitStartTime: registra el tiempo de inicio de espera para verificar si entra en modo hambrientostarving: indica si la goroutine actual ha estado esperando más de 1ms sin obtener el lockawoke: marca si la goroutine actual ha sido despertadaiter: contador que registra el número de iteraciones de spinold: obtiene el estado actual del mutex
Luego entra en un bucle for para determinar si la goroutine actual puede entrar en estado de spin.
El spin es un mecanismo de sincronización entre hilos, también conocido como "busy-waiting" (espera ocupada). Cuando un hilo no tiene el lock, no se suspende directamente cambiando el contexto del hilo, sino que entra en un estado de giro vacío, ocupando el time slice de CPU durante este proceso. Si la competencia por el lock es baja o el tiempo de posesión del lock es corto, esto puede evitar cambios frecuentes de contexto de hilos y mejorar efectivamente el rendimiento. Sin embargo, no es una solución mágica. En Go, el abuso de spin puede tener las siguientes consecuencias peligrosas:
- Alto uso de CPU: demasiadas goroutines en spin consumen muchos recursos de CPU, especialmente cuando el lock está ocupado por mucho tiempo
- Afecta la planificación de goroutines: el número total de procesadores P es limitado. Si muchas goroutines en spin ocupan P, otras goroutines que ejecutan código de usuario no pueden ser planificadas oportunamente
- Problemas de coherencia de caché: la naturaleza de busy-waiting del spin lock hace que los hilos lean repetidamente el estado del lock en la caché de alta velocidad (cache). Si las goroutines en spin se ejecutan en diferentes núcleos y el estado del lock no se actualiza oportunamente en la memoria global, las goroutines pueden leer estados de lock inexactos, y la sincronización frecuente de coherencia de caché también puede reducir significativamente el rendimiento
Por lo tanto, no todas las goroutines pueden entrar en estado de spin. Deben pasar las siguientes verificaciones estrictas:
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}Las condiciones son:
El lock actual debe estar adquirido y no estar en modo hambriento. De lo contrario, significa que ya hay goroutines esperando por mucho tiempo, y se entra directamente en el flujo de bloqueo.
Entra en el flujo de verificación
runtime.sync_runtime_canSpin:goconst ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }El número de iteraciones de spin debe ser menor que
runtime.active_spin, por defecto 4. Más iteraciones desperdician recursos.El número de núcleos de CPU debe ser mayor que 1. El spin en sistemas de un solo núcleo no tiene sentido.
El
gomaxprocsactual debe ser mayor que la suma del número de P inactivos y el número de P en spin más 1, lo que indica que no hay suficientes procesadores disponibles para el spin.La cola local del P actual debe estar vacía, lo que indica que hay otras tareas de usuario para ejecutar y no se puede realizar el spin.
Si se determina que se puede hacer spin, se llama a runtime.sync_runtime_doSpin para entrar en spin. En realidad, esto ejecuta 30 instrucciones PAUSE:
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RETSi no se puede hacer spin, solo hay dos resultados posibles: adquirir exitosamente el lock o entrar en la cola de espera y bloquearse. Sin embargo, antes de eso, hay muchas cosas que manejar:
- Si no está en modo hambriento, intenta adquirir el lock:
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}- Si el lock ya está ocupado o está en modo hambriento, el número de waiters incrementa en 1:
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}- Si la goroutine actual ya está en estado hambriento y el lock aún está ocupado, entra en modo hambriento:
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}- Si la goroutine actual fue despertada por spin, agrega la bandera
mutexWoken:
if awoke {
new &^= mutexWoken
}Luego intenta actualizar el estado del lock mediante CAS. Si la actualización falla, comienza directamente la siguiente iteración del bucle:
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}Si la actualización es exitosa, comienza la siguiente verificación:
Si el estado original no era modo hambriento y ninguna goroutine ocupa el lock, la goroutine actual puede adquirir el lock directamente y salir del flujo para continuar ejecutando el código de usuario.
goif old&(mutexLocked|mutexStarving) == 0 { break }Si falla el intento de adquirir el lock, registra el tiempo de espera, donde LIFO si es true indica que la cola es LIFO (último en entrar, primero en salir), de lo contrario es FIFO (primero en entrar, primero en salir).
goqueueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }Intenta obtener el semáforo, entra en la función
runtime.semacquire1. Si puede obtener el semáforo, retorna directamente sin bloquearse. De lo contrario, llama aruntime.goparkpara suspender la goroutine actual y esperar la liberación del semáforo.goruntime_SemacquireMutex(&m.sema, queueLifo, 1)Llegar a este paso tiene dos posibilidades:一是obtuvo exitosamente el semáforo directamente, 二是acaba de ser despertado del bloqueo y obtuvo exitosamente el semáforo. En cualquiera de los dos casos, obtuvo exitosamente el semáforo. Si está en modo hambriento, puede adquirir el lock directamente.
gostarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }Si no está en modo hambriento, reinicia
itery comienza de nuevo el flujo de spin.goawoke = true iter = 0
Aquí termina el flujo de adquisición del lock. Todo el proceso es bastante complejo, utilizando dos métodos: espera por spin y espera por bloqueo de semáforo, equilibrando rendimiento y equidad, adecuado para la mayoría de los escenarios de competencia por locks.
Unlock
El flujo de desbloqueo es relativamente simple. Primero intenta desbloquear rápidamente. Si new es 0, significa que no hay goroutines esperando y no está en modo hambriento, es decir, el desbloqueo es exitoso y puede retornar directamente.
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}De lo contrario, necesita entrar en el flujo de unlockSlow:
Primero verifica si ya se desbloqueó:
goif (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }Si está en modo hambriento, libera directamente el semáforo para completar el desbloqueo. En modo hambriento, la goroutine que desbloquea transfiere directamente la propiedad del lock a la siguiente goroutine en espera.
goif new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }Si no está en modo hambriento, entra en el flujo de desbloqueo normal:
Si no hay goroutines esperando, o si hay otras goroutines despertadas que ya obtuvieron el lock, o si el lock entró en modo hambriento:
goif old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }De lo contrario, libera el semáforo para despertar la siguiente goroutine en espera, y establece el estado actual del lock como
mutexWoken:gonew = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
Finalmente, el flujo de desbloqueo termina.
RWMutex
La definición del tipo de lock de lectura-escritura RWMutex se encuentra en el archivo sync/rwmutex.go. Su implementación también se basa en el mutex.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}Descripción de los campos:
w, un mutex. Cuando una goroutine escritora posee este mutex, otras goroutines escritoras y lectoras se bloquean.writerSem, semáforo de escritura, usado para bloquear goroutines escritoras esperando a que las goroutines lectoras completen. La goroutine escritora adquiere el semáforo, la goroutine lectora libera el semáforo.readerSem, semáforo de lectura, usado para bloquear goroutines lectoras esperando a que las goroutines escritoras completen. La goroutine lectora adquiere el semáforo, la goroutine escritora libera el semáforo.readerCount, campo central, todo el lock de lectura-escritura depende de él para mantener el estado.readerWait, indica el número de goroutines lectoras que deben esperar cuando una goroutine escritora se bloquea.
El principio general del lock de lectura-escritura es: usar el mutex para hacer que las goroutines escritoras sean mutuamente exclusivas, y usar los dos semáforos writerSem y readerSem para hacer que lectura y escritura sean mutuamente exclusivas, y que las lecturas sean compartidas.
readerCount
Dado que readerCount cambia bastante y es muy importante, lo explicamos por separado. Sus estados se pueden resumir en las siguientes categorías:
- 0: actualmente no hay goroutines lectoras activas ni goroutines escritoras activas, en estado inactivo
-rwmutexMaxReaders: una goroutine escritora ya posee el mutex, actualmente no hay goroutines lectoras activas-rwmutexMaxReaders+N: una goroutine escritora ya posee el lock de escritura, las goroutines lectoras actuales deben bloquearse y esperar a que la goroutine escritora libere el lock de escrituraN-rwmutexMaxReaders: una goroutine escritora ya posee el mutex, debe bloquearse y esperar a que las goroutines lectoras restantes liberen el lock de lecturaN: actualmente hay N goroutines lectoras activas, es decir, se agregaronNlocks de lectura, no hay goroutines escritoras activas
Donde rwmutexMaxReaders es un valor constante, su valor es el doble del número de goroutines que pueden esperar bloqueadas en el mutex, porque la mitad son goroutines lectoras y la mitad son goroutines escritoras.
const rwmutexMaxReaders = 1 << 30Esta parte del lock de lectura-escritura es bastante compleja con readerCount. Entender sus cambios permite comprender el flujo de trabajo del lock de lectura-escritura.
TryLock
Como de costumbre, veamos primero el más simple TryLock():
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}Al inicio, intenta llamar a TryLock() del mutex. Si falla, retorna directamente. Luego usa una operación CAS para intentar actualizar el valor de readerCount de 0 a -rwmutexMaxReaders. 0 representa que no hay goroutines lectoras trabajando, -rwmutexMaxReaders indica que la goroutine escritora ya posee el mutex. Si la operación CAS falla, desbloquea el mutex y retorna false.
Lock
A continuación está Lock(), su implementación también es muy simple:
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}Primero compite con otras goroutines escritoras hasta poseer el mutex, luego realiza esta operación: primero resta atómicamente -rwmutexMaxReaders, luego suma atómicamente rwmutexMaxReaders al nuevo valor obtenido:
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReadersDesglosémoslo en dos pasos:
Esto es para notificar a otras goroutines lectoras que ahora hay una goroutine escritora intentando adquirir el lock, como se mencionó en la sección
TryLock.gorw.readerCount.Add(-rwmutexMaxReaders)Luego suma
rwmutexMaxReaderspara obtener r, que representa el número de goroutines lectoras trabajando actualmente.gor = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
Luego verifica si hay goroutines lectoras trabajando, y si es así, suma r al valor de readerWait. Si finalmente no es 0, indica que necesita esperar a que estas goroutines lectoras terminen su trabajo, entonces entra en el flujo runtime_SemacquireRWMutex para intentar obtener el semáforo writerSem, que es liberado por las goroutines lectoras. Si puede obtener el semáforo, indica que las goroutines lectoras han terminado su trabajo. De lo contrario, necesita entrar en la cola de bloqueo para esperar (la lógica del semáforo es básicamente consistente con la del mutex).
UnLock
Luego está UnLock(), libera el lock de escritura:
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}Su flujo es el siguiente:
Como se mencionó anteriormente, al adquirir el lock se actualiza
readerCounta un valor negativo. Aquí se sumarwmutexMaxReaders, lo que indica que ahora no hay goroutines escritoras trabajando, y el valor obtenido es el número de goroutines lectoras esperando bloqueadas.gor := rw.readerCount.Add(rwmutexMaxReaders)Si es 0 o mayor que 0, representa que el lock de escritura ya fue liberado:
goif r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }Libera el semáforo
readerSempara despertar a las goroutines lectoras en espera:gofor i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }Finalmente libera el mutex para despertar a las goroutines escritoras en espera:
gorw.w.Unlock()
La liberación del lock de escritura está completa.
TryRLock
A continuación veamos la parte del lock de lectura, este es el código de TryRLock:
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}Solo hace dos cosas:
Verifica si hay una goroutine escritora trabajando. Si es así, falla la adquisición del lock:
goc := rw.readerCount.Load() if c < 0 { return false }Intenta sumar 1 a
readerCount. Si la actualización es exitosa, la adquisición del lock es exitosa:goif rw.readerCount.CompareAndSwap(c, c+1) { return true }De lo contrario, continúa el bucle de verificación hasta salir.
Como se puede ver, readerCount depende del mantenimiento en la parte del lock de escritura. Esta es también la razón por la que explicamos primero el lock de escritura, porque la parte compleja y central se mantiene en el lock de escritura.
RLock
La lógica de RLock es aún más simple:
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}Intenta sumar 1 al valor de readerCount. Si el nuevo valor aún es menor que 0, significa que la goroutine escritora está trabajando, entonces entra en el flujo de bloqueo del semáforo readerSem, y la goroutine actual entra en la cola de bloqueo para esperar.
RUnLock
RUnLock también es simple y fácil de entender:
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}Primero intenta restar 1 a readerCount, indicando que el número de goroutines lectoras activas disminuye en 1. Si el valor obtenido es mayor que 0, indica que puede liberar directamente, porque ahora ninguna goroutine escritora posee el mutex. Si es menor que 0, indica que una goroutine escritora ya posee el mutex y está esperando a que todas las goroutines lectoras actuales completen su trabajo. Luego entra en el flujo runlockSlow:
Si el valor original de
readerCountes 0 (el lock está inactivo) o-rwmutexMaxReaders(la goroutine escritora no tiene goroutines lectoras que esperar, es decir, todos los locks de lectura fueron liberados), indica que actualmente no hay goroutines lectoras activas y no se necesita desbloqueo:goif r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }Si hay goroutines lectoras activas, resta 1 a
readerWait. Si la goroutine lectora actual es la última goroutine lectora activa, libera el semáforowriterSempara despertar a la goroutine escritora en espera:goif rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
El flujo de liberación del lock de lectura termina.
Semaphore
El semáforo dentro del mutex es simplemente un tipo entero uint32, que representa la adquisición y liberación del semáforo mediante resta y suma atómicas de 1. La estructura responsable de mantener los semáforos en tiempo de ejecución es runtime.semaRoot, cuya definición de tipo se encuentra en el archivo runtime/sema.go. semaRoot usa un árbol balanceado (treap) para organizar y gestionar semáforos. Cada nodo en el árbol representa un semáforo, y el tipo de nodo es *sudog. Es una lista enlazada doble que mantiene la cola de espera del semáforo correspondiente. El nodo mantiene la unicidad a través de *sudog.elem (dirección del semáforo) y garantiza el equilibrio del árbol a través del campo *sudog.ticket.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}El árbol de semáforos semaRoot depende de un mutex de nivel inferior runtime.mutex para garantizar su seguridad de concurrencia.
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// 用于内存对齐,提高性能
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}semaRoot se almacena en un semaTable global en tiempo de ejecución. Parece un array de longitud fija usado para almacenar múltiples conjuntos de raíces de árboles de semáforos, pero en realidad, desde la perspectiva de su modo de operación, es una tabla hash. Cada elemento en la tabla contiene un semaRoot y algunos bytes de relleno (pad) para alinear la memoria y evitar competencia de líneas de caché. semTabSize es una constante de tamaño de tabla de semáforos que especifica que la longitud de la tabla es 251. Generalmente se elige un número primo para reducir conflictos hash y mejorar la eficiencia de dispersión.
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}El método rootFor es equivalente a una función hash. Recibe un puntero addr de tipo uint32 (es decir, la dirección del semáforo) y retorna el puntero a la estructura semaRoot correspondiente a esa dirección. Esta línea de código primero convierte addr a un uintptr, luego lo desplaza 3 bits a la derecha, equivalente a dividir por 8 (porque un byte ocupa 8 bits, dividir la dirección del puntero por 8 puede mapearla como un índice de array). Al tomar el módulo con semTabSize, asegura que el índice esté dentro del rango de tamaño de la tabla de semáforos. Después de obtener el semaRoot por índice, busca en el árbol balanceado la cola de espera *sudog correspondiente al semáforo.
Acquire
Adquirir el semáforo, la implementación correspondiente es la función runtime.semacquire1:
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)Recibe los siguientes parámetros:
addr, dirección del semáforolifo, afecta el orden de salida de la cola del árbol balanceado. Por defecto es FIFO, LIFO es último en entrar primero en salir. Cuando el tiempo de espera del lock de la goroutine no es 0 (al menos se ha bloqueado una vez), estrueprofile, bandera para análisis de rendimiento del lockskipframes, número de frames de stack a saltarreason, razón del bloqueo
A continuación se describe brevemente todo el flujo de adquisición del semáforo:
Verifica el estado de la goroutine. Si la goroutine actual no es la goroutine que está siendo planificada, lanza una excepción:
gogp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") }Verifica si puede adquirir el semáforo e intenta adquirirlo mediante un método no bloqueante. Si puede adquirirlo, puede retornar directamente:
gofor { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } }Si no puede adquirirlo de manera no bloqueante, entra en un bucle para adquirir el semáforo mediante métodos normales. Primero obtiene un
*sudogde la caché a través deacquireSudog(), que representa una goroutine en espera bloqueada:s := acquireSudog()Luego obtiene el árbol de semáforos de la tabla global:
goroot := semtable.rootFor(addr)Entra en un bucle, adquiere el lock del árbol de semáforos, verifica nuevamente si puede adquirir el semáforo. Si no puede, lo agrega al árbol de semáforos y luego llama a
goparkpara suspenderlo y esperar hasta que sea despertado. Continúa repitiendo este proceso hasta obtener el semáforo:gofor { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }Finalmente, cuando es despertado, libera
*sudogy lo devuelve a la caché:goreleaseSudog(s)
Release
Liberar el semáforo, despertar la goroutine en espera bloqueada. Esta funcionalidad es implementada por la función runtime.semrelease1:
func semrelease1(addr *uint32, handoff bool, skipframes int)Recibe los siguientes parámetros:
addr, dirección del semáforohandoff, indica si cambia directamente la G que el P actual está planificando a la G despertada. Solo estrueen modo hambrientoskipframes, número de frames de stack a saltar
A continuación se describe brevemente todo el proceso de liberación:
Obtiene el árbol de semáforos, luego incrementa el semáforo en 1, indicando que libera un semáforo:
goroot := semtable.rootFor(addr) atomic.Xadd(addr, 1)Si el número de goroutines en espera es 0, retorna directamente:
goif root.nwait.Load() == 0 { return }Adquiere el lock del árbol de semáforos, verifica secundariamente si hay goroutines en espera:
golockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }Obtiene una goroutine en espera bloqueada del árbol de semáforos,
nwaitdecrementa en 1, luego libera el lock del semáforo:gos, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)Verifica si puede adquirir el semáforo:
goif handoff && cansemacquire(addr) { s.ticket = 1 }La función
readyWithTimecambia directamente la G despertada como la siguiente G que el P ejecutará, es decir, modifica*p.runnext=g:goreadyWithTime(s, 5+skipframes)Si
handoffestrue, entoncesgoyielddesvincula la goroutine G que libera el semáforo del M actual, y la vuelve a agregar al final de la cola de ejecución local del P, luego comienza un nuevo ciclo de planificación para permitir que la goroutine G despertada sea planificada inmediatamente:goif s.ticket == 1 && getg().m.locks == 0 { goyield() }
El flujo de adquisición y liberación del semáforo es este. En Go, los semáforos se usan no solo en mutex, pero se colocan aquí porque la relación entre semáforos y mutex es la mayor. Incluso los comentarios oficiales lo indican:
// Asynchronous semaphore for sync.Mutex.Después de entender los semáforos, volver a ver el mutex es más claro.
TIP
Sobre el árbol de semáforos semaRoot, su implementación de entrada y salida de la cola involucra operaciones de auto-balanceo que son bastante complicadas. Profundizar en esto no tiene relación con el tema de este artículo y no tiene significado, así que se omitió. Los interesados pueden explorar el código fuente por su cuenta.
Resumen
El mutex de exclusión mutua sync.Mutex implementa la espera de goroutines mediante dos mecanismos: spin y semáforo. El spin es no bloqueante, pero necesita ser estrictamente limitado porque consume recursos de CPU; mientras que el semáforo es bloqueante y puede evitar efectivamente el desperdicio innecesario de recursos. Para implementar un mecanismo de competencia más justo, Go distingue entre modo normal y modo hambriento para garantizar que las goroutines puedan competir por el lock de manera más equilibrada. En comparación con locks de bajo nivel como runtime.mutex, sync.Mutex, como lock orientado al usuario, considera más escenarios de uso prácticos durante su diseño.
El lock de lectura-escritura sync.RWMutex implementa la exclusión mutua entre escritoras mediante el mutex sync.Mutex, y sobre esta base agrega dos semáforos adicionales para implementar la exclusión mutua entre lectura y escritura, y el compartir entre lecturas, soportando así múltiples escenarios de concurrencia.
Aunque la implementación de locks parece compleja, una vez que se entiende el principio de Mutex, aprender otras herramientas de sincronización en la biblioteca estándar sync se vuelve mucho más fácil.
