gmp
Một trong những đặc điểm lớn nhất của ngôn ngữ go là sự hỗ trợ tự nhiên cho concurrent, chỉ cần một từ khóa là có thể khởi động một coroutine, giống như ví dụ dưới đây minh họa.
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
fmt.Println("hello world!")
}()
go func() {
defer wg.Done()
fmt.Println("hello world too!")
}()
wg.Wait()
}Coroutine của ngôn ngữ go sử dụng rất đơn giản, đối với developer hầu như không cần làm công việc phụ trợ nào, đây cũng là một trong những lý do nó được ưa chuộng. Tuy nhiên đằng sau sự đơn giản là một bộ điều phối concurrent không hề đơn giản đang hỗ trợ, tên của nó chắc hẳn mọi người đều ít nhiều đã nghe qua, vì các thành phần tham gia chính của nó lần lượt là G (coroutine), M (thread hệ thống), P (processor), nên còn được gọi là bộ điều phối GMP. Thiết kế của bộ điều phối GMP ảnh hưởng đến toàn bộ thiết kế runtime của ngôn ngữ go, GC, network poller, có thể nói nó là khối cốt lõi nhất của toàn bộ ngôn ngữ, nếu có thể hiểu biết nhất định về nó, trong công việc sau này có lẽ sẽ có một số giúp đỡ.
Lịch sử
Mô hình điều phối concurrent của ngôn ngữ Go không hoàn toàn là nguyên bản, nó hấp thu nhiều kinh nghiệm và bài học của tiền nhân, trải qua không ngừng phát triển và cải tiến mới có được diện mạo hiện nay. Các ngôn ngữ nó từng học tập bao gồm:
- Occam -1983
- Erlang - 1986
- Newsqueak - 1988
- Concurrent ML - 1993
- Alef - 1995
- Limbo - 1996
Ảnh hưởng lớn nhất vẫn là bài luận văn về CSP (Communicate Sequential Process) do Hoare công bố năm 1978, tư tưởng cơ bản của bài luận văn này là tiến trình trao đổi dữ liệu với nhau thông qua giao tiếp. Trong các ngôn ngữ lập trình kể trên không cái nào không chịu ảnh hưởng của tư tưởng CSP, Erlang là điển hình nhất của một ngôn ngữ hướng message, phần mềm middleware message queue nổi tiếng RabbitMQ được viết bằng Erlang. Đến hiện nay, cùng với sự phát triển của máy tính và internet, hỗ trợ concurrent hầu như đã trở thành tiêu chuẩn của một ngôn ngữ hiện đại, ngôn ngữ go kết hợp tư tưởng CSP ra đời.
Mô hình điều phối
Đầu tiên giới thiệu đơn giản ba thành phần của GMP
- G, Goroutine, chỉ coroutine trong ngôn ngữ go
- M, Machine, chỉ thread hệ thống hoặc gọi là worker thread, do hệ điều hành负责 điều phối
- P, Processor, không phải chỉ CPU processor, là khái niệm trừu tượng do go tự định nghĩa, chỉ processor làm việc trên thread hệ thống, thông qua nó để điều phối coroutine G trên mỗi thread hệ thống.
Coroutine là một loại thread nhẹ hơn, quy mô nhỏ hơn, tài nguyên cần thiết cũng ít hơn, thời điểm tạo và hủy và điều phối đều do runtime của go hoàn thành, chứ không phải hệ điều hành, nên chi phí quản lý của nó thấp hơn nhiều so với thread. Tuy nhiên coroutine cũng dựa vào thread, time slice cần thiết để thực thi coroutine đến từ thread, time slice của thread đến từ hệ điều hành, mà việc chuyển đổi giữa các thread khác nhau có chi phí nhất định, làm thế nào để coroutine tận dụng tốt time slice của thread là then chốt của thiết kế.
1:N
Cách tốt nhất để giải quyết vấn đề là phớt lờ vấn đề này,既然 việc chuyển đổi thread có chi phí, thì trực tiếp không chuyển đổi là được. Phân phối tất cả coroutine vào một kernel thread, như vậy chỉ liên quan đến việc chuyển đổi giữa các coroutine.

Mối quan hệ giữa thread và coroutine là 1:N, làm như vậy có một nhược điểm rất rõ ràng, máy tính thời nay hầu như đều là multi-core CPU, phân phối như vậy không thể tận dụng hiệu năng của multi-core CPU.
N:N
Một cách khác, một thread tương ứng với một coroutine, một coroutine có thể hưởng toàn bộ time slice của thread đó, nhiều thread cũng có thể tận dụng hiệu năng của multi-core CPU. Nhưng chi phí tạo và chuyển đổi của thread khá cao, nếu là quan hệ một-một, ngược lại không tận dụng được ưu điểm nhẹ của coroutine.

M:N
M thread tương ứng với N coroutine, và M nhỏ hơn N. Nhiều thread tương ứng với nhiều coroutine, mỗi thread sẽ tương ứng với若干 coroutine, do processor P负责 điều phối coroutine G sử dụng time slice của thread như thế nào. Cách này là tương đối tốt nhất, cũng là mô hình điều phối mà Go vẫn dùng đến nay.
M chỉ sau khi liên kết với processor P mới có thể thực thi nhiệm vụ, go sẽ tạo GOMAXPROCS processor, nên số lượng thread thực sự có thể dùng để thực thi nhiệm vụ là GOMAXPROCS, giá trị mặc định của nó là số lượng CPU logic core của máy hiện tại, chúng ta cũng có thể thủ công thiết lập giá trị của nó.
Thông qua code sửa đổi
runtime.GOMAXPROCS(N), và có thể điều chỉnh động trong runtime, gọi sau trực tiếp STW.Thiết lập biến môi trường
export GOMAXPROCS=N, tĩnh.
Trong tình huống thực tế, số lượng M sẽ lớn hơn số lượng P, vì trong runtime sẽ cần chúng để xử lý các nhiệm vụ khác, ví dụ như một số system call, giá trị lớn nhất là 10000.

