Skip to content

select

select es una estructura que puede monitorear múltiples estados de canales simultáneamente. Su sintaxis es similar a switch.

go
import (
  "context"
  "log/slog"
  "os"
  "os/signal"
  "time"
)

func main() {
  finished := make(chan struct{})
  ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
  defer stop()
  slog.Info("running")

  go func() {
    time.Sleep(time.Second * 2)
    finished <- struct{}{}
  }()

  select {
  case <-ctx.Done():
    slog.Info("shutting down")
  case <-finished:
    slog.Info("finished")
  }
}

Este código combina context, canal y select para implementar una lógica simple de salida suave del programa. En el código, select monitorea simultáneamente dos canales ctx.Done y finished. Tiene dos condiciones de salida: primero, el sistema operativo envía una señal de salida; segundo, el canal finished tiene mensajes para leer, es decir, la tarea del código de usuario está completa. De esta manera, podemos realizar trabajo de cierre al salir del programa.

Como es sabido, select tiene dos características muy importantes: primero, no bloqueo. En el código fuente de envío y recepción de canales se puede ver que se hizo algún tratamiento para select, permitiendo determinar si un canal está disponible sin bloqueo. Segundo, aleatoriedad. Si hay múltiples canales disponibles, seleccionará uno aleatoriamente para ejecutar, sin seguir un orden establecido. Esto permite que cada canal se ejecute de manera relativamente equitativa; de lo contrario, en situaciones extremas, algunos canales podrían nunca ser procesados. Debido a que todo su trabajo está relacionado con canales, se recomienda primero leer el artículo chan. Después de entender los canales, entender select será mucho más fluido.

Estructura

En tiempo de ejecución, solo hay una estructura runtime.scase que representa una rama de select. Cada case en tiempo de ejecución se representa como scase.

go
type scase struct {
  c    *hchan         // chan
  elem unsafe.Pointer // data element
}

Donde c se refiere al canal, y elem es el puntero al elemento recibido o enviado. En realidad, la palabra clave select se refiere a la función runtime.selectgo.

Principio

El uso de select se divide en cuatro situaciones para optimización. Esto se puede ver en la función cmd/compile/internal/walk.walkSelectCases, donde se maneja el tratamiento para estas cuatro situaciones.

go
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
  ncas := len(cases)
  sellineno := base.Pos

  // optimización: select con cero casos
  if ncas == 0 {
    return []ir.Node{mkcallstmt("block")}
  }

  // optimización: select con un caso: operación única.
  if ncas == 1 {
    ...
  }

  // optimización: select con dos casos pero uno es default: operación única no bloqueante.
  if ncas == 2 && dflt != nil {
    ...
  }

  ...
  return init
}

Optimización

El compilador optimiza las primeras tres situaciones. La primera situación es cuando la cantidad de casos es 0, es decir, un select vacío. Todos sabemos que una sentencia select vacía causará que la goroutine actual se bloquee permanentemente.

go
select{}

La razón del bloqueo es que el compilador lo traduce en una llamada directa a la función runtime.block.

go
func block() {
  gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}

Y la función block llama a la función runtime.gopark, haciendo que la goroutine actual cambie al estado _Gwaiting y entre en bloqueo permanente, sin ser programada nunca más.

La segunda situación es cuando hay solo un caso y no es default. En este caso, el compilador lo traduce directamente en una operación de envío/recepción de canal, y además es bloqueante. Por ejemplo, el siguiente código:

go
func main() {
  ch := make(chan int)
  select {
  case <-ch:
        // do something
  }
}

Se traduce en una llamada directa a la función runtime.chanrecv1, lo cual se puede ver en el código ensamblador:

go
TEXT  main.main(SB), ABIInternal, $2
...
LEAQ  type:chan int(SB), AX
XORL  BX, BX
PCDATA  $1, $0
CALL  runtime.makechan(SB)
XORL  BX, BX
NOP
CALL  runtime.chanrecv1(SB)
ADDQ  $16, SP
POPQ  BP
...

