Skip to content

chan

channel es una estructura de datos especial, es el representante típico del pensamiento CSP en Go. El núcleo del pensamiento CSP es que los procesos intercambian datos mediante mensajes. Correspondientemente, a través de channel podemos comunicarnos fácilmente entre goroutines.

go
import "fmt"

func main() {
  done := make(chan struct{})
  go func() {
    // do something
    done <- struct{}{}
  }()
  <-done
  fmt.Println("finished")
}

Además de la comunicación, a través de channel también se pueden implementar operaciones de sincronización entre goroutines. En sistemas que requieren concurrencia, channel es casi omnipresente. Para comprender mejor cómo funciona channel, a continuación se presentarán sus principios.

Estructura

En tiempo de ejecución, channel se representa mediante la estructura runtime.hchan, que contiene los siguientes campos:

go
type hchan struct {
  qcount   uint           // total de datos en la cola
  dataqsiz uint           // tamaño de la cola circular
  buf      unsafe.Pointer // puntero a un array de dataqsiz elementos
  elemsize uint16
  closed   uint32
  elemtype *_type // tipo de elemento
  sendx    uint   // índice de envío
  recvx    uint   // índice de recepción
  recvq    waitq  // lista de esperas de recepción
  sendq    waitq  // lista de esperas de envío

  lock mutex
}

Como se puede ver claramente, existe el campo lock. channel es esencialmente una cola sincronizada con bloqueo. Los otros campos se explican a continuación:

  • qcount: representa el número total de datos

  • dataqsiz: tamaño de la cola circular

  • buf: puntero al array de tamaño dataqsiz, es decir, la cola circular

  • closed: indica si el channel está cerrado

  • sendx, recvx: índices de envío y recepción

  • sendq, recvq: listas de goroutines en espera de envío y recepción, cuyos elementos son runtime.sudog

    go
    type waitq struct {
      first *sudog
      last  *sudog
    }

Con la siguiente imagen se puede comprender claramente la estructura de channel:

Al usar las funciones len y cap en un channel, en realidad se devuelven sus campos hchan.qcount y hchan.dataqsiz.

Creación

Normalmente, hay una única forma de crear un channel, usando la función make:

go
ch := make(chan int, size)

El compilador lo traduce a una llamada a la función runtime.makechan, que es responsable de la creación real del channel. Su código es el siguiente:

go
func makechan(t *chantype, size int) *hchan {
  elem := t.Elem
  mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
  var c *hchan
  switch {
  case mem == 0:
    c = (*hchan)(mallocgc(hchanSize, nil, true))
    c.buf = c.raceaddr()
  case elem.PtrBytes == 0:
    c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
    c.buf = add(unsafe.Pointer(c), hchanSize)
  default:
    c = new(hchan)
    c.buf = mallocgc(mem, elem, true)
  }

  c.elemsize = uint16(elem.Size_)
  c.elemtype = elem
  c.dataqsiz = uint(size)

  return c
}

Esta lógica es bastante simple, principalmente asigna memoria para el channel. Primero calcula el tamaño de memoria estimado necesario según el size y el tipo de elemento elem.Size_ pasados, luego lo maneja en tres casos:

  1. size es 0, solo asigna hchanSize
  2. El elemento no contiene punteros, asigna el espacio de memoria correspondiente, y la memoria de la cola circular es continua con la memoria del channel
  3. El elemento contiene punteros, la memoria del channel y la cola circular se asigna por separado

Si los elementos almacenados en la cola circular son punteros, ya que referencian elementos externos, GC podría escanear estos punteros durante la fase de marcado-eliminación. Cuando se almacenan elementos no puntero en memoria continua, se evitan escaneos innecesarios. Después de asignar la memoria, finalmente se actualizan otros campos de información.

Cabe mencionar que cuando la capacidad del channel es un entero de 64 bits, se usa la función runtime.makechan64 para crearlo. Esencialmente también es una llamada a runtime.makechan, solo que hace una verificación de tipo adicional.

go
func makechan64(t *chantype, size int64) *hchan {
  if int64(int(size)) != size {
    panic(plainError("makechan: size out of range"))
  }

  return makechan(t, int(size))
}

Generalmente, es mejor que size no exceda math.MaxInt32.

Envío

Al enviar datos a un channel, colocamos los datos a enviar a la derecha de la flecha:

go
ch <- struct{}{}

El compilador lo traduce a runtime.chansend1. La función realmente responsable de enviar datos es runtime.chansend. chansend1 le pasa el puntero elem, que apunta al puntero del elemento enviado.

