Skip to content

chan

channel là một cấu trúc dữ liệu đặc biệt, là đại diện tiêu biểu cho tư tưởng CSP của ngôn ngữ Go, cốt lõi của tư tưởng CSP chính là các tiến trình trao đổi dữ liệu thông qua truyền thông điệp, tương ứng, thông qua channel chúng ta có thể dễ dàng truyền thông giữa các goroutine.

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // do something
    done <- struct{}{}
  }()
  <-done
  fmt.Println("finished")
}

Ngoài truyền thông ra, thông qua channel cũng có thể thực hiện các thao tác đồng bộ goroutine, và trong hệ thống cần đồng thời, bóng dáng của channel hầu như có thể thấy khắp nơi, để có thể hiểu rõ hơn cách channel hoạt động, dưới đây sẽ giới thiệu nguyên lý của nó.

Cấu trúc

channel trong runtime được biểu diễn bởi cấu trúc runtime.hchan, các trường mà nó chứa không nhiều, như sau:

go
type hchan struct {
  qcount   uint           // total data in the queue
  dataqsiz uint           // size of the circular queue
  buf      unsafe.Pointer // points to an array of dataqsiz elements
  elemsize uint16
  closed   uint32
  elemtype *_type // element type
  sendx    uint   // send index
  recvx    uint   // receive index
  recvq    waitq  // list of recv waiters
  sendq    waitq  // list of send waiters

  lock mutex
}

Từ trên có thể thấy rõ trường lock, channel thực tế là một hàng đợi vòng đồng bộ có khóa, các trường khác được giới thiệu như sau:

  • qcount, biểu thị tổng số dữ liệu
  • dataqsize, kích thước của hàng đợi vòng
  • buf, con trỏ trỏ đến mảng có kích thước dataqsize,也就是 hàng đợi vòng
  • closed, channel có đóng hay không
  • sendx, recvx, biểu thị chỉ số gửi và nhận
  • sendq, recvq, danh sách liên kết các goroutine gửi và nhận, thành phần cấu thành là runtime.sudog
go
type waitq struct {
  first *sudog
  last  *sudog
}

Thông qua hình dưới đây có thể hiểu rõ cấu trúc của channel:

Khi sử dụng hàm lencap cho channel, trả về thực tế là các trường hchan.qcounthchan.dataqsiz của nó.

Tạo

Bình thường mà nói việc tạo pipeline chỉ có một cách duy nhất, sử dụng hàm make để tạo:

go
ch := make(chan int, size)

Compiler sẽ biên dịch nó thành lời gọi hàm runtime.makechan, hàm này负责 việc tạo pipeline thực tế, code của nó như sau:

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

Phần logic này khá đơn giản, chủ yếu là phân phối bộ nhớ cho pipeline, đầu tiên nó sẽ tính toán kích thước bộ nhớ dự kiến cần thiết dựa trên size và kiểu phần tử elem.size được truyền vào, sau đó xử lý theo ba trường hợp:

  1. size bằng 0, chỉ phân phối hchanSize
  2. Phần tử không chứa con trỏ, thì phân phối không gian có kích thước bộ nhớ tương ứng, và bộ nhớ của hàng đợi vòng liên tục với bộ nhớ của pipeline
  3. Phần tử chứa con trỏ, bộ nhớ của pipeline và hàng đợi vòng được phân phối riêng

Nếu trong hàng đợi vòng lưu trữ là phần tử con trỏ, vì chúng tham chiếu đến các phần tử bên ngoài, GC trong giai đoạn đánh dấu-dọn dẹp có thể quét các con trỏ này, khi lưu trữ là phần tử không phải con trỏ thì việc phân phối trên bộ nhớ liên tục sẽ tránh việc quét không cần thiết. Sau khi phân phối bộ nhớ xong, cuối cùng cập nhật một số trường thông tin ghi lại khác.

Nhân tiện đề cập, khi dung lượng pipeline là số nguyên 64 bit, sẽ sử dụng hàm runtime.makechan64 để tạo, về bản chất nó cũng là lời gọi đến runtime.makechan, chỉ là làm thêm một kiểm tra kiểu.

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

