Skip to content

chan

Channel เป็นโครงสร้างข้อมูลพิเศษที่เป็นตัวแทนของแนวคิด CSP ในภาษา Go แกนกลางของแนวคิด CSP คือการสื่อสารระหว่างโปรเซสผ่านข้อความเพื่อแลกเปลี่ยนข้อมูล ในทำนองเดียวกัน ผ่าน channel เราสามารถสื่อสารระหว่าง goroutine ได้อย่างง่ายดาย

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // do something
    done <- struct{}{}
  }()
  <-done
  fmt.Println("finished")
}

นอกจากสื่อสารแล้ว ผ่าน channel ยังสามารถทำการซิงโครไนซ์ goroutine ได้ และในระบบที่ต้องการความพร้อมกัน channel เกือบจะปรากฏอยู่ทุกที่ เพื่อที่จะเข้าใจการทำงานของ channel ได้ดีขึ้น ด้านล่างจะอธิบายหลักการของมัน

โครงสร้าง

Channel ในขณะทำงานแสดงด้วยโครงสร้าง runtime.hchan ซึ่งมีฟิลด์ไม่มาก ดังนี้

go
type hchan struct {
  qcount   uint           // total data in the queue
  dataqsiz uint           // size of the circular queue
  buf      unsafe.Pointer // points to an array of dataqsiz elements
  elemsize uint16
  closed   uint32
  elemtype *_type // element type
  sendx    uint   // send index
  recvx    uint   // receive index
  recvq    waitq  // list of recv waiters
  sendq    waitq  // list of send waiters

  lock mutex
}

จากข้างต้นจะเห็นฟิลด์ lock ได้ชัดเจน Channel ที่จริงคือคิววงกลมแบบซิงโครไนซ์ที่มีล็อก ฟิลด์อื่นๆ มีคำอธิบายดังนี้

  • qcount แสดงจำนวนข้อมูลทั้งหมด

  • dataqsize ขนาดของคิววงกลม

  • buf ตัวชี้ไปยังอาร์เรย์ขนาด dataqsize คือคิววงกลม

  • closed channel ปิดหรือไม่

  • sendx, recvx แสดงดัชนีการส่งและการรับ

  • sendq, recvq แสดงลิงก์รายการของ goroutine ที่รอการส่งและการรับ องค์ประกอบคือ runtime.sudog

    go
    type waitq struct {
      first *sudog
      last  *sudog
    }

    ผ่านภาพด้านล่างจะเข้าใจโครงสร้างของ channel ได้ชัดเจน

เมื่อใช้ฟังก์ชัน len และ cap กับ channel จะคืนค่าฟิลด์ hchan.qcount และ hchan.dataqsiz ตามลำดับ

การสร้าง

โดยปกติแล้วการสร้าง channel มีวิธีเดียวคือใช้ฟังก์ชัน make

go
ch := make(chan int, size)

คอมไพเลอร์จะแปลเป็นการเรียกฟังก์ชัน runtime.makechan ซึ่งรับผิดชอบการสร้าง channel จริง โค้ดมีดังนี้

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

ตรรกะส่วนนี้ง่ายมาก ส่วนใหญ่จัดสรรหน่วยความจำให้ channel อย่างแรกคำนวณขนาดหน่วยความจำที่ต้องการตาม size และประเภทองค์ประกอบ elem.size ที่ส่งมา แล้วจัดการสามกรณี

  1. size เป็น 0 จัดสรรเพียง hchanSize
  2. องค์ประกอบไม่มีตัวชี้ ก็จัดสรรพื้นที่ขนาดหน่วยความจำที่ตรงกัน และหน่วยความจำของคิววงกลมจะต่อเนื่องกับหน่วยความจำของ channel
  3. องค์ประกอบมีตัวชี้ หน่วยความจำของ channel และคิววงกลมจัดสรรแยกกัน

หากคิววงกลมเก็บองค์ประกอบตัวชี้ เนื่องจากอ้างอิงองค์ประกอบภายนอก GC ในช่วง mark-sweep อาจสแกนตัวชี้เหล่านี้ เมื่อเก็บองค์ประกอบที่ไม่ใช่ตัวชี้ การจัดสรรบนหน่วยความจำต่อเนื่องจะหลีกเลี่ยงการสแกนที่ไม่จำเป็น หลังจากจัดสรรหน่วยความจำเสร็จแล้ว สุดท้ายอัปเดตฟิลด์อื่นๆ ที่บันทึกข้อมูล

