Skip to content

sync.Mutex 互斥鎖

鎖是操作系統中的一種重要的同步原語,Go 語言在標准庫中提供了互斥鎖和讀寫鎖兩種實現,分別對應了

  • sync.Mutex,互斥鎖,讀讀互斥,讀寫互斥,寫寫互斥
  • sync.RWMutex,讀寫鎖,讀讀共享,讀寫互斥,寫寫互斥

它們的業務使用場景非常常見,用於在並發情況下保護某一片共享內存能夠順序地訪問和修改,正如下面的例子

go
import (
	"fmt"
	"sync"
)

func main() {
	var i int
	var wg sync.WaitGroup
	var mu sync.Mutex

	for range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			mu.Lock()
			viewI := i
			mu.Unlock()

			viewI++

			mu.Lock()
			i = viewI
			mu.Unlock()
		}()
	}

	wg.Wait()
	fmt.Println(i)
}

如果沒有鎖的保護,那麼該函數每次執行輸出的結果都可能不同的,無法預測,顯然在大部分場景中我們都不希望發生這樣的情況。這個案例對於大部分人來說都很簡單,或許你對已經對鎖的使用得心應手,但未必了解 Go 語言的鎖內部是如何實現的,它本身的代碼並不復雜,本文接下來會進行較為詳細的講解。

Locker

在開始之前我們先看一個類型sync.Locker,它是 Go 定義的一組接口

go
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
	Lock()
	Unlock()
}

它提供的方法非常的簡單易懂,就是加鎖和解鎖,不過由於 Go 接口實現優於約定的特性,所以大部分人可能都從來沒見過它,這裡也只是簡單提一嘴,因為它確實沒那麼重要,後面所講的兩個鎖也是都實現了該接口。

Mutex

互斥鎖Mutex的類型定義位於sync/mutex.go文件中,它是一個結構體類型,如下

go
type Mutex struct {
	state int32
	sema  uint32
}

字段釋義如下:

  • state ,字段表示鎖的狀態
  • sema ,即信號量 semaphore,有關它的介紹會在後面講到

先來講講這個state

go
const (
    mutexLocked = 1 << iota // mutex is locked
    mutexWoken
    mutexStarving
)

state是一個 32 位的整數類型,低 3 位用於表示上面的三種狀態,總共有三種狀態,這三種狀態並非獨立的,它們可以共存。

  • mutexLocked=1,被鎖住
  • mutexWoken=2 ,被喚醒
  • mutexStarving=4 ,飢餓模式

高 29 位用於表示有多少個協程正在等待鎖,所以理論上來說一個互斥鎖最多可以被 2^29+1 個協程同時使用,不過現實中不太可能會有這麼多的協程,即便每個只佔 2KB(初始棧空間大小),創建如此數量的協程所需要的內存空間也要 1TB 左右。

+-----------------------------------+---------------+------------+-------------+
|              waiter               | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
|              29 bits              | 1 bit         | 1 bit      | 1 bit       |
+-----------------------------------+---------------+------------+-------------+

互斥鎖一共有兩種運行模式,一是正常模式,二是飢餓模式。正常模式就是按照協程在阻塞等待隊列中到達的順序來持有鎖,即 FIFO,這是一般的情況,也是性能最好的時候,因為大家都按照訪問的順序來持有鎖就不會有問題。飢餓模式就是不一般的情況,這個飢餓指的是等待協程長時間無法持有鎖而一直處於阻塞狀態,並不是說互斥鎖是飢餓狀態,那麼什麼時候協程會處於飢餓狀態呢?官方給出一個例子,有一個先到的協程,因為無法持有互斥鎖而阻塞,後續由於鎖的釋放被喚醒了,就在這時候來了另一個代碼剛剛運行到這塊嘗試持有鎖的協程(喜歡插隊的),由於後者正處於運行的狀態(正在佔用 CPU 時間片),後者能夠成功競爭到鎖的幾率是很高的,並且在極端情況下像這樣的協程可能會有很多,那麼剛剛喚醒的協程就會一直無法持有鎖(一直插隊沒完沒了了),明明是它先到的,卻一直無法獲得鎖。

go
const (
	starvationThresholdNs = 1e6
)

