Skip to content

select

select — это конструкция, которая может одновременно отслеживать состояния нескольких channel. Её синтаксис похож на switch:

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Этот код реализует простую логику плавного завершения работы, комбинируя context, channel и select. Select одновременно отслеживает два channel: ctx.Done и finished. Имеет два условия выхода: первое — операционная система отправляет сигнал выхода; второе — в channel finished есть данные для чтения, что означает завершение задачи пользовательского кода. Это позволяет нам выполнять работу по очистке при выходе программы.

Как мы знаем, select имеет две очень важные характеристики: во-первых, неблокирующий. В исходном коде отправки и получения channel видно, что select обрабатывается специально, позволяя проверять доступность channel без блокировки. Во-вторых, рандомизация. Если несколько channel доступны, он случайным образом выбирает один для выполнения. Неследование фиксированному порядку обеспечивает относительно справедливое выполнение каждого channel; иначе, в крайних случаях, некоторые channel могут никогда не быть обработаны. Поскольку вся его работа связана с channel, рекомендуется сначала прочитать статью chan. Понимание channel перед изучением select значительно упростит дело.

Структура

Во время выполнения существует только структура runtime.scase, представляющая ветвь select. Каждый case во время выполнения представляется как scase:

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // элемент данных
}

Здесь c относится к channel, а elem — указатель на элемент получения или отправки. Фактически, ключевое слово select относится к функции runtime.selectgo.

Принципы

Go делит использование select на четыре случая для оптимизации. Это можно увидеть в функции cmd/compile/internal/walk.walkSelectCases, которая обрабатывает эти четыре случая:

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // оптимизация: select с нулём случаев
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // оптимизация: select с одним случаем: одна операция.
  if ncas == 1 {
    ...
  }

  // оптимизация: select с двумя случаями, но один из них default: одна неблокирующая операция.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Оптимизация

Компилятор оптимизирует первые три случая. Первый случай — когда количество case равно 0, т.е. пустой select. Мы все знаем, что пустой оператор select вызывает блокировку текущей goroutine навсегда:

go
select{}

Причина блокировки в том, что компилятор транслирует это в прямой вызов функции runtime.block:

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // навсегда
}

Функция block вызывает функцию runtime.gopark, изменяя состояние текущей goroutine на _Gwaiting и входя в постоянную блокировку, никогда больше не планируя.

Второй случай: только один case и он не default. В этом случае компилятор напрямую транслирует это в операцию отправки/получения channel, и она блокирующая. Например, следующий код:

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

Транслируется в прямой вызов функции runtime.chanrecv1, что видно из ассемблерного кода:

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

В случае только одного case отправка данных в channel следует тому же принципу. Транслируется в прямой вызов функции runtime.chansend1, также блокирующая.

Третий случай: два case, где один из них default:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

Этот случай транслируется в оператор if, вызывающий runtime.selectnbsend:

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

Если получение данных channel, транслируется в вызов runtime.selectnbrecv:

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

Стоит отметить, что в этом случае получение или отправка channel неблокирующие. Мы можем чётко видеть, что параметр block равен false:

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

При отправке или получении данных channel, когда block равен false, существует быстрый путь, который может проверить возможность отправки или получения без блокировки. Как показано ниже:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

При чтении из channel, если channel пуст, возвращается напрямую. При записи в channel, если channel не закрыт и уже заполнен, возвращается напрямую. В общих случаях это вызвало бы блокировку goroutine, но в комбинации с select этого не происходит.

Обработка

Вышеупомянутые три случая — это оптимизации для особых случаев. Обычно используемое ключевое слово select транслируется в вызов функции runtime.selectgo, чья логика обработки имеет длину более 400 строк:

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

Компилятор собирает все операторы case в массив scase и передаёт его функции selectgo. После обработки возвращаются два значения:

  1. Первое — случайно выбранный индекс channel, указывающий, какой channel был обработан. Возвращает -1, если нет.
  2. Второе указывает, была ли успешна операция чтения channel.

Вот простое объяснение её параметров:

  • cas0: головной указатель массива scase. Первая половина хранит case записи channel, вторая половина хранит case чтения channel, различаются по nsends.
  • order0: её длина в два раза больше массива scase. Первая половина выделена массиву pollorder, вторая половина массиву lockorder.
  • nsends и nrecvs: количество case чтения/записи channel. Их сумма — общее количество case.
  • block: блокирующий или нет. Если есть case default, представляет неблокирующий, значение false, иначе true.
  • pc0: указывает на головной элемент массива [ncases]uintptr, используется для race analysis. Можно игнорировать позже; это не помогает понять select.

Предположим, у нас есть следующий код:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Просмотрим его ассемблерную форму. Для ясности некоторый код опущен:

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // временные переменные 1 2 3 4
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // вызов функции runtime.selectgo
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // переход к ветви default
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // переход к ветви 4
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // переход к ветви 3
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // переход к ветви 2
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

Как видно, после вызова функции selectgo существует логика суждения + перехода. Из них мы можем легко вывести её исходную форму:

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

Фактический код, сгенерированный компилятором, может отличаться от этого, но общая идея похожа. Поэтому после вызова функции selectgo компилятор использует операторы if для определения, какой channel будет выполнен. Перед вызовом компилятор также генерирует цикл for для сбора массива scase, но это опущено здесь.

