Skip to content

waitgroup

WaitGroup, disediakan oleh pustaka standar Go, fungsinya adalah untuk menunggu sekumpulan goroutine selesai berjalan.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

Ini adalah kode yang sangat sederhana, fungsinya adalah memulai 10 goroutine untuk mencetak 0-9, dan menunggu mereka selesai berjalan. Penggunaannya tidak akan dijelaskan lagi di sini, selanjutnya kita akan memahami prinsip kerja dasarnya, sama sekali tidak kompleks.

Struktur

Definisi tipenya terletak di file sync/waitgroup.go

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

Penjelasan field adalah sebagai berikut:

  • state, menunjukkan status WaitGroup, 32 bit tinggi digunakan untuk menghitung jumlah goroutine yang ditunggu, 32 bit rendah digunakan untuk menghitung jumlah goroutine yang menunggu wg selesai.
  • sema, semaphore, di pustaka standar sync hampir ada di mana-mana.

Intinya terletak pada dua metode Add() dan Wait(), prinsip kerja dasarnya adalah semaphore, metode Wait() mencoba mendapatkan semaphore, metode Add() melepaskan semaphore, untuk mewujudkan M goroutine menunggu sekumpulan N goroutine selesai berjalan.

Add

Metode Add adalah untuk menambah jumlah goroutine yang perlu ditunggu.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

Alurnya adalah sebagai berikut:

  1. Pertama-tama akan melakukan operasi shift pada wg.state, masing-masing mendapatkan 32 bit tinggi dan 32 bit rendah, sesuai dengan variabel v dan w

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. Kemudian mulai melakukan判断, v mewakili penghitung wg, w mewakili jumlah goroutine yang menunggu wg selesai

    1. Jika v kurang dari 0, langsung panic, bilangan negatif tidak memiliki arti apa-apa

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. w tidak sama dengan 0, dan delta sama dengan v, menunjukkan metode Wait() dan metode Add() dipanggil secara konkuren, ini adalah cara penggunaan yang salah

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. Jika v lebih besar dari 0, atau w sama dengan 0, menunjukkan sekarang tidak ada goroutine yang menunggu wg selesai, dapat langsung return

      go
      if v > 0 || w == 0 {
      	return
      }
  3. Sampai langkah ini menunjukkan v sama dengan 0, dan w lebih besar dari 0, yaitu saat ini tidak ada goroutine yang berjalan, tetapi ada goroutine yang menunggu wg selesai, jadi perlu melepaskan semaphore, membangunkan goroutine-goroutine ini.

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

Metode Done() sebenarnya adalah Add(-1), tidak ada yang perlu dijelaskan.

Wait

Jika saat ini ada goroutine lain yang perlu menunggu selesai berjalan, pemanggilan metode Wait akan membuat goroutine saat ini陷入阻塞.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

Alurnya adalah sebuah loop for

  1. Baca 32 bit tinggi dan 32 bit rendah, dapatkan jumlah goroutine yang perlu ditunggu, dan jumlah goroutine yang menunggu, jika tidak ada goroutine yang perlu ditunggu, langsung return

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. Jika tidak, melalui operasi CAS tambahkan jumlah goroutine yang menunggu dengan satu, kemudian mencoba mendapatkan semaphore, masuk ke antrian阻塞等待

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. Ketika goroutine yang menunggu dibangunkan (karena semua goroutine yang ditunggu sudah selesai berjalan, melepaskan semaphore), periksa state, jika tidak sama dengan 0, menunjukkan Wait() dan Add() dipanggil secara konkuren lagi

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. Jika CAS tidak berhasil memperbarui, lanjutkan loop

Ringkasan

Terakhir perlu diingatkan, saat menggunakan WaitGroup, Add dan Wait jangan dipanggil secara konkuren.

Golang by www.golangdev.cn edit