sync.Mutex ミューテックスロック
ロックはオペレーティングシステムの重要な同期プリミティブの 1 つで、Go 言語は標準ライブラリでミューテックスロックと読み書きロックの 2 つの実装を提供しています。それぞれ以下に対応します。
sync.Mutex、ミューテックスロック、読読相互排他、読書相互排他、書書相互排他sync.RWMutex、読み書きロック、読読共有、読書相互排他、書書相互排他
これらのビジネス使用シーンは非常に一般的で、同時実行状況下で共有メモリの一部を順序立ててアクセスおよび変更するために保護するために使用されます。以下の例の通りです。
import (
"fmt"
"sync"
)
func main() {
var i int
var wg sync.WaitGroup
var mu sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
viewI := i
mu.Unlock()
viewI++
mu.Lock()
i = viewI
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(i)
}ロックの保護がない場合、この関数を実行するたびに出力される結果は異なる可能性があり、予測できません。明らかに、ほとんどのシーンではこのような状況が発生することを望んでいません。このケースはほとんどの人にとって非常に簡単で、おそらくあなたはすでにロックの使用に習熟しているかもしれませんが、Go 言語のロック内部がどのように実装されているかを理解していないかもしれません。コード自体は複雑ではなく、この記事で詳しく説明します。
Locker
始める前に、sync.Locker というタイプを見てみましょう。これは Go が定義した一連のインターフェースです。
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}提供されるメソッドは非常にシンプルでわかりやすく、ロックとアンロックですが、Go のインターフェース実装は約束より優れているという特性により、ほとんどの人はおそらくそれを見たことがないでしょう。ここでは簡単に触れるだけで、それほど重要ではないからです。後述する 2 つのロックもこのインターフェースを実装しています。
Mutex
ミューテックスロック Mutex のタイプ定義は sync/mutex.go ファイルに位置し、構造体タイプです。
type Mutex struct {
state int32
sema uint32
}フィールドの説明は以下の通りです。
state、フィールドはロックの状態を表しますsema、セマフォ semaphore で、それに関する紹介は後述します
まず state について説明します。
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)state は 32 ビットの整数タイプで、下位 3 ビットは上記の 3 つの状態を表すために使用されます。合計で 3 つの状態があり、これらは独立していません。共存できます。
mutexLocked=1、ロックされているmutexWoken=2、ウェイクアップされているmutexStarving=4、飢餓モード
上位 29 ビットは待機中のコルーチン数を表すために使用されます。したがって、理論的にはミューテックスロックは最大で 2^29+1 個のコルーチンによって同時に使用できます。しかし現実にはこれほど多くのコルーチンが存在する可能性は低く、それぞれが 2KB(初期スタックスペースサイズ)しか占有しなくても、これほど多くのコルーチンを作成するには約 1TB のメモリスペースが必要です。
+-----------------------------------+---------------+------------+-------------+
| waiter | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
| 29 bits | 1 bit | 1 bit | 1 bit |
+-----------------------------------+---------------+------------+-------------+ミューテックスロックには合計で 2 つの実行モードがあります。1 つは通常モード、2 つ目は飢餓モードです。通常モードはコルーチンがブロッキング待機キューに到達した順序に従ってロックを保持するもので、FIFO です。これは一般的な状況で、誰もがアクセス順序に従ってロックを保持するため問題がなく、パフォーマンスも最良です。飢餓モードは一般的ではない状況で、この飢餓とは待機コルーチンが長時間ロックを保持できずに常にブロッキング状態にあることを指し、ミューテックスロック自体が飢餓状態にあるわけではありません。では、いつコルーチンが飢餓状態になるのでしょうか。公式の例では、先に到着したコルーチンがミューテックスロックを保持できずにブロッキングし、その後ロックの解放によってウェイクアップされます。このとき、別のコードがロックを試行するために実行されたコルーチン(順番を追い越すのが好きな)が現れます。後者は実行中の状態(CPU 時間スライスを占有中)にあるため、後者がロックを正常に競合する確率は非常に高く、極端な状況ではそのようなコルーチンが多数存在する可能性があります。すると、ウェイクアップされたコルーチンは常にロックを保持できなくなります(常に順番を追い越され続ける)。明らかに先に到着したのに、ロックを取得し続けます。
const (
starvationThresholdNs = 1e6
)このような状況を避けるために、Go は待機閾値 starvationThresholdNs を設定しています。コルーチンが 1ms を超えてロックを保持できない場合、ミューテックスロックは飢餓モードに入ります。飢餓モードでは、ミューテックスロックの所有権は待機キューの先頭のコルーチンに直接渡され、新規のコルーチンはロックを試行せず、キューの末尾で待機します。このように、飢餓モードではミューテックスロックの所有権は待機キューのコルーチンによって順次保持されます(順番に待っている人が先にロックを取得し、順番を追い越す者は後ろに行く)。コルーチンが正常にロックを保持した後、自分が最後の待機コルーチンであるか、待機時間が 1ms 未満である場合、ミューテックスロックを通常モードに切り替えます。この飢餓モードの設計により、一部のコルーチンが長時間ロックを保持できずに「飢餓死」する状況を回避します。
TryLock
ミューテックスロックはロックを取得するために 2 つのメソッドを提供します。
Lock()、ブロッキング方式でロックを取得TryLock()、ノンブロッキング方式でロックを取得
まず TryLock のコードを見てみましょう。実装がよりシンプルだからです。
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}開始時にチェックを行い、ロックがすでに保持されているか、飢餓状態(つまり多くのコルーチンがロックを待機中)にある場合、現在のコルーチンはロックを取得できません。それ以外の場合、CAS 操作を通じて状態を mutexLocked に更新しようとします。CAS 操作が false を返す場合、この期間中に他のコルーチンが正常にロックを取得したことを示し、現在のコルーチンはロックを取得できません。それ以外の場合は正常にロックを取得します。ここのコードから、TryLock() の呼び出し者は順番を追い越そうとする人であることがわかります。待機中のコルーチンがあるかどうかに関係なく、直接ロックを奪い取るからです(old は 0 とは限りません)。
Lock
以下は Lock のコードで、CAS 操作を使用して直接ロックを保持しようとしますが、より「正直」です。ブロッキング待機中のコルーチンがない場合にのみ直接ロックを保持します(old=0)。
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}ブロッキング待機中のコルーチンがあることを発見した場合、「正直に」後ろに並んで lockSlow 自転フローに入ってロックを待機します(ミューテックスロックの核心)。まずいくつかの変数を準備します。
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.statewaitStartTime: 待機開始時間を記録し、飢餓モードに入るかどうかをチェックします。starving: 現在のコルーチンが 1ms を超えてロックを取得できないかどうかを示します。awoke: 現在のコルーチンがすでにウェイクアップされているかどうかをマークします。iter: カウンターで、自転回数を記録します。old: 現在のミューテックスロックの状態を取得します。
その後 for ループに入り、現在のコルーチンが自転状態に入れるかどうかを判断します。
自転はマルチスレッド間の同期メカニズムで、ビジーウェイト(busy-waiting)とも呼ばれます。スレッドがロックを保持していない場合、スレッドコンテキストを切り替えてサスペンドするのではなく、アイドル状態に入り、このプロセスで CPU 時間スライスを占有し続けます。ロック競合が大きくないか、ロック保持時間が非常に短い状況では、このようにしてスレッドコンテキストの頻繁な切り替えを回避でき、パフォーマンスを効果的に向上させることができます。しかし万能ではなく、Go 言語で自転を乱用すると以下の危険な結果を招く可能性があります。
- CPU 使用率过高:自転するコルーチンが多すぎると、特にロック占有時間が長い場合に大量の CPU リソースを消費します
- コルーチンスケジューリングへの影響:プロセッサ P の総数は限られており、多くの自転コルーチンが P を占有している場合、他のユーザーコードを実行するコルーチンはタイムリーにスケジューリングできません
- キャッシュ一貫性の問題:自転ロックのビジーウェイト特性により、スレッドは高速キャッシュ(cache)でロックの状態を繰り返し読み取ります。自転コルーチンが異なるコアで実行され、ロックの状態がグローバルメモリにタイムリーに更新されない場合、コルーチンが読み取るロック状態が不正確になり、頻繁なキャッシュ一貫性同期もパフォーマンスを著しく低下させます。
したがって、すべてのコルーチンが自転状態に入れるわけではありません。以下の厳格な判断を経る必要があります。
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}条件は以下の通りです。
現在のロックがすでに占有されており、飢餓状態ではないこと。そうでない場合はすでにコルーチンが長時間ロックを取得できないことを意味し、この場合は直接ブロッキングフローに入ります。
runtime.sync_runtime_canSpin判断フローに入ります。goconst ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }自転回数が
runtime.active_spin未満であること。デフォルトは 4 回で、回数が多いとリソースを浪費します。CPU コア数が 1 より大きいこと。シングルコアシステムでは自転に意味がありません。
現在の
gomaxprocsがアイドル P と自転中の P 数の合計 +1 より大きいこと。つまり現在十分な利用可能プロセッサが自転用にないことを示します。現在の P のローカルキューは空である必要があります。そうでない場合は他のユーザータスクを実行する必要があるため、自転できません。
自転可能と判断された場合、runtime.sync_runtime_doSpin を呼び出して自転に入ります。実際には 30 回の PAUSE 命令を実行するだけです。
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RET自転できない場合、2 つの結果しかありません。正常にロックを取得するか、待機キューに入ってブロッキングします。しかしその前に多くの処理があります。
- 飢餓モードでない場合は、ロックの取得を試みます。
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}- ロックがすでに占有されているか、現在飢餓モードである場合、待機コルーチン数を 1 つ増加します。
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}- 現在のコルーチンがすでに飢餓状態にあり、ロックがまだ占有されている場合、飢餓モードに入ります。
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}- 現在のコルーチンが自転でウェイクアップされた場合、
mutexWokenフラグを追加します。
if awoke {
new &^= mutexWoken
}その後 CAS を通じてロック状態の更新を試み、更新に失敗した場合は次のループに進みます。
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}更新に成功した場合、以下の判断を開始します。
元の状態が飢餓モードではなく、コルーチンがロックを占有していない場合、現在のコルーチンは直接ロックを保持でき、フローを終了してユーザーコードの実行を続行できます。
goif old&(mutexLocked|mutexStarving) == 0 { break }ロックの取得に失敗した場合、待機時間を記録します。LIFO が true の場合、キューが後入れ先出しであることを示し、そうでなければ先入れ先出しです。
goqueueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }セマフォの取得を試み、
runtime.semacquire1関数に入ります。セマフォを取得できれば直接戻りブロッキングしません。それ以外の場合、runtime.goparkを呼び出して現在のコルーチンをサスペンドし、セマフォの解放を待機します。goruntime_SemacquireMutex(&m.sema, queueLifo, 1)ここに到達するには 2 つの可能性があります。1 つは直接セマフォの取得に成功した場合、2 つ目はブロッキングがウェイクアップされて正常にセマフォを取得した場合です。どちらの場合もセマフォを正常に取得しており、現在飢餓モードであれば直接ロックを取得できます。
gostarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }飢餓モードでない場合、
iterをリセットして自転フローを再開します。goawoke = true iter = 0
これでロックのフローが終了します。全体のプロセスは複雑で、自転待機とセマフォブロッキング待機の 2 つの方法を使用し、パフォーマンスと公平性のバランスを取り、ほとんどのロック競合状況に適しています。
Unlock
アンロックのフローは比較的シンプルで、まずクイックアンロックを試みます。new が 0 の場合、現在待機コルーチンがなく、飢餓モードではないことを示し、アンロックに成功して直接戻れます。
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}それ以外の場合は unlockSlow のフローに入ります。
まずすでにアンロックされているかどうかを判断します。
goif (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }飢餓モードである場合、直接セマフォを解放してアンロックを完了します。飢餓モードでは、現在アンロックするコルーチンはロックの所有権を次の待機コルーチンに直接渡します。
goif new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }飢餓モードでない場合、通常のアンロックフローに入ります。
待機中のコルーチンがない場合、または他のウェイクアップされたコルーチンがすでにロックを取得している場合、またはロックが飢餓モードに入った場合。
goif old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }それ以外の場合は、セマフォを解放して次の待機コルーチンをウェイクアップし、現在のロック状態を
mutexWokenに設定します。gonew = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
最後に、アンロックのフローが終了します。
RWMutex
読み書きミューテックスロック RWMutex のタイプ定義は sync/rwmutex.go ファイルに位置し、実装もミューテックスロックに基づいています。
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}各フィールドの説明は以下の通りです。
w、ミューテックスロックで、ライターコルーチンがこのミューテックスロックを保持している場合、他のライターコルーチンとリーダーコルーチンはブロッキングされます。writerSem、ライターセマフォで、リーダーコルーチンの完了を待機するためにライターコルーチンをブロッキングするために使用されます。ライターコルーチンがセマフォを取得し、リーダーコルーチンがセマフォを解放します。readerSem、リーダーセマフォで、ライターコルーチンの完了を待機するためにリーダーコルーチンをブロッキングするために使用されます。リーダーコルーチンがセマフォを取得し、ライターコルーチンがセマフォを解放します。readerCount、核心フィールドで、読み書きロック全体の状態を維持します。readerWait、ライターコルーチンがブロッキングされているときに、待機する必要があるリーダーコルーチンの数を示します。
読み書きロックの概略原理は、ミューテックスロックを通じてライターコルーチン間を相互排他とし、2 つのセマフォ writerSem と readerSem を通じて読み書き相互排他、読読共有を実現することです。
readerCount
この readerCount は変化が多く、重要であるため、单独で説明します。以下のいくつかの状態に大別されます。
- 0、現在リーダーコルーチンもライターコルーチンもアクティブではなく、アイドル状態です。
-rwmutexMaxReaders、ライターコルーチンがミューテックスロックを保持しており、現在アクティブなリーダーコルーチンはありません。-rwmutexMaxReaders+N、ライターコルーチンが書き込みロックを保持しており、現在のリーダーコルーチンはライターコルーチンが書き込みロックを解放するのをブロッキングして待機する必要があります。N-rwmutexMaxReaders、ライターコルーチンがミューテックスロックを保持しており、残りのリーダーコルーチンが読み取りロックを解放するのをブロッキングして待機する必要があります。N、現在 N 個のアクティブなリーダーコルーチンがあり、つまり N 個の読み取りロックが追加されており、アクティブなライターコルーチンはありません。
rwmutexMaxReaders は定数値で、その値はミューテックスロックがブロッキング待機できるコルーチン数の 2 倍です。半分はリーダーコルーチン、半分はライターコルーチンです。
const rwmutexMaxReaders = 1 << 30読み書きロックの一部はこの readerCount が複雑で、その変化を理解すれば読み書きロックの動作フローがわかります。
TryLock
いつものように、最もシンプルな TryLock() から見てみましょう。
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}開始時に、ミューテックスロックの TryLock() を呼び出して試みます。失敗した場合は直接戻ります。その後 CAS 操作を使用して readerCount の値を 0 から -rwmutexMaxReaders に更新しようとします。0 は作業中のリーダーコルーチンがないことを表し、-rwmutexMaxReaders は現在ライターコルーチンがミューテックスロックを保持していることを示します。CAS 操作の更新に失敗した場合はミューテックスロックをアンロックし、成功した場合は true を返します。
Lock
次に Lock() で、実装も非常にシンプルです。
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}まず他のライターコルーチンと競合してミューテックスロックを保持するまで競合し、その後以下の操作を行います。まずアトミックに -rwmutexMaxReaders を減算し、その後得られた新しい値を非アトミックに rwmutexMaxReaders を加算します。
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders2 つに分けて見てみましょう。
これは他のリーダーコルーチンに現在ライターコルーチンがロックを保持しようとしていることを通知するためです。
TryLock部分で説明しました。gorw.readerCount.Add(-rwmutexMaxReaders)さらに
rwmutexMaxReadersを加算して r を得ます。この r は現在作業中のリーダーコルーチン数を表します。gor = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
その後、作業中のリーダーコルーチンがあるかどうかを判断し、readerWait の値に r を加算します。最終的に 0 でない場合は、これらのリーダーコルーチンの作業完了を待機する必要があることを示し、runtime_SemacquireRWMutex フローに入ってセマフォ writerSem の取得を試みます。このセマフォはリーダーコルーチンによって解放されます。セマフォを取得できればリーダーコルーチンが作業完了したことを示し、それ以外の場合はブロッキングキューに入って待機する必要があります(この部分のセマフォロジックはミューテックスロックの部分と基本的に同じです)。
UnLock
次に UnLock() で、書き込みロックを解放します。
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}そのフローは以下の通りです。
前述の通り、ロック時に
readerCountを負の値に更新します。ここにrwmutexMaxReadersを加算すると、現在作業中のライターコルーチンがないことを示し、得られた値はブロッキング待機中のリーダーコルーチン数です。gor := rw.readerCount.Add(rwmutexMaxReaders)それ自体が 0 または 0 より大きい場合、書き込みロックがすでに解放されていることを示します。
goif r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }セマフォ
readerSemを解放して、待機中のリーダーコルーチンをウェイクアップします。gofor i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }最後にミューテックスロックを解放して、待機中のライターコルーチンをウェイクアップします。
gorw.w.Unlock()
書き込みロックの解放が完了しました。
TryRLock
次に読み取りロック部分を見てみましょう。これは TryRLock のコードです。
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}合計で 2 つのことしか行いません。
ライターコルーチンが作業中かどうかを判断します。ある場合はロックに失敗します。
goc := rw.readerCount.Load() if c < 0 { return false }readerCountに 1 を加算しようとします。更新に成功した場合はロックに成功します。goif rw.readerCount.CompareAndSwap(c, c+1) { return true }それ以外の場合はループを継続して判断し、終了するまで続けます。
ここで依存する readerCount はすべて書き込みロック部分で維持されます。这也是なぜ書き込みロックを先に説明する必要がある理由です。複雑な核心部分はすべて書き込みロック部分で維持されるからです。
RLock
RLock のロジックはさらにシンプルです。
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}readerCount の値に 1 を加算しようとします。得られた新しい値がまだ 0 より小さい場合、ライターコルーチンが作業中であることを示し、readerSem セマフォブロッキングフローに入り、現在のコルーチンはブロッキングキューに入って待機します。
RUnLock
RUnLock も同様にシンプルでわかりやすいです。
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}まず readerCount に 1 を減算しようとします。アクティブなリーダーコルーチン数が 1 つ減ることを示します。得られた値が 0 より大きい場合、現在ライターコルーチンがミューテックスロックを保持していないため直接解放できることを示します。0 より小さい場合、ライターコルーチンがすでにミューテックスロックを保持しており、現在のすべてのリーダーコルーチンの作業完了を待機していることを示します。その後 runlockSlow のフローに入ります。
元の
readerCount値が 0(ロックがアイドル)または-rwmutexMaxReaders(ライターコルーチンに待機する必要があるリーダーコルーチンがない、つまり読み取りロックがすべて解放された)である場合、現在アクティブなリーダーコルーチンがないため、アンロックする必要がないことを示します。goif r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }アクティブなリーダーコルーチンがある場合、
readerWaitに 1 を減算します。現在のリーダーコルーチンが最後のアクティブなリーダーである場合、writerSemセマフォを解放して、待機中のライターコルーチンをウェイクアップします。goif rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
読み取りロックの解放フローが終了しました。
Semaphore
ミューテックスロック内のセマフォは単純な uint32 整数型で、アトミックに 1 を減算および加算してセマフォの取得と解放を表します。ランタイムでセマフォ構造を維持する責任を負うのは runtime.semaRoot で、そのタイプ定義は runtime/sema.go ファイルに位置しています。semaRoot は平衡二叉樹(treap)を使用してセマフォを組織化および管理します。樹の各ノードはセマフォを表し、ノードタイプは *sudog で、対応するセマフォの待機キューを維持する双方向リンクドリストです。ノードは *sudog.elem(セマフォアドレス)を通じて一意性を維持し、*sudog.ticket フィールドを通じて樹の平衡性を保証します。
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}セマフォ樹 semaRoot はより低レベルのミューテックスロック runtime.mutex に依存して並行安全性を保証します。
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// 用于内存对齐,提高性能
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}semaRoot はランタイムでグローバルな semaTable に格納されます。定長配列のように見えますが、複数のセマフォ樹のルートノード集合を格納するために使用されます。実際には動作方法から見ると、ハッシュテーブルです。表の各要素は semaRoot といくつかのパディングバイト(pad)を含み、メモリアラインメントとキャッシュライン競合の回避に使用されます。semTabSize はセマフォテーブルのサイズ定数で、テーブルの長さを 251 に指定します。通常は素数を選択し、ハッシュ競合を減らし、ハッシュ効率を向上させます。
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}rootFor メソッドはハッシュ関数に相当し、uint32 タイプのポインタ addr(つまりセマフォアドレス)を受け取り、そのアドレスに対応する semaRoot 構造体のポインタを返します。この行のコードはまず addr を uintptr に変換し、その後 3 ビット右シフトします。8 で割ることに相当します(1 バイトは 8 ビットを占有するため、ポインタアドレスを 8 で割ることで配列のインデックスにマップできます)。semTabSize で割った余りを取ることで、インデックスがセマフォテーブルのサイズ範囲内にあることを保証します。インデックスを通じて semaRoot を取得した後、平衡樹内でセマフォに対応する *sudog 待機キューを探します。
Acquire
セマフォの取得で、対応する実装は runtime.semacquire1 関数です。
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)以下のパラメータを受け取ります。
addr、セマフォのアドレスlifo、平衡樹のデキュー順序に影響し、デフォルトは FIFO で、LIFO は後入れ先出しです。コルーチンがロックを待機する時間が 0 でない場合(少なくとも 1 回ブロッキングされたことがある)、それはtrueです。profile、ロックパフォーマンス分析用のフラグskipframes、スキップするスタックフレーム数reason、ブロッキングの理由
以下はセマフォ取得の全フローの概要です。
コルーチン状態を判断します。現在のコルーチンがスケジューリング中のコルーチンでない場合、例外をスローします。
gogp := getg() if gp != gp.m.curg { throw("semacquire not on the G stack") }セマフォを取得できるかどうかを判断し、ノンブロッキング方法でセマフォの取得を試みます。取得できれば直接戻れます。
gofor { v := atomic.Load(addr) if v == 0 { return false } if atomic.Cas(addr, v, v-1) { return true } }ノンブロッキングで取得できない場合、通常の手段でセマフォを取得するためにループに入ります。まず
acquireSudog()を通じてキャッシュから*sudogを取得します。この構造はブロッキング待機中のコルーチンを表します。s := acquireSudog()その後グローバルテーブルからセマフォ樹を取得します。
goroot := semtable.rootFor(addr)ループに入り、セマフォ樹にロックをかけ、再度セマフォを取得できるかどうかを判断します。取得できない場合はセマフォ樹に追加し、その後
goparkを呼び出してサスペンドし、ウェイクアップされるまで待機します。セマフォを取得するまでこのプロセスを繰り返します。gofor { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }最後にウェイクアップされる際に
*sudogを解放し、キャッシュに返却します。goreleaseSudog(s)
Release
セマフォを解放し、ブロッキング待機中のコルーチンをウェイクアップします。この機能は runtime.semrelease1 関数によって実装されます。
func semrelease1(addr *uint32, handoff bool, skipframes int)以下のパラメータを受け取ります。
addr、セマフォアドレスhandoff、現在の P がスケジューリング中の G をウェイクアップされた G に直接切り替えるかどうかを示し、飢餓モードの時のみtrueです。skipframes、スキップするスタックフレーム数
以下は解放の全フローの概要です。
セマフォ樹を取得し、その後セマフォに 1 を加算し、セマフォを 1 つ解放することを示します。
goroot := semtable.rootFor(addr) atomic.Xadd(addr, 1)待機コルーチン数が 0 の場合は直接戻ります。
goif root.nwait.Load() == 0 { return }セマフォ樹にロックをかけ、待機コルーチンがあるかどうかを 2 回判断します。
golockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }セマフォ樹からブロッキング待機中のコルーチンを 1 つ取得し、
nwaitを 1 つ減算し、その後セマフォのロックを解放します。gos, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)セマフォを取得できるかどうかを判断します。
goif handoff && cansemacquire(addr) { s.ticket = 1 }readyWithTime関数はウェイクアップされたコルーチン G を P の次に実行されるコルーチンとして直接切り替えます。つまり*p.runnext=gを変更します。goreadyWithTime(s, 5+skipframes)handoffがtrueの場合、goyieldは現在のセマフォを解放するコルーチン G を現在の M とアンバインドし、P ローカル実行キューの末尾に再追加し、その後新しいスケジューリングループを開始して、ウェイクアップされたコルーチン G が即座にスケジューリングされるようにします。goif s.ticket == 1 && getg().m.locks == 0 { goyield() }
セマフォの取得と解放のフローは以上です。Go 言語でセマフォを使用するのはミューテックスロックだけでなく、ここに配置したのはセマフォとミューテックスロックの関連性が最大だからです。公式でさえコメントに明記しています。
// Asynchronous semaphore for sync.Mutex.セマフォを理解した後、ミューテックスロックを振り返るとより明確になります。
TIP
semaRoot セマフォ樹について、そのデキューとエンキューは自己平衡操作に関わるため実装が複雑で、これを深掘りすることは本文のテーマと無関係で意味がないため、屏蔽しました。興味がある場合はソースコードを自行で確認してください。
まとめ
ミューテックスロック sync.Mutex は自転とセマフォの 2 つのメカニズムを通じてコルーチンの待機を実装します。自転はノンブロッキングですが、CPU リソースを消費するため厳格に制限する必要があります。セマフォはブロッキングで、不要なリソース消費を効果的に回避できます。より公平な競合メカニズムを実現するために、Go は通常モードと飢餓モードを区別して、コルーチンがロックを競合するプロセスでよりバランスが取れるように保証します。runtime.mutex のような低レベルロックと比較して、sync.Mutex はユーザー向けのロックとして、設計時により多くの実際の使用シーンを考慮しています。
読み書きロック sync.RWMutex は、ミューテックスロック sync.Mutex を通じて書書相互排他を実装し、これに基づいて 2 つの追加のセマフォを増やし、読み書き相互排他と読読共有を実装し、さまざまな並行シーンをサポートします。
ロックの実装は複雑に見えますが、Mutex の原理を理解すれば、sync 標準ライブラリの他の同期ツールを学ぶことが容易になります。
