本文着重讨论skynet框架中由bootstrap启动的第一个服务launcher的启动流程,其他服务的启动流程也是类似的;
launcher.lua代码如下:
local skynet = require "skynet"
local core = require "skynet.core"
require "skynet.manager" -- import manager apis
-- Local alias for the string library.
local string = string
-- Launched services: handle -> "<service> <param>" description.
local services = {}
-- Command table: upper-case command name -> handler(address, ...).
local command = {}
-- Pending launches: handle -> response closure, settled by
-- command.LAUNCH / command.ERROR / command.LAUNCHOK / command.REMOVE.
local instance = {}

--- Parse a textual handle such as ":0000000a" into a numeric address.
-- The first character (the ':' prefix) is skipped and the rest is read as hex.
-- @tparam string handle
-- @treturn number|nil  nil when the remainder is not a hex number
local function handle_to_address(handle)
	local digits = string.sub(handle, 2)
	return tonumber("0x" .. digits)
end

-- Sentinel returned by handlers that reply later (or never), telling the
-- dispatcher not to call skynet.ret itself.
local NORET = {}
--- LIST: map each launched service's address string to its description.
function command.LIST()
	local result = {}
	for handle, desc in pairs(services) do
		result[skynet.address(handle)] = desc
	end
	return result
end

--- STAT: ask each launched service for its "debug" STAT.
-- Services whose call fails are reported as "ERROR (<description>)".
function command.STAT()
	local result = {}
	for handle, desc in pairs(services) do
		local ok, stat = pcall(skynet.call, handle, "debug", "STAT")
		local entry = stat
		if not ok then
			entry = string.format("ERROR (%s)", desc)
		end
		result[skynet.address(handle)] = entry
	end
	return result
end
--- KILL: terminate the service given by its textual handle (e.g. ":0000000a")
-- and forget it.
-- @treturn table { [address string] = previous description (as a string) }
function command.KILL(_, handle)
	local addr = handle_to_address(handle)
	skynet.kill(addr)
	local description = tostring(services[addr])
	services[addr] = nil
	return { [skynet.address(addr)] = description }
end
--- MEM: ask each launched service for its memory usage via the "debug"
-- protocol.
-- @treturn table { [address string] = "<kb> Kb (<desc>)" }, or
--   "ERROR (<desc>)" for services whose call failed.
function command.MEM()
	local list = {}
	for handle, desc in pairs(services) do
		-- only the first result (kilobytes) is used
		local ok, kb = pcall(skynet.call, handle, "debug", "MEM")
		if not ok then
			list[skynet.address(handle)] = string.format("ERROR (%s)", desc)
		else
			list[skynet.address(handle)] = string.format("%.2f Kb (%s)", kb, desc)
		end
	end
	return list
end

--- GC: send every launched service a "debug" GC request (no reply awaited),
-- then report memory usage exactly as command.MEM does.
function command.GC()
	for handle in pairs(services) do
		skynet.send(handle, "debug", "GC")
	end
	return command.MEM()
end
--- REMOVE: forget a service that has exited.
-- If the service was still waiting for launch confirmation, the pending
-- caller is answered now with response(not kill); per the original note,
-- this returns nil to the caller of newservice when kill == false.
-- No reply is sent to the sender, because that handle may be exiting.
function command.REMOVE(_, handle, kill)
	services[handle] = nil
	local pending = instance[handle]
	if not pending then
		return NORET
	end
	-- instance is dead
	pending(not kill)
	instance[handle] = nil
	return NORET
end
--- Start `service` with the remaining arguments joined by single spaces.
-- The reply to the current request is captured with skynet.response().
-- On success it is parked in `instance` until the new service confirms.
-- On failure it is answered with false right away.
-- @treturn number|nil  the new service's address, or nothing on failure
local function launch_service(service, ...)
	local param = table.concat({...}, " ")
	local inst = skynet.launch(service, param)
	local response = skynet.response()
	if not inst then
		response(false)
		return
	end
	services[inst] = service .. " " .. param
	instance[inst] = response
	return inst
end

--- LAUNCH: start a service. The reply is deferred until the new service
-- confirms (LAUNCHOK) or fails (ERROR / REMOVE).
function command.LAUNCH(_, service, ...)
	launch_service(service, ...)
	return NORET
end

--- LOGLAUNCH: like LAUNCH, and also turns on message logging ("LOGON")
-- for the new service when it started.
function command.LOGLAUNCH(_, service, ...)
	local inst = launch_service(service, ...)
	if inst then core.command("LOGON", skynet.address(inst)) end
	return NORET
end
-- Answer and forget the pending launch request for `address`, if any.
local function settle_launch(address, ...)
	local response = instance[address]
	if response then
		response(...)
		instance[address] = nil
	end
end

--- ERROR: the service at `address` failed during init
-- (see service-src/service_lua.c). Fails the pending launch and forgets
-- the service.
function command.ERROR(address)
	settle_launch(address, false)
	services[address] = nil
	return NORET
end

--- LAUNCHOK: the service at `address` finished init. Answers the pending
-- launch with (true, address).
function command.LAUNCHOK(address)
	settle_launch(address, true, address)
	return NORET
end
-- For historical reasons the launcher also accepts text commands (sent by C
-- services). Text payload -> name of the launcher command it maps to.
local text_commands = {
	[""] = "LAUNCHOK",   -- empty message: init succeeded
	ERROR = "ERROR",     -- init failed
}

skynet.register_protocol {
	name = "text",
	id = skynet.PTYPE_TEXT,
	unpack = skynet.tostring,
	dispatch = function(session, address, cmd)
		local name = text_commands[cmd]
		if name == nil then
			error ("Invalid text command " .. cmd)
		end
		command[name](address)
	end,
}
-- "lua" requests: cmd names a handler in `command` (case-insensitive).
-- The handler's result is sent back with skynet.ret unless it returned
-- NORET, in which case the handler replies later (or not at all).
skynet.dispatch("lua", function(session, address, cmd, ...)
	local handler = command[string.upper(cmd)]
	if not handler then
		skynet.ret(skynet.pack {"Unknown command"})
		return
	end
	local ret = handler(address, ...)
	if ret ~= NORET then
		skynet.ret(skynet.pack(ret))
	end
end)

-- No start-up work beyond registering the dispatchers above.
skynet.start(function() end)
这个服务的启动比较特殊,我们看看bootstrap.lua中的启动代码:
-- Start the launcher service (snlua running launcher.lua) and bind it to the
-- local name ".launcher". Abort if the launch fails.
local launcher = skynet.launch("snlua", "launcher")
assert(launcher)
skynet.name(".launcher", launcher)
该服务是通过调用skynet.launch("snlua","launcher")函数来启动的,该函数的实现在文件./lualib/skynet/manager.lua中,代码如下:
--- Launch a C-level service by sending the "LAUNCH" command.
-- The arguments are joined with spaces, so skynet.launch("snlua", "launcher")
-- sends "snlua launcher".
-- @treturn number|nil  the new service's numeric address, or nothing on failure
function skynet.launch(...)
	local cmdline = table.concat({...}, " ")
	local addr = c.command("LAUNCH", cmdline)
	if not addr then
		return
	end
	-- addr looks like ":0000000a": drop the ':' and parse the hex digits
	return tonumber("0x" .. string.sub(addr, 2))
end
这里的c.command()直接调用了C库函数command(),它对应C层的lcommand(),我们来看看这个函数的实现:
/*
 * Lua binding for skynet_command(): command(cmd [, param]).
 * The skynet_context pointer comes from upvalue 1. A second argument is
 * forwarded only when exactly two arguments are on the stack. Pushes and
 * returns the command's result string, or returns nothing when
 * skynet_command() yields NULL.
 */
static int
lcommand(lua_State *L) {
	struct skynet_context *context = lua_touserdata(L, lua_upvalueindex(1));
	const char *cmd = luaL_checkstring(L, 1);
	const char *parm = (lua_gettop(L) == 2) ? luaL_checkstring(L, 2) : NULL;

	const char *result = skynet_command(context, cmd, parm);
	if (result == NULL) {
		return 0;
	}
	lua_pushstring(L, result);
	return 1;
}
实际上是调用了skynet_command(context, cmd, parm)函数,再来看看这个函数的实现:
/* Command dispatch table: name -> handler, terminated by a NULL entry. */
static struct command_func cmd_funcs[] = {
	{ "TIMEOUT", cmd_timeout },
	{ "REG", cmd_reg },
	{ "QUERY", cmd_query },
	{ "NAME", cmd_name },
	{ "EXIT", cmd_exit },
	{ "KILL", cmd_kill },
	{ "LAUNCH", cmd_launch },
	{ "GETENV", cmd_getenv },
	{ "SETENV", cmd_setenv },
	{ "STARTTIME", cmd_starttime },
	{ "ABORT", cmd_abort },
	{ "MONITOR", cmd_monitor },
	{ "STAT", cmd_stat },
	{ "LOGON", cmd_logon },
	{ "LOGOFF", cmd_logoff },
	{ "SIGNAL", cmd_signal },
	{ NULL, NULL },
};

/*
 * Run the command named `cmd` on behalf of `context`.
 * The name is matched exactly (strcmp) against cmd_funcs. The handler's
 * result is returned as-is; an unknown command yields NULL.
 */
const char *
skynet_command(struct skynet_context *context, const char *cmd, const char *param) {
	struct command_func *entry;
	for (entry = cmd_funcs; entry->name != NULL; entry++) {
		if (strcmp(cmd, entry->name) == 0) {
			return entry->func(context, param);
		}
	}
	return NULL;
}
该函数用传入的命令字符串,在事先维护好的命令表cmd_funcs(每一项是“命令名-执行函数”对,以{ NULL, NULL }结尾)中逐项查找对应的执行函数;我们传入的是LAUNCH,对应的执行函数为cmd_launch,实现如下:
static
const
char *
cmd_launch(struct
skynet_context * context, const
char * param) {
size_t
sz = strlen(param);
char
tmp[sz+1];
strcpy(tmp,param);
char * args = tmp;
char * mod = strsep(&args, "
");
args = strsep(&args, "
");
LOG("mod=%s, args=%s", mod, args);
struct
skynet_context * inst = skynet_context_new(mod,args);
if (inst == NULL) {
return
NULL;
} else {
id_to_hex(context->result, inst->handle);
return
context->result;
}
}
代码比较简单,该函数接下来执行了skynet_context_new(mod,args),这个函数的实现代码:
/*
 * Create and initialise a service (skynet_context) backed by module `name`.
 *
 * Steps:
 *   1. Look up the module and create a module instance.
 *   2. Allocate and fill the context, then register it to get a handle.
 *   3. Create the context's private message queue.
 *   4. Call the module's init with `param`.
 *
 * Returns NULL if the module or instance cannot be created.
 *
 * The reference count starts at 2; one reference is dropped with
 * skynet_context_release() after a successful init.
 *
 * After a successful init the queue is pushed to the global queue. The
 * context is returned, or NULL if that release freed it.
 *
 * If init fails, the handle is retired, queued messages are dropped
 * (drop_message) and NULL is returned.
 */
struct skynet_context *
skynet_context_new(const char * name, const char *param) {
	struct skynet_module * mod = skynet_module_query(name);// e.g. "snlua"
	if (mod == NULL)
		return NULL;
	void *inst = skynet_module_instance_create(mod);
	if (inst == NULL)
		return NULL;
	struct skynet_context * ctx = skynet_malloc(sizeof(*ctx));
	CHECKCALLING_INIT(ctx)
	ctx->mod = mod;
	ctx->instance = inst;
	ctx->ref = 2;	// one of these references is released right after a successful init below
	ctx->cb = NULL;
	ctx->cb_ud = NULL;
	ctx->session_id = 0;
	ctx->logfile = NULL;
	ctx->init = false;
	ctx->endless = false;
	ctx->cpu_cost = 0;
	ctx->cpu_start = 0;
	ctx->message_count = 0;
	ctx->profile = G_NODE.profile;
	// Should set to 0 first to avoid skynet_handle_retireall get an uninitialized handle
	ctx->handle = 0;
	ctx->handle = skynet_handle_register(ctx);// register in the global handle table and assign this service's address
	struct message_queue * queue = ctx->queue = skynet_mq_create(ctx->handle);
	// init function maybe use ctx->handle, so it must init at last
	context_inc();
	CHECKCALLING_BEGIN(ctx)
	int r = skynet_module_instance_init(mod, inst, ctx, param);
	CHECKCALLING_END(ctx)
	if (r == 0) {
		struct skynet_context * ret = skynet_context_release(ctx);
		if (ret) {
			ctx->init = true;
		}
		skynet_globalmq_push(queue);
		if (ret) {
			skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
		}
		return ret;
	} else {
		skynet_error(ctx, "FAILED launch %s", name);
		uint32_t handle = ctx->handle;
		skynet_context_release(ctx);
		skynet_handle_retire(handle);
		struct drop_t d = { handle };
		skynet_mq_release(queue, drop_message, &d);
		return NULL;
	}
}
该函数首先会去加载一个模块,模块就是C动态库,并且具有一致的函数接口(xxx_create(),xxx_init(),xxx_release()),如果模块之前已经加载则不会加载直接返回模块的指针,
如果从未加载,先从模块路径加载,再返回,所有lua服务都是通过snlua模块来启动的,这里以snlua模块为例来说明:
skynet_module_instance_create(mod)函数会执行snlua_create(…)函数,该函数实现代码如下
/*
 * Create the per-service state for the snlua module.
 * The struct is zero-filled, memory reporting starts at
 * MEMORY_WARNING_REPORT, no memory limit is set (0), and a fresh Lua state
 * is created whose allocator is lalloc, with this struct as allocator data.
 */
struct snlua *
snlua_create(void) {
	struct snlua *self = skynet_malloc(sizeof(struct snlua));
	memset(self, 0, sizeof(struct snlua));
	self->mem_report = MEMORY_WARNING_REPORT;
	self->mem_limit = 0;
	self->L = lua_newstate(lalloc, self);
	return self;
}
该函数主要执行一些内存分配的初始化操作,对于snlua_create(),还会创建一个lua虚拟机,并使用自定义内存分配策略来为该虚拟机分配内存;
skynet_module_instance_init(…)对新context执行了初始化,这里调用的是snlua_init(…)函数,实现如下:
/*
 * Module init for snlua; always returns 0.
 *
 * Steps:
 *   1. Copy `args` into a heap buffer (without a terminating NUL).
 *   2. Install launch_cb as the context's callback.
 *   3. Look up this service's own address with the "REG" command.
 *   4. Send the copied args to the service itself as its first message.
 *
 * The Lua VM is booted later, when that message reaches launch_cb.
 */
int
snlua_init(struct snlua *l, struct skynet_context *ctx, const char * args)
{
	int sz = strlen(args);
	// Not NUL-terminated: the length travels with the message.
	char * tmp = skynet_malloc(sz);
	memcpy(tmp, args, sz);
	skynet_callback(ctx, l , launch_cb);
	const char * self = skynet_command(ctx, "REG", NULL);
	// self is ":<hex handle>"; skip the ':' and parse the hex digits.
	uint32_t handle_id = strtoul(self+1, NULL, 16);
	// it must be first message
	// PTYPE_TAG_DONTCOPY presumably hands tmp over without copying it -- see _filter_args (not shown)
	skynet_send(ctx, 0, handle_id, PTYPE_TAG_DONTCOPY,0, tmp, sz);
	return 0;
}
这里有一个比较关键的函数skynet_callback(…),该函数将会为新创建的context设置回调函数,函数实现如下:
/*
 * Install the message callback `cb` and its user data `ud` on `context`.
 * Passing NULL for both clears the callback, as launch_cb does.
 * While context->cb is NULL, skynet_context_message_dispatch frees incoming
 * messages instead of dispatching them.
 */
void
skynet_callback(struct skynet_context * context, void *ud, skynet_cb cb) {
	context->cb = cb;
	context->cb_ud = ud;
}
接下来调用了skynet_command(…)函数,传递的命令为"REG",通过上面的分析,可以得到会调用cmd_reg(…),实现代码如下:
/*
 * REG command.
 *   - NULL or empty param: write this service's own address as ":<hex>"
 *     into context->result and return it.
 *   - ".name": bind the local name (without the leading '.') to this
 *     service's handle via skynet_handle_namehandle().
 *   - anything else: global names cannot be registered from C; an error
 *     is logged and NULL is returned.
 */
static const char *
cmd_reg(struct skynet_context * context, const char * param)
{
	// An empty string means "no name", exactly like NULL.
	if (param == NULL || param[0] == '\0') {
		sprintf(context->result, ":%x", context->handle);
		return context->result;
	} else if (param[0] == '.') {
		return skynet_handle_namehandle(context->handle, param + 1);
	} else {
		skynet_error(context, "Can't register global name %s in C", param);
		return NULL;
	}
}
我们知道前面传递的param=NULL,所以直接执行下面的语句:
// Format this service's numeric handle as ":<hex>" into the per-context result buffer and return it.
sprintf(context->result, ":%x", context->handle);
return context->result;
以上代码把handle格式化为“:”加16进制数的字符串并返回;snlua_init随后从中解析出这个地址,并往该地址(也就是服务自己)发送了一条消息,这里为什么需要这么做呢?
接着看如下代码片段:
CHECKCALLING_BEGIN(ctx)
int r = skynet_module_instance_init(mod, inst, ctx, param);// for snlua this calls snlua_init(...) to initialise ctx
CHECKCALLING_END(ctx)
if (r == 0) {
	// drop one of the two initial references; NULL means ctx has already been freed
	struct skynet_context * ret = skynet_context_release(ctx);
	if (ret) {
		ctx->init = true;
	}
	// hand the service's private queue to the global queue so workers can see it
	skynet_globalmq_push(queue);
	if (ret) {
		skynet_error(ret, "LAUNCH %s %s", name, param ? param : "");
	}
	return ret;
}
初始化函数执行成功之后,调用了skynet_context_release(…)函数,代码如下:
/*
 * Drop one reference to `ctx`.
 * If that was the last reference, the context is deleted and NULL is
 * returned; otherwise `ctx` is returned unchanged.
 */
struct skynet_context *
skynet_context_release(struct skynet_context *ctx) {
	if (ATOM_DEC(&ctx->ref) != 0) {
		return ctx;
	}
	delete_context(ctx);
	return NULL;
}
注意一个细节,我们在为新的context分配内存块之后,有如下赋值:
ctx->ref = 2; // initial reference count; skynet_context_new releases one reference after init succeeds
因此这里执行ATOM_DEC(&ctx->ref)后的结果正常应为1;如果结果为0,说明另一份引用已在初始化期间被释放,此时会调用delete_context(ctx)释放ctx并返回NULL;
正常情况下ctx->ref减为1,函数返回ctx本身,至此,ctx的初始化基本完成;
接下来就调用skynet_globalmq_push(queue),将先前创建的当前ctx的消息队列丢入全局消息队列;
以上分析就创建了一个完整的launcher服务,接下来就是服务接收消息并处理消息;
上面还遗留了一个问题,在执行skynet_module_instance_init(…)函数的时候,最后给自己发了一条消息,这条消息是怎么流转的呢?
我们来仔细分析一下下面的过程:
// it must be first message
skynet_send(
	ctx,
	0,                  // source: 0 is replaced by ctx's own handle inside skynet_send
	handle_id,          // destination: this same service
	PTYPE_TAG_DONTCOPY,
	0,                  // session
	tmp,
	sz);
源代码注释中已经说明,这是第一条消息,我们继续看看skynet_send(…),对入参进行处理后,调用了skynet_context_push(…)
/*
 * Send a message from `context` to `destination`; returns the session on
 * success and -1 on failure.
 *
 * - `sz` must fit in MESSAGE_TYPE_MASK. An oversized message is rejected,
 *   and freed when PTYPE_TAG_DONTCOPY is set.
 * - A `source` of 0 means "this context".
 * - A `destination` of 0 delivers nothing. With no payload it just returns
 *   the session; with a payload it is an error, and the payload is freed.
 * - Remote destinations go through the harbor; local ones are pushed onto
 *   the receiver's queue. If that push fails, the payload is freed.
 */
int
skynet_send(struct skynet_context * context, uint32_t source, uint32_t destination , int type, int session, void * data, size_t sz) {
	if ((sz & MESSAGE_TYPE_MASK) != sz) {
		skynet_error(context, "The message to %x is too large", destination);
		if (type & PTYPE_TAG_DONTCOPY) {
			skynet_free(data);
		}
		return -1;
	}
	// presumably copies data / allocates a session depending on `type` (not shown)
	_filter_args(context, type, &session, (void **)&data, &sz);

	if (source == 0) {
		source = context->handle;
	}

	if (destination == 0) {
		if (data) {
			// No receiver and no one else frees the payload: release it here.
			skynet_error(context, "Destination address can't be 0");
			skynet_free(data);
			return -1;
		}
		return session;
	}
	if (skynet_harbor_message_isremote(destination)) {
		struct remote_message * rmsg = skynet_malloc(sizeof(*rmsg));
		rmsg->destination.handle = destination;
		rmsg->message = data;
		rmsg->sz = sz;
		skynet_harbor_send(rmsg, source, session);
	} else {
		struct skynet_message smsg;
		smsg.source = source;
		smsg.session = session;
		smsg.data = data;
		smsg.sz = sz;

		if (skynet_context_push(destination, &smsg)) {
			skynet_free(data);
			return -1;
		}
	}
	return session;
}
然后继续翻skynet_context_push(…)的代码,实现代码如下:
/*
 * Push `message` onto the queue of the service with `handle`.
 * The context is grabbed for the duration of the push and released again.
 * Returns 0 on success, or -1 when no such service exists.
 */
int
skynet_context_push(uint32_t handle, struct skynet_message *message) {
	struct skynet_context *target = skynet_handle_grab(handle);
	if (target != NULL) {
		skynet_mq_push(target->queue, message);
		skynet_context_release(target);
		return 0;
	}
	return -1;
}
先取接收消息地址对应的context,然后将消息丢到这个ctx对应的消息队列中,接着看看skynet_mq_push(…)的代码实现:
/*
 * Append `message` to the service queue `q`, holding the queue's spin lock.
 *
 * q->queue is used as a ring buffer indexed 0..q->cap-1, and q->tail is
 * the next free slot. When the ring becomes full (head catches up with
 * tail) it is grown with expand_queue().
 *
 * If the queue is not linked into the global queue, it is marked
 * MQ_IN_GLOBAL and pushed there so a worker thread can pick it up.
 */
void
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {
	assert(message);
	SPIN_LOCK(q)

	q->queue[q->tail] = *message;
	// Wrap as soon as tail reaches cap: index cap is one past the end, so
	// it must never be used as the slot for the next push.
	if (++ q->tail >= q->cap) {
		q->tail = 0;
	}

	// head == tail after a push means every slot is used: grow the ring.
	if (q->head == q->tail) {
		expand_queue(q);
	}

	if (q->in_global == 0) {
		q->in_global = MQ_IN_GLOBAL;
		skynet_globalmq_push(q);
	}

	SPIN_UNLOCK(q)
}
以上代码很简单,先将消息丢入服务(context)的私有消息队列,然后判断,消息队列是否在全局消息队列中,不在就挂到全局消息队列;
消息何时被处理?
框架一旦启动,工作线程也会随之启动,工作线程的任务很简单,就是监控全局消息队列中挂载的各个服务的私有消息队列是否有数据,
如果发现某个私有消息队列有消息进来了,就会执行相应的处理;照旧,继续翻代码,工作线程是在这里启动的:
/*
 * Bring up a skynet node from `config` and run it until all threads exit.
 *
 * The order matters:
 *   1. Install the SIGHUP handler and daemonize if configured.
 *   2. Initialise harbor, handles, message queues, modules, timer, socket
 *      and profiling.
 *   3. Create the logger service and run bootstrap() with it.
 *   4. Start the monitor/timer/socket/worker threads with start(), which
 *      blocks until they are joined.
 *
 * Messages queued before step 4 -- such as a new snlua service's first
 * message to itself -- are handled only once the workers run.
 */
void
skynet_start(struct skynet_config * config) {
	// register SIGHUP for log file reopen
	struct sigaction sa;
	sa.sa_handler = &handle_hup;
	sa.sa_flags = SA_RESTART;
	sigfillset(&sa.sa_mask);
	sigaction(SIGHUP, &sa, NULL);

	if (config->daemon) {
		if (daemon_init(config->daemon)) {
			exit(1);
		}
	}
	skynet_harbor_init(config->harbor);
	skynet_handle_init(config->harbor);
	skynet_mq_init();
	skynet_module_init(config->module_path);
	skynet_timer_init();
	skynet_socket_init();
	skynet_profile_enable(config->profile);

	struct skynet_context *ctx = skynet_context_new(config->logservice, config->logger);
	if (ctx == NULL) {
		// terminate the line so the message is not glued to later stderr output
		fprintf(stderr, "Can't launch %s service\n", config->logservice);
		exit(1);
	}

	bootstrap(ctx, config->bootstrap);

	start(config->thread);

	// harbor_exit may call socket send, so it should exit before socket_free
	skynet_harbor_exit();
	skynet_socket_free();
	if (config->daemon) {
		daemon_exit(config->daemon);
	}
}
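上面代码是有先后顺序的:工作线程是在logger服务和bootstrap服务的context都创建完成之后才启动的;此时bootstrap发给自己的第一条消息还留在消息队列里,要等工作线程启动后才会被处理,所以bootstrap.lua(以及其中启动launcher的代码)实际上是在工作线程中执行的。看看start(…)函数的实现: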
/*
 * Start all runtime threads and wait for them to finish.
 *
 * Threads:
 *   - pid[0], pid[1], pid[2]: the monitor, timer and socket threads;
 *   - pid[3] .. pid[thread+2]: `thread` worker threads running thread_worker.
 *
 * All threads share the monitor `m`, which holds one skynet_monitor per
 * worker plus the mutex/condvar that idle workers sleep on. Once every
 * thread has been joined, the monitor is freed.
 */
static void
start(int thread)
{
	pthread_t pid[thread+3];

	struct monitor *m = skynet_malloc(sizeof(*m));
	memset(m, 0, sizeof(*m));
	m->count = thread;
	m->sleep = 0;

	// one skynet_monitor per worker, indexed by worker id
	m->m = skynet_malloc(thread * sizeof(struct skynet_monitor *));
	int i;
	for (i=0;i<thread;i++) {
		m->m[i] = skynet_monitor_new();
	}
	if (pthread_mutex_init(&m->mutex, NULL)) {
		fprintf(stderr, "Init mutex error");
		exit(1);
	}
	if (pthread_cond_init(&m->cond, NULL)) {
		fprintf(stderr, "Init cond error");
		exit(1);
	}

	create_thread(&pid[0], thread_monitor, m);
	create_thread(&pid[1], thread_timer, m);
	create_thread(&pid[2], thread_socket, m);

	// Per-worker weight, by worker index; workers past the end of this
	// table get 0. In skynet_context_message_dispatch:
	//   - weight -1: one message per turn;
	//   - weight w >= 0: about (queue length >> w) messages per turn,
	//     and at least one.
	static int weight[] = {
		-1, -1, -1, -1, 0, 0, 0, 0,
		1, 1, 1, 1, 1, 1, 1, 1,
		2, 2, 2, 2, 2, 2, 2, 2,
		3, 3, 3, 3, 3, 3, 3, 3, };
	struct worker_parm wp[thread];
	for (i=0;i<thread;i++) {
		wp[i].m = m;
		wp[i].id = i;
		if (i < sizeof(weight)/sizeof(weight[0])) {
			wp[i].weight= weight[i];
		} else {
			wp[i].weight = 0;
		}
		create_thread(&pid[i+3], thread_worker, &wp[i]);
	}

	for (i=0;i<thread+3;i++) {
		pthread_join(pid[i], NULL);
	}

	free_monitor(m);
}
该函数先启动monitor、timer、socket三个线程,再根据用户配置的线程数启动相应数量的工作线程,工作线程执行函数thread_worker(…),实现代码如下:
/*
 * Worker thread body. `p` points at this worker's struct worker_parm
 * (id, weight, shared monitor).
 *
 * Loops until m->quit is set, dispatching messages with
 * skynet_context_message_dispatch and passing the returned queue into the
 * next call. When no queue is returned, the worker adds itself to
 * m->sleep and waits on m->cond (unless quitting) until signalled.
 */
static void *
thread_worker(void *p)
{
	struct worker_parm *wp = p;
	int id = wp->id;
	int weight = wp->weight;
	struct monitor *m = wp->m;
	struct skynet_monitor *sm = m->m[id];
	skynet_initthread(THREAD_WORKER);
	struct message_queue * q = NULL;
	while (!m->quit)
	{
		q = skynet_context_message_dispatch(sm, q, weight);
		if (q == NULL)
		{
			// nothing to process: sleep until m->cond is signalled
			if (pthread_mutex_lock(&m->mutex) == 0)
			{
				++ m->sleep;
				// "spurious wakeup" is harmless,
				// because skynet_context_message_dispatch() can be call at any time.
				if (!m->quit)
					pthread_cond_wait(&m->cond, &m->mutex);
				-- m->sleep;
				if (pthread_mutex_unlock(&m->mutex))
				{
					fprintf(stderr, "unlock mutex error");
					exit(1);
				}
			}
		}
	}
	return NULL;
}
再来看看其中的关键函数skynet_context_message_dispatch(…),实现代码如下:
/*
 * Process a batch of messages from one service queue. Returns the queue the
 * calling worker should handle next, or NULL if there is none.
 *
 * - q == NULL: take the next queue from the global queue, or return NULL.
 * - The queue's service can no longer be grabbed: release the queue with
 *   drop_message and return the next global queue.
 * - Otherwise pop up to n messages. n starts at 1; for weight >= 0 it is
 *   reset after the first pop to (queue length >> weight), so at least one
 *   message is handled. If a pop fails, the call ends early and the next
 *   global queue is returned.
 * - Each message goes to dispatch_message, or is freed if the service has
 *   no callback.
 * - Finally, if another queue is waiting globally, q is pushed back and the
 *   other queue is returned; otherwise q itself is returned.
 */
struct message_queue *
skynet_context_message_dispatch(struct skynet_monitor *sm, struct message_queue *q, int weight) {
	if (q == NULL) {
		q = skynet_globalmq_pop();
		if (q==NULL)
			return NULL;
	}
	uint32_t handle = skynet_mq_handle(q);
	struct skynet_context * ctx = skynet_handle_grab(handle);
	if (ctx == NULL) {
		struct drop_t d = { handle };
		skynet_mq_release(q, drop_message, &d);
		return skynet_globalmq_pop();
	}
	int i,n=1;
	struct skynet_message msg;
	for (i=0;i<n;i++) {
		// a non-zero result is treated as "nothing left to pop"
		if (skynet_mq_pop(q,&msg)) {
			skynet_context_release(ctx);
			return skynet_globalmq_pop();
		} else if (i==0 && weight >= 0) {
			n = skynet_mq_length(q);
			n >>= weight;
		}
		int overload = skynet_mq_overload(q);
		if (overload) {
			skynet_error(ctx, "May overload, message queue length = %d", overload);
		}
		// mark (source, handle) in sm around the callback -- presumably read by the monitor thread
		skynet_monitor_trigger(sm, msg.source , handle);
		if (ctx->cb == NULL) {
			skynet_free(msg.data);
		} else {
			dispatch_message(ctx, &msg);
		}
		skynet_monitor_trigger(sm, 0,0);
	}
	assert(q == ctx->queue);
	struct message_queue *nq = skynet_globalmq_pop();
	if (nq) {
		// If global mq is not empty , push q back, and return next queue (nq)
		// Else (global mq is empty or block, don't push q back, and return q again (for next dispatch)
		skynet_globalmq_push(q);
		q = nq;
	}
	skynet_context_release(ctx);
	return q;
}
仔细分析一下这一段代码,首先会判断一下传入的队列是否为空,如果为空,就从全局消息队列中取出一个私有消息队列,参见skynet_globalmq_pop(…)调用;
然后取出消息队列对应的ctx,都满足条件后,就会执行dispatch_message(…)函数,该函数代码实现如下:
/*
 * Deliver one message to ctx's callback.
 *
 * msg->sz packs two values: the message type in the high bits
 * (>> MESSAGE_TYPE_SHIFT) and the payload size in the low bits
 * (& MESSAGE_TYPE_MASK).
 *
 * Before the call, the current handle is stored under G_NODE.handle_key
 * and the message is logged if ctx->logfile is set. When profiling is on,
 * the callback's thread time is added to ctx->cpu_cost.
 *
 * The payload is freed afterwards unless the callback returns non-zero.
 */
static void
dispatch_message(struct skynet_context *ctx, struct skynet_message *msg) {
	assert(ctx->init);
	CHECKCALLING_BEGIN(ctx)
	pthread_setspecific(G_NODE.handle_key, (void *)(uintptr_t)(ctx->handle));
	int type = msg->sz >> MESSAGE_TYPE_SHIFT;
	size_t sz = msg->sz & MESSAGE_TYPE_MASK;
	if (ctx->logfile) {
		skynet_log_output(ctx->logfile, msg->source, type, msg->session, msg->data, sz);
	}
	++ctx->message_count;
	int reserve_msg;
	if (ctx->profile) {
		ctx->cpu_start = skynet_thread_time();
		reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
		uint64_t cost_time = skynet_thread_time() - ctx->cpu_start;
		ctx->cpu_cost += cost_time;
	} else {
		reserve_msg = ctx->cb(ctx, ctx->cb_ud, type, msg->session, msg->source, msg->data, sz);
	}
	// non-zero return (reserve_msg): the payload is not freed here
	if (!reserve_msg) {
		skynet_free(msg->data);
	}
	CHECKCALLING_END(ctx)
}
这里的关键是直接调用了ctx->cb(…)回调函数;前面已经提到过,snlua_init(…)函数会为服务设置这个回调:
skynet_callback(ctx, l , launch_cb); // from snlua_init: launch_cb handles the service's first message, with the snlua struct as user data
上面设置的服务处理函数是launch_cb(…),再来看看该函数的实现代码:
/*
 * Callback for the first message of a snlua service: the argument string
 * that snlua_init sent to the service itself (type 0 and session 0,
 * asserted).
 *
 * It first clears the context's callback, so it is never entered again.
 * The Lua code installs the real callback later, when skynet.start()
 * calls c.callback().
 *
 * It then boots the Lua VM with init_cb(); if that fails, the "EXIT"
 * command is issued for this context.
 *
 * Returns 0, so dispatch_message frees the message payload.
 */
static int
launch_cb(struct skynet_context * context, void *ud, int type, int session, uint32_t source , const void * msg, size_t sz) {
	assert(type == 0 && session == 0);
	struct snlua *l = ud;
	skynet_callback(context, NULL, NULL);
	int err = init_cb(l, context, msg, sz);
	if (err) {
		skynet_command(context, "EXIT", NULL);
	}
	return 0;
}
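这个函数首先调用skynet_callback(…),传入的两个参数都为NULL,即把当前服务的回调函数重新设置为空。这么做是因为launch_cb只负责处理服务收到的第一条消息(即snlua_init发给自己的启动参数):清空回调后就不会再次进入launch_cb;之后lua脚本调用skynet.start()时,会通过c.callback重新设置真正的消息回调函数_cb(见下文)。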
接下来就调用了init_cb(…),我们来看看这个函数的实现代码:
/*
 * Boot the Lua VM of a snlua service; called from launch_cb on the first
 * message. `args`/`sz` is the launch argument string. It is NOT
 * NUL-terminated, because snlua_init copies exactly sz bytes.
 *
 * Steps:
 *   1. Stop the GC.
 *   2. Open the standard libraries and store ctx in the registry.
 *   3. Register skynet.codecache.
 *   4. Publish the search paths as the globals LUA_PATH, LUA_CPATH,
 *      LUA_SERVICE and LUA_PRELOAD.
 *   5. Load the loader script and call it with the argument string.
 *   6. If the Lua code left a number "memlimit" in the registry, move it
 *      into l->mem_limit.
 *   7. Restart the GC.
 *
 * Returns 0 on success, or 1 on failure (after report_launcher_error()).
 */
static int
init_cb(struct snlua *l, struct skynet_context *ctx, const char * args, size_t sz) {
	lua_State *L = l->L;	// this service's Lua VM
	l->ctx = ctx;	// remember the owning context
	lua_gc(L, LUA_GCSTOP, 0);	// collector stays off during start-up; restarted at the end
	lua_pushboolean(L, 1);  /* signal for libraries to ignore env. vars. */
	lua_setfield(L, LUA_REGISTRYINDEX, "LUA_NOENV");	// registry.LUA_NOENV = true
	luaL_openlibs(L);	// open the standard libraries
	lua_pushlightuserdata(L, ctx);
	lua_setfield(L, LUA_REGISTRYINDEX, "skynet_context");	// registry.skynet_context = ctx
	luaL_requiref(L, "skynet.codecache", codecache , 0);	// store the C module in package.loaded["skynet.codecache"]
	lua_pop(L,1);	// luaL_requiref leaves a copy of the module on the stack; drop it (stack is now empty)

	const char *path = optstring(ctx, "lua_path","./lualib/?.lua;./lualib/?/init.lua");
	lua_pushstring(L, path);
	lua_setglobal(L, "LUA_PATH");	// pops the string into the global LUA_PATH
	const char *cpath = optstring(ctx, "lua_cpath","./luaclib/?.so");
	lua_pushstring(L, cpath);
	lua_setglobal(L, "LUA_CPATH");
	const char *service = optstring(ctx, "luaservice", "./service/?.lua");
	lua_pushstring(L, service);
	lua_setglobal(L, "LUA_SERVICE");
	const char *preload = skynet_command(ctx, "GETENV", "preload");
	lua_pushstring(L, preload);	// a NULL preload pushes nil, so LUA_PRELOAD stays unset
	lua_setglobal(L, "LUA_PRELOAD");

	lua_pushcfunction(L, traceback);
	assert(lua_gettop(L) == 1);	// only the traceback handler is on the stack

	const char * loader = optstring(ctx, "lualoader", "./lualib/loader.lua");

	int r = luaL_loadfile(L,loader);	// compile the loader into a function on the stack; it is not run yet
	if (r != LUA_OK) {
		skynet_error(ctx, "Can't load %s : %s", loader, lua_tostring(L, -1));
		report_launcher_error(ctx);
		return 1;
	}
	// args is not NUL-terminated, so bound the %s by sz; sz is a size_t, hence %zu.
	LOG("args=%.*s, sz=%zu", (int)sz, args, sz);
	lua_pushlstring(L, args, sz);
	r = lua_pcall(L,1,0,1);	// run the loader with the argument string; traceback at index 1 handles errors
	if (r != LUA_OK) {
		skynet_error(ctx, "lua loader error : %s", lua_tostring(L, -1));
		report_launcher_error(ctx);
		return 1;
	}
	lua_settop(L,0);	// clear the stack
	if (lua_getfield(L, LUA_REGISTRYINDEX, "memlimit") == LUA_TNUMBER) {
		size_t limit = lua_tointeger(L, -1);
		l->mem_limit = limit;
		skynet_error(ctx, "Set memory limit to %.2f M", (float)limit / (1024 * 1024));
		lua_pushnil(L);
		lua_setfield(L, LUA_REGISTRYINDEX, "memlimit");
	}
	lua_pop(L, 1);	// pop the value lua_getfield pushed above, whatever its type
	lua_gc(L, LUA_GCRESTART, 0);	// re-enable the collector

	return 0;
}
以上代码注释已经比较清晰,核心部分从luaL_loadfile(L,loader)说起:该函数只加载(编译)./lualib/loader.lua脚本而不执行它,随后由lua_pcall以启动参数字符串为参数执行。loader.lua代码如下:
-- loader.lua: run by snlua's init_cb with one string argument,
-- "<service name> [args...]".
--
-- Steps:
--   1. Find <service name>.lua using the templates in LUA_SERVICE.
--   2. Move LUA_PATH / LUA_CPATH into package.path / package.cpath.
--   3. Set SERVICE_NAME and SERVICE_PATH, and put the service directory in
--      front of package.path.
--   4. Run LUA_PRELOAD if configured.
--   5. Run the service script with the words after the service name.
local args = {}
for word in string.gmatch(..., "%S+") do
	table.insert(args, word)
end

SERVICE_NAME = args[1]
print("loader.lua:" .. SERVICE_NAME)

local main, pattern

local err = {}
-- LUA_SERVICE is a ';'-separated list of templates such as "./service/?.lua"
for pat in string.gmatch(LUA_SERVICE, "([^;]+);*") do
	local filename = string.gsub(pat, "?", SERVICE_NAME)
	print("loader.lua:" .. filename)
	-- loadfile compiles the file into a function without running it
	local f, msg = loadfile(filename)
	if not f then
		table.insert(err, msg)
	else
		pattern = pat
		main = f
		break
	end
end

if not main then
	-- one load error per line
	error(table.concat(err, "\n"))
end

LUA_SERVICE = nil
package.path, LUA_PATH = LUA_PATH
package.cpath, LUA_CPATH = LUA_CPATH

local service_path = string.match(pattern, "(.*/)[^/?]+$")

if service_path then
	service_path = string.gsub(service_path, "?", args[1])
	package.path = service_path .. "?.lua;" .. package.path
	SERVICE_PATH = service_path
else
	local p = string.match(pattern, "(.*/).+$")
	SERVICE_PATH = p
end

if LUA_PRELOAD then
	local f = assert(loadfile(LUA_PRELOAD))
	f(table.unpack(args))
	LUA_PRELOAD = nil
end

-- Debug trace of every launch argument. With `..`, table.unpack would keep
-- only its first value, and an empty args table would raise an error.
print("xxxxx:" .. table.concat(args, " "))

-- Run the service script, passing every word after the service name
-- (nothing at all for e.g. "launcher").
main(select(2, table.unpack(args)))
这段代码的作用就是根据传入的服务名,在LUA_SERVICE配置的路径中寻找对应的lua脚本,比如launcher.lua;通过loadfile(filename)把脚本编译为函数main,最后调用这个函数,也就是执行对应的脚本。
脚本执行之后,服务又是如何开始接收消息的呢?关键就在于每个服务都会调用的skynet.start(func)。
我们知道,编写一个服务,必须要
local skynet = require "skynet"
-- ... service-specific code ...
skynet.start(function() end)
每个服务必须都要引入skynet库,这里是指./lualib/skynet.lua这个文件,我们来看看这个文件中skynet.start()的实现:
--- Make this service start receiving messages.
-- Installs skynet.dispatch_message as the C-level message callback, then
-- schedules start_func to run through skynet.init_service on a zero-delay
-- timeout.
-- @tparam function start_func  service start-up function
function skynet.start(start_func)
	c.callback(skynet.dispatch_message)
	local function boot()
		skynet.init_service(start_func)
	end
	skynet.timeout(0, boot)
end
这个函数的入参是一个函数,进来后调用了c.callback(skynet.dispatch_message),对应C库中的lcallback,我们看看这个函数的实现代码:
/*
 * Lua binding: callback(f [, forward]).
 *
 * Stores the Lua function f in the registry, keyed by the address of the C
 * function _cb. It then installs a C callback on this service's context,
 * with the VM's main thread as user data:
 *   - forward_cb when the second argument is truthy;
 *   - _cb otherwise.
 *
 * Returns no values.
 */
static int
lcallback(lua_State *L) {
	struct skynet_context * context = lua_touserdata(L, lua_upvalueindex(1));
	int forward = lua_toboolean(L, 2);
	luaL_checktype(L,1,LUA_TFUNCTION);// f, e.g. skynet.dispatch_message passed by skynet.start
	lua_settop(L,1);
	lua_rawsetp(L, LUA_REGISTRYINDEX, _cb);// registry[_cb] = f: the C function's address is the key for the Lua callback
	lua_rawgeti(L, LUA_REGISTRYINDEX, LUA_RIDX_MAINTHREAD);// ref: http://www.linuxidc.com/Linux/2014-05/102528.htm
	lua_State *gL = lua_tothread(L,-1);// the main thread is used as callback state (L may be a coroutine); ref: https://blog.codingnow.com/2012/07/lua_c_callback.html
	if (forward) {
		skynet_callback(context, gL, forward_cb);
	} else {
		skynet_callback(context, gL, _cb);// Lua services thus share one C-level message callback
	}
	return 0;
}
至此launcher.lua的消息处理函数配置完毕,当该服务有消息到达时_cb(…)函数将会被执行,我们来看看这个函数的实现代码:
static
int
_cb(struct
skynet_context * context, void * ud, int
type, int
session, uint32_t
source, const
void * msg, size_t
sz) {
lua_State *L = ud;
int
trace = 1;
int
r;
int
top = lua_gettop(L);
if (top == 0) {
lua_pushcfunction(L, traceback);
lua_rawgetp(L, LUA_REGISTRYINDEX, _cb);
} else {
assert(top == 2);
}
lua_pushvalue(L,2);
lua_pushinteger(L, type);
lua_pushlightuserdata(L, (void *)msg);
lua_pushinteger(L,sz);
lua_pushinteger(L, session);
lua_pushinteger(L, source);
r = lua_pcall(L, 5, 0 , trace);//调用了skynet.dispatch_message(...)
if (r == LUA_OK) {
return 0;
}
const
char * self = skynet_command(context, "REG", NULL);
switch (r) {
case
LUA_ERRRUN:
skynet_error(context, "lua call [%x to %s : %d msgsz = %d] error : "
KRED
"%s"
KNRM, source , self, session, sz, lua_tostring(L,-1));
break;
case
LUA_ERRMEM:
skynet_error(context, "lua memory error : [%x to %s : %d]", source , self, session);
break;
case
LUA_ERRERR:
skynet_error(context, "lua error in error : [%x to %s : %d]", source , self, session);
break;
case
LUA_ERRGCMM:
skynet_error(context, "lua gc error : [%x to %s : %d]", source , self, session);
break;
};
lua_pop(L,1);
return 0;
}
有了之前的分析,这里就比较简单了:第一次调用时,lua_rawgetp(L, LUA_REGISTRYINDEX, _cb)会从注册表中取出之前设置的lua层消息回调函数skynet.dispatch_message(...)并入栈,
接着将消息参数入栈,然后执行lua_pcall(L, 5, 0 , trace),实际调用skynet.dispatch_message(...),我们来看看这个函数的实现代码:
--- Entry point for every message delivered to this service (called from the
-- C callback _cb).
-- First dispatches the message via raw_dispatch_message. Then it drains
-- fork_queue, resuming each queued coroutine once. Errors from the message
-- and from the forked coroutines are collected, one per line, and re-raised
-- together after all forks have run.
function skynet.dispatch_message(...)
	local succ, err = pcall(raw_dispatch_message, ...)
	while true do
		local key, co = next(fork_queue)
		if co == nil then
			break
		end
		fork_queue[key] = nil
		local fork_succ, fork_err = pcall(suspend, co, coroutine_resume(co))
		if not fork_succ then
			if succ then
				succ = false
				err = tostring(fork_err)
			else
				err = tostring(err) .. "\n" .. tostring(fork_err)
			end
		end
	end
	assert(succ, tostring(err))
end
上面接着调用了raw_dispatch_message(…)函数,接着看这个函数的实现代码:
--- Route one incoming message by protocol type.
-- Responses (prototype 1) wake the coroutine waiting on `session`.
-- Any other type is looked up in `proto`:
--   * if the protocol has a dispatch function, it runs in a new coroutine
--     with (session, source, unpacked payload...);
--   * otherwise the request gets a PTYPE_ERROR reply when it carries a
--     session, or is passed to unknown_request when it does not.
local function raw_dispatch_message(prototype, msg, sz, session, source)
	-- skynet.PTYPE_RESPONSE = 1, read skynet.h
	if prototype == 1 then -- response packet
		local co = session_id_coroutine[session]
		if co == "BREAK" then
			session_id_coroutine[session] = nil
		elseif co == nil then
			unknown_response(session, source, msg, sz)
		else
			session_id_coroutine[session] = nil
			suspend(co, coroutine_resume(co, true, msg, sz))
		end
	else
		local p = proto[prototype]
		if p == nil then
			if session ~= 0 then
				c.send(source, skynet.PTYPE_ERROR, session, "")
			else
				unknown_request(session, source, msg, sz, prototype)
			end
			return
		end
		local f = p.dispatch
		if f then
			-- bump the per-source counter in watching_service
			local ref = watching_service[source]
			if ref then
				watching_service[source] = ref + 1
			else
				watching_service[source] = 1
			end
			local co = co_create(f)
			session_coroutine_id[co] = session
			session_coroutine_address[co] = source
			suspend(co, coroutine_resume(co, session, source, p.unpack(msg, sz)))
		elseif session ~= 0 then
			c.send(source, skynet.PTYPE_ERROR, session, "")
		else
			unknown_request(session, source, msg, sz, proto[prototype].name)
		end
	end
end
对于非回应类消息,最终会取出协议表proto中对应协议的dispatch函数,并在新建的协程中执行。这个函数是什么时候注册的呢?我们看看launcher.lua中的这段代码:
-- The launcher's "lua" dispatcher: cmd (case-insensitive) selects a handler
-- in `command`. The handler's result is sent back unless it returned NORET.
skynet.dispatch("lua", function(session, address, cmd, ...)
	local f = command[string.upper(cmd)]
	if f then
		local ret = f(address, ...)
		if ret == NORET then
			return
		end
		skynet.ret(skynet.pack(ret))
	else
		skynet.ret(skynet.pack {"Unknown command"})
	end
end)
这里调用了skynet.lua中的skynet.dispatch(…)函数,把自己的消息处理函数注册为"lua"协议的dispatch函数(保存在协议表proto中);
至此整个流程分析完毕;