chan
القناة هي هيكل بيانات خاص، وهي ممثل نموذجي لفلسفة CSP في لغة Go. الفكرة الأساسية لـ CSP هي أن العمليات تتواصل من خلال الرسائل لتبادل البيانات. وبالمثل، من خلال القناة يمكننا بسهولة التواصل بين الكوروتينات.
import "fmt"
func main() {
done := make(chan struct{})
go func() {
// do something
done <- struct{}{}
}()
<-done
fmt.Println("finished")
}بالإضافة إلى التواصل، يمكن أيضًا استخدام القناة لتحقيق عمليات مثل تزامن الكوروتينات. وفي الأنظمة التي تتطلب التزامن، تظهر القنوات تقريبًا في كل مكان. لفهم أفضل لكيفية عمل القناة، سأقدم مبدأها أدناه.
البنية
يتم تمثيل القناة في وقت التشغيل ببنية runtime.hchan، التي لا تحتوي على الكثير من الحقول، كما يلي:
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
lock mutex
}من الواضح جدًا رؤية حقل lock أعلاه، القناة في الواقع هي قائمة انتظار حلقية متزامنة مؤمنة. باقي الحقول موضحة أدناه:
qcount، يمثل إجمالي عدد البياناتdataqsize، حجم قائمة الانتظار الحلقيةbuf، مؤشر لمصفوفة بحجمdataqsize، وهي قائمة الانتظار الحلقيةclosed، ما إذا كانت القناة مغلقةsendx،recvx، تمثل فهارس الإرسال والاستقبالsendq،recvq، تمثل قوائم الكوروتينات المرسلة والمستقبلة، وعنصرها هوruntime.sudoggotype waitq struct { first *sudog last *sudog }من خلال الصورة أدناه يمكن فهم بنية القناة بوضوح