顺便提一下,当管道容量是 64 位整数的时候,会使用 runtime.makechan64 函数来进行创建,它本质上也是对 runtime.makechan 的调用,只是多做了一个类型检查。

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

一般来说 size 都最好不要超过 math.MaxInt32

การส่ง

เมื่อส่งข้อมูลไปยัง channel เราจะวางข้อมูลที่ต้องการส่งทางขวาของลูกศร

go
ch <- struct{}{}

คอมไพเลอร์จะแปลเป็นการเรียก runtime.chansend1 ซึ่งรับผิดชอบการส่งข้อมูลจริงคือฟังก์ชัน runtime.chansend โดย chansend1 จะส่งตัวชี้ elem ซึ่งชี้ไปยังตัวชี้ขององค์ประกอบที่ส่ง

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

อย่างแรกจะตรวจสอบว่า channel เป็น nil หรือไม่ block แสดงว่าการส่งปัจจุบันเป็นการบล็อกหรือไม่ (ค่าของ block เกี่ยวข้องกับโครงสร้าง select) หากเป็นการส่งแบบบล็อกและ channel เป็น nil จะเกิดข้อผิดพลาดทันที ในกรณีการส่งแบบไม่บล็อก จะตรวจสอบโดยตรงว่า channel เต็มหรือไม่โดยไม่ล็อก หากเต็มจะคืนค่าทันที

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

จากนั้นจึงล็อกและตรวจสอบว่า channel ปิดหรือไม่ หากปิดแล้วจะ panic

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

หลังจากนั้นจะ dequeue sudog หนึ่งตัวจากคิว recvq แล้วส่งโดยฟังก์ชัน runtime.send

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

เนื้อหาฟังก์ชัน send มีดังนี้ จะอัปเดต recvx และ sendx แล้วใช้ฟังก์ชัน runtime.memmove คัดลอกหน่วยความจำของข้อมูลการสื่อสารโดยตรงไปยังที่อยู่ขององค์ประกอบเป้าหมายของ goroutine ที่รับ จากนั้นใช้ฟังก์ชัน runtime.goready ทำให้ goroutine ที่รับเปลี่ยนเป็นสถานะ _Grunnable เพื่อเข้าร่วมการจัดตารางเวลาอีกครั้ง

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

ในกระบวนการข้างต้น เนื่องจากสามารถหา goroutine ที่รอการรับได้ ข้อมูลจึงถูกส่งไปยังผู้รับโดยตรง ไม่ได้เก็บไว้ในคิววงกลม หากไม่มีผู้รับที่ใช้ได้และความจุเพียงพอ จะใส่ลงในบัฟเฟอร์คิววงกลมแล้วคืนค่าโดยตรง

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

หากบัฟเฟอร์เต็ม ในกรณีการส่งแบบไม่บล็อกจะคืนค่าโดยตรง

go
if !block {
    unlock(&c.lock)
    return false
}

หากเป็นการส่งแบบบล็อก จะเข้าสู่กระบวนการโค้ดด้านล่าง

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

อย่างแรกจะสร้าง goroutine ปัจจุบันเป็น sudog และเพิ่มเข้าไปในคิว hchan.sendq ที่รอการส่ง จากนั้นใช้ runtime.gopark ทำให้ goroutine ปัจจุบันบล็อก เปลี่ยนเป็นสถานะ _Gwaiting จนกว่าจะถูกปลุกโดยผู้รับอีกครั้ง และจะใช้ runtime.KeepAlive เพื่อรักษาข้อมูลที่ส่งเพื่อให้แน่ใจว่าผู้รับคัดลอกสำเร็จ เมื่อถูกปลุกจะเข้าสู่กระบวนการ收尾 ต่อไปนี้

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

จะเห็นว่าสำหรับการส่งข้อมูลผ่าน channel มีกรณีต่อไปนี้

  1. Channel เป็น nil โปรแกรมเกิดข้อผิดพลาด
  2. Channel ปิดแล้ว เกิด panic
  3. คิว recvq ไม่ว่าง ส่งไปยังผู้รับโดยตรง
  4. ไม่มี goroutine รอ เพิ่มลงในบัฟเฟอร์
  5. บัฟเฟอร์เต็ม goroutine ที่ส่งเข้าสู่สถานะบล็อก รอ goroutine อื่นรับข้อมูล

ควรสังเกตว่าในตรรกะการส่งข้างต้นไม่เห็นการจัดการข้อมูลล้นบัฟเฟอร์ ข้อมูล这部分ไม่สามารถทิ้งได้ มันเก็บไว้ใน sudog.elem และผู้รับจะจัดการ

