Skip to content

chan

channel 은 특수한 데이터 구조로, Go 언어의 CSP 사상 구현의 대표적인 예입니다. CSP 사상의 핵심은 프로세스 간 메시지 통신을 통해 데이터를 교환하는 것이며, channel 을 통해 고루틴 간 통신을 쉽게 할 수 있습니다.

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // do something
    done <- struct{}{}
  }()
  <-done
  fmt.Println("finished")
}

통신 외에도 channel 을 통해 고루틴 동기화 등의 작업도 할 수 있으며, 병행이 필요한 시스템에서 channel 은 거의 어디에나 존재합니다. channel 이 어떻게 작동하는지 더 잘 이해하기 위해 아래에서 그 원리를 설명하겠습니다.

구조

channel 은 런타임에서 runtime.hchan 구조체로 표현되며, 포함하는 필드는 많지 않습니다.

go
type hchan struct {
  qcount   uint           // 큐의 총 데이터 수
  dataqsiz uint           // 순환 큐의 크기
  buf      unsafe.Pointer // dataqsiz 크기의 배열을 가리키는 포인터
  elemsize uint16
  closed   uint32
  elemtype *_type // 요소 유형
  sendx    uint   // 송신 인덱스
  recvx    uint   // 수신 인덱스
  recvq    waitq  // 수신 대기 고루틴 리스트
  sendq    waitq  // 송신 대기 고루틴 리스트

  lock mutex
}

위에서 lock 필드를 명확히 볼 수 있듯이, channel 은 실제로 잠금이 있는 동기화 순환 큐입니다. 다른 필드는 다음과 같습니다.

  • qcount, 총 데이터 수를 나타냄

  • dataqsize, 순환 큐의 크기

  • buf, dataqsize 크기의 배열을 가리키는 포인터, 즉 순환 큐

  • closed, channel 이 닫혔는지 여부

  • sendx, recvx, 송신 및 수신 인덱스를 나타냄

  • sendq, recvq, 송신 및 수신 고루틴 링크 리스트를 나타내며, 구성 요소는 runtime.sudog입니다.

    go
    type waitq struct {
      first *sudog
      last  *sudog
    }

    아래 그림을 통해 channel 의 구조를 더 명확히 이해할 수 있습니다.

channel 에 lencap 함수를 사용할 때 실제로 반환되는 것은 hchan.qcounthchan.dataqsiz 필드입니다.

생성

일반적으로 파이프를 생성하는 방법은 make 함수를 사용하는 것뿐입니다.

go
ch := make(chan int, size)

컴파일러는 이를 runtime.makechan 함수 호출로 변환하며, 이 함수가 파이프의 실제 생성을 담당합니다. 코드는 다음과 같습니다.

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

이 부분의 로직은 비교적 간단하며, 주로 파이프에 메모리를 할당하는 것입니다. 먼저 전달된 size 와 요소 유형 elem.size 를 통해 예상 필요한 메모리 크기를 계산한 후 세 가지 경우로 나누어 처리합니다.

  1. size 가 0 인 경우, hchanSize 만 할당
  2. 요소에 포인터가 없는 경우, 해당 메모리 크기의 공간을 할당하며 순환 큐의 메모리와 파이프의 메모리는 연속됨
  3. 요소에 포인터가 있는 경우, 파이프와 순환 큐의 메모리를 별도로 할당

순환 큐에 포인터 요소가 저장되는 경우, 외부 요소를 참조하므로 GC 가 마크 - 스윕 단계에서 이러한 포인터를 스캔할 수 있습니다. 포인터가 아닌 요소가 연속된 메모리에 할당되면 불필요한 스캔을 피할 수 있습니다. 메모리 할당 완료 후 다른 정보 기록 필드를 업데이트합니다.

참고로 파이프 용량이 64 비트 정수인 경우 runtime.makechan64 함수를 사용하여 생성하며, 이는 본질적으로 runtime.makechan 을 호출하는 것과 같지만 타입 검사를 한 번 더 수행합니다.

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