go
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
  chansend(c, elem, true, getcallerpc())
}

Primero verifica si el channel es nil. block indica si la operación de envío actual es bloqueante (el valor de block está relacionado con la estructura select). Si el envío es bloqueante y el channel es nil, se produce un panic directamente. En caso de envío no bloqueante, se verifica directamente sin bloqueo si el channel está lleno. Si está lleno, se devuelve directamente.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
        throw("unreachable")
    }

    if !block && c.closed == 0 && full(c) {
    return false
  }
    ...
}

Luego se adquiere el bloqueo y se verifica si el channel está cerrado. Si ya está cerrado, se produce un panic:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    lock(&c.lock)

    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    ...
}

Después se hace dequeue de un sudog de la cola recvq, y luego se envía mediante la función runtime.send.

go
if sg := c.recvq.dequeue(); sg != nil {
    send(c, sg, ep, func() { unlock(&c.lock) }, 3)
    return true
}

El contenido de la función send es el siguiente. Actualiza recvx y sendx, luego usa la función runtime.memmove para copiar directamente la memoria de los datos de comunicación a la dirección del elemento objetivo de la goroutine receptora. Luego, mediante la función runtime.goready, la goroutine receptora cambia al estado _Grunnable para participar nuevamente en la programación.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    c.recvx++
    if c.recvx == c.dataqsiz {
        c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    if sg.elem != nil {
       sendDirect(c.elemtype, sg, ep)
       sg.elem = nil
    }
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    goready(gp, skip+1)
}

func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
  dst := sg.elem
  memmove(dst, src, t.Size_)
}

En este proceso, como se puede encontrar una goroutine en espera de recepción, los datos se envían directamente al receptor sin almacenarse en la cola circular. Si no hay un receptor disponible y la capacidad es suficiente, se coloca en el búfer de la cola circular y se devuelve directamente.

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
  if c.qcount < c.dataqsiz {
    qp := chanbuf(c, c.sendx)
    typedmemmove(c.elemtype, qp, ep)
    c.sendx++
    if c.sendx == c.dataqsiz {
      c.sendx = 0
    }
    c.qcount++
    unlock(&c.lock)
    return true
  }
    ...
}

Si el búfer está lleno, en caso de envío no bloqueante se devuelve directamente:

go
if !block {
    unlock(&c.lock)
    return false
}

Si es envío bloqueante, se entra en el siguiente flujo de código:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    c.sendq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)

    KeepAlive(ep)
    ...
}

Primero construye la goroutine actual como un sudog y la agrega a la cola de goroutines en espera de envío hchan.sendq. Luego, runtime.gopark hace que la goroutine actual se bloquee, cambie al estado _Gwaiting hasta que sea despertada nuevamente por el receptor. Además, mediante runtime.KeepAlive se mantiene vivo el dato a enviar para asegurar que el receptor lo copie exitosamente. Después de ser despertada, se entra en el proceso de finalización:

go
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    mysg.c = nil
    if closed {
    if c.closed == 0 {
      throw("chansend: spurious wakeup")
    }
    panic(plainError("send on closed channel"))
  }
    releaseSudog(mysg)
    return true
}

Como se puede ver, para el envío de datos a un channel hay las siguientes situaciones:

  1. El channel es nil, el programa colapsa
  2. El channel está cerrado, ocurre un panic
  3. La cola recvq no está vacía, se envía directamente al receptor
  4. No hay goroutines en espera, se agrega al búfer
  5. El búfer está lleno, la goroutine de envío se bloquea y espera a que otras goroutines reciban datos

Vale la pena señalar que en la lógica de envío anterior no se ve el manejo de datos que desbordan el búfer. Estos datos no pueden descartarse, se guardan en sudog.elem y son procesados por el receptor.

Recepción

En Go, hay dos formas de recibir datos de un channel. La primera es solo leer los datos:

go
data <- ch

La segunda es determinar si la lectura de datos fue exitosa:

go
data, ok <- ch

Las dos sintaxis anteriores son traducidas por el compilador a llamadas a runtime.chanrecv1 y runtime.chanrecv2, pero en realidad son llamadas a runtime.chanrecv. La lógica de recepción es similar al inicio de la lógica de envío, primero verifica si el channel es nil.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  if c == nil {
    if !block {
      return
    }
    gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
    throw("unreachable")
  }
  ...
}

