chan
Channel เป็นโครงสร้างข้อมูลพิเศษที่เป็นตัวแทนของแนวคิด CSP ในภาษา Go แกนกลางของแนวคิด CSP คือการสื่อสารระหว่างโปรเซสผ่านข้อความเพื่อแลกเปลี่ยนข้อมูล ในทำนองเดียวกัน ผ่าน channel เราสามารถสื่อสารระหว่าง goroutine ได้อย่างง่ายดาย
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}นอกจากสื่อสารแล้ว ผ่าน channel ยังสามารถทำการซิงโครไนซ์ goroutine ได้ และในระบบที่ต้องการความพร้อมกัน channel เกือบจะปรากฏอยู่ทุกที่ เพื่อที่จะเข้าใจการทำงานของ channel ได้ดีขึ้น ด้านล่างจะอธิบายหลักการของมัน
โครงสร้าง
Channel ในขณะทำงานแสดงด้วยโครงสร้าง runtime.hchan ซึ่งมีฟิลด์ไม่มาก ดังนี้
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}จากข้างต้นจะเห็นฟิลด์ lock ได้ชัดเจน Channel ที่จริงคือคิววงกลมแบบซิงโครไนซ์ที่มีล็อก ฟิลด์อื่นๆ มีคำอธิบายดังนี้
qcountแสดงจำนวนข้อมูลทั้งหมดdataqsizeขนาดของคิววงกลมbufตัวชี้ไปยังอาร์เรย์ขนาดdataqsizeคือคิววงกลมclosedchannel ปิดหรือไม่sendx,recvxแสดงดัชนีการส่งและการรับsendq,recvqแสดงลิงก์รายการของ goroutine ที่รอการส่งและการรับ องค์ประกอบคือruntime.sudoggotype waitq struct { first *sudog last *sudog }ผ่านภาพด้านล่างจะเข้าใจโครงสร้างของ channel ได้ชัดเจน

