waitgroup
WaitGroup، توفره المكتبة القياسية لـ Go، وظيفته هي انتظار اكتمال تنفيذ مجموعة من الكوروتينات.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}هذا كود بسيط جدًا، وظيفته هي تشغيل 10 كوروتينات لطباعة 0-9، وانتظار اكتمال تنفيذها. لن نطيل في شرح طريقة استخدامه، بل سنتعرف على مبدأ عمله الأساسي، وهو ليس معقدًا إطلاقًا.
البنية
تعريف نوعه موجود في ملف sync/waitgroup.go
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}شرح الحقول:
state، يمثل حالة WaitGroup، البتات العليا 32 تُستخدم لحساب عدد الكوروتينات المنتظرة، والبتات السفلى 32 تُستخدم لحساب عدد الكوروتينات التي تنتظر اكتمال wg.sema، السيمافور، موجود في كل مكان تقريبًا في المكتبة القياسيةsync.
جوهره يكمن في الطريقتين Add() و Wait()، ومبدأ عمله الأساسي هو السيمافور، طريقة Wait() تحاول الحصول على السيمافور، وطريقة Add() تحرر السيمافور، لتحقيق انتظار M كوروتين لاكتمال مجموعة من N كوروتين.
Add
طريقة Add تزيد عدد الكوروتينات التي يجب انتظارها.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}تدفقه كالتالي:
أولًا تُجري عملية إزاحة على
wg.state، وتحصل على البتات العليا 32 والبتات السفلى 32، وتخزنهما في المتغيرينvوwgostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)ثم تبدأ في الحكم،
vيمثل عداد wg، وwيمثل عدد الكوروتينات التي تنتظر اكتمال wgإذا كان
vأقل من 0، تُرمىpanicمباشرةً، فالأرقام السالبة لا معنى لهاgoif v < 0 { panic("sync: negative WaitGroup counter") }إذا كان
wلا يساوي 0، وdeltaيساويv، فهذا يعني أن طريقةWait()وطريقةAdd()استُدعيتا بشكل متزامن، وهذا استخدام خاطئgoif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }إذا كان
vأكبر من 0، أوwيساوي 0، فهذا يعني أنه لا توجد كوروتينات تنتظر اكتمال wg حاليًا، فيمكن العودة مباشرةgoif v > 0 || w == 0 { return }
الوصول لهذه المرحلة يعني أن
vيساوي 0، وwأكبر من 0، أي لا توجد كوروتينات تعمل حاليًا، لكن توجد كوروتينات تنتظر اكتمال wg، لذا يجب تحرير السيمافور، وإيقاظ هذه الكوروتينات.goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
طريقة Done() هي في الواقع Add(-1)، لا يوجد ما يُقال عنها.
Wait
إذا كانت هناك كوروتينات أخرى يجب انتظار اكتمالها، استدعاء طريقة Wait سيجعل الكوروتين الحالي يدخل في حالة حجب.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}تدفقه هو حلقة for:
تقرأ البتات العليا 32 والبتات السفلى 32، وتحصل على عدد الكوروتينات التي يجب انتظارها، وعدد الكوروتينات المنتظرة، إذا لم يكن هناك كوروتينات يجب انتظارها، تعود مباشرة
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }وإلا عبر عملية CAS تزيد عدد الكوروتينات المنتظرة بواحد، ثم تحاول الحصول على السيمافور، وتدخل طابور الانتظار المحجوب
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }عند إيقاظ الكوروتين المنتظر (لأن جميع الكوروتينات المنتظرة أكملت تنفيذها وحررت السيمافور)، يُفحص
state، إذا لم يكن 0، فهذا يعني أنWait()وAdd()استُدعيتا بشكل متزامن مرة أخرىgoif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnإذا فشل تحديث CAS، تستمر الحلقة
ملخص
أخيرًا، تذكير عند استخدام WaitGroup، لا تستدعي Add و Wait بشكل متزامن.
