目录

Go runtime 调度

Linux 进程 & 线程

在讲解详细的线程模型前,先整理一下概念

Linux 内核在 2.0.x 版本实现了轻量级进程(普遍意义上的线程),应用程序可以通过一个统一的clone()系统调用接口,用不同的flag参数指定创建轻量级进程(线程)还是普通进程。当flag参数设置了共享相关标识时,创建线程,否则创建的就是进程,二者的边界实际上很模糊。

从 Linux 内核角度来看,实际上也并没有对「进程」和「线程」进行严格的区分。无论是进程还是线程,都通过 struct task_struct 来表示,结构体中也并没有对进程和线程的标识位。

所以在 Linux 中,线程本质还是进程,只是它会和其他进程共享地址空间和系统资源等,如此便将进程和其占用的资源分离,相比与传统进程就显得更加”轻量级“。

进而我们可以将进程理解为资源的容器,其中包含一个或者多个线程,同一个进程下各个线程共享资源(地址空间,文件描述符等)。而线程则作为内核调度的基本单位,各个线程有各自的寄存器,线程栈,程序计数器等。

线程模型

用户级线程模型

早期操作系统并没有提供对线程的支持,所以线程以用户级线程的形式存在。多线程的调度都由用户自己的线程/协程库(比如 py 的 gevent,jdk 早期的 Green Threads)来完成,包括线程的创建,调度,销毁,同步等都由库函数在用户空间完成,不需要内核的支持。只在运行时将进程中所有的线程和内核的一个调度实体动态的绑定,即 N:1 的模型。这种模型下内核对用户线程没有任何感知,仍然以进程为单位调度。

https://static.imlgw.top/blog/20220413181645.png

这种模型好处显而易见:

  1. 调度过程在用户态实现,不需要内核切换,消耗很小。
  2. 创建成本低,可支持创建的线程数量很多
  3. 可以为线程定制调度算法,比如 gc 线程,我们就可以定制调度算法,不让其随便 stop

但是用户级线程也存在一些问题:

  1. 线程阻塞调用(如 I/O 操作)会导致进程下所有线程阻塞,因为用户线程都是挂靠在同一个内核进程/线程上的,在内核角度就是一个进程阻塞了,此时用户调度线程也无法再进行调度切换。一种不太优雅的解决方案就是重写系统调用库中一些阻塞的方法,将其封装为非阻塞调用,在即将阻塞的地方让出 cpu,将执行权交给其他线程,等系统准备好了不会再阻塞了再进行调用(py 的 gevent 库就是这样做的,使用的 select/epoll)
  2. 无法利用多核资源,多个线程最终还是挂靠在单个内核进程上,做不到真正的并发
  3. 不适合 CPU 密集型任务,如果用户线程是 cpu 密集型任务,没有阻塞操作,那么它就不会主动释放 CPU,其他线程就得不到执行机会,就会出现线程饥饿现象。

内核级线程模型

关于「内核线程」上面也有提到,实际上内核线程就是直接由操作系统内核支持的线程(Kernel Levvel Thread,KLT)。我们的应用程序一般不会直接去使用内核线程,而是通过创建我们前文中说到的轻量级进程(LWP),进而去使用内核线程。LWP 和内核线程是一一对应的,是一个 1:1 的映射模型。这样就将线程的创建,调度,销毁等都交给内核去完成(主流 jdk 的 Thread 库就是这种模型)

https://static.imlgw.top/blog/20220414075456.png

这种模型的优点很明显:

  1. 内核可以感知到线程(LWP)的存在。
  2. 实现简单,每个线程都是一个独立的调度单元,直接使用内核提供的调度器,由内核去处理线程阻塞,线程切换等问题,用户不干预调度过程,只能提供调度建议。
  3. 真正做到了并行处理,充分利用了多核处理器的优势

缺点也同样明显:

  1. 每一个用户线程都要对应到一个内核线程,而内核资源有限,所以能支持的线程数量也有限
  2. 线程创建,销毁,多线程之间上下文切换调度,都需要进行系统调用,需要在用户态和内核态之间进行切换,开销比较大,影响性能。

