浅析时间轮定时器

前言: 最早是看到skynet群里边有人问如何取消定时器的问题,那时候正好在研读skynet代码,于是决定试试。但是最终只在lua层面实现了一个伪取消定时器的方案,而且还是不是优解。

    云风说从c层面取消定时器的开销要大于从lua层面取消的开销,当时不知道为什么。

    最近研读了云风实现的时间轮定时器代码(看着相当费劲啊),  过程中网上搜了很多资料,但大部分没能帮助我有个更好的理解,所以打算从写篇文章,希望能帮助像我一样的newbee, 更好的理解时间轮定时器。

    这里不讲定时器的进化过程,只讲时间轮,以及 skynet 中云风十分精巧的实现(才疏学浅,看这块儿代码,真他妈的爽)

步骤: 1 创建时间轮 2 添加到期时间 3 时钟tick 执行定时器到期回调,移动定时器列表  循环往复

1.初始化 "时间轮"

  首先看下相关的数据结构

typedef void (*timer_execute_func)(void *ud,void *arg);

#define TIME_NEAR_SHIFT 8
#define TIME_NEAR (1 << TIME_NEAR_SHIFT)
#define TIME_LEVEL_SHIFT 6
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)
#define TIME_NEAR_MASK (TIME_NEAR-1)
#define TIME_LEVEL_MASK (TIME_LEVEL-1)

struct timer_event {
 uint32_t handle;
 int session;
};

struct timer_node {
 struct timer_node *next;
 uint32_t expire;
};

struct link_list {
 struct timer_node head;
 struct timer_node *tail;
};

struct timer {
 struct link_list near[TIME_NEAR];
 struct link_list t[4][TIME_LEVEL];
 struct spinlock lock;
 uint32_t time;
 uint32_t starttime;
 uint64_t current;
 uint64_t current_point;
};

可以看出 struct timer 就是 时间轮了。ok 下面是初始化代码

static struct timer * TI = NULL;

static struct timer *
timer_create_timer() {
 struct timer *r=(struct timer *)skynet_malloc(sizeof(struct timer));
 memset(r,0,sizeof(*r));

 int i,j;

 for (i=0;i<TIME_NEAR;i++) {
  link_clear(&r->near[i]);
 }

 for (i=0;i<4;i++) {
  for (j=0;j<TIME_LEVEL;j++) {
   link_clear(&r->t[i][j]);
  }
 }

 SPIN_INIT(r)

 r->current = 0;

 return r;
}

static uint64_t
gettime() {
 uint64_t t;
#if !defined(__APPLE__) || defined(AVAILABLE_MAC_OS_X_VERSION_10_12_AND_LATER)
 struct timespec ti;
 clock_gettime(CLOCK_MONOTONIC, &ti);
 t = (uint64_t)ti.tv_sec * 100;
 t += ti.tv_nsec / 10000000;
#else
 struct timeval tv;
 gettimeofday(&tv, NULL);
 t = (uint64_t)tv.tv_sec * 100;
 t += tv.tv_usec / 10000;
#endif
 return t;
}

void
skynet_timer_init(void) {
 TI = timer_create_timer();
 uint32_t current = 0;
 systime(&TI->starttime, &current);
 TI->current = current; //初始化时刻,纳秒数
 TI->current_point = gettime();
}

首先调用 timer_create_time() 创建五个数组  一个struct link_list near[TIME_NEAR] TIME_NEAR, 四个 struct link_list t[4][TIME_LEVEL]; TIME_LEVEL, 每个数组的每个 slot 表示一个时间段 同时又是个链表,用来存储

到期时间距当前tick 一个时间段的定时器。 skynet 提供的定时器精度为 1/100 秒 也就是10毫秒, 具体实现为 gettime() 。既然精度是 10 毫秒 那么 10毫秒就要调用一次  dispatch 函数,触发定时器,

那么哪里调用的呢?

答案在skynet_start.c里 其中定时器线程  