En el caso de un solo caso, enviar datos a un canal es lo mismo. Se traduce en una llamada directa a la función runtime.chansend1, también bloqueante.

La tercera situación es cuando hay dos casos y uno de ellos es default:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
        // do something
  default:
        // do something
  }
}

En este caso, se traduce en una sentencia if que llama a runtime.selectnbsend, como sigue:

go
if selectnbsend(ch, 1) {
  // do something
} else {
  // do something
}

Si se reciben datos del canal, se traduce en una llamada a runtime.selectnbrecv:

go
ch := make(chan int)
select {
  case x, ok := <-ch:
      // do something
  default:
      // do something
}
go
if selected, ok = selectnbrecv(&v, c); selected {
  // do something
} else {
  // do something
}

Vale la pena señalar que en este caso, la recepción o envío del canal es no bloqueante. Podemos ver claramente que el parámetro block es false.

go
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
  return chansend(c, elem, false, getcallerpc())
}

func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
  return chanrecv(c, elem, false)
}

Y tanto para enviar como para recibir datos de un canal, cuando block es false, hay un camino rápido que puede determinar si se puede enviar o recibir sin bloquear, como se muestra:

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
      return
    }
    return true, false
  }
  ...
}

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Al leer del canal, si el canal está vacío, se devuelve directamente. Al escribir en el canal, si el canal no está cerrado y está lleno, también se devuelve directamente. En general, esto causaría bloqueo de la goroutine, pero combinado con select no lo hace.

Tratamiento

Las tres situaciones anteriores son solo optimizaciones para casos especiales. La palabra clave select de uso normal se traduce en una llamada a la función runtime.selectgo, cuya lógica de procesamiento tiene más de 400 líneas.

go
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)

El compilador recopila todas las sentencias case en un array scase y lo pasa a la función selectgo. Después del procesamiento, devuelve dos valores de retorno:

  1. El primero es el índice del canal seleccionado aleatoriamente, indicando qué canal fue procesado. Si no hay ninguno, devuelve -1.
  2. El segundo indica si la operación de lectura del canal fue exitosa.

Aquí se explica brevemente sus parámetros:

  • cas0: puntero principal del array scase. La primera mitad almacena casos de escritura de canal, la segunda mitad almacena casos de lectura de canal, diferenciados por nsends.
  • order0: su longitud es el doble del array scase. La primera mitad se asigna al array pollorder, la segunda mitad se asigna al array lockorder.
  • nsends y nrecvs: indican la cantidad de casos de lectura/escritura de canal. La suma de ambos es el total de casos.
  • block: indica si es bloqueante. Si hay un caso default, representa no bloqueo, su valor es false, de lo contrario es true.
  • pc0: apunta a la cabeza de un array [ncases]uintptr, usado para análisis de carrera. Se puede ignorar más adelante, no ayuda a entender select.

Supongamos que tenemos el siguiente código:

go
func main() {
  ch := make(chan int)
  select {
  case ch <- 1:
    println(1)
  case ch <- 2:
    println(2)
  case ch <- 3:
    println(3)
  case ch <- 4:
    println(4)
  default:
    println("default")
  }
}

Al ver su forma ensambladora, aquí para facilitar la comprensión se omite parte del código:

