Skip to content

chan

channel özel bir veri yapısıdır ve Go'nun CSP felsefesinin tipik bir temsilcisidir. CSP felsefesinin özü, işlemlerin mesaj iletişimi yoluyla veri alışverişi yapmasıdır. Buna bağlı olarak, channel aracılığıyla goroutine'ler arasında kolayca iletişim kurabiliriz.

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // bir şeyler yap
    done <- struct{}{}
  }()
  <-done
  fmt.Println("bitti")
}

İletişimin yanı sıra, channel aracılığıyla goroutine senkronizasyon işlemleri de gerçekleştirebiliriz. Eşzamanlılık gerektiren sistemlerde channel neredeyse her yerde bulunur. channel'ın nasıl çalıştığını daha iyi anlamak için aşağıda prensiplerini tanıtacağız.

Yapı

Çalışma zamanında, channel runtime.hchan struct'ı ile temsil edilir ve aşağıdaki alanları içerir:

go
type hchan struct {
  qcount   uint           // kuyruktaki toplam veri
  dataqsiz uint           // dairesel kuyruğun boyutu
  buf      unsafe.Pointer // dataqsiz elemanlı bir diziye işaret eder
  elemsize uint16
  closed   uint32
  elemtype *_type // eleman tipi
  sendx    uint   // gönderme indeksi
  recvx    uint   // alma indeksi
  recvq    waitq  // alıcı bekleyenlerin listesi
  sendq    waitq  // gönderici bekleyenlerin listesi

  lock mutex
}

lock alanından görebileceğiniz gibi, channel aslında kilitli senkronize bir dairesel kuyruktur. Diğer alanlar aşağıda açıklanmıştır:

  • qcount: toplam veri elemanı sayısı

  • dataqsiz: dairesel kuyruğun boyutu

  • buf: dataqsiz boyutunda bir diziye işaret eden pointer, bu dairesel kuyruktur

  • closed: channel'ın kapalı olup olmadığı

  • sendx, recvx: gönderme ve alma indisleri

  • sendq, recvq: runtime.sudog'dan oluşan gönderme ve alma goroutine bağlantılı listeleri

    go
    type waitq struct {
      first *sudog
      last  *sudog
    }

channel yapısı aşağıdaki şekilden açıkça anlaşılabilir:

Bir channel üzerinde len ve cap fonksiyonlarını kullanırken, aslında hchan.qcount ve hchan.dataqsiz alanlarını döndürür.

Oluşturma

Normalde, channel oluşturmanın sadece bir yolu vardır, make fonksiyonunu kullanmak:

go
ch := make(chan int, size)

Derleyici bunu runtime.makechan çağrısına çevirir ve bu fonksiyon channel'ın gerçek oluşturulmasından sorumludur. Kodu şu şekildedir:

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

Bu mantık oldukça basittir, öncelikle channel için bellek ayırır. Önce geçirilen size ve eleman tipine elem.Size_ göre beklenen bellek boyutunu hesaplar, ardından üç durumu ele alır:

  1. size 0'dır, sadece hchanSize ayır
  2. Eleman pointer içermez, ilgili bellek boyutunu ayır ve dairesel kuyruk belleği channel belleği ile bitişiktir
  3. Eleman pointer içerir, channel ve dairesel kuyruk belleği ayrı olarak ayrılır

Eğer dairesel kuyruk pointer elemanları içeriyorsa, bunlar harici elemanlara referans verdiği için, GC işaretleme-tarama aşamasında bu pointer'ları tarayabilir. Bitişik bellekte pointer olmayan elemanları depolarken, gereksiz taramaları önler. Bellek tahsisinden sonra, diğer bilgi alanlarını günceller.

Bu arada, channel kapasitesi 64 bit tamsayı olduğunda, runtime.makechan64 fonksiyonu oluşturma için kullanılır. Bu temelde runtime.makechan çağrısıdır, sadece ek bir tip kontrolü vardır:

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

Genel olarak, size math.MaxInt32 değerini aşmamalıdır.

Gönderme

