Skip to content

sync.Mutex Mutex Eksklusif

Lock adalah salah satu primitif sinkronisasi penting dalam sistem operasi, bahasa Go menyediakan dua implementasi di pustaka standarnya: mutex eksklusif dan read-write lock, masing-masing sesuai dengan

  • sync.Mutex, mutex eksklusif, read-read eksklusif, read-write eksklusif, write-write eksklusif
  • sync.RWMutex, read-write lock, read-read berbagi, read-write eksklusif, write-write eksklusif

Skenario penggunaan bisnisnya sangat umum, digunakan untuk melindungi akses dan modifikasi berurutan terhadap memori bersama dalam situasi konkuren, seperti contoh berikut

go
import (
	"fmt"
	"sync"
)

func main() {
	var i int
	var wg sync.WaitGroup
	var mu sync.Mutex

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			viewI := i
			mu.Unlock()

			viewI++

			mu.Lock()
			i = viewI
			mu.Unlock()
		}()
	}

	wg.Wait()
	fmt.Println(i)
}

Tanpa perlindungan lock, hasil output setiap eksekusi fungsi ini mungkin berbeda dan tidak dapat diprediksi, jelas dalam sebagian besar skenario kita tidak ingin这样的情况 terjadi. Kasus ini sangat sederhana bagi kebanyakan orang, mungkin Anda sudah mahir menggunakan lock, tetapi belum tentu memahami bagaimana implementasi internal lock Go, kodenya sendiri tidak terlalu kompleks, artikel ini akan menjelaskannya secara detail.

Locker

Sebelum memulai, mari lihat tipe sync.Locker, ini adalah sekumpulan antarmuka yang didefinisikan Go

go
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

Metode yang disediakannya sangat sederhana dan mudah dipahami, yaitu lock dan unlock, namun karena karakteristik implementasi antarmuka Go lebih baik daripada konvensi, kebanyakan orang mungkin tidak pernah melihatnya, ini hanya disebutkan sekilas karena memang tidak terlalu penting, dua lock yang akan dibahas selanjutnya juga mengimplementasikan antarmuka ini.

Mutex

Definisi tipe mutex eksklusif Mutex terletak di file sync/mutex.go, ini adalah tipe struktur, sebagai berikut

go
type Mutex struct {
	state int32
	sema  uint32
}

Penjelasan field adalah sebagai berikut:

  • state, field menunjukkan status lock
  • sema, yaitu semaphore, penjelasan terkait akan dibahas nanti

Mari bahas state ini terlebih dahulu

go
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
)

state adalah tipe integer 32-bit, 3 bit rendah digunakan untuk menunjukkan tiga status di atas, total ada tiga status, ketiga status ini tidak independen, mereka dapat hidup berdampingan.

  • mutexLocked=1, terkunci
  • mutexWoken=2, dibangunkan
  • mutexStarving=4, mode kelaparan

29 bit tinggi digunakan untuk menunjukkan berapa banyak goroutine yang menunggu lock, jadi secara teoritis sebuah mutex dapat digunakan secara bersamaan oleh maksimal 2^29+1 goroutine, namun dalam realitas kecil kemungkinan ada begitu banyak goroutine, bahkan jika masing-masing hanya menempati 2KB (ukuran stack awal), ruang memori yang dibutuhkan untuk membuat begitu banyak goroutine juga sekitar 1TB.

+-----------------------------------+---------------+------------+-------------+
|              waiter               | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
|              29 bits              | 1 bit         | 1 bit      | 1 bit       |
+-----------------------------------+---------------+------------+-------------+

Mutex memiliki dua mode operasi, pertama mode normal, kedua mode kelaparan. Mode normal adalah memegang lock sesuai urutan kedatangan goroutine dalam antrian阻塞等待, yaitu FIFO, ini adalah kasus umum, juga saat performa terbaik, karena semua memegang lock sesuai urutan akses tidak akan ada masalah. Mode kelaparan adalah kasus tidak umum, kelaparan di sini mengacu pada goroutine yang menunggu tidak dapat memegang lock untuk waktu yang lama dan selalu dalam status阻塞, bukan berarti mutex dalam status kelaparan, lalu kapan goroutine dalam status kelaparan? Contoh resmi diberikan, ada goroutine yang datang lebih dulu, karena tidak dapat memegang mutex dan terhambat, kemudian karena lock dilepaskan dan dibangunkan, saat ini datang goroutine lain yang baru berjalan ke sini dan mencoba memegang lock (suka memotong antrean), karena yang terakhir dalam status berjalan (sedang menempati slice waktu CPU), kemungkinan yang terakhir berhasil mendapatkan lock sangat tinggi, dan dalam kasus ekstrem mungkin ada banyak goroutine seperti ini, maka goroutine yang baru dibangunkan akan selalu tidak dapat memegang lock (terus-menerus dipotong antreannya), jelas dia yang datang lebih dulu, tetapi tidak pernah bisa mendapatkan lock.

