Skip to content

cond

sync.Cond adalah variabel kondisi di pustaka standar Go, merupakan satu-satunya alat sinkronisasi yang memerlukan inisialisasi manual. Berbeda dengan primitif sinkronisasi lainnya, sync.Cond memerlukan mutex (sync.Mutex) untuk melindungi akses ke sumber daya bersama. Variabel ini memungkinkan goroutine menunggu hingga kondisi tertentu terpenuhi sebelum dibangunkan.

Contoh Kode

go
package main

import (
    "fmt"
    "sync"
    "time"
)

var i = 0

func main() {
    var mu sync.Mutex
    var wg sync.WaitGroup

    // Buat variabel kondisi, dan传入 mutex
    cd := sync.NewCond(&mu)

    // Tambahkan 4 goroutine yang akan diproses
    wg.Add(4)

    // Buat 3 goroutine, setiap goroutine akan menunggu kondisi terpenuhi
   	for j := range 3 {
		go func() {
			defer wg.Done()

			mu.Lock()
			for i <= 100 {
                 // Ketika kondisi tidak terpenuhi, goroutine akan terhambat di sini
				cd.Wait()
			}
			fmt.Printf("%d wake up\n", j)
			mu.Unlock()
		}()
	}

    // Buat goroutine untuk memperbarui kondisi dan membangunkan goroutine lain
    go func() {
        defer wg.Done()
        for {
            mu.Lock()
            i++ // Perbarui variabel bersama
            mu.Unlock()
            if i > 100 {
                cd.Broadcast() // Bangunkan semua goroutine yang menunggu ketika kondisi terpenuhi
                break
            }
            time.Sleep(time.Millisecond * 10) // Simulasikan beban kerja
        }
    }()

    // Tunggu semua goroutine selesai
    wg.Wait()
}

Dalam contoh di atas, variabel bersama i diakses dan dimodifikasi secara konkuren oleh beberapa goroutine. Mutex mu digunakan untuk memastikan akses ke i aman dalam kondisi konkuren. Kemudian, melalui sync.NewCond(&mu) dibuat variabel kondisi cd, yang bergantung pada lock mu untuk memastikan akses sinkron ke sumber daya bersama saat menunggu.

  • Tiga goroutine yang menunggu: Setiap goroutine menghambat dirinya sendiri melalui cd.Wait(), hingga kondisi terpenuhi (i > 100). Goroutine-goroutine ini akan tetap dalam status terhambat hingga nilai variabel bersama i diperbarui.
  • Satu goroutine yang memperbarui kondisi dan membangunkan goroutine lain: Ketika kondisi terpenuhi (yaitu i > 100), goroutine ini membangunkan semua goroutine yang menunggu melalui cd.Broadcast(), memungkinkan mereka继续执行.

Struktur

go
type Cond struct {
	// L dipegang saat mengamati atau mengubah kondisi
	L Locker

	notify  notifyList
}

type notifyList struct {
	// wait adalah nomor tiket waiter berikutnya. Ini ditambahkan secara atomik
	// di luar lock.
	wait atomic.Uint32

	notify uint32

	// Daftar waiter yang diparkir.
	lock mutex
	head *sudog
	tail *sudog
}

Strukturnya tidak terlalu kompleks:

  • L, mutex, tipenya di sini adalah antarmuka Locker, bukan tipe lock konkret
  • notify, linked list notifikasi goroutine yang menunggu

Yang cukup penting adalah struktur runtime.notifyList

  • wait, nilai atomik, mencatat berapa banyak goroutine yang menunggu
  • notify, menunjuk ke goroutine berikutnya yang akan dibangunkan, mulai dari 0 dan bertambah
  • lock, mutex, bukan lock yang kita传入, tetapi lock yang diimplementasikan internal runtime
  • head, tail, pointer linked list

Ia hanya memiliki tiga metode

  • Wait,阻塞等待
  • Signal, membangunkan satu goroutine yang menunggu
  • Broadcast, membangunkan semua goroutine yang menunggu

Sebagian besar implementasinya disembunyikan di bawah pustaka runtime, implementasi ini terletak di file runtime/sema.go, sehingga kodenya sangat singkat di pustaka standar, prinsip dasarnya adalah antrian阻塞 yang dilengkapi lock.

Wait

Metode Wait akan membuat goroutine sendiri陷入阻塞等待, hingga dibangunkan.

