cond
sync.Cond là biến điều kiện trong thư viện chuẩn của Go, nó là công cụ đồng bộ hóa duy nhất cần được khởi tạo thủ công. Khác với các đồng bộ nguyên thủy khác, sync.Cond cần truyền vào một mutex (sync.Mutex) để bảo vệ việc truy cập tài nguyên chia sẻ. Nó cho phép coroutine đi vào trạng thái chờ đợi trước khi một điều kiện nào đó được thỏa mãn, và được đánh thức khi điều kiện được đáp ứng.
Code ví dụ
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// Tạo một biến điều kiện, và truyền vào mutex
cd := sync.NewCond(&mu)
// Thêm 4 coroutine cần xử lý
wg.Add(4)
// Tạo 3 coroutine, mỗi coroutine sẽ chờ điều kiện được thỏa mãn
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// Khi điều kiện không được thỏa mãn, coroutine sẽ bị chặn ở đây
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// Tạo một coroutine để cập nhật điều kiện và đánh thức các coroutine khác
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // Cập nhật biến chia sẻ
mu.Unlock()
if i > 100 {
cd.Broadcast() // Đánh thức tất cả các coroutine đang chờ khi điều kiện được thỏa mãn
break
}
time.Sleep(time.Millisecond * 10) // Mô phỏng workload
}
}()
// Chờ tất cả các coroutine hoàn thành
wg.Wait()
}Trong ví dụ trên, biến chia sẻ i được truy cập và sửa đổi đồng thời bởi nhiều coroutine. Thông qua mutex mu để đảm bảo rằng trong điều kiện đồng thời, việc truy cập i là an toàn. Sau đó, thông qua sync.NewCond(&mu) tạo một biến điều kiện cd, nó phụ thuộc vào lock mu để đảm bảo việc truy cập tài nguyên chia sẻ là đồng bộ khi chờ đợi.
- Ba coroutine đang chờ: Mỗi coroutine thông qua
cd.Wait()tự chặn mình, cho đến khi điều kiện được thỏa mãn (i > 100). Các coroutine này sẽ ở trạng thái chặn cho đến khi giá trị của tài nguyên chia sẻiđược cập nhật. - Một coroutine cập nhật điều kiện và đánh thức các coroutine khác: Khi điều kiện được thỏa mãn (tức là
i > 100), coroutine này thông quacd.Broadcast()đánh thức tất cả các coroutine đang chờ, để chúng tiếp tục thực thi.
Cấu trúc
type Cond struct {
// L được giữ trong khi quan sát hoặc thay đổi điều kiện
L Locker
notify notifyList
}
type notifyList struct {
// wait là số ticket của waiter tiếp theo. Nó được tăng nguyên tử bên ngoài lock.
wait atomic.Uint32
notify uint32
// Danh sách các waiter đang park.
lock mutex
head *sudog
tail *sudog
}Cấu trúc của nó không phức tạp:
L, mutex, loại ở đây là interfaceLocker, chứ không phải loại lock cụ thểnotify, danh sách thông báo của các coroutine đang chờ
Quan trọng hơn là cấu trúc runtime.notifyList
wait, giá trị nguyên tử, ghi lại có bao nhiêu coroutine đang chờnotify, trỏ đến coroutine tiếp theo sẽ được đánh thức, bắt đầu tăng từ 0lock, mutex, không phải là lock mà chúng ta truyền vào, mà là một lock được triển khai nội bộ trongruntimehead,tail, con trỏ linked list
Nó总共 chỉ có ba phương thức
Wait, chặn và chờ đợiSignal, đánh thức một coroutine đang chờBroadcast, đánh thức tất cả các coroutine đang chờ
Hầu hết việc triển khai của nó đều được ẩn trong thư viện runtime, những triển khai này nằm trong file runtime/sema.go, đến mức code của nó trong thư viện chuẩn rất ngắn gọn, nguyên lý cơ bản của nó là một blocking queue có thêm lock.
Wait
Phương thức Wait sẽ khiến coroutine tự rơi vào trạng thái chặn chờ, cho đến khi được đánh thức.
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}Nó trước tiên sẽ thêm chính nó vào notifyList, nhưng thực ra chỉ là tăng notifyList.wait lên một mà thôi, thao tác ở đây tương đương với len(notifyList)-1, nhận được index của phần tử cuối cùng
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}Thao tác thêm thực sự được hoàn thành trong hàm notifyListWait
func notifyListWait(l *notifyList, t uint32) {
...
}Trong hàm này, nó trước tiên sẽ khóa linked list, sau đó nhanh chóng判断 xem coroutine hiện tại có được đánh thức chưa, nếu đã được đánh thức thì trực tiếp trả về, không cần chặn chờ.
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}Nếu không được đánh thức, thì cấu trúc thành sudog thêm vào queue, sau đó thông qua gopark suspend.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)Sau khi được đánh thức thì giải phóng cấu trúc sudog
releaseSudog(s)Signal
Signal sẽ đánh thức các coroutine bị chặn theo thứ tự FIFO của queue
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}Quy trình của nó như sau
Không khóa trực tiếp判断,
l.waitcó bằngl.notifykhông, bằng thì biểu thị tất cả coroutine đều đã được đánh thứcgoif l.wait.Load() == atomic.Load(&l.notify) { return }Sau khi khóa, lại判断 một lần nữa xem có tất cả đều đã được đánh thức chưa
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }l.notifytăng mộtgoatomic.Store(&l.notify, t+1)Duyệt linked list, tìm coroutine cần được đánh thức, cuối cùng thông qua
runtime.goreadyđể đánh thức coroutine.gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast sẽ đánh thức tất cả các coroutine bị chặn
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}Quy trình của nó cơ bản là giống nhau
Kiểm tra không lock, xem có tất cả đều đã được đánh thức chưa
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }Khóa, xóa linked list, sau đó giải phóng lock, các coroutine mới đến sau sẽ được thêm vào đầu linked list
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)Duyệt linked list, đánh thức tất cả các coroutine
gofor s != nil { next := s.next s.next = nil readyWithTime(s, 4) s = next }
Tóm tắt
sync.Cond kịch bản sử dụng phổ biến nhất là cần đồng bộ hóa một số điều kiện giữa nhiều coroutine, thường được áp dụng trong các kịch bản như mô hình producer-consumer, lập lịch tác vụ, v.v. Trong những kịch bản này, nhiều coroutine cần chờ một số điều kiện được thỏa mãn mới có thể tiếp tục thực thi, hoặc cần thông báo cho nhiều coroutine khi điều kiện thay đổi. Nó cung cấp một cách linh hoạt và hiệu quả để quản lý đồng bộ hóa giữa các coroutine. Thông qua việc phối hợp sử dụng với mutex, sync.Cond có thể đảm bảo an toàn cho việc truy cập tài nguyên chia sẻ, và có thể kiểm soát thứ tự thực thi của coroutine khi một điều kiện cụ thể được thỏa mãn. Hiểu rõ nguyên lý triển khai nội bộ của nó giúp chúng ta nắm bắt tốt hơn các kỹ thuật lập trình đồng thời, đặc biệt là khi liên quan đến đồng bộ hóa điều kiện phức tạp.