两级(混合)线程模型

「两级线程模型」就是结合了用户线程模型以及内核线程模型后的产物,用户线程LWP 是 M:N 的映射关系。此时用户线程还是完全建立在用户空间中,由程序的 Runtime 来负责调度,因此用户线程的创建,销毁,调度等操作仍然很廉价。并且也可以支持大规模的线程并发。

这种模式下,用户线程不再唯一的绑定一个内核线程,当某个用户线程阻塞时,其他的线程可以重新与其他的内核线程绑定运行,避免了用户线程模型中整个进程被阻塞的问题。

这种模型之所以被称为两级线程模型,是因为该模型中线程既不是完全靠自己调度,也不是完全靠内核去调度,而是二者协调调度。 用户 Runtime 负责调度用户线程到内核线程的调度,而内核负责内核线程到 CPU 上的调度,故称为二级线程模型。Go 语言采用的就是这种模型。

G-P-M 模型

GPM 结构

G(goroutine)

表示 goroutine,参与调度的最小单位。每个 Goroutine 对应一个 G 结构体,G 存储 goroutine 的运行堆栈、状态以及任务函数,因为 goroutine 在执行过程中可能因为各种原因被暂停,这时需要保存 PC 和堆栈信息,以便恢复后时继续执行。每个 G 需要绑定到 P 才能被调度执行。

在函数前加 go 关键字创建一个协程,其实是调用 newproc 函数,fn 就是 go 关键字后面函数地址

type g struct {
    goid           int64   // goroutine id
    atomicstatus   uint32  // 当前状态
    stack          stack   // g 栈区间
    m         *m      // 当前 m
    sched          gobuf   // 运行时信息,包含 PC 以及运行时的堆栈信息
    stackguard0    uintptr // stackguard0 = stack.lo + StackGuard,如果要抢占当前 g 会把字段值设为 stackPreempt
    preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
    // ... 省略部分字段。..
}
type stack struct { //栈从高地址往低地址增长
    lo uintptr
    hi uintptr
}
type gobuf struct {
    sp   uintptr // 堆栈指针
    pc   uintptr // 计数器指针
    g    guintptr
    ctxt unsafe.Pointer
    ret  uintptr
    lr   uintptr
    bp   uintptr // for framepointer-enabled architectures
}

G 的状态

_Gidle  // 刚被创建,还没初始化
_Grunnable // 在运行队列中,还没被执行,也没分配栈 
_Grunning // 运行中
_Gsyscall // 系统调用
_Gwaiting // 阻塞状态,比如:等待 channel、i/o 操作被 gopark
_Gdead // goroutine 执行结束,进入 freelist 中
_Gpreempted // g 被通过信号方式抢占,此状态不能直接 goready

P(Processor)

逻辑处理器,对于 G 来说,只有 G 绑定到 P(进入 P 的任务队列中),才能被调度。对于 M 来说,P 提供了相关的执行环境(Context 上下文),内存分配状态,任务队列等。P 的初始化在 schedinit

Go1.0 时期并没有 P,所以所有的 G 的创建和调度都需要加全局的锁,性能损耗很大,所以早期 Go 并发性能并不好

// go1.18 darwin/amd64 runtime/runtime2.go
type p struct {
    id          int32 //P 的 id
    status      uint32 //当前状态
    mcache      *mcache //内存分配器
    runqhead uint32 // 队列头
    runqtail uint32 // 队列尾
    runq     [256]guintptr //等执行的 goroutine 队列,访问时不需要加锁
    m           muintptr //P 绑定的 m
    // Available G's (status == Gdead)
    gFree struct { //当 G 运行结束后,清除数据放入列表以便复用
        gList
        n int32
    }
    runnext guintptr //下个运行的 g,如果不为 nil, 则当前 g 执行完后,优先执行它
    // ... 省略部分字段。..
}

P 的状态

