Skip to content

waitgroup

Go'nun standart kütüphanesi tarafından sağlanan WaitGroup, bir grup goroutine'in tamamlanmasını beklemek için kullanılır.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

Bu çok basit bir kod parçası. İşlevi 0-9'u yazdırmak için 10 goroutine başlatmak ve onların tamamlanmasını beklemektir. Kullanımı burada daha fazla detaylandırılmayacaktır. Sonraki, temel çalışma prensibini anlayalım, hiç karmaşık değildir.

Yapı

Tip tanımı sync/waitgroup.go dosyasında bulunur:

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Alan tanımları şöyledir:

  • state, WaitGroup'in durumunu temsil eder. Yüksek 32 bit beklenmesi gereken goroutine sayısını saymak için kullanılır ve düşük 32 bit wg'nin tamamlanmasını bekleyen goroutine sayısını saymak için kullanılır.
  • sema, semaphore, sync standart kütüphanesinde neredeyse her yerde bulunur.

Çekirdeği Add() ve Wait() yöntemlerindedir. Temel çalışma prensibi semaphore'dur. Wait() yöntemi semaphore'ı almaya çalışır ve Add() yöntemi semaphore'ı serbest bırakır, böylece M goroutine'in N goroutine grubunun tamamlanmasını beklemesi uygulanır.

Add

Add metodu beklenmesi gereken goroutine sayısını artırır.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

Akış şöyledir:

  1. İlk olarak wg.state üzerinde kaydırma işlemleri yapar, yüksek 32 bit ve düşük 32 bit'i alır, v ve w değişkenlerine karşılık gelir:

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Sonra yargılamaya başlar. v wg sayacını temsil eder ve w wg'nin tamamlanmasını bekleyen goroutine sayısını temsil eder:

    1. Eğer v 0'dan küçükse, doğrudan panic. Negatif sayıların anlamı yoktur:

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. Eğer w 0 değilse ve delta v'ye eşitse, bu Wait() metodu ve Add() metodunun eşzamanlı olarak çağrıldığı anlamına gelir, bu yanlış kullanımdır:

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Eğer v 0'dan büyükse veya w 0'a eşitse, bu wg'nin tamamlanmasını bekleyen goroutine olmadığı anlamına gelir, bu yüzden doğrudan dön:

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Bu adıma ulaşmak v'nin 0'a eşit ve w'nin 0'dan büyük olduğu anlamına gelir, yani şu anda çalışan goroutine yok ama wg'nin tamamlanmasını bekleyen goroutine'ler var. Bu yüzden semaphore'ı serbest bırakması ve bu goroutine'leri uyandırması gerekir:

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

Done() metodu aslında sadece Add(-1)'dir, açıklanacak başka bir şey yok.

Wait

Eğer tamamlanması beklenmesi gereken başka goroutine'ler varsa, Wait metodunu çağırmak mevcut goroutine'in engellenmesine neden olur.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Akışı sadece bir for döngüsüdür:

  1. Beklenmesi gereken goroutine sayısını ve bekleyen goroutine sayısını almak için yüksek 32 bit ve düşük 32 bit'i oku. Eğer beklenmesi gereken goroutine yoksa, doğrudan dön:

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. Aksi takdirde, CAS operasyonu kullanarak bekleyen goroutine sayısını bir artır, sonra semaphore'ı almaya çalış ve engelleme bekleme kuyruğuna gir:

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Bekleyen goroutine uyandırıldığında (tüm beklenen goroutine'ler tamamlandığı ve semaphore'ı serbest bıraktığı için), state'i kontrol et. Eğer 0 değilse, bu Wait() ve Add()'in eşzamanlı olarak kullanıldığı anlamına gelir:

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Eğer CAS güncellemesi başarılı olmadıysa, döngüye devam et.

Özet

Son olarak, bir hatırlatma: WaitGroup kullanırken, Add ve Wait'i eşzamanlı olarak çağırmayın.

Golang by www.golangdev.cn edit