Skip to content

gmp

go 語言最大的特點之一就是它對於並發的天然支持,僅需一個關鍵字就可以開啟一個協程,就像下面例子所演示的一樣。

go
import (
  "fmt"
  "sync"
)

func main() {
  var wg sync.WaitGroup
  wg.Add(2)

  go func() {
    defer wg.Done()
    fmt.Println("hello world!")
  }()
  go func() {
    defer wg.Done()
    fmt.Println("hello world too!")
  }()

  wg.Wait()
}

go 語言的協程使用起來如此的簡單,對於開發者來說幾乎不需要做什麼額外的工作,這也是它受歡迎的原因之一。不過在簡單的背後,是一個並不簡單的並發調度器在支撐著這一切,它的名字相信各位或多或少都應該聽說過,因為其主要參與者分別由 G(協程),M(系統線程),P(處理器)這三個成員組成,所以也被稱為 GMP 調度器。GMP 調度器的設計影響著整個 go 語言運行時的設計,GC,網絡輪詢器,可以說它就是整個語言最核心的一塊,如果對它能夠有一定的了解,在日後的工作中說不定會有些許幫助。

歷史

Go 語言的並發調度模型並發不是完全原創的,它吸收了很多前人的經驗和教訓,經過不斷發展和改進才有了現在的樣子。它借鑑過的語言有下面這些:

  • Occam -1983
  • Erlang - 1986
  • Newsqueak - 1988
  • Concurrent ML - 1993
  • Alef - 1995
  • Limbo - 1996

影響最為巨大的還是霍爾在 1978 年發表了一篇關於 CSP(Communicate Sequential Process)的論文,該論文的基本思想是進程與進程之間通過通信來進行數據的交換。在上面的幾門編程語言中無一不受到了 CSP 思想的影響,Erlang 就是最為典型的一個面向消息的編程語言,著名開源消息隊列中間件 RabbitMQ 就是采用 Erlang 編寫的。到了現如今,隨著的計算機和互聯網的發展,並發支持幾乎已經成為了一個現代語言的標配,結合了 CSP 思想的 go 語言便應運而生。

調度模型

首先來簡單的介紹下 GMP 成的三個成員

  • G,Goroutine,指的是 go 語言中的協程
  • M,Machine,指是系統線程或者叫工作線程(worker thread),由操作系統來負責調度
  • P,Processor,並非指 CPU 處理器,是 go 自己抽象的一個概念,指的是工作在系統線程上的處理器,通過它來調度每一個系統線程上的協程。

協程就是一種更加輕量的線程,規模更小,所需的資源也會更少,創建和銷毀和調度的時機都是由 go 語言運行時來完成,而並非操作系統,所以它的管理成本要比線程低很多。不過協程也是依附於線程的,協程執行所需的時間片來自於線程,線程所需的時間片來自於操作系統,而不同的線程間的切換是有一定成本的,如何讓協程利用好線程的時間片就是設計的關鍵所在。

1:N

解決問題的最好辦法就是忽略這個問題,既然線程切換是有成本的,那直接不切換就行了。將所有的協程都分配到一個內核線程上,這樣就只涉及到了協程間的切換。

線程與協程間的關系是1:N,這樣做有一個非常明顯的缺點,當今時代的計算機幾乎都是多核 CPU,這樣的分配無法利用充分利用多核 CPU 的性能。

N:N

另一種方法,一個線程對應一個協程,一個協程可以享受該線程的所有時間片,多個線程也可以利用好多核 CPU 的性能。但是,線程的創建和切換成本是比較高的,如果是一比一的關系,反而沒有利用好協程的輕量這一優勢。

M:N

M 個線程對應 N 個協程,且 M 小於 N。多個線程對應多個協程,每一個線程都會對應若干個協程,由處理器 P 來負責調度協程 G 如何使用線程的時間片。這種方法是相對而言比較好的一種,也是 Go 一直沿用至今的調度模型。

M 只有與處理器 P 關聯後才能執行任務,go 會創建GOMAXPROCS個處理器,所以真正能夠用於執行任務的線程數量就是GOMAXPROCS個,它的默認值是當前機器上的 CPU 邏輯核數,我們也可以手動去設置它的值。

  • 通過代碼修改runtime.GOMAXPROCS(N),並且可以在運行時動態調整,調用後直接 STW。

  • 設置環境變量export GOMAXPROCS=N,靜態。

在實際情況下,M 的數量會大於 P 的數量,因為在運行時會需要它們去處理其它的任務,比如一些系統調用,最大值是 10000。

GMP 這三個參與者以及調度器本身在運行時都有其對應的類型表示,它們都位於runtime/runtime2.go文件中,下面會對其結構進行簡單的介紹,方便在後面進行理解。

G

G 在運行時的表現類型是runtime.g結構體,是調度模型中最基本的調度單元,其結構如下所示,為了方便理解,刪去了不少的字段。

go
type g struct {
    stack stack // offset known to runtime/cgo

    _panic   *_panic // innermost panic - offset known to liblink
    _defer   *_defer // innermost defer
    m        *m      // current m; offset known to arm liblink
    sched    gobuf

    goid       uint64
    waitsince  int64      // approx time when the g become blocked
    waitreason waitReason // if status==Gwaiting

    atomicstatus atomic.Uint32

    preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
    startpc       uintptr         // pc of goroutine function

    parentGoid uint64  // goid of goroutine that created this goroutine
    waiting    *sudog  // sudog structures this g is waiting on (that have a valid elem ptr); in lock order
}

第一個字段就是屬於該協程的棧的內存起始地址和結束地址

go
type stack struct {
  lo uintptr
  hi uintptr
}

_panic_defer是分別指向panic棧和defer棧的指針

go
_panic   *_panic // innermost panic - offset known to liblink
_defer   *_defer // innermost defer

m正在執行當前 g 的協程

go
m        *m      // current m; offset known to arm liblink

preempt表示當前協程是否需要被搶佔,等價於g.stackguard0 = stackpreempt

go
preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt

atomicstatus用於存儲協程 G 的狀態值,它有以下可選的值

名稱描述
_Gidle剛被分配,且未被初始化
_Grunnable表示當前協程可以運行,位於等待隊列中
_Grunning表示當前協程正在執行用戶代碼
_Gsyscall被分配了一個 M,用於執行系統調用,
_Gwaiting協程阻塞,阻塞的原因見下文
_Gdead表示當前協程未被使用,可能剛剛退出,也可能剛剛初始化
_Gcopystack表示協程棧正在移動,在此期間不執行用戶代碼,也不位於等待隊列中
_Gpreempted阻塞自身進入搶佔,等待被搶佔方喚醒
_GscanGC 正在掃描協程棧空間,可以其它狀態共存

sched用於存儲協程上下文信息用於恢復協程的執行現場,可以看到裡面存儲著sppcret指針。

go
type gobuf struct {
  sp   uintptr
  pc   uintptr
  g    guintptr
  ctxt unsafe.Pointer
  ret  uintptr
  lr   uintptr
  bp   uintptr // for framepointer-enabled architectures
}

waiting 表示當前協程正在等待的協程,waitsince記錄了協程發生阻塞的時刻,waitreason表示協程阻塞的原因,可選的值如下。