為了避免這種情況,Go 設置了一個等待閾值starvationThresholdNs ,如果有協程超過 1ms 仍未持有鎖,互斥鎖就會進入飢餓模式。在飢餓模式下,互斥鎖的所有權會直接移交給等待隊列中最前面的協程,新來的協程不會嘗試持有鎖,而且進入隊尾等待。就這樣,飢餓模式下互斥鎖的所有權會全部由等待隊列中協程逐個持有(讓排隊的人先獲得鎖,插隊的後面去),當協程成功持有鎖後,如果自己是最後一個等待協程或等待時間小於 1ms,就會將互斥鎖切換回正常模式。這種飢餓模式的設計,就避免了一些協程長時間無法持有鎖而「餓死」的情況。

TryLock

互斥鎖提供了兩個方法來進行加鎖:

  • Lock(),以阻塞的方式獲取鎖
  • TryLock(),非阻塞的方式獲取鎖

先來看看TryLock的代碼,因為它的實現更簡單

go
func (m *Mutex) TryLock() bool {
	old := m.state
	if old&(mutexLocked|mutexStarving) != 0 {
		return false
	}

	if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
		return false
	}

	return true
}

它開始時會進行檢查,如果鎖已經被持有了,或者處於飢餓狀態(即很多協程正在等待鎖),那麼當前協程無法獲得鎖。否則的話通過 CAS 操作嘗試更新狀態為 mutexLocked,如果 CAS 操作返回false,則表示在此期間有其它協程成功獲得了鎖,那麼當前協程無法獲得鎖,否則成功獲得鎖。從這裡的代碼可以看出, TryLock()的調用者就是那個嘗試插隊的人,因為它不管有沒有協程正在等待,就直接搶奪鎖(old 可能不等於 0)。

Lock

下面是Lock的代碼,它也會使用 CAS 操作來嘗試直接持有鎖,只不過它更「老實」,它只會在沒有協程阻塞等待時才會去直接持有鎖(old=0)。

go
func (m *Mutex) Lock() {
    // Fast path: grab unlocked mutex.
    if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
       return
    }
    // Slow path (outlined so that the fast path can be inlined)
    m.lockSlow()
}

倘若它發現有協程正在阻塞等待,那麼它就會「老實」地到後面排隊,進入lockslow自旋流程等待鎖(互斥鎖的核心)。首先會准備一些變量

go
func (m *Mutex) lockSlow() {
    var waitStartTime int64
    starving := false
    awoke := false
    iter := 0
    old := m.state
  • waitStartTime: 用於記錄等待開始的時間,檢查是否進入飢餓模式。
  • starving: 表示當前協程是否已經超過 1ms 未獲得鎖。
  • awoke: 標記當前協程是否已被喚醒。
  • iter: 計數器,記錄自旋的次數。
  • old: 獲取當前互斥鎖的狀態

然後進入 for 循環,判斷當前協程能否進入自旋狀態。

自旋是一種多線程間同步機制,又稱為忙等待(busy-waiting),線程未持有鎖時不會直接掛起切換線程上下文而是進入空轉,這個過程中一直佔用 CPU 時間片,如果是在鎖競爭不大或是持有鎖的時間很短的場情況下,這樣做確實可以避免頻繁切換線程上下文,能夠有效地提高性能,然而它並不是萬能的,在 Go 語言中濫用自旋有可能導致以下危險的後果:

  • CPU 佔用過高:自旋的協程過多會消耗大量的 CPU 資源,尤其是在鎖被佔用的時間較長時
  • 影響協程調度:處理器 P 的總數量是有限的,如果有很多自旋的協程佔用了 P,那麼其它執行用戶代碼的協程就無法及時的被調度
  • 緩存一致性問題:自旋鎖的忙等待特性會導致線程在高速緩存(cache)中反復讀取鎖的狀態,如果自旋的協程在不同的核心上運行,並且鎖的狀態沒有被及時更新到全局內存中,導致協程讀到的鎖狀態不准確,並且頻繁的緩存一致性同步也會顯著降低性能,

所以並不是所有的協程都能夠進入自旋狀態,它需要經過以下的嚴格判斷

go
for {
    if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
        if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
        atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
            awoke = true
        }
        runtime_doSpin()
        iter++
        old = m.state
        continue
    }
    ...
}

