chan
channel — это особая структура данных и типичный представитель философии CSP в Go. Основная идея философии CSP заключается в том, что процессы обмениваются данными через обмен сообщениями. Соответственно, через channel мы можем легко общаться между goroutine.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Помимо коммуникации, через channel мы также можем реализовать операции синхронизации goroutine. В системах, требующих конкурентности, channel практически вездесущ. Чтобы лучше понять, как работает channel, ниже мы представим его принципы.
Структура
Во время выполнения channel представляется структурой runtime.hchan, которая содержит следующие поля:
type hchan struct {
qcount uint // общее количество данных в очереди
dataqsiz uint // размер циклической очереди
buf unsafe.Pointer // указывает на массив элементов dataqsiz
elemsize uint16
closed uint32
elemtype *_type // тип элемента
sendx uint // индекс отправки
recvx uint // индекс получения
recvq waitq // список ожидающих получение
sendq waitq // список ожидающих отправку
lock mutex
}Как видно из поля lock, channel фактически является блокируемой синхронной циклической очередью. Другие поля объясняются ниже:
qcount: общее количество элементов данныхdataqsiz: размер циклической очередиbuf: указатель на массив размераdataqsiz, который является циклической очередьюclosed: закрыт ли channelsendx,recvx: индексы отправки и полученияsendq,recvq: связные списки goroutine для отправки и получения, состоящие изruntime.sudoggotype waitq struct { first *sudog last *sudog }
Структура channel может быть чётко понята из рисунка ниже:

При использовании функций len и cap на channel, фактически возвращаются его поля hchan.qcount и hchan.dataqsiz.
Создание
Обычно существует только один способ создания channel, использование функции make:
ch := make(chan int, size)Компилятор транслирует это в вызов runtime.makechan, который отвечает за фактическое создание channel. Его код следующий:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Эта логика довольно проста, в основном выделение памяти для channel. Сначала вычисляется ожидаемый размер памяти на основе переданного size и типа элемента elem.Size_, затем обрабатываются три случая:
sizeравен 0, выделяется толькоhchanSize- Элемент не содержит указателей, выделяется соответствующий размер памяти, и память циклической очереди смежна с памятью channel
- Элемент содержит указатели, память channel и циклической очереди выделяется отдельно
Если циклическая очередь содержит элементы-указатели, поскольку они ссылаются на внешние элементы, GC может сканировать эти указатели во время фазы mark-sweep. При хранении элементов без указателей в смежной памяти это позволяет избежать ненужного сканирования. После выделения памяти обновляются другие информационные поля.
Кстати, когда ёмкость channel является 64-битным целым числом, для создания используется функция runtime.makechan64. Это по сути вызов runtime.makechan, просто с дополнительной проверкой типа:
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}Обычно size не должен превышать math.MaxInt32.
Отправка
При отправке данных в channel мы помещаем данные для отправки справа от стрелки:
ch <- struct{}{}Компилятор транслирует это в runtime.chansend1, а функция, фактически отвечающая за отправку данных — runtime.chansend. chansend1 передаёт указатель elem, который указывает на отправляемый элемент:
// точка входа для c <- x из скомпилированного кода.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Сначала проверяется, является ли channel nil. block указывает, является ли текущая операция отправки блокирующей (значение block связано со структурой select). Если блокирующая отправка и channel равен nil, происходит panic напрямую. В случае неблокирующей отправки напрямую проверяется, заполнен ли channel без блокировки, и если заполнен, немедленно возвращается:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Затем захватывается блокировка и проверяется, закрыт ли channel. Если уже закрыт, происходит panic:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}После этого извлекается sudog из очереди recvq, затем отправляется через функцию runtime.send:
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}Содержимое функции send следующее. Обновляются recvx и sendx, затем используется функция runtime.memmove для прямого копирования памяти коммуникационных данных в адрес целевого элемента goroutine-получателя, затем используется функция runtime.goready для перевода goroutine-получателя в состояние _Grunnable для повторного участия в планировании:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}В этом процессе, поскольку может быть найден ожидающий goroutine-получатель, данные отправляются напрямую получателю без хранения в циклической очереди. Если получатель недоступен и ёмкость достаточна, данные помещаются в буфер циклической очереди и возвращается напрямую:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Если буфер заполнен, в случае неблокирующей отправки возвращается напрямую:
if !block {
unlock(&c.lock)
return false
}Если блокирующая отправка, входит в следующий поток кода:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}Сначала конструирует текущую goroutine как sudog и добавляет в очередь ожидающих на отправку goroutine hchan.sendq, затем использует runtime.gopark для блокировки текущей goroutine, изменяя её состояние на _Gwaiting до пробуждения получателем. Также используется runtime.KeepAlive для удержания отправляемых данных, чтобы обеспечить успешное копирование получателем. При пробуждении входит в следующий поток очистки:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}Как видно, для операций отправки channel существуют следующие случаи:
- Channel равен
nil, программа panic - Channel закрыт, происходит panic
- Очередь
recvqне пуста, отправляется напрямую получателю - Нет ожидающих goroutine, добавляется в буфер
- Буфер заполнен, goroutine-отправитель переходит в заблокированное состояние, ожидая получения данных другими goroutine
Стоит отметить, что в логике отправки выше мы не видели обработки данных буфера переполнения. Эти данные не могут быть отброшены; они сохраняются в sudog.elem и обрабатываются получателем.
Получение
В Go существуют два синтаксиса для получения данных из channel. Первый — только чтение данных:
data <- chВторой — проверка успешности чтения данных:
data, ok <- chВышеуказанные два синтаксиса транслируются компилятором в вызовы runtime.chanrecv1 и runtime.chanrecv2, но фактически это вызовы runtime.chanrecv. Начало логики получения аналогично логике отправки, оба сначала проверяют, равен ли channel nil:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Затем в случае неблокирующего чтения без блокировки проверяется, пуст ли channel. Если channel не закрыт, возвращается напрямую. Если channel закрыт, очищается память принимающего элемента:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Затем захватывается блокировка и accessed очередь hchan.sendq. Из ветки if c.closed != 0 ниже видно, что даже если channel закрыт, если в channel всё ещё есть элементы, он не возвращается напрямую, а продолжает выполнять код потребления элементов. Поэтому чтение всё ещё разрешено после закрытия channel:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Если channel не закрыт, проверяется, есть ли в очереди sendq goroutine, ожидающие отправки. Если да, runtime.recv обрабатывает goroutine-отправителя:
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// копируем данные из очереди в получатель
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// копируем данные от отправителя в очередь
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Первый случай: ёмкость channel равна 0 (небуферизированный channel), получатель напрямую копирует данные от отправителя через функцию runtime.recvDirect. Второй случай: буфер заполнен. Хотя мы не видели логики проверки заполненности буфера, когда ёмкость буфера не равна 0 и есть отправитель, ожидающий отправки, это означает, что буфер уже заполнен, потому что только когда буфер заполнен, отправитель блокируется в ожидании отправки. Эта логика определяется отправителем. Затем получатель извлекает головной элемент из буфера и копирует его память в целевой указатель принимающего элемента, затем копирует и помещает в очередь данные goroutine-отправителя для отправки (здесь мы видим, как получатель обрабатывает данные буфера переполнения). Наконец, runtime.goready пробуждает goroutine-отправителя, переводя его в состояние _Grunnable для повторного участия в планировании.
Если нет goroutine, ожидающих отправки, проверяется, есть ли в буфере элементы, ожидающие потребления, извлекается головной элемент и копируется его память в целевой элемент получателя, затем возвращается:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Получаем напрямую из очереди
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Наконец, если в channel нет потребляемых элементов, runtime.gopark изменяет состояние текущей goroutine на _Gwaiting, блокируя и ожидая пробуждения goroutine-отправителем:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}После пробуждения возвращается. В этот момент возвращённое значение success приходит из sudog.success. Если отправитель успешно отправил данные, это поле должно быть установлено в true отправителем, что мы можем видеть в функции runtime.send:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}Соответственно, в отправителе runtime.chansend суждение sudog.success приходит из установки получателя в функции runtime.recv. Через это мы можем обнаружить, что получатель и отправитель дополняют друг друга, заставляя channel работать правильно. В целом, получение данных немного сложнее отправки, со следующими случаями:
- Channel равен
nil, программа panic - Channel закрыт, если channel пуст — возвращается напрямую, если не пуст — переход к случаю 5
- Ёмкость буфера равна 0, в
sendqесть ожидающий отправитель, напрямую копируются данные отправителя, затем пробуждается отправитель - Буфер заполнен, в
sendqесть ожидающий отправитель, извлекается головной элемент буфера, помещаются данные отправителя, затем пробуждается отправитель - Буфер не заполнен и количество не 0, извлекается головной элемент буфера, затем возвращается
- Буфер пуст, входит в заблокированное состояние, ожидая пробуждения отправителем
Закрытие
Для закрытия channel мы используем встроенную функцию close:
close(ch)Компилятор транслирует это в вызов runtime.closechan. Если переданный channel равен nil или уже закрыт, происходит panic напрямую:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Затем извлекаются все заблокированные goroutine из sendq и recvq этого channel и все пробуждаются через runtime.goready:
func closechan(c *hchan) {
...
var glist gList
// освобождаем всех читателей
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// освобождаем всех писателей (они panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
Кстати, Go разрешает однонаправленные channel, со следующими правилами:
- Channel только для чтения не может отправлять данные
- Channel только для чтения не может быть закрыт
- Channel только для записи не может читать данные
Эти ошибки обнаруживаются во время проверки типов на этапе компиляции, не оставляются на runtime. Если интересно, вы можете прочитать соответствующий код в этих двух пакетах:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Проверка на закрытие
В ранние дни (до go1) существовала встроенная функция closed для проверки, закрыт ли channel, но она была быстро удалена. Это потому, что channel обычно используются в много-goroutine сценариях. Если возвращается true, это действительно означает, что channel закрыт. Но если возвращается false, это не означает, что channel действительно не закрыт, потому что никто не знает, кто закроет channel в следующий момент. Поэтому это возвращаемое значение ненадёжно. Использование этого возвращаемого значения как основания для решения, отправлять ли данные в channel, ещё более опасно, потому что отправка в закрытый channel вызывает panic.
Если действительно нужно, вы можете реализовать собственное. Один подход — проверка записью в channel, как показано ниже:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Однако это также имеет побочные эффекты. Если не закрыт, записываются избыточные данные, и входит процесс обработки defer-recover, вызывая дополнительную потерю производительности. Поэтому подход с записью можно напрямую отбросить. Для подхода с чтением можно рассмотреть, но нельзя читать channel напрямую, потому что прямое чтение со значением параметра block равным true заблокирует goroutine. Следует использовать с select. Когда channel комбинируется с select, он неблокирующий:
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Это просто выглядит немного лучше вышеупомянутого. Применяется только когда channel закрыт и в буфере channel нет элементов. Если есть элементы, это ненужно потребит этот элемент. Всё ещё нет хорошей реализации.
Но на самом деле нам вообще не нужно проверять, закрыт ли channel. Причина уже была объяснена в начале: возвращаемое значение ненадёжно. Правильное использование channel и правильное закрытие channel — это то, что мы должны делать. Поэтому:
- Никогда не закрывайте channel на стороне получателя. Тот факт, что закрытие channel только для чтения не компилируется, уже говорит вам не делать этого. Пусть это делает отправитель.
- Если есть несколько отправителей, пусть отдельная goroutine завершает операцию закрытия, обеспечивая вызов
closeтолько одной стороной и только один раз. - При передаче channel лучше ограничиться только для чтения или только для записи
Следование этим принципам гарантирует отсутствие серьёзных проблем.
