select
select هي بنية يمكنها مراقبة حالة قنوات متعددة في نفس الوقت، وتركيبها مشابه لـ switch
import (
"context"
"log/slog"
"os"
"os/signal"
"time"
)
func main() {
finished := make(chan struct{})
ctx, stop := signal.NotifyContext(context.Background(), os.Kill, os.Interrupt)
defer stop()
slog.Info("running")
go func() {
time.Sleep(time.Second * 2)
finished <- struct{}{}
}()
select {
case <-ctx.Done():
slog.Info("shutting down")
case <-finished:
slog.Info("finished")
}
}هذا الكود يجمع بين context والقناة و select لتحقيق منطق خروج سلس للبرنامج. في الكود، select يراقب قناتين ctx.Done و finished في نفس الوقت. شرطا الخروج هما: أولاً إرسال نظام التشغيل لإشارة خروج، وثانياً وجود رسالة في قناة finished يمكن قراءتها أي اكتمال مهمة كود المستخدم. بهذا يمكننا القيام بأعمال التصفية عند خروج البرنامج.
كما هو معروف، لـ select خاصيتان مهمتان جداً: الأولى عدم الحجب، ويمكن رؤية معالجة خاصة لـ select في الكود المصدري لإرسال واستقبال القنوات، حيث يمكن الحكم على ما إذا كانت القناة متاحة في حالة عدم الحجب. الثانية العشوائية، إذا كانت قنوات متعددة متاحة فسيختار واحدة عشوائياً للتنفيذ، وعدم اتباع ترتيب محدد يجعل كل قناة تحصل على فرصة تنفيذ عادلة نسبياً، وإلا في الحالات القصوى قد لا تُعالج بعض القنوات أبداً. لأن عمله كله مرتبط بالقنوات، يُنصح أولاً بقراءة مقالة chan، وبعد فهم القنوات سيكون فهم select أسهل بكثير.
البنية
في وقت التشغيل، هناك فقط بنية runtime.scase تمثل فرع select، وتمثيل كل case في وقت التشغيل هو scase.
type scase struct {
c *hchan // chan
elem unsafe.Pointer // data element
}حيث c يشير إلى القناة، و elem يمثل مؤشر العنصر المستقبل أو المرسل. في الواقع الكلمة المفتاحية select تشير إلى دالة runtime.selectgo.
المبدأ
تنقسم طرق استخدام select في Go إلى أربع حالات للتحسين، ويمكن رؤية منطق معالجة هذه الحالات الأربع في دالة cmd/compile/internal/walk.walkSelectCases.
func walkSelectCases(cases []*ir.CommClause) []ir.Node {
ncas := len(cases)
sellineno := base.Pos
// optimization: zero-case select
if ncas == 0 {
return []ir.Node{mkcallstmt("block")}
}
// optimization: one-case select: single op.
if ncas == 1 {
...
}
// optimization: two-case select but one is default: single non-blocking op.
if ncas == 2 && dflt != nil {
...
}
...
return init
}التحسين
المترجم سيحسّن الحالات الثلاث الأولى. الحالة الأولى عندما يكون عدد case يساوي 0 أي select فارغ، نعلم جميعاً أن جملة select الفارغة ستسبب حجباً دائماً للكوروتين الحالي.
select{}سبب الحجب هو أن المترجم يترجمها إلى استدعاء مباشر لدالة runtime.block
func block() {
gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1) // forever
}ودالة block تستدعي دالة runtime.gopark، مما يجعل الكوروتين الحالي في حالة _Gwaitting، ويدخل في حجب دائم، ولن يحصل على جدولة مرة أخرى.
الحالة الثانية، case واحد فقط وليس default، في هذه الحالة المترجم سيترجمها مباشرة إلى عملية إرسال أو استقبال للقناة، وبشكل حاجب. مثلاً الكود التالي
func main() {
ch := make(chan int)
select {
case <-ch:
// do something
}
}سيُترجم إلى استدعاء مباشر لدالة runtime.chanrecv1، ويمكن رؤية ذلك من كود التجميع
TEXT main.main(SB), ABIInternal, $2
...
LEAQ type:chan int(SB), AX
XORL BX, BX
PCDATA $1, $0
CALL runtime.makechan(SB)
XORL BX, BX
NOP
CALL runtime.chanrecv1(SB)
ADDQ $16, SP
POPQ BP
...في حالة case واحد فقط، إرسال البيانات للقناة نفس الأمر، سيُترجم إلى استدعاء مباشر لدالة runtime.chansend1، وأيضاً بشكل حاجب.
الحالة الثالثة، caseان أحدهما default
func main() {
ch := make(chan int)
select {
case ch <- 1:
// do something
default:
// do something
}
}في هذه الحالة سيُترجم إلى جملة if تستدعي runtime.selectnbsend، كالتالي
if selectnbsend(ch, 1) {
// do something
} else {
// do something
}إذا كان استقبال بيانات من القناة سيُترجم إلى استدعاء runtime.selectnbrecv
ch := make(chan int)
select {
case x, ok := <-ch:
// do something
default:
// do something
}if selected, ok = selectnbrecv(&v, c); selected {
// do something
} else {
// do something
}المهم أن نلاحظ أن في هذه الحالة، استقبال أو إرسال البيانات للقناة غير حاجب، ويمكننا رؤية بوضوح أن المعامل block قيمته false.
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) {
return chanrecv(c, elem, false)
}وسواء إرسال أو استقبال بيانات للقناة، عندما يكون block يساوي false، هناك مسار سريع يمكن به الحكم على إمكانية الإرسال أو الاستقبال بدون قفل، كما هو موضح
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if !block && empty(c) {
if atomic.Load(&c.closed) == 0 {
return
}
return true, false
}
...
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if !block && c.closed == 0 && full(c) {
return false
}
...
}عند قراءة القناة، إذا كانت القناة فارغة ستعود مباشرة. عند الكتابة للقناة، إذا كانت القناة غير مغلقة وممتلئة ستعود مباشرة أيضاً. في الحالات العادية ستسبب حجب الكوروتين، لكن مع استخدام select لن يحدث ذلك.
المعالجة
الحالات الثلاث أعلاه هي تحسينات لحالات خاصة. استخدام select العادي سيُترجم إلى استدعاء دالة runtime.selectgo، ومنطق معالجتها يتجاوز 400 سطر.
func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool)المترجم سيجمع كل جمل case في مصفوفة scase، ثم يمررها لدالة selectgo. بعد المعالجة تُرجع قيمتين:
- الأولى هي فهرس القناة المختارة عشوائياً، تشير إلى أي قناة تمت معالجتها، إذا لم يوجد تُرجع -1
- الثانية تشير لعملية قراءة القناة هل تمت بنجاح
هنا شرح بسيط لمعاملاتها:
cas0، مؤشر رأس مصفوفةscase، النصف الأول يخزن case الكتابة للقناة، النصف الثاني يخزن case القراءة من القناة، ويميز بينهما بـnsendsorder0، طوله ضعف مصفوفةscase، النصف الأول مخصص لمصفوفةpollorder، والنصف الثاني لمصفوفةlockordernsendsوnrecvsيمثلان عدد case قراءة/كتابة القناة، ومجموعهما هو العدد الإجمالي لـ caseblockتشير إلى ما إذا كان حاجباً، إذا كان هناك casedefaultفهذا يعني عدم الحجب، وقيمتهtrue، وإلاtrue.pc0، يشير إلى رأس مصفوفة[ncases]uintptr، تستخدم لتحليل السباق، يمكن تجاهلها لاحقاً، لا تساعد كثيراً في فهم select
لنفترض الكود التالي
func main() {
ch := make(chan int)
select {
case ch <- 1:
println(1)
case ch <- 2:
println(2)
case ch <- 3:
println(3)
case ch <- 4:
println(4)
default:
println("default")
}
}لنرَ شكل التجميع، هنا حُذف جزء من الكود للتسهيل
0x0000 00000 TEXT main.main(SB), ABIInterna
...
0x0023 00035 CALL runtime.makechan(SB)
0x0028 00040 MOVQ $1, main..autotmp_2+72(SP) // 1 2 3 4 متغيرات مؤقتة
0x0031 00049 MOVQ $2, main..autotmp_4+64(SP)
0x003a 00058 MOVQ $3, main..autotmp_6+56(SP)
0x0043 00067 MOVQ $4, main..autotmp_8+48(SP)
...
0x00c8 00200 CALL runtime.selectgo(SB) // استدعاء دالة runtime.selectgo
0x00cd 00205 TESTQ AX, AX
0x00d0 00208 JLT 352 // القفز لفرع default
0x00d6 00214 PCDATA $1, $-1
0x00d6 00214 JEQ 320 // القفز لفرع 4
0x00d8 00216 CMPQ AX, $1
0x00dc 00220 JEQ 288 // القفز لفرع 3
0x00de 00222 NOP
0x00e0 00224 CMPQ AX, $2
0x00e4 00228 JNE 258 // القفز لفرع 2
0x00e6 00230 PCDATA $1, $0
0x00e6 00230 CALL runtime.printlock(SB)
0x00eb 00235 MOVL $3, AX
0x00f0 00240 CALL runtime.printint(SB)
0x00f5 00245 CALL runtime.printnl(SB)
0x00fa 00250 CALL runtime.printunlock(SB)
0x00ff 00255 NOP
0x0100 00256 JMP 379
0x0102 00258 CALL runtime.printlock(SB)
0x0107 00263 MOVL $4, AX
0x010c 00268 CALL runtime.printint(SB)
0x0111 00273 CALL runtime.printnl(SB)
0x0116 00278 CALL runtime.printunlock(SB)
0x011b 00283 JMP 379
0x011d 00285 NOP
0x0120 00288 CALL runtime.printlock(SB)
0x0125 00293 MOVL $2, AX
0x012a 00298 CALL runtime.printint(SB)
0x012f 00303 CALL runtime.printnl(SB)
0x0134 00308 CALL runtime.printunlock(SB)
0x0139 00313 JMP 379
0x013b 00315 NOP
0x0140 00320 CALL runtime.printlock(SB)
0x0145 00325 MOVL $1, AX
0x014a 00330 CALL runtime.printint(SB)
0x014f 00335 CALL runtime.printnl(SB)
0x0154 00340 CALL runtime.printunlock(SB)
0x0159 00345 JMP 379
0x015b 00347 NOP
0x0160 00352 CALL runtime.printlock(SB)
0x0165 00357 LEAQ go:string."default\n"(SB)
0x016c 00364 MOVL $8, BX
0x0171 00369 CALL runtime.printstring(SB)
0x0176 00374 CALL runtime.printunlock(SB)
0x017b 00379 PCDATA $1, $-1
0x017b 00379 ADDQ $160, SP
0x0182 00386 POPQ BP
0x0183 00387 RETيمكن ملاحظة أنه بعد استدعاء دالة selectgo هناك منطق حكم + قفز، ومن خلال هذا يمكننا استنتاج شكله الأصلي
casei, ok := runtime.selectgo()
if casei == -1 {
println("default")
} else if casei == 3 {
println(4)
} else if casei == 2 {
println(3)
} else if casei == 1 {
println(2)
} else {
println(1)
}الكود الفعلي الذي يولده المترجم قد يختلف عن هذا، لكن المعنى العام متقارب. لذا المترجم بعد استدعاء دالة selectgo سيستخدم جملة if للحكم على أي قناة تم تنفيذها، وقبل الاستدعاء، المترجم سيولد أيضاً حلقة for لجمع مصفوفة scase لكن حُذفت هنا.
بعد معرفة كيفية استخدام دالة selectgo خارجياً، دعنا نفهم كيف تعمل دالة selectgo داخلياً. أولاً ستُهيئ عدة مصفوفات، nsends+nrecvs يمثل العدد الإجمالي لـ case، ومن الكود التالي يتضح أن الحد الأقصى لعدد case هو 1 << 16. pollorder يحدد ترتيب تنفيذ القنوات، و lockorder يحدد ترتيب قفل القنوات.
cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
// طوله ضعف مصفوفة scase، النصف الأول مخصص لمصفوفة pollorder، والنصف الثاني لمصفوفة lockorder.
order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
ncases := nsends + nrecvs
scases := cas1[:ncases:ncases]
pollorder := order1[:ncases:ncases]
lockorder := order1[ncases:][:ncases:ncases]ثم تهيئة مصفوفة pollorder، التي تخزن فهارس مصفوفة scases للقنوات المراد تنفيذها
norder := 0
for i := range scases {
cas := &scases[i]
// Omit cases without channels from the poll and lock orders.
if cas.c == nil {
cas.elem = nil // allow GC
continue
}
j := fastrandn(uint32(norder + 1))
pollorder[norder] = pollorder[j]
pollorder[j] = uint16(i)
norder++
}
pollorder = pollorder[:norder]
lockorder = lockorder[:norder]ستجتاز مصفوفة scases بالكامل، ثم من خلال runtime.fastrandn تُولّد رقماً عشوائياً في النطاق [0, i]، ثم تبادله مع i، وفي العملية ستتخطى case التي قناتها nil. بعد انتهاء الاجتياع يتم الحصول على مصفوفة pollorder بعناصر مختلطة، كما في الصورة