Nói chung size tốt nhất đừng vượt quá math.MaxInt32.

Gửi

Khi gửi dữ liệu đến pipeline, chúng ta sẽ đặt dữ liệu cần gửi ở bên phải mũi tên:

go
ch <- struct{}{}

Compiler sẽ biên dịch nó thành runtime.chansend1, hàm thực sự负责 gửi dữ liệu là runtime.chansend, chansend1 sẽ truyền cho nó con trỏ elem, con trỏ này trỏ đến phần tử gửi.

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

Đầu tiên nó sẽ kiểm tra pipeline có phải là nil không, block biểu thị thao tác gửi hiện tại có phải là chặn hay không (giá trị của block liên quan đến cấu trúc select), nếu gửi chặn và pipeline là nil thì trực tiếp panic. Trong trường hợp gửi không chặn, sẽ trực tiếp kiểm tra pipeline có đầy không mà không cần khóa, nếu đầy thì trực tiếp trả về.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Sau đó mới bắt đầu khóa, và kiểm tra pipeline có đóng không, nếu đã đóng sẽ panic:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

Sau đó xuất đội một sudog từ hàng đợi recvq, sau đó gửi bởi hàm runtime.send.

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

Nội dung hàm send như sau, nó sẽ cập nhật recvxsendx, sau đó sử dụng hàm runtime.memmove để sao chép trực tiếp bộ nhớ của dữ liệu truyền thông đến địa chỉ phần tử đích của goroutine nhận, sau đó thông qua hàm runtime.goready để chuyển goroutine nhận sang trạng thái _Grunnable, để có thể tham gia điều độ lại.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

Trong quá trình trên, vì có thể tìm thấy goroutine đang chờ nhận, nên dữ liệu được gửi trực tiếp cho bên nhận, không được lưu trong hàng đợi vòng, nếu không có goroutine nhận khả dụng và dung lượng đủ, sẽ đặt nó vào bộ đệm hàng đợi vòng, sau đó trực tiếp trả về.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

Nếu bộ đệm đầy, trong trường hợp gửi không chặn sẽ trực tiếp trả về:

go
if !block {
    unlock(&c.lock)
    return false
}

Nếu là gửi chặn, sẽ đi vào quy trình code dưới đây:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

Đầu tiên nó sẽ xây dựng goroutine hiện tại thành sudog và thêm vào hàng đợi goroutine chờ gửi hchan.sendq, sau đó bởi runtime.gopark làm cho goroutine hiện tại bị chặn, chuyển sang trạng thái _Gwaiting cho đến khi được bên nhận đánh thức lại, và sẽ thông qua runtime.KeepAlive để giữ hoạt động cho dữ liệu cần gửi nhằm đảm bảo bên nhận sao chép thành công. Khi được đánh thức sẽ đi vào quy trình kết thúc tiếp theo:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

Có thể thấy rằng đối với việc gửi dữ liệu qua pipeline tổng cộng có các trường hợp sau:

  1. Pipeline là nil, chương trình panic
  2. Pipeline đã đóng, xảy ra panic
  3. Hàng đợi recvq không rỗng, trực tiếp gửi cho bên nhận
  4. Không có goroutine chờ, thêm vào bộ đệm
  5. Bộ đệm đầy, goroutine gửi vào trạng thái chặn, chờ goroutine khác nhận dữ liệu

Đáng chú ý là, trong logic gửi trên không thấy xử lý cho dữ liệu tràn bộ đệm, phần dữ liệu này không thể vứt bỏ, nó được lưu trong sudog.elem, do bên nhận xử lý.

Nhận

Trong Go, cú pháp nhận dữ liệu từ pipeline có hai loại, loại đầu tiên là chỉ đọc dữ liệu:

go
data <- ch

Loại thứ hai là phán đoán dữ liệu có đọc thành công không:

go
data, ok <- ch

Hai cú pháp trên sẽ được compiler biên dịch thành lời gọi runtime.chanrecv1runtime.chanrecv2, nhưng chúng thực tế chỉ là lời gọi đến runtime.chanrecv. Phần đầu logic nhận tương tự như logic gửi, đều kiểm tra pipeline có rỗng không trước.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

