Skip to content

chan

channel adalah struktur data khusus yang merupakan representasi khas dari filosofi CSP dalam bahasa Go. Inti dari filosofi CSP adalah proses berkomunikasi dan bertukar data melalui pesan. Dengan channel, kita dapat dengan mudah berkomunikasi antar goroutine.

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // lakukan sesuatu
    done <- struct{}{}
  }()
  <-done
  fmt.Println("selesai")
}

Selain komunikasi, channel juga dapat digunakan untuk operasi sinkronisasi goroutine. Dalam sistem yang membutuhkan konkurensi, channel hampir selalu ada. Untuk memahami cara kerja channel dengan lebih baik, berikut akan dijelaskan prinsip kerjanya.

Struktur

Channel dalam runtime direpresentasikan sebagai struktur runtime.hchan, yang memiliki field-field berikut

go
type hchan struct {
  qcount   uint           // total data dalam antrian
  dataqsiz uint           // ukuran antrian melingkar
  buf      unsafe.Pointer // menunjuk ke array dengan elemen dataqsiz
  elemsize uint16
  closed   uint32
  elemtype *_type // tipe elemen
  sendx    uint   // indeks pengiriman
  recvx    uint   // indeks penerimaan
  recvq    waitq  // daftar goroutine yang menunggu penerimaan
  sendq    waitq  // daftar goroutine yang menunggu pengiriman

  lock mutex
}

Dari struktur di atas, field lock terlihat jelas. Channel sebenarnya adalah antrian melingkar sinkron dengan kunci. Field-field lainnya dijelaskan sebagai berikut

  • qcount, menunjukkan jumlah total data

  • dataqsiz, ukuran antrian melingkar

  • buf, pointer yang menunjuk ke array dengan ukuran dataqsiz, yaitu antrian melingkar

  • closed, menunjukkan apakah channel ditutup

  • sendx, recvx, menunjukkan indeks pengiriman dan penerimaan

  • sendq, recvq, menunjukkan daftar goroutine yang menunggu pengiriman dan penerimaan, yang terdiri dari elemen runtime.sudog

    go
    type waitq struct {
      first *sudog
      last  *sudog
    }

    Melalui gambar berikut, struktur channel dapat dipahami dengan lebih jelas

Ketika fungsi len dan cap digunakan pada channel, yang dikembalikan sebenarnya adalah field hchan.qcount dan hchan.dataqsiz.

Pembuatan

Secara normal, hanya ada satu cara untuk membuat pipe, yaitu menggunakan fungsi make

go
ch := make(chan int, size)

Compiler akan menerjemahkannya menjadi pemanggilan fungsi runtime.makechan, yang bertanggung jawab untuk membuat pipe secara aktual. Kodenya adalah sebagai berikut.

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

Logika bagian ini cukup sederhana, terutama mengalokasikan memori untuk pipe. Pertama-tama akan menghitung ukuran memori yang dibutuhkan berdasarkan size dan tipe elemen elem.size yang传入, kemudian dibagi menjadi tiga kasus untuk ditangani

  1. size adalah 0, hanya mengalokasikan hchanSize
  2. Elemen tidak mengandung pointer, maka mengalokasikan ruang dengan ukuran memori yang sesuai, dan memori antrian melingkar bersambung dengan memori pipe
  3. Elemen mengandung pointer, memori pipe dan antrian melingkar dialokasikan secara terpisah

Jika elemen yang disimpan dalam antrian melingkar adalah elemen pointer, karena mereka mereferensikan elemen eksternal, GC akan memindai pointer-pointer ini selama fase mark-and-sweep. Ketika elemen non-pointer disimpan di memori yang bersambung, ini menghindari pemindaian yang tidak perlu. Setelah alokasi memori selesai, kemudian update field-field lain yang mencatat informasi.

Sebagai tambahan, ketika kapasitas pipe adalah kelipatan 64-bit, fungsi runtime.makechan64 akan digunakan untuk pembuatan. Ini pada dasarnya juga merupakan pemanggilan ke runtime.makechan, hanya saja melakukan pemeriksaan tipe tambahan.

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

