waitgroup
WaitGroup, disediakan oleh pustaka standar Go, fungsinya adalah untuk menunggu sekumpulan goroutine selesai berjalan.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}Ini adalah kode yang sangat sederhana, fungsinya adalah memulai 10 goroutine untuk mencetak 0-9, dan menunggu mereka selesai berjalan. Penggunaannya tidak akan dijelaskan lagi di sini, selanjutnya kita akan memahami prinsip kerja dasarnya, sama sekali tidak kompleks.
Struktur
Definisi tipenya terletak di file sync/waitgroup.go
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}Penjelasan field adalah sebagai berikut:
state, menunjukkan status WaitGroup, 32 bit tinggi digunakan untuk menghitung jumlah goroutine yang ditunggu, 32 bit rendah digunakan untuk menghitung jumlah goroutine yang menunggu wg selesai.sema, semaphore, di pustaka standarsynchampir ada di mana-mana.
Intinya terletak pada dua metode Add() dan Wait(), prinsip kerja dasarnya adalah semaphore, metode Wait() mencoba mendapatkan semaphore, metode Add() melepaskan semaphore, untuk mewujudkan M goroutine menunggu sekumpulan N goroutine selesai berjalan.
Add
Metode Add adalah untuk menambah jumlah goroutine yang perlu ditunggu.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}Alurnya adalah sebagai berikut:
Pertama-tama akan melakukan operasi shift pada
wg.state, masing-masing mendapatkan 32 bit tinggi dan 32 bit rendah, sesuai dengan variabelvdanwgostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)Kemudian mulai melakukan判断,
vmewakili penghitung wg,wmewakili jumlah goroutine yang menunggu wg selesaiJika
vkurang dari 0, langsungpanic, bilangan negatif tidak memiliki arti apa-apagoif v < 0 { panic("sync: negative WaitGroup counter") }wtidak sama dengan 0, dandeltasama denganv, menunjukkan metodeWait()dan metodeAdd()dipanggil secara konkuren, ini adalah cara penggunaan yang salahgoif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }Jika
vlebih besar dari 0, atauwsama dengan 0, menunjukkan sekarang tidak ada goroutine yang menunggu wg selesai, dapat langsung returngoif v > 0 || w == 0 { return }
Sampai langkah ini menunjukkan
vsama dengan 0, danwlebih besar dari 0, yaitu saat ini tidak ada goroutine yang berjalan, tetapi ada goroutine yang menunggu wg selesai, jadi perlu melepaskan semaphore, membangunkan goroutine-goroutine ini.goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
Metode Done() sebenarnya adalah Add(-1), tidak ada yang perlu dijelaskan.
Wait
Jika saat ini ada goroutine lain yang perlu menunggu selesai berjalan, pemanggilan metode Wait akan membuat goroutine saat ini陷入阻塞.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}Alurnya adalah sebuah loop for
Baca 32 bit tinggi dan 32 bit rendah, dapatkan jumlah goroutine yang perlu ditunggu, dan jumlah goroutine yang menunggu, jika tidak ada goroutine yang perlu ditunggu, langsung return
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }Jika tidak, melalui operasi CAS tambahkan jumlah goroutine yang menunggu dengan satu, kemudian mencoba mendapatkan semaphore, masuk ke antrian阻塞等待
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }Ketika goroutine yang menunggu dibangunkan (karena semua goroutine yang ditunggu sudah selesai berjalan, melepaskan semaphore), periksa
state, jika tidak sama dengan 0, menunjukkanWait()danAdd()dipanggil secara konkuren lagigoif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnJika CAS tidak berhasil memperbarui, lanjutkan loop
Ringkasan
Terakhir perlu diingatkan, saat menggunakan WaitGroup, Add dan Wait jangan dipanggil secara konkuren.