go
const (
	starvationThresholdNs = 1e6
)

Untuk menghindari这种情况, Go mengatur threshold等待starvationThresholdNs, jika ada goroutine yang tidak memegang lock lebih dari 1ms, mutex akan masuk ke mode kelaparan. Dalam mode kelaparan, kepemilikan mutex akan langsung diserahkan ke goroutine paling depan dalam antrian等待, goroutine baru tidak akan mencoba memegang lock, dan masuk ke ekor antrian untuk menunggu. Beginilah, dalam mode kelaparan kepemilikan mutex akan dipegang satu per satu oleh goroutine dalam antrian等待 (biarkan yang antre dapat lock dulu, yang memotong antrean pergi ke belakang), ketika goroutine berhasil memegang lock, jika dia adalah goroutine等待 terakhir atau waktu等待 kurang dari 1ms, akan mengubah mutex kembali ke mode normal. Desain mode kelaparan ini menghindari情况 beberapa goroutine tidak dapat memegang lock untuk waktu yang lama dan "kelaparan sampai mati".

TryLock

Mutex menyediakan dua metode untuk melakukan lock:

  • Lock(), mendapatkan lock dengan cara阻塞
  • TryLock(), mendapatkan lock dengan cara non-blokir

Mari lihat kode TryLock terlebih dahulu, karena implementasinya lebih sederhana

go
func (m *Mutex) TryLock() bool {
	old := m.state
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}

	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}

	return true
}

Saat dimulai akan melakukan pemeriksaan, jika lock sudah dipegang, atau dalam status kelaparan (yaitu banyak goroutine sedang menunggu lock), maka goroutine saat ini tidak dapat mendapatkan lock. Jika tidak, akan mencoba memperbarui status menjadi mutexLocked melalui operasi CAS, jika operasi CAS mengembalikan false, berarti selama periode ini ada goroutine lain yang berhasil mendapatkan lock, maka goroutine saat ini tidak dapat mendapatkan lock, jika tidak berhasil mendapatkan lock. Dari kode di sini dapat dilihat, pemanggil TryLock() adalah yang mencoba memotong antrean, karena tidak peduli apakah ada goroutine yang menunggu, langsung merebut lock (old mungkin tidak sama dengan 0).

Lock

Berikut adalah kode Lock, juga akan menggunakan operasi CAS untuk mencoba langsung memegang lock, hanya saja lebih "jujur", hanya akan langsung memegang lock ketika tidak ada goroutine yang阻塞等待 (old=0).

go
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
       return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

Jika menemukan ada goroutine yang sedang阻塞等待, maka akan "jujur" antre di belakang, masuk ke流程自旋lockslow menunggu lock (inti mutex). Pertama akan mempersiapkan beberapa variabel

go
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
  • waitStartTime: digunakan untuk mencatat waktu mulai等待, memeriksa apakah masuk mode kelaparan.
  • starving: menunjukkan apakah goroutine saat ini sudah lebih dari 1ms tidak mendapatkan lock.
  • awoke: menandai apakah goroutine saat ini sudah dibangunkan.
  • iter: counter, mencatat jumlah putaran自旋.
  • old: mendapatkan status mutex saat ini

Kemudian masuk ke loop for,判断 apakah goroutine saat ini dapat masuk status自旋.