Bir channel'a veri gönderirken, gönderilecek veriyi okun sağ tarafına yerleştiririz:

go
ch <- struct{}{}

Derleyici bunu runtime.chansend1 olarak çevirir ve veriyi göndermekten gerçekten sorumlu fonksiyon runtime.chansend'dir. chansend1 gönderilen elemana işaret eden elem pointer'ını geçirir:

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

Önce channel'ın nil olup olmadığını kontrol eder. block mevcut gönderme işleminin bloklayıcı olup olmadığını gösterir (block değeri select yapısı ile ilgilidir). Bloklayıcı gönderme ve channel nil ise, doğrudan panic oluşur. Bloklamayan gönderme durumunda, kilitlemeden channel'ın dolu olup olmadığını kontrol eder ve doluysa hemen döner:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Ardından kilidi alır ve channel'ın kapalı olup olmadığını kontrol eder. Eğer zaten kapalıysa, panic oluşur:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

Bundan sonra, recvq kuyruğundan bir sudog kuyruktan çıkarır, ardından runtime.send fonksiyonu aracılığıyla gönderir:

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

send fonksiyonunun içeriği şu şekildedir. recvx ve sendx değerlerini günceller, ardından runtime.memmove fonksiyonunu kullanarak iletişim verisi belleğini doğrudan alıcı goroutine'in hedef eleman adresine kopyalar, sonra runtime.goready fonksiyonunu kullanarak alıcı goroutine'in _Grunnable durumuna geçmesini ve yeniden zamanlamaya katılmasını sağlar:

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

Bu süreçte, bekleyen bir alıcı goroutine bulunabildiği için, veri dairesel kuyrukta depolanmadan doğrudan alıcıya gönderilir. Eğer uygun kapasitede alıcı yoksa, dairesel kuyruk tamponuna yerleştirilir ve doğrudan döner:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

Eğer tampon doluysa, bloklamayan gönderme durumunda doğrudan döner:

go
if !block {
    unlock(&c.lock)
    return false
}

Eğer bloklayıcı gönderme ise, aşağıdaki kod akışına girer:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

Önce mevcut goroutine'i sudog olarak oluşturur ve hchan.sendq bekleyen gönderici goroutine kuyruğuna ekler, ardından runtime.gopark kullanarak mevcut goroutine'i bloklar ve _Gwaiting durumuna geçirir, böylece alıcı tarafından uyandırılana kadar bekler. Ayrıca runtime.KeepAlive kullanarak gönderilen veriyi canlı tutar ve alıcının başarıyla kopyalamasını sağlar. Uyandırıldığında, aşağıdaki temizleme akışına girer:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

Görüldüğü gibi, channel gönderme işlemleri için aşağıdaki durumlar vardır:

  1. Channel nil'dir, program panic oluşur
  2. Channel kapalıdır, panic oluşur
  3. recvq kuyruğu boş değildir, doğrudan alıcıya gönder
  4. Goroutine beklemiyorsa, tampona ekle
  5. Tampon doludur, gönderici goroutine engellenmiş duruma girer, diğer goroutine'lerin veri almasını bekler

Dikkat çekici olan, yukarıdaki gönderme mantığında taşma tampon verileri için işleme görmedik. Bu veriler atılamaz; sudog.elem'de saklanır ve alıcı tarafından işlenir.

Alma

Go'da, channel'dan veri almanın iki sözdizimi vardır. Birincisi sadece veri okuma:

go
data <- ch

İkincisi verinin başarıyla okunup okunmadığını kontrol etme:

go
data, ok <- ch

Yukarıdaki iki sözdizim derleyici tarafından runtime.chanrecv1 ve runtime.chanrecv2 çağrılarına çevrilir, ancak aslında runtime.chanrecv çağrısıdır. Alma mantığının başı gönderme mantığına benzer, her ikisi de önce channel'ın nil olup olmadığını kontrol eder:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

Ardından bloklamayan okuma durumunda, kilitlemeden channel'ın boş olup olmadığını kontrol eder. Channel kapalı değilse, doğrudan döner. Channel kapalıysa, alma elemanı belleğini temizler:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

