Skip to content

select

select adalah struktur yang dapat memantau status beberapa saluran secara bersamaan, sintaksnya mirip dengan switch

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Kode ini menggunakan kombinasi context, saluran, dan select untuk mengimplementasikan logika penghentian program yang mulus. Select dalam kode ini secara bersamaan memantau dua saluran ctx.Done dan finished, ada dua kondisi penghentian, pertama adalah sistem operasi mengirim sinyal keluar, kedua adalah saluran finished memiliki pesan yang dapat dibaca yaitu tugas kode pengguna selesai, sehingga kita dapat melakukan pekerjaan penyelesaian saat program keluar.

Seperti yang kita ketahui, select memiliki dua fitur yang sangat penting, pertama adalah non-blocking, dalam kode sumber pengiriman dan penerimaan saluran dapat dilihat bahwa select telah ditangani, dapat menentukan apakah saluran tersedia dalam kondisi non-blocking, kedua adalah randomisasi, jika ada beberapa saluran yang tersedia, ia akan memilih satu secara acak untuk dieksekusi, tidak mengikuti urutan yang telah ditentukan dapat membuat setiap saluran dieksekusi secara relatif adil, jika tidak dalam kondisi ekstrem beberapa saluran mungkin tidak akan pernah diproses. Karena semua pekerjaannya terkait dengan saluran, disarankan untuk membaca artikel chan terlebih dahulu, setelah memahami saluran kemudian memahami select akan lebih lancar.

Struktur

Runtime hanya memiliki satu struktur runtime.scase yang merepresentasikan cabang select, setiap case dalam runtime direpresentasikan sebagai scase.

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

Di antaranya c mengacu pada saluran, elem menunjukkan pointer elemen yang diterima atau dikirim, sebenarnya kata kunci select mengacu pada fungsi runtime.selectgo.

Prinsip

Cara penggunaan select dibagi oleh Go menjadi empat situasi untuk dioptimalkan, hal ini dapat dilihat di fungsi cmd/compile/internal/walk.walkSelectCases untuk logika penanganan keempat situasi ini.

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimasi: select tanpa case
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimasi: select satu case: operasi tunggal.
  if ncas == 1 {
    ...
  }

  // optimasi: select dua case tetapi satu adalah default: operasi non-blocking tunggal.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Optimasi

Compiler akan mengoptimalkan tiga situasi pertama, situasi pertama adalah ketika jumlah case adalah 0 yaitu select kosong, kita semua tahu bahwa select kosong akan menyebabkan goroutine saat ini tersumbat secara permanen.

go
select{}

Alasan tersumbat adalah karena compiler menerjemahkannya menjadi pemanggilan langsung ke fungsi runtime.block

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

Dan fungsi block memanggil fungsi runtime.gopark, membuat goroutine saat ini berubah menjadi status _Gwaiting, dan masuk ke blocking permanen, tidak akan pernah mendapatkan penjadwalan lagi.

Situasi kedua, hanya ada satu case dan bukan default, dalam situasi ini compiler akan langsung menerjemahkannya menjadi operasi pengiriman/penerimaan saluran, dan masih blocking, misalnya kode berikut

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // lakukan sesuatu
  }
}

Ia akan diterjemahkan menjadi pemanggilan langsung ke fungsi runtime.chanrecv1, dapat dilihat dari kode assembly

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

Dalam kasus satu case, mengirim data ke saluran juga sama, ia akan diterjemahkan menjadi pemanggilan langsung ke fungsi runtime.chansend1, juga blocking.

Situasi ketiga, ada dua case dan salah satunya adalah default

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // lakukan sesuatu
  default:
        // lakukan sesuatu
  }
}

Situasi ini akan diterjemahkan menjadi pernyataan if yang memanggil runtime.selectnbsend, sebagai berikut

go
if selectnbsend(ch, 1) {
  // lakukan sesuatu
} else {
  // lakukan sesuatu
}

Jika menerima data saluran akan diterjemahkan menjadi pemanggilan runtime.selectnbrecv

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // lakukan sesuatu
  default:
      // lakukan sesuatu
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // lakukan sesuatu
} else {
  // lakukan sesuatu
}

Yang perlu diperhatikan adalah, dalam situasi ini penerimaan atau pengiriman saluran adalah non-blocking, kita dapat dengan jelas melihat bahwa parameter block adalah false.

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

