cond
sync.Cond adalah variabel kondisi di pustaka standar Go, merupakan satu-satunya alat sinkronisasi yang memerlukan inisialisasi manual. Berbeda dengan primitif sinkronisasi lainnya, sync.Cond memerlukan mutex (sync.Mutex) untuk melindungi akses ke sumber daya bersama. Variabel ini memungkinkan goroutine menunggu hingga kondisi tertentu terpenuhi sebelum dibangunkan.
Contoh Kode
package main
import (
"fmt"
"sync"
"time"
)
var i = 0
func main() {
var mu sync.Mutex
var wg sync.WaitGroup
// Buat variabel kondisi, dan传入 mutex
cd := sync.NewCond(&mu)
// Tambahkan 4 goroutine yang akan diproses
wg.Add(4)
// Buat 3 goroutine, setiap goroutine akan menunggu kondisi terpenuhi
for j := range 3 {
go func() {
defer wg.Done()
mu.Lock()
for i <= 100 {
// Ketika kondisi tidak terpenuhi, goroutine akan terhambat di sini
cd.Wait()
}
fmt.Printf("%d wake up\n", j)
mu.Unlock()
}()
}
// Buat goroutine untuk memperbarui kondisi dan membangunkan goroutine lain
go func() {
defer wg.Done()
for {
mu.Lock()
i++ // Perbarui variabel bersama
mu.Unlock()
if i > 100 {
cd.Broadcast() // Bangunkan semua goroutine yang menunggu ketika kondisi terpenuhi
break
}
time.Sleep(time.Millisecond * 10) // Simulasikan beban kerja
}
}()
// Tunggu semua goroutine selesai
wg.Wait()
}Dalam contoh di atas, variabel bersama i diakses dan dimodifikasi secara konkuren oleh beberapa goroutine. Mutex mu digunakan untuk memastikan akses ke i aman dalam kondisi konkuren. Kemudian, melalui sync.NewCond(&mu) dibuat variabel kondisi cd, yang bergantung pada lock mu untuk memastikan akses sinkron ke sumber daya bersama saat menunggu.
- Tiga goroutine yang menunggu: Setiap goroutine menghambat dirinya sendiri melalui
cd.Wait(), hingga kondisi terpenuhi (i > 100). Goroutine-goroutine ini akan tetap dalam status terhambat hingga nilai variabel bersamaidiperbarui. - Satu goroutine yang memperbarui kondisi dan membangunkan goroutine lain: Ketika kondisi terpenuhi (yaitu
i > 100), goroutine ini membangunkan semua goroutine yang menunggu melaluicd.Broadcast(), memungkinkan mereka继续执行.
Struktur
type Cond struct {
// L dipegang saat mengamati atau mengubah kondisi
L Locker
notify notifyList
}
type notifyList struct {
// wait adalah nomor tiket waiter berikutnya. Ini ditambahkan secara atomik
// di luar lock.
wait atomic.Uint32
notify uint32
// Daftar waiter yang diparkir.
lock mutex
head *sudog
tail *sudog
}Strukturnya tidak terlalu kompleks:
L, mutex, tipenya di sini adalah antarmukaLocker, bukan tipe lock konkretnotify, linked list notifikasi goroutine yang menunggu
Yang cukup penting adalah struktur runtime.notifyList
wait, nilai atomik, mencatat berapa banyak goroutine yang menunggunotify, menunjuk ke goroutine berikutnya yang akan dibangunkan, mulai dari 0 dan bertambahlock, mutex, bukan lock yang kita传入, tetapi lock yang diimplementasikan internalruntimehead,tail, pointer linked list
Ia hanya memiliki tiga metode
Wait,阻塞等待Signal, membangunkan satu goroutine yang menungguBroadcast, membangunkan semua goroutine yang menunggu
Sebagian besar implementasinya disembunyikan di bawah pustaka runtime, implementasi ini terletak di file runtime/sema.go, sehingga kodenya sangat singkat di pustaka standar, prinsip dasarnya adalah antrian阻塞 yang dilengkapi lock.
Wait
Metode Wait akan membuat goroutine sendiri陷入阻塞等待, hingga dibangunkan.
func (c *Cond) Wait() {
t := runtime_notifyListAdd(&c.notify)
c.L.Unlock()
runtime_notifyListWait(&c.notify, t)
c.L.Lock()
}Pertama-tama ia akan menambahkan dirinya sendiri ke notifyList, tetapi sebenarnya hanya menambahkan notifyList.wait satu而已, operasi di sini setara dengan len(notifyList)-1, mendapatkan indeks elemen terakhir
func notifyListAdd(l *notifyList) uint32 {
return l.wait.Add(1) - 1
}Operasi penambahan sebenarnya diselesaikan di fungsi notifyListWait
func notifyListWait(l *notifyList, t uint32) {
...
}Di fungsi ini, pertama-tama ia akan mengunci linked list, kemudian cepat memeriksa apakah goroutine saat ini sudah dibangunkan, jika sudah dibangunkan langsung return, tidak perlu阻塞等待.
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
unlock(&l.lock)
return
}Jika tidak dibangunkan, maka dibentuk sebagai sudog dan ditambahkan ke antrian, kemudian ditangguhkan melalui gopark.
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
l.head = s
} else {
l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)Setelah dibangunkan, lepaskan struktur sudog
releaseSudog(s)Signal
Signal akan membangunkan goroutine yang terhambat sesuai urutan FIFO antrian
func (c *Cond) Signal() {
runtime_notifyListNotifyOne(&c.notify)
}Alurnya adalah sebagai berikut
Tanpa lock langsung periksa, apakah
l.waitsama denganl.notify, jika sama berarti semua goroutine sudah dibangunkangoif l.wait.Load() == atomic.Load(&l.notify) { return }Setelah dikunci, periksa lagi apakah semua sudah dibangunkan
golockWithRank(&l.lock, lockRankNotifyList) t := l.notify if t == l.wait.Load() { unlock(&l.lock) return }Tambahkan
l.notifysatugoatomic.Store(&l.notify, t+1)Loop遍历 linked list, temukan goroutine yang perlu dibangunkan, terakhir bangunkan goroutine melalui
runtime.goready.gofor p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { if s.ticket == t { n := s.next if p != nil { p.next = n } else { l.head = n } if n == nil { l.tail = p } unlock(&l.lock) s.next = nil readyWithTime(s, 4) return } } unlock(&l.lock)
Broadcast
Broadcast akan membangunkan semua goroutine yang terhambat
func (c *Cond) Broadcast() {
runtime_notifyListNotifyAll(&c.notify)
}Alurnya pada dasarnya sama
Periksa tanpa lock, apakah semua sudah dibangunkan
go// Fast-path: if there are no new waiters since the last notification // we don't need to acquire the lock. if l.wait.Load() == atomic.Load(&l.notify) { return }Kunci, kosongkan linked list, kemudian lepaskan lock, goroutine baru yang tiba kemudian akan ditambahkan ke kepala linked list
golockWithRank(&l.lock, lockRankNotifyList) s := l.head l.head = nil l.tail = nil atomic.Store(&l.notify, l.wait.Load()) unlock(&l.lock)
3.遍历 linked list, bangunkan semua goroutine
for s != nil {
next := s.next
s.next = nil
readyWithTime(s, 4)
s = next
}Ringkasan
sync.Cond Skenario penggunaan yang paling umum adalah perlu menyinkronkan kondisi tertentu antar beberapa goroutine, biasanya diterapkan dalam model producer-consumer, penjadwalan tugas, dan skenario lainnya. Dalam skenario ini, beberapa goroutine perlu menunggu kondisi tertentu terpenuhi sebelum继续执行, atau perlu memberi tahu beberapa goroutine ketika kondisi berubah. Variabel ini menyediakan cara yang fleksibel dan efisien untuk mengelola sinkronisasi antar goroutine. Dengan bekerja sama dengan mutex, sync.Cond dapat memastikan keamanan akses sumber daya bersama, dan dapat mengontrol urutan eksekusi goroutine ketika kondisi tertentu terpenuhi. Memahami prinsip implementasi internalnya membantu kita lebih baik menguasai teknik pemrograman konkuren, terutama ketika melibatkan sinkronisasi kondisi yang kompleks.