После понимания того, как функция selectgo используется внешне, давайте поймём, как функция selectgo работает внутренне. Сначала инициализируются несколько массивов. nsends+nrecvs представляет общее количество case. Из кода ниже видно, что максимальное количество case — 1 << 16. pollorder определяет порядок выполнения channel, lockorder определяет порядок блокировки channel:

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// Его длина в два раза больше массива scase, первая половина выделена массиву pollorder, вторая половина массиву lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Далее инициализируется массив pollorder, который хранит индексы массива scases channel для выполнения:

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Пропускаем case без channel из порядков опроса и блокировки.
    if cas.c == nil {
       cas.elem = nil // разрешаем GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Обходит весь массив scases, затем использует runtime.fastrandn для генерации случайного числа между [0, i], затем меняет его с i. Во время процесса пропускает case, где channel равен nil. После завершения обхода получаем массив pollorder с перемешанными элементами, как показано ниже:

Затем массив pollorder сортируется по адресу channel с использованием сортировки кучей для получения массива lockorder. Затем вызывается runtime.sellock для блокировки их по порядку:

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Стоит отметить, что сортировка channel по адресу是为了 избежать взаимоблокировки, потому что операция select сама не требует блокировок и позволяет конкурентность. Предположим блокировку по случайному порядку pollorder, затем рассмотрим следующий код:

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Три goroutine ABC все достигают этого шага блокировки, и их порядок блокировки случаен и отличается друг от друга. Это может вызвать следующую ситуацию, как показано ниже:

Предположим, порядок блокировки ABC такой же, как на рисунке выше. Возможность взаимоблокировки очень высока. Например, A сначала захватит блокировку ch2, затем попытается захватить блокировку ch1. Но предположим, ch1 уже заблокирована goroutine B, goroutine B затем попытается захватить блокировку ch2. Это вызывает взаимоблокировку.

Если все goroutine блокируют в одном порядке, взаимоблокировка не произойдёт. Это основная причина, по которой lockorder сортируется по адресу.

После блокировки начинается настоящая фаза обработки. Сначала обходит массив pollorder, обращаясь к channel в ранее перемешанном порядке, находя доступный channel:

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // чтение channel
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // запись channel
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

Как видно, существует 6 случаев, обрабатываемых для channel чтения/записи. Давайте объясним их отдельно.

Первый случай: чтение channel и отправитель ожидает отправки. Переходит к функции runtime.recv, чья роль была объяснена. В конечном итоге пробуждает goroutine-отправителя. Перед пробуждением функция обратного вызова разблокирует все channel:

go
recv:
  // можно получить от спящего отправителя (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

Второй случай: чтение channel, нет ожидающего отправителя, количество элементов буфера больше 0. Это читает данные напрямую из буфера. Его логика полностью согласована с runtime.chanrecv, затем разблокирует:

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Третий случай: чтение channel, но channel уже закрыт, и в буфере нет оставшихся элементов. Это сначала разблокирует, затем возвращается напрямую:

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Четвёртый случай: отправка данных в закрытый channel. Это сначала разблокирует, затем panic:

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Пятый случай: получатель блокирует и ожидает. Это вызывает функцию runtime.send, и в конечном итоге пробуждает goroutine-получателя. Перед пробуждением функция обратного вызова разблокирует все channel:

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Шестой случай: нет ожидающей goroutine-получателя, помещает данные для отправки в буфер, затем разблокирует:

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Затем все вышеуказанные случаи в конечном итоге переходят к ветви retc, которая только возвращает индекс выбранного channel casi и recvOk, представляющий успешность чтения:

go
retc:
    return casi, recvOK

Седьмой случай: доступный channel не найден, и код содержит ветвь default. Разблокирует channel, затем возвращается напрямую. Здесь возвращённый casi равен -1, указывая на отсутствие доступного channel:

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Последний случай: доступный channel не найден, и код не содержит ветвь default. Затем текущая goroutine входит в заблокированное состояние. Перед этим selectgo добавляет текущую goroutine в очереди recvq/sendq всех прослушиваемых channel:

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Это создаёт несколько sudog и связывает их с соответствующими channel, как показано ниже:

Затем блокирует через runtime.gopark. Перед блокировкой разблокирует channel. Работа разблокировки завершается функцией runtime.selparkcommit, которая передаётся как обратный вызов gopark:

go
gp.param = nil
// Сигнализируем всем, пытающимся уменьшить наш стек, что мы
// собираем припарковаться на channel. Окно между когда статус
// этой G меняется и когда мы устанавливаем gp.activeStackChans, небезопасен для уменьшения стека.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

Первое после пробуждения — отвязать sudog от channel:

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Очищаем все elem перед отвязкой от gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Затем удаляет sudog из предыдущих очередей ожидания channel:

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg уже был извлечён G, которая нас разбудила.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

В вышеуказанном процессе channel, обработанный пробудившей goroutine, обязательно будет найден. Затем делает окончательную обработку на основе caseSuccess. Для операций записи sg.success равный false означает, что channel закрыт. Более того, на протяжении всего runtime Go только функция close активно устанавливает это поле в false. Это указывает, что текущая goroutine пробуждена пробудившим через функцию close. Для операций чтения, если пробуждён отправителем, операция чтения данных уже была завершена отправителем через функцию runtime.send перед пробуждением, её значение true. Если пробуждён функцией close, так же как и раньше, возвращается напрямую:

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

В этот момент вся логика select примерно прояснена. Несколько случаев были разделены выше, показывая, что обработка select всё ещё довольно сложна.

Golang by www.golangdev.cn edit