일반적으로 sizemath.MaxInt32 를 초과하지 않는 것이 좋습니다.

송신

파이프에 데이터를 송신할 때는 송신할 데이터를 화살표 오른쪽에 둡니다.

go
ch <- struct{}{}

컴파일러는 이를 runtime.chansend1 로 변환하며, 실제로 데이터를 송신하는 것은 runtime.chansend 함수입니다. chansend1 은 송신 요소의 포인터인 elem 포인터를 전달합니다.

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

먼저 파이프가 nil인지 확인하며, block 은 현재 송신 작업이 블로킹인지 여부를 나타냅니다 (block 값은 select 구조와 관련 있음). 블로킹 송신이고 파이프가 nil이면 직접 panic 합니다. 논블로킹 송신의 경우 잠금 없이 파이프가 가득 찼는지 직접 확인하고, 가득 차면 직접 반환합니다.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

이후 잠금을 획득하고 파이프가 닫혔는지 감지하며, 닫혔으면 panic 합니다.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

그 후 recvq 큐에서 sudog 를 디큐하고 runtime.send 함수를 통해 송신합니다.

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

send 함수의 내용은 다음과 같으며, recvxsendx 를 업데이트하고 runtime.memmove 함수를 사용하여 통신 데이터의 메모리를 수신측 고루틴의 대상 요소 주소에 직접 복사한 후 runtime.goready 함수를 통해 수신측 고루틴을 _Grunnable 상태로 만들어 다시 스케줄링에 참여할 수 있도록 합니다.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

위 과정에서 수신 대기 중인 고루틴을 찾을 수 있으므로 데이터는 순환 큐에 저장되지 않고 직접 수신측으로 송신됩니다. 사용 가능한 수신측 고루틴이 없고 용량이 충분하면 순환 큐 버퍼에 넣은 후 직접 반환합니다.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

버퍼가 가득 찬 경우 논블로킹 송신이면 직접 반환합니다.

go
if !block {
    unlock(&c.lock)
    return false
}

블로킹 송신인 경우 아래 코드 흐름으로 진입합니다.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

먼저 현재 고루틴을 sudog로 구성하여 hchan.sendq 송신 대기 고루틴 큐에 추가한 후 runtime.gopark 를 통해 현재 고루틴을 블로킹하고 _Gwaiting 상태로 만들어 수신측이 깨울 때까지 대기하며, runtime.KeepAlive 를 통해 송신할 데이터를 활성화하여 수신측이 성공적으로 복사할 수 있도록 합니다. 깨어난 후 마무리 프로세스로 진입합니다.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

파이프 송신 데이터의 경우 총 다음과 같은 경우가 있습니다.

  1. 파이프가 nil인 경우, 프로그램 panic
  2. 파이프가 닫힌 경우, panic 발생
  3. recvq 큐가 비어있지 않은 경우, 직접 수신측으로 송신
  4. 대기 중인 고루틴이 없는 경우, 버퍼에 추가
  5. 버퍼가 가득 찬 경우, 송신 고루틴이 블로킹 상태로 진입하여 다른 고루틴이 데이터를 수신할 때까지 대기

值得注意的是,在上面发送的逻辑中没有看到对于缓冲区溢出数据的处理,这部分数据不可能抛弃掉,它保存在了 sudog.elem,由接收方来进行处理。

수신

Go 에서 파이프로부터 데이터를 수신하는 문법은 두 가지가 있습니다. 첫 번째는 데이터만 읽는 경우입니다.

go
data <- ch

두 번째는 데이터 읽기 성공 여부를 판단하는 경우입니다.

go
data, ok <- ch

위 두 문법은 컴파일러에 의해 runtime.chanrecv1runtime.chanrecv2 호출로 변환되며, 실제로는 둘 다 runtime.chanrecv 를 호출합니다. 수신 로직의 시작 부분은 송신 로직과 유사하게 먼저 파이프가 nil인지 확인합니다.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

이후 논블로킹 읽기 경우 잠금 없이 파이프가 비어있는지 확인하며, 파이프가 닫히지 않았으면 직접 반환하고 파이프가 닫혔으면 수신 요소의 메모리를 비웁니다.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