Ba thành phần GMP này và bản thân bộ điều phối trong runtime đều có biểu diễn kiểu tương ứng, chúng đều nằm trong file runtime/runtime2.go, dưới đây sẽ giới thiệu đơn giản cấu trúc của chúng, tiện cho việc hiểu sau này.
G
G trong runtime biểu diễn bằng cấu trúc runtime.g, là đơn vị điều phối cơ bản nhất trong mô hình điều phối, cấu trúc của nó như sau, để tiện hiểu, đã xóa bớt không ít trường.
type g struct {
stack stack // offset known to runtime/cgo
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost defer
m *m // current m; offset known to arm liblink
sched gobuf
goid uint64
waitsince int64 // approx time when the g become blocked
waitreason waitReason // if status==Gwaiting
atomicstatus atomic.Uint32
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
startpc uintptr // pc of goroutine function
parentGoid uint64 // goid of goroutine that created this goroutine
waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
}Trường đầu tiên là địa chỉ bắt đầu và địa chỉ kết thúc của bộ nhớ stack thuộc về coroutine đó
type stack struct {
lo uintptr
hi uintptr
}_panic và _defer lần lượt là con trỏ trỏ đến panic stack và defer stack
_panic *_panic // innermost panic - offset known to liblink
_defer *_defer // innermost deferm đang thực thi g coroutine hiện tại
m *m // current m; offset known to arm liblinkpreempt biểu thị coroutine hiện tại có cần bị chiếm quyền không, tương đương với g.stackguard0 = stackpreempt
preempt bool // preemption signal, duplicates stackguard0 = stackpreemptatomicstatus dùng để lưu trữ giá trị trạng thái của coroutine G, nó có các giá trị tùy chọn sau
| Tên | Mô tả |
|---|---|
| _Gidle | Vừa được phân phối, chưa khởi tạo |
| _Grunnable | Biểu thị coroutine hiện tại có thể chạy, nằm trong hàng đợi chờ |
| _Grunning | Biểu thị coroutine hiện tại đang thực thi code người dùng |
| _Gsyscall | Được phân phối một M, dùng để thực thi system call, |
| _Gwaiting | Coroutine bị chặn, nguyên nhân bị chặn xem phần dưới |
| _Gdead | Biểu thị coroutine hiện tại chưa được sử dụng, có thể vừa thoát, cũng có thể vừa khởi tạo |
| _Gcopystack | Biểu thị stack của coroutine đang di chuyển, trong thời gian này không thực thi code người dùng, cũng không nằm trong hàng đợi chờ |
| _Gpreempted | Bị chặn tự thân vào chiếm quyền, đợi bên bị chiếm quyền đánh thức |
| _Gscan | GC đang quét không gian stack của coroutine, có thể cùng tồn tại với trạng thái khác |
sched dùng để lưu trữ thông tin context của coroutine để phục hồi hiện trường thực thi của coroutine, có thể thấy bên trong lưu trữ các con trỏ sp, pc, ret.
type gobuf struct {
sp uintptr
pc uintptr
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr // for framepointer-enabled architectures
}waiting biểu thị coroutine mà coroutine hiện tại đang chờ, waitsince ghi lại thời điểm coroutine bị chặn, waitreason biểu thị nguyên nhân coroutine bị chặn, các giá trị tùy chọn như sau.
var waitReasonStrings = [...]string{
waitReasonZero: "",
waitReasonGCAssistMarking: "GC assist marking",
waitReasonIOWait: "IO wait",
waitReasonChanReceiveNilChan: "chan receive (nil chan)",
waitReasonChanSendNilChan: "chan send (nil chan)",
waitReasonDumpingHeap: "dumping heap",
waitReasonGarbageCollection: "garbage collection",
waitReasonGarbageCollectionScan: "garbage collection scan",
waitReasonPanicWait: "panicwait",
waitReasonSelect: "select",
waitReasonSelectNoCases: "select (no cases)",
waitReasonGCAssistWait: "GC assist wait",
waitReasonGCSweepWait: "GC sweep wait",
waitReasonGCScavengeWait: "GC scavenge wait",
waitReasonChanReceive: "chan receive",
waitReasonChanSend: "chan send",
waitReasonFinalizerWait: "finalizer wait",
waitReasonForceGCIdle: "force gc (idle)",
waitReasonSemacquire: "semacquire",
waitReasonSleep: "sleep",
waitReasonSyncCondWait: "sync.Cond.Wait",
waitReasonSyncMutexLock: "sync.Mutex.Lock",
waitReasonSyncRWMutexRLock: "sync.RWMutex.RLock",
waitReasonSyncRWMutexLock: "sync.RWMutex.Lock",
waitReasonTraceReaderBlocked: "trace reader (blocked)",
waitReasonWaitForGCCycle: "wait for GC cycle",
waitReasonGCWorkerIdle: "GC worker (idle)",
waitReasonGCWorkerActive: "GC worker (active)",
waitReasonPreempted: "preempted",
waitReasonDebugCall: "debug call",
waitReasonGCMarkTermination: "GC mark termination",
waitReasonStoppingTheWorld: "stopping the world",
}goid và parentGoid biểu thị định danh duy nhất của coroutine hiện tại và coroutine cha, startpc biểu thị địa chỉ hàm入口 của coroutine hiện tại.
M
M trong runtime biểu diễn bằng cấu trúc runtime.m, là trừu tượng hóa của worker thread
type m struct {
id int64
g0 *g // goroutine with scheduling stack
curg *g // current running goroutine
gsignal *g // signal-handling g
goSigStack gsignalStack // Go-allocated signal handling stack
p puintptr // attached p for executing go code (nil if not executing go code)
nextp puintptr
oldp puintptr // the p that was attached before executing a syscall
mallocing int32
throwing throwType
preemptoff string // if != "", keep curg running on this m
locks int32
dying int32
spinning bool // m is out of work and is actively looking for work
tls [tlsSlots]uintptr
...
}Tương tự, trường bên trong M cũng rất nhiều, ở đây chỉ giới thiệu một số trường để tiện hiểu.
id, định danh duy nhất của Mg0, coroutine có scheduling stackcurg, user coroutine đang chạy trên worker threadgsignal, coroutine负责 xử lý signal của threadgoSigStack, không gian stack do go phân phối dùng để xử lý signalp, địa chỉ processor P,oldptrỏ đến P trước khi thực thi system call,nextptrỏ đến P mới phân phốimallocing, dùng để biểu thị hiện tại có đang phân phối không gian bộ nhớ mới khôngthrowing, biểu thị loại lỗi xảy ra khi Mpreemptoff, định danh chiếm quyền, khi nó là chuỗi rỗng biểu thị coroutine đang chạy hiện tại có thể bị chiếm quyềnlocks, biểu thị số lượng "lock" hiện tại của M, khi khác 0 cấm chiếm quyềndying, biểu thị M xảy rapanickhông thể cứu vãn, có bốn giá trị tùy chọn[0,3], từ thấp đến cao biểu thị mức độ nghiêm trọng.spinning, biểu thị M đang ở trạng thái rảnh, và随时可用.tls, thread local storage
P
P trong runtime được biểu diễn bằng runtime.p,负责 điều phối công việc giữa M và G, cấu trúc của nó như sau
type p struct {
id int32
status uint32 // one of pidle/prunning/...
schedtick uint32 // incremented on every scheduler call
syscalltick uint32 // incremented on every system call
sysmontick sysmontick // last tick observed by sysmon
m muintptr // back-link to associated m (nil if idle)
// Queue of runnable goroutines. Accessed without lock.
runqhead uint32
runqtail uint32
runq [256]guintptr
runnext guintptr
// Available G's (status == Gdead)
gFree struct {
gList
n int32
}
// preempt is set to indicate that this P should be enter the
// scheduler ASAP (regardless of what G is running on it).
preempt bool
...
}status biểu thị trạng thái của P, có các giá trị tùy chọn sau
| Giá trị | Mô tả |
|---|---|
| _Pidle | P ở trạng thái rảnh, có thể được bộ điều phối phân phối M, cũng có thể chỉ đang chuyển đổi giữa các trạng thái khác |
| _Prunning | P liên kết với M, và đang thực thi code người dùng |
| _Psyscall | Biểu thị M liên kết với P đang thực thi system call, trong thời gian này P có thể bị M khác chiếm quyền |
| _Pgcstop | Biểu thị P bị dừng vì GC |
| _Pdead | Hầu hết tài nguyên của P bị tước đoạt, sẽ không còn được sử dụng |
Một số trường dưới đây ghi lại hàng đợi địa phương runq trong P, có thể thấy số lượng lớn nhất của hàng đợi địa phương là 256, vượt quá số lượng này G sẽ được đặt vào hàng đợi toàn cục.
runqhead uint32
runqtail uint32
runq [256]guintptrrunnext biểu thị G khả dụng tiếp theo
runnext guintptrGiải thích một số trường khác như sau
id, định danh duy nhất của Pschedtick, tăng lên cùng với số lần điều phối coroutine, có thể thấy trong hàmruntime.execute.syscalltick, tăng lên cùng với số lần system callsysmontick, ghi lại thông tin lần cuối被系统监控 quan sátm, M liên kết với PgFree, danh sách G rảnhpreempt, biểu thị P应该再次进入调度
Thông tin hàng đợi toàn cục thì được lưu trữ trong cấu trúc runtime.schedt, là biểu diễn của bộ điều phối trong runtime, như sau.
type schedt struct {
...
midle muintptr // idle m's waiting for work
ngsys atomic.Int32 // number of system goroutines
pidle puintptr // idle p's
// Global runnable queue.
runq gQueue
runqsize int32
...
}Khởi tạo
Khởi tạo bộ điều phối nằm trong giai đoạn bootstrap của chương trình go,负责 bootstrap chương trình go là hàm runtime.rt0_go, nó được thực hiện bằng assembly nằm trong file runtime/asm_*.s, một phần code như sau
TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
...
...
CALL runtime·check(SB)
MOVL 24(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 32(SP), AX // copy argv
MOVQ AX, 8(SP)
CALL runtime·args(SB)
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)
// create a new goroutine to start program
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
POPQ AX
// start this M
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
RETCó thể thấy lời gọi đến runtime·osinit và runtime·schedinit thông qua hai dòng dưới đây.
CALL runtime·osinit(SB)
CALL runtime·schedinit(SB)Cái trước负责 khởi tạo công việc liên quan đến hệ điều hành, cái sau负责 khởi tạo bộ điều phối,也就是 runtime·schedinit hàm. Nó sẽ负责 khởi tạo tài nguyên cần thiết cho bộ điều phối chạy khi chương trình khởi động, dưới đây là code đã được đơn giản hóa.
func schedinit() {
...
gp := getg()
sched.maxmcount = 10000
// The world starts stopped.
worldStopped()
...
stackinit()
mallocinit()
mcommoninit(gp.m, -1)
lock(&sched.lock)
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
unlock(&sched.lock)
...
// World is effectively started now, as P's can run.
worldStarted()
...
}Hàm runtime.getg được thực hiện bằng assembly, chức năng của nó là lấy được biểu diễn runtime của coroutine hiện tại,也就是 con trỏ cấu trúc runtime.g. Thông qua sched.maxmcount = 10000 có thể thấy, khi bộ điều phối khởi tạo đã thiết lập số lượng lớn nhất của M là 10000, giá trị này là cố định và không thể sửa đổi. Sau đó là khởi tạo heap stack, rồi mới đến hàm runtime.mcommoninit để khởi tạo M, phần thực hiện hàm của nó như sau
func mcommoninit(mp *m, id int64) {
gp := getg()
// g0 stack won't make sense for user (and is not necessary unwindable).
if gp != gp.m.g0 {
callers(1, mp.createstack[:])
}
lock(&sched.lock)
if id >= 0 {
mp.id = id
} else {
mp.id = mReserveID()
}
...
mpreinit(mp)
if mp.gsignal != nil {
mp.gsignal.stackguard1 = mp.gsignal.stack.lo + stackGuard
}
// Add to allm so garbage collector doesn't free g->m
// when it is just in a register or thread-local storage.
mp.alllink = allm
// NumCgoCall() iterates over allm w/o schedlock,
// so we need to publish it safely.
atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
unlock(&sched.lock)
...
}Hàm này pre-khởi tạo M, chủ yếu làm các công việc sau
- Phân phối id cho M
- Phân phối riêng một G để xử lý signal của thread, do hàm
runtime.mpreinithoàn thành - Lấy nó làm node đầu của danh sách M toàn cục
runtime.allm
Tiếp theo khởi tạo P, số lượng của nó mặc định là số lượng logic core của CPU, thứ đến là giá trị của biến môi trường.
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}Cuối cùng do hàm runtime.procresize负责 khởi tạo P, nó sẽ dựa vào số lượng truyền vào để sửa đổi slice toàn cục runtime.allp lưu trữ tất cả P. Đầu tiên dựa vào kích thước số lượng để判断 xem có cần mở rộng không
if nprocs > int32(len(allp)) {
// Synchronize with retake, which could be running
// concurrently since it doesn't run on a P.
lock(&allpLock)
if nprocs <= int32(cap(allp)) {
allp = allp[:nprocs]
} else {
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}Rồi khởi tạo từng P
// initialize new P's
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}Nếu P mà coroutine hiện tại đang sử dụng cần bị hủy, thì thay thế nó bằng allp[0], do hàm runtime.acquirep hoàn thành việc liên kết M với P mới.
gp := getg()
if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
gp.m.p.ptr().status = _Prunning
gp.m.p.ptr().mcache.prepareForSweep()
} else {
if gp.m.p != 0 {
gp.m.p.ptr().m = 0
}
gp.m.p = 0
pp := allp[0]
pp.m = 0
pp.status = _Pidle
acquirep(pp)
}Sau đó hủy P không cần thiết nữa, khi hủy sẽ giải phóng tất cả tài nguyên của P, đặt tất cả G trong hàng đợi địa phương của nó vào hàng đợi toàn cục, sau khi hủy xong rồi slice allp.
// release resources from unused P's
for i := nprocs; i < old; i++ {
pp := allp[i]
pp.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
// Trim allp.
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}Cuối cùng liên kết các P rảnh thành một danh sách, và cuối cùng trả về node đầu của danh sách
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
pp := allp[i]
if gp.m.p.ptr() == pp {
continue
}
pp.status = _Pidle
if runqempty(pp) {
pidleput(pp, now)
} else {
pp.m.set(mget())
pp.link.set(runnablePs)
runnablePs = pp
}
}
return runnablePsSau đó, bộ điều phối khởi tạo xong, do runtime.worldStarted khôi phục chạy tất cả P.
MOVQ $runtime·mainPC(SB), AX // entry
PUSHQ AX
CALL runtime·newproc(SB)
POPQ AX
// start this M
CALL runtime·mstart(SB)Rồi sẽ tạo một coroutine mới thông qua hàm runtime.newproc để khởi động chương trình go, sau đó gọi runtime.mstart để chính thức khởi động chạy của bộ điều phối, nó cũng được thực hiện bằng assembly, bên trong nó sẽ gọi hàm runtime.mstart0 để khởi tạo, một phần code của hàm này như sau
gp := getg()
osStack := gp.stack.lo == 0
if osStack {
size := gp.stack.hi
if size == 0 {
size = 16384 * sys.StackGuardMultiplier
}
gp.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
gp.stack.lo = gp.stack.hi - size + 1024
}
gp.stackguard0 = gp.stack.lo + stackGuard
gp.stackguard1 = gp.stackguard0
mstart1()Lúc này M chỉ có một coroutine g0, coroutine này sử dụng system stack của thread, không phải không gian stack phân phối riêng. Hàm mstart0 sẽ khởi tạo boundary của stack G trước, rồi giao cho mstart1 để hoàn thành công việc khởi tạo còn lại.
gp := getg()
gp.sched.g = guintptr(unsafe.Pointer(gp))
gp.sched.pc = getcallerpc()
gp.sched.sp = getcallersp()
asminit()
minit()
if gp.m == &m0 {
mstartm0()
}
if fn := gp.m.mstartfn; fn != nil {
fn()
}
if gp.m != &m0 {
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0
}
schedule()Trước khi bắt đầu, đầu tiên sẽ ghi lại hiện trường thực thi hiện tại, vì sau khi khởi tạo thành công sẽ đi vào vòng lặp điều phối và永远 không trả về, các lời gọi khác có thể复用 hiện trường thực thi từ hàm mstart1 trả về để đạt được mục đích thoát thread. Ghi lại xong, do hai hàm runtime.asminit và runtime.minit负责 khởi tạo system stack, rồi do hàm runtime.mstartm0 thiết lập callback dùng để xử lý signal. Sau khi thực thi hàm callback m.mstartfn, hàm runtime.acquirep liên kết M với P đã tạo trước đó, cuối cùng đi vào vòng lặp điều phối.
Lời gọi runtime.schedule này là vòng lặp điều phối đầu tiên của toàn bộ runtime go, đại diện cho bộ điều phối chính thức bắt đầu làm việc.
Thread
Trong bộ điều phối, G muốn thực thi code người dùng phải dựa vào P, mà P muốn làm việc bình thường phải liên kết với một M, M chỉ là system thread.
Tạo
Việc tạo M do hàm runtime.newm hoàn thành, nó nhận một hàm và P cũng như id làm tham số, hàm làm tham số không thể là closure.
func newm(fn func(), pp *p, id int64) {
acquirem()
mp := allocm(pp, fn, id)
mp.nextp.set(pp)
mp.sigmask = initSigmask
newm1(mp)
releasem(getg().m)
}Trước khi bắt đầu, newm sẽ gọi hàm runtime.allocm để tạo biểu diễn runtime của thread也就是 M, trong quá trình sẽ sử dụng hàm runtime.mcommoninit để khởi tạo boundary của stack M.
func allocm(pp *p, fn func(), id int64) *m {
allocmLock.rlock()
// The caller owns pp, but we may borrow (i.e., acquirep) it. We must
// disable preemption to ensure it is not stolen, which would make the
// caller lose ownership.
acquirem()
gp := getg()
if gp.m.p == 0 {
acquirep(pp) // temporarily borrow p for mallocs in this function
}
mp := new(m)
mp.mstartfn = fn
mcommoninit(mp, id)
mp.g0.m = mp
releasem(gp.m)
allocmLock.runlock()
return mp
}Sau đó do hàm runtime.newm1 gọi hàm runtime.newosproc để hoàn thành việc tạo system thread thực sự.
func newm1(mp *m) {
execLock.rlock()
newosproc(mp)
execLock.runlock()
}Phần thực hiện runtime.newosproc sẽ khác nhau tùy theo hệ điều hành, cụ thể tạo như thế nào không phải là chuyện chúng ta cần quan tâm, do hệ điều hành负责, rồi do runtime.mstart để khởi động công việc của M.
Thoát
runtime.gogo(&mp.g0.sched)Lúc khởi tạo có đề cập, khi gọi hàm mstart1 đã lưu hiện trường thực thi vào trường sched của g0, truyền trường này cho hàm runtime.gogo (thực hiện bằng assembly) có thể khiến thread nhảy đến hiện trường thực thi để tiếp tục thực thi, lúc lưu dùng getcallerpc(), nên khi khôi phục hiện trường là trở về hàm mstar0.
mstart1()
if mStackIsSystemAllocated() {
osStack = true
}
mexit(osStack)Sau khi hiện trường thực thi được khôi phục, theo thứ tự thực thi sẽ đi vào hàm mexit để thoát thread.
mp := getg().m
unminit()
lock(&sched.lock)
for pprev := &allm; *pprev != nil; pprev = &(*pprev).alllink {
if *pprev == mp {
*pprev = mp.alllink
}
}
mp.freeWait.Store(freeMWait)
mp.freelink = sched.freem
sched.freem = mp
unlock(&sched.lock)
handoffp(releasep())
mdestroy(mp)
exitThread(&mp.freeWait)Nó总共 làm mấy việc chính sau
- Gọi
runtime.uminitđể撤销 công việc củaruntime.minit - Xóa M này khỏi biến toàn cục
allm - Đặt
freemcủa bộ điều phối trỏ đến M hiện tại - Do
runtime.releasep解绑 P với M hiện tại, và doruntime.handoffpđể P liên kết với M khác để tiếp tục làm việc - Do
runtime.destroy负责 hủy tài nguyên của M - Cuối cùng do hệ điều hành thoát thread
Đến đây M đã thoát thành công.
Tạm dừng
Khi vì lý do điều phối bộ điều phối, GC, system call等原因 cần tạm dừng M, sẽ gọi hàm runtime.stopm để tạm dừng thread, dưới đây là code đã đơn giản hóa.
func stopm() {
gp := getg()
lock(&sched.lock)
mput(gp.m)
unlock(&sched.lock)
mPark()
acquirep(gp.m.nextp.ptr())
gp.m.nextp = 0
}Nó đầu tiên sẽ đặt M vào danh sách M rảnh toàn cục, rồi do mPark() khiến thread hiện tại bị chặn ở notesleep(&gp.m.park) đây, khi被 đánh thức hàm này sẽ trả về
func mPark() {
gp := getg()
notesleep(&gp.m.park)
noteclear(&gp.m.park)
}M sau khi被 đánh thức sẽ đi tìm một P để liên kết nhằm tiếp tục thực thi nhiệm vụ.
Coroutine
Vòng đời của coroutine vừa vặn tương ứng với một số trạng thái của coroutine, hiểu vòng đời của coroutine sẽ rất có ích cho việc hiểu bộ điều phối, dù sao toàn bộ bộ điều phối được thiết kế xoay quanh coroutine, toàn bộ vòng đời của coroutine như hình dưới đây.