Luego, en caso de lectura no bloqueante, se verifica sin bloqueo si el channel está vacío. Si el channel no está cerrado, se devuelve directamente. Si el channel está cerrado, se limpia la memoria del elemento recibido.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if !block && empty(c) {
        if atomic.Load(&c.closed) == 0 {
            return
        }
        if empty(c) {
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
  }
  ...
}

Luego se accede con bloqueo a la cola hchan.sendq. Se puede ver en la rama if c.closed != 0 que, incluso si el channel está cerrado, pero aún hay elementos en el channel, no se devuelve directamente. Todavía se ejecuta el código de consumo de elementos. Esta es la razón por la que se permite leer después de que el channel está cerrado.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.closed != 0 {
        if c.qcount == 0 {
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    } else {
        if sg := c.sendq.dequeue(); sg != nil {
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }
  ...
}

Si el channel no está cerrado, se verifica si hay goroutines en espera de envío en la cola sendq. Si es así, la función runtime.recv maneja esa goroutine de envío.

go
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
  if c.dataqsiz == 0 {
    if ep != nil {
      recvDirect(c.elemtype, sg, ep)
    }
  } else {
    qp := chanbuf(c, c.recvx)
    // copiar datos de la cola al receptor
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    // copiar datos del emisor a la cola
    typedmemmove(c.elemtype, qp, sg.elem)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
  }
  sg.elem = nil
  gp := sg.g
  unlockf()
  gp.param = unsafe.Pointer(sg)
  sg.success = true
  goready(gp, skip+1)
}

Primer caso, la capacidad del channel es 0, es decir, channel sin búfer. El receptor copia directamente los datos del emisor mediante la función runtime.recvDirect. Segundo caso, el búfer está lleno. Aunque antes no se vio la lógica para verificar si el búfer está lleno, cuando la capacidad del búfer no es 0 y hay un emisor en espera, esto ya representa que el búfer está lleno. Solo cuando el búfer está lleno, el emisor se bloquea esperando enviar. Esta lógica es juzgada por el emisor. Luego, el receptor hace dequeue del elemento principal del búfer y copia su memoria al puntero del elemento receptor objetivo. Luego copia los datos que la goroutine de envío quiere enviar y los encola (aquí vemos cómo el receptor maneja los datos que desbordan el búfer). Finalmente, runtime.goready despierta la goroutine de envío, cambiándola al estado _Grunnable para unirse nuevamente a la programación.

Si no hay goroutines en espera de envío, se verifica si hay elementos en espera de consumo en el búfer. Se hace dequeue del elemento principal y se copia su memoria al elemento objetivo del receptor, luego se devuelve.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    if c.qcount > 0 {
    // Recibir directamente de la cola
    qp := chanbuf(c, c.recvx)
    if ep != nil {
      typedmemmove(c.elemtype, ep, qp)
    }
    typedmemclr(c.elemtype, qp)
    c.recvx++
    if c.recvx == c.dataqsiz {
      c.recvx = 0
    }
    c.qcount--
    unlock(&c.lock)
    return true, true
  }
  ...
}

Finalmente, si no hay elementos consumibles en el channel, runtime.gopark cambia la goroutine actual al estado _Gwaiting, bloqueándose hasta que sea despertada por la goroutine de envío.

go
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp := getg()
    mysg := acquireSudog()
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    c.recvq.enqueue(mysg)
    gp.parkingOnChan.Store(true)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
  ...
}

Después de ser despertada, se devuelve. En este momento, el valor success devuelto proviene de sudog.success. Si el emisor envió datos exitosamente, este campo debería ser establecido en true por el emisor. Esta lógica se puede ver en la función runtime.send.

go
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ...
    sg.success = true
    goready(gp, skip+1)
}

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  ...
    gp.waiting = nil
    gp.activeStackChans = false
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    return true, success
}

Correspondientemente, en el juicio de sudog.success al final de runtime.chansend del emisor, su origen también es la configuración en la función runtime.recv del receptor. A través de esto se puede descubrir que el receptor y el emisor se complementan para que el channel funcione normalmente. En general, recibir datos es un poco más complejo que enviar datos. Hay las siguientes situaciones:

  1. El channel es nil, el programa colapsa
  2. El channel está cerrado, si el channel está vacío se devuelve directamente, si no está vacío se salta al caso 5 para ejecutar
  3. La capacidad del búfer es 0, hay una goroutine en espera de envío en sendq, se copian directamente los datos del emisor, luego se despierta el emisor
  4. El búfer está lleno, hay una goroutine en espera de envío en sendq, se hace dequeue del elemento principal del búfer, los datos del emisor se encolan, luego se despierta el emisor
  5. El búfer no está lleno y la cantidad no es 0, se hace dequeue del elemento principal del búfer, luego se devuelve
  6. El búfer está vacío, se entra en estado de bloqueo, esperando ser despertado por el emisor

