Skip to content

select

select aynı anda birden fazla kanal durumunu dinleyebilen bir yapıdır. Söz dizimi switch'e benzer:

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Bu kod, context, kanal ve select'i birleştirerek basit bir zarif kapanış mantığı uygular. select aynı anda iki kanalı dinler: ctx.Done ve finished. İki çıkış koşulu vardır: birincisi, işletim sistemi bir çıkış sinyali gönderir; ikincisi, finished kanalında okunacak veri vardır, yani kullanıcı kodu görevi tamamlanmıştır. Bu, program çıktığında temizlik işi yapmamızı sağlar.

Bildiğimiz gibi, select'in iki çok önemli özelliği vardır: birincisi, engellemez. Kanal gönderme ve alma kaynak kodunda, select'in özel olarak ele alındığını görebilirsiniz, bir kanalın müsait olup olmadığını engellemeden kontrol etmenizi sağlar. İkincisi, rastgeleleştirme. Birden fazla kanal müsait olduğunda, birini rastgele seçip yürütür. Sabit bir sırayı takip etmemek, her kanalın nispeten adil şekilde yürütülmesini sağlar; aksi takdirde, aşırı durumlarda bazı kanallar hiç işlenmeyebilir. Tüm işi kanallarla ilgili olduğundan, önce chan makalesini okumanız önerilir. Kanalları öğrenmek select hakkında bilgi edinmeden önce işleri çok daha net hale getirecektir.

Yapı

Çalışma zamanında, bir select dalını temsil eden sadece bir runtime.scase struct'ı vardır. Çalışma zamanında her case, scase olarak temsil edilir:

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // veri elementi
}

Burada c, kanalı ifade eder ve elem, alma veya gönderme elementinin işaretçisidir. Aslında, select anahtar kelimesi runtime.selectgo fonksiyonunu ifade eder.

Prensipler

Go, select kullanımını dört duruma ayırır ve optimize eder. Bu, cmd/compile/internal/walk.walkSelectCases fonksiyonunda görülebilir, bu dört durumu işler:

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimizasyon: sıfır durumlu select
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimizasyon: tek durumlu select: tek işlem.
  if ncas == 1 {
    ...
  }

  // optimizasyon: iki durumlu select ama biri default: tek engellemez işlem.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Optimizasyon

Derleyici ilk üç durumu optimize eder. İlk durum, durum sayısının 0 olmasıdır, yani boş select. Hepimiz biliyoruz ki boş select ifadesi mevcut goroutine'i kalıcı olarak engeller:

go
select{}

Engellemesinin nedeni, derleyicinin bunu doğrudan runtime.block fonksiyonuna çağrı olarak çevirmesidir:

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // sonsuza kadar
}

block fonksiyonu runtime.gopark fonksiyonunu çağırır, mevcut goroutine'i _Gwaiting durumuna değiştirir ve kalıcı engellemeye girer, bir daha asla zamanlanmaz.

İkinci durum: sadece bir durum var ve default değil. Bu durumda, derleyici bunu doğrudan bir kanal gönderme/alma işlemine çevirir ve engelleyicidir. Örneğin, aşağıdaki kod:

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // bir şeyler yap
  }
}

Bu, doğrudan runtime.chanrecv1 fonksiyonuna çağrı olarak çevrilir, bu assembly kodundan görülebilir:

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

Sadece bir durum olması durumunda, kanala veri göndermek de aynı prensibi izler. Doğrudan runtime.chansend1 fonksiyonuna çağrı olarak çevrilir, bu da engelleyicidir.

Üçüncü durum: biri default olan iki durum:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // bir şeyler yap
  default:
        // bir şeyler yap
  }
}

Bu durum, runtime.selectnbsend fonksiyonunu çağırarak bir if ifadesine çevrilir:

go
if selectnbsend(ch, 1) {
  // bir şeyler yap
} else {
  // bir şeyler yap
}

Kanal verisi alıyorsa, runtime.selectnbrecv fonksiyonuna çağrı olarak çevrilir:

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // bir şeyler yap
  default:
      // bir şeyler yap
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // bir şeyler yap
} else {
  // bir şeyler yap
}

Bu durumda, kanal alma veya göndermenin engellemez olduğu dikkat çekicidir. block parametresinin false olduğunu açıkça görebiliriz:

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

Kanal verisi gönderme veya alma işleminde, block false olduğunda, kilitleme yapmadan gönderme veya almanın mümkün olup olmadığını kontrol edebilen hızlı bir yol vardır. Aşağıda gösterildiği gibi:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Kanalдан veri okurken, kanal boşsa doğrudan döner. Kanala veri yazarken, kanal kapalı değil ve zaten doluysa, doğrudan döner. Genel durumlarda, bunlar goroutine engellemesine neden olur, ancak select ile birleştirildiğinde, olmazlar.

İşleme