Dan baik untuk mengirim atau menerima data saluran, ketika block adalah false ada jalur cepat yang dapat menentukan apakah dapat mengirim atau menerima data tanpa mengunci, seperti yang ditunjukkan berikut

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Saat membaca saluran, jika saluran kosong akan langsung return, saat menulis saluran, jika saluran tidak ditutup dan sudah penuh juga akan langsung return, dalam kondisi umum ini akan menyebabkan goroutine blocking, tetapi ketika dikombinasikan dengan select tidak akan.

Penanganan

Tiga situasi di atas hanyalah optimasi untuk kasus khusus, kata kunci select yang digunakan secara normal akan diterjemahkan menjadi pemanggilan fungsi runtime.selectgo, logika penanganannya mencapai lebih dari 400 baris.

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

Compiler akan mengumpulkan semua pernyataan case menjadi array scase, kemudian meneruskannya ke fungsi selectgo, setelah penanganan selesai mengembalikan dua nilai return

  1. Yang pertama adalah indeks saluran yang dipilih secara acak, menunjukkan saluran mana yang diproses, jika tidak ada return -1
  2. Yang kedua menunjukkan apakah operasi baca saluran berhasil membaca

Berikut penjelasan singkat untuk parameternya

  • cas0, pointer kepala array scase, bagian pertama存放的是写管道 case,后半部分存放读管道 case,以nsends来区分
  • order0, panjangnya dua kali array scase, bagian pertama dialokasikan untuk array pollorder, bagian kedua dialokasikan untuk array lockorder
  • nsends dan nrecvs menunjukkan jumlah case baca/tulis saluran, jumlah keduanya adalah total case
  • block menunjukkan apakah blocking, jika ada case default mewakili non-blocking, nilainya false, jika tidak true.
  • pc0, menunjuk ke kepala array [ncases]uintptr, digunakan untuk analisis race, nanti dapat diabaikan, tidak membantu untuk memahami select

Misalkan ada kode berikut

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Lihat bentuk assembly-nya, di sini untuk memudahkan pemahaman menghilangkan sebagian kode

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // 1 2 3 4 beberapa variabel sementara
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // panggil fungsi runtime.selectgo
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // lompat ke cabang default
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // lompat ke cabang 4
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // lompat ke cabang 3
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // lompat ke cabang 2
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

Dapat dilihat bahwa setelah memanggil fungsi selectgo ada logika判断+lonjakan, melalui ini kita tidak sulit untuk menyimpulkan bentuk aslinya

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

Kode yang dihasilkan compiler mungkin berbeda dengan ini, tetapi maksudnya kurang lebih sama. Jadi compiler setelah memanggil fungsi selectgo akan menggunakan pernyataan if untuk menentukan saluran mana yang akan dieksekusi, dan sebelum memanggil, compiler juga akan menghasilkan loop for untuk mengumpulkan array scase tetapi di sini dihilangkan.

Setelah mengetahui bagaimana eksternal menggunakan fungsi selectgo, di bawah ini mari pahami bagaimana fungsi selectgo bekerja di internal. Pertama akan menginisialisasi beberapa array, nsends+nrecvs menunjukkan total case, dari kode di bawah juga dapat dilihat bahwa nilai maksimum jumlah case adalah 1 << 16, pollorder menentukan urutan eksekusi saluran, lockorder menentukan urutan penguncian saluran.

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// panjangnya dua kali array scase, bagian pertama dialokasikan untuk array pollorder, bagian kedua dialokasikan untuk array lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Selanjutnya inisialisasi array pollorder, ia存放的是待执行管道的sacses数组下标

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Ia akan iterasi seluruh array scases, kemudian melalui runtime.fastrandn menghasilkan angka acak antara [0, i], kemudian menukarnya dengan i, selama proses akan melewatkan case yang salurannya nil, setelah iterasi selesai mendapatkan array pollorder yang elemennya diacak, seperti gambar berikut

Kemudian terhadap array pollorder berdasarkan ukuran alamat saluran menggunakan heap sort mendapatkan array lockorder, kemudian panggil runtime.sellock untuk menguncinya sesuai urutan

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Yang perlu diperhatikan di sini adalah, mengunci saluran berdasarkan ukuran alamat adalah untuk menghindari deadlock, karena operasi select itu sendiri tidak mengizinkan konkurensi. Asumsikan mengunci sesuai urutan acak pollorder, maka pertimbangkan situasi kode berikut

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Tiga goroutine ABC semua sampai ke langkah penguncian ini, dan urutan penguncian mereka彼此 acak tidak sama, mungkin menyebabkan situasi seperti ini, seperti gambar berikut