Cierre

Para cerrar un channel, usamos la función incorporada close:

go
close(ch)

El compilador lo traduce a una llamada a runtime.closechan. Si el channel pasado es nil o ya está cerrado, se producirá un panic:

go
func closechan(c *hchan) {
  if c == nil {
    panic(plainError("close of nil channel"))
  }

  lock(&c.lock)
  if c.closed != 0 {
    unlock(&c.lock)
    panic(plainError("close of closed channel"))
  }
    c.closed = 1
  ...
}

Luego se hace dequeue de todas las goroutines bloqueadas en las colas sendq y recvq de este channel, y todas se despiertan mediante runtime.goready:

go
func closechan(c *hchan) {
    ...
  var glist gList

    // liberar todos los lectores
    for {
        sg := c.recvq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // liberar todos los escritores (ellos harán panic)
    for {
        sg := c.sendq.dequeue()
        gp := sg.g
        sg.success = false
        glist.push(gp)
    }

    // Preparar todas las Gs ahora que hemos soltado el bloqueo del channel.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}

TIP

Por cierto, Go permite channels unidireccionales, con las siguientes reglas:

  1. Un channel de solo lectura no puede enviar datos
  2. Un channel de solo lectura no puede cerrarse
  3. Un channel de solo escritura no puede leer datos

Estos errores se detectan temprano en la etapa de verificación de tipos durante la compilación, no se dejan para tiempo de ejecución. Los interesados pueden leer el código relacionado en los siguientes paquetes:

  • cmd/compile/internal/types2
  • cmd/compile/internal/typecheck
go
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
    ...
    if uch.dir == RecvOnly {
       check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
       return
    }
    check.assignment(&val, uch.elem, "send")

Determinar si está cerrado

Muy temprano (antes de go1), había una función incorporada closed para determinar si un channel estaba cerrado, pero pronto fue eliminada. Esto se debe a que el escenario de uso del channel suele ser multi-goroutine. Supongamos que devuelve true, ciertamente puede representar que el channel está cerrado. Pero si devuelve false, no significa realmente que el channel no esté cerrado, porque nadie sabe quién lo cerrará en el próximo momento. Por lo tanto, este valor de retorno no es confiable. Si se usa este valor de retorno como base para determinar si enviar datos al channel es aún más peligroso, porque enviar datos a un channel cerrado causará un panic.

Si realmente es necesario, se puede implementar uno mismo. Un方案 es determinar si el channel está cerrado escribiendo en él:

go
func closed(ch chan int) (ans bool) {
  defer func() {
    if err := recover(); err != nil {
      ans = true
    }
  }()
  ch <- 0
  return ans
}

Pero esto también tiene efectos secundarios. Si no está cerrado, se escribirán datos redundantes, y se entrará en el proceso de manejo defer-recover, causando pérdida adicional de rendimiento. Por lo tanto, el方案 de escritura se puede descartar directamente. Para el方案 de lectura, se puede considerar, pero no se puede leer directamente el channel, porque leer directamente con el parámetro block en true bloqueará la goroutine. Se debe usar combinado con select. Cuando el channel se combina con select, es no bloqueante.

go
func closed(ch chan int) bool {
  select {
  case _, received := <-ch:
    return !received
  default:
    return false
  }
}

Esta situación solo parece un poco mejor que la anterior. Solo se aplica cuando el channel está cerrado y no hay elementos en el búfer del channel. Si hay elementos, se consumirá innecesariamente ese elemento. Todavía no hay una buena implementación.

Pero en realidad, no necesitamos determinar si el channel está cerrado. La razón ya se explicó al principio: el valor de retorno no es confiable. Usar correctamente el channel y cerrarlo correctamente es lo que deberíamos hacer. Por lo tanto:

  1. Nunca cerrar el channel en el receptor. El hecho de que cerrar un channel de solo lectura no pueda compilarse ya le dice claramente que no haga esto. Deje que el emisor haga esto.
  2. Si hay múltiples emisores, debe haber una goroutine separada para completar la operación de cierre. Asegúrese de que close sea llamado por una sola parte y solo una vez.
  3. Al pasar un channel, es mejor limitar a solo lectura o solo escritura

Siguiendo estos principios, se puede asegurar que no habrá grandes problemas.

Golang editado por www.golangdev.cn