عند استخدام الدالتين len و cap على القناة، ما يتم إرجاعه في الواقع هو حقلا hchan.qcount و hchan.dataqsiz.
الإنشاء
بشكل طبيعي، هناك طريقة واحدة فقط لإنشاء القناة، وهي استخدام دالة make:
ch := make(chan int, size)سيقوم المترجم بترجمتها إلى استدعاء دالة runtime.makechan، التي تكون مسؤولة عن الإنشاء الفعلي للقناة. الكود الخاص بها كما يلي:
func makechan(t *chantype, size int) *hchan {
elem := t.Elem
mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))
var c *hchan
switch {
case mem == 0:
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.PtrBytes == 0:
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.Size_)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}هذا المنطق بسيط نسبيًا، وهو يقوم أساسًا بتخصيص الذاكرة للقناة. أولاً يحسب حجم الذاكرة المتوقع بناءً على size الممرر ونوع العنصر elem.size، ثم يعالج ثلاث حالات:
sizeيساوي 0، يتم تخصيصhchanSizeفقط- العنصر لا يحتوي على مؤشرات، يتم تخصيص مساحة بالحجم المقابل، وذاكرة قائمة الانتظار الحلقية تكون متصلة بذاكرة القناة
- العنصر يحتوي على مؤشرات، يتم تخصيص ذاكرة القناة وقائمة الانتظار الحلقية بشكل منفصل
إذا كانت قائمة الانتظار الحلقية تحتوي على عناصر من نوع المؤشر، لأنها تشير إلى عناصر خارجية، فقد يقوم GC بمسح هذه المؤشرات في مرحلة التوسيم-التنظيف. عندما يتم تخزين عناصر غير مؤشر في ذاكرة متصلة، يتم تجنب المسح غير الضروري. بعد انتهاء تخصيص الذاكرة، يتم تحديث بعض حقول المعلومات الأخرى.
بالمناسبة، عندما تكون سعة القناة عددًا صحيحًا 64 بت، يتم استخدام دالة runtime.makechan64 للإنشاء، وهي في الأساس استدعاء لـ runtime.makechan، مع إضافة فحص للنوع فقط.
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}بشكل عام، من الأفضل ألا يتجاوز size قيمة math.MaxInt32.
الإرسال
عند إرسال البيانات إلى القناة، نضع البيانات المراد إرسالها على الجانب الأيمن من السهم:
ch <- struct{}{}سيقوم المترجم بترجمتها إلى runtime.chansend1، والدالة المسؤولة فعليًا عن الإرسال هي runtime.chansend. ستقوم chansend1 بتمرير مؤشر elem، الذي يشير إلى العنصر المرسل.
// entry point for c <- x from compiled code.
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}أولاً يتم فحص ما إذا كانت القناة nil. block يشير إلى ما إذا كانت عملية الإرسال الحالية حاجبة (قيمة block مرتبطة ببنية select). إذا كان الإرسال حاجبًا والقناة nil، سيتعطل البرنامج مباشرة. في حالة الإرسال غير الحاجب، يتم الحكم على ما إذا كانت القناة ممتلئة بدون قفل، وإذا كانت ممتلئة يتم العودة مباشرة.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
if !block {
return false
}
gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
throw("unreachable")
}
if !block && c.closed == 0 && full(c) {
return false
}
...
}بعد ذلك يتم القفل وفحص ما إذا كانت القناة مغلقة، وإذا كانت مغلقة سيحدث panic:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
...
}بعد ذلك يتم إخراج sudog من قائمة انتظار recvq، ثم يتم الإرسال بواسطة دالة runtime.send.
if sg := c.recvq.dequeue(); sg != nil {
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}محتوى دالة send كما يلي، ستقوم بتحديث recvx و sendx، ثم استخدام دالة runtime.memmove لنسخ ذاكرة بيانات الاتصال مباشرة إلى عنوان العنصر المستهدف للكوروتين المستقبل، ثم من خلال دالة runtime.goready يجعل الكوروتين المستقبل في حالة _Grunnable ليتمكن من المشاركة في الجدولة مرة أخرى.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
if sg.elem != nil {
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
dst := sg.elem
memmove(dst, src, t.Size_)
}في هذه العملية، لأنه يمكن العثور على كوروتين ينتظر الاستقبال، يتم إرسال البيانات مباشرة إلى المستقبل ولا يتم تخزينها في قائمة الانتظار الحلقية. إذا لم يكن هناك كوروتين مستقبل متاح وكانت السعة كافية، سيتم وضعها في مخزن قائمة الانتظار الحلقية ثم العودة مباشرة.
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
qp := chanbuf(c, c.sendx)
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}إذا كان المخزن ممتلئًا، في حالة الإرسال غير الحاجب سيتم العودة مباشرة:
if !block {
unlock(&c.lock)
return false
}إذا كان الإرسال حاجبًا، سيدخل في عملية الكود التالية:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
KeepAlive(ep)
...
}أولاً يقوم ببناء الكوروتين الحالي كـ sudog وإضافته إلى قائمة انتظار hchan.sendq للكوروتينات المنتظرة للإرسال، ثم من خلال runtime.gopark يجعل الكوروتين الحالي محجوبًا، ويصبح في حالة _Gwaitting حتى يتم إيقاظه مرة أخرى من قبل المستقبل، ومن خلال runtime.KeepLAlive يحافظ على البيانات المراد إرسالها لضمان نجاح النسخ من قبل المستقبل. عند الاستيقاظ، سيدخل في عملية الإنهاء التالية:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
if closed {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
releaseSudog(mysg)
return true
}يمكن ملاحظة أنه بالنسبة لإرسال البيانات إلى القناة، هناك عدة حالات إجمالاً:
- القناة
nil، يتعطل البرنامج - القناة مغلقة، يحدث
panic - قائمة انتظار
recvqليست فارغة، إرسال مباشر للمستقبل - لا يوجد كوروتين ينتظر، إضافة إلى المخزن
- المخزن ممتلئ، الكوروتين المرسل يدخل في حالة الحظر، ينتظر كوروتينات أخرى لاستقبال البيانات
تجدر الإشارة إلى أنه في منطق الإرسال أعلاه، لم نرَ معالجة البيانات التي تتجاوز سعة المخزن. هذه البيانات لا يمكن التخلص منها، يتم حفظها في sudog.elem، ويتم معالجتها من قبل المستقبل.
الاستقبال
في Go، هناك صيغتان لاستقبال البيانات من القناة. الأولى هي قراءة البيانات فقط:
data <- chوالثانية هي الحكم على ما إذا تمت قراءة البيانات بنجاح:
data, ok <- chسيقوم المترجم بترجمة الصيغتين أعلاه إلى استدعاءات لـ runtime.chanrecv1 و runtime.chanrecv2، لكنهما في الواقع مجرد استدعاءات لـ runtime.chanrecv. بداية منطق الاستقبال مشابهة لمنطق الإرسال، كلاهما يفحص أولاً ما إذا كانت القناة فارغة.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
throw("unreachable")
}
...
}ثم في حالة القراءة غير الحاجبة، يتم الحكم بدون قفل على ما إذا كانت القناة فارغة. إذا كانت القناة غير مغلقة يتم العودة مباشرة، وإذا كانت مغلقة يتم مسح ذاكرة العنصر المستقبل.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
if empty(c) {
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
...
}ثم يتم القفل والوصول إلى قائمة انتظار hchan.sendq. من خلال الفرع if c.closed != 0 أدناه يمكن رؤية أنه حتى لو كانت القناة مغلقة، إذا كان لا يزال هناك عناصر في القناة، لن يتم العودة مباشرة، بل سيستمر في تنفيذ كود استهلاك العناصر. هذا هو السبب في أنه يُسمح بالقراءة بعد إغلاق القناة.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.closed != 0 {
if c.qcount == 0 {
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
} else {
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
...
}إذا لم تكن القناة مغلقة، سيتم فحص ما إذا كان هناك كوروتين ينتظر الإرسال في قائمة انتظار sendq، وإذا وجد يتم معالجة الكوروتين المرسل بواسطة runtime.recv.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
recvDirect(c.elemtype, sg, ep)
}
} else {
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
goready(gp, skip+1)
}الحالة الأولى، سعة القناة 0 أي قناة غير مخزنة، المستقبل سينسخ البيانات مباشرة من المرسل عبر دالة runtime.recvDirect. الحالة الثانية، المخزن ممتلئ. على الرغم من أننا لم نرَ منطق الحكم على ما إذا كان المخزن ممتلئًا في المقدمة، لكن في الواقع عندما تكون سعة المخزن غير صفرية وهناك مرسل ينتظر، فهذا يعني أن المخزن ممتلئ بالفعل، لأن المرسل لن ينتظر الإرسال إلا عندما يكون المخزن ممتلئًا. هذا المنطق يتم الحكم عليه من قبل المرسل. ثم المستقبل سيخرج العنصر الأول من المخزن وينسخ ذاكرته إلى مؤشر العنصر المستهدف، ثم ينسخ بيانات الكوروتين المرسل ويضيفها إلى القائمة (هنا رأينا كيف يعالج المستقبل البيانات التي تتجاوز سعة المخزن). أخيرًا، سيتم إيقاظ الكوروتين المرسل بواسطة runtime.goready، ليجعله في حالة _Grunnable لإعادة إضافته إلى الجدولة.
إذا لم يكن هناك كوروتين ينتظر الإرسال، سيتم فحص ما إذا كان هناك عناصر تنتظر الاستهلاك في المخزن، ويتم إخراج العنصر الأول ونسخ ذاكرته إلى العنصر المستهدف، ثم العودة.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
...
}أخيرًا، إذا لم يكن هناك عناصر يمكن استهلاكها في القناة، سيجعل runtime.gopark الكوروتين الحالي في حالة _Gwaitting، محجوبًا في انتظار أن يتم إيقاظه من قبل الكوروتين المرسل.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp := getg()
mysg := acquireSudog()
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
gp.parkingOnChan.Store(true)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)
...
}عند الاستيقاظ، سيتم العودة. في هذه اللحظة، قيمة success المرجعة تأتي من sudog.success. إذا أرسل المرسل البيانات بنجاح، هذا الحقل يجب أن يكون قد تم تعيينه إلى true من قبل المرسل. يمكننا رؤية هذا المنطق في دالة runtime.send.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
sg.success = true
goready(gp, skip+1)
}
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}وبالمقابل، في نهاية runtime.chansend الخاص بالمرسل، الحكم على sudog.success، مصدره أيضًا من الإعداد في دالة runtime.recv من قبل المستقبل. من خلال هذه يمكن اكتشاف أن المستقبل والمرسل يكملان بعضهما البعض لجعل القناة تعمل بشكل طبيعي. بشكل عام، استقبال البيانات أكثر تعقيدًا قليلاً من إرسال البيانات، وهناك عدة حالات إجمالاً:
- القناة
nil، يتعطل البرنامج - القناة مغلقة، إذا كانت القناة فارغة يتم العودة مباشرة، إذا لم تكن فارغة يتم الانتقال إلى الحالة الخامسة للتنفيذ
- سعة المخزن 0، هناك كوروتين ينتظر الإرسال في
sendq، يتم نسخ بيانات المرسل مباشرة ثم إيقاظ المرسل - المخزن ممتلئ، هناك كوروتين ينتظر الإرسال في
sendq، يتم إخراج العنصر الأول من المخزن، وبيانات المرسل تدخل إلى القائمة، ثم يتم إيقاظ المرسل - المخزن غير ممتلئ والكمية ليست 0، يتم إخراج العنصر الأول من المخزن ثم العودة
- المخزن فارغ، الدخول في حالة الحظر، انتظار الإيقاظ من قبل المرسل
الإغلاق
بالنسبة لإغلاق القناة، نستخدم الدالة المدمجة close:
close(ch)سيقوم المترجم بترجمتها إلى استدعاء runtime.closechan. إذا كانت القناة الممررة nil أو مغلقة بالفعل، سيحدث panic مباشرة:
func closechan(c *hchan) {
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
c.closed = 1
...
}ثم يتم إخراج جميع الكوروتينات المحجوبة في sendq و recvq لهذه القناة وإيقاظها جميعًا عبر runtime.goready:
func closechan(c *hchan) {
...
var glist gList
// release all readers
for {
sg := c.recvq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
gp := sg.g
sg.success = false
glist.push(gp)
}
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}TIP
بالمناسبة، Go يسمح بالقنوات أحادية الاتجاه، وهناك عدة قواعد:
- القناة للقراءة فقط لا يمكنها إرسال البيانات
- القناة للقراءة فقط لا يمكن إغلاقها
- القناة للكتابة فقط لا يمكنها قراءة البيانات
هذه الأخطاء سيتم اكتشافها في مرحلة فحص النوع أثناء الترجمة، ولن تُترك لوقت التشغيل. للمهتمين، يمكن قراءة الكود ذي الصلة في الحزمتين التاليتين:
cmd/compile/internal/types2cmd/compile/internal/typecheck
// cmd/compile/internal/types2/stmt.go: 425
case *syntax.SendStmt:
...
if uch.dir == RecvOnly {
check.errorf(s, InvalidSend, invalidOp+"cannot send to receive-only channel %s", &ch)
return
}
check.assignment(&val, uch.elem, "send")الحكم على الإغلاق
في وقت مبكر جدًا (قبل go1)، كانت هناك دالة مدمجة closed للحكم على ما إذا كانت القناة مغلقة، لكنها حُذفت بسرعة. هذا لأن استخدام القنوات عادة يكون في سيناريوهات متعددة الكوروتينات. لنفترض أنها أرجعت true، هذا يمثل بالفعل أن القناة مغلقة. لكن إذا أرجعت false، هذا لا يعني أن القناة ليست مغلقة بالفعل، لأن لا أحد يعرف من سيغلق القناة في اللحظة التالية. لذا هذه القيمة المرجعة غير موثوقة. إذا استخدمنا هذه القيمة المرجعة كأساس للحكم على ما إذا كان سيتم إرسال البيانات إلى القناة، فهذا خطير جدًا، لأن إرسال البيانات إلى قناة مغلقة سيحدث panic.
إذا كان هناك حاجة فعلية، يمكن تنفيذ واحدة بنفسك. أحد الحلول هو الحكم على ما إذا كانت القناة مغلقة من خلال الكتابة إلى القناة، الكود كالتالي:
func closed(ch chan int) (ans bool) {
defer func() {
if err := recover(); err != nil {
ans = true
}
}()
ch <- 0
return ans
}لكن هذا له آثار جانبية أيضًا. إذا لم تكن مغلقة، سيتم كتابة بيانات زائدة إليها، وستدخل في عملية معالجة defer-recover، مما يسبب خسارة إضافية في الأداء. لذا يمكن التخلي عن حل الكتابة مباشرة. أما حل القراءة فيمكن التفكير فيه، لكن لا يمكن القراءة مباشرة من القناة، لأن القراءة المباشرة بقيمة block تساوي true ستحجب الكوروتين. يجب استخدامها مع select، القناة مع select تصبح غير حاجبة.
func closed(ch chan int) bool {
select {
case _, received := <-ch:
return !received
default:
return false
}
}هذا يبدو أفضل قليلاً من ما سبق، لكن حالته تنطبق فقط عندما تكون القناة مغلقة ولا توجد عناصر في المخزن. إذا كان هناك عناصر، سيتم استهلاك هذا العنصر دون سبب، ولا يزال لا يوجد تنفيذ جيد جدًا.
لكن في الواقع لا نحتاج للحكم على ما إذا كانت القناة مغلقة، السبب ذُكر في البداية لأن القيمة المرجعة غير موثوقة. الاستخدام الصحيح للقناة وإغلاقها بشكل صحيح هو ما يجب أن نفعله. لذا:
- لا تغلق القناة أبدًا من جهة المستقبل، حقيقة أن إغلاق القناة للقراءة فقط لا يمكن أن يمر عبر الترجمة تخبرك بوضوح ألا تفعل ذلك، اترك الأمر للمرسل.
- إذا كان هناك عدة مرسلين، يجب أن يتم إغلاق القناة من قبل كوروتين منفصل، لضمان أن
closeيتم استدعاؤها من جهة واحدة فقط ومرة واحدة فقط. - عند تمرير القناة، من الأفضل تحديد ما إذا كانت للقراءة فقط أو الكتابة فقط
باتباع هذه المبادئ، يمكن ضمان عدم حدوث مشاكل كبيرة.