自旋 adalah mekanisme sinkronisasi antar multi-thread, disebut juga busy-waiting, thread tidak langsung suspend dan切换 thread context ketika tidak memegang lock tetapi masuk ke idle, proses ini terus menempati slice waktu CPU, jika dalam skenario lock competition tidak besar atau waktu memegang lock sangat singkat, melakukan hal ini memang dapat menghindari频繁切换 thread context, dapat secara efektif meningkatkan performa, namun bukan solusi万能, dalam bahasa Go penyalahgunaan自旋 dapat menyebabkan konsekuensi berbahaya berikut:

  • Penggunaan CPU terlalu tinggi: terlalu banyak goroutine自旋 akan mengonsumsi banyak sumber daya CPU, terutama ketika lock ditempati untuk waktu yang lama
  • Mempengaruhi penjadwalan goroutine: jumlah total processor P terbatas, jika ada banyak goroutine自旋 yang menempati P, maka goroutine lain yang menjalankan kode pengguna tidak dapat dijadwalkan tepat waktu
  • Masalah konsistensi cache: karakteristik busy-wait spin lock akan menyebabkan thread berulang kali membaca status lock di cache (cache), jika goroutine自旋 berjalan di core yang berbeda, dan status lock tidak diperbarui tepat waktu ke memori global, menyebabkan status lock yang dibaca goroutine tidak akurat, dan sinkronisasi konsistensi cache yang频繁 juga akan secara signifikan mengurangi performa,

Jadi tidak semua goroutine dapat masuk status自旋, harus melalui判断 ketat berikut

go
for {
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    ...
}

Kondisi adalah sebagai berikut:

  1. Lock saat ini sudah dipegang dan tidak boleh dalam status kelaparan, jika tidak berarti sudah ada goroutine yang lama tidak mendapatkan lock, saat ini langsung masuk流程阻塞.

  2. Masuk流程判断runtime.sync_runtime_canSpin

    go
     const (
    	active_spin = 4
    )
    func sync_runtime_canSpin(i int) bool {
    	if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
    		return false
    	}
    	if p := getg().m.p.ptr(); !runqempty(p) {
    		return false
    	}
    	return true
    }
  3. Jumlah自旋 kurang dari runtime.active_spin, default 4 kali, terlalu banyak akan membuang sumber daya.

  4. Jumlah core CPU lebih besar dari 1, sistem single core自旋 tidak ada artinya.

  5. gomaxprocs saat ini lebih besar dari jumlah P idle dan P yang sedang自旋 ditambah 1, yaitu menunjukkan tidak ada prosesor tersedia yang cukup untuk自旋

  6. Antrian lokal P saat ini harus kosong, jika tidak berarti ada tugas pengguna lain yang akan dieksekusi, tidak dapat melakukan自旋

Jika判断 dapat自旋, akan memanggil runtime.sync_runtime_doSpin untuk masuk自旋, sebenarnya hanya menjalankan instruksi PAUSE 30 kali.

go
const (
	active_spin_cnt = 30
)

func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
asm
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

Jika tidak dapat自旋, hanya akan ada dua hasil: berhasil mendapatkan lock dan masuk antrian等待陷入阻塞, namun sebelum itu masih banyak hal yang harus ditangani:

  1. Jika bukan mode kelaparan, coba dapatkan lock
go
new := old
if old&mutexStarving == 0 {
	new |= mutexLocked
}
  1. Jika lock sudah ditempati atau sekarang mode kelaparan, maka jumlah goroutine等待+1
go
if old&(mutexLocked|mutexStarving) != 0 {
	new += 1 << mutexWaiterShift
}
  1. Jika goroutine saat ini sudah dalam status kelaparan, dan lock masih ditempati, maka masuk mode kelaparan
go
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
  1. Jika goroutine saat ini自旋 dibangunkan, maka tambahkan flag mutexWoken
go
if awoke {
	new &^= mutexWoken
}

Kemudian mulai mencoba memperbarui status lock melalui CAS, jika pembaruan gagal langsung mulai loop berikutnya

go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    ...
}else {
    ...
}