Sau đó trong trường hợp đọc không chặn, không khóa phán đoán pipeline có rỗng không, nếu pipeline chưa đóng thì trực tiếp trả về, pipeline đã đóng thì xóa bộ nhớ của phần tử nhận.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

Sau đó khóa truy cập hàng đợi hchan.sendq, thông qua nhánh if c.closed != 0 dưới đây có thể thấy, cho dù pipeline đã đóng, nhưng nếu trong pipeline vẫn còn phần tử, sẽ không trực tiếp trả về, vẫn sẽ tiếp tục thực thi code tiêu thụ phần tử, đây cũng là lý do tại sao sau khi pipeline đóng vẫn cho phép đọc.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

Nếu pipeline không đóng, sẽ xem hàng đợi sendq có goroutine nào đang chờ gửi không, nếu có thì do runtime.recv xử lý goroutine gửi đó.

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

Trường hợp đầu tiên, dung lượng pipeline bằng 0 tức pipeline không đệm, bên nhận sẽ trực tiếp sao chép dữ liệu từ bên gửi thông qua hàm runtime.recvDirect, trường hợp thứ hai bộ đệm đã đầy, mặc dù trước đó không thấy logic phán đoán bộ đệm có đầy không, nhưng thực tế khi dung lượng bộ đệm khác 0 và có goroutine chờ gửi đã đại diện cho bộ đệm đã đầy, vì chỉ khi bộ đệm đầy goroutine gửi mới chặn chờ gửi, phần logic này do bên gửi phán đoán. Sau đó bên nhận sẽ xuất đội phần tử đầu từ bộ đệm và sao chép bộ nhớ của nó vào con trỏ phần tử nhận đích, rồi sao chép dữ liệu mà goroutine gửi cần gửi và nhập đội (ở đây chúng ta thấy cách bên nhận xử lý dữ liệu tràn bộ đệm), cuối cùng sẽ do runtime.goready đánh thức goroutine gửi, chuyển nó sang trạng thái _Grunnable, để có thể tham gia điều độ lại.

Nếu không có goroutine chờ gửi, sẽ xem bộ đệm có phần tử chờ tiêu thụ không, xuất đội phần tử đầu và sao chép bộ nhớ của nó vào phần tử đích của bên nhận, sau đó trả về.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

Đến cuối nếu trong pipeline không có phần tử có thể tiêu thụ, sẽ do runtime.gopark chuyển goroutine hiện tại sang trạng thái _Gwaiting, chặn chờ cho đến khi được goroutine gửi đánh thức.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

Sau khi được đánh thức, sẽ trả về, lúc này giá trị success trả về đến từ sudog.success, nếu bên gửi gửi dữ liệu thành công thì trường này nên được bên gửi đặt thành true, phần logic này chúng ta có thể thấy trong hàm runtime.send.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

Tương ứng, ở cuối runtime.chansend của bên gửi có phán đoán sudog.success, nguồn gốc của nó cũng là do bên nhận đặt trong hàm runtime.recv, thông qua những điều này có thể thấy bên nhận và bên gửi hỗ trợ nhau mới có thể khiến pipeline hoạt động bình thường. Nhìn chung, nhận dữ liệu phức tạp hơn gửi dữ liệu một chút, tổng cộng có các trường hợp sau:

  1. Pipeline là nil, chương trình panic
  2. Pipeline đã đóng, nếu pipeline rỗng thì trực tiếp trả về, nếu không rỗng thì nhảy đến trường hợp 5 thực thi
  3. Dung lượng bộ đệm bằng 0, trong sendq có goroutine chờ gửi, thì trực tiếp sao chép dữ liệu của bên gửi, sau đó đánh thức bên gửi.
  4. Bộ đệm đầy, trong sendq có goroutine chờ gửi, xuất đội phần tử đầu của bộ đệm, dữ liệu của bên gửi nhập đội, sau đó đánh thức bên gửi.
  5. Bộ đệm chưa đầy và số lượng khác 0, xuất đội phần tử đầu của bộ đệm, sau đó trả về.
  6. Bộ đệm rỗng, vào trạng thái chặn, chờ được bên gửi đánh thức.

