chan
Ein Channel ist eine spezielle Datenstruktur und ein typischer Vertreter der CSP-Philosophie in Go. Der Kern der CSP-Idee ist, dass Prozesse durch Nachrichtenkommunikation Daten austauschen. Entsprechend können wir durch Channels sehr einfach zwischen Goroutinen kommunizieren.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}Neben der Kommunikation können durch Channels auch Operationen wie die Synchronisation von Goroutinen realisiert werden. In Systemen, die Nebenläufigkeit erfordern, sind Channels fast überall zu finden. Um die Funktionsweise von Channels besser zu verstehen, werden im Folgenden die Prinzipien erläutert.
Struktur
Zur Laufzeit wird ein Channel durch die Struktur runtime.hchan repräsentiert, die nur wenige Felder enthält:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}Man kann deutlich das lock-Feld erkennen. Ein Channel ist tatsächlich eine gesperrte synchrone Ringwarteschlange. Die anderen Felder werden wie folgt erklärt:
qcount, gibt die Gesamtzahl der Daten andataqsize, die Größe der Ringwarteschlangebuf, ein Zeiger auf ein Array der Größedataqsize, also die Ringwarteschlangeclosed, ob der Channel geschlossen istsendx,recvx, repräsentieren die Sende- und Empfangsindizessendq,recvq, repräsentieren die verketteten Listen der sendenden und empfangenden Goroutinen, deren Elementeruntime.sudogsindgotype waitq struct { first *sudog last *sudog }Die folgende Abbildung zeigt deutlich die Struktur eines Channels:

Wenn die Funktionen len und cap auf einen Channel angewendet werden, werden tatsächlich die Felder hchan.qcount und hchan.dataqsiz zurückgegeben.
Erstellung
Normalerweise gibt es nur eine Möglichkeit, einen Channel zu erstellen: die Verwendung der make-Funktion:
ch := make(chan int, size)Der Compiler übersetzt dies in einen Aufruf der Funktion runtime.makechan, die für die tatsächliche Erstellung des Channels verantwortlich ist. Der Code sieht wie folgt aus:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}Diese Logik ist relativ einfach und hauptsächlich für die Speicherzuweisung des Channels zuständig. Zuerst wird anhand der übergebenen size und dem Elementtyp elem.size der erwartete Speicherbedarf berechnet. Dann wird in drei Fällen unterschieden:
sizeist 0, es wird nurhchanSizezugewiesen- Das Element enthält keine Zeiger, dann wird der entsprechende Speicherplatz zugewiesen und der Speicher der Ringwarteschlange ist zusammenhängend mit dem Speicher des Channels
- Das Element enthält Zeiger, der Speicher für den Channel und die Ringwarteschlange wird getrennt zugewiesen
Wenn in der Ringwarteschlange Zeigerelemente gespeichert werden, kann der GC diese Zeiger während der Mark-Sweep-Phase scannen, da sie auf externe Elemente verweisen. Wenn Nicht-Zeigerelemente gespeichert werden, vermeidet die Zuweisung in zusammenhängendem Speicher unnötige Scans. Nach Abschluss der Speicherzuweisung werden schließlich die anderen Felder zur Informationsspeicherung aktualisiert.
Übrigens: Wenn die Kanalkapazität eine 64-Bit-Ganzzahl ist, wird die Funktion runtime.makechan64 für die Erstellung verwendet. Diese ist im Wesentlichen ein Aufruf von runtime.makechan, führt jedoch zusätzlich eine Typprüfung durch.
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}Generell sollte size besser nicht math.MaxInt32 überschreiten.
Senden
Beim Senden von Daten an einen Channel platzieren wir die zu sendenden Daten rechts vom Pfeil:
ch <- struct{}{}Der Compiler übersetzt dies in runtime.chansend1. Die Funktion, die tatsächlich für das Senden von Daten verantwortlich ist, ist runtime.chansend. chansend1 übergibt ihr den elem-Zeiger, der auf das zu sendende Element zeigt.
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}Zuerst wird geprüft, ob der Channel nil ist. block gibt an, ob die aktuelle Sendoperation blockierend ist (der Wert von block hängt mit der select-Struktur zusammen). Wenn blockierend gesendet wird und der Channel nil ist, stürzt das Programm direkt ab. Im Fall einer nicht-blockierenden Sendung wird ohne Sperre direkt geprüft, ob der Channel voll ist. Wenn er voll ist, wird direkt zurückgekehrt.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}Anschließend wird die Sperre gesetzt und geprüft, ob der Channel geschlossen ist. Wenn er bereits geschlossen ist, wird ein panic ausgelöst:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}Anschließend wird ein sudog aus der recvq-Warteschlange entfernt und durch die Funktion runtime.send gesendet:
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}Der Inhalt der send-Funktion sieht wie folgt aus. Sie aktualisiert recvx und sendx und verwendet dann die Funktion runtime.memmove, um die Kommunikationsdaten direkt in die Zieladresse des empfangenden Goroutine zu kopieren. Anschließend wird die empfangende Goroutine durch die Funktion runtime.goready in den _Grunnable-Zustand versetzt, um erneut an der Scheduling-Phase teilzunehmen:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}In diesem Prozess werden die Daten direkt an den Empfänger gesendet, da eine wartende empfangende Goroutine gefunden wurde. Die Daten werden nicht in der Ringwarteschlange gespeichert. Wenn keine empfangende Goroutine verfügbar ist und die Kapazität ausreicht, werden die Daten in den Puffer der Ringwarteschlange gelegt und dann direkt zurückgekehrt:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}Wenn der Puffer voll ist, wird im Fall einer nicht-blockierenden Sendung direkt zurückgekehrt:
if !block {
unlock(&c.lock)
return false
}Wenn es sich um eine blockierende Sendung handelt, wird der folgende Code-Ablauf ausgeführt:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}Zuerst wird die aktuelle Goroutine in eine sudog-Struktur umgewandelt und in die Warteschlange hchan.sendq für sendende Goroutinen eingereiht. Dann wird die aktuelle Goroutine durch runtime.gopark blockiert und in den _Gwaiting-Zustand versetzt, bis sie erneut vom Empfänger geweckt wird. Dabei wird durch runtime.KeepAlive sichergestellt, dass die zu sendenden Daten erhalten bleiben, bis der Empfänger sie erfolgreich kopiert hat. Nach dem Aufwecken wird der abschließende Ablauf ausgeführt:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}Wie man sieht, gibt es beim Senden von Daten an einen Channel insgesamt folgende Situationen:
- Der Channel ist
nil, das Programm stürzt ab - Der Channel ist bereits geschlossen, ein
panictritt auf - Die
recvq-Warteschlange ist nicht leer, Daten werden direkt an den Empfänger gesendet - Keine Goroutine wartet, Daten werden in den Puffer gelegt
- Der Puffer ist voll, die sendende Goroutine geht in den blockierten Zustand und wartet darauf, dass andere Goroutinen Daten empfangen
Es ist erwähnenswert, dass in der oben beschriebenen Sende-Logik keine Behandlung für Daten zu finden ist, die den Puffer überlaufen lassen. Diese Daten können nicht verworfen werden. Sie werden in sudog.elem gespeichert und vom Empfänger verarbeitet.
Empfangen
In Go gibt es zwei Syntaxformen, um Daten aus einem Channel zu empfangen. Die erste ist nur das Lesen von Daten:
data <- chDie zweite ist die Überprüfung, ob die Daten erfolgreich gelesen wurden:
data, ok <- chDie beiden oben genannten Syntaxformen werden vom Compiler in Aufrufe von runtime.chanrecv1 und runtime.chanrecv2 übersetzt, die jedoch tatsächlich nur Aufrufe von runtime.chanrecv sind. Der Anfang der Empfangslogik ähnelt der Sende-Logik: Zuerst wird geprüft, ob der Channel nil ist.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}Anschließend wird im Fall einer nicht-blockierenden Leseoperation ohne Sperre geprüft, ob der Channel leer ist. Wenn der Channel nicht geschlossen ist, wird direkt zurückgekehrt. Wenn der Channel geschlossen ist, wird der Speicher des empfangenen Elements gelöscht.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}Anschließend wird die Sperre gesetzt und auf die hchan.sendq-Warteschlange zugegriffen. Durch den Zweig if c.closed != 0 kann man sehen, dass selbst wenn der Channel bereits geschlossen ist, aber noch Elemente im Channel vorhanden sind, nicht direkt zurückgekehrt wird. Stattdessen wird der Code zum Konsumieren der Elemente weiter ausgeführt. Das ist der Grund, warum nach dem Schließen des Channels weiterhin gelesen werden darf.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}Wenn der Channel nicht geschlossen ist, wird geprüft, ob in der sendq-Warteschlange eine Goroutine wartet, die senden möchte. Falls ja, wird die sendende Goroutine durch runtime.recv verarbeitet.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}Im ersten Fall beträgt die Kanalkapazität 0, also ein ungebufferter Channel. Der Empfänger kopiert die Daten direkt vom Sender durch die Funktion runtime.recvDirect. Im zweiten Fall ist der Puffer voll. Obwohl zuvor keine Logik zu sehen war, die prüft, ob der Puffer voll ist, bedeutet eine Kapazität ungleich 0 zusammen mit einem wartenden Sender tatsächlich, dass der Puffer voll ist. Nur wenn der Puffer voll ist, wartet der Sender blockiert auf das Senden. Diese Logik wird vom Sender beurteilt. Anschließend entnimmt der Empfänger das erste Element aus dem Puffer und kopiert seinen Speicher in den Zeiger des Zielelements. Dann werden die vom Sender zu sendenden Daten kopiert und in die Warteschlange eingereiht (hier sehen wir, wie der Empfänger mit Daten umgeht, die den Puffer überlaufen lassen). Schließlich wird die sendende Goroutine durch runtime.goready aufgeweckt und in den _Grunnable-Zustand versetzt, um erneut am Scheduling teilzunehmen.
Wenn keine Goroutine wartet, die senden möchte, wird geprüft, ob Elemente im Puffer auf den Konsum warten. Das erste Element wird aus der Warteschlange entfernt und sein Speicher in das Zielelement des Empfängers kopiert. Dann wird zurückgekehrt.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}Wenn schließlich keine konsumierbaren Elemente mehr im Channel vorhanden sind, wird die aktuelle Goroutine durch runtime.gopark in den _Gwaiting-Zustand versetzt und blockiert, bis sie von einer sendenden Goroutine aufgeweckt wird.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}Nach dem Aufwecken wird zurückgekehrt. Der zurückgegebene success-Wert stammt dabei von sudog.success. Wenn der Sender die Daten erfolgreich gesendet hat, sollte dieses Feld vom Sender auf true gesetzt werden. Diese Logik können wir in der Funktion runtime.send sehen.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}Entsprechend stammt die Beurteilung von sudog.success am Ende von runtime.chansend auf der Senderseite ebenfalls von der Einstellung durch den Empfänger in der Funktion runtime.recv. Daraus lässt sich erkennen, dass Empfänger und Sender erst durch ihr Zusammenspiel den reibungslosen Betrieb des Channels ermöglichen. Insgesamt ist der Empfang von Daten etwas komplexer als das Senden. Es gibt insgesamt folgende Situationen:
- Der Channel ist
nil, das Programm stürzt ab - Der Channel ist geschlossen. Wenn der Channel leer ist, wird direkt zurückgekehrt. Wenn er nicht leer ist, wird mit Situation 5 fortgefahren
- Die Pufferkapazität ist 0, in
sendqwartet eine Goroutine auf das Senden. Die Daten des Senders werden direkt kopiert und dann wird der Sender aufgeweckt - Der Puffer ist voll, in
sendqwartet eine Goroutine auf das Senden. Das erste Element des Puffers wird entfernt, die Daten des Senders werden in die Warteschlange eingereiht und dann wird der Sender aufgeweckt - Der Puffer ist nicht voll und die Anzahl ist nicht 0. Das erste Element des Puffers wird entfernt und dann zurückgekehrt
- Der Puffer ist leer. Die Goroutine geht in den blockierten Zustand und wartet darauf, vom Sender aufgeweckt zu werden
Schließen
Um einen Channel zu schließen, verwenden wir die eingebaute Funktion close:
close(ch)Der Compiler übersetzt dies in einen Aufruf von runtime.closechan. Wenn der übergebene Channel nil ist oder bereits geschlossen wurde, wird direkt ein panic ausgelöst:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}Anschließend werden alle blockierten Goroutinen aus den Warteschlangen sendq und recvq dieses Channels entfernt und alle durch runtime.goready aufgeweckt:
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
Übrigens erlaubt Go unidirektionale Channels mit folgenden Regeln:
- Ein Nur-Lese-Channel kann keine Daten senden
- Ein Nur-Lese-Channel kann nicht geschlossen werden
- Ein Nur-Schreib-Channel kann keine Daten lesen
Diese Fehler werden bereits zur Kompilierzeit während der Typprüfung erkannt und nicht zur Laufzeit überlassen. Interessierte können den entsprechenden Code in den folgenden beiden Paketen nachlesen:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")Prüfen ob geschlossen
In sehr frühen Zeiten (vor Go 1) gab es eine eingebaute Funktion closed, um zu prüfen, ob ein Channel geschlossen ist. Diese wurde jedoch schnell entfernt. Der Grund dafür ist, dass Channels normalerweise in Szenarien mit mehreren Goroutinen verwendet werden. Angenommen, sie gibt true zurück, kann dies tatsächlich bedeuten, dass der Channel bereits geschlossen ist. Wenn sie jedoch false zurückgibt, bedeutet das nicht, dass der Channel wirklich nicht geschlossen ist, denn niemand weiß, wer den Channel im nächsten Moment schließen wird. Daher ist dieser Rückgabewert nicht vertrauenswürdig. Wenn man sich auf diesen Rückgabewert verlässt, um zu entscheiden, ob Daten an den Channel gesendet werden sollen, ist das noch gefährlicher, da das Senden von Daten an einen geschlossenen Channel ein panic auslöst.
Wenn es wirklich notwendig ist, kann man selbst eine Implementierung erstellen. Eine Möglichkeit ist, durch Schreiben in den Channel zu prüfen, ob er geschlossen ist. Der Code sieht wie folgt aus:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}Dies hat jedoch Nebenwirkungen. Wenn der Channel nicht geschlossen ist, werden redundante Daten hineingeschrieben. Außerdem wird der defer-recover-Prozess durchlaufen, was zusätzliche Leistungseinbußen verursacht. Daher kann die Schreib-Methode direkt verworfen werden. Für die Lese-Methode kann man dies in Betracht ziehen, aber man kann nicht direkt aus dem Channel lesen, da ein direktes Lesen mit dem block-Parameterwert true die Goroutine blockieren würde. Man sollte dies in Kombination mit select verwenden. Wenn ein Channel mit select kombiniert wird, ist er nicht-blockierend.
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}Diese Methode sieht nur etwas besser aus als die obige. Sie funktioniert nur, wenn der Channel geschlossen ist und sich keine Elemente im Puffer befinden. Wenn Elemente vorhanden sind, wird zusätzlich ein Element konsumiert. Es gibt immer noch keine perfekte Implementierung.
Tatsächlich müssen wir gar nicht prüfen, ob ein Channel geschlossen ist. Der Grund wurde bereits am Anfang erläutert: Der Rückgabewert ist nicht vertrauenswürdig. Wir sollten den Channel korrekt verwenden und korrekt schließen. Daher:
- Schließen Sie niemals einen Channel auf der Empfängerseite. Dass das Schließen eines Nur-Lese-Channels nicht kompiliert werden kann, sagt Ihnen bereits eindeutig, dass Sie das nicht tun sollten. Überlassen Sie diese Aufgabe dem Sender.
- Wenn es mehrere Sender gibt, sollte eine separate Goroutine für das Schließen zuständig sein, um sicherzustellen, dass
closenur von einer Seite aufgerufen wird und nur einmal aufgerufen wird. - Beim Übergeben von Channels ist es am besten, sie auf Nur-Lesen oder Nur-Schreiben zu beschränken.
Wenn Sie diese Prinzipien befolgen, können Sie sicherstellen, dass keine größeren Probleme auftreten.