Jika pembaruan berhasil, mulai判断 berikut.

  1. Status asli bukan mode kelaparan, dan tidak ada goroutine yang menempati lock, maka goroutine saat ini dapat langsung memegang lock, keluar dari流程,继续执行 kode pengguna.

    go
    if old&(mutexLocked|mutexStarving) == 0 {
    		break
    }
  2. Coba memegang lock gagal, catat waktu等待,其中 LIFO jika true, menunjukkan antrian后进先出, jika tidak adalah FIFO先进先出.

    go
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
    	waitStartTime = runtime_nanotime()
    }
  3. Coba mendapatkan semaphore, masuk fungsi runtime.semacquire1, jika bisa mendapatkan semaphore langsung return tidak akan阻塞, jika tidak akan memanggil runtime.gopark suspend goroutine saat ini menunggu pelepasan semaphore.

    go
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  4. Sampai langkah ini ada dua kemungkinan, pertama langsung berhasil mendapatkan semaphore, kedua阻塞 baru saja dibangunkan berhasil mendapatkan semaphore, tidak peduli yang mana berhasil mendapatkan semaphore, jika sekarang mode kelaparan, dapat langsung mendapatkan lock.

    go
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
    	delta := int32(mutexLocked - 1<<mutexWaiterShift)
    	if !starving || old>>mutexWaiterShift == 1 {
    		delta -= mutexStarving
    	}
    	atomic.AddInt32(&m.state, delta)
    	break
    }
  5. Jika bukan mode kelaparan, reset iter, mulai kembali流程自旋.

    go
    awoke = true
    iter = 0

Sampai di sini,流程 lock selesai, seluruh proses cukup kompleks,过程中 menggunakan dua cara自旋等待 dan semaphore阻塞等待, menyeimbangkan performa dan fairness, cocok untuk sebagian besar skenario lock competition.

Unlock

流程 unlock relatif lebih sederhana, pertama akan mencoba unlock cepat, jika new adalah 0 berarti sekarang tidak ada goroutine等待, dan bukan mode kelaparan, yaitu unlock berhasil, dapat langsung return.

go
func (m *Mutex) Unlock() {
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

Jika tidak perlu masuk流程unlockslow

  1. Pertama判断 apakah sudah unlock

    go
    if (new+mutexLocked)&mutexLocked == 0 {
    	fatal("sync: unlock of unlocked mutex")
    }
  2. Jika mode kelaparan, langsung lepaskan semaphore, selesai unlock. Dalam mode kelaparan, goroutine yang unlock saat ini akan langsung menyerahkan kepemilikan lock ke goroutine等待 berikutnya.

    go
    if new&mutexStarving == 0 {
    	...
    } else {
    	runtime_Semrelease(&m.sema, true, 1)
    }
  3. Bukan mode kelaparan, masuk流程 unlock normal

    1. Jika tidak ada goroutine yang sedang menunggu, atau ada goroutine lain yang dibangunkan sudah mendapatkan lock, atau lock masuk mode kelaparan

      go
      if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
          return
      }
    2. Jika tidak, lepaskan semaphore untuk membangunkan goroutine等待 berikutnya, set status lock saat ini menjadi mutexWoken

      go
      new = (old - 1<<mutexWaiterShift) | mutexWoken
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
          runtime_Semrelease(&m.sema, false, 1)
          return
      }
      old = m.state

Terakhir,流程 unlock selesai.

RWMutex

Definisi tipe read-write mutex RWMutex terletak di file sync/rwmutex.go, implementasinya juga berbasis mutex eksklusif.

go
type RWMutex struct {
	w           Mutex        // held if there are pending writers
	writerSem   uint32       // semaphore for writers to wait for completing readers
	readerSem   uint32       // semaphore for readers to wait for completing writers
	readerCount atomic.Int32 // number of pending readers
	readerWait  atomic.Int32 // number of departing readers
}

Berikut penjelasan masing-masing field

  • w, sebuah mutex eksklusif, ketika goroutine writer memegang mutex ini, goroutine writer lain dan goroutine reader akan terhambat.
  • writerSem, semaphore writer, digunakan untuk阻塞 goroutine writer menunggu goroutine reader, goroutine writer mendapatkan semaphore, goroutine reader melepaskan semaphore.
  • readerSem, semaphore reader, digunakan untuk阻塞 goroutine reader menunggu goroutine writer, goroutine reader mendapatkan semaphore, goroutine writer melepaskan semaphore.
  • readerCount, field inti, seluruh read-write lock bergantung padanya untuk memelihara status.
  • readerWait, menunjukkan jumlah goroutine reader yang perlu ditunggu ketika goroutine writer terhambat

Prinsip umum read-write lock adalah, melalui mutex eksklusif untuk membuat goroutine writer saling eksklusif, melalui dua semaphore writerSem dan readerSem untuk membuat read-write eksklusif, read-read berbagi.

readerCount