Secara umum, size sebaiknya tidak melebihi math.MaxInt32.

Pengiriman

Saat mengirim data ke pipe, kita menempatkan data yang akan dikirim di sebelah kanan panah

go
ch <- struct{}{}

Compiler akan menerjemahkannya menjadi runtime.chansend1, dan fungsi yang benar-benar bertanggung jawab untuk mengirim data adalah runtime.chansend. chansend1 akan meneruskan pointer elem yang menunjuk ke elemen yang akan dikirim.

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

Pertama-tama akan memeriksa apakah pipe adalah nil. block menunjukkan apakah operasi pengiriman saat ini bersifat blocking (nilai block terkait dengan struktur select). Jika pengiriman blocking dan pipe adalah nil, maka akan langsung panic. Dalam kasus pengiriman non-blocking, akan langsung memeriksa apakah pipe sudah penuh tanpa mengunci, dan jika sudah penuh akan langsung return.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Kemudian baru mengunci, dan memeriksa apakah pipe ditutup. Jika sudah ditutup akan panic

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

Setelah itu, dequeue sebuah sudog dari antrian recvq, kemudian kirim menggunakan fungsi runtime.send.

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

Konten fungsi send adalah sebagai berikut, akan mengupdate recvx dan sendx, kemudian menggunakan fungsi runtime.memmove untuk menyalin memori data komunikasi langsung ke alamat elemen tujuan goroutine penerima, kemudian melalui fungsi runtime.goready membuat goroutine penerima berubah menjadi status _Grunnable, agar dapat kembali berpartisipasi dalam penjadwalan.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

Dalam proses di atas, karena dapat menemukan goroutine yang menunggu untuk menerima, data langsung dikirim ke penerima dan tidak disimpan dalam antrian melingkar. Jika tidak ada goroutine penerima yang tersedia dan kapasitas cukup, data akan dimasukkan ke dalam buffer antrian melingkar, kemudian langsung return.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

Jika buffer penuh, dalam kasus pengiriman non-blocking akan langsung return

go
if !block {
    unlock(&c.lock)
    return false
}

Jika pengiriman blocking, akan masuk ke alur kode berikut

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

Pertama-tama akan mengkonstruksi goroutine saat ini menjadi sudog dan menambahkannya ke antrian goroutine pengiriman yang menunggu hchan.sendq, kemudian runtime.gopark akan membuat goroutine saat ini blocking, berubah menjadi status _Gwaiting sampai dibangunkan kembali oleh penerima, dan melalui runtime.KeepAlive akan menjaga data yang akan dikirim untuk memastikan penerima berhasil menyalin. Setelah dibangunkan, akan masuk ke proses收尾 berikut

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

Yang dapat dilihat adalah untuk pengiriman data pipe, total ada beberapa kasus berikut

  1. Pipe adalah nil, program panic
  2. Pipe sudah ditutup, terjadi panic
  3. Antrian recvq tidak kosong, langsung kirim ke penerima
  4. Tidak ada goroutine yang menunggu, masukkan ke buffer
  5. Buffer penuh, goroutine pengiriman masuk ke status blocking, menunggu goroutine lain menerima data

Perlu dicatat bahwa dalam logika pengiriman di atas tidak terlihat penanganan data yang overflow dari buffer. Data ini tidak mungkin dibuang, disimpan di sudog.elem, dan akan ditangani oleh penerima.

Penerimaan

Di Go, ada dua sintaks untuk menerima data dari pipe. Yang pertama adalah hanya membaca data

go
data <- ch

Yang kedua adalah menentukan apakah pembacaan data berhasil

go
data, ok <- ch

Kedua sintaks di atas akan diterjemahkan oleh compiler menjadi pemanggilan runtime.chanrecv1 dan runtime.chanrecv2, namun keduanya sebenarnya adalah pemanggilan ke runtime.chanrecv. Logika penerimaan di bagian awal mirip dengan logika pengiriman, pertama-tama akan memeriksa apakah pipe adalah nil.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

