Go runtime 调度
Linux 进程 & 线程
在讲解详细的线程模型前,先整理一下概念
Linux 内核在 2.0.x 版本实现了轻量级进程(普遍意义上的线程),应用程序可以通过一个统一的clone()
系统调用接口,用不同的flag
参数指定创建轻量级进程(线程)还是普通进程。当flag
参数设置了共享相关标识时,创建线程,否则创建的就是进程,二者的边界实际上很模糊。
从 Linux 内核角度来看,实际上也并没有对「进程」和「线程」进行严格的区分。无论是进程还是线程,都通过 struct task_struct 来表示,结构体中也并没有对进程和线程的标识位。
所以在 Linux 中,线程本质还是进程,只是它会和其他进程共享地址空间和系统资源等,如此便将进程和其占用的资源分离,相比与传统进程就显得更加”轻量级“。
进而我们可以将进程理解为资源的容器,其中包含一个或者多个线程,同一个进程下各个线程共享资源(地址空间,文件描述符等)。而线程则作为内核调度的基本单位,各个线程有各自的寄存器,线程栈,程序计数器等。
线程模型
用户级线程模型
早期操作系统并没有提供对线程的支持,所以线程以用户级线程的形式存在。多线程的调度都由用户自己的线程/协程库(比如 py 的 gevent,jdk 早期的 Green Threads)来完成,包括线程的创建,调度,销毁,同步等都由库函数在用户空间完成,不需要内核的支持。只在运行时将进程中所有的线程和内核的一个调度实体动态的绑定,即 N:1 的模型。这种模型下内核对用户线程没有任何感知,仍然以进程为单位调度。
这种模型好处显而易见:
- 调度过程在用户态实现,不需要内核切换,消耗很小。
- 创建成本低,可支持创建的线程数量很多
- 可以为线程定制调度算法,比如 gc 线程,我们就可以定制调度算法,不让其随便 stop
但是用户级线程也存在一些问题:
- 线程阻塞调用(如 I/O 操作)会导致进程下所有线程阻塞,因为用户线程都是挂靠在同一个内核进程/线程上的,在内核角度就是一个进程阻塞了,此时用户调度线程也无法再进行调度切换。一种不太优雅的解决方案就是重写系统调用库中一些阻塞的方法,将其封装为非阻塞调用,在即将阻塞的地方让出 cpu,将执行权交给其他线程,等系统准备好了不会再阻塞了再进行调用(py 的 gevent 库就是这样做的,使用的 select/epoll)
- 无法利用多核资源,多个线程最终还是挂靠在单个内核进程上,做不到真正的并发
- 不适合 CPU 密集型任务,如果用户线程是 cpu 密集型任务,没有阻塞操作,那么它就不会主动释放 CPU,其他线程就得不到执行机会,就会出现线程饥饿现象。
内核级线程模型
关于「内核线程」上面也有提到,实际上内核线程就是直接由操作系统内核支持的线程(Kernel Levvel Thread,KLT)。我们的应用程序一般不会直接去使用内核线程,而是通过创建我们前文中说到的轻量级进程(LWP),进而去使用内核线程。LWP 和内核线程是一一对应的,是一个 1:1 的映射模型。这样就将线程的创建,调度,销毁等都交给内核去完成(主流 jdk 的 Thread 库就是这种模型)
这种模型的优点很明显:
- 内核可以感知到线程(LWP)的存在。
- 实现简单,每个线程都是一个独立的调度单元,直接使用内核提供的调度器,由内核去处理线程阻塞,线程切换等问题,用户不干预调度过程,只能提供调度建议。
- 真正做到了并行处理,充分利用了多核处理器的优势
缺点也同样明显:
- 每一个用户线程都要对应到一个内核线程,而内核资源有限,所以能支持的线程数量也有限
- 线程创建,销毁,多线程之间上下文切换调度,都需要进行系统调用,需要在用户态和内核态之间进行切换,开销比较大,影响性能。
两级(混合)线程模型
「两级线程模型」就是结合了用户线程模型以及内核线程模型后的产物,用户线程和 LWP 是 M:N 的映射关系。此时用户线程还是完全建立在用户空间中,由程序的 Runtime 来负责调度,因此用户线程的创建,销毁,调度等操作仍然很廉价。并且也可以支持大规模的线程并发。
这种模式下,用户线程不再唯一的绑定一个内核线程,当某个用户线程阻塞时,其他的线程可以重新与其他的内核线程绑定运行,避免了用户线程模型中整个进程被阻塞的问题。
这种模型之所以被称为两级线程模型,是因为该模型中线程既不是完全靠自己调度,也不是完全靠内核去调度,而是二者协调调度。 用户 Runtime 负责调度用户线程到内核线程的调度,而内核负责内核线程到 CPU 上的调度,故称为二级线程模型。Go 语言采用的就是这种模型。
G-P-M 模型
GPM 结构
G(goroutine)
表示 goroutine,参与调度的最小单位。每个 Goroutine 对应一个 G 结构体,G 存储 goroutine 的运行堆栈、状态以及任务函数,因为 goroutine 在执行过程中可能因为各种原因被暂停,这时需要保存 PC 和堆栈信息,以便恢复后时继续执行。每个 G 需要绑定到 P 才能被调度执行。
在函数前加 go 关键字创建一个协程,其实是调用 newproc 函数,fn 就是 go 关键字后面函数地址
type g struct {
goid int64 // goroutine id
atomicstatus uint32 // 当前状态
stack stack // g 栈区间
m *m // 当前 m
sched gobuf // 运行时信息,包含 PC 以及运行时的堆栈信息
stackguard0 uintptr // stackguard0 = stack.lo + StackGuard,如果要抢占当前 g 会把字段值设为 stackPreempt
preempt bool // preemption signal, duplicates stackguard0 = stackpreempt
// ... 省略部分字段。..
}
type stack struct { //栈从高地址往低地址增长
lo uintptr
hi uintptr
}
type gobuf struct {
sp uintptr // 堆栈指针
pc uintptr // 计数器指针
g guintptr
ctxt unsafe.Pointer
ret uintptr
lr uintptr
bp uintptr // for framepointer-enabled architectures
}
G 的状态
_Gidle // 刚被创建,还没初始化
_Grunnable // 在运行队列中,还没被执行,也没分配栈
_Grunning // 运行中
_Gsyscall // 系统调用
_Gwaiting // 阻塞状态,比如:等待 channel、i/o 操作被 gopark
_Gdead // goroutine 执行结束,进入 freelist 中
_Gpreempted // g 被通过信号方式抢占,此状态不能直接 goready
P(Processor)
逻辑处理器,对于 G 来说,只有 G 绑定到 P(进入 P 的任务队列中),才能被调度。对于 M 来说,P 提供了相关的执行环境(Context 上下文),内存分配状态,任务队列等。P 的初始化在 schedinit 中
Go1.0 时期并没有 P,所以所有的 G 的创建和调度都需要加全局的锁,性能损耗很大,所以早期 Go 并发性能并不好
// go1.18 darwin/amd64 runtime/runtime2.go
type p struct {
id int32 //P 的 id
status uint32 //当前状态
mcache *mcache //内存分配器
runqhead uint32 // 队列头
runqtail uint32 // 队列尾
runq [256]guintptr //等执行的 goroutine 队列,访问时不需要加锁
m muintptr //P 绑定的 m
// Available G's (status == Gdead)
gFree struct { //当 G 运行结束后,清除数据放入列表以便复用
gList
n int32
}
runnext guintptr //下个运行的 g,如果不为 nil, 则当前 g 执行完后,优先执行它
// ... 省略部分字段。..
}
P 的状态
_Pidle //当 M 没有 G 可执行时,P 进入空闲列表
_Prunning //P 和 M 绑定,正在执行 G 或在寻找可执行的 G
_Psyscall //与之关联中的 M 进入系统调用
_Pgcstop //GC STW
_Pdead //调小 GOMAXPROCS 数量后多余的 p 置为此状态
M(Machine)
系统线程抽象,代表着真正执行计算的资源。M 会从 P 的队列(或者 Global 队列)中取 G 来执行,切换到 G 的执行栈上并执行 G 的函数,调用 goexit 做清理工作并回到 M,当 G 被暂停 M 会把上下文信息写回 G,并取下一个 G 继续执行。M 并不保留 G 状态,这也是 G 可以跨 M 调度的核心。
新建 M 是通过 newm 来创建的,最终是通过 newosproc 函数来实现的新建线程,不通平台的 newosproc 有不同的具体实现, 比如 linux 平台下,是调用 clone 系统调用来实现创建线程,mac 平台下,是调用 pthread_create 创建线程。M 最多 10000 个,在 shedinit 中定义了 sched.maxmcount
type m struct {
id int64
g0 *g // 用于执行调度任务的 g,使用系统栈,不受 gc 影响
tls [6]uintptr // 线程本地存储空间
curg *g // 当前正在被执行的 goroutine
nextp puintptr // M 被唤醒需要立即绑定的 P
p puintptr // 与 M 绑定的 P
spinning bool // true 表示 M 处于自旋转状态(当前没有 g 执行,正在寻找可执行的 g)
mcache *mcache //当 M 与 P 绑定后,跟 P 的 mcache 指向同一个内存分配器
}
G0 和 M0
-
m0: 表示进程启动的第一个线程,也叫主线程。它和其他的 m 没有什么区别,要说区别的话,它是进程启动通过汇编直接复制给 m0 的,m0 是个全局变量,而其他的 m 都是 runtime 内自己创建的。 一个 go 进程只有一个 m0。
-
g0: 每个 m 都有一个 g0,因为每个线程有一个系统堆栈,g0 虽然也是 g 的结构,但和普通的 g 还是有差别的,最重要的差别就是栈的差别。g0 上的栈是系统分配的栈,在 linux 上栈大小默认固定 8MB,不能扩展,也不能缩小。 而普通 g 一开始只有 2KB 大小,可扩展。在 g0 上也没有任何任务函数,也没有任何状态,并且它不能被调度程序抢占,因为调度就是在 g0 上跑的(参考 allocm)。
runtime 通常使用 systemstack、mcall 或 asmcgocall 临时切换到系统堆栈,以执行必须不被抢占的任务、不得增加用户堆栈的任务或切换用户 goroutines。在系统堆栈上运行的代码隐式不可抢占,垃圾收集器不扫描系统堆栈。在系统堆栈上运行时,不会使用当前用户堆栈执行
Runtime 源码注释
proc.go 的注释中写到:
The bootstrap sequence is:
- call osinit
- call schedinit
- make & queue new G
- call runtime·mstart
(从对应的汇编也可以看出这个过程,我这里对应的是 runtime/asm_amd64.s)
//1. osinit 初始化 cpu 数量和页大小
func osinit() {
ncpu = getproccount()
physHugePageSize = getHugePageSize()
}
// 2. call schedinit
func schedinit() {
sched.maxmcount = 10000 //m 的最大数量
mallocinit() //内存分配相关
//根据环境变量,创建 GOMAXPROCS 个 G
sched.lastpoll = uint64(nanotime())
procs := ncpu
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
}
// 3. 启动 main goroutine
func main() {
g := getg()
// 标记 main 已经启动
mainStarted = true
//在系统栈上(也就是通过 g0) 创建 m,并执行 sysmon,所以 symon 是在单独的 m 上运行,不受 gc 影响
if GOARCH != "wasm" {
systemstack(func() {
newm(sysmon, nil)
})
}
//如果 main goroutine 不在 m0 上运行,肯定 bug 了
if g.m != &m0 {
throw("runtime.main not on m0")
}
//执行 runtime 的 init 和用户包中 init 函数
doInit(&runtime_inittask)
doInit(&main_inittask)
//调用用户自定义的 main 函数
//从这里可以看出在 golang 中,init 函数先于 main 函数执行
fn := main_main
fn()
//退出主进程
exit(0)
for {
var x *int32
*x = 0
}
}
//4. call runtime·mstart(m0 启动)
//M0 在这里调用的 https://github.com/golang/go/blob/master/src/runtime/asm_amd64.s#L225
// mstart 是一个新 M 的入口函数。
func mstart() {
_g_ := getg()
osStack := _g_.stack.lo == 0
if osStack {
// 初始化 g0 栈大小
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
mstart1()
if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
osStack = true
}
// m 退出
mexit(osStack)
}
func mstart1() {
_g_ := getg()
if _g_ != _g_.m.g0 {//调用这个函数只,m 上只可能有 g0
throw("bad runtime·mstart")
}
//初始化 m
asminit()
minit()
//执行 mspinning、sysmon 等
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
//_g_所在的 m 不是 m0, 则关联 p 和 m
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
schedule()//开始调度
}
M 启动后进入 schedule() 方法,进入调度,调度的本质其实就是查找可以运行的 G,然后去运行 G 上面的任务函数
//需要注意的是:schedule 函数及子函数中调用的 getg() 返回的都是 g0,因为 schedule 是运行在 g0 上的
func schedule() {
_g_ := getg() //获取当前 g
var gp *g
var inheritTime bool
// .....
if gp == nil {
// 为了保证公平调度,schedule 每执行 61 次就会去全局队列拿一批 g 到 p 的本地队列,避免全局队列中的 g 饥饿
// 否则可能出现 2 个 g 永久占用本地队列(因为被暂停的 goroutine 唤醒后优先放入本地队列)
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// .....
if gp == nil {
//从本地队列中取 g
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
//如果本地队列中没有可执行的 g,则调用 findrunnable 直到有可运行的 g 为止
if gp == nil {
gp, inheritTime = findrunnable()
}
// 到这里,说明已经找到可运行的 g,如果 m 还处于自旋转状态,则置回正常状态
// 并唤醒 p 与之绑定
if _g_.m.spinning {
resetspinning()
}
//直接在当前 m 上执行 g
execute(gp, inheritTime)
}
findrunnable 查找可运行的 G
//阻塞获取可执行的 G,findrunnable 会从全局队列、其它 P 队列、netpoll 中去轮询
func findrunnable() (gp *g, inheritTime bool) {
_g_ := getg()
top:
_p_ := _g_.m.p.ptr()
// 本地队列中有 g,则直接返回
if gp, inheritTime := runqget(_p_); gp != nil {
return gp, inheritTime
}
// 全局队列中如果有 g, 则从全局队列中取一批到本地队列
if sched.runqsize != 0 {
lock(&sched.lock)
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
if gp != nil {
return gp, false
}
}
// 从 netpoll 中获取 g(非阻塞轮询已经完成的网络 io)
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
if list := netpoll(false); !list.empty() { //从 netpoll 中取出 i/o 读写完成的 g 列表
gp := list.pop() //先取出一个让当前 m 执行,这样能提高响应速度
injectglist(&list) //再把剩余的 g 列表放入队列
casgstatus(gp, _Gwaiting, _Grunnable)
return gp, false
}
}
// 检查是否可以从其它 p 中偷一部分 g
procs := uint32(gomaxprocs)
if atomic.Load(&sched.npidle) == procs-1 {
//如果其它 p 全部都是 idle 的,那肯定没地方偷
goto stop
}
//如果当前 m 没有处于自旋转且自旋转中的 p 数量 < running 中的 p 数量/2, 则让当前 m 进入自旋转
if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
goto stop
}
if !_g_.m.spinning {
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
//随机从一个 p 中偷,最多偿试 4 次
for i := 0; i < 4; i++ {
for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
if sched.gcwaiting != 0 {
goto top
}
stealRunNextG := i > 2
if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
return gp, false
}
}
}
stop:
allpSnapshot := allp
//再次检查全局队列,如果有 g,则取出执行,否则把 p 与当前 m 解绑
if sched.runqsize != 0 {
gp := globrunqget(_p_, 0)
unlock(&sched.lock)
return gp, false
}
if releasep() != _p_ { //p 与当前 m 解绑
throw("findrunnable: wrong p")
}
pidleput(_p_) //解绑的 p 放入 idle 队列
unlock(&sched.lock)
wasSpinning := _g_.m.spinning
if _g_.m.spinning { //如果当前 m 还处于自旋转状态,则取消,sched.nmspinning -1
_g_.m.spinning = false
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("findrunnable: negative nmspinning")
}
}
// 再次检查所有 p 的队列
for _, _p_ := range allpSnapshot {
if !runqempty(_p_) {
lock(&sched.lock)
_p_ = pidleget() //如果某个 p 的队列不为空,则从 idle 列表中取出一个 p
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
if wasSpinning { //如果解绑以前 m 是自旋转的,则还是让它保持自旋转
_g_.m.spinning = true
atomic.Xadd(&sched.nmspinning, 1)
}
goto top
}
break
}
}
// 再次从 netpoll 中阻塞的取 g
if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
if _g_.m.p != 0 {
throw("findrunnable: netpoll with p")
}
if _g_.m.spinning {
throw("findrunnable: netpoll with spinning")
}
list := netpoll(true)
atomic.Store64(&sched.lastpoll, uint64(nanotime()))
if !list.empty() {
lock(&sched.lock)
_p_ = pidleget()
unlock(&sched.lock)
if _p_ != nil {
acquirep(_p_)
gp := list.pop()
injectglist(&list)
casgstatus(gp, _Gwaiting, _Grunnable)
if trace.enabled {
traceGoUnpark(gp, 0)
}
return gp, false
}
injectglist(&list)
}
}
//如果始终找到不,就让 m 停止
stopm()
goto top
}
//把 g 列表放入全局队列,调用 startm 检测是否有 idle 的 p,将 idle 的 m 与之绑定或 new 一个 m
func injectglist(glist *gList) {
if glist.empty() {
return
}
if trace.enabled {
for gp := glist.head.ptr(); gp != nil; gp = gp.schedlink.ptr() {
traceGoUnpark(gp, 0)
}
}
lock(&sched.lock)
var n int
for n = 0; !glist.empty(); n++ {
gp := glist.pop()
//队列中的 g 状态必须为_Grunnable
casgstatus(gp, _Gwaiting, _Grunnable)
globrunqput(gp)
}
unlock(&sched.lock)
for ; n != 0 && sched.npidle != 0; n-- {
startm(nil, false)
}
*glist = gList{}
}
//重置自旋转状态
func resetspinning() {
_g_ := getg()
if !_g_.m.spinning {
throw("resetspinning: not a spinning m")
}
_g_.m.spinning = false
nmspinning := atomic.Xadd(&sched.nmspinning, -1)
if int32(nmspinning) < 0 {
throw("findrunnable: negative nmspinning")
}
//除当前 m 外没有其它 m 处于自旋转状态且还有 p 处于 idle,则唤醒一个 p,这样能让等执行的 g 尽量早的被处理。
if nmspinning == 0 && atomic.Load(&sched.npidle) > 0 {
wakep()
}
}
func wakep() {
// 如果已有 m 处于自旋转状态,则直接返回 (g 一定会被处于自旋转状态的 m 执行 [结合 findrunable 函数看])
if !atomic.Cas(&sched.nmspinning, 0, 1) {
return
}
startm(nil, true)
}
//startm 检测是否有 idle 的 p, 并将 idle 的 m 与 p 绑定或 new 一个 m
func startm(_p_ *p, spinning bool) {
lock(&sched.lock)
if _p_ == nil {
_p_ = pidleget() //获取一个 idle 的 p
if _p_ == nil {
unlock(&sched.lock)
if spinning {
// spinning 为 true 说明 startm 的调用方对 nmspinning 加了 1,但是没发现 idle 的 p,所以要回滚 nmspinning
if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
throw("startm: negative nmspinning")
}
}
return
}
}
//成功获取到 p, 再获取 idle 的 m
mp := mget()
unlock(&sched.lock)
if mp == nil { //获取 idle 的 m 失败,则创建一个 m
var fn func()
if spinning {
// 如果 spinning 为 true, 则标记新创建的 m 为 spinning
fn = mspinning
}
newm(fn, _p_) //创建新 m
return
}
mp.spinning = spinning
mp.nextp.set(_p_) // 把 p 设置为即将与 m 绑定的 p
notewakeup(&mp.park)
}
func newm(fn func(), _p_ *p) {
mp := allocm(_p_, fn) //new 一个 m 结构,并初始化
mp.nextp.set(_p_) //把 p 设置为即将与 m 绑定的 p
mp.sigmask = initSigmask
newm1(mp)
}
func newm1(mp *m) {
execLock.rlock()
newosproc(mp)
execLock.runlock()
}
//newosproc 创建 OS 线程,不同的 OS 接口不一样,linux 用的 clone, windows 为_CreateThread
func newosproc(mp *m) {
stk := unsafe.Pointer(mp.g0.stack.hi)
// clone 期间禁用信号,clone 完成再启用
var oset sigset
sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
//调用 clone 创建 os 线程, mstart 为线程起始函数
ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart)))
sigprocmask(_SIG_SETMASK, &oset, nil)
}
参考资料
- 《Modern Operating Systems》
- 《深入理解 Java 虚拟机》
- https://blog.csdn.net/qq_28351465/article/details/88950311
- https://www.cnblogs.com/chenny7/p/14060906.html
- https://strikefreedom.top/high-performance-implementation-of-goroutine-pool
- https://lifan.tech/2020/06/01/golang/runtime/
- https://zboya.github.io/post/go_scheduler/
- https://zhuanlan.zhihu.com/p/65738076