go
var waitReasonStrings = [...]string{
  waitReasonZero:                  "",
  waitReasonGCAssistMarking:       "GC assist marking",
  waitReasonIOWait:                "IO wait",
  waitReasonChanReceiveNilChan:    "chan receive (nil chan)",
  waitReasonChanSendNilChan:       "chan send (nil chan)",
  waitReasonDumpingHeap:           "dumping heap",
  waitReasonGarbageCollection:     "garbage collection",
  waitReasonGarbageCollectionScan: "garbage collection scan",
  waitReasonPanicWait:             "panicwait",
  waitReasonSelect:                "select",
  waitReasonSelectNoCases:         "select (no cases)",
  waitReasonGCAssistWait:          "GC assist wait",
  waitReasonGCSweepWait:           "GC sweep wait",
  waitReasonGCScavengeWait:        "GC scavenge wait",
  waitReasonChanReceive:           "chan receive",
  waitReasonChanSend:              "chan send",
  waitReasonFinalizerWait:         "finalizer wait",
  waitReasonForceGCIdle:           "force gc (idle)",
  waitReasonSemacquire:            "semacquire",
  waitReasonSleep:                 "sleep",
  waitReasonSyncCondWait:          "sync.Cond.Wait",
  waitReasonSyncMutexLock:         "sync.Mutex.Lock",
  waitReasonSyncRWMutexRLock:      "sync.RWMutex.RLock",
  waitReasonSyncRWMutexLock:       "sync.RWMutex.Lock",
  waitReasonTraceReaderBlocked:    "trace reader (blocked)",
  waitReasonWaitForGCCycle:        "wait for GC cycle",
  waitReasonGCWorkerIdle:          "GC worker (idle)",
  waitReasonGCWorkerActive:        "GC worker (active)",
  waitReasonPreempted:             "preempted",
  waitReasonDebugCall:             "debug call",
  waitReasonGCMarkTermination:     "GC mark termination",
  waitReasonStoppingTheWorld:      "stopping the world",
}

goidparentGoid表示當前協程和父協程的唯一標識,startpc表示當前協程入口函數的地址。

M

M在運行時表現為runtime.m結構體,是對工作線程的抽象

go
type m struct {
    id            int64

    g0 *g // goroutine with scheduling stack
    curg          *g           // current running goroutine

    gsignal       *g           // signal-handling g
    goSigStack    gsignalStack // Go-allocated signal handling stack

    p             puintptr     // attached p for executing go code (nil if not executing go code)
    nextp         puintptr
    oldp          puintptr // the p that was attached before executing a syscall

    mallocing     int32
    throwing      throwType
    preemptoff    string // if != "", keep curg running on this m
    locks         int32
    dying         int32

    spinning      bool // m is out of work and is actively looking for work

    tls           [tlsSlots]uintptr
    ...
}

同樣的,M 內部的字段也有很多,這裡僅介紹部分字段方便理解。

  • id,M 的唯一標識符
  • g0,擁有調度棧的協程
  • curg,正在工作線程上運行的用戶協程
  • gsignal,負責處理線程信號的協程
  • goSigStack,go 分配的用於信號處理的棧空間
  • p,處理器 P 的地址,oldp指向在執行系統調用前的 P,nextp指向新分配的 P
  • mallocing,用於表示當前是否正在分配新的內存空間
  • throwing,表示當 M 發生的錯誤類型
  • preemptoff,搶佔標識符,當它為空串時表示當前正在運行的協程可以被搶佔
  • locks,表示當前 M"鎖"的數量,不為 0 時禁止搶佔
  • dying,表示 M 發生了無法挽回的panic,有[0,3]四個可選值,從低到高表示嚴重程度。
  • spinning,表示 M 正處於空閒狀態,並且隨時可用。
  • tls,線程本地存儲

P

P 在運行時由runtime.p表示,負責調度 M 與 G 之間的工作,其結構如下所示

go
type p struct {
    id     int32
    status uint32 // one of pidle/prunning/...

    schedtick   uint32     // incremented on every scheduler call
    syscalltick uint32     // incremented on every system call
    sysmontick  sysmontick // last tick observed by sysmon

    m      muintptr // back-link to associated m (nil if idle)

    // Queue of runnable goroutines. Accessed without lock.
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr

    runnext guintptr

    // Available G's (status == Gdead)
    gFree struct {
        gList
        n int32
    }

    // preempt is set to indicate that this P should be enter the
    // scheduler ASAP (regardless of what G is running on it).
    preempt bool

    ...
}

status表示 P 的狀態,有以下幾個可選值

描述
_PidleP 位於空閒狀態,可以被調度器分配 M,也有可能只是在其它狀態間轉換
_PrunningP 與 M 關聯,並且正在執行用戶代碼
_Psyscall表示與 P 關聯的 M 正在進行系統調用,在此期間 P 可能會被其它的 M 搶佔
_Pgcstop表示 P 因 GC 而停止
_PdeadP 的大部分資源都被剝奪,將不再會被使用

下面幾個字段記錄了 P 中的runq本地隊列,可以看到本地隊列的最大數量是 256,超過此數量後 G 會被放到全局隊列中去。

go
runqhead uint32
runqtail uint32
runq     [256]guintptr

runnext表示下一個可用的 G

runnext guintptr

其它的幾個字段釋義如下

  • id,P 的唯一標識符
  • schedtick,隨著協程調度次數的增加而增加,在runtime.execute函數中可見。
  • syscalltick,隨著系統調用的次數增加而增加
  • sysmontick,記錄了上一次被系統監控觀察的信息
  • m,與 P 關聯的 M
  • gFree,空閒的 G 列表
  • preempt,表示 P 應該再次進入調度

全局隊列的信息則存放在runtime.schedt結構體中,是調度器在運行時的表示形式,如下。

go
type schedt struct {
  ...

  midle        muintptr // idle m's waiting for work
  ngsys atomic.Int32 // number of system goroutines
  pidle        puintptr // idle p's

  // Global runnable queue.
  runq     gQueue
  runqsize int32

  ...
}

初始化

調度器的初始化位於 go 程序的引導階段,負責引導 go 程序的就是runtime.rt0_go函數,它由匯編實現位於文件runtime/asm_*.s中,部分代碼如下

TEXT runtime·rt0_go(SB),NOSPLIT|NOFRAME|TOPFRAME,$0
    ...
    ...
  CALL  runtime·check(SB)

  MOVL  24(SP), AX    // copy argc
  MOVL  AX, 0(SP)
  MOVQ  32(SP), AX    // copy argv
  MOVQ  AX, 8(SP)
  CALL  runtime·args(SB)
  CALL  runtime·osinit(SB)
  CALL  runtime·schedinit(SB)

  // create a new goroutine to start program
  MOVQ  $runtime·mainPC(SB), AX    // entry
  PUSHQ  AX
  CALL  runtime·newproc(SB)
  POPQ  AX

  // start this M
  CALL  runtime·mstart(SB)

  CALL  runtime·abort(SB)  // mstart should never return
  RET

可以通過下面兩行可以看到對runtime·osinitruntime·schedinit的調用。

CALL  runtime·osinit(SB)
CALL  runtime·schedinit(SB)

前者負責初始化操作系統相關的工作,後者負責調度器的初始化,也就是runtime·schedinit函數。它會在程序啟動時負責初始化調度器運行所需的資源,下面是簡化後的代碼。

go
func schedinit() {
    ...
  gp := getg()

  sched.maxmcount = 10000

  // The world starts stopped.
  worldStopped()
  ...
    stackinit()
  mallocinit()
  mcommoninit(gp.m, -1)

    lock(&sched.lock)
  procs := ncpu
  if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
    procs = n
  }
  if procresize(procs) != nil {
    throw("unknown runnable goroutine during bootstrap")
  }
  unlock(&sched.lock)
  ...
  // World is effectively started now, as P's can run.
  worldStarted()
    ...
}