_Pidle //当 M 没有 G 可执行时,P 进入空闲列表
_Prunning //P 和 M 绑定,正在执行 G 或在寻找可执行的 G
_Psyscall //与之关联中的 M 进入系统调用
_Pgcstop //GC STW
_Pdead //调小 GOMAXPROCS 数量后多余的 p 置为此状态

M(Machine)

系统线程抽象,代表着真正执行计算的资源。M 会从 P 的队列(或者 Global 队列)中取 G 来执行,切换到 G 的执行栈上并执行 G 的函数,调用 goexit 做清理工作并回到 M,当 G 被暂停 M 会把上下文信息写回 G,并取下一个 G 继续执行。M 并不保留 G 状态,这也是 G 可以跨 M 调度的核心。

新建 M 是通过 newm 来创建的,最终是通过 newosproc 函数来实现的新建线程,不通平台的 newosproc 有不同的具体实现, 比如 linux 平台下,是调用 clone 系统调用来实现创建线程,mac 平台下,是调用 pthread_create 创建线程。M 最多 10000 个,在 shedinit 中定义了 sched.maxmcount

type m struct {
    id            int64
    g0      *g     // 用于执行调度任务的 g,使用系统栈,不受 gc 影响
    tls           [6]uintptr   // 线程本地存储空间
    curg          *g       // 当前正在被执行的 goroutine
    nextp         puintptr // M 被唤醒需要立即绑定的 P
    p             puintptr // 与 M 绑定的 P
    spinning      bool // true 表示 M 处于自旋转状态(当前没有 g 执行,正在寻找可执行的 g)
    mcache        *mcache //当 M 与 P 绑定后,跟 P 的 mcache 指向同一个内存分配器
}

G0 和 M0

  • m0: 表示进程启动的第一个线程,也叫主线程。它和其他的 m 没有什么区别,要说区别的话,它是进程启动通过汇编直接复制给 m0 的,m0 是个全局变量,而其他的 m 都是 runtime 内自己创建的。 一个 go 进程只有一个 m0。

  • g0: 每个 m 都有一个 g0,因为每个线程有一个系统堆栈,g0 虽然也是 g 的结构,但和普通的 g 还是有差别的,最重要的差别就是栈的差别。g0 上的栈是系统分配的栈,在 linux 上栈大小默认固定 8MB,不能扩展,也不能缩小。 而普通 g 一开始只有 2KB 大小,可扩展。在 g0 上也没有任何任务函数,也没有任何状态,并且它不能被调度程序抢占,因为调度就是在 g0 上跑的(参考 allocm)。

    runtime 通常使用 systemstack、mcall 或 asmcgocall 临时切换到系统堆栈,以执行必须不被抢占的任务、不得增加用户堆栈的任务或切换用户 goroutines。在系统堆栈上运行的代码隐式不可抢占,垃圾收集器不扫描系统堆栈。在系统堆栈上运行时,不会使用当前用户堆栈执行

Runtime 源码注释

proc.go 的注释中写到:

The bootstrap sequence is:

  1. call osinit
  2. call schedinit
  3. make & queue new G
  4. call runtime·mstart

(从对应的汇编也可以看出这个过程,我这里对应的是 runtime/asm_amd64.s)

//1. osinit 初始化 cpu 数量和页大小
func osinit() {
    ncpu = getproccount()
    physHugePageSize = getHugePageSize()
}