條件如下:

  1. 當前鎖已經被持有且不能處於飢餓狀態,否則意味著已經有協程長時間無法獲得鎖,這時候直接進入阻塞流程。

  2. 進入runtime.sync_runtime_canSpin判斷流程

    go
     const (
    	active_spin = 4
    )
    func sync_runtime_canSpin(i int) bool {
    	if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 {
    		return false
    	}
    	if p := getg().m.p.ptr(); !runqempty(p) {
    		return false
    	}
    	return true
    }
  3. 自旋次數小於runtime.active_spin,默認是4次,次數多了浪費資源。

  4. CPU核數大於1,單核系統自旋沒有任何意義。

  5. 當前的 gomaxprocs 大於空閒 P 和正在自旋的 P 數量之和加 1,即說明當前沒有足夠的可用處理器來進行自旋

  6. 當前P的本地隊列必須空的,否則說明有其它用戶任務要執行,不能進行自旋

如果判斷可以自旋的話,就會調用runtime.sync_runtime_doSpin進入自旋,實際上它就是執行了30次PAUSE指令。

go
const (
	active_spin_cnt = 30
)

func sync_runtime_doSpin() {
	procyield(active_spin_cnt)
}
asm
TEXT runtime·procyield(SB),NOSPLIT,$0-0
	MOVL	cycles+0(FP), AX
again:
	PAUSE
	SUBL	$1, AX
	JNZ	again
	RET

如果不能進行自旋,就只會有兩種下場:成功獲得鎖和進入等待隊列陷入阻塞,不過在此之前還有很多事處理:

  1. 如果不為飢餓模式,就嘗試獲取鎖
go
new := old
if old&mutexStarving == 0 {
	new |= mutexLocked
}
  1. 如果鎖已經被佔用或現在是飢餓模式,則等待的協程數+1
go
if old&(mutexLocked|mutexStarving) != 0 {
	new += 1 << mutexWaiterShift
}
  1. 如果當前協程已經處於飢餓狀態,且鎖仍然被佔用,則進入飢餓模式
go
if starving && old&mutexLocked != 0 {
	new |= mutexStarving
}
  1. 如果當前協程自旋被喚醒,則加上mutexWoken標志
go
if awoke {
	new &^= mutexWoken
}

然後就開始嘗試通過 CAS 去更新鎖的狀態,更新失敗就直接開始下一輪循環

go
if atomic.CompareAndSwapInt32(&m.state, old, new) {
    ...
}else {
    ...
}

更新成功的話就開始下面的判斷。

  1. 原狀態並非飢餓模式,且沒有協程佔用鎖,那麼當前協程可以直接持有鎖,退出流程,繼續執行用戶代碼。

    go
    if old&(mutexLocked|mutexStarving) == 0 {
    		break
    }
  2. 嘗試持有鎖失敗,記錄下等待時間,其中 LIFO 如果為 true,表示隊列後進先出,否則就是 FIFO 先進先出。

    go
    queueLifo := waitStartTime != 0
    if waitStartTime == 0 {
    	waitStartTime = runtime_nanotime()
    }
  3. 嘗試獲取信號量,進入runtime.semacquire1函數,如果能獲取信號量就直接返回不會阻塞,否則的話就會調用runtime.gopark 掛起當前協程等待信號量的釋放。

    go
    runtime_SemacquireMutex(&m.sema, queueLifo, 1)
  4. 走到這一步有兩種可能,一是直接成功獲取信號量,二是阻塞剛剛被喚醒成功獲得信號量,不管是哪一種都成功獲得了信號量,如果現在是飢餓模式,就可以直接獲得鎖。

    go
    starving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs
    old = m.state
    if old&mutexStarving != 0 {
    	delta := int32(mutexLocked - 1<<mutexWaiterShift)
    	if !starving || old>>mutexWaiterShift == 1 {
    		delta -= mutexStarving
    	}
    	atomic.AddInt32(&m.state, delta)
    	break
    }
  5. 如果不是飢餓模式,則重置iter,重新開始自旋流程。

    go
    awoke = true
    iter = 0

至此,加鎖的流程就結束了,整個過程比較復雜,過程中用到了自旋等待和信號量阻塞等待兩種方式,平衡了性能和公平性,適用於大多數的鎖競爭情況。

Unlock

解鎖的流程相對而言要簡單很多,它首先會嘗試快速解鎖,如果new為 0 的話表示現在沒有等待協程,且不是飢餓模式,即解鎖成功,可以直接返回。

go
func (m *Mutex) Unlock() {
	new := atomic.AddInt32(&m.state, -mutexLocked)
	if new != 0 {
		m.unlockSlow(new)
	}
}

