Skip to content

select

select là một cấu trúc có thể đồng thời监听 nhiều trạng thái channel, cú pháp của nó tương tự như switch

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Đoạn code này thông qua kết hợp sử dụng context, channel, select để thực hiện logic thoát mượt mà đơn giản của một chương trình, code trong select đồng thời监听 hai channel ctx.Donefinished, điều kiện thoát của nó có hai, một là hệ điều hành gửi tín hiệu thoát, hai là channel finished có tin nhắn có thể đọc tức là nhiệm vụ code người dùng hoàn thành, như vậy chúng ta có thể làm công việc tổng kết khi chương trình thoát.

Ai cũng biết, select có hai đặc tính vô cùng quan trọng, một là không chặn, trong code nguồn gửi và nhận channel có thể thấy đã xử lý một số thứ cho select, có thể phán đoán channel có khả dụng không trong tình huống không chặn, hai là ngẫu nhiên hóa, nếu có nhiều channel khả dụng thì nó sẽ ngẫu nhiên chọn một để thực thi, không tuân theo thứ tự đã định có thể khiến mỗi channel đều tương đối công bằng được thực thi, nếu không trong tình huống cực đoan một số channel có thể永远 không được xử lý. Vì công việc của nó đều liên quan đến channel, nên trước tiên đề nghị đọc bài viết chan, sau khi hiểu channel rồi再来 hiểu select sẽ thông suốt rất nhiều.

Cấu trúc

Runtime chỉ có một cấu trúc runtime.scase biểu diễn nhánh select, biểu diễn runtime của mỗi case chính là scase.

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

Trong đó c chỉ là channel, elem biểu thị con trỏ phần tử nhận hoặc gửi, thực tế từ khoá select chỉ là hàm runtime.selectgo.

Nguyên lý

Cách sử dụng select được go chia thành bốn tình huống để tối ưu hóa, điểm này có thể thấy trong hàm cmd/compile/internal/walk.walkSelectCases logic xử lý cho bốn tình huống này.

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimization: zero-case select
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimization: one-case select: single op.
  if ncas == 1 {
    ...
  }

  // optimization: two-case select but one is default: single non-blocking op.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Tối ưu hóa

Compiler sẽ tối ưu hóa cho ba tình huống đầu, tình huống đầu tiên là khi số lượng case bằng 0 tức một select rỗng, chúng ta đều biết select rỗng sẽ gây ra goroutine hiện tại bị chặn vĩnh viễn.

go
select{}

Sở dĩ bị chặn là vì compiler dịch nó thành lời gọi trực tiếp đến hàm runtime.block

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

Mà hàm block lại gọi hàm runtime.gopark, khiến goroutine hiện tại biến thành trạng thái _Gwaitting, và đi vào chặn vĩnh viễn,永远 không được điều độ.

Tình huống thứ hai, chỉ có một case và không phải default, tình huống này compiler sẽ trực tiếp dịch nó thành thao tác gửi nhận channel, hơn nữa còn là chặn, ví dụ như code dưới đây

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

Nó sẽ được dịch thành lời gọi trực tiếp đến hàm runtime.chanrecv1, có thể nhìn ra từ code assembly

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

Trong tình huống chỉ có một case thì gửi dữ liệu đến channel cũng là đạo lý tương tự, nó sẽ được dịch thành lời gọi trực tiếp đến hàm runtime.chansend1, cũng là chặn.

Tình huống thứ ba, có hai case và trong đó một cái là default

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

Tình huống này sẽ dịch nó thành một câu lệnh if gọi hàm runtime.selectnbsend, như dưới đây

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

Nếu là nhận dữ liệu channel sẽ dịch thành lời gọi đến runtime.selectnbrecv

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

Điều đáng chú ý là, trong tình huống này việc nhận hoặc gửi channel là không chặn, chúng ta có thể nhìn thấy rõ ràng tham số blcokfalse.

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

Mà bất kể là gửi hoặc nhận dữ liệu channel, khi blcokfalse đều có một đường nhanh có thể phán đoán có thể gửi hoặc nhận không trong tình huống không khóa, như dưới đây

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Khi đọc channel, nếu channel rỗng sẽ trực tiếp trả về, khi viết channel, nếu channel chưa đóng và đã đầy cũng sẽ trực tiếp trả về, trong tình huống thông thường chúng sẽ gây chặn goroutine, nhưng kết hợp với select thì không.

Xử lý