Karena readerCount ini cukup banyak perubahan, dan sangat penting, jadi diambil terpisah untuk dijelaskan, kira-kira dirangkum menjadi beberapa status berikut

  1. 0, sekarang tidak ada goroutine reader aktif maupun goroutine writer aktif, dalam status idle
  2. -rwmutexMaxReaders, sebuah goroutine writer sudah memegang mutex eksklusif, sekarang tidak ada goroutine reader aktif
  3. -rwmutexMaxReaders+N, sebuah goroutine writer sudah memegang write lock, goroutine reader saat ini perlu阻塞 menunggu goroutine writer melepaskan write lock
  4. N-rwmutexMaxReaders, sebuah goroutine writer sudah memegang mutex eksklusif, perlu阻塞 menunggu sisa goroutine reader melepaskan read lock
  5. N, sekarang ada N goroutine reader aktif, yaitu menambahkan N read lock, tidak ada goroutine writer aktif

其中 rwmutexMaxReaders adalah nilai konstanta, nilainya adalah 2 kali jumlah goroutine阻塞等待 yang dapat dimiliki mutex eksklusif, karena setengah adalah goroutine reader, setengah adalah goroutine writer.

go
const rwmutexMaxReaders = 1 << 30

Seluruh bagian read-write lock ini readerCount cukup kompleks, memahami perubahannya berarti memahami alur kerja read-write lock.

TryLock

Seperti biasa, mari lihat yang paling sederhana TryLock() terlebih dahulu

go
func (rw *RWMutex) TryLock() bool {
	if !rw.w.TryLock() {
		return false
	}
	if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
		rw.w.Unlock()
		return false
	}
	return true
}

Saat dimulai, akan mencoba memanggil TryLock() mutex eksklusif, jika gagal langsung return. Kemudian menggunakan operasi CAS untuk mencoba memperbarui nilai readerCount dari 0 menjadi -rwmutexMaxReaders. 0 mewakili tidak ada goroutine reader yang sedang bekerja, -rwmutexMaxReaders menunjukkan sekarang goroutine writer sudah memegang mutex eksklusif. Jika operasi CAS gagal akan unlock mutex eksklusif, jika berhasil return true.

Lock

Selanjutnya adalah Lock(), implementasinya juga sangat sederhana.

go
func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
}

Pertama akan bersaing dengan goroutine writer lain hingga memegang mutex eksklusif, kemudian melakukan operasi ini, pertama secara atomik mengurangi -rwmutexMaxReaders, kemudian secara non-atomik menambahkan rwmutexMaxReaders ke nilai baru yang didapat

go
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

Dipecah menjadi dua langkah untuk dilihat

  1. Ini untuk memberi tahu goroutine reader lain sekarang ada goroutine writer yang mencoba memegang lock, sudah dibahas di bagian TryLock.

    go
    rw.readerCount.Add(-rwmutexMaxReaders)
  2. Kemudian menambahkan rwmutexMaxReaders mendapatkan r, r ini mewakili jumlah goroutine reader yang sedang bekerja sekarang.

    go
    r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

Kemudian判断 apakah ada goroutine reader yang sedang bekerja, kemudian menambahkan readerWait dengan nilai r, jika akhirnya masih tidak 0 berarti perlu menunggu goroutine reader ini selesai bekerja, maka masuk流程runtime_SemacquireRWMutex mencoba mendapatkan semaphore writerSem, semaphore ini dilepaskan oleh goroutine reader, jika bisa mendapatkan semaphore berarti goroutine reader sudah selesai bekerja, jika tidak perlu masuk antrian阻塞等待 (logika semaphore bagian ini pada dasarnya sama dengan mutex eksklusif).

UnLock

Kemudian adalah UnLock(), melepaskan write lock.

go
func (rw *RWMutex) Unlock() {
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
       fatal("sync: Unlock of unlocked RWMutex")
    }

    for i := 0; i < int(r); i++ {
       runtime_Semrelease(&rw.readerSem, false, 0)
    }

    rw.w.Unlock()
}