// 2. call schedinit
func schedinit() {
    sched.maxmcount = 10000 //m 的最大数量
    mallocinit() //内存分配相关
    //根据环境变量,创建 GOMAXPROCS 个 G
    sched.lastpoll = uint64(nanotime())
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
}
// 3. 启动 main goroutine
func main() {
    g := getg()
    // 标记 main 已经启动
    mainStarted = true
    //在系统栈上(也就是通过 g0) 创建 m,并执行 sysmon,所以 symon 是在单独的 m 上运行,不受 gc 影响
    if GOARCH != "wasm" {
        systemstack(func() {
            newm(sysmon, nil)
        })
    }
    //如果 main goroutine 不在 m0 上运行,肯定 bug 了
    if g.m != &m0 {
        throw("runtime.main not on m0")
    }
    //执行 runtime 的 init 和用户包中 init 函数
    doInit(&runtime_inittask)
    doInit(&main_inittask)
    //调用用户自定义的 main 函数
    //从这里可以看出在 golang 中,init 函数先于 main 函数执行
    fn := main_main
    fn()
    //退出主进程
    exit(0)
    for {
        var x *int32
        *x = 0
    }
}
//4. call runtime·mstart(m0 启动)
//M0 在这里调用的 https://github.com/golang/go/blob/master/src/runtime/asm_amd64.s#L225
// mstart 是一个新 M 的入口函数。
func mstart() {
    _g_ := getg()
    osStack := _g_.stack.lo == 0
    if osStack {
        // 初始化 g0 栈大小
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    _g_.stackguard1 = _g_.stackguard0
    mstart1()
    if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
        osStack = true
    }
    // m 退出
    mexit(osStack)
}
func mstart1() {
    _g_ := getg()
    if _g_ != _g_.m.g0 {//调用这个函数只,m 上只可能有 g0
        throw("bad runtime·mstart")
    }
    //初始化 m
    asminit()
    minit()
    //执行 mspinning、sysmon 等
    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }
    //_g_所在的 m 不是 m0, 则关联 p 和 m
    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()//开始调度
}

M 启动后进入 schedule() 方法,进入调度,调度的本质其实就是查找可以运行的 G,然后去运行 G 上面的任务函数

//需要注意的是:schedule 函数及子函数中调用的 getg() 返回的都是 g0,因为 schedule 是运行在 g0 上的
func schedule() {
    _g_ := getg() //获取当前 g
    var gp *g
    var inheritTime bool
    // .....
    if gp == nil {
        // 为了保证公平调度,schedule 每执行 61 次就会去全局队列拿一批 g 到 p 的本地队列,避免全局队列中的 g 饥饿
        // 否则可能出现 2 个 g 永久占用本地队列(因为被暂停的 goroutine 唤醒后优先放入本地队列)
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    // .....
    if gp == nil {
        //从本地队列中取 g
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    //如果本地队列中没有可执行的 g,则调用 findrunnable 直到有可运行的 g 为止
    if gp == nil {
        gp, inheritTime = findrunnable()
    }
    // 到这里,说明已经找到可运行的 g,如果 m 还处于自旋转状态,则置回正常状态
    // 并唤醒 p 与之绑定
    if _g_.m.spinning {
        resetspinning()
    }
    //直接在当前 m 上执行 g
    execute(gp, inheritTime)
}

findrunnable 查找可运行的 G

//阻塞获取可执行的 G,findrunnable 会从全局队列、其它 P 队列、netpoll 中去轮询
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()
top:
    _p_ := _g_.m.p.ptr()
    // 本地队列中有 g,则直接返回
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }
    // 全局队列中如果有 g, 则从全局队列中取一批到本地队列
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }
    // 从 netpoll 中获取 g(非阻塞轮询已经完成的网络 io)
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(false); !list.empty() { //从 netpoll 中取出 i/o 读写完成的 g 列表
            gp := list.pop()   //先取出一个让当前 m 执行,这样能提高响应速度
            injectglist(&list) //再把剩余的 g 列表放入队列
            casgstatus(gp, _Gwaiting, _Grunnable)
            return gp, false
        }
    }
    // 检查是否可以从其它 p 中偷一部分 g
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        //如果其它 p 全部都是 idle 的,那肯定没地方偷
        goto stop
    }
    //如果当前 m 没有处于自旋转且自旋转中的 p 数量 < running 中的 p 数量/2, 则让当前 m 进入自旋转
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }

    //随机从一个 p 中偷,最多偿试 4 次
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
stop:
    allpSnapshot := allp
    //再次检查全局队列,如果有 g,则取出执行,否则把 p 与当前 m 解绑
    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ { //p 与当前 m 解绑
        throw("findrunnable: wrong p")
    }
    pidleput(_p_) //解绑的 p 放入 idle 队列
    unlock(&sched.lock)

    wasSpinning := _g_.m.spinning
    if _g_.m.spinning { //如果当前 m 还处于自旋转状态,则取消,sched.nmspinning -1
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // 再次检查所有 p 的队列
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget() //如果某个 p 的队列不为空,则从 idle 列表中取出一个 p
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning { //如果解绑以前 m 是自旋转的,则还是让它保持自旋转
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    // 再次从 netpoll 中阻塞的取 g
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        list := netpoll(true)
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if !list.empty() {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                gp := list.pop()
                injectglist(&list)
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(&list)
        }
    }
    //如果始终找到不,就让 m 停止
    stopm()
    goto top
}