_Gcopystack là trạng thái mà coroutine có khi stack của coroutine giãn nở, sẽ讲解 trong phần Stack coroutine.
Tạo
Việc tạo coroutine từ góc độ cú pháp chỉ cần một từ khóa go cộng với một hàm.
go doSomething()Sau khi biên dịch sẽ biến thành lời gọi hàm runtime.newproc
func newproc(fn *funcval) {
gp := getg()
pc := getcallerpc()
systemstack(func() {
newg := newproc1(fn, gp, pc)
pp := getg().m.p.ptr()
runqput(pp, newg, true)
if mainStarted {
wakep()
}
})
}Do runtime.newproc1 hoàn thành việc tạo thực tế, khi tạo đầu tiên sẽ khóa M, cấm chiếm quyền, rồi sẽ đi tìm G rảnh trong danh sách gfree địa phương của P để tái sử dụng, nếu không tìm thấy sẽ tạo một G mới bằng runtime.malg, và phân phối 2kb không gian stack cho nó. Lúc này trạng thái của G là _Gdead.
mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()
newg := gfget(pp)
if newg == nil {
newg = malg(stackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}Trong go1.18 về sau, việc copy tham số không còn do hàm newproc1 hoàn thành, trước đó, sẽ sử dụng runtime.memmove để copy tham số của hàm. Bây giờ chỉ负责 reset không gian stack của coroutine, lấy runtime.goexit làm đáy stack để nó xử lý việc thoát coroutine, rồi thiết lập PC入口 của hàm newg.startpc = fn.fn biểu thị bắt đầu thực thi từ đây, sau khi thiết lập xong, lúc này trạng thái của G là _Grunnable.
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
// caller's LR
*(*uintptr)(unsafe.Pointer(sp)) = 0
prepGoExitFrame(sp)
spArg += sys.MinFrameSize
}
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.parentGoid = callergp.goid
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)Cuối cùng thiết lập định danh duy nhất của G, rồi giải phóng M, trả về coroutine G đã tạo.
newg.goid = pp.goidcache
pp.goidcache++
releasem(mp)
return newgSau khi coroutine được tạo, sẽ thử đặt nó vào hàng đợi địa phương của P bằng hàm runtime.runqput, nếu không đặt được thì đặt vào hàng đợi toàn cục. Trong toàn bộ quá trình tạo coroutine, trạng thái của nó đầu tiên thay đổi từ _Gidle thành _Gdead, sau khi thiết lập hàm入口 xong từ _Gdead thành _Grunnable.
Thoát
Lúc tạo go đã đặt hàm runtime.goexit làm đáy stack của coroutine, vậy khi coroutine thực thi xong cuối cùng sẽ đi vào hàm này, thông qua chuỗi gọi goexit->goexit1->goexit0, cuối cùng do runtime.goexit0负责 công việc thoát của coroutine.
func goexit0(gp *g) {
mp := getg().m
pp := mp.p.ptr()
...
casgstatus(gp, _Grunning, _Gdead)
...
gp.m = nil
locked := gp.lockedm != 0
gp.lockedm = 0
mp.lockedg = 0
gp.preemptStop = false
gp.paniconfault = false
gp._defer = nil // should be true already but just in case.
gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
gp.writebuf = nil
gp.waitreason = waitReasonZero
gp.param = nil
gp.labels = nil
gp.timer = nil
dropg()
...
gfput(pp, gp)
...
schedule()
}Hàm này chủ yếu làm mấy việc sau
- Thiết lập trạng thái thành
_Gdead - Reset giá trị trường
dropg()cắt đứt liên kết giữa M và Ggfput(pp, gp)đặt G hiện tại vào danh sách rảnh địa phương của Pschedule()thực hiện điều phối vòng mới, nhường quyền thực thi cho G khác
Sau khi thoát, trạng thái của coroutine thay đổi từ _Grunning thành _Gdead, trong tương lai khi tạo coroutine mới vẫn có thể được tái sử dụng.
System call
Khi coroutine G đang thực thi code người dùng nếu thực hiện system call, có hai cách kích hoạt system call
- System call của thư viện chuẩn
syscall - cgo call
Vì system call sẽ chặn worker thread, nên trước đó cần chuẩn bị, do hàm runtime.entersyscall hoàn thành quá trình này, nhưng cái trước cũng chỉ là một lời gọi đơn giản đến hàm runtime.reentersyscall, công việc thực tế do cái sau hoàn thành. Đầu tiên sẽ khóa M hiện tại, trong thời gian chuẩn bị G bị cấm chiếm quyền, cũng cấm stack giãn nở, thiết lập gp.stackguard0 = stackPreempt biểu thị sau khi chuẩn bị xong quyền thực thi của P sẽ bị G khác chiếm quyền, rồi lưu hiện trường thực thi của coroutine, tiện khôi phục sau khi system call trả về.
gp := getg()
// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
gp.m.locks++
// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
gp.stackguard0 = stackPreempt
gp.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp)
gp.syscallsp = sp
gp.syscallpc = pcSau đó, vì để ngăn chặn lâu dài ảnh hưởng đến việc thực thi của G khác, M và P sẽ解绑, M và G sau khi解绑 sẽ bị chặn vì thực thi system call, mà P sau khi解绑 có thể liên kết với M rảnh khác để G khác trong hàng đợi địa phương của P có thể tiếp tục làm việc.
casgstatus(gp, _Grunning, _Gsyscall)
gp.m.syscalltick = gp.m.p.ptr().syscalltick
pp := gp.m.p.ptr()
pp.m = 0
gp.m.oldp.set(pp)
gp.m.p = 0
atomic.Store(&pp.status, _Psyscall)
gp.m.locks--Sau khi chuẩn bị xong, giải phóng khóa của M, trong thời gian này trạng thái của G thay đổi từ _Grunning thành _Gsyscall, trạng thái của P thay đổi thành _Psyscall.
Khi system call trả về, thread M không còn bị chặn, G tương ứng cũng cần được điều phối lại để thực thi code người dùng, do hàm runtime.exitsyscall hoàn thành công việc thiện hậu này. Đầu tiên khóa M hiện tại, lấy reference của P cũ.
gp := getg()
gp.waitsince = 0
oldp := gp.m.oldp.ptr()
gp.m.oldp = 0Lúc này chia làm hai tình huống để xử lý, tình huống đầu tiên là có P khả dụng trực tiếp không, hàm runtime.exitsyscallfast sẽ判断 P cũ có khả dụng không, tức trạng thái của P có phải là _Psyscall không, nếu không thì sẽ đi tìm P rảnh.
func exitsyscallfast(oldp *p) bool {
gp := getg()
// Freezetheworld sets stopwait but does not retake P's.
if sched.stopwait == freezeStopWait {
return false
}
// Try to re-acquire the last P.
if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
// There's a cpu for us, so we can run.
wirep(oldp)
exitsyscallfast_reacquired()
return true
}
// Try to get any other idle P.
if sched.pidle != 0 {
var ok bool
systemstack(func() {
ok = exitsyscallfast_pidle()
})
if ok {
return true
}
}
return false
}Nếu thành công tìm được P khả dụng, M sẽ liên kết với P, G chuyển từ trạng thái _Gsyscall sang trạng thái _Grunning, rồi thông qua runtime.Gosched G chủ động nhường quyền thực thi, P đi vào vòng lặp điều phối tìm G khả dụng khác.
oldp := gp.m.oldp.ptr()
gp.m.oldp = 0
if exitsyscallfast(oldp) {
// There's a cpu for us, so we can run.
gp.m.p.ptr().syscalltick++
// We need to cas the status and scan before resuming...
casgstatus(gp, _Gsyscall, _Grunning)
// Garbage collector isn't running (since we are),
// so okay to clear syscallsp.
gp.syscallsp = 0
gp.m.locks--
if gp.preempt {
// restore the preemption request in case we've cleared it in newstack
gp.stackguard0 = stackPreempt
} else {
// otherwise restore the real stackGuard, we've spoiled it in entersyscall/entersyscallblock
gp.stackguard0 = gp.stack.lo + stackGuard
}
gp.throwsplit = false
if sched.disable.user && !schedEnabled(gp) {
// Scheduling of this goroutine is disabled.
Gosched()
}
return
}Giả sử không tìm thấy, M sẽ解绑 với G, G chuyển từ trạng thái _Gsyscall sang trạng thái _Grunnable, rồi thử lại xem có thể tìm được P rảnh không, nếu không tìm thấy thì trực tiếp đặt G vào hàng đợi toàn cục, rồi đi vào vòng lặp điều phối mới, M cũ do runtime.stopm đi vào trạng thái rảnh, đợi nhiệm vụ mới trong tương lai. Nếu tìm thấy P, M cũ và G liên kết với P mới, rồi tiếp tục thực thi code người dùng, trạng thái từ _Grunnable thành _Grunning.
func exitsyscall0(gp *g) {
casgstatus(gp, _Gsyscall, _Grunnable)
dropg()
lock(&sched.lock)
var pp *p
if schedEnabled(gp) {
pp, _ = pidleget(0)
}
var locked bool
if pp == nil {
globrunqput(gp)
}
unlock(&sched.lock)
if pp != nil {
acquirep(pp)
execute(gp, false) // Never returns.
}
stopm()
schedule() // Never returns.
}Sau khi thoát system call, trạng thái của G cuối cùng có hai kết quả, một là _Grunnable đợi被 điều phối, một là _Grunning tiếp tục chạy.
Treo
Khi coroutine hiện tại vì một số lý do bị treo, trạng thái sẽ từ _Grunnable thành _Gwaiting, có nhiều lý do bị treo, có thể là vì channel bị chặn, select, lock hoặc time.sleep, nhiều lý do hơn xem Cấu trúc G. Lấy time.Sleep làm ví dụ, nó thực tế liên kết đến runtime.timesleep, code của cái sau như sau.
func timeSleep(ns int64) {
if ns <= 0 {
return
}
gp := getg()
t := gp.timer
if t == nil {
t = new(timer)
gp.timer = t
}
t.f = goroutineReady
t.arg = gp
t.nextwhen = nanotime() + ns
if t.nextwhen < 0 { // check for overflow.
t.nextwhen = maxWhen
}
gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceBlockSleep, 1)
}Có thể thấy, nó thông qua getg lấy coroutine hiện tại, rồi thông qua runtime.gopark khiến coroutine hiện tại bị treo. runtime.gopark sẽ cập nhật nguyên nhân bị chặn của G và M, giải phóng khóa của M
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waitTraceBlockReason = traceReason
mp.waitTraceSkip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)Rồi chuyển sang system stack do runtime.park_m chuyển trạng thái của G thành _Gwaiting, rồi cắt đứt liên kết giữa M và G và đi vào vòng lặp điều phối mới để nhường quyền thực thi cho G khác. Sau khi bị treo, G không thực thi code người dùng, cũng không nằm trong hàng đợi địa phương, chỉ giữ reference đến M và P.
mp := getg().m
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
schedule()Trong hàm runtime.timesleep có một dòng code như vậy, chỉ định giá trị của t.f
t.f = goroutineReadyHàm runtime.goroutineReady này dùng để đánh thức coroutine bị treo, nó sẽ gọi hàm runtime.ready để đánh thức coroutine
status := readgstatus(gp)
// Mark runnable.
mp := acquirem()
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(mp.p.ptr(), gp, next)
wakep()
releasem(mp)Sau khi đánh thức, chuyển trạng thái của G thành _Grunnable, rồi đặt G vào hàng đợi địa phương của P đợi被 điều phối trong tương lai.
Stack coroutine
Coroutine trong ngôn ngữ go là điển hình của stack coroutine, mỗi khi khởi động một coroutine sẽ phân phối một không gian stack độc lập trên heap cho nó, và nó sẽ tăng hoặc giảm cùng với sự thay đổi của lượng sử dụng. Khi bộ điều phối khởi tạo, hàm runtime.stackinit负责 khởi tạo bộ nhớ cache stack toàn cục stackpool và stackLarge.
func stackinit() {
if _StackCacheSize&_PageMask != 0 {
throw("cache size must be a multiple of page size")
}
for i := range stackpool {
stackpool[i].item.span.init()
lockInit(&stackpool[i].item.mu, lockRankStackpool)
}
for i := range stackLarge.free {
stackLarge.free[i].init()
lockInit(&stackLarge.lock, lockRankStackLarge)
}
}Ngoài ra, mỗi P đều có bộ nhớ cache stack độc lập của riêng mình mcache
type p struct {
...
mcache *mcache
...
}
type mcache struct {
_ sys.NotInHeap
nextSample uintptr
scanAlloc uintptr
tiny uintptr
tinyoffset uintptr
tinyAllocs uintptr
alloc [numSpanClasses]*mspan
stackcache [_NumStackOrders]stackfreelist
flushGen atomic.Uint32
}Thread cache mcache là độc lập của mỗi thread và không được phân phối trên bộ nhớ heap, khi truy cập không cần加锁, ba bộ nhớ cache stack này sẽ được sử dụng khi phân phối không gian sau này.
Phân phối
Khi tạo coroutine, nếu không có coroutine có thể tái sử dụng, sẽ chọn phân phối một không gian stack mới cho nó, kích thước mặc định của nó là 2KB.
newg := gfget(pp)
if newg == nil {
newg = malg(stackMin)
casgstatus(newg, _Gidle, _Gdead)
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}Hàm负责 phân phối không gian stack là runtime.stackalloc
func stackalloc(n uint32) stackDựa vào kích thước bộ nhớ stack xin cấp có nhỏ hơn 32KB hay không chia làm hai tình huống, 32KB cũng là tiêu chuẩn判断 trong go là đối tượng nhỏ hay đối tượng lớn. Nếu nhỏ hơn giá trị này sẽ đi lấy từ bộ nhớ cache stackpool, khi M liên kết với P và M không允许 bị chiếm quyền, sẽ đi lấy từ thread cache địa phương.
if n < fixedStack<<_NumStackOrders && n < _StackCacheSize {
order := uint8(0)
n2 := n
for n2 > fixedStack {
order++
n2 >>= 1
}
var x gclinkptr
if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" {
lock(&stackpool[order].item.mu)
x = stackpoolalloc(order)
unlock(&stackpool[order].item.mu)
} else {
c := thisg.m.p.ptr().mcache
x = c.stackcache[order].list
if x.ptr() == nil {
stackcacherefill(c, order)
x = c.stackcache[order].list
}
c.stackcache[order].list = x.ptr().next
c.stackcache[order].size -= uintptr(n)
}
v = unsafe.Pointer(x)
}Nếu lớn hơn 32KB, sẽ đi lấy từ bộ nhớ cache stackLarge, nếu vẫn không đủ thì trực tiếp phân phối bộ nhớ trên heap.
else {
var s *mspan
npage := uintptr(n) >> _PageShift
log2npage := stacklog2(npage)
// Try to get a stack from the large stack cache.
lock(&stackLarge.lock)
if !stackLarge.free[log2npage].isEmpty() {
s = stackLarge.free[log2npage].first
stackLarge.free[log2npage].remove(s)
}
unlock(&stackLarge.lock)
lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)
if s == nil {
// Allocate a new stack from the heap.
s = mheap_.allocManual(npage, spanAllocStack)
if s == nil {
throw("out of memory")
}
osStackAlloc(s)
s.elemsize = uintptr(n)
}
v = unsafe.Pointer(s.base())
}Xong việc trả về địa chỉ thấp và địa chỉ cao của không gian stack
return stack{uintptr(v), uintptr(v) + uintptr(n)}Giãn nở
Kích thước stack coroutine mặc định là 2KB, đủ nhẹ, nên chi phí tạo một coroutine rất thấp, nhưng điều này chưa chắc đã đủ dùng, khi không gian stack không đủ dùng sẽ cần giãn nở. Compiler sẽ chèn hàm runtime.morestack ở đầu hàm để kiểm tra coroutine hiện tại có cần giãn nở stack không, nếu cần thì gọi hàm runtime.newstack để hoàn thành việc giãn nở thực sự.
TIP
Vì morestack hầu như được chèn ở đầu tất cả các hàm, nên thời điểm kiểm tra giãn nở stack cũng là một điểm chiếm quyền của coroutine.
thisg := getg()
gp := thisg.m.curg
// Allocate a bigger segment and move the stack.
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize * 2
// The goroutine must be executing in order to call newstack,
// so it must be Grunning (or Gscanrunning).
casgstatus(gp, _Grunning, _Gcopystack)
// The concurrent GC will not scan the stack while we are doing the copy since
// the gp is in a Gcopystack status.
copystack(gp, newsize)
casgstatus(gp, _Gcopystack, _Grunning)
gogo(&gp.sched)Có thể thấy, dung lượng stack sau khi tính toán là gấp đôi ban đầu, do hàm runtime.copystack hoàn thành công việc copy stack, trước khi copy trạng thái của G từ _Grunning chuyển thành _Gcopystack.
func copystack(gp *g, newsize uintptr) {
old := gp.stack
used := old.hi - gp.sched.sp
// allocate new stack
new := stackalloc(uint32(newsize))
// Compute adjustment.
var adjinfo adjustinfo
adjinfo.old = old
adjinfo.delta = new.hi - old.hi
// Copy the stack (or the rest of it) to the new location
memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)
// Adjust remaining structures that have pointers into stacks.
// We have to do most of these before we traceback the new
// stack because gentraceback uses them.
adjustctxt(gp, &adjinfo)
adjustdefers(gp, &adjinfo)
adjustpanics(gp, &adjinfo)
if adjinfo.sghi != 0 {
adjinfo.sghi += adjinfo.delta
}
// Swap out old stack for new one
gp.stack = new
gp.stackguard0 = new.lo + stackGuard // NOTE: might clobber a preempt request
gp.sched.sp = new.hi - used
gp.stktopsp += adjinfo.delta
// Adjust pointers in the new stack.
var u unwinder
for u.init(gp, 0); u.valid(); u.next() {
adjustframe(&u.frame, &adjinfo)
}
stackfree(old)
}Hàm này总共 làm mấy công việc sau
- Phân phối không gian stack mới
- Copy trực tiếp bộ nhớ stack cũ vào không gian stack mới thông qua
runtime.memmove, - Điều chỉnh các cấu trúc chứa con trỏ stack, ví dụ như defer, panic等
- Cập nhật trường không gian stack của G
- Điều chỉnh con trỏ trỏ đến bộ nhớ stack cũ thông qua
runtime.adjustframe - Giải phóng bộ nhớ stack cũ
Sau khi hoàn thành, trạng thái của G từ _Gcopystack chuyển thành _Grunning, và do hàm runtime.gogo để G tiếp tục thực thi code người dùng. Chính vì sự tồn tại của giãn nở stack coroutine, nên bộ nhớ trong go là không ổn định.
Co lại
Khi trạng thái của G là _Grunnable, _Gsyscall, _Gwaiting, GC sẽ quét không gian bộ nhớ stack của coroutine.
func scanstack(gp *g, gcw *gcWork) int64 {
switch readgstatus(gp) &^ _Gscan {
case _Grunnable, _Gsyscall, _Gwaiting:
// ok
}
...
if isShrinkStackSafe(gp) {
// Shrink the stack if not much of it is being used.
shrinkstack(gp)
}
...
}Công việc co stack thực tế do runtime.shrinkstack hoàn thành.
func shrinkstack(gp *g) {
if !isShrinkStackSafe(gp) {
throw("shrinkstack at bad time")
}
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize / 2
if newsize < fixedStack {
return
}
avail := gp.stack.hi - gp.stack.lo
if used := gp.stack.hi - gp.sched.sp + stackNosplit; used >= avail/4 {
return
}
copystack(gp, newsize)
}Khi không gian stack được sử dụng không đủ 1/4 ban đầu, sẽ thông qua runtime.copystack thu nhỏ nó thành 1/2 ban đầu, công việc sau đó không khác gì trước.
Stack phân đoạn
Từ quá trình copystack có thể thấy, nó sẽ copy bộ nhớ stack cũ vào một không gian stack lớn hơn, bất kể là stack ban đầu hay stack mới địa chỉ bộ nhớ của chúng đều liên tục. Trong ngôn ngữ go thời kỳ cổ đại, khi giãn nở stack做法 không giống hiện nay, lúc đó cảm thấy copy bộ nhớ quá tiêu hao hiệu suất, áp dụng思路 stack phân đoạn, nếu không gian bộ nhớ stack không đủ dùng, sẽ xin cấp một không gian stack mới, bộ nhớ stack ban đầu không được giải phóng cũng không bị copy, liên kết với nhau thông qua con trỏ, hình thành một linked list stack, đây cũng là nguồn gốc của stack phân đoạn, như hình dưới đây

