与时间相关的需求经常在我们的编码过程中使用。虽然时间转换等相对简单,但是计时器已经经历了以下迭代:
在Go1.9版本之前,对于Go1.10~1.13,定时器由全球唯一的四堆维护,每个P创建的定时器由对应的四堆维护。Go1.14版本后,各处理器独立维护定时器,通过网络轮询器在
一、计时器设计
10-101001.10前触发定时器。结构如下。
# https://github.com/golang/go/blob/go 1 . 9 . 7/src/runtime/time . go # L28var计时器结构{lock mutexgp * G创建了bool sleeping bool重新安排boolsleepuntil int 64等待注意事项notet [] * timer}请注意,此结构由var变量定义,它将存储所有计时器。t是最小的四堆,运行时创建的所有计时器都将被添加到四堆中。
独立的timerproc通过最小四堆和futexsleep管理定时任务。
1.1、全局四叉堆
但是共享一个锁的全局四堆对性能影响很大,所以在Go1.10之后,全局四堆被分成了64个更小的四堆。
# https://github.com/golang/go/blob/go 1 . 13 . 15/src/runtime/time . go # l39 const timersLen=64 var timers[timersLen]struct { timersBucket } Type RS bucket struct { lock mutex gp * g }创建了bool sleeping bool重新调度bool leepuntil int 64等待注意事项注意事项[] * timer}在理想情况下,四堆的数量应该等于GOMAXPROCS的处理器数量,但处理的数量需要动态调整如果当前机器中处理器P的数量超过64,则多个处理器的定时器可能存储在同一个桶中。
func(t * timer)Assign Bucket()* TimersBucket { ID :=uint 8(getg()。m.p.ptr()。ID)% Timerslent.tb=Timers [ID]。Timersbucketreturn t.tb }对全局计时器进行分频,尽管可以减少。
00-1010 GO版本Go1.14之后,定时器桶timersBucket被移除,所有定时器以最小四堆的形式存储在P中。
以下字段与p结构中的计时器相关联:
TimersLock:用于保护计时器的Mutexclusive lock Timers:用于存储计时器的最小四元堆numTimers:处理器P中的计时器数量adjustTimers:处理器P中状态为timermodiedified的计时器数量deletedTimers:处理器P中状态为timerDeleted的计时器数量,计时器的结构为:
type timer struct { PP puintptrwhen int 64 period int 64f func(interface { },Uintptr)argin interface { } seq uintprtnextwehen int 64 status uint 32 } PP:定时器所在的处理器P的指针地址当:当前定时器被唤醒的时间段:当前定时器再次被唤醒的时间f:回调函数,每次定时器被唤醒,都会被调用arg:回调函数的参数seq:回调函数的参数, 只在netpoll的应用场景中使用nextwhen:当定时器状态为timerModifiedXXX时,会赋给whenstatus:定时器状态只是runtime/time.go runtime的内部处理结构,但它确实是对外公开的。
结构体是:
type Timer struct { C <-chan Time r runtimeTimer}type Ticker struct { C <-chan Time r runtimeTimer}
通过channel来通知计时器时间
二、状态机
状态 |
说明 |
timerNoStatus |
timer尚未设置状态 |
timerWaiting |
等待timer启动 |
timerRunning |
运行timer的回调方法 |
timerDeleted |
timer已经被删除,但仍然在某些p的堆中 |
timerRemoving |
timer即将被删除 |
timerRemoved |
timer已经停止,且不存在任何p的堆中 |
timerModifying |
timer正在被修改 |
timerModifiedEarlier |
timer已被修改为更早的时间,新的时间被设置在nextwhen字段中 |
timerModifiedLater |
timer已被修改为更迟的时间,新的时间被设置在nextwhen字段中 |
timerMoving |
timer已经被修改,正在被移动 |
在runtime/time.go文件下,我们可以看到下面几个方法:
2.1、增加计时器addtimer
当通过time.NewTimer方法增加新的计时器时,会执行startTimer来增加计时器
func startTimer(t *timer) { addtimer(t)}
状态从timerNoStatus->timerWaiting,其他状态会抛出异常
func addtimer(t *timer) { if t.when < 0 { t.when = maxWhen } if t.status != timerNoStatus { throw("addtimer called with initialized timer") } t.status = timerWaiting when := t.when pp := getg().m.p.ptr() lock(&pp.timersLock) cleantimers(pp) doaddtimer(pp, t) unlock(&pp.timersLock) wakeNetPoller(when)}
1、调用cleantimers清除处理器P中的计时器,可以加快创建和删除计时器的程序速度
2、调用doaddtimer将当前计时器加入到处理器P的四叉堆timers中
3、调用wakeNetPoller唤醒网络轮询器中休眠的线程,检查timer被唤醒的时间when是否在当前轮询预期的运行时间内,如果是就唤醒。
2.2、删除计时器deltimer
当通过调用timer.Stop停止计时器时,会执行stopTimer来停止计时器
func stopTimer(t *timer) bool { return deltimer(t)}
deltimer会标记需要删除的计时器。在删除计时器的过程中,可能会遇到其他处理器P的计时器,所以我们仅仅只是将状态标记为删除,处理器P执行删除操作。
- timerWaiting -> timerModifying -> timerDeleted
- timerModifiedLater -> timerModifying -> timerDeleted
- timerModifiedEarlier -> timerModifying -> timerDeleted
- 其他状态 -> 等待状态改变或返回
2.3、修改计时器modtimer
当通过调用timer.Reset重置定时器时,会执行resetTimer来重置定时器
func resettimer(t *timer, when int64) { modtimer(t, when, t.period, t.f, t.arg, t.seq)}
modtimer会修改已经存在的计时器,会根据以下规则处理计时器状态
- timerWaiting -> timerModifying -> timerMofidiedXXX
- timerMofidiedXXX -> timerModifying -> timerMofidiedXXX
- timerNoStatus -> timerModifying -> timerWaiting
- timerRemoved -> timerModifying -> timerWaiting
- timerDeleted -> timerModifying -> timerMofidiedXXX
- 其他状态 -> 等待状态改变
状态为timerNoStatus, timerRemoved会被标记为已删除wasRemoved,就会调用doaddtimer新创建一个计时器。
而在正常情况下会根据修改后的时间进行不同的处理:
- 修改时间 >= 修改前的时间,设置状态为timerModifiedLater
- 修改时间 < 修改前的时间,设置状态为timerModifiedEarlier,并调用wakeNetPoller触发调度器重新调度
2.4、清除定时器cleantimers
会根据状态清除处理器P的最小四叉堆队头的计时器
- timerDeleted -> timerRemoving -> timerRemoved
- timerModifiedEarlier -> timerMoving -> timerWaiting
- timerModifiedLater -> timerMoving -> timerWaiting
func cleantimers(pp *p) { for { if len(pp.timers) == 0 { return } t := pp.timers[0] if t.pp.ptr() != pp { throw("cleantimers: bad p") } switch s := atomic.Load(&t.status); s { case timerDeleted: atomic.Cas(&t.status, s, timerRemoving) dodeltimer0(pp) atomic.Cas(&t.status, timerRemoving, timerRemoved) case timerModifiedEarlier, timerModifiedLater: atomic.Cas(&t.status, s, timerMoving) t.when = t.nextwhen dodeltimer0(pp) doaddtimer(pp, t) atomic.Cas(&t.status, timerMoving, timerWaiting) default: return } }}
- 如果计时器状态为timerDeleted: 将计时器状态改成timerRemoving 调用dodeltimer0删除堆顶的计时器 将计时器状态改成timerRemoved
- 如果计时器状态为timerModifiedEarlier/timerModifiedLater 将计时器状态改成timerMoving 使用计时器下次触发时间nextwhen覆盖本次时间when 调用dodeltimer0删除堆顶的计时器 调用doaddtimer将计时器加入四叉堆中 将计时器状态改成timerWaiting
2.5、调整计时器adjusttimers
在GPM调度的时候检查计时器
- timerDeleted -> timerRemoving -> timerRemoved
- timerModifiedEarlier -> timerMoving -> timerWaiting
- timerModifiedLater -> timerMoving -> timerWaiting
与cleantimers不同的是,adjusttimers会遍历处理器P转给你所有的计时器
func adjusttimers(pp *p) { if len(pp.timers) == 0 { return } var moved []*timerloop: for i := 0; i < len(pp.timers); i++ { t := pp.timers[i] switch s := atomic.Load(&t.status); s { case timerDeleted: // 删除计时器 case timerModifiedEarlier, timerModifiedLater: // 修改计时器 case timerNoStatus, timerRunning, timerRemoving, timerRemoved, timerMoving: case timerWaiting: case timerModifying: default: } } if len(moved) > 0 { addAdjustedTimers(pp, moved) }}
2.6、运行计时器runtimer
会检查四叉堆堆顶的计时器,根据状态处理计时器
- timerWaiting -> timerRunning
- timerDeleted -> timerRemoving -> timerRemoved
- timerModifiedXXX -> timerMoving -> timerWaiting
- 其他状态 -> 等待或异常退出
1、状态是timerDeleted,状态变为timerDeleted,然后删除计时器,再变更状态为timerRemoved
atomic.Cas(&t.status, s, timerRemoving)dodeltimer0(pp)atomic.Cas(&t.status, timerRemoving, timerRemoved)
2、状态是timerModifiedXXX
- 将计时器状态改成timerMoving
- 使用计时器下次触发时间nextwhen覆盖本次时间when
- 调用dodeltimer0删除堆顶的计时器
- 调用doaddtimer将计时器加入四叉堆中
- 将计时器状态改成timerWaiting
atomic.Cas(&t.status, s, timerMoving)t.when = t.nextwhendodeltimer0(pp)doaddtimer(pp, t)atomic.Cas(&t.status, timerMoving, timerWaiting)
3、状态是timerWaiting,如果计时器没有到达触发时间,直接返回,否则状态变为timerRunning,调用runOneTimer运行堆顶的计时器
if t.when > now { // Not ready to run. return t.when}atomic.Cas(&t.status, s, timerRunning)runOneTimer(pp, t, now)
func runOneTimer(pp *p, t *timer, now int64) { f := t.f arg := t.arg seq := t.seq if t.period > 0 { delta := t.when - now t.when += t.period * (1 + -delta/t.period) siftdownTimer(pp.timers, 0) if !atomic.Cas(&t.status, timerRunning, timerWaiting) { badTimer() } updateTimer0When(pp) } else { dodeltimer0(pp) if !atomic.Cas(&t.status, timerRunning, timerNoStatus) { badTimer() } } unlock(&pp.timersLock) f(arg, seq) lock(&pp.timersLock)}
根据period字段是否大于0判断,如果大于0
- 修改下一次的触发时间,并更新在四叉堆中的位置
- 更新状态timerWaiting
- 调用updateTimer0When设置下一个timer的触发时间
如果小于等于0:
- 移除定时器
- 更新状态timerNoStatus
更新完状态后,回调函数f(arg, seq)执行方法。
三、调度器
在adjesttimers中提到过
checkTimers是调度器用来运行处理器P中定时器的函数,会在以下几种情况被触发:
- 调度器调用schedule时
- 调度器在findrunnable获取可执行的G时
- 调度器在findrunnable从其他处理器偷G时
func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { if atomic.Load(&pp.adjustTimers) == 0 { next := int64(atomic.Load64(&pp.timer0When)) if next == 0 { return now, 0, false } if now == 0 { now = nanotime() } if now < next { if pp != getg().m.p.ptr() || int(atomic.Load(&pp.deletedTimers)) <= int(atomic.Load(&pp.numTimers)/4) { return now, next, false } } } lock(&pp.timersLock) adjusttimers(pp) rnow = now if len(pp.timers) > 0 { if rnow == 0 { rnow = nanotime() } for len(pp.timers) > 0 { if tw := runtimer(pp, rnow); tw != 0 { if tw > 0 { pollUntil = tw } break } ran = true } } if pp == getg().m.p.ptr() && int(atomic.Load(&pp.deletedTimers)) > len(pp.timers)/4 { clearDeletedTimers(pp) } unlock(&pp.timersLock) return rnow, pollUntil, ran}
1、先通过处理器P字段中updateTimer0When判断是否有需要执行的计时器,如果没有直接返回
2、如果下一个计时器没有到期但是需要删除的计时器较少时会直接返回
3、加锁
4、需要处理的timer,根据时间将timers切片中的timer重新排序,调用adjusttimers
5、会通过runtimer依次查找运行计时器
6、处理器中已删除的timer大于p上的timer数量的1/4,对标记为timerDeleted的timer进行清理
7、解锁
四、小结
go1.10最多可以创建GOMAXPROCS数量的timerproc协程,当然不超过64。但我们要知道timerproc自身就是协程,也需要runtime pmg的调度。到go 1.14把检查到期定时任务的工作交给了网络轮询器,不需要额外的调度,每次runtime.schedule和findrunable时直接运行到期的定时任务。
内容来源网络,如有侵权,联系删除,本文地址:https://www.230890.com/zhan/157257.html