Yukarıdaki üç durum özel durumlar için optimizasyonlardır. Normalde kullanılan select anahtar kelimesi, runtime.selectgo fonksiyonuna çağrı olarak çevrilir, işleme mantığı 400 satırdan fazladır:

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

Derleyici tüm case ifadelerini bir scase dizisine toplar ve selectgo fonksiyonuna geçirir. İşlemeden sonra iki değer döndürür:

  1. İlki, rastgele seçilen kanal indeksidir, hangi kanalın işlendiğini gösterir. Hiçbiri yoksa -1 döner.
  2. İkincisi, kanal okuma işleminin başarılı olup olmadığını gösterir.

İşte parametrelerinin basit bir açıklaması:

  • cas0: scase dizisinin baş işaretçisi. İlk yarısı yazma kanalı durumlarını saklar, ikinci yarısı okuma kanalı durumlarını saklar, nsends ile ayrılır.
  • order0: uzunluğu scase dizisinin iki katıdır. İlk yarısı pollorder dizisine, ikinci yarısı lockorder dizisine ayrılır.
  • nsends ve nrecvs: okuma/yazma kanalı durumlarının sayısı. Toplamları toplam durum sayısıdır.
  • block: engellensin mi. Eğer bir default durumu varsa, engellez olmayanı temsil eder, değer false, aksi takdirde true.
  • pc0: bir [ncases]uintptr dizisinin başına işaret eder, yarış analizi için kullanılır. Daha sonra göz ardı edilebilir; select'i anlamaya yardımcı olmaz.

Aşağıdaki koda sahip olduğumuzu varsayalım:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Assembly formunu görüntüleyin. Netlik için, bazı kodlar atlanmıştır:

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // geçici değişkenler 1 2 3 4
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // runtime.selectgo fonksiyonunu çağır
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // default dalına atla
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // 4. dala atla
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // 3. dala atla
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // 2. dala atla
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

Gördüğünüz gibi, selectgo fonksiyonunu çağırdıktan sonra bir karar + atlama mantığı vardır. Bunlardan kolayca orijinal formunu çıkarabiliriz:

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

Derleyici tarafından oluşturulan gerçek kod bundan farklı olabilir, ancak genel fikir benzerdir. Yani selectgo fonksiyonunu çağırdıktan sonra, derleyici hangi kanalın yürütüleceğini belirlemek için if ifadeleri kullanır. Çağırmadan önce, derleyici ayrıca scase dizisini toplamak için bir for döngüsü oluşturur, ancak bu burada atlanmıştır.