그 후 잠금을 획득하고 hchan.sendq 큐를 확인하며, 아래 if c.closed != 0 분기를 통해 파이프가 닫혔더라도 파이프에 요소가 남아있으면 직접 반환하지 않고 계속 요소를 소비하는 코드를 실행하는 것을 볼 수 있습니다. 이것이 파이프가 닫힌 후에도 계속 읽을 수 있는 이유입니다.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

파이프가 닫히지 않았으면 sendq 큐에서 대기 중인 송신 고루틴이 있는지 확인하고, 있으면 runtime.recv 를 통해 해당 송신측 고루틴을 처리합니다.

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

첫 번째 경우, 파이프 용량이 0 인 즉 버퍼가 없는 파이프인 경우, 수신측은 runtime.recvDirect 함수를 통해 송신측에서 직접 데이터를 복사합니다. 두 번째 경우 버퍼가 가득 찬 경우, 앞부분에서 버퍼가 가득 찼는지 판단하는 로직을 보지 못했지만 실제로 버퍼 용량이 0 이 아니고 송신측이 대기 중인 것은 버퍼가 이미 가득 찼음을 의미합니다. 버퍼가 가득 차야 송신측이 블로킹되어 송신을 대기하기 때문입니다. 이 부분 로직은 송신측에서 판단합니다. 이후 수신측은 버퍼에서 헤드 요소를 디큐하여 그 메모리를 대상 수신 요소 포인터에 복사한 후 송신측 고루틴이 송신할 데이터를 복사하여 인큐합니다 (여기서 수신측이 버퍼 오버플로우 데이터를 처리하는 방식을 볼 수 있습니다). 마지막으로 runtime.goready 를 통해 송신측 고루틴을 깨워 _Grunnable 상태로 만들어 다시 스케줄링에 참여할 수 있도록 합니다.

대기 중인 송신 고루틴이 없으면 버퍼에 소비할 요소가 있는지 확인하고, 헤드 요소를 디큐하여 그 메모리를 수신측 대상 요소에 복사한 후 반환합니다.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

마지막으로 파이프에 소비할 요소가 없으면 runtime.gopark 를 통해 현재 고루틴을 _Gwaiting 상태로 만들어 송신측 고루틴이 깨울 때까지 블로킹 대기합니다.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

깨어난 후 반환하며, 이때 반환되는 success 값은 sudog.success 에서而来합니다. 송신측이 성공적으로 데이터를 송신했다면 해당 필드는 송신측에 의해 true 로 설정되어야 하며, 이 부분 로직은 runtime.send 함수에서 볼 수 있습니다.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

상대적으로, 송신측 runtime.chansend 끝부분의 sudog.success 판단은 수신측이 runtime.recv 함수에서 설정한 값에서 비롯된 것입니다. 이를 통해 수신측과 송신측이 서로 협력해야 파이프가 정상적으로 작동할 수 있음을 알 수 있습니다.总的来说,수신 데이터는 송신 데이터보다 약간 복잡하며, 총 다음과 같은 경우가 있습니다.

  1. 파이프가 nil인 경우, 프로그램 panic
  2. 파이프가 닫힌 경우, 파이프가 비어있으면 직접 반환하고, 비어있지 않으면 5 번째 경우로 이동하여 실행
  3. 버퍼 용량이 0 이고 sendq 에 대기 중인 송신 고루틴이 있으면, 직접 송신측 데이터를 복사한 후 송신측을 깨움
  4. 버퍼가 가득 차고 sendq 에 대기 중인 송신 고루틴이 있으면, 버퍼 헤드 요소를 디큐하고 송신측 데이터를 인큐한 후 송신측을 깨움
  5. 버퍼가 가득 차지 않고 요소가 0 개보다 많으면, 버퍼 헤드 요소를 디큐한 후 반환
  6. 버퍼가 비어있으면, 블로킹 상태로 진입하여 송신측이 깨울 때까지 대기

