chan
channel é uma estrutura de dados especial, é um representante típico do pensamento CSP da linguagem Go. O núcleo do pensamento CSP é que os processos trocam dados através de comunicação por mensagens. Correspondentemente, através do channel podemos facilmente comunicar entre goroutines.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Além da comunicação, através do channel também podemos implementar operações como sincronização de goroutines. Em sistemas que necessitam de concorrência, a figura do channel é quase onipresente. Para entender melhor como o channel funciona, abaixo apresentaremos seus princípios.
Estrutura
O channel é representado em tempo de execução pela estrutura runtime.hchan, que contém campos não muito numerosos, como abaixo:
type hchan struct {
qcount uint // total de dados na fila
dataqsiz uint // tamanho da fila circular
buf unsafe.Pointer // aponta para um array de elementos dataqsiz
elemsize uint16
closed uint32
elemtype *_type // tipo do elemento
sendx uint // índice de envio
recvx uint // índice de recebimento
recvq waitq // lista de waiters de recebimento
sendq waitq // lista de waiters de envio
lock mutex
}Podemos ver claramente o campo lock acima. O channel é na verdade uma fila circular síncrona com bloqueio. Os outros campos são descritos abaixo:
qcount, representa o número total de dadosdataqsize, tamanho da fila circularbuf, ponteiro para o array de tamanhodataqsize, que é a fila circularclosed, se o channel está fechadosendx,recvx, representam os índices de envio e recebimentosendq,recvq, representam as listas encadeadas de goroutines de envio e recebimento, cujos elementos sãoruntime.sudoggotype waitq struct { first *sudog last *sudog }
Através da figura abaixo podemos entender claramente a estrutura do channel:

Ao usar as funções len e cap no channel, na verdade retornamos seus campos hchan.qcount e hchan.dataqsiz.
Criação
Normalmente, há apenas uma maneira de criar um channel, usando a função make:
ch := make(chan int, size)O compilador traduz isso para uma chamada à função runtime.makechan, que é responsável pela criação real do channel. Seu código é mostrado abaixo:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Esta lógica é relativamente simples, principalmente alocando memória para o channel. Primeiro calcula o tamanho de memória esperado necessário com base no size e tipo de elemento elem.size passados, depois trata em três situações:
sizeé 0, aloca apenashchanSize- O elemento não contém ponteiro, então aloca espaço de memória correspondente, e a memória da fila circular é contínua com a memória do channel
- O elemento contém ponteiro, a memória do channel e da fila circular é alocada separadamente
Se os elementos armazenados na fila circular forem elementos de ponteiro, como eles referenciam elementos externos, o GC pode escanar esses ponteiros durante a fase de marcação-limpeza. Quando elementos não-ponteiro são armazenados em memória contínua, isso evita varreduras desnecessárias. Após a alocação de memória, finalmente atualiza alguns outros campos de informações de registro.
Vale mencionar que quando a capacidade do channel é um inteiro de 64 bits, usa-se a função runtime.makechan64 para criar. Essencialmente também é uma chamada à runtime.makechan, apenas faz uma verificação de tipo adicional:
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}Geralmente é melhor não exceder math.MaxInt32 para size.
Envio
Ao enviar dados para um channel, colocamos os dados a serem enviados à direita da seta:
ch <- struct{}{}O compilador traduz isso para runtime.chansend1. A função realmente responsável por enviar dados é runtime.chansend. chansend1 passa a ela o ponteiro elem, que aponta para o ponteiro do elemento enviado:
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Primeiro verifica se o channel é nil. block indica se a operação de envio atual é bloqueante (o valor de block está relacionado à estrutura select). Se o envio for bloqueante e o channel for nil, ocorre panic diretamente. No caso de envio não bloqueante, verifica diretamente sem bloqueio se o channel está cheio. Se estiver cheio, retorna diretamente:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Em seguida, adquire o bloqueio e verifica se o channel está fechado. Se já estiver fechado, ocorre panic:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}Depois disso, faz dequeue de um sudog da fila recvq, e envia através da função runtime.send:
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}O conteúdo da função send é o seguinte. Ela atualiza recvx e sendx, depois usa a função runtime.memmove para copiar diretamente a memória dos dados de comunicação para o endereço do elemento de destino da goroutine receptora. Em seguida, usa a função runtime.goready para colocar a goroutine receptora no estado _Grunnable, para que possa participar novamente do escalonamento:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}Neste processo acima, como é possível encontrar a goroutine aguardando recebimento, os dados são enviados diretamente para o receptor, sem serem armazenados na fila circular. Se não houver receptor disponível e a capacidade for suficiente, será colocado no buffer da fila circular e retorna diretamente:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Se o buffer estiver cheio e for envio não bloqueante, retorna diretamente:
if !block {
unlock(&c.lock)
return false
}Se for envio bloqueante, entra no seguinte fluxo de código:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}Primeiro constrói a goroutine atual como sudog e adiciona à fila de goroutines aguardando envio hchan.sendq. Depois runtime.gopark bloqueia a goroutine atual, mudando para o estado _Gwaiting até ser despertada novamente pelo receptor. E através de runtime.KeepAlive mantém os dados a serem enviados vivos para garantir que o receptor copie com sucesso. Quando for despertado, entra no processo de finalização:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}Podemos ver que para envio de dados através do channel, há as seguintes situações:
- Channel é
nil, o programa entra em panic - Channel está fechado, ocorre
panic - A fila
recvqnão está vazia, envia diretamente para o receptor - Sem goroutine aguardando, adiciona ao buffer
- Buffer cheio, a goroutine de envio entra em estado bloqueante, aguardando que outras goroutines recebam os dados
Vale notar que na lógica de envio acima não vimos tratamento para dados excedentes do buffer. Esses dados não podem ser descartados, eles são salvos em sudog.elem, e serão processados pelo receptor.
Recebimento
Em Go, há duas formas de receber dados de um channel. A primeira é apenas ler os dados:
data <- chA segunda é verificar se a leitura foi bem-sucedida:
data, ok <- chAs duas sintaxes acima são traduzidas pelo compilador para chamadas a runtime.chanrecv1 e runtime.chanrecv2, mas na verdade são apenas chamadas a runtime.chanrecv. O início da lógica de recebimento é similar à de envio, primeiro verifica se o channel é nulo:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Em seguida, no caso de leitura não bloqueante, verifica sem bloqueio se o channel está vazio. Se o channel não estiver fechado, retorna diretamente. Se estiver fechado, limpa a memória do elemento recebido:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Depois adquire o bloqueio e acessa a fila hchan.sendq. Podemos ver através da branch if c.closed != 0 que, mesmo que o channel esteja fechado, se ainda houver elementos no channel, não retorna diretamente. Ainda executa o código de consumo de elementos abaixo. É por isso que ainda é permitido ler após o channel ser fechado:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Se o channel não estiver fechado, verifica se há goroutines aguardando envio na fila sendq. Se houver, a função runtime.recv processa essa goroutine de envio:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copia dados da fila para o receptor
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copia dados do remetente para a fila
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Primeira situação: capacidade do channel é 0, ou seja, channel sem buffer. O receptor copia diretamente os dados do remetente através da função runtime.recvDirect. Segunda situação: buffer cheio. Embora não tenhamos visto antes a lógica de verificar se o buffer está cheio, na verdade quando a capacidade do buffer não é 0 e há remetente aguardando envio, já representa que o buffer está cheio. Porque só quando o buffer está cheio o remetente bloqueia aguardando envio. Esta lógica é julgada pelo remetente. Em seguida, o receptor faz dequeue do elemento da cabeça do buffer e copia sua memória para o ponteiro do elemento receptor de destino. Depois copia os dados que a goroutine de envio quer enviar e enfileira (aqui vemos como o receptor processa dados excedentes do buffer). Finalmente, runtime.goready desperta a goroutine de envio, colocando-a no estado _Grunnable para se juntar novamente ao escalonamento.
Se não houver goroutine aguardando envio, verifica se há elementos aguardando consumo no buffer. Faz dequeue do elemento da cabeça e copia sua memória para o elemento de destino do receptor, depois retorna:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Recebe diretamente da fila
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Por fim, se não houver elementos consumíveis no channel, usa runtime.gopark para colocar a goroutine atual no estado _Gwaiting, bloqueando e aguardando até ser despertada pela goroutine de envio:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}Após ser despertado, retorna. Neste momento o valor de success retornado vem de sudog.success. Se o remetente enviou os dados com sucesso, este campo deve ser definido como true pelo remetente. Podemos ver esta lógica na função runtime.send:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}Correspondentemente, no final de runtime.chansend do remetente, o julgamento de sudog.success tem origem na configuração da função runtime.recv do receptor. Através disso podemos descobrir que receptor e remetente se complementam para que o channel funcione normalmente. No geral, receber dados é um pouco mais complexo que enviar dados. Há as seguintes situações:
- Channel é
nil, o programa entra em panic - Channel está fechado, se o channel estiver vazio retorna diretamente, se não estiver vazio pula para a situação 5
- Capacidade do buffer é 0, há goroutine aguardando envio em
sendq, então copia diretamente os dados do remetente e desperta o remetente - Buffer cheio, há goroutine aguardando envio em
sendq, faz dequeue do elemento da cabeça do buffer, enqueue dos dados do remetente, e desperta o remetente - Buffer não cheio e quantidade não zero, faz dequeue do elemento da cabeça do buffer e retorna
- Buffer vazio, entra em estado bloqueante, aguardando ser despertado pelo remetente
Fechamento
Para fechar um channel, usamos a função integrada close:
close(ch)O compilador traduz isso para uma chamada a runtime.closechan. Se o channel passado for nil ou já estiver fechado, ocorre panic diretamente:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Em seguida, faz dequeue de todas as goroutines bloqueadas nas filas sendq e recvq deste channel e as desperta todas através de runtime.goready:
func closechan(c *hchan) {
...
var glist gList
// libera todos os leitores
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// libera todos os escritores (eles entrarão em panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Prepara todas as Gs agora que liberamos o bloqueio do channel.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
Vale mencionar que Go permite channels unidirecionais, com as seguintes regras:
- Channel somente leitura não pode enviar dados
- Channel somente leitura não pode ser fechado
- Channel somente escrita não pode receber dados
Esses erros são detectados já na fase de verificação de tipos em tempo de compilação, não deixando para tempo de execução. Se estiver interessado, pode ler o código relacionado nestes dois pacotes:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Verificar fechamento
Muito tempo atrás (antes do go1), havia uma função integrada closed para verificar se um channel estava fechado, mas foi rapidamente removida. Isso porque o cenário de uso do channel normalmente envolve múltiplas goroutines. Supondo que retorne true, realmente representa que o channel está fechado. Mas se retornar false, não significa que o channel realmente não está fechado, pois ninguém sabe quem o fechará no próximo momento. Portanto, este valor de retorno não é confiável. Usar este valor de retorno como base para decidir enviar dados para o channel é ainda mais perigoso, pois enviar dados para um channel fechado causa panic.
Se realmente necessário, pode implementar por conta própria. Uma solução é verificar através da escrita no channel, como no código abaixo:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Mas isso também tem efeitos colaterais. Se não estiver fechado, escreverá dados redundantes, e entrará no processo de tratamento defer-recover, causando perda adicional de desempenho. Portanto, a solução de escrita pode ser descartada diretamente. Para a solução de leitura, pode considerar, mas não pode ler diretamente o channel, pois ler diretamente com parâmetro block igual a true bloqueará a goroutine. Deve ser usado em conjunto com select. Quando combinado com select, o channel é não bloqueante:
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Esta situação parece apenas um pouco melhor que a anterior. Aplica-se apenas quando o channel está fechado e não há elementos no buffer do channel. Se houver elementos, consumirá inutilmente este elemento. Ainda não há uma boa implementação.
Mas na verdade não precisamos verificar se o channel está fechado. O motivo já foi explicado no início: o valor de retorno não é confiável. O que devemos fazer é usar corretamente o channel e fechá-lo corretamente. Portanto:
- Nunca feche o channel no lado receptor. O fato de que fechar um channel somente leitura não passa pela compilação já diz claramente para não fazer isso. Deixe isso para o remetente.
- Se houver múltiplos remetentes, deve-se usar uma goroutine separada para completar a operação de fechamento, garantindo que
closeseja chamado por apenas um lado e apenas uma vez. - Ao passar o channel, é melhor restringir para somente leitura ou somente escrita
Seguindo estes princípios, garante-se que não haverá grandes problemas.