runtime.getg函數由匯編實現,它的功能是獲取到當前協程的運行時表示,也就是runtime.g結構體的指針。通過sched.maxmcount = 10000可以看到,在調度器初始化的時候就設置了 M 的最大數量為 10000,這個值是固定的且沒法修改。再之後就是初始化堆棧,然後才是runtime.mcommoninit函數來初始化 M,其函數實現如下

go
func mcommoninit(mp *m, id int64) {
  gp := getg()

  // g0 stack won't make sense for user (and is not necessary unwindable).
  if gp != gp.m.g0 {
    callers(1, mp.createstack[:])
  }

  lock(&sched.lock)

  if id >= 0 {
    mp.id = id
  } else {
    mp.id = mReserveID()
  }

  ...

  mpreinit(mp)
  if mp.gsignal != nil {
    mp.gsignal.stackguard1 = mp.gsignal.stack.lo + stackGuard
  }

  // Add to allm so garbage collector doesn't free g->m
  // when it is just in a register or thread-local storage.
  mp.alllink = allm

  // NumCgoCall() iterates over allm w/o schedlock,
  // so we need to publish it safely.
  atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
  unlock(&sched.lock)
  ...
}

該函數對 M 進行預初始化,主要做了以下工作

  1. 分配 M 的 id
  2. 單獨分配一個 G 用來處理線程信號,由runtime.mpreinit函數完成
  3. 將其作為全局 M 鏈表runtime.allm的頭結點

接下來初始化 P,其數量默認是 CPU 的邏輯核數,其次是環境變量的值。

go
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
    procs = n
}
if procresize(procs) != nil {
    throw("unknown runnable goroutine during bootstrap")
}

最後由runtime.procresize函數來負責初始化 P,它會根據傳入的數量來修改runtime.allp這個存放所有 P 的全局切片。首先根據數量大小判斷是否需要擴容

go
if nprocs > int32(len(allp)) {
    // Synchronize with retake, which could be running
    // concurrently since it doesn't run on a P.
    lock(&allpLock)
    if nprocs <= int32(cap(allp)) {
      allp = allp[:nprocs]
    } else {
      nallp := make([]*p, nprocs)
      // Copy everything up to allp's cap so we
      // never lose old allocated Ps.
      copy(nallp, allp[:cap(allp)])
      allp = nallp
    }
    unlock(&allpLock)
}

然後再初始化每一個 P

go
// initialize new P's
for i := old; i < nprocs; i++ {
    pp := allp[i]
    if pp == nil {
        pp = new(p)
    }
    pp.init(i)
    atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}

如果當前協程正在使用的 P 需要被銷毀,則將其替換為allp[0],由runtime.acquirep函數來完成 M 與新 P 的關聯。

go
gp := getg()
if gp.m.p != 0 && gp.m.p.ptr().id < nprocs {
    gp.m.p.ptr().status = _Prunning
    gp.m.p.ptr().mcache.prepareForSweep()
} else {
    if gp.m.p != 0 {
        gp.m.p.ptr().m = 0
    }
    gp.m.p = 0
    pp := allp[0]
    pp.m = 0
    pp.status = _Pidle
    acquirep(pp)
}

隨後銷毀不再需要的 P,在銷毀時會釋放 P 的所有資源,將其本地隊列中所有的 G 放入全局隊列中,銷毀完畢然後對allp進行切片。

go
// release resources from unused P's
for i := nprocs; i < old; i++ {
    pp := allp[i]
    pp.destroy()
    // can't free P itself because it can be referenced by an M in syscall
}

// Trim allp.
if int32(len(allp)) != nprocs {
    lock(&allpLock)
    allp = allp[:nprocs]
    unlock(&allpLock)
}

最後將空閒的 P 鏈接成一個鏈表,並最終返回鏈表的頭結點

go
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
    pp := allp[i]
    if gp.m.p.ptr() == pp {
        continue
    }
    pp.status = _Pidle
    if runqempty(pp) {
        pidleput(pp, now)
    } else {
        pp.m.set(mget())
        pp.link.set(runnablePs)
        runnablePs = pp
    }
}
return runnablePs

再之後,調度器初始化完畢,由runtime.worldStarted將所有的 P 恢復運行。

MOVQ  $runtime·mainPC(SB), AX    // entry
PUSHQ  AX
CALL  runtime·newproc(SB)
POPQ  AX

// start this M
CALL  runtime·mstart(SB)

然後會通過runtime.newproc函數創建一個新的協程來啟動 go 程序,隨後調用runtime.mstart來正式啟動調度器的運行,它同樣是由匯編實現,其內部會調用runtime.mstart0函數進行創建,該函數部分代碼如下

go
gp := getg()
osStack := gp.stack.lo == 0
if osStack {
    size := gp.stack.hi
    if size == 0 {
        size = 16384 * sys.StackGuardMultiplier
    }
    gp.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
    gp.stack.lo = gp.stack.hi - size + 1024
}
gp.stackguard0 = gp.stack.lo + stackGuard
gp.stackguard1 = gp.stackguard0
mstart1()

此時的 M 只有擁有一個協程g0,該協程使用線程的系統棧,並非單獨分配的棧空間。mstart0函數會先初始化 G 的棧邊界,然後交給mstart1去完成剩下的初始化工作。

go
gp := getg()

gp.sched.g = guintptr(unsafe.Pointer(gp))
gp.sched.pc = getcallerpc()
gp.sched.sp = getcallersp()

asminit()
minit()

if gp.m == &m0 {
    mstartm0()
}

if fn := gp.m.mstartfn; fn != nil {
    fn()
}

if gp.m != &m0 {
    acquirep(gp.m.nextp.ptr())
    gp.m.nextp = 0
}
schedule()

在開始之前,首先會記錄當前的執行現場,因為初始化成功之後就會進入調度循環並且永遠也不會返回,其它的調用可以復用執行現場從mstart1函數返回達到退出線程的目的。記錄完畢後,由runtime.asminitruntime.minit兩個函數負責初始化系統棧,然後由runtime.mstartm0函數設置用於處理信號的回調。執行回調函數m.mstartfn後 ,runtime.acquirep函數將 M 與先前創建好的 P 進行關聯,最後進入調度循環。

這裡調用的runtime.schudule是整個 go 運行時的第一輪調度循環,代表著調度器正式開始工作。

線程

在調度器中,G 想要執行用戶代碼要依靠 P,而 P 要正常工作必須要與一個 M 關聯,M 指的就是系統線程。

創建

M 的創建是由函數runtime.newm完成的,它接受一個函數和 P 以及 id 作為參數,作為參數的函數不能閉包。

go
func newm(fn func(), pp *p, id int64) {
  acquirem()
  mp := allocm(pp, fn, id)
  mp.nextp.set(pp)
  mp.sigmask = initSigmask
  newm1(mp)
  releasem(getg().m)
}

在開始前,newm會先調用runtime.allocm函數來創建線程的運行時表示也就是 M,在過程中會使用runtime.mcommoninit函數來初始化 M 的棧邊界。

go
func allocm(pp *p, fn func(), id int64) *m  {
    allocmLock.rlock()

    // The caller owns pp, but we may borrow (i.e., acquirep) it. We must
    // disable preemption to ensure it is not stolen, which would make the
    // caller lose ownership.
    acquirem()

    gp := getg()
    if gp.m.p == 0 {
        acquirep(pp) // temporarily borrow p for mallocs in this function
    }

    mp := new(m)
    mp.mstartfn = fn
    mcommoninit(mp, id)

    mp.g0.m = mp

    releasem(gp.m)
    allocmLock.runlock()
    return mp
}

再之後由runtime.newm1調用runtime.newosproc函數來完成真正的系統線程的創建。