static void *
thread_timer(void *p) {
 struct monitor * m = p;
 skynet_initthread(THREAD_TIMER);
 for (;;) {
  skynet_updatetime();
  CHECK_ABORT
  wakeup(m,m->count-1);
  usleep(2500);
  if (SIG) {
   signal_hup();
   SIG = 0;
  }
 }
 // wakeup socket thread
 skynet_socket_exit();
 // wakeup all worker thread
 pthread_mutex_lock(&m->mutex);
 m->quit = 1;
 pthread_cond_broadcast(&m->cond);
 pthread_mutex_unlock(&m->mutex);
 return NULL;
}

撇开无关代码, 可以提取出

void
skynet_updatetime(void) {
 uint64_t cp = gettime();
 if(cp < TI->current_point) {
  skynet_error(NULL, "time diff error: change from %lld to %lld", cp, TI->current_point);
  TI->current_point = cp;
 } else if (cp != TI->current_point) {
  uint32_t diff = (uint32_t)(cp - TI->current_point);
  TI->current_point = cp;
  TI->current += diff;
  int i;
  for (i=0;i<diff;i++) {
   timer_update(TI);
  }
 }
}

这里基本可以保证diff 的值为1, 也就是10 毫秒的时间间隔, 可以写个测试程序试一下,稍后贴运行结果

 for (;;) {
  skynet_updatetime();
  usleep(2500);
  }
 }

ok 现在我们创建了定时器的骨架, 然后也知道了在哪里保证 10 毫秒触发定时器,如果没有注册定时器,就是空转。

个人觉得时间轮定时器的难点在于 注册时 和 shift 时,定位到slot。要想知道这点,一个好的方法就是

现在我们注册几个定时器。 这里会挑一些时间点的定时器,来分析注册,分发,shift 过程。

先看注册函数

int
skynet_timeout(uint32_t handle, int time, int session) {
 if (time <= 0) {
  struct skynet_message message;
  message.source = 0;
  message.session = session;
  message.data = NULL;
  message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

  if (skynet_context_push(handle, &message)) {
   return -1;
  }
 } else {
  struct timer_event event;
  event.handle = handle;
  event.session = session;
  timer_add(TI, &event, sizeof(event), time);
 }

 return session;
}

timer_add(TI, &event, sizeof(event), time);

看看timer_add;

static void
timer_add(struct timer *T,void *arg,size_t sz,int time) {
 struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
 memcpy(node+1,arg,sz);  //每个node后边绑一个event参数

 SPIN_LOCK(T);

  node->expire=time+T->time;
  add_node(T,node);

 SPIN_UNLOCK(T);
}

申请一个node,看 node->expire = time+T->time; 到期时间是 time+T->time,   那么T->time 是啥,这个值是如何变化的,在哪里变化的

 现需要留神下几个变量的含义

T->time; T->starttime; T->current; T->current_point; 

T->time 服务器经过的tick 数, 每10毫秒 tick 一次,T->time 增加1;

T->starttime; 服务器开始的时间,单位秒。

T->current (uint32_t)(ti.tv_nsec / 10000000);

T->current_point   t = (uint64_t)ti.tv_sec * 100, t += ti.tv_nsec / 10000000 ;

现在看下注册函数 lua 层是调用 skynet.timeout(time), 例如 skynet.timeout(10), skynet.timeout(100), skynet.timeout(1000)

ok 现在看看 c 层调用情况 

int
skynet_timeout(uint32_t handle, int time, int session) {
if (time <= 0) {
struct skynet_message message;
message.source = 0;
message.session = session;
message.data = NULL;
message.sz = (size_t)PTYPE_RESPONSE << MESSAGE_TYPE_SHIFT;

if (skynet_context_push(handle, &message)) {
return -1;
}
} else {
struct timer_event event;
event.handle = handle;
event.session = session;
timer_add(TI, &event, sizeof(event), time);
}

return session;
}

 先忽略 session time 在这里就是例子中的 10, 100, 1000, 表示 0.1 秒, 1 秒和 10 秒 后触发, 在看 timer_add();

static void
timer_add(struct timer *T,void *arg,size_t sz,int time) {
struct timer_node *node = (struct timer_node *)skynet_malloc(sizeof(*node)+sz);
memcpy(node+1,arg,sz); //每个node后边绑一个event参数

SPIN_LOCK(T);

node->expire=time+T->time;
add_node(T,node);

SPIN_UNLOCK(T);
}

 timer_add 先创建一个 node 用来保存超时时间即 node->expire , 我们看到 node->expire = node->time + time, 上边讲到 node->time 是从进程启动时刻算起,经过的 滴答数