否則就需要進入unlockslow的流程

  1. 首先判斷是否已經解鎖了

    go
    if (new+mutexLocked)&mutexLocked == 0 {
    	fatal("sync: unlock of unlocked mutex")
    }
  2. 如果是飢餓模式,就直接釋放信號量,完成解鎖。在飢餓模式下,當前解鎖的協程將直接將鎖的所有權交給下一個等待的協程。

    go
    if new&mutexStarving == 0 {
    	...
    } else {
    	runtime_Semrelease(&m.sema, true, 1)
    }
  3. 不是飢餓模式,進入正常解鎖流程

    1. 如果沒有協程正在等待,或者有其它被喚醒的協程已經獲得了鎖,又或者說鎖進入了飢餓模式

      go
      if old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 {
          return
      }
    2. 否則的話,就釋放信號量喚醒下一個等待的協程,將當前鎖的狀態設置為mutexWoken

      go
      new = (old - 1<<mutexWaiterShift) | mutexWoken
      if atomic.CompareAndSwapInt32(&m.state, old, new) {
          runtime_Semrelease(&m.sema, false, 1)
          return
      }
      old = m.state

最後,解鎖的流程就結束了。

RWMutex

讀寫互斥鎖RWMutex的類型定義位於sync/rwmutex.go文件中,它的實現也基於互斥鎖。

go
type RWMutex struct {
	w           Mutex        // held if there are pending writers
	writerSem   uint32       // semaphore for writers to wait for completing readers
	readerSem   uint32       // semaphore for readers to wait for completing writers
	readerCount atomic.Int32 // number of pending readers
	readerWait  atomic.Int32 // number of departing readers
}

下面是各個字段的釋義

  • w,一個互斥鎖,寫者協程持有該互斥鎖時,其它寫者協程和讀者協程將被阻塞。
  • writerSem,寫信號量,用於阻塞寫者協程來等待讀者協程,寫者協程獲取信號量,讀者協程釋放信號量。
  • readerSem,讀信號量,用於阻塞讀者協程來等待寫者協程,讀者協程獲取信號量,寫者協程釋放信號量。
  • readerCount,核心字段,整個讀寫鎖都靠它來維護狀態。
  • readerWait,表示寫者協程被阻塞時,需要等待的讀者協程個數

讀寫鎖大致的原理就是,通過互斥鎖來使得寫者協程間互斥,通過兩個信號量writerSemreaderSem來使得讀寫互斥,讀讀共享。

readerCount

