waitgroup
WaitGroup 은 Go 표준 라이브러리로 제공되며 기능은 한 그룹의 코루틴이 실행 완료될 때까지 대기하는 데 사용됩니다.
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := range 10 {
wg.Add(1)
go func() {
defer wg.Done()
fmt.Println(i)
}()
}
wg.Wait()
}이는 매우 간단한 코드로 10 개의 코루틴을 시작하여 0-9 를 인쇄하고 실행이 완료될 때까지 대기합니다. 사용법에 대해서는 더 이상 설명하지 않으며 기본 작동 원리에 대해 알아보겠습니다. 전혀 복잡하지 않습니다.
구조
유형 정의는 sync/waitgroup.go 파일에 있습니다.
type WaitGroup struct {
noCopy noCopy
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
sema uint32
}필드 설명은 다음과 같습니다.
state, WaitGroup 상태를 나타내며 상위 32 비트는 대기 중인 코루틴 수를 통계하고 하위 32 비트는 wg 완료를 대기하는 코루틴 수를 통계합니다.sema, 세마포어로sync표준 라이브러리에서 거의 어디에나 있습니다.
핵심은 Add() 와 Wait() 두 메서드에 있으며 기본 작동 원리는 세마포어입니다. Wait() 메서드는 세마포어를 얻으려고 시도하고 Add() 메서드는 세마포어를 해제하여 M 개 코루틴이 N 개 코루틴 그룹의 실행 완료를 대기하도록 구현합니다.
Add
Add 메서드는 대기할 코루틴 수를 증가시킵니다.
func (wg *WaitGroup) Add(delta int) {
state := wg.state.Add(uint64(delta) << 32)
v := int32(state >> 32)
w := uint32(state)
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
wg.state.Store(0)
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}플로우는 다음과 같습니다.
먼저
wg.state에 시프트 작업을 수행하여 상위 32 비트와 하위 32 비트를 각각 가져오며 변수v와w에 해당합니다.gostate := wg.state.Add(uint64(delta) << 32) v := int32(state >> 32) w := uint32(state)그런 다음 판단을 시작하며
v는 wg 카운터를 나타내고w는 wg 완료를 대기하는 코루틴 수를 나타냅니다.v가 0 보다 작으면 직접panic하며 음수는 의미가 없습니다.goif v < 0 { panic("sync: negative WaitGroup counter") }w가 0 이 아니고delta와v가 같으면Wait()메서드와Add()메서드가 동시에 호출되었음을 의미하며 이는 잘못된 사용 방식입니다.goif w != 0 && delta > 0 && v == int32(delta) { panic("sync: WaitGroup misuse: Add called concurrently with Wait") }v가 0 보다 크거나w가 0 이면 현재 wg 완료를 대기하는 코루틴이 없으므로 직접 반환할 수 있습니다.goif v > 0 || w == 0 { return }
이 단계에 도달하면
v는 0 이고w는 0 보다 큽니다. 즉 현재 실행 중인 코루틴이 없지만 wg 완료를 대기하는 코루틴이 있으므로 세마포어를 해제하여 이러한 코루틴을 깨워야 합니다.goif wg.state.Load() != state { panic("sync: WaitGroup misuse: Add called concurrently with Wait") } wg.state.Store(0) for ; w != 0; w-- { runtime_Semrelease(&wg.sema, false, 0) }
Done() 메서드는 실제로 Add(-1) 이므로 설명할 것이 없습니다.
Wait
현재 다른 코루틴이 실행 완료를 대기해야 하는 경우 Wait 메서드 호출은 현재 코루틴이 블록되게 합니다.
func (wg *WaitGroup) Wait() {
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state)
if v == 0 {
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
runtime_Semacquire(&wg.sema)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
return
}
}
}플로우는 for 루프입니다.
상위 32 비트와 하위 32 비트를 읽어 대기할 코루틴 수와 대기하는 코루틴 수를 얻으며 대기할 코루틴이 없으면 직접 반환합니다.
gostate := wg.state.Load() v := int32(state >> 32) w := uint32(state) if v == 0 { return }그렇지 않으면 CAS 작업을 통해 대기 코루틴 수를 1 증가시킨 후 세마포어를 얻으려고 시도하며 블록 대기 큐에 진입합니다.
go// Increment waiters count. if wg.state.CompareAndSwap(state, state+1) { runtime_Semacquire(&wg.sema) ... }대기 코루틴이 깨어난 후 (모든 대기 코루틴이 실행 완료되어 세마포어를 해제했으므로)
state를 확인하며 0 이 아니면Wait()와Add()가 다시 동시에 사용되었음을 의미합니다.goif wg.state.Load() != 0 { panic("sync: WaitGroup is reused before previous Wait has returned") } returnCAS 업데이트에 성공하지 못하면 루프를 계속합니다.
요약
마지막으로 WaitGroup 사용 시 Add 와 Wait 을 동시에 호출하지 않도록 주의해야 합니다.
