sync.Mutex Mutex
Lock là một trong những đồng bộ nguyên thủy quan trọng trong hệ điều hành, ngôn ngữ Go cung cấp hai triển khai mutex và read-write lock trong thư viện chuẩn, tương ứng với
sync.Mutex, mutex, đọc đọc loại trừ, đọc viết loại trừ, viết viết loại trừsync.RWMutex, read-write lock, đọc đọc chia sẻ, đọc viết loại trừ, viết viết loại trừ
Kịch bản sử dụng của chúng rất phổ biến, dùng để bảo vệ một vùng bộ nhớ chia sẻ có thể được truy cập và sửa đổi theo thứ tự trong trường hợp đồng thời, như ví dụ dưới đây
import (
"fmt"
"sync"
)
func main() {
var i int
var wg sync.WaitGroup
var mu sync.Mutex
for range 10 {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
viewI := i
mu.Unlock()
viewI++
mu.Lock()
i = viewI
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(i)
}Nếu không có sự bảo vệ của lock, thì kết quả đầu ra của hàm này mỗi lần thực thi đều có thể khác nhau, không thể dự đoán được, rõ ràng trong hầu hết các kịch bản chúng ta không muốn xảy ra tình huống như vậy. Trường hợp này đối với hầu hết mọi người đều rất đơn giản, có lẽ bạn đã sử dụng lock thành thạo, nhưng chưa chắc đã hiểu lock của ngôn ngữ Go được triển khai như thế nào, bản thân code của nó không phức tạp, phần tiếp theo của bài viết này sẽ讲解 chi tiết hơn.
Locker
Trước khi bắt đầu chúng ta cùng xem một loại sync.Locker, đây là một nhóm interface mà Go định nghĩa
// A Locker represents an object that can be locked and unlocked.
type Locker interface {
Lock()
Unlock()
}Nó cung cấp các phương thức rất đơn giản dễ hiểu, là khóa và mở khóa, nhưng do đặc tính implementation của interface Go tốt hơn约定, nên hầu hết mọi người có thể chưa bao giờ nhìn thấy nó, ở đây cũng chỉ đơn giản đề cập, vì nó thực sự không quan trọng lắm, hai lock được讲 sau đều đã triển khai interface này.
Mutex
Định nghĩa loại của mutex Mutex nằm trong file sync/mutex.go, nó là một loại struct, như sau
type Mutex struct {
state int32
sema uint32
}Giải thích các trường như sau:
state, trường biểu thị trạng thái của locksema, tức semaphore, giới thiệu về nó sẽ được讲 sau
Trước tiên hãy讲讲 state này
const (
mutexLocked = 1 << iota // mutex is locked
mutexWoken
mutexStarving
)state là một loại số nguyên 32 bit, 3 bit thấp dùng để biểu thị ba trạng thái trên, tổng cộng có ba trạng thái, ba trạng thái này không độc lập, chúng có thể cùng tồn tại.
mutexLocked=1, bị khóamutexWoken=2, được đánh thứcmutexStarving=4, chế độ đói
29 bit cao dùng để biểu thị có bao nhiêu coroutine đang chờ lock, nên về lý thuyết một mutex tối đa có thể được 2^29+1 coroutine sử dụng đồng thời, nhưng trong thực tế không thể có nhiều coroutine như vậy, dù mỗi cái chỉ chiếm 2KB (kích thước stack ban đầu), không gian bộ nhớ cần thiết để tạo số lượng coroutine như vậy cũng khoảng 1TB.
+-----------------------------------+---------------+------------+-------------+
| waiter | mutexStarving | mutexWoken | mutexLocked |
+-----------------------------------+---------------+------------+-------------+
| 29 bits | 1 bit | 1 bit | 1 bit |
+-----------------------------------+---------------+------------+-------------+Mutex tổng cộng có hai chế độ chạy, một là chế độ bình thường, hai là chế độ đói. Chế độ bình thường là theo thứ tự đến của coroutine trong blocking wait queue để giữ lock, tức FIFO, đây là trường hợp chung, cũng là khi hiệu suất tốt nhất, vì mọi người đều theo thứ tự truy cập để giữ lock sẽ không có vấn đề. Chế độ đói là trường hợp không bình thường, cái đói này chỉ coroutine chờ đợi lâu không thể giữ lock mà luôn ở trạng thái chặn, không phải nói mutex đang ở trạng thái đói, vậy khi nào coroutine sẽ ở trạng thái đói? Official đưa ra một ví dụ, có một coroutine đến trước, vì không thể giữ mutex mà bị chặn, sau đó do lock được giải phóng được đánh thức, đúng lúc này đến một coroutine khác vừa chạy đến đây thử giữ lock (thích chen ngang), vì coroutine sau đang ở trạng thái chạy (đang chiếm time slice CPU), nên xác suất coroutine sau thành công tranh chấp lock rất cao, và trong trường hợp cực đoan có thể có nhiều coroutine như vậy, thì coroutine vừa được đánh thức sẽ luôn không thể giữ lock (chen ngang không ngừng), rõ ràng là nó đến trước, nhưng lại không thể có được lock.
const (
starvationThresholdNs = 1e6
)Để tránh tình huống này, Go thiết lập một ngưỡng chờ starvationThresholdNs, nếu có coroutine vượt quá 1ms vẫn không giữ được lock, mutex sẽ đi vào chế độ đói. Trong chế độ đói, quyền sở hữu của mutex sẽ được chuyển trực tiếp cho coroutine đứng đầu trong wait queue, coroutine mới đến sẽ không thử giữ lock, mà đi vào cuối queue chờ. Như vậy, trong chế độ đói quyền sở hữu của mutex sẽ được các coroutine trong wait queue giữ lần lượt (để người xếp hàng có được lock trước, người chen ngang đi sau), khi coroutine thành công giữ lock, nếu mình là coroutine chờ cuối cùng hoặc thời gian chờ nhỏ hơn 1ms, sẽ chuyển mutex về chế độ bình thường. Thiết kế chế độ đói này, tránh được một số coroutine lâu không thể giữ lock mà "chết đói".
TryLock
Mutex cung cấp hai phương thức để khóa:
Lock(), lấy lock theo cách chặnTryLock(), lấy lock theo cách không chặn
Trước tiên xem code của TryLock, vì implementation của nó đơn giản hơn
func (m *Mutex) TryLock() bool {
old := m.state
if old&(mutexLocked|mutexStarving) != 0 {
return false
}
if !atomic.CompareAndSwapInt32(&m.state, old, old|mutexLocked) {
return false
}
return true
}Nó bắt đầu sẽ kiểm tra, nếu lock đã được giữ, hoặc đang ở chế độ đói (tức là có nhiều coroutine đang chờ lock), thì coroutine hiện tại không thể có được lock. Nếu không thì thông qua thao tác CAS thử cập nhật trạng thái thành mutexLocked, nếu thao tác CAS trả về false, thì biểu thị trong thời gian này có coroutine khác thành công có được lock, thì coroutine hiện tại không thể có được lock, nếu không thành công có được lock. Từ code ở đây có thể thấy, người gọi TryLock() chính là kẻ thử chen ngang, vì nó bất kể có coroutine đang chờ hay không, đều trực tiếp tranh đoạt lock (old có thể không bằng 0).
Lock
Dưới đây là code của Lock, nó cũng sẽ sử dụng thao tác CAS để thử trực tiếp giữ lock, chỉ là nó "ngoan" hơn, nó chỉ khi không có coroutine chặn chờ mới trực tiếp giữ lock (old=0).
func (m *Mutex) Lock() {
// Fast path: grab unlocked mutex.
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// Slow path (outlined so that the fast path can be inlined)
m.lockSlow()
}Nếu nó phát hiện có coroutine đang chặn chờ, thì nó sẽ "ngoan" xếp hàng sau, đi vào流程 tự旋 chờ lock (trung tâm của mutex). Trước tiên sẽ chuẩn bị một số biến
func (m *Mutex) lockSlow() {
var waitStartTime int64
starving := false
awoke := false
iter := 0
old := m.statewaitStartTime: dùng để ghi lại thời gian bắt đầu chờ, kiểm tra có đi vào chế độ đói không.starving: biểu thị coroutine hiện tại có vượt quá 1ms không có được lock không.awoke: đánh dấu coroutine hiện tại có được đánh thức không.iter: bộ đếm, ghi lại số lần tự旋.old: lấy trạng thái hiện tại của mutex
Sau đó đi vào vòng lặp for,判断 coroutine hiện tại có thể đi vào trạng thái tự旋 không.
Tự旋 là một cơ chế đồng bộ hóa giữa các thread, còn gọi là busy-waiting, thread không giữ lock sẽ không trực tiếp suspend chuyển đổi thread context mà đi vào trạng thái quay không, trong quá trình này luôn chiếm time slice CPU, nếu là trong trường hợp lock contention không lớn hoặc thời gian giữ lock rất ngắn, làm như vậy thực sự có thể tránh việc chuyển đổi thread context thường xuyên, có thể nâng cao hiệu suất, nhưng nó không phải vạn năng, trong ngôn ngữ Go lạm dụng tự旋 có thể dẫn đến hậu quả nguy hiểm sau:
- CPU occupancy quá cao: tự旋 coroutine quá nhiều sẽ tiêu tốn大量 CPU resource, đặc biệt là khi lock bị chiếm thời gian dài
- Ảnh hưởng coroutine scheduling: số lượng processor P là có hạn, nếu có nhiều coroutine tự旋 chiếm P, thì các coroutine thực thi user code khác không thể được scheduling kịp thời
- Cache coherence problem: đặc tính busy-wait của spinlock sẽ khiến thread đọc trạng thái lock trong cache lặp đi lặp lại, nếu các coroutine tự旋 chạy trên các core khác nhau, và trạng thái lock không được cập nhật kịp thời vào global memory, dẫn đến trạng thái lock mà coroutine đọc không chính xác, và việc đồng bộ cache coherence thường xuyên cũng sẽ giảm hiệu suất đáng kể,
Nên không phải tất cả coroutine đều có thể đi vào trạng thái tự旋, nó cần trải qua phán đoán nghiêm ngặt sau
for {
if old&(mutexLocked|mutexStarving) == mutexLocked && runtime_canSpin(iter) {
if !awoke && old&mutexWoken == 0 && old>>mutexWaiterShift != 0 &&
atomic.CompareAndSwapInt32(&m.state, old, old|mutexWoken) {
awoke = true
}
runtime_doSpin()
iter++
old = m.state
continue
}
...
}Điều kiện như sau:
Lock hiện tại đã được giữ và không thể ở chế độ đói, nếu không có nghĩa là đã có coroutine lâu không có được lock, lúc này trực tiếp đi vào流程 chặn.
Đi vào流程 phán đoán
runtime.sync_runtime_canSpingoconst ( active_spin = 4 ) func sync_runtime_canSpin(i int) bool { if i >= active_spin || ncpu <= 1 || gomaxprocs <= sched.npidle.Load()+sched.nmspinning.Load()+1 { return false } if p := getg().m.p.ptr(); !runqempty(p) { return false } return true }Số lần tự旋 nhỏ hơn
runtime.active_spin, mặc định là 4 lần, nhiều quá lãng phí resource.Số lõi CPU lớn hơn 1, hệ thống single core tự旋 không có ý nghĩa gì.
gomaxprocshiện tại lớn hơn tổng số P idle và P đang tự旋 cộng 1, tức là hiện tại không có đủ processor khả dụng để tự旋Local queue của P hiện tại phải rỗng, nếu không có nghĩa là có user task khác cần thực thi, không thể tự旋
Nếu phán đoán có thể tự旋, thì sẽ gọi runtime.sync_runtime_doSpin đi vào tự旋, thực tế nó thực thi 30 lần lệnh PAUSE.
const (
active_spin_cnt = 30
)
func sync_runtime_doSpin() {
procyield(active_spin_cnt)
}TEXT runtime·procyield(SB),NOSPLIT,$0-0
MOVL cycles+0(FP), AX
again:
PAUSE
SUBL $1, AX
JNZ again
RETNếu không thể tự旋, thì chỉ có hai kết cục: thành công có được lock và đi vào wait queue陷入 chặn, nhưng trước đó còn nhiều việc cần xử lý:
- Nếu không phải chế độ đói, thì thử lấy lock
new := old
if old&mutexStarving == 0 {
new |= mutexLocked
}- Nếu lock đã bị chiếm hoặc hiện tại là chế độ đói, thì số coroutine chờ +1
if old&(mutexLocked|mutexStarving) != 0 {
new += 1 << mutexWaiterShift
}- Nếu coroutine hiện tại đã ở chế độ đói, và lock vẫn bị chiếm, thì đi vào chế độ đói
if starving && old&mutexLocked != 0 {
new |= mutexStarving
}- Nếu coroutine hiện tại tự旋 được đánh thức, thì thêm标志
mutexWoken
if awoke {
new &^= mutexWoken
}Sau đó bắt đầu thử thông qua CAS để cập nhật trạng thái lock, cập nhật thất bại thì trực tiếp bắt đầu vòng lặp tiếp theo
if atomic.CompareAndSwapInt32(&m.state, old, new) {
...
}else {
...
}Cập nhật thành công thì bắt đầu phán đoán dưới đây.
Trạng thái gốc không phải chế độ đói, và không có coroutine chiếm lock, thì coroutine hiện tại có thể trực tiếp giữ lock, thoát流程, tiếp tục thực thi user code.
goif old&(mutexLocked|mutexStarving) == 0 { break }Thử giữ lock thất bại, ghi lại thời gian chờ, trong đó LIFO nếu là true, biểu thị queue后进先出, nếu không thì là FIFO先进先出.
goqueueLifo := waitStartTime != 0 if waitStartTime == 0 { waitStartTime = runtime_nanotime() }Thử lấy semaphore, đi vào hàm
runtime.semacquire1, nếu có thể lấy semaphore thì trực tiếp trả về không chặn, nếu không thì sẽ gọiruntime.goparksuspend coroutine hiện tại chờ semaphore được giải phóng.goruntime_SemacquireMutex(&m.sema, queueLifo, 1)Đi đến bước này có hai khả năng, một là trực tiếp thành công lấy semaphore, hai là chặn vừa được đánh thức thành công lấy semaphore, bất kể là loại nào đều thành công lấy semaphore, nếu hiện tại là chế độ đói, thì có thể trực tiếp có được lock.
gostarving = starving || runtime_nanotime()-waitStartTime > starvationThresholdNs old = m.state if old&mutexStarving != 0 { delta := int32(mutexLocked - 1<<mutexWaiterShift) if !starving || old>>mutexWaiterShift == 1 { delta -= mutexStarving } atomic.AddInt32(&m.state, delta) break }Nếu không phải chế độ đói, thì reset
iter, bắt đầu lại流程 tự旋.goawoke = true iter = 0
Đến đây,流程 khóa kết thúc, toàn bộ quá trình khá phức tạp, trong quá trình sử dụng hai cách tự旋 chờ và semaphore chặn chờ, cân bằng hiệu suất và công bằng, phù hợp với hầu hết các trường hợp lock contention.
Unlock
流程 mở khóa tương đối đơn giản hơn, trước tiên nó sẽ thử mở khóa nhanh, nếu new là 0 thì biểu thị hiện tại không có coroutine chờ, và không phải chế độ đói, tức mở khóa thành công, có thể trực tiếp trả về.
func (m *Mutex) Unlock() {
new := atomic.AddInt32(&m.state, -mutexLocked)
if new != 0 {
m.unlockSlow(new)
}
}Nếu không thì cần đi vào流程 unlockslow
Trước tiên判断 xem đã mở khóa chưa
goif (new+mutexLocked)&mutexLocked == 0 { fatal("sync: unlock of unlocked mutex") }Nếu là chế độ đói, thì trực tiếp giải phóng semaphore, hoàn thành mở khóa. Trong chế độ đói, coroutine mở khóa hiện tại sẽ trực tiếp chuyển quyền sở hữu lock cho coroutine chờ tiếp theo.
goif new&mutexStarving == 0 { ... } else { runtime_Semrelease(&m.sema, true, 1) }Không phải chế độ đói, đi vào流程 mở khóa bình thường
Nếu không có coroutine đang chờ, hoặc có coroutine khác được đánh thức đã có được lock, hoặc lock đi vào chế độ đói
goif old>>mutexWaiterShift == 0 || old&(mutexLocked|mutexWoken|mutexStarving) != 0 { return }Nếu không thì, giải phóng semaphore đánh thức coroutine chờ tiếp theo, đặt trạng thái lock hiện tại thành
mutexWokengonew = (old - 1<<mutexWaiterShift) | mutexWoken if atomic.CompareAndSwapInt32(&m.state, old, new) { runtime_Semrelease(&m.sema, false, 1) return } old = m.state
Cuối cùng,流程 mở khóa kết thúc.
RWMutex
Định nghĩa loại của read-write mutex RWMutex nằm trong file sync/rwmutex.go, implementation của nó cũng dựa trên mutex.
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount atomic.Int32 // number of pending readers
readerWait atomic.Int32 // number of departing readers
}Giải thích các trường dưới đây
w, một mutex, writer coroutine giữ mutex này, các writer coroutine và reader coroutine khác sẽ bị chặn.writerSem, write semaphore, dùng để chặn writer coroutine chờ reader coroutine, writer coroutine lấy semaphore, reader coroutine giải phóng semaphore.readerSem, read semaphore, dùng để chặn reader coroutine chờ writer coroutine, reader coroutine lấy semaphore, writer coroutine giải phóng semaphore.readerCount, trường cốt lõi, toàn bộ read-write lock đều dựa vào nó để duy trì trạng thái.readerWait, biểu thị số lượng reader coroutine cần chờ khi writer coroutine bị chặn
Nguyên lý đại khái của read-write lock là, thông qua mutex để khiến các writer coroutine loại trừ nhau, thông qua hai semaphore writerSem và readerSem để khiến đọc viết loại trừ, đọc đọc chia sẻ.
readerCount
Vì readerCount này biến hóa khá nhiều, và rất quan trọng, nên riêng ra nói, nó đại khái quy nạp thành mấy trạng thái sau
- 0, hiện tại không có reader coroutine active cũng không có writer coroutine active, ở trạng thái idle
-rwmutexMaxReaders, một writer coroutine đã giữ mutex, hiện tại không có reader coroutine active-rwmutexMaxReaders+N, một writer coroutine đã giữ write lock, reader coroutine hiện tại cần chặn chờ writer coroutine giải phóng write lockN-rwmutexMaxReaders, một writer coroutine đã giữ mutex, cần chặn chờ reader coroutine còn lại giải phóng read lockN, hiện tại có N reader coroutine active, tức là thêmNread lock, không có writer coroutine active
Trong đó rwmutexMaxReaders là một hằng số, giá trị của nó là 2 lần số lượng coroutine chặn chờ của mutex, vì một nửa là reader coroutine, một nửa là writer coroutine.
const rwmutexMaxReaders = 1 << 30Toàn bộ phần read-write lock này thì readerCount này phức tạp hơn, hiểu được biến hóa của nó thì cũng搞明白 được流程 làm việc của read-write lock.
TryLock
Vẫn như cũ, trước tiên xem TryLock() đơn giản nhất
func (rw *RWMutex) TryLock() bool {
if !rw.w.TryLock() {
return false
}
if !rw.readerCount.CompareAndSwap(0, -rwmutexMaxReaders) {
rw.w.Unlock()
return false
}
return true
}Bắt đầu, nó sẽ thử gọi TryLock() của mutex, nếu thất bại thì trực tiếp trả về. Sau đó dùng thao tác CAS thử cập nhật giá trị readerCount từ 0 thành -rwmutexMaxReaders. 0 đại biểu không có reader coroutine đang làm việc, -rwmutexMaxReaders biểu thị hiện tại writer coroutine đã giữ mutex. Thao tác CAS cập nhật thất bại thì mở khóa mutex, thành công thì trả về true.
Lock
Tiếp theo là Lock(), implementation của nó cũng rất đơn giản.
func (rw *RWMutex) Lock() {
rw.w.Lock()
r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
if r != 0 && rw.readerWait.Add(r) != 0 {
runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
}
}Trước tiên nó sẽ tranh chấp với các writer coroutine khác cho đến khi giữ mutex, sau đó thực hiện thao tác này, trước tiên nguyên tử trừ -rwmutexMaxReaders, sau đó phi nguyên tử cộng giá trị mới nhận được với rwmutexMaxReaders
r = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReadersTách thành hai bước để xem
Đây là để thông báo cho các reader coroutine khác hiện tại có writer coroutine thử giữ lock, đã讲 trong phần
TryLock.gorw.readerCount.Add(-rwmutexMaxReaders)Lại cộng
rwmutexMaxReaders得到 r, r này đại biểu số lượng reader coroutine đang làm việc hiện tại.gor = rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
Sau đó判断 xem có reader coroutine đang làm việc không, sau đó cộng r vào giá trị readerWait, cuối cùng vẫn không bằng 0 thì biểu thị cần chờ các reader coroutine này làm việc xong, thì đi vào流程 runtime_SemacquireRWMutex thử lấy semaphore writerSem, semaphore này là do reader coroutine giải phóng, nếu có thể拿到 semaphore thì biểu thị reader coroutine đã làm việc xong, nếu không thì cần đi vào blocking queue chờ (phần logic semaphore này cơ bản giống với mutex).
UnLock
Sau đó là UnLock(), giải phóng write lock.
func (rw *RWMutex) Unlock() {
r := rw.readerCount.Add(rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
fatal("sync: Unlock of unlocked RWMutex")
}
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
rw.w.Unlock()
}流程 của nó như sau
Đã đề cập trước đó khi khóa sẽ cập nhật
readerCountthành giá trị âm, ở đây lại cộngrwmutexMaxReaders, thì biểu thị hiện tại không có writer coroutine đang làm việc, sau đó得到 giá trị là số lượng reader coroutine đang chặn chờ.gor := rw.readerCount.Add(rwmutexMaxReaders)Nếu bản thân nó là 0 hoặc lớn hơn 0, đại biểu write lock đã được giải phóng
goif r >= rwmutexMaxReaders { fatal("sync: Unlock of unlocked RWMutex") }Giải phóng semaphore
readerSem, đánh thức reader coroutine đang chờgofor i := 0; i < int(r); i++ { runtime_Semrelease(&rw.readerSem, false, 0) }Cuối cùng lại giải phóng mutex, đánh thức writer coroutine đang chờ.
gorw.w.Unlock()
Giải phóng write lock hoàn thành.
TryRLock
Tiếp theo xem phần read lock, đây là code của TryRLock
func (rw *RWMutex) TryRLock() bool {
for {
c := rw.readerCount.Load()
if c < 0 {
return false
}
if rw.readerCount.CompareAndSwap(c, c+1) {
return true
}
}
}Nó tổng cộng chỉ làm hai việc
1.判断 xem có writer coroutine đang làm việc không, có thì khóa thất bại.
c := rw.readerCount.Load()
if c < 0 {
return false
}Thử cộng
readerCountlên 1, nếu cập nhật thành công thì khóa thành cônggoif rw.readerCount.CompareAndSwap(c, c+1) { return true }Nếu không thì tiếp tục vòng lặp判断 cho đến khi thoát
Có thể thấy ở đây dựa vào readerCount đều được duy trì trong phần write lock, đây cũng là lý do tại sao phải讲 write lock trước, vì phần phức tạp cốt lõi đều được duy trì trong phần write lock.
RLock
Logic của RLock càng đơn giản hơn
func (rw *RWMutex) RLock() {
if rw.readerCount.Add(1) < 0 {
runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
}
}Nó sẽ thử cộng giá trị readerCount lên 1, nếu giá trị mới nhận được vẫn nhỏ hơn 0,说明 writer coroutine đang làm việc, thì đi vào流程 chặn semaphore readerSem, coroutine hiện tại sẽ đi vào blocking queue chờ.
RUnLock
RUnLock cũng đơn giản dễ hiểu như vậy
func (rw *RWMutex) RUnlock() {
if r := rw.readerCount.Add(-1); r < 0 {
rw.rUnlockSlow(r)
}
}
func (rw *RWMutex) rUnlockSlow(r int32) {
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
fatal("sync: RUnlock of unlocked RWMutex")
}
if rw.readerWait.Add(-1) == 0 {
runtime_Semrelease(&rw.writerSem, false, 1)
}
}Nó trước tiên sẽ thử trừ readerCount đi một, biểu thị số lượng reader coroutine active giảm một, nếu giá trị nhận được lớn hơn 0 biểu thị có thể trực tiếp giải phóng, vì hiện tại không có writer coroutine giữ mutex, nhỏ hơn 0 biểu thị có writer coroutine đã giữ mutex, nó đang chờ tất cả reader coroutine hiện tại hoàn thành công việc. Sau đó đi vào流程 runlockSlow
Nếu giá trị
readerCountgốc là 0 (lock idle) hoặc là-rwmutexMaxReaders(writer coroutine không có reader coroutine cần chờ, tức read lock đã giải phóng hết), thì biểu thị hiện tại không có reader coroutine active, không cần mở khóagoif r+1 == 0 || r+1 == -rwmutexMaxReaders { fatal("sync: RUnlock of unlocked RWMutex") }Nếu có reader coroutine active, thì trừ
readerWaitđi một, nếu reader coroutine hiện tại là reader active cuối cùng, thì giải phóng semaphorewriterSem, đánh thức writer coroutine đang chờ.goif rw.readerWait.Add(-1) == 0 { runtime_Semrelease(&rw.writerSem, false, 1) }
Giải phóng read lock kết thúc.
Semaphore
Semaphore trong mutex chỉ là một số nguyên uint32 đơn thuần, thông qua nguyên tử trừ một và cộng một để biểu thị việc lấy và giải phóng semaphore, trong runtime负责 duy trì cấu trúc semaphore là runtime.semaRoot, định nghĩa loại của nó nằm trong file runtime/sema.go. semaRoot sử dụng một balanced binary tree (treap) để tổ chức và quản lý semaphore, mỗi node trong tree đại diện cho một semaphore, loại node là *sudog, nó là một doubly linked list, duy trì wait queue của semaphore tương ứng, node thông qua *sudog.elem (địa chỉ semaphore) giữ tính duy nhất, và thông qua trường *sudog.ticket đảm bảo tính balanced của tree.
type semaRoot struct {
lock mutex
treap *sudog // root of balanced tree of unique waiters.
nwait atomic.Uint32 // Number of waiters. Read w/o the lock.
}Cây semaphore semaRoot dựa vào một mutex cấp thấp hơn runtime.mutex để đảm bảo tính an toàn đồng thời của nó.
var semtable semTable
// Prime to not correlate with any user patterns.
const semTabSize = 251
type semTable [semTabSize]struct {
root semaRoot
// 用于内存对齐,提高性能
pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte
}semaRoot trong runtime được lưu trữ trong một semaTable toàn cục, nó nhìn giống một array độ dài cố định, dùng để lưu trữ nhiều root node集合 của cây semaphore, nhưng thực tế từ cách运作 mà xem, nó thực ra là một hash table. Mỗi phần tử trong table chứa một semaRoot và một số byte padding (pad), dùng để căn chỉnh bộ nhớ và tránh cache line contention. semTabSize là hằng số kích thước semaphore table, chỉ định độ dài của table là 251, thường chọn một số nguyên tố, có thể giảm hash collision, nâng cao hiệu suất hash.
func (t *semTable) rootFor(addr *uint32) *semaRoot {
return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root
}Phương thức rootFor tương đương với hash function, nó nhận một con trỏ uint32 loại addr (tức địa chỉ semaphore), trả về con trỏ của cấu trúc semaRoot tương ứng với địa chỉ đó. Dòng code này trước tiên chuyển addr thành một uintptr, sau đó right shift 3 bit, tương đương với chia cho 8 (vì một byte chiếm 8 bit, địa chỉ con trỏ chia cho 8 có thể ánh xạ nó thành index của array), thông qua lấy modulo semTabSize, đảm bảo index trong phạm vi kích thước semaphore table, thông qua index得到 semaRoot sau đó, lại đi vào balanced tree tìm *sudog wait queue tương ứng với semaphore.
Acquire
Lấy semaphore, implementation tương ứng là hàm runtime.semacquire1,
func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int, reason waitReason)Nó nhận mấy tham số dưới đây:
addr, địa chỉ semaphorelifo, ảnh hưởng thứ tự出队 của balanced tree, mặc định là FIFO, LIFO tức后进先出, khi thời gian coroutine chờ lock không bằng 0 (ít nhất đã chặn một lần), nó làtrueprofile,标志 dùng để phân tích hiệu suất lockskipframes, số lượng stack frame bỏ quareason, lý do chặn
Dưới đây sẽ mô tả ngắn gọn toàn bộ流程 lấy semaphore:
1.判断 trạng thái coroutine, nếu coroutine hiện tại không phải coroutine đang được scheduling, trực tiếp ném exception
gp := getg()
if gp != gp.m.curg {
throw("semacquire not on the G stack")
}2.判断 xem có thể lấy semaphore không, và thử thông qua phương thức không chặn để lấy semaphore, nếu có thể lấy thì có thể trực tiếp trả về.
for {
v := atomic.Load(addr)
if v == 0 {
return false
}
if atomic.Cas(addr, v, v-1) {
return true
}
}Nếu không thể lấy không chặn, thì sẽ đi vào vòng lặp thông qua phương thức bình thường để lấy semaphore, trước tiên thông qua
acquireSudog()lấy một*sudogtừ cache, cấu trúc này biểu thị một coroutine chặn chờs := acquireSudog()Sau đó từ global table得到 cây semaphore
goroot := semtable.rootFor(addr)Đi vào vòng lặp, khóa cây semaphore, lại判断 xem có thể lấy semaphore không, không thì thêm nó vào cây semaphore, sau đó gọi
goparksuspend nó chờ, cho đến khi được đánh thức tiếp tục lặp lại quá trình này,一直 lặp cho đến khi có được semaphore.gofor { lockWithRank(&root.lock, lockRankRoot) root.nwait.Add(1) if cansemacquire(addr) { root.nwait.Add(-1) unlock(&root.lock) break } root.queue(addr, s, lifo) goparkunlock(&root.lock, reason, traceBlockSync, 4+skipframes) if s.ticket != 0 || cansemacquire(addr) { break } }Cuối cùng khi được đánh thức sẽ giải phóng
*sudog, trả nó lại cache.goreleaseSudog(s)
Release
Giải phóng semaphore, đánh thức coroutine chặn chờ, chức năng này do hàm runtime.semrelease1 thực hiện
func semrelease1(addr *uint32, handoff bool, skipframes int)Nó nhận các tham số sau
addr, địa chỉ semaphorehandoff, biểu thị có chuyển G đang được P scheduling hiện tại thành G được đánh thức không, chỉ làtruetrong chế độ đóiskipframes, số lượng stack frame bỏ qua
Dưới đây mô tả ngắn gọn toàn bộ quá trình giải phóng:
Lấy cây semaphore, sau đó semaphore cộng một, biểu thị giải phóng một semaphore
goroot := semtable.rootFor(addr) atomic.Xadd(addr, 1)Nếu số coroutine chờ là 0, thì trực tiếp trả về
goif root.nwait.Load() == 0 { return }Khóa cây semaphore,二次判断 xem có coroutine chờ không
golockWithRank(&root.lock, lockRankRoot) if root.nwait.Load() == 0 { unlock(&root.lock) return }Từ cây semaphore得到 một coroutine chặn chờ,
nwaittrừ một, sau đó giải phóng lock của semaphoregos, t0, tailtime := root.dequeue(addr) if s != nil { root.nwait.Add(-1) } unlock(&root.lock)
5.判断 xem có thể lấy semaphore không
if handoff && cansemacquire(addr) {
s.ticket = 1
}Hàm
readyWithTimesẽ trực tiếp lấy G được đánh thức làm G tiếp theo sẽ chạy của P, tức là sửa*p.runnext=g.goreadyWithTime(s, 5+skipframes)Nếu
handofflàtrue, thìgoyieldsẽ khiến coroutine G giải phóng semaphore hiện tại unbind với M hiện tại, và thêm lại vào cuối P local run queue, sau đó bắt đầu vòng lặp scheduling mới, để có thể让 G được đánh thức lập tức được schedulinggoif s.ticket == 1 && getg().m.locks == 0 { goyield() }
流程 lấy và giải phóng semaphore là như vậy, trong ngôn ngữ Go sử dụng semaphore không chỉ mutex, đặt ở đây là vì semaphore và mutex có liên quan lớn nhất, official thậm chí đều viết rõ trên comment
// Asynchronous semaphore for sync.Mutex.Hiểu xong semaphore, rồi quay lại xem mutex sẽ rõ ràng hơn.
TIP
Về cây semaphore semaRoot, việc出队入队 của nó vì liên quan đến self-balancing operation implementation khá phiền phức, đào sâu những cái này không liên quan đến chủ đề bài viết này và không có ý nghĩa, nên đã che đi,感兴趣 có thể tự tìm hiểu source code.
Tóm tắt
Mutex sync.Mutex thông qua hai cơ chế tự旋 và semaphore để thực hiện việc chờ của coroutine. Tự旋 là không chặn, nhưng cần hạn chế nghiêm ngặt việc sử dụng, vì nó sẽ tiêu tốn CPU resource; còn semaphore thì chặn, có thể tránh hiệu quả resource tiêu tốn không cần thiết. Để thực hiện cơ chế cạnh tranh công bằng hơn, Go thông qua phân biệt chế độ bình thường và chế độ đói để đảm bảo coroutine trong quá trình cạnh tranh lock có thể cân bằng hơn. So với lock cấp thấp runtime.mutex, sync.Mutex là lock hướng đến user, khi thiết kế cân nhắc nhiều kịch bản sử dụng thực tế hơn.
Read-write lock sync.RWMutex, thông qua mutex sync.Mutex để thực hiện viết viết loại trừ, và trên cơ sở này thêm hai semaphore, dùng để thực hiện đọc viết loại trừ và đọc đọc chia sẻ, từ đó hỗ trợ nhiều kịch bản đồng thời.
Mặc dù implementation của lock nhìn có vẻ phức tạp, nhưng một khi hiểu nguyên lý của Mutex, rồi học các công cụ đồng bộ khác trong thư viện chuẩn sync sẽ trở nên nhẹ nhàng hơn nhiều.