การรับ

ใน Go มีไวยากรณ์สองแบบสำหรับการรับข้อมูลจาก channel แบบแรกคือการอ่านข้อมูลเท่านั้น

go
data <- ch

แบบที่สองคือการตรวจสอบว่าการอ่านข้อมูลสำเร็จหรือไม่

go
data, ok <- ch

ไวยากรณ์สองแบบข้างต้นจะถูกคอมไพเลอร์แปลเป็นการเรียก runtime.chanrecv1 และ runtime.chanrecv2 แต่ที่จริงแล้วเป็นการเรียก runtime.chanrecv เท่านั้น ตรรกะเริ่มต้นของการรับคล้ายกับตรรกะการส่ง จะตรวจสอบว่า channel เป็น nil ก่อน

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

จากนั้นในกรณีการอ่านแบบไม่บล็อก จะตรวจสอบโดยไม่ล็อกว่า channel ว่างหรือไม่ หาก channel ไม่ปิดจะคืนค่าโดยตรง หาก channel ปิดแล้วจะล้างหน่วยความจำขององค์ประกอบที่รับ

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

จากนั้นล็อกและเข้าถึงคิว hchan.sendq ผ่านสาขา if c.closed != 0 ด้านล่างจะเห็นว่า แม้ channel จะปิดแล้ว แต่ถ้ายังมีองค์ประกอบใน channel จะไม่คืนค่าโดยตรง ยังคงดำเนินการโค้ดบริโภคองค์ประกอบต่อไป นี่คือเหตุผลที่ยังอนุญาตให้อ่านได้หลังจาก channel ปิด

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

หาก channel ไม่ปิด จะตรวจสอบว่ามี goroutine กำลังรอการส่งในคิว sendq หรือไม่ หากมีจะจัดการโดย runtime.recv

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // copy data from queue to receiver
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copy data from sender to queue
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

กรณีแรก ความจุ channel เป็น 0 คือ channel แบบไม่มีบัฟเฟอร์ ผู้รับจะคัดลอกข้อมูลจากผู้ส่งโดยตรงผ่านฟังก์ชัน runtime.recvDirect กรณีที่สอง บัฟเฟอร์เต็ม แม้ข้างต้นไม่เห็นตรรกะตรวจสอบว่าบัฟเฟอร์เต็มหรือไม่ แต่ที่จริงเมื่อความจุบัฟเฟอร์ไม่เป็น 0 และมีผู้ส่งรอการส่งก็แสดงว่าบัฟเฟอร์เต็มแล้ว เพราะเฉพาะบัฟเฟอร์เต็มผู้ส่งจึงจะบล็อกรอการส่ง ตรรกะส่วนนี้ผู้ส่งเป็นผู้ตรวจสอบ จากนั้นผู้รับจะ dequeue องค์ประกอบหัวจากบัฟเฟอร์และคัดลอกหน่วยความจำไปยังตัวชี้ขององค์ประกอบผู้รับเป้าหมาย แล้วคัดลอกข้อมูลที่ goroutine ผู้ส่งต้องการส่งและเข้าคิว (ที่นี่เราเห็นวิธีการจัดการข้อมูลล้นบัฟเฟอร์ของผู้รับ) สุดท้ายจะใช้ runtime.goready ปลุก goroutine ผู้ส่ง ทำให้เปลี่ยนเป็นสถานะ _Grunnable เพื่อเข้าร่วมการจัดตารางเวลาอีกครั้ง

หากไม่มี goroutine รอการส่ง จะตรวจสอบว่ามีองค์ประกอบรอการบริโภคในบัฟเฟอร์หรือไม่ แล้ว dequeue องค์ประกอบหัวและคัดลอกหน่วยความจำไปยังองค์ประกอบเป้าหมายของผู้รับ แล้วคืนค่า

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Receive directly from queue
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

สุดท้ายหากไม่มีองค์ประกอบที่บริโภคได้ใน channel จะใช้ runtime.gopark ทำให้ goroutine ปัจจุบันเปลี่ยนเป็นสถานะ _Gwaiting บล็อกรอจนกว่าจะถูกปลุกโดย goroutine ผู้ส่ง

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

หลังจากถูกปลุก จะคืนค่า ในขณะนี้ค่า success ที่คืนมามาจาก sudog.success หากผู้ส่งส่งข้อมูลสำเร็จ ฟิลด์นี้ควรถูกตั้งค่าเป็น true โดยผู้ส่ง ซึ่งเราสามารถเห็นได้ในฟังก์ชัน runtime.send

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

