Skip to content

cond

sync.Cond là biến điều kiện trong thư viện chuẩn của Go, nó là công cụ đồng bộ hóa duy nhất cần được khởi tạo thủ công. Khác với các đồng bộ nguyên thủy khác, sync.Cond cần truyền vào một mutex (sync.Mutex) để bảo vệ việc truy cập tài nguyên chia sẻ. Nó cho phép coroutine đi vào trạng thái chờ đợi trước khi một điều kiện nào đó được thỏa mãn, và được đánh thức khi điều kiện được đáp ứng.

Code ví dụ

go
package main

import (
    "fmt"
    "sync"
    "time"
)

var i = 0

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup

    // Tạo một biến điều kiện, và truyền vào mutex
    cd := sync.NewCond(&mu)

    // Thêm 4 coroutine cần xử lý
    wg.Add(4)

    // Tạo 3 coroutine, mỗi coroutine sẽ chờ điều kiện được thỏa mãn
   	for j := range 3 {
		go func() {
			defer wg.Done()

			mu.Lock()
			for i <= 100 {
                 // Khi điều kiện không được thỏa mãn, coroutine sẽ bị chặn ở đây
				cd.Wait()
			}
			fmt.Printf("%d wake up\n", j)
			mu.Unlock()
		}()
	}

    // Tạo một coroutine để cập nhật điều kiện và đánh thức các coroutine khác
    go func() {
        defer wg.Done()
        for {
            mu.Lock()
            i++ // Cập nhật biến chia sẻ
            mu.Unlock()
            if i > 100 {
                cd.Broadcast() // Đánh thức tất cả các coroutine đang chờ khi điều kiện được thỏa mãn
                break
            }
            time.Sleep(time.Millisecond * 10) // Mô phỏng workload
        }
    }()

    // Chờ tất cả các coroutine hoàn thành
    wg.Wait()
}

Trong ví dụ trên, biến chia sẻ i được truy cập và sửa đổi đồng thời bởi nhiều coroutine. Thông qua mutex mu để đảm bảo rằng trong điều kiện đồng thời, việc truy cập i là an toàn. Sau đó, thông qua sync.NewCond(&mu) tạo một biến điều kiện cd, nó phụ thuộc vào lock mu để đảm bảo việc truy cập tài nguyên chia sẻ là đồng bộ khi chờ đợi.

  • Ba coroutine đang chờ: Mỗi coroutine thông qua cd.Wait() tự chặn mình, cho đến khi điều kiện được thỏa mãn (i > 100). Các coroutine này sẽ ở trạng thái chặn cho đến khi giá trị của tài nguyên chia sẻ i được cập nhật.
  • Một coroutine cập nhật điều kiện và đánh thức các coroutine khác: Khi điều kiện được thỏa mãn (tức là i > 100), coroutine này thông qua cd.Broadcast() đánh thức tất cả các coroutine đang chờ, để chúng tiếp tục thực thi.

Cấu trúc

go
type Cond struct {
	// L được giữ trong khi quan sát hoặc thay đổi điều kiện
	L Locker

	notify  notifyList
}

type notifyList struct {
	// wait là số ticket của waiter tiếp theo. Nó được tăng nguyên tử bên ngoài lock.
	wait atomic.Uint32

	notify uint32

	// Danh sách các waiter đang park.
	lock mutex
	head *sudog
	tail *sudog
}

Cấu trúc của nó không phức tạp:

  • L, mutex, loại ở đây là interface Locker, chứ không phải loại lock cụ thể
  • notify, danh sách thông báo của các coroutine đang chờ

Quan trọng hơn là cấu trúc runtime.notifyList

  • wait, giá trị nguyên tử, ghi lại có bao nhiêu coroutine đang chờ
  • notify, trỏ đến coroutine tiếp theo sẽ được đánh thức, bắt đầu tăng từ 0
  • lock, mutex, không phải là lock mà chúng ta truyền vào, mà là một lock được triển khai nội bộ trong runtime
  • head, tail, con trỏ linked list

Nó总共 chỉ có ba phương thức

  • Wait, chặn và chờ đợi
  • Signal, đánh thức một coroutine đang chờ
  • Broadcast, đánh thức tất cả các coroutine đang chờ