Ba tình huống trên chỉ là tối ưu hóa cho tình huống đặc biệt, từ khoá select được sử dụng bình thường sẽ được dịch thành lời gọi đến hàm runtime.selectgo, logic xử lý của nó dài đến hơn 400 dòng.

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

Compiler sẽ thu thập tất cả câu lệnh case thành một mảng scase, sau đó truyền cho hàm selectgo, sau khi xử lý xong trả về hai giá trị trả về

  1. Cái đầu tiên là chỉ số channel được chọn ngẫu nhiên, biểu thị channel nào được xử lý, không có thì trả về -1
  2. Cái thứ hai biểu thị đối với thao tác đọc channel而言 có đọc thành công không

Ở đây giải thích đơn giản các tham số của nó

  • cas0, con trỏ头部 của mảng scase, nửa phần đầu chứa case viết channel, nửa phần sau chứa case đọc channel, phân biệt bằng nsends
  • order0, độ dài của nó là hai lần mảng scase, nửa phần đầu phân phối cho mảng pollorder, nửa phần sau phân phối cho mảng lockorder
  • nsendsnrecvs biểu thị số lượng case đọc/viết channel, tổng của hai cái là tổng số case
  • block biểu thị có chặn không, nếu có case default thì biểu thị không chặn, giá trị của nó là false, ngược lại là true.
  • pc0, trỏ đến头部 của một mảng [ncases]uintptr, dùng để phân tích竞态, sau này có thể bỏ qua nó, không có giúp ích gì cho việc hiểu select

Giả sử có code dưới đây

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Xem hình thức assembly của nó, ở đây để tiện hiểu đã bỏ qua một phần code

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // 1 2 3 4几个临时变量
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // 调用 runtime.selectgo 函数
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // 跳转到 default 分支
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // 跳转到分支 4
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // 跳转到分支 3
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // 跳转到分支 2
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

Có thể thấy sau khi gọi hàm selectgo có tồn tại logic phán đoán+nhảy, thông qua những điều này chúng ta không khó suy ra hình thức ban đầu của nó

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

Code thực tế compiler sinh ra có thể có khác biệt với cái này, nhưng ý nghĩa đại khái là tương tự. Nên compiler sẽ sau khi gọi xong hàm selectgo đồng thời sử dụng câu lệnh if để phán đoán đến channel nào được thực thi, hơn nữa trước khi gọi, compiler còn sẽ sinh ra một vòng lặp for để thu thập mảng scase nhưng ở đây đã bỏ qua.

Sau khi biết bên ngoài sử dụng hàm selectgo như thế nào, dưới đây就来 hiểu bên trong hàm selectgo làm việc ra sao. Đầu tiên nó sẽ khởi tạo một vài mảng, nsends+nrecvs biểu thị tổng số case, từ code dưới đây cũng có thể thấy giá trị tối đa của số lượng case chính là 1 << 16, pollorder quyết định thứ tự thực thi của channel, lockorder quyết định thứ tự khóa của channel.

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// 它的长度是 scase 数组的两倍,前半部分分配给 pollorder 数组,后半部分分配给 lockorder 数组。
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Tiếp theo khởi tạo mảng pollorder, nó chứa chỉ số mảng sacses của channel待 thực thi

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Nó sẽ duyệt toàn bộ mảng scases, sau đó thông qua runtime.fastrandn sinh ra số ngẫu nhiên trong khoảng [0, i], rồi đổi chỗ nó với i, trong quá trình sẽ bỏ qua case mà channel là nil, sau khi duyệt xong thì được một mảng pollorder mà các phần tử đã bị打乱, như hình dưới đây

Sau đó đối với mảng pollorder dựa trên kích thước địa chỉ channel sử dụng sắp xếp heap thì được mảng lockorder, rồi gọi runtime.sellock khóa theo thứ tự

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Ở đây đáng chú ý là, đối với channel dựa trên kích thước địa chỉ sắp xếp là để tránh deadlock, vì bản thân thao tác select không cần khóa cho phép đồng thời. Giả sử khóa theo thứ tự ngẫu nhiên của pollorder, vậy xem xét tình huống code dưới đây

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Ba goroutine ABC đều đi đến bước khóa này, và thứ tự khóa của chúng彼此 đều là ngẫu nhiên không giống nhau, có thể gây ra tình huống như dưới đây, như hình dưới đây

Giả sử thứ tự khóa của ABC giống như hình trên, vậy khả năng gây deadlock rất lớn, ví dụ A sẽ先持有 khóa của ch2, rồi đi thử lấy khóa của ch1, nhưng giả sử ch1 đã bị goroutine B khóa, goroutine B lại đi thử lấy khóa của ch2, vậy như vậy là gây ra deadlock.