Alurnya adalah sebagai berikut

  1. Sudah disebutkan sebelumnya saat lock akan memperbarui readerCount menjadi nilai negatif, di sini menambahkan rwmutexMaxReaders, berarti sekarang tidak ada goroutine writer yang sedang bekerja, kemudian nilai yang didapat adalah jumlah goroutine reader yang sedang阻塞等待.

    go
    r := rw.readerCount.Add(rwmutexMaxReaders)
  2. Jika nilainya sendiri 0 atau lebih besar dari 0, mewakili write lock sudah dilepaskan

    go
    if r >= rwmutexMaxReaders {
    	fatal("sync: Unlock of unlocked RWMutex")
    }
  3. Lepaskan semaphore readerSem, bangunkan goroutine reader yang menunggu

    go
    for i := 0; i < int(r); i++ {
    	runtime_Semrelease(&rw.readerSem, false, 0)
    }
  4. Terakhir lepaskan mutex eksklusif, bangunkan goroutine writer yang menunggu.

    go
    rw.w.Unlock()

Melepaskan write lock selesai.

TryRLock

Selanjutnya lihat bagian read lock, ini adalah kode TryRLock

go
func (rw *RWMutex) TryRLock() bool {
	for {
		c := rw.readerCount.Load()
		if c < 0 {
			return false
		}
		if rw.readerCount.CompareAndSwap(c, c+1) {
			return true
		}
	}
}

Hanya melakukan dua hal

1.判断 apakah ada goroutine writer yang sedang bekerja, jika ya maka lock gagal.

go
c := rw.readerCount.Load()
if c < 0 {
	return false
}
  1. Mencoba menambahkan readerCount dengan 1, jika pembaruan berhasil maka lock berhasil

    go
    if rw.readerCount.CompareAndSwap(c, c+1) {
    	return true
    }
  2. Jika tidak lanjutkan loop判断 hingga keluar

Dapat dilihat di sini bergantung pada readerCount semuanya dipelihara di bagian write lock, ini juga alasan mengapa harus menjelaskan write lock terlebih dahulu, karena bagian kompleks dan inti semuanya dipelihara di bagian write lock.

RLock

Logika RLock lebih sederhana

go
func (rw *RWMutex) RLock() {
	if rw.readerCount.Add(1) < 0 {
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
}

Akan mencoba menambahkan nilai readerCount dengan 1, jika nilai baru yang didapat masih kurang dari 0, berarti goroutine writer sedang bekerja, maka masuk流程阻塞 semaphore readerSem, goroutine saat ini akan masuk antrian阻塞等待.

RUnLock

RUnLock juga sama mudah dipahami

go
func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
       rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		fatal("sync: RUnlock of unlocked RWMutex")
	}
	if rw.readerWait.Add(-1) == 0 {
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

Pertama akan mencoba mengurangi readerCount dengan satu, menunjukkan jumlah goroutine reader aktif berkurang satu, jika nilai yang didapat lebih besar dari 0 berarti dapat langsung dilepaskan, karena sekarang tidak ada goroutine writer yang memegang mutex eksklusif, kurang dari 0 berarti ada goroutine writer yang sudah memegang mutex eksklusif, sedang menunggu semua goroutine reader saat ini selesai bekerja. Selanjutnya masuk流程runlockSlow

  1. Jika nilai readerCount asli adalah 0 (lock idle) atau -rwmutexMaxReaders (goroutine writer tidak memiliki goroutine reader yang perlu ditunggu, yaitu read lock sudah semua dilepaskan), berarti sekarang tidak ada goroutine reader aktif, tidak perlu unlock

    go
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    	fatal("sync: RUnlock of unlocked RWMutex")
    }
  2. Jika ada goroutine reader aktif, maka kurangi readerWait dengan satu, jika goroutine reader saat ini adalah goroutine reader aktif terakhir, maka lepaskan semaphore writerSem, bangunkan goroutine writer yang menunggu.

    go
    if rw.readerWait.Add(-1) == 0 {
    	runtime_Semrelease(&rw.writerSem, false, 1)
    }

流程 melepaskan read lock selesai.

Semaphore

Semaphore di dalam mutex eksklusif hanyalah integer uint32 biasa, melalui pengurangan atomik dengan satu dan penambahan dengan satu untuk menunjukkan mendapatkan dan melepaskan semaphore, di runtime struktur yang负责 memelihara semaphore adalah runtime.semaRoot, definisi tipenya terletak di file runtime/sema.go. semaRoot menggunakan balanced binary tree (treap) untuk mengorganisir dan mengelola semaphore, setiap node di tree mewakili sebuah semaphore, tipe node adalah *sudog, ini adalah doubly linked list, memelihara antrian等待 semaphore yang sesuai, node menjaga keunikan melalui *sudog.elem (alamat semaphore), dan menjaga keseimbangan tree melalui field *sudog.ticket.