Hầu hết việc triển khai của nó đều được ẩn trong thư viện runtime, những triển khai này nằm trong file runtime/sema.go, đến mức code của nó trong thư viện chuẩn rất ngắn gọn, nguyên lý cơ bản của nó là một blocking queue có thêm lock.

Wait

Phương thức Wait sẽ khiến coroutine tự rơi vào trạng thái chặn chờ, cho đến khi được đánh thức.

go
func (c *Cond) Wait() {
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

Nó trước tiên sẽ thêm chính nó vào notifyList, nhưng thực ra chỉ là tăng notifyList.wait lên một mà thôi, thao tác ở đây tương đương với len(notifyList)-1, nhận được index của phần tử cuối cùng

go
func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

Thao tác thêm thực sự được hoàn thành trong hàm notifyListWait

go
func notifyListWait(l *notifyList, t uint32) {
	...
}

Trong hàm này, nó trước tiên sẽ khóa linked list, sau đó nhanh chóng判断 xem coroutine hiện tại có được đánh thức chưa, nếu đã được đánh thức thì trực tiếp trả về, không cần chặn chờ.

go
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
	unlock(&l.lock)
	return
}

Nếu không được đánh thức, thì cấu trúc thành sudog thêm vào queue, sau đó thông qua gopark suspend.

go
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
	l.head = s
} else {
	l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)

Sau khi được đánh thức thì giải phóng cấu trúc sudog

go
releaseSudog(s)

Signal

Signal sẽ đánh thức các coroutine bị chặn theo thứ tự FIFO của queue

go
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}

Quy trình của nó như sau

  1. Không khóa trực tiếp判断, l.wait có bằng l.notify không, bằng thì biểu thị tất cả coroutine đều đã được đánh thức

    go
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. Sau khi khóa, lại判断 một lần nữa xem có tất cả đều đã được đánh thức chưa

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    t := l.notify
    if t == l.wait.Load() {
    	unlock(&l.lock)
    	return
    }
  3. l.notify tăng một

    go
    atomic.Store(&l.notify, t+1)
  4. Duyệt linked list, tìm coroutine cần được đánh thức, cuối cùng thông qua runtime.goready để đánh thức coroutine.

    go
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    	if s.ticket == t {
    		n := s.next
    		if p != nil {
    			p.next = n
    		} else {
    			l.head = n
    		}
    		if n == nil {
    			l.tail = p
    		}
    		unlock(&l.lock)
    		s.next = nil
    		readyWithTime(s, 4)
    		return
    	}
    }
    unlock(&l.lock)

Broadcast

Broadcast sẽ đánh thức tất cả các coroutine bị chặn

go
func (c *Cond) Broadcast() {
    runtime_notifyListNotifyAll(&c.notify)
}

Quy trình của nó cơ bản là giống nhau

  1. Kiểm tra không lock, xem có tất cả đều đã được đánh thức chưa

    go
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. Khóa, xóa linked list, sau đó giải phóng lock, các coroutine mới đến sau sẽ được thêm vào đầu linked list

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    atomic.Store(&l.notify, l.wait.Load())
    unlock(&l.lock)
  3. Duyệt linked list, đánh thức tất cả các coroutine

    go
    for s != nil {
    	next := s.next
    	s.next = nil
    	readyWithTime(s, 4)
    	s = next
    }

Tóm tắt

sync.Cond kịch bản sử dụng phổ biến nhất là cần đồng bộ hóa một số điều kiện giữa nhiều coroutine, thường được áp dụng trong các kịch bản như mô hình producer-consumer, lập lịch tác vụ, v.v. Trong những kịch bản này, nhiều coroutine cần chờ một số điều kiện được thỏa mãn mới có thể tiếp tục thực thi, hoặc cần thông báo cho nhiều coroutine khi điều kiện thay đổi. Nó cung cấp một cách linh hoạt và hiệu quả để quản lý đồng bộ hóa giữa các coroutine. Thông qua việc phối hợp sử dụng với mutex, sync.Cond có thể đảm bảo an toàn cho việc truy cập tài nguyên chia sẻ, và có thể kiểm soát thứ tự thực thi của coroutine khi một điều kiện cụ thể được thỏa mãn. Hiểu rõ nguyên lý triển khai nội bộ của nó giúp chúng ta nắm bắt tốt hơn các kỹ thuật lập trình đồng thời, đặc biệt là khi liên quan đến đồng bộ hóa điều kiện phức tạp.

Golang by www.golangdev.cn edit