Nếu tất cả goroutine đều khóa theo thứ tự tương tự, sẽ không xảy ra vấn đề deadlock, đây cũng là nguyên nhân căn bản mà lockorder phải dựa trên kích thước địa chỉ để sắp xếp.

Sau khi khóa xong, thì bắt đầu giai đoạn xử lý thực sự, đầu tiên duyệt mảng pollorder, dựa theo thứ tự đã打乱之前 để truy cập channel,逐个 duyệt tìm một channel khả dụng

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // 读管道
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // 写管道
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

Có thể thấy ở đây đã xử lý 6 tình huống cho channel đọc/viết, dưới đây lần lượt讲解. Tình huống đầu tiên, đọc channel và có bên gửi đang chờ gửi, ở đây sẽ đi đến hàm runtime.recv, tác dụng của nó đã nói rồi, nó cuối cùng sẽ đánh thức goroutine bên gửi, trước khi đánh thức thì hàm callback sẽ khóa toàn bộ channel.

go
recv:
  // can receive from sleeping sender (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

Tình huống thứ hai, đọc channel, không có bên gửi đang chờ, số lượng phần tử bộ đệm lớn hơn 0, ở đây sẽ trực tiếp đọc dữ liệu từ bộ đệm, logic của nó hoàn toàn tương tự như trong runtime.chanrecv, rồi mở khóa.

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Tình huống thứ ba, đọc channel, nhưng channel đã đóng, và trong bộ đệm không có phần tử còn lại, ở đây sẽ先 mở khóa rồi trực tiếp trả về.

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Tình huống thứ tư, gửi dữ liệu đến channel đã đóng, ở đây sẽ先 mở khóa rồi panic,

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Tình huống thứ năm, có bên nhận đang chặn chờ, ở đây sẽ gọi hàm runitme.send, và cuối cùng đánh thức goroutine bên nhận, trước khi đánh thức thì hàm callback sẽ khóa toàn bộ channel.

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Tình huống thứ sáu, không có goroutine bên nhận chờ, sẽ đặt dữ liệu cần gửi vào bộ đệm, rồi mở khóa.

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Rồi tất cả tình huống trên cuối cùng đều đi vào nhánh retc này, mà việc nó cần làm chỉ là trả về chỉ số channel được chọn casirecvOk biểu thị có đọc thành công không.

go
retc:
    return casi, recvOK

Tình huống thứ bảy, không tìm thấy channel khả dụng, và code中包含 nhánh default, thì mở khóa channel rồi trực tiếp trả về, ở đây trả về casi là -1 tức biểu thị không có channel khả dụng.

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Tình huống cuối cùng, không tìm thấy channel khả dụng, và code中不包含 nhánh default, vậy goroutine hiện tại sẽ rơi vào trạng thái chặn, trước đó selectgo sẽ thêm goroutine hiện tại vào hàng đợi recvq/sendq của tất cả channel监听

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Ở đây sẽ tạo若干个 sudog và liên kết chúng với channel tương ứng, như hình dưới đây

Rồi bởi runtime.gopark chặn, trước khi chặn sẽ mở khóa channel, công việc mở khóa do hàm runtime.selparkcommit hoàn thành, nó được truyền vào gopark dưới hình thức hàm callback.

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

Việc đầu tiên sau khi被 đánh thức là解除 liên kết sudog với channel

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Rồi移除 sudog từ hàng đợi chờ của channel之前

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg has already been dequeued by the G that woke us up.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

Trong quá trình trên nhất định sẽ tìm thấy channel được xử lý bởi goroutine bên đánh thức, rồi dựa vào caseSuccess để đưa ra xử lý cuối cùng. Đối với thao tác viết而言, sg.successfalse biểu thị channel đã đóng, và toàn bộ go runtime cũng chỉ có hàm close sẽ chủ động đặt trường này thành false, điều này biểu thị goroutine hiện tại là bên đánh thức thông qua hàm close đánh thức. Đối với thao tác đọc而言, nếu là被 bên gửi đánh thức, thao tác đọc dữ liệu cũng đã được bên gửi hoàn thành thông qua hàm runtime.send trước khi被 đánh thức, giá trị của nó là true, nếu là被 hàm close đánh thức, cũng giống như前面 đều là trực tiếp trả về.

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

Đến đây toàn bộ logic của select đều đã rõ ràng, trên đây chia làm mấy tình huống, có thể thấy select xử lý ra vẫn là khá phức tạp.

Golang by www.golangdev.cn edit