go
func (c *Cond) Wait() {
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

Pertama-tama ia akan menambahkan dirinya sendiri ke notifyList, tetapi sebenarnya hanya menambahkan notifyList.wait satu而已, operasi di sini setara dengan len(notifyList)-1, mendapatkan indeks elemen terakhir

go
func notifyListAdd(l *notifyList) uint32 {
	return l.wait.Add(1) - 1
}

Operasi penambahan sebenarnya diselesaikan di fungsi notifyListWait

go
func notifyListWait(l *notifyList, t uint32) {
	...
}

Di fungsi ini, pertama-tama ia akan mengunci linked list, kemudian cepat memeriksa apakah goroutine saat ini sudah dibangunkan, jika sudah dibangunkan langsung return, tidak perlu阻塞等待.

go
lockWithRank(&l.lock, lockRankNotifyList)
// Return right away if this ticket has already been notified.
if less(t, l.notify) {
	unlock(&l.lock)
	return
}

Jika tidak dibangunkan, maka dibentuk sebagai sudog dan ditambahkan ke antrian, kemudian ditangguhkan melalui gopark.

go
s := acquireSudog()
s.g = getg()
s.ticket = t
s.releasetime = 0
if l.tail == nil {
	l.head = s
} else {
	l.tail.next = s
}
l.tail = s
goparkunlock(&l.lock, waitReasonSyncCondWait, traceBlockCondWait, 3)

Setelah dibangunkan, lepaskan struktur sudog

go
releaseSudog(s)

Signal

Signal akan membangunkan goroutine yang terhambat sesuai urutan FIFO antrian

go
func (c *Cond) Signal() {
	runtime_notifyListNotifyOne(&c.notify)
}

Alurnya adalah sebagai berikut

  1. Tanpa lock langsung periksa, apakah l.wait sama dengan l.notify, jika sama berarti semua goroutine sudah dibangunkan

    go
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. Setelah dikunci, periksa lagi apakah semua sudah dibangunkan

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    t := l.notify
    if t == l.wait.Load() {
    	unlock(&l.lock)
    	return
    }
  3. Tambahkan l.notify satu

    go
    atomic.Store(&l.notify, t+1)
  4. Loop遍历 linked list, temukan goroutine yang perlu dibangunkan, terakhir bangunkan goroutine melalui runtime.goready.

    go
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
    	if s.ticket == t {
    		n := s.next
    		if p != nil {
    			p.next = n
    		} else {
    			l.head = n
    		}
    		if n == nil {
    			l.tail = p
    		}
    		unlock(&l.lock)
    		s.next = nil
    		readyWithTime(s, 4)
    		return
    	}
    }
    unlock(&l.lock)

Broadcast

Broadcast akan membangunkan semua goroutine yang terhambat

go
func (c *Cond) Broadcast() {
    runtime_notifyListNotifyAll(&c.notify)
}

Alurnya pada dasarnya sama

  1. Periksa tanpa lock, apakah semua sudah dibangunkan

    go
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if l.wait.Load() == atomic.Load(&l.notify) {
    	return
    }
  2. Kunci, kosongkan linked list, kemudian lepaskan lock, goroutine baru yang tiba kemudian akan ditambahkan ke kepala linked list

    go
    lockWithRank(&l.lock, lockRankNotifyList)
    s := l.head
    l.head = nil
    l.tail = nil
    atomic.Store(&l.notify, l.wait.Load())
    unlock(&l.lock)

3.遍历 linked list, bangunkan semua goroutine

go
for s != nil {
	next := s.next
	s.next = nil
	readyWithTime(s, 4)
	s = next
}

Ringkasan

sync.Cond Skenario penggunaan yang paling umum adalah perlu menyinkronkan kondisi tertentu antar beberapa goroutine, biasanya diterapkan dalam model producer-consumer, penjadwalan tugas, dan skenario lainnya. Dalam skenario ini, beberapa goroutine perlu menunggu kondisi tertentu terpenuhi sebelum继续执行, atau perlu memberi tahu beberapa goroutine ketika kondisi berubah. Variabel ini menyediakan cara yang fleksibel dan efisien untuk mengelola sinkronisasi antar goroutine. Dengan bekerja sama dengan mutex, sync.Cond dapat memastikan keamanan akses sumber daya bersama, dan dapat mengontrol urutan eksekusi goroutine ketika kondisi tertentu terpenuhi. Memahami prinsip implementasi internalnya membantu kita lebih baik menguasai teknik pemrograman konkuren, terutama ketika melibatkan sinkronisasi kondisi yang kompleks.

Golang by www.golangdev.cn edit