Skip to content

waitgroup

WaitGroup، توفره المكتبة القياسية لـ Go، وظيفته هي انتظار اكتمال تنفيذ مجموعة من الكوروتينات.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

هذا كود بسيط جدًا، وظيفته هي تشغيل 10 كوروتينات لطباعة 0-9، وانتظار اكتمال تنفيذها. لن نطيل في شرح طريقة استخدامه، بل سنتعرف على مبدأ عمله الأساسي، وهو ليس معقدًا إطلاقًا.

البنية

تعريف نوعه موجود في ملف sync/waitgroup.go

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

شرح الحقول:

  • state، يمثل حالة WaitGroup، البتات العليا 32 تُستخدم لحساب عدد الكوروتينات المنتظرة، والبتات السفلى 32 تُستخدم لحساب عدد الكوروتينات التي تنتظر اكتمال wg.
  • sema، السيمافور، موجود في كل مكان تقريبًا في المكتبة القياسية sync.

جوهره يكمن في الطريقتين Add() و Wait()، ومبدأ عمله الأساسي هو السيمافور، طريقة Wait() تحاول الحصول على السيمافور، وطريقة Add() تحرر السيمافور، لتحقيق انتظار M كوروتين لاكتمال مجموعة من N كوروتين.

Add

طريقة Add تزيد عدد الكوروتينات التي يجب انتظارها.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

تدفقه كالتالي:

  1. أولًا تُجري عملية إزاحة على wg.state، وتحصل على البتات العليا 32 والبتات السفلى 32، وتخزنهما في المتغيرين v و w

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. ثم تبدأ في الحكم، v يمثل عداد wg، و w يمثل عدد الكوروتينات التي تنتظر اكتمال wg

    1. إذا كان v أقل من 0، تُرمى panic مباشرةً، فالأرقام السالبة لا معنى لها

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. إذا كان w لا يساوي 0، و delta يساوي v، فهذا يعني أن طريقة Wait() وطريقة Add() استُدعيتا بشكل متزامن، وهذا استخدام خاطئ

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. إذا كان v أكبر من 0، أو w يساوي 0، فهذا يعني أنه لا توجد كوروتينات تنتظر اكتمال wg حاليًا، فيمكن العودة مباشرة

      go
      if v > 0 || w == 0 {
      	return
      }
  3. الوصول لهذه المرحلة يعني أن v يساوي 0، و w أكبر من 0، أي لا توجد كوروتينات تعمل حاليًا، لكن توجد كوروتينات تنتظر اكتمال wg، لذا يجب تحرير السيمافور، وإيقاظ هذه الكوروتينات.

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

طريقة Done() هي في الواقع Add(-1)، لا يوجد ما يُقال عنها.

Wait

إذا كانت هناك كوروتينات أخرى يجب انتظار اكتمالها، استدعاء طريقة Wait سيجعل الكوروتين الحالي يدخل في حالة حجب.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

تدفقه هو حلقة for:

  1. تقرأ البتات العليا 32 والبتات السفلى 32، وتحصل على عدد الكوروتينات التي يجب انتظارها، وعدد الكوروتينات المنتظرة، إذا لم يكن هناك كوروتينات يجب انتظارها، تعود مباشرة

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. وإلا عبر عملية CAS تزيد عدد الكوروتينات المنتظرة بواحد، ثم تحاول الحصول على السيمافور، وتدخل طابور الانتظار المحجوب

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. عند إيقاظ الكوروتين المنتظر (لأن جميع الكوروتينات المنتظرة أكملت تنفيذها وحررت السيمافور)، يُفحص state، إذا لم يكن 0، فهذا يعني أن Wait() و Add() استُدعيتا بشكل متزامن مرة أخرى

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. إذا فشل تحديث CAS، تستمر الحلقة

ملخص

أخيرًا، تذكير عند استخدام WaitGroup، لا تستدعي Add و Wait بشكل متزامن.

Golang تم تحريره بواسطة www.golangdev.cn