Asumsikan urutan penguncian ABC sama dengan gambar di atas, maka kemungkinan menyebabkan deadlock sangat besar, misalnya A akan pertama memegang kunci ch2, kemudian mencoba mendapatkan kunci ch1, tetapi asumsikan ch1 sudah dikunci oleh goroutine B, goroutine B akan mencoba mendapatkan kunci ch2, maka ini menyebabkan deadlock.

Jika semua goroutine mengunci sesuai urutan yang sama, tidak akan terjadi masalah deadlock, ini juga alasan mendasar mengapa lockorder harus diurutkan berdasarkan ukuran alamat.

Setelah mengunci, dimulai tahap penanganan yang sebenarnya, pertama iterasi array pollorder, sesuai urutan yang diacak sebelumnya mengakses saluran, iterasi satu per satu menemukan saluran yang tersedia

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // baca saluran
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // tulis saluran
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

Dapat dilihat di sini terhadap baca/tulis saluran dilakukan penanganan 6 situasi, di bawah masing-masing dijelaskan. Situasi pertama, membaca saluran dan ada pengirim sedang menunggu mengirim, di sini akan masuk ke fungsi runtime.recv, fungsinya sudah dijelaskan, pada akhirnya akan membangunkan goroutine pengirim, sebelum membangunkan fungsi callback akan membuka semua kunci saluran.

go
recv:
  // dapat menerima dari pengirim yang tidur (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

Situasi kedua, membaca saluran, tidak ada pengirim sedang menunggu, jumlah elemen buffer lebih besar dari 0, di sini akan langsung membaca data dari buffer, logikanya sama persis dengan runtime.chanrecv, kemudian membuka kunci.

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Situasi ketiga, membaca saluran, tetapi saluran sudah ditutup, dan tidak ada elemen tersisa di buffer, di sini akan pertama membuka kunci kemudian langsung return.

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Situasi keempat, mengirim data ke saluran yang sudah ditutup, di sini akan pertama membuka kunci kemudian panic,

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Situasi kelima, ada penerima sedang blocking menunggu, di sini akan memanggil fungsi runtime.send, dan pada akhirnya membangunkan goroutine penerima, sebelum membangunkan fungsi callback akan membuka semua kunci saluran.

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Situasi keenam, tidak ada goroutine penerima menunggu, masukkan data yang akan dikirim ke buffer, kemudian buka kunci.

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Kemudian semua situasi di atas akhirnya akan masuk ke cabang retc, dan yang harus dilakukannya hanya mengembalikan indeks saluran yang dipilih casi dan recvOk yang menunjukkan apakah baca berhasil.

go
retc:
    return casi, recvOK

Situasi ketujuh, tidak menemukan saluran yang tersedia, dan kode mengandung cabang default, maka buka kunci saluran kemudian langsung return, di sini casi yang return adalah -1 yaitu menunjukkan tidak ada saluran yang tersedia.

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Situasi terakhir, tidak menemukan saluran yang tersedia, dan kode tidak mengandung cabang default, maka goroutine saat ini akan masuk ke status blocking, sebelum ini selectgo akan menambahkan goroutine saat ini ke antrian recvq/sendq semua saluran yang dipantau

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Di sini akan membuat beberapa sudog dan menghubungkannya dengan saluran yang sesuai, seperti gambar berikut

Kemudian runtime.gopark blocking, sebelum blocking akan membuka kunci saluran, pekerjaan membuka kunci diselesaikan oleh fungsi runtime.selparkcommit, ia diteruskan sebagai fungsi callback ke gopark.

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

Hal pertama setelah dibangunkan adalah melepaskan tautan sudog dengan saluran

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Kemudian hapus sudog dari antrian tunggu saluran sebelumnya

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg sudah di-dequeue oleh G yang membangunkan kita.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

Dalam proses di atas pasti akan menemukan saluran yang diproses oleh goroutine yang membangunkan, kemudian做出 penanganan terakhir berdasarkan caseSuccess. Untuk operasi tulis而言,sg.success adalah false mewakili saluran sudah ditutup, dan seluruh runtime go hanya fungsi close yang akan secara aktif mengatur field ini menjadi false, ini menunjukkan bahwa goroutine saat ini dibangunkan oleh peng唤醒方 melalui fungsi close. Untuk operasi baca而言,jika dibangunkan oleh pengirim, operasi baca data juga sudah diselesaikan oleh pengirim melalui fungsi runtime.send sebelum dibangunkan, nilainya true, jika dibangunkan oleh fungsi close, sama seperti sebelumnya langsung return.

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

Sampai di sini seluruh logika select sudah大致 jelas, di atas dibagi menjadi beberapa situasi, dapat dilihat penanganan select masih cukup kompleks.

Golang by www.golangdev.cn edit