go
func newm1(mp *m) {
  execLock.rlock()
  newosproc(mp)
  execLock.runlock()
}

runtim.newosproc的實現會根據操作系統的不同而不同,具體怎麼創建就不是我們要關心的事了,由操作系統負責,然後由runtime.mstart來啟動 M 的工作。

退出

go
runtime.gogo(&mp.g0.sched)

在初始的時候提到過,在調用mstart1函數時將執行現場保存在了 g0sched字段中,將該字段傳給runtime.gogo函數(匯編實現)就可以讓線程跳到執行現場繼續執行,在保存的時候用的是getcallerpc(),所以恢復現場的時候是回到了mstar0函數。

go
mstart1()

if mStackIsSystemAllocated() {
    osStack = true
}
mexit(osStack)

執行現場恢復後,按照執行順序就會進入mexit函數來退出線程。

go
mp := getg().m

unminit()

lock(&sched.lock)
for pprev := &allm; *pprev != nil; pprev = &(*pprev).alllink {
    if *pprev == mp {
        *pprev = mp.alllink
    }
}

mp.freeWait.Store(freeMWait)
mp.freelink = sched.freem
sched.freem = mp
unlock(&sched.lock)

handoffp(releasep())

mdestroy(mp)

exitThread(&mp.freeWait)

它總共做了以下幾個主要的事情

  1. 調用runtime.uminit來撤銷runtime.minit的工作
  2. 從全局變量allm中刪除該 M
  3. 將調度器的freem指向當前 M
  4. runtime.releasep將 P 與當前 M 解綁,並由runtime.handoffp讓 P 跟其它的 M 綁定以繼續工作
  5. runtime.destroy負責銷毀 M 的資源
  6. 最後由操作系統來退出線程

到此 M 就成功退出了。

暫停

當因為調度器調度,GC,系統調用等原因需要暫停 M 時,就會調用runtime.stopm函數來暫停線程,下面是簡化後的代碼。

go
func stopm() {
  gp := getg()
  lock(&sched.lock)
  mput(gp.m)
  unlock(&sched.lock)
  mPark()
  acquirep(gp.m.nextp.ptr())
  gp.m.nextp = 0
}

它首先會將 M 放入全局的空閒 M 列表中,然後由mPark()將當前線程阻塞在notesleep(&gp.m.park)這裡,當被喚醒過後該函數就會返回

go
func mPark() {
  gp := getg()
  notesleep(&gp.m.park)
  noteclear(&gp.m.park)
}

喚醒後的 M 會去尋找一個 P 進行綁定從而繼續執行任務。

協程

協程的生命周期剛好對應著協程的幾個狀態,了解協程的生命周期對了解調度器會很有幫助,畢竟整個調度器就是圍繞著協程來設計的,整個協程的生命周期就如下圖所示。

_Gcopystack是協程棧擴張時具有的狀態,在協程棧部分進行講解。

創建

協程的創建從語法層面上來講只需要一個go關鍵字加一個函數。

go
go doSomething()

編譯後會變成runtime.newproc函數的調用

go
func newproc(fn *funcval) {
  gp := getg()
  pc := getcallerpc()
  systemstack(func() {
    newg := newproc1(fn, gp, pc)

    pp := getg().m.p.ptr()
    runqput(pp, newg, true)

    if mainStarted {
      wakep()
    }
  })
}

runtime.newproc1來完成實際的創建,在創建時首先會鎖住 M,禁止搶佔,然後會去 P 的本地gfree列表中尋找空閒的 G 來重復利用,如果找不到就由runtime.malg 創建一個新的 G,並為其分配 2kb 的棧空間。此時 G 的狀態為_Gdead

go
mp := acquirem() // disable preemption because we hold M and P in local vars.
pp := mp.p.ptr()
newg := gfget(pp)
if newg == nil {
    newg = malg(stackMin)
    casgstatus(newg, _Gidle, _Gdead)
    allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}

在 go1.18 及以後,參數的拷貝不再由newproc1函數完成,在這之前,會使用runtime.memmove來拷貝函數的參數。現在的話只是負責重置協程的棧空間,將runtime.goexit作為棧底由它來進行協程的退出處理,然後設置入口函數的 PCnewg.startpc = fn.fn表示從這裡開始執行,設置完成後,此時 G 的狀態為_Grunnable

go
totalSize := uintptr(4*goarch.PtrSize + sys.MinFrameSize) // extra space in case of reads slightly beyond frame
totalSize = alignUp(totalSize, sys.StackAlign)
sp := newg.stack.hi - totalSize
spArg := sp
if usesLR {
    // caller's LR
    *(*uintptr)(unsafe.Pointer(sp)) = 0
    prepGoExitFrame(sp)
    spArg += sys.MinFrameSize
}

memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
newg.sched.sp = sp
newg.stktopsp = sp
newg.sched.pc = abi.FuncPCABI0(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
newg.parentGoid = callergp.goid
newg.gopc = callerpc
newg.ancestors = saveAncestors(callergp)
newg.startpc = fn.fn
casgstatus(newg, _Gdead, _Grunnable)

最後設置 G 的唯一標識符,然後釋放 M,返回創建的協程 G。

go
newg.goid = pp.goidcache
pp.goidcache++
releasem(mp)

return newg

協程創建完畢後,會由runtime.runqput函數嘗試將其放入 P 的本地隊列中,如果放不下就放到全局隊列中。在協程創建的整個過程中,其狀態變化首先是由_Gidle變為_Gdead,設置好入口函數後由_Gdead變為_Grunnable

退出

在創建的時候 go 就已經將runtime.goexit函數作為協程的棧底,那麼當協程執行完畢後最終就會走入該函數,經過調用鏈goexit->goexit1->goexit0,最終由runtime.goexit0來負責協程的退出工作。

go
func goexit0(gp *g) {
  mp := getg().m
  pp := mp.p.ptr()
  ...
  casgstatus(gp, _Grunning, _Gdead)
  ...
  gp.m = nil
  locked := gp.lockedm != 0
  gp.lockedm = 0
  mp.lockedg = 0
  gp.preemptStop = false
  gp.paniconfault = false
  gp._defer = nil // should be true already but just in case.
  gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
  gp.writebuf = nil
  gp.waitreason = waitReasonZero
  gp.param = nil
  gp.labels = nil
  gp.timer = nil

  dropg()
  ...
  gfput(pp, gp)
    ...
  schedule()
}

該函數主要做了下面幾個事情

  1. 設置狀態為_Gdead
  2. 重置字段值
  3. dropg()切斷 M 與 G 之間的關聯
  4. gfput(pp, gp) 將當前 G 放入 P 的本地空閒列表中
  5. schedule()進行新一輪調度,將 M 的執行權讓給其它的 G

退出後,協程的狀態由_Grunning變化為_Gdead,在以後新建協程時仍有可能被重復利用。

系統調用

當協程 G 在執行用戶代碼時如果進行了系統調用,觸發系統調用的方法有兩種

  1. syscall標准庫的系統調用
  2. cgo 調用

因為系統調用會阻塞工作線程,所以在此之前需要進行准備工作,由runtime.entersyscall函數完成這一過程,但前者也只是對runtime.reentersyscall函數的一個簡單調用,實際的工作是由後者來完成的。首先會鎖住當前的 M,在進行准備期間 G 禁止被搶佔,也禁止棧擴張,設置gp.stackguard0 = stackPreempt表示在准備工作完成後將 P 的執行權將被其它的 G 搶佔,然後保留協程的執行現場,方便系統調用返回後恢復。

go
gp := getg()

// Disable preemption because during this function g is in Gsyscall status,
// but can have inconsistent g->sched, do not let GC observe it.
gp.m.locks++

// Entersyscall must not call any function that might split/grow the stack.
// (See details in comment above.)
// Catch calls that might, by replacing the stack guard with something that
// will trip any stack check and leaving a flag to tell newstack to die.
gp.stackguard0 = stackPreempt
gp.throwsplit = true

// Leave SP around for GC and traceback.
save(pc, sp)
gp.syscallsp = sp
gp.syscallpc = pc

再之後,因為了防止長時間阻塞而影響其它 G 的執行,M 與 P 會解綁,解綁後的 M 和 G 會因執行系統調用而阻塞,而 P 在解綁以後可能會與其它空閒的 M 綁定從而讓 P 本地隊列中其它的 G 能夠繼續工作。

go
casgstatus(gp, _Grunning, _Gsyscall)
gp.m.syscalltick = gp.m.p.ptr().syscalltick
pp := gp.m.p.ptr()
pp.m = 0
gp.m.oldp.set(pp)
gp.m.p = 0
atomic.Store(&pp.status, _Psyscall)
gp.m.locks--

在准備工作完成後,釋放 M 的鎖,在此期間 G 的狀態由_Grunning變為_Gsyscall,P 的狀態變為_Psyscall

當系統調用返回後,線程 M 不再阻塞,對應的 G 也需要再次被調度來執行用戶代碼,由runtime.exitsyscall函數來完成這個善後的工作。首先鎖住當前的 M,獲取舊 P 的引用。

go
gp := getg()

gp.waitsince = 0
oldp := gp.m.oldp.ptr()
gp.m.oldp = 0

此時分為兩種情況來處理,第一種情況是是否有 P 可以直接使用,runtime.exitsyscallfast函數會判斷原來的 P 是否可用,即 P 的狀態是否為_Psyscall,否則的話就會去找空閒的 P。

go
func exitsyscallfast(oldp *p) bool {
  gp := getg()

  // Freezetheworld sets stopwait but does not retake P's.
  if sched.stopwait == freezeStopWait {
    return false
  }

  // Try to re-acquire the last P.
  if oldp != nil && oldp.status == _Psyscall && atomic.Cas(&oldp.status, _Psyscall, _Pidle) {
    // There's a cpu for us, so we can run.
    wirep(oldp)
    exitsyscallfast_reacquired()
    return true
  }

  // Try to get any other idle P.
  if sched.pidle != 0 {
    var ok bool
    systemstack(func() {
      ok = exitsyscallfast_pidle()
    })
    if ok {
      return true
    }
  }
  return false
}

如果成功找到了可用的 P,M 會與 P 進行綁定,G 由_Gsyscall狀態切換為_Grunning狀態,然後通過runtime.GoschedG 主動讓出執行權,P 進入調度循環尋找其它可用的 G。

go
oldp := gp.m.oldp.ptr()
gp.m.oldp = 0
if exitsyscallfast(oldp) {
    // There's a cpu for us, so we can run.
    gp.m.p.ptr().syscalltick++
    // We need to cas the status and scan before resuming...
    casgstatus(gp, _Gsyscall, _Grunning)

    // Garbage collector isn't running (since we are),
    // so okay to clear syscallsp.
    gp.syscallsp = 0
    gp.m.locks--
    if gp.preempt {
        // restore the preemption request in case we've cleared it in newstack
        gp.stackguard0 = stackPreempt
    } else {
        // otherwise restore the real stackGuard, we've spoiled it in entersyscall/entersyscallblock
        gp.stackguard0 = gp.stack.lo + stackGuard
    }
    gp.throwsplit = false

    if sched.disable.user && !schedEnabled(gp) {
        // Scheduling of this goroutine is disabled.
        Gosched()
    }

    return
}

假如沒有找到的話,M 會與 G 解綁,G 由_Gsyscall切換為_Grunnable狀態,然後再次嘗試是否能找到空閒的 P,如果沒有找到就直接將 G 放入全局隊列中,然後進入新一輪調度循環,舊 M 由runtime.stopm進入空閒狀態,等待日後的新任務。如果 P 找到了的話,舊 M 和 G 與新的 P 進行關聯,然後繼續執行用戶代碼,狀態由_Grunnable變為_Grunning

go
func exitsyscall0(gp *g) {
  casgstatus(gp, _Gsyscall, _Grunnable)
  dropg()
  lock(&sched.lock)
  var pp *p
  if schedEnabled(gp) {
    pp, _ = pidleget(0)
  }
  var locked bool
  if pp == nil {
    globrunqput(gp)
  }
  unlock(&sched.lock)
  if pp != nil {
    acquirep(pp)
    execute(gp, false) // Never returns.
  }
  stopm()
  schedule() // Never returns.
}

在退出系統調用後,G 的狀態最終有兩種結果,一種是等待被調度的_Grunnable,一種是繼續運行的_Grunning

掛起

當前協程因為一些原因掛起的時候,狀態會由_Grunnable變為_Gwaiting,掛起的原因有很多,可以是因為通道阻塞,select,鎖或者是time.sleep,更多原因見G 結構。拿time.Sleep舉例,它實際上是鏈接到了runtime.timesleep,後者的代碼如下。

go
func timeSleep(ns int64) {
  if ns <= 0 {
    return
  }

  gp := getg()
  t := gp.timer
  if t == nil {
    t = new(timer)
    gp.timer = t
  }
  t.f = goroutineReady
  t.arg = gp
  t.nextwhen = nanotime() + ns
  if t.nextwhen < 0 { // check for overflow.
    t.nextwhen = maxWhen
  }
  gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceBlockSleep, 1)
}

可以看出,它通過getg獲取當前的協程,再通過runtime.gopark使得當前的協程掛起。runtime.gopark會更新 G 和 M 的阻塞原因,釋放 M 的鎖

go
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
    throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waitTraceBlockReason = traceReason
mp.waitTraceSkip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)

然後切換到系統棧上由runtime.park_m來將 G 的狀態切換為_Gwaiting,然後切斷 M 與 G 之間的關聯並進入新的調度循環從而將執行權讓給其它的 G。掛起後,G 即不執行用戶代碼,也不在本地隊列中,只是保持著對 M 和 P 的引用。

go
mp := getg().m
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
schedule()

runtime.timesleep函數中有這麼一行代碼,指定了t.f的值

go
t.f = goroutineReady

這個runtime.goroutineReady函數的作用就是用於喚醒掛起的協程,它會調用runtime.ready函數來喚醒協程

go
status := readgstatus(gp)
// Mark runnable.
mp := acquirem()
casgstatus(gp, _Gwaiting, _Grunnable)
runqput(mp.p.ptr(), gp, next)
wakep()
releasem(mp)

喚醒後,將 G 的狀態切換為_Grunnable,然後將 G 放入 P 的本地隊列中等待日後被調度。

協程棧

go 語言中的協程是典型的有棧協程,每開啟一個協程都會為其在堆上分配一個獨立的棧空間,並且它會隨著使用量的變化而增長或縮小。在調度器初始化的時候,runtime.stackinit函數負責來初始化全局的棧空間緩存stackpoolstackLarge

go
func stackinit() {
  if _StackCacheSize&_PageMask != 0 {
    throw("cache size must be a multiple of page size")
  }
  for i := range stackpool {
    stackpool[i].item.span.init()
    lockInit(&stackpool[i].item.mu, lockRankStackpool)
  }
  for i := range stackLarge.free {
    stackLarge.free[i].init()
    lockInit(&stackLarge.lock, lockRankStackLarge)
  }
}

除此之外,每一個 P 都有一個自己獨立的棧空間緩存mcache

go
type p struct {
  ...
  mcache      *mcache
  ...
}