又知道每 10 毫秒 滴答一次, 那么我们可以得出 node->expire 就是 从当前 node->time 起 经过 time 个滴答数后 到达 node->expire. 

计算一下可以知道 10 * 10(每个滴答的间隔) == 100毫秒 = 0.1 秒, 同理  100 * 10, 1000 * 10. 

现在看 add_node ,

static void
add_node(struct timer *T,struct timer_node *node) {
uint32_t time=node->expire;
uint32_t current_time=T->time;

if ((time|TIME_NEAR_MASK)==(current_time|TIME_NEAR_MASK)) { //如果当前时间
link(&T->near[time&TIME_NEAR_MASK],node);
} else {
int i;
uint32_t mask=TIME_NEAR << TIME_LEVEL_SHIFT;
for (i=0;i<3;i++) {
if ((time|(mask-1))==(current_time|(mask-1))) {
break;
}
mask <<= TIME_LEVEL_SHIFT;
}

link(&T->t[i][((time>>(TIME_NEAR_SHIFT + i*TIME_LEVEL_SHIFT)) & TIME_LEVEL_MASK)],node);
}
}

从  add_node 里我们看到了许多宏定义,那么这些宏是什么意思呢? 要想理解这个, 这里就需要详细讲解 时间轮 的设计思路。

放到这里才讲解 时间轮 的设计思路 是因为我觉得这样自然过度过来比较好理解。现在分析:

typedef void (*timer_execute_func)(void *ud,void *arg);

#define TIME_NEAR_SHIFT 8   
#define TIME_NEAR (1 << TIME_NEAR_SHIFT)   //TIME_NEAR = 00000001 00000000
#define TIME_LEVEL_SHIFT 6           
#define TIME_LEVEL (1 << TIME_LEVEL_SHIFT)   //TIME_LEVEL = 01000000
#define TIME_NEAR_MASK (TIME_NEAR-1)     //TIME_NEAR_MASK = 11111111
#define TIME_LEVEL_MASK (TIME_LEVEL-1)       //TIME_LEVEL_MASK = 00111111

 首先 skynet 的定时器 node->time 类型为 uint32_t , 范围是 0 ~ 2^32 - 1 ,  skynet 实现的 时间轮的思路是这样的(我想linux也是这样):

将 32 位 分为五个部分,

struct link_list near[TIME_NEAR]; (000000 000000 000000 000000 11111111)  低8位 表示 0 ~ 2^8 - 1 即 0 ~ 255 个 tick , 这里每个 near[x] 的精度是一个 tick  ,即 10 毫秒

下面是四个数组,每个数组有 TIME_LEVEL  个 slot ,即 2^6 = 64 个 slot, 

link_list[0] 数组, 每个 slot 表示的精度是 2^8 , 即在这个数组中 每一个 slot 比上一个 slot 多 256 个 tick ,link_list[0] 表示的范围是 (2^8 ~ 2^14 - 1), 即 node->expire 时间在这个范围内的,都被分到这个数组里

link_list[1]数组, 每个 slot 表示的精度是 2^8 * 2^6 , link_list[1] 表示的范围是 (2^14 ~ 2^20 - 1), 即 node->expire 时间在这个范围内的 (2^14 ~ 2^20 - 1),都被分到这个数组里

link_list[2]数组, 每个 slot 表示的精度是 2^8 * 2^6 * 2^6 , link_list[2] 表示的范围是 (2^20 ~ 2^26 - 1), 即 node->expire 时间在这个范围内的 (2^20 ~ 2^26 - 1),都被分到这个数组里

link_list[3]数组, 每个 slot 表示的精度是 2^8 * 2^6 * 2^6 * 2^6 , link_list[2] 表示的范围是 (2^26 ~ 2^32 - 1), 即 node->expire 时间在 这个范围内的  (2^26 ~ 2^32 - 1),,都被分到这个数组里

有了这 5 个数组

         

原文地址:https://www.cnblogs.com/newbeeyu/p/9022623.html