Kemudian dalam kasus pembacaan non-blocking, tanpa mengunci memeriksa apakah pipe kosong. Jika pipe tidak ditutup langsung return, jika pipe sudah ditutup maka kosongkan memori elemen yang diterima.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

Kemudian mengunci untuk mengakses antrian hchan.sendq. Melalui cabang if c.closed != 0 di bawah ini, dapat dilihat bahwa meskipun pipe sudah ditutup, jika masih ada elemen di dalam pipe, tidak akan langsung return, tetap akan melanjutkan ke kode konsumsi elemen di bawah. Inilah alasan mengapa pipe masih memungkinkan pembacaan setelah ditutup.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

Jika pipe tidak ditutup, akan memeriksa apakah ada goroutine yang menunggu pengiriman di antrian sendq. Jika ya, runtime.recv akan menangani goroutine pengiriman tersebut.

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // salin data dari antrian ke penerima
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // salin data dari pengirim ke antrian
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

Kasus pertama, kapasitas pipe adalah 0 yaitu pipe tanpa buffer, penerima akan langsung menyalin data dari pengirim melalui fungsi runtime.recvDirect. Kasus kedua, buffer sudah penuh. Meskipun di depan tidak terlihat logika untuk memeriksa apakah buffer penuh, sebenarnya ketika kapasitas buffer tidak 0 dan ada pengirim yang menunggu pengiriman, ini sudah mewakili bahwa buffer sudah penuh. Karena hanya ketika buffer penuh pengirim akan blocking menunggu pengiriman. Logika ini ditentukan oleh pengirim. Kemudian penerima akan dequeue elemen头部 dari buffer dan menyalin memorinya ke pointer elemen penerima tujuan, kemudian menyalin data yang akan dikirim oleh goroutine pengirim dan enqueue (di sini kita melihat cara penerima menangani data yang overflow dari buffer). Terakhir, runtime.goready akan membangunkan goroutine pengirim, membuatnya berubah menjadi status _Grunnable, agar dapat kembali bergabung dengan penjadwalan.

Jika tidak ada goroutine yang menunggu pengiriman, akan memeriksa apakah ada elemen yang menunggu untuk dikonsumsi di buffer. Dequeue elemen头部 dan salin memorinya ke elemen tujuan penerima, kemudian return.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Terima langsung dari antrian
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

Pada akhirnya, jika tidak ada elemen yang dapat dikonsumsi di pipe, runtime.gopark akan membuat goroutine saat ini berubah menjadi status _Gwaiting, blocking menunggu sampai dibangunkan oleh goroutine pengirim.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

Setelah dibangunkan, akan return. Saat ini nilai success yang return berasal dari sudog.success. Jika pengirim berhasil mengirim data, field ini harus diatur ke true oleh pengirim. Logika ini dapat kita lihat di fungsi runtime.send.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

Sebaliknya, pada akhir runtime.chansend pengirim, pengecekan sudog.success, sumbernya juga dari pengaturan di fungsi runtime.recv oleh penerima. Melalui ini dapat ditemukan bahwa penerima dan pengirim saling melengkapi agar pipe dapat berfungsi dengan normal. Secara keseluruhan, menerima data sedikit lebih kompleks daripada mengirim data, total ada beberapa kasus berikut

  1. Pipe adalah nil, program panic
  2. Pipe sudah ditutup, jika pipe kosong langsung return, jika tidak kosong lanjut ke kasus ke-5
  3. Kapasitas buffer adalah 0, ada goroutine yang menunggu pengiriman di sendq, langsung salin data dari pengirim, kemudian bangunkan pengirim.
  4. Buffer penuh, ada goroutine yang menunggu pengiriman di sendq, dequeue elemen头部 dari buffer, enqueue data pengirim, kemudian bangunkan pengirim.
  5. Buffer tidak penuh dan jumlah tidak 0, dequeue elemen头部 dari buffer, kemudian return.
  6. Buffer kosong, masuk ke status blocking, menunggu untuk dibangunkan oleh pengirim.

Penutupan