Ưu điểm của làm như vậy là không cần copy stack ban đầu, nhưng nhược điểm cũng rất rõ ràng, là sẽ rất thường xuyên kích hoạt giãn nở và co stack. Khi không gian rảnh của stack space còn lại không bao nhiêu, lời gọi hàm mới sẽ kích hoạt giãn nở stack, khi các hàm này trả về, không cần không gian stack mới nữa thì lại sẽ kích hoạt co stack, giả sử tần suất gọi các hàm này rất cao, thì sẽ rất thường xuyên kích hoạt giãn nở và co stack, tổn hao hiệu suất do thao tác này gây ra là rất lớn.
Nên sau go1.4 đổi thành stack liên tục, stack liên tục vì phân phối một không gian stack dung lượng lớn hơn, không xuất hiện tình trạng bộ nhớ đã sử dụng đạt đến giá trị tới hạn vì lời gọi hàm mà thường xuyên kích hoạt giãn nở co, và vì địa chỉ bộ nhớ liên tục, theo nguyên lý cục bộ không gian của cache CPU, stack liên tục cũng thân thiện hơn với cache CPU.
Vòng lặp điều phối

Trong phần khởi tạo bộ điều phối có đề cập, trong hàm runtime.mstart1, sau khi M liên kết thành công với P sẽ đi vào vòng lặp runtime.schedule điều phối đầu tiên chính thức bắt đầu điều phối G để thực thi code người dùng. Trong vòng lặp điều phối, phần này chủ yếu là P đang phát huy tác dụng. M tương ứng với system thread, G tương ứng với hàm入口也就是 code người dùng, nhưng P không giống M và G có thực thể tương ứng, nó chỉ là một khái niệm trừu tượng,作为中间人 xử lý mối quan hệ giữa M và G.
func schedule() {
mp := getg().m
top:
pp := mp.p.ptr()
pp.preempt = false
if mp.spinning {
resetspinning()
}
gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available
execute(gp, inheritTime)
}Code trên đã được đơn giản hóa, xóa bớt nhiều điều kiện判断, điểm cốt lõi nhất chỉ có hai runtime.findRunnable và runtime.execute, cái trước负责 tìm một G, và nhất định sẽ trả về một G khả dụng, cái sau负责 để G tiếp tục thực thi code người dùng.
Đối với hàm findRunnable mà nói, nguồn G đầu tiên là hàng đợi địa phương của P
// local runq
if gp, inheritTime := runqget(pp); gp != nil {
return gp, inheritTime, false
}Nếu hàng đợi địa phương không có G, thì thử lấy từ hàng đợi toàn cục
// global runq
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(pp, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false, false
}
}Nếu không tìm thấy trong địa phương và toàn cục, sẽ thử lấy từ network poller
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
if list := netpoll(0); !list.empty() { // non-blocking
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if traceEnabled() {
traceGoUnpark(gp, 0)
}
return gp, false, false
}
}Nếu vẫn không tìm thấy, cuối cùng sẽ đi lấy G từ hàng đợi địa phương của P khác. Lúc tạo coroutine có đề cập, một nguồn lớn của G trong hàng đợi địa phương của P là coroutine con do coroutine hiện tại phái sinh, tuy nhiên không phải tất cả coroutine đều sẽ tạo coroutine con, như vậy có thể xuất hiện tình trạng một phần P rất bận, một phần P rảnh, điều này sẽ dẫn đến một tình huống, có G vì一直在 chờ mà迟迟无法 được chạy, mà bên kia P rất nhàn rỗi, không có việc gì làm. Để có thể vắt kiệt tất cả P, để chúng phát huy hiệu suất làm việc lớn nhất, khi P không tìm thấy G, sẽ đi đến hàng đợi địa phương của P khác "đánh cắp" G có thể thực thi, như vậy, mỗi P đều có thể sở hữu hàng đợi G khá đều đặn, rất ít xuất hiện tình huống P và P đứng nhìn nhau.
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp != nil {
// Successfully stole.
return gp, inheritTime, false
}runtime.stealWork sẽ随机 chọn một P để đánh cắp, công việc đánh cắp thực tế do hàm runtime.runqgrab hoàn thành, nó sẽ thử đánh cắp một nửa G trong hàng đợi địa phương của P đó.
for {
h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
n := t - h
n = n - n/2
if n > uint32(len(pp.runq)/2) { // read inconsistent h and t
continue
}
for i := uint32(0); i < n; i++ {
g := pp.runq[(h+i)%uint32(len(pp.runq))]
batch[(batchHead+i)%uint32(len(batch))] = g
}
if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
return n
}
}Toàn bộ công việc đánh cắp sẽ tiến hành bốn lần, nếu bốn lần cũng không đánh cắp được G thì trả về. Nếu cuối cùng không thể tìm thấy, M hiện tại sẽ bị runtime.stopm tạm dừng, cho đến khi被 đánh thức tiếp tục lặp lại các bước trên. Khi tìm thấy và trả về một G, sẽ giao nó cho runtime.execute để chạy G.
mp := getg().m
mp.curg = gp
gp.m = mp
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + stackGuard
gogo(&gp.sched)Đầu tiên cập nhật curg của M, rồi cập nhật trạng thái của G thành _Grunning, cuối cùng giao cho runtime.gogo để khôi phục chạy của G.
Nói chung, trong vòng lặp điều phối nguồn G dựa vào ưu tiên để phân có bốn
- Hàng đợi địa phương của P
- Hàng đợi toàn cục
- Network poller
- Đánh cắp từ hàng đợi địa phương của P khác
runtime.execute sau khi thực thi sẽ không trả về, G vừa lấy được cũng sẽ không永远 thực thi下去, ở một thời điểm nào đó kích hoạt điều phối sau, quyền thực thi của nó sẽ bị tước đoạt, rồi đi vào vòng lặp điều phối mới, nhường quyền thực thi cho G khác.
Chiến lược điều phối
Thời gian thực thi code người dùng của G khác nhau có thể khác nhau, một phần G có thể耗时 rất dài, một phần G耗时 rất ngắn, G thực thi thời gian dài có thể dẫn đến G khác迟迟无法 được thực thi, nên luân phiên thực thi G mới là cách đúng đắn, cách làm việc này trong hệ điều hành được gọi là concurrent.
Điều phối hợp tác
Ý tưởng cơ bản của điều phối hợp tác là, để G tự nhường quyền thực thi cho G khác, chủ yếu có hai cách.
Cách thứ nhất là chủ động nhường quyền trong code người dùng, go cung cấp hàm runtime.Gosched(), người sử dụng có thể tự quyết định lúc nào nhường quyền thực thi, tuy nhiên trong rất nhiều trường hợp chi tiết công việc nội bộ của bộ điều phối đối với người sử dụng đều là một hộp đen, rất khó để判断到底 lúc nào nên chủ động nhường quyền, yêu cầu đối với người sử dụng khá cao, và bộ điều phối của go力求 đối với người sử dụng che giấu hầu hết chi tiết,追求 cách sử dụng đơn giản hơn, trong tình huống này để người sử dụng cũng tham gia vào công việc điều phối không phải là chuyện tốt.
Cách thứ hai là đánh dấu chiếm quyền, tuy tên của nó có chữ chiếm quyền, nhưng bản chất của nó vẫn là chiến lược điều phối hợp tác. Ý tưởng là chèn code kiểm tra chiếm quyền runtime.morestack() ở đầu hàm, quá trình chèn hoàn thành trong thời gian biên dịch,前面 có đề cập nó vốn là hàm dùng để kiểm tra giãn nở stack, vì điểm kiểm tra của nó là lời gọi của mỗi hàm, đây cũng là thời cơ tốt để kiểm tra chiếm quyền. Phần trên của hàm runtime.newstack đều đang kiểm tra chiếm quyền, phần dưới thì đang kiểm tra giãn nở stack,前面 để tránh làm nhiễu đã lược bỏ phần này, giờ hãy xem phần này làm gì. Đầu tiên sẽ判断 chiếm quyền dựa vào gp.stackguard0, nếu không cần thì tiếp tục thực thi code người dùng.
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
preempt := stackguard0 == stackPreempt
if preempt {
if !canPreemptM(thisg.m) {
gp.stackguard0 = gp.stack.lo + stackGuard
gogo(&gp.sched) // never return
}
}Khi g.stackguard0 == stackPreempt, do hàm runtime.canPreemptM()来判断 điều kiện coroutine có cần bị chiếm quyền không, code như sau,
func canPreemptM(mp *m) bool {
return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}Có thể thấy có thể bị chiếm quyền cần thỏa mãn bốn điều kiện
- M không bị khóa
- Không đang phân phối bộ nhớ
- Không禁用 chiếm quyền
- P ở trạng thái
_Prunning
Mà trong hai tình huống sau sẽ đặt g.stackguard0 thành stackPreempt
- Khi cần garbage collection
- Khi xảy ra system call
if preempt {
if gp.preemptShrink {
gp.preemptShrink = false
shrinkstack(gp)
}
// Act like goroutine called runtime.Gosched.
gopreempt_m(gp) // never return
}Cuối cùng sẽ đi đến runtime.gopreempt_m() chủ động nhường quyền thực thi của coroutine hiện tại. Đầu tiên cắt đứt liên kết giữa M và G, trạng thái thành _Grunnbale, rồi đặt G vào hàng đợi toàn cục, cuối cùng đi vào vòng lặp điều phối nhường quyền thực thi cho G khác.
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()Như vậy, tất cả coroutine khi thực hiện lời gọi hàm đều có thể đi vào hàm này để kiểm tra chiếm quyền, chiến lược này phải dựa vào thời cơ lời gọi hàm mới có thể kích hoạt chiếm quyền và chủ động nhường quyền. Trước 1.14, go vẫn沿用 chiến lược điều phối này, nhưng như vậy sẽ có một vấn đề, nếu không có lời gọi hàm, thì không thể kiểm tra了, ví dụ đoạn code kinh điển dưới đây, nên xuất hiện trong rất nhiều tutorial
func main() {
// 限制P的数量只能为1
runtime.GOMAXPROCS(1)
// 协程1
go func() {
for {
// 该协程不停的空转
}
}()
// 进入系统调用,主协程让权给其它协程
time.Sleep(time.Millisecond)
println("exit")
}Code trong đó tạo một coroutine 1 quay vòng rỗng, rồi coroutine chính vì system call chủ động nhường quyền, lúc này coroutine 1 đang được điều phối, nhưng vì nó根本就不 gọi hàm, cũng không thể kiểm tra chiếm quyền, vì P chỉ có một, không có P rảnh khác, như vậy sẽ dẫn đến coroutine chính永远无法 được điều phối, exit cũng永远无法 output, tuy nhiên vấn đề này cũng chỉ限于 trước go1.14.
Điều phối chiếm quyền
Chính thức trong go1.14 thêm chiến lược điều phối chiếm quyền dựa trên signal, đây là một chiến lược chiếm quyền bất đồng bộ, thông qua cách gửi signal của thread bất đồng bộ để thực hiện chiếm quyền thread, điều phối chiếm quyền dựa trên signal hiện tại chỉ có hai入口,分别是 system monitor và GC.
Trong vòng lặp của system monitor, sẽ duyệt từng P, nếu thời gian thực thi của G mà P đang điều phối vượt quá 10ms, sẽ cưỡng chế kích hoạt chiếm quyền. Phần công việc này do hàm runtime.retake hoàn thành, dưới đây là code sau khi đơn giản hóa.
func retake(now int64) uint32 {
n := 0
lock(&allpLock)
for i := 0; i < len(allp); i++ {
pp := allp[i]
if pp == nil {
continue
}
pd := &pp.sysmontick
s := pp.status
sysretake := false
if s == _Prunning || s == _Psyscall {
// Preempt G if it's running for too long.
t := int64(pp.schedtick)
if int64(pd.schedtick) != t {
pd.schedtick = uint32(t)
pd.schedwhen = now
} else if pd.schedwhen+forcePreemptNS <= now {
preemptone(pp)
sysretake = true
}
}
}
unlock(&allpLock)
return uint32(n)
}Khi cần garbage collection, nếu trạng thái của G là _Grunning,也就是 vẫn đang chạy, cũng sẽ kích hoạt chiếm quyền.
func suspendG(gp *g) suspendGState {
for i := 0; ; i++ {
switch s := readgstatus(gp); s {
case _Grunning:
gp.preemptStop = true
gp.preempt = true
gp.stackguard0 = stackPreempt
casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)
if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
now := nanotime()
if now >= nextPreemptM {
nextPreemptM = now + yieldDelay/2
preemptM(asyncM)
}
}
......
......
func preemptM(mp *m) {
if mp.signalPending.CompareAndSwap(0, 1) {
if GOOS == "darwin" || GOOS == "ios" {
pendingPreemptSignals.Add(1)
}
signalM(mp, sigPreempt)
}
}Hai入口 chiếm quyền cuối cùng đều sẽ đi vào hàm runtime.preemptM, do nó hoàn thành việc gửi signal chiếm quyền. Khi signal thành công gửi sau, callback handler signal runtime.sighandler đã đăng ký trong runtime.mstart thông qua runtime.initsig sẽ phát huy tác dụng, nếu kiểm tra phát hiện gửi là signal chiếm quyền, sẽ bắt đầu chiếm quyền.
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
...
if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
// Might be a preemption signal.
doSigPreempt(gp, c)
}
...
}doSigPreempt sẽ sửa đổi context của coroutine mục tiêu, inject gọi runtime.asyncPreempt.
func doSigPreempt(gp *g, ctxt *sigctxt) {
// Check if this G wants to be preempted and is safe to
// preempt.
if wantAsyncPreempt(gp) {
if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
// Adjust the PC and inject a call to asyncPreempt.
ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
}
}
...Như vậy khi chuyển lại code người dùng, coroutine mục tiêu sẽ đi đến hàm runtime.asyncPreempt, trong hàm này liên quan đến lời gọi runtime.asyncPreempt2.
TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
PUSHQ BP
MOVQ SP, BP
// Save flags before clobbering them
PUSHFQ
// obj doesn't understand ADD/SUB on SP, but does understand ADJSP
ADJSP $368
// But vet doesn't know ADJSP, so suppress vet stack checking
...
CALL ·asyncPreempt2(SB)
...
RETNó sẽ khiến coroutine hiện tại dừng làm việc và thực hiện một vòng lặp điều phối mới để nhường quyền thực thi cho coroutine khác.
func asyncPreempt2() {
gp := getg()
gp.asyncSafePoint = true
if gp.preemptStop {
mcall(preemptPark)
} else {
mcall(gopreempt_m)
}
gp.asyncSafePoint = false
}Toàn bộ quá trình này đều xảy ra trong hàm runtime.asyncPreempt, nó được thực hiện bằng assembly (nằm trong runtime/preempt_*.s) và sẽ khôi phục context của coroutine đã bị sửa đổi trước đó sau khi điều phối hoàn thành, để coroutine này có thể khôi phục bình thường trong tương lai. Sau khi áp dụng chiến lược chiếm quyền bất đồng bộ, ví dụ trước đó就不再 sẽ bị chặn vĩnh viễn coroutine chính, khi coroutine quay vòng chạy一定 thời gian sau sẽ bị cưỡng chế thực thi vòng lặp điều phối,从而 nhường quyền thực thi cho coroutine chính, cuối cùng để chương trình có thể kết thúc bình thường.
Tóm tắt
Nói chung, thời điểm kích hoạt điều phối có mấy loại sau:
- Lời gọi hàm
- System call
- System monitor
- Garbage collection, garbage collection đối với coroutine thực thi thời gian quá dài cũng sẽ thực hiện chiếm quyền
- Coroutine vì pipeline, lock等原因 mà bị treo
Chiến lược điều phối chủ yếu là hai loại lớn, điều phối hợp tác và chiếm quyền, điều phối hợp tác là chủ động nhường quyền thực thi, chiếm quyền là异步 chiếm quyền thực thi, hai cái cùng tồn tại mới hình thành bộ điều phối ngày nay.