type mcache struct {
  _ sys.NotInHeap
  nextSample uintptr
  scanAlloc  uintptr
  tiny       uintptr
  tinyoffset uintptr
  tinyAllocs uintptr
  alloc [numSpanClasses]*mspan
  stackcache [_NumStackOrders]stackfreelist
  flushGen atomic.Uint32
}

線程緩存mcache是每一個線程獨立的且不是分配在堆內存上,訪問的時候不需要加鎖,這三個棧緩存在後續分配空間的時候都會用到。

分配

在新建協程時,如果沒有可重復利用的協程,就會選擇為其分配一個新的棧空間,它的大小默認為 2KB。

go
newg := gfget(pp)
if newg == nil {
    newg = malg(stackMin)
    casgstatus(newg, _Gidle, _Gdead)
    allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}

負責分配棧空間的函數就是runtime.stackalloc

go
func stackalloc(n uint32) stack

根據申請的棧內存大小是否小於 32KB 分為兩種情況,32KB 同時也是 go 中判斷是小對象還是大對象的標准。倘若小於這個值就會從stackpool緩存中去獲取,當 M 與 P 綁定且 M 不允許被搶佔時,就會去本地的線程緩存中獲取。

go
if n < fixedStack<<_NumStackOrders && n < _StackCacheSize {
    order := uint8(0)
    n2 := n
    for n2 > fixedStack {
        order++
        n2 >>= 1
    }
    var x gclinkptr
    if stackNoCache != 0 || thisg.m.p == 0 || thisg.m.preemptoff != "" {
        lock(&stackpool[order].item.mu)
        x = stackpoolalloc(order)
        unlock(&stackpool[order].item.mu)
    } else {
        c := thisg.m.p.ptr().mcache
        x = c.stackcache[order].list
        if x.ptr() == nil {
            stackcacherefill(c, order)
            x = c.stackcache[order].list
        }
        c.stackcache[order].list = x.ptr().next
        c.stackcache[order].size -= uintptr(n)
    }
    v = unsafe.Pointer(x)
}

倘若大於 32KB,就會去stackLarge緩存中獲取,如果還不夠的話就直接在堆上分配內存。

go
else {
    var s *mspan
    npage := uintptr(n) >> _PageShift
    log2npage := stacklog2(npage)

    // Try to get a stack from the large stack cache.
    lock(&stackLarge.lock)
    if !stackLarge.free[log2npage].isEmpty() {
        s = stackLarge.free[log2npage].first
        stackLarge.free[log2npage].remove(s)
    }
    unlock(&stackLarge.lock)

    lockWithRankMayAcquire(&mheap_.lock, lockRankMheap)

    if s == nil {
        // Allocate a new stack from the heap.
        s = mheap_.allocManual(npage, spanAllocStack)
        if s == nil {
            throw("out of memory")
        }
        osStackAlloc(s)
        s.elemsize = uintptr(n)
    }
    v = unsafe.Pointer(s.base())
}

完事後返回棧空間的低地址和高地址

go
return stack{uintptr(v), uintptr(v) + uintptr(n)}

擴容

默認的協程棧大小為 2KB,足夠輕量,所以創建一個協程的成本非常低,但這不一定夠用,當棧空間不夠用的時候就需要擴容。編譯器會在函數的開頭插入runtime.morestack函數來檢查當前協程是否需要進行棧擴容,如果需要的話就調用runtime.newstack來完成真正的擴容操作。

TIP

由於morestack幾乎會在所有函數的開頭都被插入,所以棧擴容檢查的時機也是一個協程搶佔點。

go
thisg := getg()
gp := thisg.m.curg
// Allocate a bigger segment and move the stack.
oldsize := gp.stack.hi - gp.stack.lo
newsize := oldsize * 2

// The goroutine must be executing in order to call newstack,
// so it must be Grunning (or Gscanrunning).
casgstatus(gp, _Grunning, _Gcopystack)

// The concurrent GC will not scan the stack while we are doing the copy since
// the gp is in a Gcopystack status.
copystack(gp, newsize)
casgstatus(gp, _Gcopystack, _Grunning)
gogo(&gp.sched)

可以看到,計算後的棧空間容量是原來的兩倍,由runtime.copystack函數來完成棧拷貝的工作,在拷貝前 G 的狀態由_Grunning切換為_Gcopystack

go
func copystack(gp *g, newsize uintptr) {
  old := gp.stack
  used := old.hi - gp.sched.sp

  // allocate new stack
  new := stackalloc(uint32(newsize))

  // Compute adjustment.
  var adjinfo adjustinfo
  adjinfo.old = old
  adjinfo.delta = new.hi - old.hi

  // Copy the stack (or the rest of it) to the new location
  memmove(unsafe.Pointer(new.hi-ncopy), unsafe.Pointer(old.hi-ncopy), ncopy)

  // Adjust remaining structures that have pointers into stacks.
  // We have to do most of these before we traceback the new
  // stack because gentraceback uses them.
  adjustctxt(gp, &adjinfo)
  adjustdefers(gp, &adjinfo)
  adjustpanics(gp, &adjinfo)
  if adjinfo.sghi != 0 {
    adjinfo.sghi += adjinfo.delta
  }

  // Swap out old stack for new one
  gp.stack = new
  gp.stackguard0 = new.lo + stackGuard // NOTE: might clobber a preempt request
  gp.sched.sp = new.hi - used
  gp.stktopsp += adjinfo.delta

  // Adjust pointers in the new stack.
  var u unwinder
  for u.init(gp, 0); u.valid(); u.next() {
    adjustframe(&u.frame, &adjinfo)
  }

  stackfree(old)
}

該函數總共做了以下幾個工作

  1. 分配新的棧空間
  2. 將舊棧內存通過runtime.memmove直接復制到新的棧空間中,
  3. 調整包含棧指針的結構,比如 defer,panic 等
  4. 更新 G 的棧空間字段
  5. 通過runtime.adjustframe調整指向舊棧內存的指針
  6. 釋放舊棧的內存

完成後,G 的狀態由_Gcopystack切換為_Grunning,並由runtime.gogo函數讓 G 繼續執行用戶代碼。正是因為協程棧擴容的存在,所以 go 中的內存是不穩定的。

收縮

當 G 的狀態為_Grunnable_Gsyscall_Gwaiting時,GC 會掃描協程棧的內存空間。

go
func scanstack(gp *g, gcw *gcWork) int64 {
  switch readgstatus(gp) &^ _Gscan {
  case _Grunnable, _Gsyscall, _Gwaiting:
    // ok
  }
    ...

  if isShrinkStackSafe(gp) {
    // Shrink the stack if not much of it is being used.
    shrinkstack(gp)
  }
    ...
}

實際的棧收縮工作由runtime.shrinkstack來完成。

go
func shrinkstack(gp *g) {
  if !isShrinkStackSafe(gp) {
    throw("shrinkstack at bad time")
  }

  oldsize := gp.stack.hi - gp.stack.lo
  newsize := oldsize / 2
  if newsize < fixedStack {
    return
  }

  avail := gp.stack.hi - gp.stack.lo
  if used := gp.stack.hi - gp.sched.sp + stackNosplit; used >= avail/4 {
    return
  }

  copystack(gp, newsize)
}

當使用的棧空間不足原有的 1/4 時,就會通過runtime.copystack將其縮小為原來的 1/2,後面的工作跟之前就沒什麼兩樣了。

分段棧