//把 g 列表放入全局队列,调用 startm 检测是否有 idle 的 p,将 idle 的 m 与之绑定或 new 一个 m
func injectglist(glist *gList) {
    if glist.empty() {
        return
    }
    if trace.enabled {
        for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
            traceGoUnpark(gp, 0)
        }
    }
    lock(&sched.lock)
    var n int
    for n = 0; !glist.empty(); n++ {
        gp := glist.pop()
        //队列中的 g 状态必须为_Grunnable
        casgstatus(gp, _Gwaiting, _Grunnable)
        globrunqput(gp)
    }
    unlock(&sched.lock)
    for ; n != 0 && sched.npidle != 0; n-- {
        startm(nil, false)
    }
    *glist = gList{}
}

//重置自旋转状态
func resetspinning() {
    _g_ := getg()
    if !_g_.m.spinning {
        throw("resetspinning: not a spinning m")
    }
    _g_.m.spinning = false
    nmspinning := atomic.Xadd(&sched.nmspinning, -1)
    if int32(nmspinning) < 0 {
        throw("findrunnable: negative nmspinning")
    }
    //除当前 m 外没有其它 m 处于自旋转状态且还有 p 处于 idle,则唤醒一个 p,这样能让等执行的 g 尽量早的被处理。
    if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
        wakep()
    }
}
func wakep() {
    // 如果已有 m 处于自旋转状态,则直接返回 (g 一定会被处于自旋转状态的 m 执行 [结合 findrunable 函数看])
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    startm(nil, true)
}

//startm 检测是否有 idle 的 p, 并将 idle 的 m 与 p 绑定或 new 一个 m
func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil {
        _p_ = pidleget() //获取一个 idle 的 p
        if _p_ == nil {
            unlock(&sched.lock)
            if spinning {
                // spinning 为 true 说明 startm 的调用方对 nmspinning 加了 1,但是没发现 idle 的 p,所以要回滚 nmspinning
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    //成功获取到 p, 再获取 idle 的 m
    mp := mget()
    unlock(&sched.lock)
    if mp == nil { //获取 idle 的 m 失败,则创建一个 m
        var fn func()
        if spinning {
            // 如果 spinning 为 true, 则标记新创建的 m 为 spinning
            fn = mspinning
        }
        newm(fn, _p_) //创建新 m
        return
    }
    mp.spinning = spinning
    mp.nextp.set(_p_) // 把 p 设置为即将与 m 绑定的 p
    notewakeup(&mp.park)
}
func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn) //new 一个 m 结构,并初始化
    mp.nextp.set(_p_)     //把 p 设置为即将与 m 绑定的 p
    mp.sigmask = initSigmask
    newm1(mp)
}
func newm1(mp *m) {
    execLock.rlock()
    newosproc(mp)
    execLock.runlock()
}

//newosproc 创建 OS 线程,不同的 OS 接口不一样,linux 用的 clone, windows 为_CreateThread
func newosproc(mp *m) {
    stk := unsafe.Pointer(mp.g0.stack.hi)
    // clone 期间禁用信号,clone 完成再启用
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    //调用 clone 创建 os 线程, mstart 为线程起始函数
    ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
    sigprocmask(_SIG_SETMASK, &oset, nil)
}

参考资料