Đóng

Đối với việc đóng pipeline, chúng ta sẽ sử dụng hàm built-in close:

go
close(ch)

Compiler sẽ biên dịch nó thành lời gọi runtime.closechan, nếu pipeline được truyền là nil hoặc đã đóng, thì sẽ trực tiếp panic:

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

Sau đó xuất đội tất cả các goroutine bị chặn trong sendqrecvq của pipeline này và đánh thức chúng thông qua runtime.goready:

go
func closechan(c *hchan) {
    ...
  var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

Nhân tiện đề cập, Go cho phép pipeline một chiều, có các quy tắc sau:

  1. Pipeline chỉ đọc không thể gửi dữ liệu
  2. Pipeline chỉ đọc không thể đóng
  3. Pipeline chỉ viết không thể đọc dữ liệu

Những lỗi này sớm sẽ được tìm ra trong giai đoạn kiểm tra kiểu thời gian biên dịch, không để lại đến runtime, nếu感兴趣 có thể đọc code liên quan trong hai gói sau:

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

Phán đoán đóng

Vào thời điểm rất sớm (trước go1), có một hàm built-in closed dùng để phán đoán pipeline có đóng không, nhưng sau đó rất nhanh bị xóa bỏ. Lý do là vì tình huống sử dụng của pipeline thường là đa goroutine, giả sử nó trả về true确实 có thể đại diện cho pipeline đã đóng, nhưng nếu nó trả về false, thì không thể đại diện cho pipeline thực sự chưa đóng, vì không ai biết được trong khoảnh khắc tiếp theo ai sẽ đóng pipeline, nên giá trị trả về này không đáng tin, nếu dựa vào giá trị trả về này để phán đoán có gửi dữ liệu vào pipeline không thì càng nguy hiểm hơn, vì gửi dữ liệu vào pipeline đã đóng sẽ xảy ra panic.

Nếu thực sự cần, có thể tự triển khai. Một phương án là thông qua viết pipeline để phán đoán pipeline có đóng không, code như sau:

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

Nhưng cách này cũng có tác dụng phụ, nếu không đóng thì sẽ ghi dữ liệu thừa vào trong, và sẽ đi vào quá trình xử lý defer-recover, gây tổn thất hiệu năng bổ sung, nên phương án viết có thể trực tiếp từ bỏ. Phương án đọc có thể cân nhắc, nhưng không thể trực tiếp đọc pipeline, vì trực tiếp đọc giá trị tham số blocktrue sẽ chặn goroutine, nên kết hợp với select để sử dụng, khi pipeline kết hợp với select thì là không chặn.

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

Cách này chỉ có vẻ tốt hơn một chút so với cách trên, tình huống của nó chỉ áp dụng cho pipeline đã đóng và trong bộ đệm pipeline không có phần tử, nếu có phần tử thì còn vô cớ tiêu thụ phần tử này, vẫn không có cách triển khai nào tốt.

Nhưng thực tế chúng ta hoàn toàn không cần phán đoán pipeline có đóng không, lý do đã nói ở đầu là vì giá trị trả về không đáng tin, việc sử dụng pipeline đúng cách và đóng đúng cách mới là điều chúng ta nên làm, nên:

1.永远 không đóng pipeline ở bên nhận, việc đóng pipeline chỉ đọc không thể qua biên dịch đã nói rõ với bạn không nên làm vậy, hãy giao cho bên gửi làm việc này. 2. Nếu có nhiều bên gửi, nên để riêng một goroutine hoàn thành thao tác đóng, đảm bảo close chỉ có một bên gọi và chỉ gọi một lần. 3. Khi truyền pipeline, tốt nhất giới hạn chỉ đọc hoặc chỉ viết

Tuân thủ các nguyên tắc này, sẽ đảm bảo không xảy ra vấn đề quá lớn.

Golang by www.golangdev.cn edit