go
0x0000 00000 TEXT  main.main(SB), ABIInterna
...
0x0023 00035 CALL  runtime.makechan(SB)
0x0028 00040 MOVQ  $1, main..autotmp_2+72(SP) // variables temporales 1 2 3 4
0x0031 00049 MOVQ  $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ  $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ  $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL  runtime.selectgo(SB) // llama a la función runtime.selectgo
0x00cd 00205 TESTQ  AX, AX
0x00d0 00208 JLT  352 // salta a la rama default
0x00d6 00214 PCDATA  $1, $-1
0x00d6 00214 JEQ  320 // salta a la rama 4
0x00d8 00216 CMPQ  AX, $1
0x00dc 00220 JEQ  288 // salta a la rama 3
0x00de 00222 NOP
0x00e0 00224 CMPQ  AX, $2
0x00e4 00228 JNE  258 // salta a la rama 2
0x00e6 00230 PCDATA  $1, $0
0x00e6 00230 CALL  runtime.printlock(SB)
0x00eb 00235 MOVL  $3, AX
0x00f0 00240 CALL  runtime.printint(SB)
0x00f5 00245 CALL  runtime.printnl(SB)
0x00fa 00250 CALL  runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP  379
0x0102 00258 CALL  runtime.printlock(SB)
0x0107 00263 MOVL  $4, AX
0x010c 00268 CALL  runtime.printint(SB)
0x0111 00273 CALL  runtime.printnl(SB)
0x0116 00278 CALL  runtime.printunlock(SB)
0x011b 00283 JMP  379
0x011d 00285 NOP
0x0120 00288 CALL  runtime.printlock(SB)
0x0125 00293 MOVL  $2, AX
0x012a 00298 CALL  runtime.printint(SB)
0x012f 00303 CALL  runtime.printnl(SB)
0x0134 00308 CALL  runtime.printunlock(SB)
0x0139 00313 JMP  379
0x013b 00315 NOP
0x0140 00320 CALL  runtime.printlock(SB)
0x0145 00325 MOVL  $1, AX
0x014a 00330 CALL  runtime.printint(SB)
0x014f 00335 CALL  runtime.printnl(SB)
0x0154 00340 CALL  runtime.printunlock(SB)
0x0159 00345 JMP  379
0x015b 00347 NOP
0x0160 00352 CALL  runtime.printlock(SB)
0x0165 00357 LEAQ  go:string."default\n"(SB)
0x016c 00364 MOVL  $8, BX
0x0171 00369 CALL  runtime.printstring(SB)
0x0176 00374 CALL  runtime.printunlock(SB)
0x017b 00379 PCDATA  $1, $-1
0x017b 00379 ADDQ  $160, SP
0x0182 00386 POPQ  BP
0x0183 00387 RET

Se puede ver que después de llamar a la función selectgo, hay una lógica de juicio + salto. Con esto no es difícil deducir su forma original:

go
casei, ok := runtime.selectgo()
if casei == -1 {
    println("default")
} else if casei == 3 {
    println(4)
} else if casei == 2 {
    println(3)
} else if casei == 1 {
    println(2)
} else {
    println(1)
}

El código real generado por el compilador puede diferir de este, pero el significado general es similar. Por lo tanto, después de llamar a la función selectgo, el compilador usa sentencias if para determinar qué canal se ejecuta. Y antes de llamar, el compilador también genera un bucle for para recopilar el array scase, aunque aquí se omite.

Después de conocer cómo se usa externamente la función selectgo, ahora entendamos cómo funciona internamente la función selectgo. Primero inicializa varios arrays. nsends+nrecvs representa el total de casos. Del siguiente código también se puede ver que el valor máximo de casos es 1 << 16. pollorder determina el orden de ejecución de los canales, y lockorder determina el orden de bloqueo de los canales.

go
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// Su longitud es el doble del array scase, la primera mitad se asigna al array pollorder, la segunda mitad se asigna al array lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]

Luego inicializa el array pollorder, que almacena los índices del array sacses de los canales pendientes de ejecución:

go
norder := 0
for i := range scases {
    cas := &scases[i]

    // Omit cases without channels from the poll and lock orders.
    if cas.c == nil {
       cas.elem = nil // allow GC
       continue
    }

    j := fastrandn(uint32(norder + 1))
    pollorder[norder] = pollorder[j]
    pollorder[j] = uint16(i)
    norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]

Recorre todo el array scases, luego genera un número aleatorio entre [0, i] mediante runtime.fastrandn, y lo intercambia con i. Durante el proceso, omite los casos cuyo canal es nil. Después de completar el recorrido, se obtiene un array pollorder con elementos desordenados, como se muestra:

Luego, el array pollorder se ordena por heap según el tamaño de la dirección del canal para obtener el array lockorder, y luego se llama a runtime.sellock para bloquearlos en orden:

go
func sellock(scases []scase, lockorder []uint16) {
  var c *hchan
  for _, o := range lockorder {
    c0 := scases[o].c
    if c0 != c {
      c = c0
      lock(&c.lock)
    }
  }
}

Vale la pena señalar que ordenar los canales por tamaño de dirección es para evitar deadlocks, porque la operación select en sí no permite concurrencia con bloqueo. Supongamos que se bloquea en orden aleatorio según pollorder, entonces considere la situación del siguiente código:

go
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)

poll := func() {
    select {
    case ch1 <- 1:
        println(1)
    case ch2 <- 2:
        println(2)
    case ch3 <- 3:
        println(3)
    case ch4 <- 4:
        println(4)
    default:
        println("default")
    }
}

// A
go poll()
// B
go poll()
// C
go poll()

Las tres goroutines ABC llegan a este paso de bloqueo, y su orden de bloqueo mutuo es aleatorio y diferente entre sí. Es posible causar la siguiente situación, como se muestra:

Supongamos que el orden de bloqueo de ABC es igual a la figura anterior, entonces la posibilidad de deadlock es muy grande. Por ejemplo, A primero retendrá el bloqueo de ch2, luego intentará obtener el bloqueo de ch1, pero supongamos que ch1 ya fue bloqueado por la goroutine B. La goroutine B intentará obtener el bloqueo de ch2, causando así un deadlock.

Si todas las goroutines bloquean en el mismo orden, no ocurrirá el problema de deadlock. Esta es la razón fundamental por la que lockorder debe ordenarse por tamaño de dirección.

Después de bloquear, comienza la verdadera fase de procesamiento. Primero se recorre el array pollorder, accediendo a los canales en el orden desordenado anterior, recorriendo uno por uno para encontrar un canal disponible:

go
for _, casei := range pollorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c

    if casi >= nsends { // canal de lectura
        sg = c.sendq.dequeue()
        if sg != nil {
            goto recv
        }
        if c.qcount > 0 {
            goto bufrecv
        }
        if c.closed != 0 {
            goto rclose
        }
    } else { // canal de escritura
        if c.closed != 0 {
            goto sclose
        }
        sg = c.recvq.dequeue()
        if sg != nil {
            goto send
        }
        if c.qcount < c.dataqsiz {
            goto bufsend
        }
    }
}

Se puede ver que aquí se manejan 6 situaciones para canales de lectura/escritura. A continuación se explican por separado.

Primera situación: leer canal y hay un remitente esperando para enviar. Aquí se va a la función runtime.recv, cuyo papel ya se explicó. Finalmente despertará la goroutine remitente, y antes de despertar, la función de retorno de llamada desbloqueará todos los canales.

go
recv:
  // can receive from sleeping sender (sg)
  recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  recvOK = true
  goto retc

Segunda situación: leer canal, no hay remitente esperando, la cantidad de elementos en búfer es mayor que 0. Aquí se lee directamente desde el búfer, cuya lógica es completamente consistente con runtime.chanrecv, luego se desbloquea.

go
bufrecv:
  recvOK = true
  qp = chanbuf(c, c.recvx)
  if cas.elem != nil {
    typedmemmove(c.elemtype, cas.elem, qp)
  }
  typedmemclr(c.elemtype, qp)
  c.recvx++
  if c.recvx == c.dataqsiz {
    c.recvx = 0
  }
  c.qcount--
  selunlock(scases, lockorder)
  goto retc

Tercera situación: leer canal, pero el canal ya está cerrado, y no hay elementos restantes en el búfer. Aquí se desbloquea primero y luego se devuelve directamente.

go
rclose:
  selunlock(scases, lockorder)
  recvOK = false
  if cas.elem != nil {
    typedmemclr(c.elemtype, cas.elem)
  }
  goto retc

Cuarta situación: enviar datos a un canal cerrado. Aquí se desbloquea primero y luego se produce panic.

go
sclose:
  selunlock(scases, lockorder)
  panic(plainError("send on closed channel"))