selectgo fonksiyonunun dışarıda nasıl kullanıldığını anladıktan sonra, selectgo fonksiyonunun içeride nasıl çalıştığını anlayalım. İlk olarak birkaç dizi başlatır. nsends+nrecvs toplam durum sayısını temsil eder. Aşağıdaki koddan, maksimum durum sayısının 1 << 16 olduğunu görebilirsiniz. pollorder kanal yürütme sırasını belirler, lockorder kanal kilitleme sırasını belirler:

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// Uzunluğu scase dizisinin iki katıdır, ilk yarısı pollorder dizisine, ikinci yarısı lockorder dizisine ayrılır.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Sonra, pollorder dizisini başlatır, bu yürütülecek kanalların scases dizi indekslerini saklar:

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Kanalı olmayan durumları poll ve lock sıralarından çıkar.
    if cas.c == nil {
       cas.elem = nil // GC'ye izin ver
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Tüm scases dizisini dolaşır, sonra runtime.fastrandn kullanarak [0, i] arasında rastgele bir sayı oluşturur, sonra i ile değiştirir. Bu süreçte, kanalı nil olan durumları atlar. Dolaşım tamamlandığında, karıştırılmış elemanlara sahip bir pollorder dizisi elde ederiz, aşağıda gösterildiği gibi:

Sonra pollorder dizisi, lockorder dizisini elde etmek için kanal adresine göre yığın sıralaması kullanılarak sıralanır. Sonra onları sırayla kilitlemek için runtime.sellock çağrılır:

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Kanalın adresine göre sıralanmasının ölümcül kilitlenmeyi önlemek için olduğu dikkat çekicidir, çünkü select işleminin kendisi kilit gerektirmez ve eşzamanlılığa izin verir. pollorder rastgele sırasına göre kilitleme yaptığını varsayalım, sonra aşağıdaki kodu düşünün:

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Üç goroutine ABC bu kilitleme adımına ulaşır ve kilitleme sıraları rastgele ve birbirinden farklıdır. Bu aşağıdaki duruma neden olabilir, aşağıda gösterildiği gibi:

ABC kilitleme sırasının yukarıdaki şekille aynı olduğunu varsayalım. Ölümcül kilitlenme olasılığı çok yüksektir. Örneğin, A önce ch2 kilidini tutar, sonra ch1 kilidini almaya çalışır. Ancak ch1 kilidinin zaten goroutine B tarafından kilitlendiğini varsayalım, goroutine B sonra ch2 kilidini almaya çalışır. Bu ölümcül kilitlenmeye neden olur.

Tüm goroutineler aynı sırayla kilitlerse, ölümcül kilitlenme oluşmaz. lockorder'ın adrese göre sıralanmasının temel nedeni budur.

Kilitlemeden sonra, gerçek işleme aşaması başlar. İlk olarak, pollorder dizisini dolaşır, daha önce karıştırılan sırayla kanallara erişir, kullanılabilir bir kanal bulur:

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // kanal oku
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // kanal yaz
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

Gördüğünüz gibi, okuma/yazma kanalları için işlenen 6 durum vardır. Ayrı ayrı açıklayalım.

İlk durum: kanal oku ve gönderici göndermek için bekliyor. Bu runtime.recv fonksiyonuna gider, rolü açıklanmıştır. Sonunda gönderici goroutine'i uyandırır. Uyandırmadan önce, geri çağırma fonksiyonu tüm kanalların kilidini açar:

go
recv:
  // uyuyan göndericiden (sg) alınabilir
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

İkinci durum: kanal oku, gönderici beklemiyor, tampon eleman sayısı 0'dan büyük. Bu doğrudan tampondan veri okur. Mantığı tamamen runtime.chanrecv ile tutarlıdır, sonra kilidi açar:

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Üçüncü durum: kanal oku, ancak kanal zaten kapalı ve tamponda kalan eleman yok. Bu önce kilidi açar sonra doğrudan döner:

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Dördüncü durum: kapalı kanala veri gönder. Bu önce kilidi açar sonra panic yapar:

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Beşinci durum: alıcı engelleme bekliyor. Bu runtime.send fonksiyonunu çağırır ve sonunda alıcı goroutine'i uyandırır. Uyandırmadan önce, geri çağırma fonksiyonu tüm kanalların kilidini açar:

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Altıncı durum: bekleyen alıcı goroutine yok, gönderilecek veriyi tampona koy, sonra kilidi aç:

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Sonra yukarıdaki tüm durumlar sonunda retc dalına gider, bu sadece seçilen kanal indeksini casi ve okumanın başarılı olup olmadığını temsil eden recvOk döndürür:

go
retc:
    return casi, recvOK

Yedinci durum: kullanılabilir kanal bulunamadı ve kod default dalı içeriyor. Kanalların kilidini aç sonra doğrudan dön. Burada dönen casi -1'dir, kullanılabilir kanal olmadığını gösterir:

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Son durum: kullanılabilir kanal bulunamadı ve kod default dalı içermiyor. Sonra mevcut goroutine engellenmiş duruma girer. Bundan önce, selectgo mevcut goroutine'i dinlediği tüm kanalların recvq/sendq kuyruklarına ekler:

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Bu birkaç sudog oluşturur ve bunları ilgili kanallarla bağlar, aşağıda gösterildiği gibi:

Sonra runtime.gopark ile engelle. Engellemeden önce, kanalların kilidini açar. Kilidi açma işi runtime.selparkcommit fonksiyonu tarafından tamamlanır, bu gopark'a geri çağırma olarak geçirilir:

go
gp.param = nil
// Stack'i küçültmeye çalışan herkese bir kanal üzerinde park edeceğimizi bildir.
// Bu G'nin durumu değiştiğinde ve gp.activeStackChans ayarlandığında arasındaki pencere
// stack küçültme için güvenli değildir.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

Uyandıktan sonra ilk şey, sudog'u kanallardan ayırmaktır:

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Ayırmadan önce tüm elem'leri temizle.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Sonra sudog'u önceki kanal bekleme kuyruklarından kaldırır:

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg, bizi uyandıran G tarafından zaten kuyruktan çıkarılmış.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

Yukarıdaki süreçte, uyandırıcı goroutine tarafından işlenen kanal kesinlikle bulunur. Sonra caseSuccess'e göre son işleme yapılır. Yazma işlemleri için, sg.success false olması kanalın kapalı olduğu anlamına gelir. Dahası, Go runtime boyunca, sadece close fonksiyonu bu alanı aktif olarak false olarak ayarlar. Bu, mevcut goroutine'in uyandırıcı tarafından close fonksiyonu aracılığıyla uyandırıldığını gösterir. Okuma işlemleri için, eğer gönderici tarafından uyandırılırsa, veri okuma işlemi zaten uyandırmadan önce gönderici tarafından runtime.send fonksiyonu aracılığıyla tamamlanmıştır, değeri true'dur. Eğer close fonksiyonu tarafından uyandırılırsa, öncekiyle aynıdır, doğrudan döner:

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

Bu noktada, tüm select mantığı kabaca açıklığa kavuşturulmuştur. Yukarıda birkaç durum ayrılmıştır, select işlemenin hala oldukça karmaşık olduğunu göstermektedir.

Golang by www.golangdev.cn edit