Ardından kilidi alır ve hchan.sendq kuyruğuna erişir. Aşağıdaki if c.closed != 0 dalından görebileceğiniz gibi, channel kapalı olsa bile, channel'da hala elemanlar varsa, doğrudan dönmez ancak eleman tüketim kodunu çalıştırmaya devam eder. Bu nedenle channel kapandıktan sonra okumanın hala izin verilmesinin nedeni budur:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

Eğer channel kapalı değilse, sendq kuyruğunda göndermek için bekleyen goroutine'ler olup olmadığını kontrol eder. Varsa, runtime.recv gönderici goroutine'i işler:

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // kuyruktan alıcıya veri kopyala
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // göndericiden kuyruğa veri kopyala
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

İlk durum: channel kapasitesi 0'dır (tamponsuz channel), alıcı runtime.recvDirect fonksiyonu aracılığıyla göndericiden veriyi doğrudan kopyalar. İkinci durum: tampon doludur. Tamponun dolu olup olmadığını kontrol etmek için mantık görmesek de, tampon kapasitesi 0 olmadığında ve göndermek için bekleyen bir gönderici olduğunda, tamponun zaten dolu olduğu anlamına gelir, çünkü sadece tampon dolu olduğunda gönderici göndermek için bloklayıcı bekler. Bu mantık gönderici tarafından değerlendirilir. Ardından alıcı tampondan baş elemanı kuyruktan çıkarır ve belleğini hedef alma elemanı pointer'ına kopyalar, ardından gönderici goroutine'in gönderilecek verisini kopyalar ve kuyruğa ekler (burada alıcının taşma tampon verilerini nasıl işlediğini görürüz). Son olarak, runtime.goready gönderici goroutine'i uyandırır ve _Grunnable durumuna geçirerek yeniden zamanlamaya katılmasını sağlar.

Eğer göndermek için bekleyen goroutine yoksa, tamponda tüketilmeyi bekleyen elemanlar olup olmadığını kontrol eder, baş elemanı kuyruktan çıkarır ve belleğini alıcı hedef elemanına kopyalar, ardından döner:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Kuyruktan doğrudan al
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

Son olarak, channel'da tüketilebilir eleman yoksa, runtime.gopark mevcut goroutine'i _Gwaiting durumuna geçirir, bloklar ve gönderici goroutine tarafından uyandırılana kadar bekler:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

Uyandırıldıktan sonra döner. Bu noktada, döndürülen success değeri sudog.success'tan gelir. Eğer gönderici veriyi başarıyla gönderdiyse, bu alan gönderici tarafından true olarak ayarlanmalıdır, bunu runtime.send fonksiyonunda görebiliriz:

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

Buna bağlı olarak, gönderici runtime.chansend sonunda, sudog.success değerlendirmesi alıcının runtime.recv fonksiyonundaki ayarından gelir. Bunlar aracılığıyla, alıcı ve göndericinin channel'ın düzgün çalışması için birbirini tamamladığını keşfedebiliriz. Genel olarak, veri alma göndermeden biraz daha karmaşıktır, aşağıdaki durumlar vardır:

  1. Channel nil'dir, program panic oluşur
  2. Channel kapalıdır, channel boşsa doğrudan döner, boş değilse durum 5'e atlar
  3. Tampon kapasitesi 0'dır, sendq'da bekleyen gönderici goroutine var, doğrudan göndericinin verisini kopyalar, ardından göndericiyi uyandırır
  4. Tampon doludur, sendq'da bekleyen gönderici goroutine var, tamponun baş elemanını kuyruktan çıkarır, göndericinin verisini kuyruğa ekler, ardından göndericiyi uyandırır
  5. Tampon dolu değil ve sayı 0 değil, tamponun baş elemanını kuyruktan çıkarır, ardından döner
  6. Tampon boştur, engellenmiş duruma girer, gönderici tarafından uyandırılmayı bekler

Kapatma

Bir channel'ı kapatmak için, yerleşik close fonksiyonunu kullanırız:

