skynet源码阅读<7>--死循环检测

    在使用skynet开发时,你也许会碰到类似这样的警告:A message from [ :0100000f ] to [ :0100000a ] maybe in an endless loop (version = 137)
    它表示你的代码在某处陷入了死循环。但是如何找到死循环的点呢?可以这样做:
    1)本机登陆skynet控制台:nc 127.0.0.1 8000 ==> telnet登录skynet
    2)输入命令:signal addr 0                 ==> 向目标服务地址addr发送信号0,在本例中应为:signal :0100000a 0
    此时目标地址addr所指服务会退出死循环并打印错误堆栈。关于进一步使用skynet console的教程,请参考skynet在git上的wiki:https://github.com/cloudwu/skynet/wiki/DebugConsole
    死循环检测的问题解决了,让我们趁着这个机会,探索下这背后的原理吧。
    skynet console是通过debug_console.lua来实现的,可以看到signal函数:

1 function COMMAND.signal(address, sig)
2     address = skynet.address(adjust_address(address))
3     if sig then
4         core.command("SIGNAL", string.format("%s %d",address,sig))
5     else
6         core.command("SIGNAL", address)
7     end
8 end

    顺藤摸瓜,我们来到skynet_server.c中的cmd_signal函数:

 1 static const char *
 2 cmd_signal(struct skynet_context * context, const char * param) {
 3     uint32_t handle = tohandle(context, param);
 4     if (handle == 0)
 5         return NULL;
 6     struct skynet_context * ctx = skynet_handle_grab(handle);
 7     if (ctx == NULL)
 8         return NULL;
 9     param = strchr(param, ' ');
10     int sig = 0;
11     if (param) {
12         sig = strtol(param, NULL, 0);
13     }
14     // NOTICE: the signal function should be thread safe.
15     skynet_module_instance_signal(ctx->mod, ctx->instance, sig);
16 
17     skynet_context_release(ctx);
18     return NULL;
19 }

    解析出signal参数后调用skynet_module_instance_signal:

1 void
2 skynet_module_instance_signal(struct skynet_module *m, void *inst, int signal) {
3     if (m->signal) {
4         m->signal(inst, signal);
5     }
6 }

    其调用通过动态库加载的API:m->signal,它是在加载skynet module时动态加载的,Lua服务对应的module是snlua,我们看下service_snlua.c中的snlua_signal函数做了什么:

 1 void
 2 snlua_signal(struct snlua *l, int signal) {
 3     skynet_error(l->ctx, "recv a signal %d", signal);
 4     if (signal == 0) {
 5 #ifdef lua_checksig
 6     // If our lua support signal (modified lua version by skynet), trigger it.
 7     skynet_sig_L = l->L;
 8 #endif
 9     } else if (signal == 1) {
10         skynet_error(l->ctx, "Current Memory %.3fK", (float)l->mem / 1024);
11     }
12 }

    可以看到,signal为0时,将skynet_sig_L置空。那么skynet_sig_L是干嘛用的呢?在lua的lvm.c中有如下定义:

 1 /* Add by skynet */
 2 lua_State * skynet_sig_L = NULL;
 3 
 4 LUA_API void
 5 lua_checksig_(lua_State *L) {
 6   if (skynet_sig_L == G(L)->mainthread) {
 7     skynet_sig_L = NULL;
 8     lua_pushnil(L);
 9     lua_error(L);
10   }
11 }

    即如果skynet_sig_L为lua主线程G(L)->mainthread的话,那么将其置空并主动报错,在lua_error中会展开堆栈信息。

    在lua.h中定义了lua_checksig宏:

1 #define lua_checksig(L) if (skynet_sig_L) { lua_checksig_(L); }

    查找此宏的引用点:

    在虚拟机指令:跳转OP_JMP、尾调用OP_TAILCALL、for循环OP_FORLOOP、OP_TFORLOOP处都会做checksig检查。除了for循环之外的其它循环,比如while或repeat,在LUA中都是通过条件判断结合JMP跳转来实现的,因此也是可以被检查报错的。对于无限递归的情况,如果递归函数可以被优化成尾调用的话,那么会在TAILCALL中被检查并报错。至此,如何打断死循环并报错跳出的处理我们已经清楚了,可是新的问题又来了,skynet中是如何检测到死循环发生的呢?

    回想skynet启动时,会创建monitor线程,监视各个线程对应的skynet_monitor参数的情况:

 1 static void *
 2 thread_monitor(void *p) {
 3     struct monitor * m = p;
 4     int i;
 5     int n = m->count;
 6     skynet_initthread(THREAD_MONITOR);
 7     for (;;) {
 8         CHECK_ABORT
 9         for (i=0;i<n;i++) {
10             skynet_monitor_check(m->m[i]);
11         }
12         for (i=0;i<5;i++) {
13             CHECK_ABORT
14             sleep(1);
15         }
16     }
17 
18     return NULL;
19 }

    skynet_monitor_check监视的内容:

 1 void 
 2 skynet_monitor_check(struct skynet_monitor *sm) {
 3     if (sm->version == sm->check_version) {
 4         if (sm->destination) {
 5             skynet_context_endless(sm->destination);
 6             skynet_error(NULL, "A message from [ :%08x ] to [ :%08x ] maybe in an endless loop (version = %d)", sm->source , sm->destination, sm->version);
 7         }
 8     } else {
 9         sm->check_version = sm->version;
10     }
11 }

    skynet_monitor有个version参数,上节中我们讨论过消息分发,每次消息分发时,分发前置monitor->destination并自增version,分发后置monitor->destination为空并自增version(为简化,以下代码做了裁剪):

 1 struct message_queue * 
 2 skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
 3     uint32_t handle = skynet_mq_handle(q);
 4     struct skynet_context * ctx = skynet_handle_grab(handle);
 5 
 6     int i,n=1;
 7     struct skynet_message msg;
 8 
 9     for (i=0;i<n;i++) {
10         if (skynet_mq_pop(q,&msg)) {
11             skynet_context_release(ctx);
12             return skynet_globalmq_pop();
13         }
14 
15         skynet_monitor_trigger(sm, msg.source , handle);
16         dispatch_message(ctx, &msg);
17         skynet_monitor_trigger(sm, 0,0);
18     }
19 
20     assert(q == ctx->queue);
21     struct message_queue *nq = skynet_globalmq_pop();
22     if (nq) {
23         skynet_globalmq_push(q);
24         q = nq;
25     } 
26     skynet_context_release(ctx);
27 
28     return q;
29 }

    这样,当线程X处理消息陷入长久的阻滞时,monitor线程便会检测到X正在处理消息(skynet_monitor->destination不为空)并且version未改变,给出警告。

原文地址:https://www.cnblogs.com/Jackie-Snow/p/6951644.html