copystack的過程可以看到,它會將舊棧內存拷貝到一個更大的棧空間上,不管是原來的棧還是新的棧它們的內存地址都是連續的。而在遠古時期的 go 語言中,棧擴容時做法和現在不一樣,那會覺得內存拷貝太消耗性能了,采用的是分段棧的思路,如果棧空間內存不夠用了,就再申請一片新的棧空間,原有的棧空間內存不會釋放也不會被拷貝,彼此之前通過指針鏈接起來,形成了一個棧鏈表,這也就是分段棧的由來,就如下圖所示

這樣做的好處在於不用拷貝原有的棧,但缺點也十分的明顯,就是會十分頻繁的觸發棧擴容和縮容。當棧空間的空閒內存所剩無幾時,新的函數調用會觸發棧的擴容,當這些函數返回時,不再需要新的棧空間了後就又會觸發縮容,假如這些函數調用的頻率非常高,那麼就會非常頻繁的觸發擴容和縮容,這種操作所造成的性能損耗是非常大的。

所以在 go1.4 之後換成了連續棧,連續棧因為分配了一個容量更大的棧空間,不會出現已使用內存達到臨界值時因函數調用而頻繁的觸發擴縮容這種情況,並且由於內存地址是連續的,根據緩存的空間局部性原理而言,連續棧對 CPU 緩存也更加友好。

調度循環

在調度器初始化部分提到過,在runtime.mstart1函數中,M 與 P 成功關聯後就會進入第一個runtime.schedule調度循環正式開始調度 G 以執行用戶代碼。在調度循環中,這一部分就主要是 P 在發揮作用。M 對應著系統線程,G 對應著入口函數也就是用戶代碼,但 P 並不像 M 和 G 一樣有著對應的實體,它只是一個抽象的概念,作為中間人處理著 M 和 G 之間的關系。

go
func schedule() {
  mp := getg().m

top:
  pp := mp.p.ptr()
  pp.preempt = false

    if mp.spinning {
      resetspinning()
    }

  gp, inheritTime, tryWakeP := findRunnable() // blocks until work is available

  execute(gp, inheritTime)
}

上面的代碼經過簡化,刪去了許多條件判斷,最核心的點只有兩個runtime.findRunnableruntime.execute,前者負責找到一個 G,並且一定會返回一個可用的 G,後者負責讓 G 繼續執行用戶代碼。

對於findRunnable函數而言,第一個 G 來源就是 P 的本地隊列

go
// local runq
if gp, inheritTime := runqget(pp); gp != nil {
    return gp, inheritTime, false
}

如果本地隊列沒有 G,那麼就嘗試從全局隊列中獲取

go
// global runq
if sched.runqsize != 0 {
    lock(&sched.lock)
    gp := globrunqget(pp, 0)
    unlock(&sched.lock)
    if gp != nil {
        return gp, false, false
    }
}

如果本地和全局隊列中都沒有找到,就會嘗試從網絡輪詢器中獲取

go
if netpollinited() && netpollWaiters.Load() > 0 && sched.lastpoll.Load() != 0 {
    if list := netpoll(0); !list.empty() { // non-blocking
        gp := list.pop()
        injectglist(&list)
        casgstatus(gp, _Gwaiting, _Grunnable)
        if traceEnabled() {
            traceGoUnpark(gp, 0)
        }
        return gp, false, false
    }
}

倘若還找不到,最終就會從其它的 P 去偷取它本地隊列中的 G。在創建協程的時候提到過,P 的本地隊列中的 G 一大來源是當前協程派生的子協程,然而並非所有的協程都會創建子協程,這樣就可能會出現一部分 P 非常忙碌,另一部分 P 是空閒的,這會導致一種情況,有的 G 因為一直在等待而遲遲無法被運行,而另一邊的 P 十分清閒,什麼事也沒有。為了能夠壓榨所有的 P,讓它們發揮最大的工作效率,當 P 找不到 G 的時候,就會去其它 P 的本地隊列中「偷取」能夠執行的 G,這樣一來,每一個 P 都能擁有較為均勻的 G 隊列,就很少會出現 P 與 P 之間隔岸觀火的情況了。

go
gp, inheritTime, tnow, w, newWork := stealWork(now)
if gp != nil {
    // Successfully stole.
    return gp, inheritTime, false
}

runtime.stealWork會隨機選一個 P 來進行偷取,真正的偷取工作由runtime.runqgrab函數來完成,它會嘗試偷取該 P 本地隊列一半的 G。

go
for {
    h := atomic.LoadAcq(&pp.runqhead) // load-acquire, synchronize with other consumers
    t := atomic.LoadAcq(&pp.runqtail) // load-acquire, synchronize with the producer
    n := t - h
    n = n - n/2
    if n > uint32(len(pp.runq)/2) { // read inconsistent h and t
        continue
    }
    for i := uint32(0); i < n; i++ {
        g := pp.runq[(h+i)%uint32(len(pp.runq))]
        batch[(batchHead+i)%uint32(len(batch))] = g
    }
    if atomic.CasRel(&pp.runqhead, h, h+n) { // cas-release, commits consume
        return n
    }
}

整個偷取工作會進行四次,如果四次後也偷不到 G 那麼就返回。如果最終無法找到,當前 M 會被runtime.stopm給暫停,直到被喚醒後繼續重復上面的步驟。當找到並返回了一個 G 後,就將其交給runtime.execute來運行 G。

go
mp := getg().m

mp.curg = gp
gp.m = mp
casgstatus(gp, _Grunnable, _Grunning)
gp.waitsince = 0
gp.preempt = false
gp.stackguard0 = gp.stack.lo + stackGuard

gogo(&gp.sched)

首先更新 M 的curg,然後更新 G 的狀態為_Grunning,最後交給runtime.gogo來恢復 G 的運行。

總的的來說,在調度循環中 G 的來源根據優先級來分有四個

  1. P 的本地隊列
  2. 全局隊列
  3. 網絡輪詢器
  4. 從其它 P 的本地隊列偷

runtime.execute在執行過後並不會返回,剛獲取的 G 也不會永遠執行下去,在某一個時機觸發調度以後,它的執行權會被剝奪,然後進入新一輪調度循環,將執行權讓給其它的 G。

調度策略

不同的 G 執行用戶代碼的時長可能不同,部分 G 可能會耗時很長,部分 G 耗時很短,執行時機長的 G 可能會導致其它的 G 遲遲無法得到執行,所以交替執行 G 才是正確的方式,在操作系統中這種工作方式被稱之為並發。

協作式調度

協作式調度的基本思路是,讓 G 自行將執行權讓給其它的 G,主要有兩種方法。

第一種方法就是在用戶代碼中主動讓權,go 提供了runtime.Gosched()函數,使用者可以自己決定在何時讓出執行權,不過在很多時候調度器內部的工作細節都使用者來說都是一個黑盒,很難去判斷到底什麼時候該主動讓權,對使用者的要求比較高,並且 go 的調度器力求對使用者屏蔽大部分細節,追求更簡單的使用方法,在這種情況下讓使用者也參與到調度工作中並非是什麼好事。

第二種方法是搶佔標記,雖然它的名字有搶佔的字眼,但它本質上還是協作式調度策略。思路就是在函數的頭部插入搶佔檢測代碼runtime.morestack(),插入的過程在編譯期完成,前面提到過它原本是用於進行棧擴容檢測的函數,因為其檢測點是每一個函數的調用,這同樣是一個進行搶佔檢測的良好時機。runtime.newstack函數上半部分都是在進行搶佔檢測,下半部分則在進行棧擴容檢測,前面為了避免干擾就將這部分省掉了,現在就來看看這部分干了什麼。首先會根據gp.stackguard0進行搶佔判斷,如果不需要的話就繼續執行用戶代碼。