go
type semaRoot struct {
	lock  mutex
	treap *sudog        // root of balanced tree of unique waiters.
	nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}

Tree semaphore semaRoot bergantung pada mutex eksklusif level lebih rendah runtime.mutex untuk menjamin keamanan konkurensinya.

go
var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
	root semaRoot
    // 用于内存对齐,提高性能
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

semaRoot disimpan dalam semaTable global saat runtime, terlihat seperti array panjang tetap, digunakan untuk menyimpan beberapa root node tree semaphore, tetapi sebenarnya dari cara kerjanya, ini adalah hash table. Setiap elemen dalam tabel berisi sebuah semaRoot dan beberapa byte padding (pad), digunakan untuk alignment memori dan menghindari cache line contention. semTabSize adalah konstanta ukuran tabel semaphore, menentukan panjang tabel adalah 251, biasanya memilih bilangan prima, dapat mengurangi hash collision, meningkatkan efisiensi hashing.

go
func (t *semTable) rootFor(addr *uint32) *semaRoot {
	return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

Metode rootFor setara dengan fungsi hash, menerima pointer uint32 addr (yaitu alamat semaphore), mengembalikan pointer struktur semaRoot yang sesuai dengan alamat tersebut. Baris kode ini pertama mengonversi addr menjadi uintptr, kemudian shift right 3 bit, setara dengan membagi 8 (karena satu byte menempati 8 bit, alamat pointer dibagi 8 dapat memetakannya menjadi indeks array), melalui modulo semTabSize, memastikan indeks dalam rentang ukuran tabel semaphore, melalui indeks mendapatkan semaRoot后, kemudian mencari *sudog等待 antrian yang sesuai dengan semaphore di balanced tree.

Acquire

Mendapatkan semaphore, implementasi yang sesuai adalah fungsi runtime.semacquire1,

go
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)

Menerima beberapa parameter berikut:

  • addr, alamat semaphore
  • lifo, mempengaruhi urutan dequeue balanced tree, default adalah FIFO, LIFO yaitu后进先出, ketika waktu等待 lock goroutine tidak 0 (setidaknya sudah阻塞 sekali), adalah true
  • profile, flag untuk analisis performa lock
  • skipframes, jumlah stack frame yang dilewati
  • reason, alasan阻塞

Berikut akan disingkat seluruh流程 mendapatkan semaphore:

1.判断 status goroutine, jika goroutine saat ini bukan goroutine yang sedang dijadwalkan, langsung throw exception

go
gp := getg()
if gp != gp.m.curg {
	throw("semacquire not on the G stack")
}

2.判断 apakah bisa mendapatkan semaphore, dan mencoba mendapatkan semaphore dengan cara non-blokir, jika bisa mendapatkan dapat langsung return.

go
for {
	v := atomic.Load(addr)
	if v == 0 {
		return false
	}
	if atomic.Cas(addr, v, v-1) {
		return true
	}
}
  1. Jika tidak bisa mendapatkan dengan non-blokir, akan masuk loop melalui cara normal untuk mendapatkan semaphore, pertama melalui acquireSudog() mendapatkan *sudog dari cache, struktur ini mewakili goroutine阻塞等待

    s := acquireSudog()
  2. Kemudian mendapatkan tree semaphore dari tabel global

    go
    root := semtable.rootFor(addr)
  3. Masuk loop, lock tree semaphore,判断 lagi apakah bisa mendapatkan semaphore, jika tidak akan menambahkannya ke tree semaphore, kemudian memanggil gopark suspend menunggu, hingga dibangunkan继续 mengulangi流程 ini, terus loop hingga mendapatkan semaphore.

    go
    for {
    	lockWithRank(&root.lock, lockRankRoot)
    	root.nwait.Add(1)
    	if cansemacquire(addr) {
    		root.nwait.Add(-1)
    		unlock(&root.lock)
    		break
    	}
    	root.queue(addr, s, lifo)
    	goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
    	if s.ticket != 0 || cansemacquire(addr) {
    		break
    	}
    }
  4. Terakhir saat dibangunkan akan melepaskan *sudog, mengembalikannya ke cache.

    go
    releaseSudog(s)