ثم تُرتّب مصفوفة pollorder حسب حجم عنوان القناة باستخدام فرز الكومة للحصول على مصفوفة lockorder، ثم تُستدعى runtime.sellock لقفلها بالترتيب
func sellock(scases []scase, lockorder []uint16) {
var c *hchan
for _, o := range lockorder {
c0 := scases[o].c
if c0 != c {
c = c0
lock(&c.lock)
}
}
}المهم هنا أن ترتيب القنوات حسب حجم العنوان هو لتجنب الجمود، لأن عملية select نفسها لا تحتاج قفل للسماح بالتزامن. لنفترض القفل بالترتيب العشوائي pollorder، عندئذٍ ضع في اعتبارك الكود التالي
ch1 := make(chan int)
ch2 := make(chan int)
ch3 := make(chan int)
ch4 := make(chan int)
poll := func() {
select {
case ch1 <- 1:
println(1)
case ch2 <- 2:
println(2)
case ch3 <- 3:
println(3)
case ch4 <- 4:
println(4)
default:
println("default")
}
}
// A
go poll()
// B
go poll()
// C
go poll()ثلاثة كوروتينات ABC وصلت جميعها لخطوة القفل، وترتيب قفل كل منها عشوائي ومختلف عن الآخرين، مما قد يسبب حالة مثل الصورة