เมื่อใช้ฟังก์ชัน len และ cap กับ channel จะคืนค่าฟิลด์ hchan.qcount และ hchan.dataqsiz ตามลำดับ
การสร้าง
โดยปกติแล้วการสร้าง channel มีวิธีเดียวคือใช้ฟังก์ชัน make
ch := make(chan int, size)คอมไพเลอร์จะแปลเป็นการเรียกฟังก์ชัน runtime.makechan ซึ่งรับผิดชอบการสร้าง channel จริง โค้ดมีดังนี้
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}ตรรกะส่วนนี้ง่ายมาก ส่วนใหญ่จัดสรรหน่วยความจำให้ channel อย่างแรกคำนวณขนาดหน่วยความจำที่ต้องการตาม size และประเภทองค์ประกอบ elem.size ที่ส่งมา แล้วจัดการสามกรณี
sizeเป็น 0 จัดสรรเพียงhchanSize- องค์ประกอบไม่มีตัวชี้ ก็จัดสรรพื้นที่ขนาดหน่วยความจำที่ตรงกัน และหน่วยความจำของคิววงกลมจะต่อเนื่องกับหน่วยความจำของ channel
- องค์ประกอบมีตัวชี้ หน่วยความจำของ channel และคิววงกลมจัดสรรแยกกัน
หากคิววงกลมเก็บองค์ประกอบตัวชี้ เนื่องจากอ้างอิงองค์ประกอบภายนอก GC ในช่วง mark-sweep อาจสแกนตัวชี้เหล่านี้ เมื่อเก็บองค์ประกอบที่ไม่ใช่ตัวชี้ การจัดสรรบนหน่วยความจำต่อเนื่องจะหลีกเลี่ยงการสแกนที่ไม่จำเป็น หลังจากจัดสรรหน่วยความจำเสร็จแล้ว สุดท้ายอัปเดตฟิลด์อื่นๆ ที่บันทึกข้อมูล
顺便提一下,当管道容量是 64 位整数的时候,会使用 runtime.makechan64 函数来进行创建,它本质上也是对 runtime.makechan 的调用,只是多做了一个类型检查。
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}一般来说 size 都最好不要超过 math.MaxInt32。
การส่ง
เมื่อส่งข้อมูลไปยัง channel เราจะวางข้อมูลที่ต้องการส่งทางขวาของลูกศร
ch <- struct{}{}คอมไพเลอร์จะแปลเป็นการเรียก runtime.chansend1 ซึ่งรับผิดชอบการส่งข้อมูลจริงคือฟังก์ชัน runtime.chansend โดย chansend1 จะส่งตัวชี้ elem ซึ่งชี้ไปยังตัวชี้ขององค์ประกอบที่ส่ง
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}อย่างแรกจะตรวจสอบว่า channel เป็น nil หรือไม่ block แสดงว่าการส่งปัจจุบันเป็นการบล็อกหรือไม่ (ค่าของ block เกี่ยวข้องกับโครงสร้าง select) หากเป็นการส่งแบบบล็อกและ channel เป็น nil จะเกิดข้อผิดพลาดทันที ในกรณีการส่งแบบไม่บล็อก จะตรวจสอบโดยตรงว่า channel เต็มหรือไม่โดยไม่ล็อก หากเต็มจะคืนค่าทันที
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}จากนั้นจึงล็อกและตรวจสอบว่า channel ปิดหรือไม่ หากปิดแล้วจะ panic
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}หลังจากนั้นจะ dequeue sudog หนึ่งตัวจากคิว recvq แล้วส่งโดยฟังก์ชัน runtime.send
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}เนื้อหาฟังก์ชัน send มีดังนี้ จะอัปเดต recvx และ sendx แล้วใช้ฟังก์ชัน runtime.memmove คัดลอกหน่วยความจำของข้อมูลการสื่อสารโดยตรงไปยังที่อยู่ขององค์ประกอบเป้าหมายของ goroutine ที่รับ จากนั้นใช้ฟังก์ชัน runtime.goready ทำให้ goroutine ที่รับเปลี่ยนเป็นสถานะ _Grunnable เพื่อเข้าร่วมการจัดตารางเวลาอีกครั้ง
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}ในกระบวนการข้างต้น เนื่องจากสามารถหา goroutine ที่รอการรับได้ ข้อมูลจึงถูกส่งไปยังผู้รับโดยตรง ไม่ได้เก็บไว้ในคิววงกลม หากไม่มีผู้รับที่ใช้ได้และความจุเพียงพอ จะใส่ลงในบัฟเฟอร์คิววงกลมแล้วคืนค่าโดยตรง
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}หากบัฟเฟอร์เต็ม ในกรณีการส่งแบบไม่บล็อกจะคืนค่าโดยตรง
if !block {
unlock(&c.lock)
return false
}หากเป็นการส่งแบบบล็อก จะเข้าสู่กระบวนการโค้ดด้านล่าง
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}อย่างแรกจะสร้าง goroutine ปัจจุบันเป็น sudog และเพิ่มเข้าไปในคิว hchan.sendq ที่รอการส่ง จากนั้นใช้ runtime.gopark ทำให้ goroutine ปัจจุบันบล็อก เปลี่ยนเป็นสถานะ _Gwaiting จนกว่าจะถูกปลุกโดยผู้รับอีกครั้ง และจะใช้ runtime.KeepAlive เพื่อรักษาข้อมูลที่ส่งเพื่อให้แน่ใจว่าผู้รับคัดลอกสำเร็จ เมื่อถูกปลุกจะเข้าสู่กระบวนการ收尾 ต่อไปนี้
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}จะเห็นว่าสำหรับการส่งข้อมูลผ่าน channel มีกรณีต่อไปนี้
- Channel เป็น
nilโปรแกรมเกิดข้อผิดพลาด - Channel ปิดแล้ว เกิด
panic - คิว
recvqไม่ว่าง ส่งไปยังผู้รับโดยตรง - ไม่มี goroutine รอ เพิ่มลงในบัฟเฟอร์
- บัฟเฟอร์เต็ม goroutine ที่ส่งเข้าสู่สถานะบล็อก รอ goroutine อื่นรับข้อมูล
ควรสังเกตว่าในตรรกะการส่งข้างต้นไม่เห็นการจัดการข้อมูลล้นบัฟเฟอร์ ข้อมูล这部分ไม่สามารถทิ้งได้ มันเก็บไว้ใน sudog.elem และผู้รับจะจัดการ
การรับ
ใน Go มีไวยากรณ์สองแบบสำหรับการรับข้อมูลจาก channel แบบแรกคือการอ่านข้อมูลเท่านั้น
data <- chแบบที่สองคือการตรวจสอบว่าการอ่านข้อมูลสำเร็จหรือไม่
data, ok <- chไวยากรณ์สองแบบข้างต้นจะถูกคอมไพเลอร์แปลเป็นการเรียก runtime.chanrecv1 และ runtime.chanrecv2 แต่ที่จริงแล้วเป็นการเรียก runtime.chanrecv เท่านั้น ตรรกะเริ่มต้นของการรับคล้ายกับตรรกะการส่ง จะตรวจสอบว่า channel เป็น nil ก่อน
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}จากนั้นในกรณีการอ่านแบบไม่บล็อก จะตรวจสอบโดยไม่ล็อกว่า channel ว่างหรือไม่ หาก channel ไม่ปิดจะคืนค่าโดยตรง หาก channel ปิดแล้วจะล้างหน่วยความจำขององค์ประกอบที่รับ
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}จากนั้นล็อกและเข้าถึงคิว hchan.sendq ผ่านสาขา if c.closed != 0 ด้านล่างจะเห็นว่า แม้ channel จะปิดแล้ว แต่ถ้ายังมีองค์ประกอบใน channel จะไม่คืนค่าโดยตรง ยังคงดำเนินการโค้ดบริโภคองค์ประกอบต่อไป นี่คือเหตุผลที่ยังอนุญาตให้อ่านได้หลังจาก channel ปิด
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}หาก channel ไม่ปิด จะตรวจสอบว่ามี goroutine กำลังรอการส่งในคิว sendq หรือไม่ หากมีจะจัดการโดย runtime.recv
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}กรณีแรก ความจุ channel เป็น 0 คือ channel แบบไม่มีบัฟเฟอร์ ผู้รับจะคัดลอกข้อมูลจากผู้ส่งโดยตรงผ่านฟังก์ชัน runtime.recvDirect กรณีที่สอง บัฟเฟอร์เต็ม แม้ข้างต้นไม่เห็นตรรกะตรวจสอบว่าบัฟเฟอร์เต็มหรือไม่ แต่ที่จริงเมื่อความจุบัฟเฟอร์ไม่เป็น 0 และมีผู้ส่งรอการส่งก็แสดงว่าบัฟเฟอร์เต็มแล้ว เพราะเฉพาะบัฟเฟอร์เต็มผู้ส่งจึงจะบล็อกรอการส่ง ตรรกะส่วนนี้ผู้ส่งเป็นผู้ตรวจสอบ จากนั้นผู้รับจะ dequeue องค์ประกอบหัวจากบัฟเฟอร์และคัดลอกหน่วยความจำไปยังตัวชี้ขององค์ประกอบผู้รับเป้าหมาย แล้วคัดลอกข้อมูลที่ goroutine ผู้ส่งต้องการส่งและเข้าคิว (ที่นี่เราเห็นวิธีการจัดการข้อมูลล้นบัฟเฟอร์ของผู้รับ) สุดท้ายจะใช้ runtime.goready ปลุก goroutine ผู้ส่ง ทำให้เปลี่ยนเป็นสถานะ _Grunnable เพื่อเข้าร่วมการจัดตารางเวลาอีกครั้ง
หากไม่มี goroutine รอการส่ง จะตรวจสอบว่ามีองค์ประกอบรอการบริโภคในบัฟเฟอร์หรือไม่ แล้ว dequeue องค์ประกอบหัวและคัดลอกหน่วยความจำไปยังองค์ประกอบเป้าหมายของผู้รับ แล้วคืนค่า
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}สุดท้ายหากไม่มีองค์ประกอบที่บริโภคได้ใน channel จะใช้ runtime.gopark ทำให้ goroutine ปัจจุบันเปลี่ยนเป็นสถานะ _Gwaiting บล็อกรอจนกว่าจะถูกปลุกโดย goroutine ผู้ส่ง
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}หลังจากถูกปลุก จะคืนค่า ในขณะนี้ค่า success ที่คืนมามาจาก sudog.success หากผู้ส่งส่งข้อมูลสำเร็จ ฟิลด์นี้ควรถูกตั้งค่าเป็น true โดยผู้ส่ง ซึ่งเราสามารถเห็นได้ในฟังก์ชัน runtime.send
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}ในทำนองเดียวกัน ในส่วนท้ายของ runtime.chansend ของผู้ส่ง การตรวจสอบ sudog.success มาจากการตั้งค่าในฟังก์ชัน runtime.recv ของผู้รับ ผ่านนี้สามารถพบว่าผู้รับและผู้ส่งต้องทำงานร่วมกัน channel จึงจะทำงานได้ปกติ โดยรวมแล้ว การรับข้อมูลซับซ้อนกว่าการส่งข้อมูลเล็กน้อย มีกรณีดังนี้
- Channel เป็น
nilโปรแกรมเกิดข้อผิดพลาด - Channel ปิดแล้ว หาก channel ว่างจะคืนค่าโดยตรง หากไม่ว่างจะข้ามไปดำเนินการกรณีที่ 5
- ความจุบัฟเฟอร์เป็น 0 มี goroutine รอการส่งใน
sendqจะคัดลอกข้อมูลจากผู้ส่งโดยตรง แล้วปลุกผู้ส่ง - บัฟเฟอร์เต็ม มี goroutine รอการส่งใน
sendqจะ dequeue องค์ประกอบหัวจากบัฟเฟอร์ ข้อมูลผู้ส่งเข้าคิว แล้วปลุกผู้ส่ง - บัฟเฟอร์ไม่เต็มและจำนวนไม่เป็น 0 จะ dequeue องค์ประกอบหัวจากบัฟเฟอร์ แล้วคืนค่า
- บัฟเฟอร์ว่าง เข้าสู่สถานะบล็อก รอถูกปลุกโดยผู้ส่ง
การปิด
สำหรับการปิด channel เราจะใช้ฟังก์ชัน built-in close
close(ch)คอมไพเลอร์จะแปลเป็นการเรียก runtime.closechan หาก channel ที่ส่งเป็น nil หรือปิดแล้ว จะ panic โดยตรง
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}จากนั้น dequeue goroutine ที่บล็อกทั้งหมดในคิว sendq และ recvq ของ channel นี้และปลุกทั้งหมดผ่าน runtime.goready
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
顺便提一下,Go 允许单向管道,有着下面几个规则
- 只读管道不能发送数据
- 只读管道不能关闭
- 只写管道不能读取数据
这些错误早在编译期的类型检查阶段就会找出来,不会留到运行时,感兴趣可以到下面这两个包阅读相关代码
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")การตรวจสอบการปิด
ในสมัยก่อนมาก (ก่อน go1) มีฟังก์ชัน built-in closed ใช้ตรวจสอบว่า channel ปิดหรือไม่ แต่ต่อมาถูกลบออกอย่างรวดเร็ว这是因为管道的使用场景通常都是多协程的情况,假设它返回 true 确实可以代表管道已经关闭了,但是如果它返回了 false,那么并不能代表管道就真的没有关闭,因为谁也不知道在下一刻谁会把管道关闭掉,所以这个返回值是不可信的,如果以这个返回值为依据来判断是否向管道发送数据就更是危险了,因为向已关闭的管道发送数据会发生 panic。
หากต้องการจริงๆ สามารถทำเองได้ วิธีหนึ่งคือตรวจสอบผ่านการเขียน channel โค้ดดังนี้
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}但这样也是有副作用的,如果没关闭的话就会向里面写入冗余的数据,而且会进入 defer-recover 处理过程,造成额外的性能损失,所以写方案可以直接放弃。读方案的话可以考虑,不过不能直接读管道,因为直接读 block 参数值为 true 将会阻塞协程,应该结合 select 来使用,管道与 select 结合时就是非阻塞的。
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}这种只是看起来要比上面好一点点,它的情况仅仅适用于管道已关闭且管道缓冲区中没有元素,如果有元素的话还会平白无故的消费掉这个元素,还是没有一个很好的实现。
但其实我们根本就不需要判断管道是否关闭,理由在开头已经讲过了因为返回值并不可信,正确地使用管道并正确的关闭才是我们应该做的,所以
- 永远不要在接收方关闭管道,关闭只读管道不能通过编译这点已经很明确地告诉你不要这么做了,交给发送方来做这件事。
- 如果有多个发送方,应该单独让一个协程来完成关闭操作,确保
close只有一方调用且只会调用一次。 - 传递管道时,最好限制只读或只写
遵循这几个原则,就能确保不会出太大的问题。
