chan
channel は特殊なデータ構造で、Go 言語が CSP 思想を貫徹する代表的なものです。CSP 思想の核心はプロセス間がメッセージ通信によってデータ交換を行うことで、対応して channel を通じてゴルーチン間の通信を容易に行えます。
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}通信だけでなく、channel を通じてゴルーチンの同期などの操作も実現できます。並行性を必要とするシステムでは、channel の姿がほぼ至る所に見られます。channel の動作方法をよりよく理解するため、以下ではその原理について紹介します。
構造
channel は実行時において runtime.hchan 構造体で表され、含まれるフィールドはそれほど多くありません。
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}上記から明らかに lock フィールドが見え、channel は実際にはロック付きの同期環状キューです。他のフィールドの説明は以下の通りです。
qcount:総データ数を表すdataqsize:環状キューのサイズbuf:サイズがdataqsizeの配列へのポインタ。つまり環状キューclosed:channel が閉じられているかどうかsendx、recvx:送信と受信のインデックスを表すsendq、recvq:送信と受信のゴルーチンリストを表す。構成要素はruntime.sudoggotype waitq struct { first *sudog last *sudog }以下の図から channel の構造を明確に理解できます。

channel に対して len と cap 関数を使用する際、実際に返されるのは hchan.qcount と hchan.dataqsiz フィールドです。
作成
通常、パイプの作成方法は 1 つだけです。make 関数を使用して作成します。
ch := make(chan int, size)コンパイラはこれを runtime.makechan 関数の呼び出しに変換し、パイプの実際の作成を担当します。
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}この部分のロジックは比較的シンプルで、主にパイプにメモリを割り当てています。まず传入された size と要素型 elem.size に基づいて予想される必要なメモリサイズを計算し、3 つの状況に分けて処理します。
sizeが 0 の場合、hchanSizeのみを割り当て- 要素がポインタを含まない場合、対応するメモリサイズの空間を割り当て、環状キューのメモリはパイプのメモリと連続している
- 要素がポインタを含む場合、パイプと環状キューのメモリを個別に割り当て
環状キュー内にポインタ要素が格納されている場合、それらは外部の要素を参照しているため、GC がマーク・スウィープ段階でこれらのポインタをスキャンする可能性があります。ポインタでない要素が連続したメモリに格納されている場合、不要なスキャンを回避できます。メモリ割り当て完毕后、最後に他のいくつかの記録情報フィールドを更新します。
ついでに言及すると、パイプ容量が 64 ビット整数の場合、runtime.makechan64 関数を使用して作成されます。本質的には runtime.makechan の呼び出しでもありますが、型チェックを 1 つ多く行います。
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}一般的に size は math.MaxInt32 を超えない方がよいでしょう。
送信
パイプにデータを送信する際、送信するデータを矢印の右側に配置します。
ch <- struct{}{}コンパイラはこれを runtime.chansend1 に変換し、実際にデータ送信を担当するのは runtime.chansend 関数です。chansend1 は elem ポインタを渡し、これは送信要素へのポインタを指します。
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}まずパイプが nil かどうかをチェックします。block は現在の送信操作がブロッキングかどうかを表します(block の値は select 構造に関連)。ブロッキング送信でパイプが nil の場合、直接パニックします。ノンブロッキング送信の場合、ロックなしでパイプが満杯かどうかを直接判断し、満杯の場合は直接返します。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}その後ロックを取得し、パイプが閉じられているかどうかを検出します。既に閉じられている場合は panic します。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}その後 recvq キューから sudog を 1 つデキューし、runtime.send 関数によって送信します。
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}send 関数の内容は以下の通りで、recvx と sendx を更新し、runtime.memmove 関数を使用して通信データのメモリを受信側ゴルーチンの目標要素アドレスに直接コピーし、runtime.goready 関数を通じて受信側ゴルーチンを _Grunnable 状態に変更し、スケジューリングに再参加できるようにします。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}上記のプロセスでは、受信待機中のゴルーチンが見つかるため、データは環状キューに格納されず直接受信側に送信されます。利用可能な受信側ゴルーチンがなく容量が十分な場合、環状キューバッファに配置され、直接返します。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}バッファが満杯の場合、ノンブロッキング送信の場合は直接返します。
if !block {
unlock(&c.lock)
return false
}ブロッキング送信の場合、以下のコードフローに進みます。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}まず現在のゴルーチンを sudog として構築し、hchan.sendq 送信待機ゴルーチンキューに追加し、runtime.gopark によって現在のゴルーチンをブロッキングし、_Gwaiting 状態に変更し、受信側によって再びウェイクアップされるまで待機します。runtime.KeepAlive によって送信データの保活を行い、受信側が正常にコピーすることを保証します。ウェイクアップ後、以下の仕上げフローに進みます。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}パイプのデータ送信については以下の几种の状況があります。
- パイプが
nil、プログラムがパニック - パイプが閉じられている、
panic発生 recvqキューが空でない、直接受信側に送信- ゴルーチンが待機していない、バッファに追加
- バッファが満杯、送信ゴルーチンがブロッキング状態に入り、他のゴルーチンがデータを受信するのを待機
值得注意的是、上記の送信ロジックではバッファあふれデータの処理が見られませんが、これらのデータを破棄することはできず、sudog.elem に保存され、受信側によって処理されます。
受信
Go でパイプからデータを受信する構文には 2 種類あります。1 つ目はデータのみを読み取る方法です。
data <- ch2 つ目はデータが正常に読み取られたかどうかを判断する方法です。
data, ok <- ch上記 2 つの構文はコンパイラによって runtime.chanrecv1 と runtime.chanrecv2 の呼び出しに変換されますが、実際には runtime.chanrecv の呼び出しです。受信ロジックの開始部分は送信ロジックと同様で、まずパイプが nil かどうかを判断します。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}その後ノンブロッキング読み取りの場合、ロックなしでパイプが空かどうかを判断し、パイプが閉じられていない場合は直接返します。パイプが閉じられている場合は受信要素のメモリをクリアします。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}その後ロックを取得して hchan.sendq キューにアクセスします。if c.closed != 0 の分岐から、パイプが閉じられていても、パイプ内に要素が存在する場合は直接返さず、引き続き要素を消費するコードを実行することがわかります。这也是パイプが閉じられた後でも読み取りが許可される理由です。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}パイプが閉じられていない場合、sendq キューに待機中の送信ゴルーチンがあるかどうかを確認し、あれば runtime.recv によってその送信側ゴルーチンを処理します。
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}1 つ目の状況、パイプ容量が 0 すなわちバッファなしパイプの場合、受信側は runtime.recvDirect 関数を通じて送信側から直接データをコピーします。2 つ目の状況、バッファが満杯の場合、前述のロジックではバッファが満杯かどうかを判断するロジックが見られませんが、実際にはバッファ容量が 0 でなく送信側が待機している場合、バッファが既に満杯であることを表しています。バッファが満杯でなければ送信側がブロッキングして送信を待機することはないからです。この部分のロジックは送信側によって判断されます。その後、受信側はバッファから先頭要素をデキューし、そのメモリを受信側目標要素ポインタにコピーし、送信側ゴルーチンが送信するデータをコピーしてエンキューします(ここで受信側がバッファあふれデータを処理する方法が見られます)。最後に runtime.goready によって送信側ゴルーチンをウェイクアップし、_Grunnable 状態に変更し、スケジューリングに再参加できるようにします。
待機中の送信ゴルーチンがない場合、バッファに待機中の消費要素があるかどうかを確認し、先頭要素をデキューしてそのメモリを受信側目標要素にコピーし、返します。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}最後にパイプ内に消費可能な要素がない場合、runtime.gopark によって現在のゴルーチンを _Gwaiting 状態に変更し、送信側ゴルーチンによってウェイクアップされるまでブロッキング待機します。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}ウェイクアップ後、返します。此时 返される success 値は sudog.success から来ており、送信側が正常にデータを送信した場合、このフィールドは送信側によって true に設定されるはずです。この部分のロジックは runtime.send 関数で見られます。
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}対応して、送信側 runtime.chansend 末尾の sudog.success 判断は、受信側が runtime.recv 関数内の設定から来ています。これらを通じて受信側と送信側が相互に補完し合って初めてパイプが正常に動作することがわかります。総じて言えば、データ受信はデータ送信より少し複雑で、以下の几种の状況があります。
- パイプが
nil、プログラムがパニック - パイプが閉じられている、パイプが空の場合は直接返す。空でない場合は 5 番目の状況にジャンプして実行
- バッファ容量が 0、
sendqに待機中の送信ゴルーチンがある場合、直接送信側のデータをコピーし、送信側をウェイクアップ - バッファが満杯、
sendqに待機中の送信ゴルーチンがある場合、バッファ先頭要素をデキューし、送信側のデータをエンキューし、送信側をウェイクアップ - バッファが満杯でなく数量が 0 でない場合、バッファ先頭要素をデキューし、返す
- バッファが空の場合、ブロッキング状態に入り、送信側によってウェイクアップされるのを待機
閉じる
パイプを閉じるには、組み込み関数 close を使用します。
close(ch)コンパイラはこれを runtime.closechan の呼び出しに変換します。渡されたパイプが nil または既に閉じられている場合、直接 panic します。
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}その後、このパイプの sendq と recvq 内のすべてのブロッキングゴルーチンをデキューし、runtime.goready によってすべてウェイクアップします。
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
ついでに言及すると、Go は単方向パイプを許可し、以下のいくつかのルールがあります。
- 読み取り専用パイプはデータを送信できない
- 読み取り専用パイプは閉じられない
- 書き込み専用パイプはデータを読み取れない
これらのエラーはコンパイル期の型チェック段階で見つかり、実行時まで残されません。興味があれば以下の 2 つのパッケージで関連コードを閲覧できます。
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")閉じる判断
非常に昔(Go 1 之前)、パイプが閉じられているかどうかを判断するための組み込み関数 closed がありましたが、すぐに削除されました。これはパイプの使用シーンが通常マルチゴルーチンのためです。true を返す場合は確かにパイプが閉じられていることを表せますが、false を返す場合、必ずしもパイプが閉じられていないことを表すわけではありません。次の瞬間に誰がパイプを閉じるかわからないため、この戻り値は信頼できません。この戻り値を基準にパイプにデータを送信するかどうかを判断するのはさらに危険です。閉じられたパイプにデータを送信すると panic するためです。
本当に必要がある場合、自分で実装できます。1 つの方案はパイプに書き込むことで判断する方法です。
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}しかしこれにも副作用があります。閉じられていない場合、冗長なデータが書き込まれ、defer-recover 処理プロセスに入り、追加のパフォーマンス損失を引き起こします。そのため、書き込み方案は直接放棄できます。読み取り方案は検討できますが、パイプを直接読み取ることはできません。直接読み取る block パラメータ値が true の場合、ゴルーチンがブロッキングするためです。select と組み合わせて使用する必要があります。パイプと select を組み合わせる場合、ノンブロッキングになります。
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}これは上記より少し良いように見えますが、この状況はパイプが閉じられており、パイプバッファ内に要素がない場合にのみ適用されます。要素がある場合、この要素を無駄に消費してしまいます。依然として良い実装とは言えません。
しかし実際には、パイプが閉じられているかどうかを判断する必要は全くありません。理由は既に冒頭で説明した通り、戻り値が信頼できないためです。パイプを正しく使用し、正しく閉じることが私たちが行うべきことです。そのため、
- 決して受信側でパイプを閉じない。読み取り専用パイプを閉じることはコンパイルできない点がこれを明確に示しています。送信側に行わせましょう。
- 複数の送信側がある場合、单独で 1 つのゴルーチンに閉じる操作を完了させ、
closeが 1 方のみによって呼び出され、1 回のみ呼び出されることを保証します。 - パイプを渡す際、読み取り専用または書き込み専用を制限するのが最善
これらの原則に従えば、大きな問題が発生しないことを保証できます。