ในทำนองเดียวกัน ในส่วนท้ายของ runtime.chansend ของผู้ส่ง การตรวจสอบ sudog.success มาจากการตั้งค่าในฟังก์ชัน runtime.recv ของผู้รับ ผ่านนี้สามารถพบว่าผู้รับและผู้ส่งต้องทำงานร่วมกัน channel จึงจะทำงานได้ปกติ โดยรวมแล้ว การรับข้อมูลซับซ้อนกว่าการส่งข้อมูลเล็กน้อย มีกรณีดังนี้

  1. Channel เป็น nil โปรแกรมเกิดข้อผิดพลาด
  2. Channel ปิดแล้ว หาก channel ว่างจะคืนค่าโดยตรง หากไม่ว่างจะข้ามไปดำเนินการกรณีที่ 5
  3. ความจุบัฟเฟอร์เป็น 0 มี goroutine รอการส่งใน sendq จะคัดลอกข้อมูลจากผู้ส่งโดยตรง แล้วปลุกผู้ส่ง
  4. บัฟเฟอร์เต็ม มี goroutine รอการส่งใน sendq จะ dequeue องค์ประกอบหัวจากบัฟเฟอร์ ข้อมูลผู้ส่งเข้าคิว แล้วปลุกผู้ส่ง
  5. บัฟเฟอร์ไม่เต็มและจำนวนไม่เป็น 0 จะ dequeue องค์ประกอบหัวจากบัฟเฟอร์ แล้วคืนค่า
  6. บัฟเฟอร์ว่าง เข้าสู่สถานะบล็อก รอถูกปลุกโดยผู้ส่ง

การปิด

สำหรับการปิด channel เราจะใช้ฟังก์ชัน built-in close

go
close(ch)

คอมไพเลอร์จะแปลเป็นการเรียก runtime.closechan หาก channel ที่ส่งเป็น nil หรือปิดแล้ว จะ panic โดยตรง

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

จากนั้น dequeue goroutine ที่บล็อกทั้งหมดในคิว sendq และ recvq ของ channel นี้และปลุกทั้งหมดผ่าน runtime.goready

go
func closechan(c *hchan) {
    ...
  var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

顺便提一下,Go 允许单向管道,有着下面几个规则

  1. 只读管道不能发送数据
  2. 只读管道不能关闭
  3. 只写管道不能读取数据

这些错误早在编译期的类型检查阶段就会找出来,不会留到运行时,感兴趣可以到下面这两个包阅读相关代码

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

การตรวจสอบการปิด

ในสมัยก่อนมาก (ก่อน go1) มีฟังก์ชัน built-in closed ใช้ตรวจสอบว่า channel ปิดหรือไม่ แต่ต่อมาถูกลบออกอย่างรวดเร็ว这是因为管道的使用场景通常都是多协程的情况,假设它返回 true 确实可以代表管道已经关闭了,但是如果它返回了 false,那么并不能代表管道就真的没有关闭,因为谁也不知道在下一刻谁会把管道关闭掉,所以这个返回值是不可信的,如果以这个返回值为依据来判断是否向管道发送数据就更是危险了,因为向已关闭的管道发送数据会发生 panic

หากต้องการจริงๆ สามารถทำเองได้ วิธีหนึ่งคือตรวจสอบผ่านการเขียน channel โค้ดดังนี้

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

但这样也是有副作用的,如果没关闭的话就会向里面写入冗余的数据,而且会进入 defer-recover 处理过程,造成额外的性能损失,所以写方案可以直接放弃。读方案的话可以考虑,不过不能直接读管道,因为直接读 block 参数值为 true 将会阻塞协程,应该结合 select 来使用,管道与 select 结合时就是非阻塞的。

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

这种只是看起来要比上面好一点点,它的情况仅仅适用于管道已关闭且管道缓冲区中没有元素,如果有元素的话还会平白无故的消费掉这个元素,还是没有一个很好的实现。

但其实我们根本就不需要判断管道是否关闭,理由在开头已经讲过了因为返回值并不可信,正确地使用管道并正确的关闭才是我们应该做的,所以

  1. 永远不要在接收方关闭管道,关闭只读管道不能通过编译这点已经很明确地告诉你不要这么做了,交给发送方来做这件事。
  2. 如果有多个发送方,应该单独让一个协程来完成关闭操作,确保 close 只有一方调用且只会调用一次。
  3. 传递管道时,最好限制只读或只写

遵循这几个原则,就能确保不会出太大的问题。

Golang by www.golangdev.cn edit