Skip to content

waitgroup

WaitGroup 은 Go 표준 라이브러리로 제공되며 기능은 한 그룹의 코루틴이 실행 완료될 때까지 대기하는 데 사용됩니다.

go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := range 10 {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println(i)
		}()
	}
	wg.Wait()
}

이는 매우 간단한 코드로 10 개의 코루틴을 시작하여 0-9 를 인쇄하고 실행이 완료될 때까지 대기합니다. 사용법에 대해서는 더 이상 설명하지 않으며 기본 작동 원리에 대해 알아보겠습니다. 전혀 복잡하지 않습니다.

구조

유형 정의는 sync/waitgroup.go 파일에 있습니다.

go
type WaitGroup struct {
	noCopy noCopy

	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
	sema  uint32
}

필드 설명은 다음과 같습니다.

  • state, WaitGroup 상태를 나타내며 상위 32 비트는 대기 중인 코루틴 수를 통계하고 하위 32 비트는 wg 완료를 대기하는 코루틴 수를 통계합니다.
  • sema, 세마포어로 sync 표준 라이브러리에서 거의 어디에나 있습니다.

핵심은 Add()Wait() 두 메서드에 있으며 기본 작동 원리는 세마포어입니다. Wait() 메서드는 세마포어를 얻으려고 시도하고 Add() 메서드는 세마포어를 해제하여 M 개 코루틴이 N 개 코루틴 그룹의 실행 완료를 대기하도록 구현합니다.

Add

Add 메서드는 대기할 코루틴 수를 증가시킵니다.

go
func (wg *WaitGroup) Add(delta int) {
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
    if v < 0 {
       panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
       return
    }
    if wg.state.Load() != state {
       panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
       runtime_Semrelease(&wg.sema, false, 0)
    }
}

플로우는 다음과 같습니다.

  1. 먼저 wg.state 에 시프트 작업을 수행하여 상위 32 비트와 하위 32 비트를 각각 가져오며 변수 vw 에 해당합니다.

    go
    state := wg.state.Add(uint64(delta) << 32)
    v := int32(state >> 32)
    w := uint32(state)
  2. 그런 다음 판단을 시작하며 v 는 wg 카운터를 나타내고 w 는 wg 완료를 대기하는 코루틴 수를 나타냅니다.

    1. v 가 0 보다 작으면 직접 panic 하며 음수는 의미가 없습니다.

      go
      if v < 0 {
          panic("sync: negative WaitGroup counter")
      }
    2. w 가 0 이 아니고 deltav 가 같으면 Wait() 메서드와 Add() 메서드가 동시에 호출되었음을 의미하며 이는 잘못된 사용 방식입니다.

      go
      if w != 0 && delta > 0 && v == int32(delta) {
      	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
      }
    3. v 가 0 보다 크거나 w 가 0 이면 현재 wg 완료를 대기하는 코루틴이 없으므로 직접 반환할 수 있습니다.

      go
      if v > 0 || w == 0 {
      	return
      }
  3. 이 단계에 도달하면 v 는 0 이고 w 는 0 보다 큽니다. 즉 현재 실행 중인 코루틴이 없지만 wg 완료를 대기하는 코루틴이 있으므로 세마포어를 해제하여 이러한 코루틴을 깨워야 합니다.

    go
    if wg.state.Load() != state {
    	panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    wg.state.Store(0)
    for ; w != 0; w-- {
    	runtime_Semrelease(&wg.sema, false, 0)
    }

Done() 메서드는 실제로 Add(-1) 이므로 설명할 것이 없습니다.

Wait

현재 다른 코루틴이 실행 완료를 대기해야 하는 경우 Wait 메서드 호출은 현재 코루틴이 블록되게 합니다.

go
func (wg *WaitGroup) Wait() {
    for {
       state := wg.state.Load()
       v := int32(state >> 32)
       w := uint32(state)
       if v == 0 {
          return
       }
       // Increment waiters count.
       if wg.state.CompareAndSwap(state, state+1) {
          runtime_Semacquire(&wg.sema)
          if wg.state.Load() != 0 {
             panic("sync: WaitGroup is reused before previous Wait has returned")
          }
          return
       }
    }
}

플로우는 for 루프입니다.

  1. 상위 32 비트와 하위 32 비트를 읽어 대기할 코루틴 수와 대기하는 코루틴 수를 얻으며 대기할 코루틴이 없으면 직접 반환합니다.

    go
    state := wg.state.Load()
    v := int32(state >> 32)
    w := uint32(state)
    if v == 0 {
    	return
    }
  2. 그렇지 않으면 CAS 작업을 통해 대기 코루틴 수를 1 증가시킨 후 세마포어를 얻으려고 시도하며 블록 대기 큐에 진입합니다.

    go
    // Increment waiters count.
    if wg.state.CompareAndSwap(state, state+1) {
    	runtime_Semacquire(&wg.sema)
    	...
    }
  3. 대기 코루틴이 깨어난 후 (모든 대기 코루틴이 실행 완료되어 세마포어를 해제했으므로) state 를 확인하며 0 이 아니면 Wait()Add() 가 다시 동시에 사용되었음을 의미합니다.

    go
    if wg.state.Load() != 0 {
    	panic("sync: WaitGroup is reused before previous Wait has returned")
    }
    return
  4. CAS 업데이트에 성공하지 못하면 루프를 계속합니다.

요약

마지막으로 WaitGroup 사용 시 AddWait 을 동시에 호출하지 않도록 주의해야 합니다.

Golang by www.golangdev.cn edit