Release

Melepaskan semaphore, membangunkan goroutine yang阻塞等待, fungsi ini diimplementasikan oleh fungsi runtime.semrelease1

go
func semrelease1(addr *uint32, handoff bool, skipframes int)

Menerima parameter berikut

  • addr, alamat semaphore
  • handoff, menunjukkan apakah akan langsung切换 G yang sedang dijadwalkan P saat ini menjadi G yang dibangunkan, hanya true saat mode kelaparan
  • skipframes, jumlah stack frame yang dilewati

Berikut disingkat seluruh proses pelepasan:

  1. Dapatkan tree semaphore, kemudian semaphore ditambah satu, menunjukkan melepaskan satu semaphore

    go
    root := semtable.rootFor(addr)
    atomic.Xadd(addr, 1)
  2. Jika jumlah goroutine等待 adalah 0, langsung return

    go
    if root.nwait.Load() == 0 {
    	return
    }
  3. Lock tree semaphore,判断 kedua apakah ada goroutine yang menunggu

    go
    lockWithRank(&root.lock, lockRankRoot)
    if root.nwait.Load() == 0 {
    	unlock(&root.lock)
    	return
    }
  4. Dapatkan goroutine阻塞等待 dari tree semaphore, nwait berkurang satu, kemudian lepaskan lock semaphore

    go
    s, t0, tailtime := root.dequeue(addr)
    if s != nil {
    	root.nwait.Add(-1)
    }
    unlock(&root.lock)

5.判断 apakah bisa mendapatkan semaphore

go
if handoff && cansemacquire(addr) {
	s.ticket = 1
}
  1. Fungsi readyWithTime akan langsung menjadikan G yang dibangunkan sebagai G berikutnya yang akan dijalankan P, yaitu mengubah *p.runnext=g.

    go
    readyWithTime(s, 5+skipframes)
  2. Jika handoff adalah true, maka goyield akan membuat goroutine G yang melepaskan semaphore saat iniunbind dengan M saat ini, dan加入 kembali ke ekor antrian lokal P, kemudian mulai loop penjadwalan baru, agar G yang dibangunkan dapat segera dijadwalkan

    go
    if s.ticket == 1 && getg().m.locks == 0 {
    	goyield()
    }

流程 mendapatkan dan melepaskan semaphore adalah ini, dalam bahasa Go yang menggunakan semaphore bukan hanya mutex eksklusif, diletakkan di sini karena关联性 semaphore dengan mutex eksklusif adalah yang terbesar, resmi bahkan menulis di komentar

// Asynchronous semaphore for sync.Mutex.

Setelah memahami semaphore, kemudian melihat kembali mutex eksklusif akan lebih jelas.

TIP

Tentang tree semaphore semaRoot, enqueue dan dequeue-nya karena melibatkan implementasi operasi self-balancing yang cukup rumit, mendalami ini tidak relevan dengan tema artikel ini dan tidak bermakna, jadi disembunyikan, yang tertarik dapat自行 melihat source code.

Ringkasan

Mutex eksklusif sync.Mutex实现等待 goroutine melalui dua mekanisme自旋 dan semaphore. 自旋 adalah non-blokir, tetapi perlu dibatasi secara ketat penggunaannya, karena akan mengonsumsi sumber daya CPU; sedangkan semaphore adalah阻塞, dapat secara efektif menghindari pemborosan sumber daya yang tidak perlu. Untuk实现 mekanisme kompetisi yang lebih adil, Go membedakan mode normal dan mode kelaparan untuk menjamin goroutine dapat lebih seimbang dalam proses kompetisi lock. Dibandingkan dengan lock level rendah seperti runtime.mutex, sync.Mutex sebagai lock yang面向 pengguna, mempertimbangkan lebih banyak skenario penggunaan aktual saat desain.

Read-write lock sync.RWMutex,实现 write-write eksklusif melalui mutex eksklusif sync.Mutex, dan在此基础上额外 menambahkan dua semaphore, digunakan untuk实现 read-write eksklusif dan read-read berbagi, sehingga mendukung berbagai skenario konkuren.

Meskipun实现 lock terlihat cukup kompleks, tetapi setelah memahami prinsip Mutex, kemudian mempelajari alat sinkronisasi lain di pustaka standar sync akan menjadi lebih mudah.

Golang by www.golangdev.cn edit