chan
channel 是一個特殊的數據結構,是 go 語言貫徹 CSP 思想的典型代表,CSP 思想的核心就是進程之間通過消息通信來進行數據的交換,對應的,通過 channel 我們可以很輕松地在協程之間通信。
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}除了通信之外,通過 channel 也還可以實現協程同步之類的操作,而在需要並發的系統中,channel 的身影幾乎隨處可見,為了能夠更好的理解 channel 工作方式,下面就會介紹其原理。
結構
channel 在運行時對於的表示是runtime.hchan結構體,它所包含的字段並不多,如下
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}從上面可以很明顯的看到lock字段,channel 實際上是一個有鎖的同步環形隊列,其它的字段介紹如下
qcount,表示總數據數dataqsize,環形隊列的大小buf,指向大小為dataqsize的數組的指針,也就是環形隊列closed,channel 是否關閉sendx,recvx,表示發送和接收的索引sendq,recvq,表示發送和接收的協程鏈表,其組成元素是runtime.sudoggotype waitq struct { first *sudog last *sudog }通過下面一張圖可以很清晰的明白 channel 的結構

當對 channel 使用len和cap函數時,返回的實際上是它的hchan.qcoun和hchan.dataqsiz字段。
創建
正常來說創建管道有且只有一種方式,使用make函數創建
ch := make(chan int, size)編譯器會將其翻譯成對runtime.makechan函數的調用,由它來負責管道的實際創建,它的代碼如下所示。
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}這部分邏輯比較簡單,主要是在給管道分配內存,它首先會根據傳入的size和元素類型elem.size來計算預計需要的內存大小,然後分為三種情況來處理
size為 0,只分配hchanSize- 元素不包含指針,則分配對應內存大小的空間,並且環形隊列的內存與管道的內存是連續的
- 元素包含指針,管道和環形隊列的內存單獨分配
如果環形隊列中存放的是指針元素的話,因為它們引用了外部的元素,GC 在標記-清除階段就可能會掃描這些指針,當存放的是非指針元素時分配在連續的內存上就避免了不必要的掃描。內存分配完畢後,最後再更新其它的一些記錄信息的字段。
順便提一下,當管道容量是 64 位整數的時候,會使用runtime.makechan64函數來進行創建,它本質上也是對runtime.makechan的調用,只是多做了一個類型檢查。
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}一般來說size都最好不要超過math.MaxInt32。
發送
向管道發送數據時,我們會將要發送的數據置於箭頭的右方
ch <- struct{}{}編譯器會將其翻譯成runtime.chansend1,真正負責發送數據的是runtime.chansend函數,chansend1會向其傳遞elem指針,它指向發送元素的指針。
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}它首先會檢查管道是否為nil,block表示當前的發送操作是否是阻塞的(block的值與select結構有關),如果阻塞發送且管道是nil則直接崩潰。在非阻塞發送情況下,會在不加鎖的情況下直接判斷管道是否滿了,如果滿了就直接返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}隨後才開始加鎖,並檢測管道是否關閉,如果已經關閉了就會panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}再之後從recvq隊列中出隊一個sudog,然後由runtime.send函數進行發送。
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}send函數內容如下,它會更新recvx和sendx,然後使用runtime.memmove函數將通信數據的內存直接復制到接收方協程的目標元素地址上,然後通過runtime.goready函數使接收方協程變為_Grunnable狀態,以便重新參與調度。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}在上面的這個過程中,因為可以找到等待接收的協程,所以數據就直接發送給了接收方,並沒有存放在環形隊列中,倘若沒有可用的接受方協程且容量足夠,就會將其放入環形隊列緩沖區中,然後直接返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}倘若緩沖區滿了,如果是在非阻塞發送的情況下就會直接返回
if !block {
unlock(&c.lock)
return false
}如果是阻塞發送,則會進入下面的代碼流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}首先它會將當前的協程構造成sudog並加入hchan.sendq等待發送協程隊列,然後由runtime.gopark使當前協程阻塞,變為_Gwaitting狀態直到再次被接收方喚醒,並且在會通過runtime.KeepLAlive對要發送的數據進行保活來確保接收方成功復制。當被喚醒後就會進入接下來的收尾流程
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}可以看到的是對於管道發送數據而言總共有下面幾種情況
- 管道為
nil,程序崩潰 - 管道已關閉,發生
panic revq隊列不為空,直接發送給接收方- 沒有協程等待,加入緩沖區
- 緩沖區滿了,發送協程進入阻塞狀態,等待其它協程接收數據
值得注意的是,在上面發送的邏輯中沒有看到對於緩沖區溢出數據的處理,這部分數據不可能拋棄掉,它保存在了sudog.elem,由接收方來進行處理。
接收
在 go 中從管道接收數據的語法有兩種,第一種是只讀取數據
data <- ch第二種是判斷數據是否讀取成功
data, ok <- ch上面兩種語法會被編譯器翻譯成對runtime.chanrecv1和runtime.chanrecv1的調用,不過它們實際上只是對runtime.chanrecv的調用。接收的邏輯開頭部分與發送的邏輯類似,都會先對管道判空。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}然後在非阻塞讀取情況下,不加鎖判斷管道是否為空,如果管道未關閉就直接返回,管道已關閉則清空接收元素的內存。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}然後加鎖訪問hchan.sendq隊列,通過下面if c.closed != 0這個分支可以看到,即便管道已經關閉了,但如果管道中還有元素存在,並不會直接返回,依然會往下執行消費元素的代碼,這也是為什麼管道關閉後仍然允許讀取的原因。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}如果管道沒有關閉,就會查看sendq隊列是否有協程正在等待發送,是的話就由runitme.recv來處理該發送方協程。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}第一種情況,管道容量為 0 即無緩沖管道,接收方會直接通過runtime.recvDirect函數從發送方復制數據,第二種情況緩沖區已滿,雖然在前面並沒有看到判斷緩沖區是否滿了的邏輯,但實際上當緩沖區容量不為 0 且有發送方等待發送就已經代表了緩沖區已經滿了,因為只有緩沖區滿了發送方才會阻塞等待發送,這部分邏輯是由發送方來進行判斷的。然後接收方會從緩沖區將頭部元素出隊並將其內存復制到目標接收元素的指針,再將發送方協程要發送的數據復制後並入隊(在這裡我們就看到了接收方對於溢出緩沖區數據的處理方式),最後會由runtime.goready去喚醒發送方協程,使其變為_Grunnable狀態,以便重新加入調度。
倘若沒有等待發送的協程,就會去查看緩沖區是否有等待消費的元素,將頭部元素出隊並復制其內存到接收方目標元素,然後返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}到最後如果沒有管道中沒有可消費的元素,就會由runtime.gopark將當前協程變為_Gwwaiting狀態,阻塞等待直到被發送方協程喚醒。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}被喚醒後,就會返回,此時返回的success值來自sudog.success,如果發送方成功發送數據那麼該字段應該由發送方設置為true,這部分邏輯我們可以在runtime.send函數中看到。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}相對應的,在發送方runtime.chansend末尾對於sudog.success判斷,其來源也是接收方在runtime.recv函數中的設置,通過這些可以發現接收方和發送方兩者相輔相成才能讓管道正常運作。總的來說,接收數據要比發送數據稍微復雜一些,總共有以下幾種情況
- 管道為
nil,程序崩潰 - 管道已關閉,如果管道是空的就直接返回,如果不為空則跳到第 5 個情況執行
- 緩沖區容量為 0,
sendq中有等待發送的協程,則直接復制發送方的數據,然後喚醒發送方。 - 緩沖區滿了,
sendq中有等待發送的協程,將緩沖區頭部元素出隊,發送方的數據入隊,然後喚醒發送方。 - 緩沖區沒滿且數量不為 0,將緩沖區頭部元素出隊,然後返回。
- 緩沖區是空的,進入阻塞狀態,等待被發送方喚醒。
關閉
對於關閉管道而言,我們會使用內置函數close
close(ch)編譯器會將其翻譯成對runtime.closechan的調用,如果傳遞的管道為nil或者已關閉,則會直接panic
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}然後將這個管道的sendq和recvq中的所有阻塞的協程都出隊並將它們全部都通過runtime.goready喚醒
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
順帶一提,go 允許單向管道,有著下面幾個規則
- 只讀管道不能發送數據
- 只讀管道不能關閉
- 只寫管道不能讀取數據
這些錯誤早在編譯期的類型檢查階段就會找出來,不會留到運行時,感興趣可以到下面這兩個包閱讀相關代碼
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")判斷關閉
在很早的時候(go1 之前),有一個內置函數closed用於判斷管道是否關閉,不過後面很快就被刪掉了。這是因為管道的使用場景通常都是多協程的情況,假設它返回true確實可以代表管道已經關閉了,但是如果它返回了false,那麼並不能代表管道就真的沒有關閉,因為誰也不知道在下一刻誰會把管道關閉掉,所以這個返回值是不可信的,如果以這個返回值為依據來判斷是否向管道發送數據就更是危險了,因為向已關閉的管道發送數據會發生panic。
如果實在需要,可以自己實現一個。一種方案是通過寫管道來判斷管道是關閉,代碼如下
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}不過這樣也是有副作用的,如果沒關閉的話就會向裡面寫入冗余的數據,而且會進入 defer-recover 處理過程,造成額外的性能損失,所以寫方案可以直接放棄。讀方案的話可以考慮,不過不能直接讀管道,因為直接讀block參數值為true將會阻塞協程,應該結合select來使用,管道與select結合時就是非阻塞的。
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}這種只是看起來要比上面好一點點,它的情況僅僅適用於管道已關閉且管道緩沖區中沒有元素,如果有元素的話還會平白無故的消費掉這個元素,還是沒有一個很好的實現。
但其實我們根本就不需要判斷管道是否關閉,理由在開頭已經講過了因為返回值並不可信,正確地使用管道並正確的關閉才是我們應該做的,所以
- 永遠不要在接收方關閉管道,關閉只讀管道不能通過編譯這點已經很明確地告訴你不要這麼做了,交給發送方來做這件事。
- 如果有多個發送方,應該單獨讓一個協程來完成關閉操作,確保
close只有一方調用且只會調用一次。 - 傳遞管道時,最好限制只讀或只寫
遵循這幾個原則,就能確保不會出太大的問題。
