chan
channel es una estructura de datos especial, es el representante típico del pensamiento CSP en Go. El núcleo del pensamiento CSP es que los procesos intercambian datos mediante mensajes. Correspondientemente, a través de channel podemos comunicarnos fácilmente entre goroutines.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Además de la comunicación, a través de channel también se pueden implementar operaciones de sincronización entre goroutines. En sistemas que requieren concurrencia, channel es casi omnipresente. Para comprender mejor cómo funciona channel, a continuación se presentarán sus principios.
Estructura
En tiempo de ejecución, channel se representa mediante la estructura runtime.hchan, que contiene los siguientes campos:
type hchan struct {
qcount uint // total de datos en la cola
dataqsiz uint // tamaño de la cola circular
buf unsafe.Pointer // puntero a un array de dataqsiz elementos
elemsize uint16
closed uint32
elemtype *_type // tipo de elemento
sendx uint // índice de envío
recvx uint // índice de recepción
recvq waitq // lista de esperas de recepción
sendq waitq // lista de esperas de envío
lock mutex
}Como se puede ver claramente, existe el campo lock. channel es esencialmente una cola sincronizada con bloqueo. Los otros campos se explican a continuación:
qcount: representa el número total de datosdataqsiz: tamaño de la cola circularbuf: puntero al array de tamañodataqsiz, es decir, la cola circularclosed: indica si el channel está cerradosendx,recvx: índices de envío y recepciónsendq,recvq: listas de goroutines en espera de envío y recepción, cuyos elementos sonruntime.sudoggotype waitq struct { first *sudog last *sudog }
Con la siguiente imagen se puede comprender claramente la estructura de channel:

Al usar las funciones len y cap en un channel, en realidad se devuelven sus campos hchan.qcount y hchan.dataqsiz.
Creación
Normalmente, hay una única forma de crear un channel, usando la función make:
ch := make(chan int, size)El compilador lo traduce a una llamada a la función runtime.makechan, que es responsable de la creación real del channel. Su código es el siguiente:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Esta lógica es bastante simple, principalmente asigna memoria para el channel. Primero calcula el tamaño de memoria estimado necesario según el size y el tipo de elemento elem.Size_ pasados, luego lo maneja en tres casos:
sizees 0, solo asignahchanSize- El elemento no contiene punteros, asigna el espacio de memoria correspondiente, y la memoria de la cola circular es continua con la memoria del channel
- El elemento contiene punteros, la memoria del channel y la cola circular se asigna por separado
Si los elementos almacenados en la cola circular son punteros, ya que referencian elementos externos, GC podría escanear estos punteros durante la fase de marcado-eliminación. Cuando se almacenan elementos no puntero en memoria continua, se evitan escaneos innecesarios. Después de asignar la memoria, finalmente se actualizan otros campos de información.
Cabe mencionar que cuando la capacidad del channel es un entero de 64 bits, se usa la función runtime.makechan64 para crearlo. Esencialmente también es una llamada a runtime.makechan, solo que hace una verificación de tipo adicional.
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}Generalmente, es mejor que size no exceda math.MaxInt32.
Envío
Al enviar datos a un channel, colocamos los datos a enviar a la derecha de la flecha:
ch <- struct{}{}El compilador lo traduce a runtime.chansend1. La función realmente responsable de enviar datos es runtime.chansend. chansend1 le pasa el puntero elem, que apunta al puntero del elemento enviado.
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Primero verifica si el channel es nil. block indica si la operación de envío actual es bloqueante (el valor de block está relacionado con la estructura select). Si el envío es bloqueante y el channel es nil, se produce un panic directamente. En caso de envío no bloqueante, se verifica directamente sin bloqueo si el channel está lleno. Si está lleno, se devuelve directamente.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Luego se adquiere el bloqueo y se verifica si el channel está cerrado. Si ya está cerrado, se produce un panic:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}Después se hace dequeue de un sudog de la cola recvq, y luego se envía mediante la función runtime.send.
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}El contenido de la función send es el siguiente. Actualiza recvx y sendx, luego usa la función runtime.memmove para copiar directamente la memoria de los datos de comunicación a la dirección del elemento objetivo de la goroutine receptora. Luego, mediante la función runtime.goready, la goroutine receptora cambia al estado _Grunnable para participar nuevamente en la programación.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}En este proceso, como se puede encontrar una goroutine en espera de recepción, los datos se envían directamente al receptor sin almacenarse en la cola circular. Si no hay un receptor disponible y la capacidad es suficiente, se coloca en el búfer de la cola circular y se devuelve directamente.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Si el búfer está lleno, en caso de envío no bloqueante se devuelve directamente:
if !block {
unlock(&c.lock)
return false
}Si es envío bloqueante, se entra en el siguiente flujo de código:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}Primero construye la goroutine actual como un sudog y la agrega a la cola de goroutines en espera de envío hchan.sendq. Luego, runtime.gopark hace que la goroutine actual se bloquee, cambie al estado _Gwaiting hasta que sea despertada nuevamente por el receptor. Además, mediante runtime.KeepAlive se mantiene vivo el dato a enviar para asegurar que el receptor lo copie exitosamente. Después de ser despertada, se entra en el proceso de finalización:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}Como se puede ver, para el envío de datos a un channel hay las siguientes situaciones:
- El channel es
nil, el programa colapsa - El channel está cerrado, ocurre un panic
- La cola
recvqno está vacía, se envía directamente al receptor - No hay goroutines en espera, se agrega al búfer
- El búfer está lleno, la goroutine de envío se bloquea y espera a que otras goroutines reciban datos
Vale la pena señalar que en la lógica de envío anterior no se ve el manejo de datos que desbordan el búfer. Estos datos no pueden descartarse, se guardan en sudog.elem y son procesados por el receptor.
Recepción
En Go, hay dos formas de recibir datos de un channel. La primera es solo leer los datos:
data <- chLa segunda es determinar si la lectura de datos fue exitosa:
data, ok <- chLas dos sintaxis anteriores son traducidas por el compilador a llamadas a runtime.chanrecv1 y runtime.chanrecv2, pero en realidad son llamadas a runtime.chanrecv. La lógica de recepción es similar al inicio de la lógica de envío, primero verifica si el channel es nil.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Luego, en caso de lectura no bloqueante, se verifica sin bloqueo si el channel está vacío. Si el channel no está cerrado, se devuelve directamente. Si el channel está cerrado, se limpia la memoria del elemento recibido.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Luego se accede con bloqueo a la cola hchan.sendq. Se puede ver en la rama if c.closed != 0 que, incluso si el channel está cerrado, pero aún hay elementos en el channel, no se devuelve directamente. Todavía se ejecuta el código de consumo de elementos. Esta es la razón por la que se permite leer después de que el channel está cerrado.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Si el channel no está cerrado, se verifica si hay goroutines en espera de envío en la cola sendq. Si es así, la función runtime.recv maneja esa goroutine de envío.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copiar datos de la cola al receptor
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copiar datos del emisor a la cola
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Primer caso, la capacidad del channel es 0, es decir, channel sin búfer. El receptor copia directamente los datos del emisor mediante la función runtime.recvDirect. Segundo caso, el búfer está lleno. Aunque antes no se vio la lógica para verificar si el búfer está lleno, cuando la capacidad del búfer no es 0 y hay un emisor en espera, esto ya representa que el búfer está lleno. Solo cuando el búfer está lleno, el emisor se bloquea esperando enviar. Esta lógica es juzgada por el emisor. Luego, el receptor hace dequeue del elemento principal del búfer y copia su memoria al puntero del elemento receptor objetivo. Luego copia los datos que la goroutine de envío quiere enviar y los encola (aquí vemos cómo el receptor maneja los datos que desbordan el búfer). Finalmente, runtime.goready despierta la goroutine de envío, cambiándola al estado _Grunnable para unirse nuevamente a la programación.
Si no hay goroutines en espera de envío, se verifica si hay elementos en espera de consumo en el búfer. Se hace dequeue del elemento principal y se copia su memoria al elemento objetivo del receptor, luego se devuelve.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Recibir directamente de la cola
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Finalmente, si no hay elementos consumibles en el channel, runtime.gopark cambia la goroutine actual al estado _Gwaiting, bloqueándose hasta que sea despertada por la goroutine de envío.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}Después de ser despertada, se devuelve. En este momento, el valor success devuelto proviene de sudog.success. Si el emisor envió datos exitosamente, este campo debería ser establecido en true por el emisor. Esta lógica se puede ver en la función runtime.send.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}Correspondientemente, en el juicio de sudog.success al final de runtime.chansend del emisor, su origen también es la configuración en la función runtime.recv del receptor. A través de esto se puede descubrir que el receptor y el emisor se complementan para que el channel funcione normalmente. En general, recibir datos es un poco más complejo que enviar datos. Hay las siguientes situaciones:
- El channel es
nil, el programa colapsa - El channel está cerrado, si el channel está vacío se devuelve directamente, si no está vacío se salta al caso 5 para ejecutar
- La capacidad del búfer es 0, hay una goroutine en espera de envío en
sendq, se copian directamente los datos del emisor, luego se despierta el emisor - El búfer está lleno, hay una goroutine en espera de envío en
sendq, se hace dequeue del elemento principal del búfer, los datos del emisor se encolan, luego se despierta el emisor - El búfer no está lleno y la cantidad no es 0, se hace dequeue del elemento principal del búfer, luego se devuelve
- El búfer está vacío, se entra en estado de bloqueo, esperando ser despertado por el emisor
Cierre
Para cerrar un channel, usamos la función incorporada close:
close(ch)El compilador lo traduce a una llamada a runtime.closechan. Si el channel pasado es nil o ya está cerrado, se producirá un panic:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Luego se hace dequeue de todas las goroutines bloqueadas en las colas sendq y recvq de este channel, y todas se despiertan mediante runtime.goready:
func closechan(c *hchan) {
...
var glist gList
// liberar todos los lectores
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// liberar todos los escritores (ellos harán panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Preparar todas las Gs ahora que hemos soltado el bloqueo del channel.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
Por cierto, Go permite channels unidireccionales, con las siguientes reglas:
- Un channel de solo lectura no puede enviar datos
- Un channel de solo lectura no puede cerrarse
- Un channel de solo escritura no puede leer datos
Estos errores se detectan temprano en la etapa de verificación de tipos durante la compilación, no se dejan para tiempo de ejecución. Los interesados pueden leer el código relacionado en los siguientes paquetes:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Determinar si está cerrado
Muy temprano (antes de go1), había una función incorporada closed para determinar si un channel estaba cerrado, pero pronto fue eliminada. Esto se debe a que el escenario de uso del channel suele ser multi-goroutine. Supongamos que devuelve true, ciertamente puede representar que el channel está cerrado. Pero si devuelve false, no significa realmente que el channel no esté cerrado, porque nadie sabe quién lo cerrará en el próximo momento. Por lo tanto, este valor de retorno no es confiable. Si se usa este valor de retorno como base para determinar si enviar datos al channel es aún más peligroso, porque enviar datos a un channel cerrado causará un panic.
Si realmente es necesario, se puede implementar uno mismo. Un方案 es determinar si el channel está cerrado escribiendo en él:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Pero esto también tiene efectos secundarios. Si no está cerrado, se escribirán datos redundantes, y se entrará en el proceso de manejo defer-recover, causando pérdida adicional de rendimiento. Por lo tanto, el方案 de escritura se puede descartar directamente. Para el方案 de lectura, se puede considerar, pero no se puede leer directamente el channel, porque leer directamente con el parámetro block en true bloqueará la goroutine. Se debe usar combinado con select. Cuando el channel se combina con select, es no bloqueante.
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Esta situación solo parece un poco mejor que la anterior. Solo se aplica cuando el channel está cerrado y no hay elementos en el búfer del channel. Si hay elementos, se consumirá innecesariamente ese elemento. Todavía no hay una buena implementación.
Pero en realidad, no necesitamos determinar si el channel está cerrado. La razón ya se explicó al principio: el valor de retorno no es confiable. Usar correctamente el channel y cerrarlo correctamente es lo que deberíamos hacer. Por lo tanto:
- Nunca cerrar el channel en el receptor. El hecho de que cerrar un channel de solo lectura no pueda compilarse ya le dice claramente que no haga esto. Deje que el emisor haga esto.
- Si hay múltiples emisores, debe haber una goroutine separada para completar la operación de cierre. Asegúrese de que
closesea llamado por una sola parte y solo una vez. - Al pasar un channel, es mejor limitar a solo lectura o solo escritura
Siguiendo estos principios, se puede asegurar que no habrá grandes problemas.