Quinta situación: hay un receptor bloqueado esperando. Aquí se llama a la función runitme.send, y finalmente se despierta la goroutine receptora. Antes de despertar, la función de retorno de llamada desbloqueará todos los canales.

go
send:
  send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
  goto retc

Sexta situación: no hay goroutine receptora esperando, se ponen los datos a enviar en el búfer, luego se desbloquea.

go
bufsend:
  typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
  c.sendx++
  if c.sendx == c.dataqsiz {
    c.sendx = 0
  }
  c.qcount++
  selunlock(scases, lockorder)
  goto retc

Luego, todas las situaciones anteriores finalmente llegan a la rama retc, y lo único que debe hacer es devolver el índice del canal seleccionado casi y recvOk que representa si la lectura fue exitosa.

go
retc:
    return casi, recvOK

Séptima situación: no se encuentra un canal disponible, y el código contiene una rama default. Entonces se desbloquean los canales y se devuelve directamente. Aquí casi devuelto es -1, lo que indica que no hay canales disponibles.

go
if !block {
    selunlock(scases, lockorder)
    casi = -1
    goto retc
}

Última situación: no se encuentra un canal disponible, y el código no contiene una rama default. Entonces la goroutine actual entra en estado de bloqueo. Antes de esto, selectgo agrega la goroutine actual a las colas recvq/sendq de todos los canales monitoreados:

go
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
    casi = int(casei)
    cas = &scases[casi]
    c = cas.c
    sg := acquireSudog()
    sg.g = gp
    sg.isSelect = true
    sg.elem = cas.elem
    sg.releasetime = 0
    sg.c = c
    *nextp = sg
    nextp = &sg.waitlink

    if casi < nsends {
        c.sendq.enqueue(sg)
    } else {
        c.recvq.enqueue(sg)
    }
}

Aquí se crean varios sudog y se enlazan con los canales correspondientes, como se muestra:

Luego se bloquea mediante runtime.gopark. Antes de bloquear, se desbloquean los canales, y el trabajo de desbloqueo lo completa la función runtime.selparkcommit, que se pasa como función de retorno de llamada a gopark.

go
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = false

Lo primero que se hace después de ser despertado es解除 el enlace entre sudog y los canales:

go
sellock(scases, lockorder)

gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil

casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
    sg1.isSelect = false
    sg1.elem = nil
    sg1.c = nil
}
gp.waiting = nil

Luego se elimina sudog de las colas de espera de los canales anteriores:

go
for _, casei := range lockorder {
    k = &scases[casei]
    if sg == sglist {
        // sg has already been dequeued by the G that woke us up.
        casi = int(casei)
        cas = k
        caseSuccess = sglist.success
        if sglist.releasetime > 0 {
            caseReleaseTime = sglist.releasetime
        }
    } else {
        c = k.c
        if int(casei) < nsends {
            c.sendq.dequeueSudoG(sglist)
        } else {
            c.recvq.dequeueSudoG(sglist)
        }
    }
    sgnext = sglist.waitlink
    sglist.waitlink = nil
    releaseSudog(sglist)
    sglist = sgnext
}

En el proceso anterior, definitivamente se encontrará un canal procesado por la goroutine que despertó. Luego se hace el tratamiento final según caseSuccess. Para operaciones de escritura, sg.success siendo false representa que el canal está cerrado. Y en todo el runtime de go, solo la función close establece activamente este campo en false, lo que indica que la goroutine actual fue despertada por el despertador mediante la función close. Para operaciones de lectura, si fue despertada por el remitente, la operación de lectura de datos ya se completó antes de ser despertada por el remitente mediante la función runtime.send, y su valor es true. Si fue despertada por la función close, igual que antes, se devuelve directamente.

go
c = cas.c

if casi < nsends {
    if !caseSuccess {
       goto sclose
    }
} else {
    recvOK = caseSuccess
}

selunlock(scases, lockorder)
goto retc

Hasta aquí, toda la lógica de select está大致 aclarada. Se dividieron varias situaciones arriba, lo que muestra que el tratamiento de select es bastante complejo.

Golang editado por www.golangdev.cn