由於這個readerCount變化比較多,且很重要,所以單獨拎出來說,它大致上歸納為以下幾種狀態

  1. 0,當前既沒有讀者協程活躍也沒有寫者協程活躍,處於空閒的狀態
  2. -rwmutexMaxReaders,一個寫者協程已經持有了互斥鎖,當前沒有活躍的讀者協程
  3. -rwmutexMaxReaders+N,一個寫者協程已經持有了寫鎖,當前的讀者協程需要阻塞等待寫者協程釋放寫鎖
  4. ``N-rwmutexMaxReaders`,一個寫者協程已經持有了互斥鎖,需要阻塞等待剩余讀者協程釋放讀鎖
  5. N,當前有 N 個活躍讀者協程,即加了N個讀鎖,沒有活躍的寫者協程

其中rwmutexMaxReaders是一個常量值,它的值是互斥鎖可以阻塞等待協程數量的 2 倍,因為一半是讀者協程,一半是寫者協程。

go
const rwmutexMaxReaders = 1 << 30

整個讀寫鎖部分就這個readerCount比較復雜,理解了它的變化也就搞明白了讀寫鎖的工作流程。

TryLock

還是老樣子,先來看看最簡單的TryLock()

go
func (rw *RWMutex) TryLock() bool {
	if !rw.w.TryLock() {
		return false
	}
	if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
		rw.w.Unlock()
		return false
	}
	return true
}

開始時,它會嘗試調用互斥鎖的TryLock(),如果失敗了就直接返回。然後用 CAS 操作嘗試去將readerCount的值從 0 更新為 -rwmutexMaxReaders。0 代表的是沒有正在工作的讀者協程,-rwmutexMaxReaders表示現在寫者協程已經持有了互斥鎖。CAS 操作更新失敗就將互斥鎖解鎖,成功的話就返回 true

Lock

接下來是Lock(),它的實現也很簡單。

go
func (rw *RWMutex) Lock() {
	rw.w.Lock()
	r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
	if r != 0 && rw.readerWait.Add(r) != 0 {
		runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
	}
}

首先它會跟其它的寫者協程競爭直到持有互斥鎖,然後進行這麼一個操作,先原子地減去-rwmutexMaxReaders,然後再將得到的新值非原子地加上 rwmutexMaxReaders

go
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

拆成兩步來看

  1. 這是為了通知其它讀者協程現在正有寫者協程嘗試持有鎖,在TryLock部分已經講過了。

    go
    rw.readerCount.Add(-rwmutexMaxReaders)
  2. 又加上rwmutexMaxReaders得到了 r,這個 r 代表的就是現在正在工作的讀者協程數量。

    go
    r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders

然後判斷是否有讀者協程正在工作,然年後將readerWait的值加r,最終還是不為 0 的話表示需要等待這些讀者協程工作完,則進入 runtime_SemacquireRWMutex流程嘗試獲取信號量writerSem ,該信號量是由讀者協程釋放的,如果能拿到信號量就表示讀者協程已經工作完畢,否則的話就需要進入阻塞隊列等待(這部分信號量的邏輯跟互斥鎖那塊基本上一致)。

UnLock

然後是UnLock(),釋放寫鎖。

go
func (rw *RWMutex) Unlock() {
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
       fatal("sync: Unlock of unlocked RWMutex")
    }

    for i := 0; i < int(r); i++ {
       runtime_Semrelease(&rw.readerSem, false, 0)
    }

    rw.w.Unlock()
}

它的流程如下

  1. 前面提到過在加鎖的時候會將readerCount更新為負值,這裡再加上 rwmutexMaxReaders ,就表示現在沒有寫者協程正在工作,然後得到的值就是正在阻塞等待的讀者協程數量。

    go
    r := rw.readerCount.Add(rwmutexMaxReaders)
  2. 如果它本身就是 0 或大於 0,代表著寫鎖已經被釋放了

    go
    if r >= rwmutexMaxReaders {
    	fatal("sync: Unlock of unlocked RWMutex")
    }
  3. 釋放信號量readerSem,喚醒等待的讀者協程

    go
    for i := 0; i < int(r); i++ {
    	runtime_Semrelease(&rw.readerSem, false, 0)
    }
  4. 最後再釋放互斥鎖,喚醒等待的寫者協程。

    go
    rw.w.Unlock()

釋放寫鎖完成。

TryRLock

接下來看看讀鎖部分,這是TryRLock的代碼

go
func (rw *RWMutex) TryRLock() bool {
	for {
		c := rw.readerCount.Load()
		if c < 0 {
			return false
		}
		if rw.readerCount.CompareAndSwap(c, c+1) {
			return true
		}
	}
}

它總共只干兩件事

  1. 判斷是否有寫者協程正在工作,有的話則加鎖失敗。

    go
    c := rw.readerCount.Load()
    if c < 0 {
    	return false
    }
  2. 嘗試將readerCount加 1,如果更新成功了則加鎖成功

    go
    if rw.readerCount.CompareAndSwap(c, c+1) {
    	return true
    }
  3. 否則繼續循環判斷直到退出

可以看到這裡依賴的readerCount都是在寫鎖部分維護的,這也是為什麼要先講寫鎖的原因,因為復雜核心的地方都在寫鎖部分維護。

RLock

RLock的邏輯更簡單

go
func (rw *RWMutex) RLock() {
	if rw.readerCount.Add(1) < 0 {
		runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
	}
}

它會嘗試將readerCount的值加 1,如果得到的新值還是小於 0,說明寫者協程正在工作,則進入readerSem信號量阻塞流程,當前協程會進入阻塞隊列等待。

RUnLock

RUnLock的也是一樣的簡單易懂

go
func (rw *RWMutex) RUnlock() {
    if r := rw.readerCount.Add(-1); r < 0 {
       rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
	if r+1 == 0 || r+1 == -rwmutexMaxReaders {
		fatal("sync: RUnlock of unlocked RWMutex")
	}
	if rw.readerWait.Add(-1) == 0 {
		runtime_Semrelease(&rw.writerSem, false, 1)
	}
}

它首先會嘗試講readerCount減一,表示活躍讀者協程數量減一,如果得到的值大於 0 表示可以直接釋放,因為現在沒有寫者協程持有互斥鎖,小於 0 表示有寫者協程已經持有了互斥鎖,它正在等待當前的所有讀者協程完成工作。接下來進入 runlockSlow的流程

  1. 如果原來的readerCount值為 0(鎖是空閒的)或者為-rwmutexMaxReaders(寫者協程沒有需要等待的讀者協程,即讀鎖已經全部釋放),則表示當前沒有活躍的讀者協程,不需要解鎖

    go
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
    	fatal("sync: RUnlock of unlocked RWMutex")
    }
  2. 如果有活躍的讀者協程的話,則將readerWait減一,如果當前讀者協程是最後一個活躍的讀者,則釋放writerSem 信號量,喚醒等待的寫者協程。

    go
    if rw.readerWait.Add(-1) == 0 {
    	runtime_Semrelease(&rw.writerSem, false, 1)
    }

釋放讀鎖的流程結束。

Semaphore

互斥鎖裡面的信號量就是一個單純的uint32整型,通過原子地減一和加一來表示信號量的獲取和釋放,在運行時負責維護信號量的結構是 runtime.semaRoot,它的類型定義就位於runtime/sema.go文件中。semaRoot 使用一個平衡二叉樹(treap)來組織和管理信號量,樹中的每一個節點代表一個信號量,節點類型是 *sudog,它是一個雙向鏈表,維護了對應信號量的等待隊列, 節點通過 *sudog.elem(信號量地址)保持唯一性,並通過 *sudog.ticket 字段保證樹的平衡性。

go
type semaRoot struct {
	lock  mutex
	treap *sudog        // root of balanced tree of unique waiters.
	nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}

信號量樹semaRoot依賴於一個更低層級的互斥鎖runtime.mutex來保證它的並發安全性。

go
var semtable semTable

// Prime to not correlate with any user patterns.
const semTabSize = 251

type semTable [semTabSize]struct {
	root semaRoot
    // 用於內存對齊,提高性能
	pad  [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}

semaRoot在運行時存儲在一個全局的semaTable中,它看起來是一個定長的數組,用來存儲多個信號量樹的根節點集合,但實際上從運作方式上來看,它其實就是一個哈希表。表中的每個元素包含一個 semaRoot 和一些填充字節(pad),用於對齊內存和避免緩存行競爭。semTabSize 是信號量表的大小常量,指定了表的長度為 251,通常選擇一個質數,可以減少哈希沖突,提高散列效率。

go
func (t *semTable) rootFor(addr *uint32) *semaRoot {
	return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}

rootFor方法就相當於哈希函數,它接受一個 uint32 類型的指針 addr(即信號量的地址),返回該地址所對應的 semaRoot 結構體的指針。這行代碼首先將 addr 轉換為一個 uintptr,然後右移 3 位,相當於除以 8(因為一個字節佔 8 位,指針地址除以 8 可以將其映射為數組的索引),通過對 semTabSize 取模,確保索引在信號量表的大小范圍內,通過索引得到了semaRoot 後,再去平衡樹裡面尋找與信號量對應的*sudog等待隊列。

Acquire

獲取信號量,對應的實現是runtime.semacquire1函數,

go
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)

它接收下面幾個參數:

  • addr,信號量的地址
  • lifo,影響平衡樹的出隊順序,默認是 FIFO,LIFO 即後進先出,當協程等待鎖的時間不為 0 時(至少已經阻塞過一次了),它就是 true
  • profile,用於鎖性能分析的標志
  • skipframes,跳過的棧幀數目
  • reason,阻塞的原因

下面會簡述信號量獲取的整個流程:

  1. 判斷協程狀態,如果當前協程不是正在被調度的協程,直接拋出異常

    go
    gp := getg()
    if gp != gp.m.curg {
    	throw("semacquire not on the G stack")
    }
  2. 判斷是否能獲取信號量,並且嘗試通過非阻塞的方法獲取信號量,如果能獲取的話既可以直接返回。

    go
    for {
    	v := atomic.Load(addr)
    	if v == 0 {
    		return false
    	}
    	if atomic.Cas(addr, v, v-1) {
    		return true
    	}
    }
  3. 如果不能非阻塞的獲取,就會進入循環通過正常手段去獲取信號量,首先通過acquireSudog()從緩存中獲取一個*sudog ,該結構表示一個阻塞等待的協程

    s := acquireSudog()
  4. 然後從全局表中得到信號量樹

    go
    root := semtable.rootFor(addr)
  5. 進入循環,給信號量樹加鎖,再次判斷是否能獲取信號量,不能的話就將其加入信號量樹中,然後調用gopark 將其掛起等待,直到被喚醒繼續重復這一過程,一直循環到獲得信號量。

    go
    for {
    	lockWithRank(&root.lock, lockRankRoot)
    	root.nwait.Add(1)
    	if cansemacquire(addr) {
    		root.nwait.Add(-1)
    		unlock(&root.lock)
    		break
    	}
    	root.queue(addr, s, lifo)
    	goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes)
    	if s.ticket != 0 || cansemacquire(addr) {
    		break
    	}
    }
  6. 最後被喚醒的時候會釋放*sudog,將其歸還至緩存中。

    go
    releaseSudog(s)

Release

釋放信號量,喚醒阻塞等待的協程,該功能由runtime.semrelease1函數實現

go
func semrelease1(addr *uint32, handoff bool, skipframes int)

它接收如下參數

  • addr,信號量地址
  • handoff,表示是否將當前 P 正在調度的 G 直接切換為喚醒的 G,僅在飢餓模式的時候為true
  • skipframes,跳過的棧幀數

下面簡述釋放的整個過程:

  1. 獲取信號量樹,然後信號量加一,表示釋放一個信號量

    go
    root := semtable.rootFor(addr)
    atomic.Xadd(addr, 1)
  2. 如果等待協程數為 0,則直接返回

    go
    if root.nwait.Load() == 0 {
    	return
    }
  3. 給信號量樹加鎖,二次判斷是否有等待的協程

    go
    lockWithRank(&root.lock, lockRankRoot)
    if root.nwait.Load() == 0 {
    	unlock(&root.lock)
    	return
    }
  4. 從信號量樹中獲得一個阻塞等待的協程,nwait減一,然後釋放信號量的鎖

    go
    s, t0, tailtime := root.dequeue(addr)
    if s != nil {
    	root.nwait.Add(-1)
    }
    unlock(&root.lock)
  5. 判斷能否獲取信號量

    go
    if handoff && cansemacquire(addr) {
    	s.ticket = 1
    }
  6. readyWithTime函數會直接將喚醒的協程 G 作為 P 下一個將要運行的協程,也就是修改*p.runnext=g

    go
    readyWithTime(s, 5+skipframes)
  7. 倘若handofftrue的話,那麼goyield就會讓當前釋放信號量的協程 G 與當前 M 解綁,並重新加入 P 本地運行隊列的尾部,然後開始新一輪調度循環,以便可以讓被喚醒的協程 G 立即得到調度

    go
    if s.ticket == 1 && getg().m.locks == 0 {
    	goyield()
    }

信號量的獲取與釋放的流程就是這些,Go 語言中用到信號量的不止互斥鎖,放在這裡是因為信號量跟互斥鎖的關聯性是最大的,官方甚至都在注釋上寫明了

// Asynchronous semaphore for sync.Mutex.

了解完信號量後,再回頭去看互斥鎖就會更加清晰。

TIP

有關於semaRoot信號量樹,它的出隊入隊因為涉及自平衡操作的實現比較繁瑣,深究這些跟本文的主題無關且沒有意義,所以將其屏蔽掉了,感興趣可以自行了解源代碼。

小結

互斥鎖sync.Mutex通過自旋和信號量兩種機制實現來協程的等待。自旋是非阻塞的,但需要嚴格限制使用,因為它會消耗 CPU 資源;而信號量則是阻塞的,能夠有效避免不必要的資源消耗。為了實現更加公平的競爭機制,Go 通過區分正常模式和飢餓模式來保證協程在競爭鎖的過程中能夠更加平衡。與 runtime.mutex 這種底層鎖相比,sync.Mutex 作為面向用戶的鎖,設計時考慮了更多的實際使用場景。

讀寫鎖sync.RWMutex ,通過互斥鎖sync.Mutex來實現寫寫互斥,並在此基礎上 額外增加了兩個信號量,用於實現讀寫互斥和讀讀共享,從而支持多種並發場景。

雖然鎖的實現看起來較為復雜,但一旦理解了 Mutex 的原理,再去學習 sync 標准庫中的其他同步工具就會變得輕松許多。

Golang學習網由www.golangdev.cn整理維護