Untuk menutup pipe, kita menggunakan fungsi built-in close

go
close(ch)

Compiler akan menerjemahkannya menjadi pemanggilan runtime.closechan. Jika pipe yang diteruskan adalah nil atau sudah ditutup, akan langsung panic

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

Kemudian dequeue semua goroutine yang blocking di antrian sendq dan recvq dari pipe ini, dan membangunkan semuanya melalui runtime.goready

go
func closechan(c *hchan) {
    ...
  var glist gList

    // lepaskan semua pembaca
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // lepaskan semua penulis (mereka akan panic)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Siapkan semua G sekarang setelah kita melepaskan kunci channel.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

Sebagai tambahan, Go mengizinkan pipe satu arah, dengan beberapa aturan berikut

  1. Pipe hanya-baca tidak dapat mengirim data
  2. Pipe hanya-baca tidak dapat ditutup
  3. Pipe hanya-tulis tidak dapat membaca data

Kesalahan ini akan ditemukan pada tahap pemeriksaan tipe di waktu kompilasi, tidak akan dibiarkan sampai waktu eksekusi. Jika tertarik, dapat membaca kode terkait di dua paket berikut

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

Menentukan Penutupan

Pada waktu yang sangat awal (sebelum Go 1), ada fungsi built-in closed untuk menentukan apakah pipe ditutup, namun kemudian segera dihapus. Ini karena skenario penggunaan pipe biasanya adalah kasus multi-goroutine. Asumsikan return true memang dapat mewakili pipe sudah ditutup, tetapi jika return false, tidak dapat mewakili bahwa pipe benar-benar belum ditutup. Karena tidak ada yang tahu siapa yang akan menutup pipe di下一刻,jadi nilai return ini tidak dapat dipercaya. Jika menggunakan nilai return ini sebagai dasar untuk menentukan apakah akan mengirim data ke pipe, lebih berbahaya lagi, karena mengirim data ke pipe yang sudah ditutup akan menyebabkan panic.

Jika memang diperlukan, dapat mengimplementasikannya sendiri. Salah satu solusi adalah menentukan apakah pipe ditutup dengan menulis ke pipe, kodenya adalah sebagai berikut

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

Namun ini juga memiliki efek samping. Jika tidak ditutup, akan menulis data redundan ke dalamnya, dan akan masuk ke proses penanganan defer-recover, menyebabkan kehilangan performa tambahan. Jadi solusi menulis dapat langsung diabaikan. Untuk solusi membaca, dapat dipertimbangkan, namun tidak dapat langsung membaca pipe, karena membaca langsung dengan nilai parameter block adalah true akan memblokir goroutine. Seharusnya dikombinasikan dengan select, ketika pipe dikombinasikan dengan select adalah non-blocking.

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

Ini hanya terlihat sedikit lebih baik daripada yang di atas. Kasusnya hanya berlaku untuk pipe yang sudah ditutup dan tidak ada elemen di buffer pipe. Jika ada elemen, akan mengonsumsi elemen ini dengan sia-sia, masih tidak ada implementasi yang baik.

Namun sebenarnya kita sama sekali tidak perlu menentukan apakah pipe ditutup. Alasan sudah dijelaskan di awal karena nilai return tidak dapat dipercaya. Menggunakan pipe dengan benar dan menutup dengan benar adalah yang seharusnya kita lakukan. Jadi

  1. Jangan pernah menutup pipe di sisi penerima. Menutup pipe hanya-baca tidak dapat dikompilasi, ini sudah dengan jelas memberitahu Anda untuk tidak melakukan ini. Serahkan ini ke pengirim untuk melakukannya.
  2. Jika ada banyak pengirim, biarkan satu goroutine saja yang menyelesaikan operasi penutupan, pastikan close dipanggil oleh satu sisi saja dan hanya dipanggil sekali.
  3. Saat meneruskan pipe, sebaiknya batasi hanya-baca atau hanya-tulis

Mengikuti beberapa prinsip ini, dapat memastikan tidak akan ada masalah besar.

Golang by www.golangdev.cn edit