waitgroup
Go'nun standart kütüphanesi tarafından sağlanan WaitGroup, bir grup goroutine'in tamamlanmasını beklemek için kullanılır.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}Bu çok basit bir kod parçası. İşlevi 0-9'u yazdırmak için 10 goroutine başlatmak ve onların tamamlanmasını beklemektir. Kullanımı burada daha fazla detaylandırılmayacaktır. Sonraki, temel çalışma prensibini anlayalım, hiç karmaşık değildir.
Yapı
Tip tanımı sync/waitgroup.go dosyasında bulunur:
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}Alan tanımları şöyledir:
state, WaitGroup'in durumunu temsil eder. Yüksek 32 bit beklenmesi gereken goroutine sayısını saymak için kullanılır ve düşük 32 bit wg'nin tamamlanmasını bekleyen goroutine sayısını saymak için kullanılır.sema, semaphore,syncstandart kütüphanesinde neredeyse her yerde bulunur.
Çekirdeği Add() ve Wait() yöntemlerindedir. Temel çalışma prensibi semaphore'dur. Wait() yöntemi semaphore'ı almaya çalışır ve Add() yöntemi semaphore'ı serbest bırakır, böylece M goroutine'in N goroutine grubunun tamamlanmasını beklemesi uygulanır.
Add
Add metodu beklenmesi gereken goroutine sayısını artırır.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}Akış şöyledir:
İlk olarak
wg.stateüzerinde kaydırma işlemleri yapar, yüksek 32 bit ve düşük 32 bit'i alır,vvewdeğişkenlerine karşılık gelir:gostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)Sonra yargılamaya başlar.
vwg sayacını temsil eder vewwg'nin tamamlanmasını bekleyen goroutine sayısını temsil eder:Eğer
v0'dan küçükse, doğrudanpanic. Negatif sayıların anlamı yoktur:goif v < 0 { panic("sync: negative WaitGroup counter") }Eğer
w0 değilse vedeltav'ye eşitse, buWait()metodu veAdd()metodunun eşzamanlı olarak çağrıldığı anlamına gelir, bu yanlış kullanımdır:goif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }Eğer
v0'dan büyükse veyaw0'a eşitse, bu wg'nin tamamlanmasını bekleyen goroutine olmadığı anlamına gelir, bu yüzden doğrudan dön:goif v > 0 || w == 0 { return }
Bu adıma ulaşmak
v'nin 0'a eşit vew'nin 0'dan büyük olduğu anlamına gelir, yani şu anda çalışan goroutine yok ama wg'nin tamamlanmasını bekleyen goroutine'ler var. Bu yüzden semaphore'ı serbest bırakması ve bu goroutine'leri uyandırması gerekir:goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
Done() metodu aslında sadece Add(-1)'dir, açıklanacak başka bir şey yok.
Wait
Eğer tamamlanması beklenmesi gereken başka goroutine'ler varsa, Wait metodunu çağırmak mevcut goroutine'in engellenmesine neden olur.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}Akışı sadece bir for döngüsüdür:
Beklenmesi gereken goroutine sayısını ve bekleyen goroutine sayısını almak için yüksek 32 bit ve düşük 32 bit'i oku. Eğer beklenmesi gereken goroutine yoksa, doğrudan dön:
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }Aksi takdirde, CAS operasyonu kullanarak bekleyen goroutine sayısını bir artır, sonra semaphore'ı almaya çalış ve engelleme bekleme kuyruğuna gir:
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }Bekleyen goroutine uyandırıldığında (tüm beklenen goroutine'ler tamamlandığı ve semaphore'ı serbest bıraktığı için),
state'i kontrol et. Eğer 0 değilse, buWait()veAdd()'in eşzamanlı olarak kullanıldığı anlamına gelir:goif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnEğer CAS güncellemesi başarılı olmadıysa, döngüye devam et.
Özet
Son olarak, bir hatırlatma: WaitGroup kullanırken, Add ve Wait'i eşzamanlı olarak çağırmayın.