go
stackguard0 := atomic.Loaduintptr(&gp.stackguard0)
preempt := stackguard0 == stackPreempt
if preempt {
    if !canPreemptM(thisg.m) {
        gp.stackguard0 = gp.stack.lo + stackGuard
        gogo(&gp.sched) // never return
    }
}

g.stackguard0 == stackPreempt時,由runtime.canPreemptM()函數來判斷協程條件是否需要被搶佔,代碼如下,

go
func canPreemptM(mp *m) bool {
  return mp.locks == 0 && mp.mallocing == 0 && mp.preemptoff == "" && mp.p.ptr().status == _Prunning
}

可以看到能夠被搶佔需要滿足四個條件

  1. M 沒有被鎖住
  2. 沒有正在分配內存
  3. 沒有禁用搶佔
  4. P 處於_Prunning狀態

而在以下兩種種情況會將g.stackguard0設置為stackPreempt

  • 需要進行垃圾回收時
  • 發生系統調用時
go
if preempt {
    if gp.preemptShrink {
        gp.preemptShrink = false
        shrinkstack(gp)
    }
    // Act like goroutine called runtime.Gosched.
    gopreempt_m(gp) // never return
}

最後就會走到runtime.gopreempt_m()主動讓出當前協程的執行權。首先切斷 M 與 G 之間的聯系,狀態變為_Grunnbale,然後將 G 放入全局隊列中,最後進入調度循環將執行權讓給其它的 G。

go
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)

schedule()

這樣一來,所有的協程在進行函數調用時都可能會進入該函數進行搶佔檢測,這種策略得依賴函數調用這一時機才能觸發搶佔並且主動讓權。在 1.14 之前,go 一直都是沿用的這種調度策略,但這樣會有一個問題,如果沒有函數調用,就沒法檢測了,比如下面這段經典代碼,應該在很多教程中都有出現

go
func main() {
  // 限制P的數量只能為1
  runtime.GOMAXPROCS(1)
    // 協程1
  go func() {
    for {
      // 該協程不停的空轉
    }
  }()
  // 進入系統調用,主協程讓權給其它協程
  time.Sleep(time.Millisecond)
  println("exit")
}

代碼中創建了一個空轉的協程 1,然後主協程因為系統調用主動讓權,此時協程 1 正在被調度,但因為它根本就不調用函數,也就沒法進行搶佔檢測,由於 P 只有一個,沒有其它空閒的 P,這樣會導致主協程永遠無法被調度,exit也就永遠無法輸出,不過這種問題也僅限於 go1.14 之前。

搶佔式調度

官方在 go1.14 加入了基於信號的搶佔式調度策略,這是一種異步搶佔策略,通過異步線程發送信號的方式來進行搶佔線程,基於信號的搶佔式調度目前只有有兩個入口,分別是系統監控和 GC。

在系統監控的循環中,會遍歷每一個 P,如果 P 正在調度的 G 執行時間超過了 10ms,就會強制觸發搶佔。這部分工作由runtime.retake函數完成,下面是簡化的後代碼。

go
func retake(now int64) uint32 {
  n := 0
  lock(&allpLock)
  for i := 0; i < len(allp); i++ {
    pp := allp[i]
    if pp == nil {
      continue
    }
    pd := &pp.sysmontick
    s := pp.status
    sysretake := false
    if s == _Prunning || s == _Psyscall {
      // Preempt G if it's running for too long.
      t := int64(pp.schedtick)
      if int64(pd.schedtick) != t {
        pd.schedtick = uint32(t)
        pd.schedwhen = now
      } else if pd.schedwhen+forcePreemptNS <= now {
        preemptone(pp)
        sysretake = true
      }
    }
  }
  unlock(&allpLock)
  return uint32(n)
}

當需要進行垃圾回收時,如果 G 是狀態是_Grunning,也就是還在運行,同樣也會觸發搶佔。

go
func suspendG(gp *g) suspendGState {
  for i := 0; ; i++ {
    switch s := readgstatus(gp); s {
    case _Grunning:
      gp.preemptStop = true
      gp.preempt = true
      gp.stackguard0 = stackPreempt
      casfrom_Gscanstatus(gp, _Gscanrunning, _Grunning)

      if preemptMSupported && debug.asyncpreemptoff == 0 && needAsync {
        now := nanotime()
        if now >= nextPreemptM {
          nextPreemptM = now + yieldDelay/2
          preemptM(asyncM)
        }
      }
    ......
    ......


func preemptM(mp *m) {
  if mp.signalPending.CompareAndSwap(0, 1) {
    if GOOS == "darwin" || GOOS == "ios" {
      pendingPreemptSignals.Add(1)
    }
    signalM(mp, sigPreempt)
  }
}

兩個搶佔入口最後都會進入runtime.preemptM函數中,由它來完成搶佔信號的發送。當信號成功發送後,在runtime.mstart時通過runtime.initsig注冊的信號處理器回調函數runtime.sighandler就會派上用場,如果檢測到發送的是搶佔信號,就會開始搶佔。

go
func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
  ...
  if sig == sigPreempt && debug.asyncpreemptoff == 0 && !delayedSignal {
    // Might be a preemption signal.
    doSigPreempt(gp, c)
  }
    ...
}

doSigPreempt會修改目標協程的上下文,注入調用runtime.asyncPreempt

go
func doSigPreempt(gp *g, ctxt *sigctxt) {
  // Check if this G wants to be preempted and is safe to
  // preempt.
  if wantAsyncPreempt(gp) {
    if ok, newpc := isAsyncSafePoint(gp, ctxt.sigpc(), ctxt.sigsp(), ctxt.siglr()); ok {
      // Adjust the PC and inject a call to asyncPreempt.
      ctxt.pushCall(abi.FuncPCABI0(asyncPreempt), newpc)
    }
  }
...

這樣一來當重新切換回用戶代碼的時候,目標協程就會走到runtime.asyncPreempt函數,在該函數中涉及到runtime.asyncPreempt2的調用。

go
TEXT ·asyncPreempt(SB),NOSPLIT|NOFRAME,$0-0
  PUSHQ BP
  MOVQ SP, BP
  // Save flags before clobbering them
  PUSHFQ
  // obj doesn't understand ADD/SUB on SP, but does understand ADJSP
  ADJSP $368
  // But vet doesn't know ADJSP, so suppress vet stack checking
  ...
  CALL ·asyncPreempt2(SB)
  ...
  RET

它會讓當前協程停止工作並進行一輪新的調度循環從而將執行權讓給其它協程。

go
func asyncPreempt2() {
  gp := getg()
  gp.asyncSafePoint = true
  if gp.preemptStop {
    mcall(preemptPark)
  } else {
    mcall(gopreempt_m)
  }
  gp.asyncSafePoint = false
}

這一過程過程都發生在runtime.asyncPreempt函數中,它由匯編實現(位於runtime/preempt_*.s中)並且會在調度完成後恢復先前被修改的協程上下文,以便讓該協程在日後能夠正常恢復。在采用異步搶佔策略以後,之前的例子就不再會永久阻塞主協程了,當空轉協程運行一定時間後就會被強制執行調度循環,從而將執行權讓給了主協程,最終讓程序能夠正常結束。

小結

總的來說,觸發調度的時機有以下幾個:

  • 函數調用
  • 系統調用
  • 系統監控
  • 垃圾回收,垃圾回收對於執行時間過長的協程也會進行搶佔
  • 協程因管道,鎖等原因而掛起

調度策略主要就是兩大類,協作式和搶佔式,協作式是主動讓出執行權,搶佔式是異步搶佔執行權,兩者共存才形成了如今的調度器。

Golang學習網由www.golangdev.cn整理維護