go
close(ch)

Derleyici bunu runtime.closechan çağrısına çevirir. Eğer geçirilen channel nil ise veya zaten kapalıysa, doğrudan panic oluşur:

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

Ardından bu channel'ın sendq ve recvq'larından tüm engellenmiş goroutine'leri kuyruktan çıkarır ve runtime.goready aracılığıyla hepsini uyandırır:

go
func closechan(c *hchan) {
    ...
  var glist gList

    // tüm alıcıları serbest bırak
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // tüm göndericileri serbest bırak (panic oluşacak)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Channel kilidini bıraktığımız için şimdi tüm G'leri hazır hale getir
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

Bu arada, Go tek yönlü channel'lara izin verir, aşağıdaki kurallarla:

  1. Salt okunur channel veri gönderemez
  2. Salt okunur channel kapatılamaz
  3. Salt yazılır channel veri okuyamaz

Bu hatalar derleme zamanı tip denetimi sırasında bulunur, çalışma zamanına bırakılmaz. İlgileniyorsanız, bu iki paketteki ilgili kodları okuyabilirsiniz:

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

Kapalı Olup Olmadığını Kontrol Etme

Erken dönemlerde (go1 öncesinde), bir channel'ın kapalı olup olmadığını kontrol etmek için closed adlı yerleşik bir fonksiyon vardı, ancak hızla kaldırıldı. Bunun nedeni channel'ların tipik olarak çoklu goroutine senaryolarında kullanılmasıdır. Eğer true döndürürse, bu gerçekten channel'ın kapalı olduğu anlamına gelir. Ancak false döndürürse, channel'ın gerçekten kapalı olmadığı anlamına gelmez, çünkü bir sonraki anda channel'ı kimin kapatacağını kimse bilemez. Yani bu dönüş değeri güvenilir değildir. Bu dönüş değerini channel'a veri göndermek için temel olarak kullanmak daha da tehlikelidir, çünkü kapalı bir channel'a veri göndermek panic oluşmasına neden olur.

Eğer gerçekten gerekirse, kendi implementasyonunuzu yapabilirsiniz. Bir yaklaşım channel'a yazarak kontrol etmektir, aşağıda gösterildiği gibi:

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

Ancak bunun da yan etkileri vardır. Eğer kapalı değilse, gereksiz veri yazar ve defer-recover işleme sürecine girer, ek performans kaybına neden olur. Yani yazma yaklaşımı doğrudan terk edilebilir. Okuma yaklaşımı için, düşünülebilir, ancak channel'ı doğrudan okuyamaz, çünkü doğrudan block parametre değeri true ile okumak goroutine'i bloklar. select ile kullanılmalıdır. Channel select ile birleştirildiğinde, bloklamaz:

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

Bu sadece yukarıdakinden biraz daha iyi görünür. Sadece channel kapalı olduğunda ve channel tamponunda hiç eleman olmadığında uygulanır. Eğer elemanlar varsa, gereksiz yere o elemanı tüketir. Hala iyi bir implementasyon yok.

Ancak aslında, channel'ın kapalı olup olmadığını kontrol etmemize hiç gerek yok. Nedeni zaten başta açıklandı: dönüş değeri güvenilir değildir. Channel'ı doğru kullanmak ve channel'ı doğru şekilde kapatmak yapmamız gereken şeydir. Yani:

  1. Channel'ı asla alıcı tarafında kapatmayın. Salt okunur channel'ı kapatmanın derlenmemesi zaten bunu yapmamanız gerektiğini söylüyor. Bunu göndericinin yapmasına izin verin.
  2. Birden fazla gönderici varsa, ayrı bir goroutine kapatma işlemini tamamlasın, close'un sadece bir tarafça çağrıldığından ve sadece bir kez çağrıldığından emin olun.
  3. Channel'ı geçirirken, en iyisi salt okunur veya salt yazılır olarak kısıtlamak

Bu prensipleri takip etmek büyük sorunların olmamasını sağlar.

Golang by www.golangdev.cn edit