لنفترض أن ترتيب قفل ABC كما في الصورة أعلاه، عندئذٍ احتمال الجمود كبير جداً. مثلاً A سيحمل قفل ch2 أولاً، ثم يحاول الحصول على قفل ch1، لكن لنفترض أن ch1 مقفول بالفعل من قبل الكوروتين B، والكوروتين B سيحاول الحصول على قفل ch2، عندئذٍ يحدث جمود.

إذا قُفلت جميع الكوروتينات بنفس الترتيب، لن تحدث مشكلة جمود، وهذا هو السبب الأساسي لترتيب lockorder حسب حجم العنوان.
بعد انتهاء القفل، تبدأ مرحلة المعالجة الفعلية. أولاً اجتياع مصفوفة pollorder، والوصول للقنوات بالترتيب المختلط السابق، واجتياعها واحداً تلو الآخر لإيجاد قناة متاحة
for _, casei := range pollorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
if casi >= nsends { // قراءة قناة
sg = c.sendq.dequeue()
if sg != nil {
goto recv
}
if c.qcount > 0 {
goto bufrecv
}
if c.closed != 0 {
goto rclose
}
} else { // كتابة قناة
if c.closed != 0 {
goto sclose
}
sg = c.recvq.dequeue()
if sg != nil {
goto send
}
if c.qcount < c.dataqsiz {
goto bufsend
}
}
}يمكن ملاحظة معالجة 6 حالات لقراءة/كتابة القنوات، سنشرحها بالترتيب. الحالة الأولى، قراءة قناة وهناك مرسل ينتظر الإرسال، هنا سيُذهب لدالة runtime.recv، التي شرحنا وظيفتها، وستوقظ في النهاية كوروتين المرسل، وقبل الإيقاظ ستقوم دالة الاستدعاء بفك قفل جميع القنوات.
recv:
// can receive from sleeping sender (sg)
recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
recvOK = true
goto retcالحالة الثانية، قراءة قناة، لا يوجد مرسل ينتظر، وعدد عناصر المخزن أكبر من 0، هنا ستُقرأ البيانات مباشرة من المخزن، ومنطقها مطابق تماماً لـ runtime.chanrecv، ثم فك القفل.
bufrecv:
recvOK = true
qp = chanbuf(c, c.recvx)
if cas.elem != nil {
typedmemmove(c.elemtype, cas.elem, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
selunlock(scases, lockorder)
goto retcالحالة الثالثة، قراءة قناة، لكن القناة مغلقة بالفعل، ولا توجد عناصر متبقية في المخزن، هنا سيُفك القفل أولاً ثم العودة مباشرة.
rclose:
selunlock(scases, lockorder)
recvOK = false
if cas.elem != nil {
typedmemclr(c.elemtype, cas.elem)
}
goto retcالحالة الرابعة، إرسال بيانات لقناة مغلقة، هنا سيُفك القفل أولاً ثم panic،
sclose:
selunlock(scases, lockorder)
panic(plainError("send on closed channel"))الحالة الخامسة، هناك مستقبل ينتظر محجوباً، هنا ستُستدعى دالة runtime.send، وستوقظ في النهاية كوروتين المستقبل، وقبل الإيقاظ ستقوم دالة الاستدعاء بفك قفل جميع القنوات.
send:
send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
goto retcالحالة السادسة، لا يوجد كوروتين مستقبل ينتظر، تُوضع البيانات المراد إرسالها في المخزن، ثم فك القفل.
bufsend:
typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
selunlock(scases, lockorder)
goto retcثم جميع الحالات أعلاه ستدخل في النهاية الفرع retc، وما يفعله هو إرجاع فهرس القناة المختارة casi و recvOk الذي يمثل نجاح القراءة.
retc:
return casi, recvOKالحالة السابعة، لم يُعثر على قناة متاحة، والكود يحتوي على فرع default، عندئذٍ فك قفل القناة ثم العودة مباشرة، هنا casi المرجع هو -1 أي لا توجد قناة متاحة.
if !block {
selunlock(scases, lockorder)
casi = -1
goto retc
}الحالة الأخيرة، لم يُعثر على قناة متاحة، والكود لا يحتوي على فرع default، عندئذٍ الكوروتين الحالي سيدخل في حالة حجب. قبل ذلك، selectgo ستضيف الكوروتين الحالي لجميع قوائم انتظار recvq/sendq للقنوات المراقَبة
gp = getg()
nextp = &gp.waiting
for _, casei := range lockorder {
casi = int(casei)
cas = &scases[casi]
c = cas.c
sg := acquireSudog()
sg.g = gp
sg.isSelect = true
sg.elem = cas.elem
sg.releasetime = 0
sg.c = c
*nextp = sg
nextp = &sg.waitlink
if casi < nsends {
c.sendq.enqueue(sg)
} else {
c.recvq.enqueue(sg)
}
}هنا سيُنشأ عدد من sudog وتربط بالقنوات المقابلة، كما في الصورة

ثم تُحجب بواسطة runtime.gopark. قبل الحجب سيُفك قفل القنوات، وعمل فك القفل تقوم به دالة runtime.selparkcommit، التي تُمرر كدالة استدعاء لـ gopark.
gp.param = nil
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
gp.parkingOnChan.Store(true)
gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
gp.activeStackChans = falseأول شيء بعد الاستيقاظ هو فك ربط sudog بالقناة
sellock(scases, lockorder)
gp.selectDone.Store(0)
sg = (*sudog)(gp.param)
gp.param = nil
casi = -1
cas = nil
caseSuccess = false
sglist = gp.waiting
// Clear all elem before unlinking from gp.waiting.
for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
sg1.isSelect = false
sg1.elem = nil
sg1.c = nil
}
gp.waiting = nilثم إزالة sudog من قوائم انتظار القنوات السابقة
for _, casei := range lockorder {
k = &scases[casei]
if sg == sglist {
// sg has already been dequeued by the G that woke us up.
casi = int(casei)
cas = k
caseSuccess = sglist.success
if sglist.releasetime > 0 {
caseReleaseTime = sglist.releasetime
}
} else {
c = k.c
if int(casei) < nsends {
c.sendq.dequeueSudoG(sglist)
} else {
c.recvq.dequeueSudoG(sglist)
}
}
sgnext = sglist.waitlink
sglist.waitlink = nil
releaseSudog(sglist)
sglist = sgnext
}في العملية أعلاه حتماً سيُعثر على قناة عالجها كوروتين الموقِظ، ثم بناءً على caseSuccess تُتخذ المعالجة الأخيرة. لعملية الكتابة، sg.success يساوي false يعني أن القناة مغلقة بالفعل، وكل وقت تشغيل Go فقط دالة close ستضبط هذا الحقل على false، وهذا يشير إلى أن الكوروتين الحالي أُوقظ بواسطة دالة close. لعملية القراءة، إذا أُوقظ بواسطة المرسل، عملية قراءة البيانات تمت قبل الاستيقاظ بواسطة المرسل من خلال دالة runtime.send، وقيمته true. إذا أُوقظ بواسطة دالة close، نفس الشيء يعود مباشرة.
c = cas.c
if casi < nsends {
if !caseSuccess {
goto sclose
}
} else {
recvOK = caseSuccess
}
selunlock(scases, lockorder)
goto retcبهذا يكون منطق select بأكمله قد وُضح تقريباً. الحالات أعلاه متعددة، مما يدل على أن معالجة select معقدة نسبياً.