닫기

파이프를 닫으려면 내장 함수 close 를 사용합니다.

go
close(ch)

컴파일러는 이를 runtime.closechan 호출로 변환하며, 전달된 파이프가 nil이거나 이미 닫힌 경우 직접 panic 합니다.

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

그 후 이 파이프의 sendqrecvq 에서 모든 블로킹된 고루틴을 디큐하고 runtime.goready 를 통해 모두 깨웁니다.

go
func closechan(c *hchan) {
    ...
  var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

참고로 Go 는 단방향 파이프를 허용하며, 다음과 같은 규칙이 있습니다.

  1. 읽기 전용 파이프에는 데이터를 송신할 수 없음
  2. 읽기 전용 파이프는 닫을 수 없음
  3. 쓰기 전용 파이프는 데이터를 읽을 수 없음

이러한 오류는 컴파일 시기의 타입 검사 단계에서 발견되며 런타임까지 남지 않습니다. 관심 있는 경우 다음 두 패키지에서 관련 코드를 읽을 수 있습니다.

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

닫힘 판단

아주 옛날 (go1 이전) 에는 파이프가 닫혔는지 판단하는 내장 함수 closed 가 있었지만 곧 삭제되었습니다. 이는 파이프의 사용场景이 일반적으로 여러 고루틴인 경우이기 때문입니다. true 를 반환하면 파이프가 닫혔음을 나타낼 수 있지만, false 를 반환한다고 해서 파이프가 정말로 닫히지 않았다고 할 수 없습니다. 다음 순간 누가 파이프를 닫을지 아무도 모르기 때문에 이 반환 값은 신뢰할 수 없으며, 이 반환 값을 기준으로 파이프에 데이터를 송신할지 판단하는 것은 더욱 위험합니다. 닫힌 파이프에 데이터를 송신하면 panic 이 발생하기 때문입니다.

정말로 필요한 경우 직접 구현할 수 있습니다. 한 가지 방법은 파이프에 쓰는 것으로 파이프가 닫혔는지 판단하는 것입니다. 코드는 다음과 같습니다.

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

하지만 이 방법에도 부작용이 있습니다. 닫히지 않은 경우 중복 데이터를写入하게 되며 defer-recover 처리 프로세스로 진입하여 추가적인 성능 손실이 발생합니다. 따라서 쓰기 방식은 직접 포기할 수 있습니다. 읽기 방식은 고려해볼 수 있지만 파이프를 직접 읽으면 안 됩니다. 직접 읽으면 block 매개변수 값이 true 여서 고루틴이 블로킹되므로 select 와 함께 사용해야 합니다. 파이프와 select 를 결합하면 논블로킹이 됩니다.

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

이 방법은 위보다 조금 나아 보이지만, 이 경우 파이프가 닫히고 파이프 버퍼에 요소가 없을 때만 적용됩니다. 요소가 있으면 무의미하게 이 요소를 소비하게 되므로 여전히 좋은 구현이라고 할 수 없습니다.

하지만 실제로는 파이프가 닫혔는지 판단할 필요가 전혀 없습니다. 이유는 처음에 이미 설명했듯이 반환 값이 신뢰할 수 없기 때문입니다. 파이프를 올바르게 사용하고 올바르게 닫는 것이 우리가 해야 할 일입니다. 따라서

  1. 절대 수신측에서 파이프를 닫지 마십시오. 읽기 전용 파이프를 닫는 것은 컴파일을 통과할 수 없다는 점이 이미 이를 명시적으로 알려주고 있습니다. 송신측에 이 일을 맡기십시오.
  2. 여러 송신측이 있는 경우, 별도의 고루틴을 만들어 닫기 작업을 수행하도록 하여 close 가 한쪽에서만 호출되고 한 번만 호출되도록 하십시오.
  3. 파이프를 전달할 때는 읽기 전용 또는 송신 전용으로 제한하는 것이 좋습니다.

이러한 원칙을 따르면 큰 문제가 발생하지 않습니다.

Golang by www.golangdev.cn edit