chan
channel là một cấu trúc dữ liệu đặc biệt, là đại diện tiêu biểu cho tư tưởng CSP của ngôn ngữ Go, cốt lõi của tư tưởng CSP chính là các tiến trình trao đổi dữ liệu thông qua truyền thông điệp, tương ứng, thông qua channel chúng ta có thể dễ dàng truyền thông giữa các goroutine.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Ngoài truyền thông ra, thông qua channel cũng có thể thực hiện các thao tác đồng bộ goroutine, và trong hệ thống cần đồng thời, bóng dáng của channel hầu như có thể thấy khắp nơi, để có thể hiểu rõ hơn cách channel hoạt động, dưới đây sẽ giới thiệu nguyên lý của nó.
Cấu trúc
channel trong runtime được biểu diễn bởi cấu trúc runtime.hchan, các trường mà nó chứa không nhiều, như sau:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}Từ trên có thể thấy rõ trường lock, channel thực tế là một hàng đợi vòng đồng bộ có khóa, các trường khác được giới thiệu như sau:
qcount, biểu thị tổng số dữ liệudataqsize, kích thước của hàng đợi vòngbuf, con trỏ trỏ đến mảng có kích thướcdataqsize,也就是 hàng đợi vòngclosed, channel có đóng hay khôngsendx,recvx, biểu thị chỉ số gửi và nhậnsendq,recvq, danh sách liên kết các goroutine gửi và nhận, thành phần cấu thành làruntime.sudog
type waitq struct {
first *sudog
last *sudog
}Thông qua hình dưới đây có thể hiểu rõ cấu trúc của channel:

Khi sử dụng hàm len và cap cho channel, trả về thực tế là các trường hchan.qcount và hchan.dataqsiz của nó.
Tạo
Bình thường mà nói việc tạo pipeline chỉ có một cách duy nhất, sử dụng hàm make để tạo:
ch := make(chan int, size)Compiler sẽ biên dịch nó thành lời gọi hàm runtime.makechan, hàm này负责 việc tạo pipeline thực tế, code của nó như sau:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Phần logic này khá đơn giản, chủ yếu là phân phối bộ nhớ cho pipeline, đầu tiên nó sẽ tính toán kích thước bộ nhớ dự kiến cần thiết dựa trên size và kiểu phần tử elem.size được truyền vào, sau đó xử lý theo ba trường hợp:
sizebằng 0, chỉ phân phốihchanSize- Phần tử không chứa con trỏ, thì phân phối không gian có kích thước bộ nhớ tương ứng, và bộ nhớ của hàng đợi vòng liên tục với bộ nhớ của pipeline
- Phần tử chứa con trỏ, bộ nhớ của pipeline và hàng đợi vòng được phân phối riêng
Nếu trong hàng đợi vòng lưu trữ là phần tử con trỏ, vì chúng tham chiếu đến các phần tử bên ngoài, GC trong giai đoạn đánh dấu-dọn dẹp có thể quét các con trỏ này, khi lưu trữ là phần tử không phải con trỏ thì việc phân phối trên bộ nhớ liên tục sẽ tránh việc quét không cần thiết. Sau khi phân phối bộ nhớ xong, cuối cùng cập nhật một số trường thông tin ghi lại khác.
Nhân tiện đề cập, khi dung lượng pipeline là số nguyên 64 bit, sẽ sử dụng hàm runtime.makechan64 để tạo, về bản chất nó cũng là lời gọi đến runtime.makechan, chỉ là làm thêm một kiểm tra kiểu.
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}Nói chung size tốt nhất đừng vượt quá math.MaxInt32.
Gửi
Khi gửi dữ liệu đến pipeline, chúng ta sẽ đặt dữ liệu cần gửi ở bên phải mũi tên:
ch <- struct{}{}Compiler sẽ biên dịch nó thành runtime.chansend1, hàm thực sự负责 gửi dữ liệu là runtime.chansend, chansend1 sẽ truyền cho nó con trỏ elem, con trỏ này trỏ đến phần tử gửi.
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Đầu tiên nó sẽ kiểm tra pipeline có phải là nil không, block biểu thị thao tác gửi hiện tại có phải là chặn hay không (giá trị của block liên quan đến cấu trúc select), nếu gửi chặn và pipeline là nil thì trực tiếp panic. Trong trường hợp gửi không chặn, sẽ trực tiếp kiểm tra pipeline có đầy không mà không cần khóa, nếu đầy thì trực tiếp trả về.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Sau đó mới bắt đầu khóa, và kiểm tra pipeline có đóng không, nếu đã đóng sẽ panic:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}Sau đó xuất đội một sudog từ hàng đợi recvq, sau đó gửi bởi hàm runtime.send.
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}Nội dung hàm send như sau, nó sẽ cập nhật recvx và sendx, sau đó sử dụng hàm runtime.memmove để sao chép trực tiếp bộ nhớ của dữ liệu truyền thông đến địa chỉ phần tử đích của goroutine nhận, sau đó thông qua hàm runtime.goready để chuyển goroutine nhận sang trạng thái _Grunnable, để có thể tham gia điều độ lại.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}Trong quá trình trên, vì có thể tìm thấy goroutine đang chờ nhận, nên dữ liệu được gửi trực tiếp cho bên nhận, không được lưu trong hàng đợi vòng, nếu không có goroutine nhận khả dụng và dung lượng đủ, sẽ đặt nó vào bộ đệm hàng đợi vòng, sau đó trực tiếp trả về.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Nếu bộ đệm đầy, trong trường hợp gửi không chặn sẽ trực tiếp trả về:
if !block {
unlock(&c.lock)
return false
}Nếu là gửi chặn, sẽ đi vào quy trình code dưới đây:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}Đầu tiên nó sẽ xây dựng goroutine hiện tại thành sudog và thêm vào hàng đợi goroutine chờ gửi hchan.sendq, sau đó bởi runtime.gopark làm cho goroutine hiện tại bị chặn, chuyển sang trạng thái _Gwaiting cho đến khi được bên nhận đánh thức lại, và sẽ thông qua runtime.KeepAlive để giữ hoạt động cho dữ liệu cần gửi nhằm đảm bảo bên nhận sao chép thành công. Khi được đánh thức sẽ đi vào quy trình kết thúc tiếp theo:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}Có thể thấy rằng đối với việc gửi dữ liệu qua pipeline tổng cộng có các trường hợp sau:
- Pipeline là
nil, chương trình panic - Pipeline đã đóng, xảy ra
panic - Hàng đợi
recvqkhông rỗng, trực tiếp gửi cho bên nhận - Không có goroutine chờ, thêm vào bộ đệm
- Bộ đệm đầy, goroutine gửi vào trạng thái chặn, chờ goroutine khác nhận dữ liệu
Đáng chú ý là, trong logic gửi trên không thấy xử lý cho dữ liệu tràn bộ đệm, phần dữ liệu này không thể vứt bỏ, nó được lưu trong sudog.elem, do bên nhận xử lý.
Nhận
Trong Go, cú pháp nhận dữ liệu từ pipeline có hai loại, loại đầu tiên là chỉ đọc dữ liệu:
data <- chLoại thứ hai là phán đoán dữ liệu có đọc thành công không:
data, ok <- chHai cú pháp trên sẽ được compiler biên dịch thành lời gọi runtime.chanrecv1 và runtime.chanrecv2, nhưng chúng thực tế chỉ là lời gọi đến runtime.chanrecv. Phần đầu logic nhận tương tự như logic gửi, đều kiểm tra pipeline có rỗng không trước.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Sau đó trong trường hợp đọc không chặn, không khóa phán đoán pipeline có rỗng không, nếu pipeline chưa đóng thì trực tiếp trả về, pipeline đã đóng thì xóa bộ nhớ của phần tử nhận.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Sau đó khóa truy cập hàng đợi hchan.sendq, thông qua nhánh if c.closed != 0 dưới đây có thể thấy, cho dù pipeline đã đóng, nhưng nếu trong pipeline vẫn còn phần tử, sẽ không trực tiếp trả về, vẫn sẽ tiếp tục thực thi code tiêu thụ phần tử, đây cũng là lý do tại sao sau khi pipeline đóng vẫn cho phép đọc.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Nếu pipeline không đóng, sẽ xem hàng đợi sendq có goroutine nào đang chờ gửi không, nếu có thì do runtime.recv xử lý goroutine gửi đó.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Trường hợp đầu tiên, dung lượng pipeline bằng 0 tức pipeline không đệm, bên nhận sẽ trực tiếp sao chép dữ liệu từ bên gửi thông qua hàm runtime.recvDirect, trường hợp thứ hai bộ đệm đã đầy, mặc dù trước đó không thấy logic phán đoán bộ đệm có đầy không, nhưng thực tế khi dung lượng bộ đệm khác 0 và có goroutine chờ gửi đã đại diện cho bộ đệm đã đầy, vì chỉ khi bộ đệm đầy goroutine gửi mới chặn chờ gửi, phần logic này do bên gửi phán đoán. Sau đó bên nhận sẽ xuất đội phần tử đầu từ bộ đệm và sao chép bộ nhớ của nó vào con trỏ phần tử nhận đích, rồi sao chép dữ liệu mà goroutine gửi cần gửi và nhập đội (ở đây chúng ta thấy cách bên nhận xử lý dữ liệu tràn bộ đệm), cuối cùng sẽ do runtime.goready đánh thức goroutine gửi, chuyển nó sang trạng thái _Grunnable, để có thể tham gia điều độ lại.
Nếu không có goroutine chờ gửi, sẽ xem bộ đệm có phần tử chờ tiêu thụ không, xuất đội phần tử đầu và sao chép bộ nhớ của nó vào phần tử đích của bên nhận, sau đó trả về.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Đến cuối nếu trong pipeline không có phần tử có thể tiêu thụ, sẽ do runtime.gopark chuyển goroutine hiện tại sang trạng thái _Gwaiting, chặn chờ cho đến khi được goroutine gửi đánh thức.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}Sau khi được đánh thức, sẽ trả về, lúc này giá trị success trả về đến từ sudog.success, nếu bên gửi gửi dữ liệu thành công thì trường này nên được bên gửi đặt thành true, phần logic này chúng ta có thể thấy trong hàm runtime.send.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}Tương ứng, ở cuối runtime.chansend của bên gửi có phán đoán sudog.success, nguồn gốc của nó cũng là do bên nhận đặt trong hàm runtime.recv, thông qua những điều này có thể thấy bên nhận và bên gửi hỗ trợ nhau mới có thể khiến pipeline hoạt động bình thường. Nhìn chung, nhận dữ liệu phức tạp hơn gửi dữ liệu một chút, tổng cộng có các trường hợp sau:
- Pipeline là
nil, chương trình panic - Pipeline đã đóng, nếu pipeline rỗng thì trực tiếp trả về, nếu không rỗng thì nhảy đến trường hợp 5 thực thi
- Dung lượng bộ đệm bằng 0, trong
sendqcó goroutine chờ gửi, thì trực tiếp sao chép dữ liệu của bên gửi, sau đó đánh thức bên gửi. - Bộ đệm đầy, trong
sendqcó goroutine chờ gửi, xuất đội phần tử đầu của bộ đệm, dữ liệu của bên gửi nhập đội, sau đó đánh thức bên gửi. - Bộ đệm chưa đầy và số lượng khác 0, xuất đội phần tử đầu của bộ đệm, sau đó trả về.
- Bộ đệm rỗng, vào trạng thái chặn, chờ được bên gửi đánh thức.
Đóng
Đối với việc đóng pipeline, chúng ta sẽ sử dụng hàm built-in close:
close(ch)Compiler sẽ biên dịch nó thành lời gọi runtime.closechan, nếu pipeline được truyền là nil hoặc đã đóng, thì sẽ trực tiếp panic:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Sau đó xuất đội tất cả các goroutine bị chặn trong sendq và recvq của pipeline này và đánh thức chúng thông qua runtime.goready:
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
Nhân tiện đề cập, Go cho phép pipeline một chiều, có các quy tắc sau:
- Pipeline chỉ đọc không thể gửi dữ liệu
- Pipeline chỉ đọc không thể đóng
- Pipeline chỉ viết không thể đọc dữ liệu
Những lỗi này sớm sẽ được tìm ra trong giai đoạn kiểm tra kiểu thời gian biên dịch, không để lại đến runtime, nếu感兴趣 có thể đọc code liên quan trong hai gói sau:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Phán đoán đóng
Vào thời điểm rất sớm (trước go1), có một hàm built-in closed dùng để phán đoán pipeline có đóng không, nhưng sau đó rất nhanh bị xóa bỏ. Lý do là vì tình huống sử dụng của pipeline thường là đa goroutine, giả sử nó trả về true确实 có thể đại diện cho pipeline đã đóng, nhưng nếu nó trả về false, thì không thể đại diện cho pipeline thực sự chưa đóng, vì không ai biết được trong khoảnh khắc tiếp theo ai sẽ đóng pipeline, nên giá trị trả về này không đáng tin, nếu dựa vào giá trị trả về này để phán đoán có gửi dữ liệu vào pipeline không thì càng nguy hiểm hơn, vì gửi dữ liệu vào pipeline đã đóng sẽ xảy ra panic.
Nếu thực sự cần, có thể tự triển khai. Một phương án là thông qua viết pipeline để phán đoán pipeline có đóng không, code như sau:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Nhưng cách này cũng có tác dụng phụ, nếu không đóng thì sẽ ghi dữ liệu thừa vào trong, và sẽ đi vào quá trình xử lý defer-recover, gây tổn thất hiệu năng bổ sung, nên phương án viết có thể trực tiếp từ bỏ. Phương án đọc có thể cân nhắc, nhưng không thể trực tiếp đọc pipeline, vì trực tiếp đọc giá trị tham số block là true sẽ chặn goroutine, nên kết hợp với select để sử dụng, khi pipeline kết hợp với select thì là không chặn.
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Cách này chỉ có vẻ tốt hơn một chút so với cách trên, tình huống của nó chỉ áp dụng cho pipeline đã đóng và trong bộ đệm pipeline không có phần tử, nếu có phần tử thì còn vô cớ tiêu thụ phần tử này, vẫn không có cách triển khai nào tốt.
Nhưng thực tế chúng ta hoàn toàn không cần phán đoán pipeline có đóng không, lý do đã nói ở đầu là vì giá trị trả về không đáng tin, việc sử dụng pipeline đúng cách và đóng đúng cách mới là điều chúng ta nên làm, nên:
1.永远 không đóng pipeline ở bên nhận, việc đóng pipeline chỉ đọc không thể qua biên dịch đã nói rõ với bạn không nên làm vậy, hãy giao cho bên gửi làm việc này. 2. Nếu có nhiều bên gửi, nên để riêng một goroutine hoàn thành thao tác đóng, đảm bảo close chỉ có một bên gọi và chỉ gọi một lần. 3. Khi truyền pipeline, tốt nhất giới hạn chỉ đọc hoặc chỉ viết
Tuân thủ các nguyên tắc này, sẽ đảm bảo không xảy ra vấn đề quá lớn.
