记一个多线程使用libevent的问题

前段时间使用libevent网络库实现了一个游戏服务器引擎,在此记录下其中遇到的一个问题。

我在设计服务器上选择把逻辑和网络分线程,线程之间通信使用队列。但是这样做会有个问题:

当逻辑线程想要主动的发一个数据包的时候,网络线程此时可能还阻塞在等待网络IO的系统调用上(比如说epoll)。如果不做特殊处理的话,此时消息包就会一直积压在缓冲区中,直到下一次网络线程从挂起的系统调用返回(比如来了一个消息包)。因此,当逻辑线程发送消息包的时候(bufferevent_write)需要一种唤醒机制,让网络线程从epoll等系统调用中返回并处理发送消息包逻辑。

由于对libevent的api不熟悉,起初我是自己实现这个功能的。实现确实是不复杂,但是缺违背了我的初心:只写简单必要的代码,保证尽可能少的bug。直到后来和同事探讨(争论:P)了一番,才发现原来libevent是有对此做支持的,但是具体怎么做,文档里面没有详细的说,因此同事也说不出个所以然。鉴于此情况,我决定,把libevent中与此相关的源码粗略的过一遍,以求能弄明白以下两件事:

(1)与跨线程唤醒事件等待相关的api有哪些,以及如何使用?

(2)这些api背后到底做了哪些工作?

相关API,及用法

和我起初想得不一样,libevent相关的api很简单并且只有一个:

/*call event_use_pthreads() if use posix threads.*/ 
evthread_use_windows_threads();
struct event_base* ev_base = event_base_new();

  

需要注意的是函数evthread_use_windows_threads的调用必须在初始化event_base之前。在此之后,无需再做别的事情,逻辑线程在执行bufferevent_write的时候,libevent就会自动唤醒网络线程的事件循环,并执行发送数据。

隐藏在API背后的逻辑

先看看evthread_use_windows_threads函数做了什么?

int evthread_use_windows_threads(void) {
  ...  
  evthread_set_lock_callbacks(&cbs);
  evthread_set_id_callback(evthread_win32_get_id);
  evthread_set_condition_callbacks(&cond_cbs);
  return 0;            
}

通过调用evthread_use_windows_threads,我们设置了一些回调函数,包括设置了libevent获取线程id的回调函数evthread_id_fn_

看看初始化事件循环的函数event_base_new做了什么:

// event.c
struct event_base * 
event_base_new(void) {
    ...
    base = event_base_new_with_config(cfg);
}

struct event_base * 
event_base_new_with_config(const struct event_config *cfg) {
    ...
#ifndef EVENT__DISABLE_THREAD_SUPPORT
	if (EVTHREAD_LOCKING_ENABLED() &&
	    (!cfg || !(cfg->flags & EVENT_BASE_FLAG_NOLOCK))) {
		int r;
		EVTHREAD_ALLOC_LOCK(base->th_base_lock, 0);
		EVTHREAD_ALLOC_COND(base->current_event_cond);
		r = evthread_make_base_notifiable(base);
		if (r<0) {
			event_warnx("%s: Unable to make base notifiable.", __func__);
			event_base_free(base);
			return NULL;
		}
	}
#endif
    ...
}

int
evthread_make_base_notifiable(struct event_base *base) {
	int r;
	if (!base)
		return -1;

	EVBASE_ACQUIRE_LOCK(base, th_base_lock);
	r = evthread_make_base_notifiable_nolock_(base);
	EVBASE_RELEASE_LOCK(base, th_base_lock);
	return r;
}

static int
evthread_make_base_notifiable_nolock_(struct event_base *base) {
    ...
    if (evutil_make_internal_pipe_(base->th_notify_fd) == 0) {
	notify = evthread_notify_base_default;
	cb = evthread_notify_drain_default;
    } else {
	return -1;
    }

    base->th_notify_fn = notify;
}

  

它通过如下调用:

event_base_new-->event_base_new_with_config-->evthread_make_base_notifiable-->evthread_make_base_notifiable_nolock_

最后通过evutil_make_internal_pipe_函数创建了两个互相连接的socket(windows环境下,用此来模拟pipe):

/* Internal function: Set fd[0] and fd[1] to a pair of fds such that writes on
 * fd[1] get read from fd[0].  Make both fds nonblocking and close-on-exec.
 * Return 0 on success, -1 on failure.
 */
int
evutil_make_internal_pipe_(evutil_socket_t fd[2])
{
	/*
	  Making the second socket nonblocking is a bit subtle, given that we
	  ignore any EAGAIN returns when writing to it, and you don't usally
	  do that for a nonblocking socket. But if the kernel gives us EAGAIN,
	  then there's no need to add any more data to the buffer, since
	  the main thread is already either about to wake up and drain it,
	  or woken up and in the process of draining it.
	*/

#if defined(EVENT__HAVE_PIPE2)
	if (pipe2(fd, O_NONBLOCK|O_CLOEXEC) == 0)
		return 0;
#endif
#if defined(EVENT__HAVE_PIPE)
	if (pipe(fd) == 0) {
		if (evutil_fast_socket_nonblocking(fd[0]) < 0 ||
		    evutil_fast_socket_nonblocking(fd[1]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[0]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[1]) < 0) {
			close(fd[0]);
			close(fd[1]);
			fd[0] = fd[1] = -1;
			return -1;
		}
		return 0;
	} else {
		event_warn("%s: pipe", __func__);
	}
#endif

#ifdef _WIN32
#define LOCAL_SOCKETPAIR_AF AF_INET
#else
#define LOCAL_SOCKETPAIR_AF AF_UNIX
#endif
	if (evutil_socketpair(LOCAL_SOCKETPAIR_AF, SOCK_STREAM, 0, fd) == 0) {
		if (evutil_fast_socket_nonblocking(fd[0]) < 0 ||
		    evutil_fast_socket_nonblocking(fd[1]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[0]) < 0 ||
		    evutil_fast_socket_closeonexec(fd[1]) < 0) {
			evutil_closesocket(fd[0]);
			evutil_closesocket(fd[1]);
			fd[0] = fd[1] = -1;
			return -1;
		}
		return 0;
	}
	fd[0] = fd[1] = -1;
	return -1;
}

之后,再设置用于唤醒操作的notify函数(evthread_notify_base_default):

static int
evthread_notify_base_default(struct event_base *base) { 
  char buf[1]; 
  int r; 
  buf[0] = (char) 0; 
#ifdef _WIN32 
  r = send(base->th_notify_fd[1], buf, 1, 0); 
#else 
  r = write(base->th_notify_fd[1], buf, 1); 
#endif 
  return (r < 0 && ! EVUTIL_ERR_IS_EAGAIN(errno)) ? -1 : 0; 
}

  

可以看出,在windows下libevent的唤醒机制实际也是self pipe trick,只不过它通过构造一对socket来模拟pipe,当需要唤醒的时候,它就往其中一个socket写入1个字节

再去看看bufferevent_write:

// bufferevent.c
int
bufferevent_write(struct bufferevent *bufev, const void *data, size_t size) {
	if (evbuffer_add(bufev->output, data, size) == -1)
		return (-1);

	return 0;
}

// buffer.c
int
evbuffer_add(struct evbuffer *buf, const void *data_in, size_t datlen) {
    ...
    evbuffer_invoke_callbacks_(buf);
}

它会触发一系列列回调函数,而这些回调函数在创建bufferevent的时候被指定:

//bufferevent_sock.c
struct bufferevent *
bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, int options) {
    ...
    evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev);
}

static void
bufferevent_socket_outbuf_cb(struct evbuffer *buf, const struct evbuffer_cb_info *cbinfo, void *arg) {
    ...
    if (cbinfo->n_added &&
	    (bufev->enabled & EV_WRITE) &&
	    !event_pending(&bufev->ev_write, EV_WRITE, NULL) &&
	    !bufev_p->write_suspended) {
		/* Somebody added data to the buffer, and we would like to
		 * write, and we were not writing.  So, start writing. */
		if (bufferevent_add_event_(&bufev->ev_write, &bufev->timeout_write) == -1) {
		    /* Should we log this? */
		}
	}
}

//bufferevent.c
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv) {
	if (!evutil_timerisset(tv))
		return event_add(ev, NULL);
	else
		return event_add(ev, tv);
}

//event.c
int
event_add(struct event *ev, const struct timeval *tv) {
	EVBASE_ACQUIRE_LOCK(ev->ev_base, th_base_lock);
	res = event_add_nolock_(ev, tv, 0);
	EVBASE_RELEASE_LOCK(ev->ev_base, th_base_lock);
}

int
event_add_nolock_(struct event *ev, const struct timeval *tv, int tv_is_absolute) {
    ...
    /* if we are not in the right thread, we need to wake up the loop */
    //如果在构造event_base之前调用了evthread_use_windows_threads,EVBASE_NEED_NOTIFY此时会返回true,否则为false。
    if (res != -1 && notify && EVBASE_NEED_NOTIFY(base))
        evthread_notify_base(base);
}

由代码可知,在往bufferevent写数据后执行的回调函数中,就有唤醒网络线程逻辑(evthread_notify_base)。那为什么还需要手动调用evthread_use_windows_threads函数呢?

这里再说一下:

#define EVBASE_NEED_NOTIFY(base)			 
	((base)->running_loop &&			 
	    ((base)->th_owner_id != evthreadimpl_get_id_()))

unsigned long
evthreadimpl_get_id_() {
	return evthread_id_fn_ ? evthread_id_fn_() : 1;
}

之前说过,当调用evthread_use_windows_threads,设置了libevent获取线程id的回调函数evthread_id_fn_。也正因为此,才会去跑下去执行evthread_notify_base函数:

static int
evthread_notify_base(struct event_base *base) {
	EVENT_BASE_ASSERT_LOCKED(base);
	if (!base->th_notify_fn)
		return -1;
	if (base->is_notify_pending)
		return 0;
	base->is_notify_pending = 1;
	return base->th_notify_fn(base);
}

所以,当我们在逻辑线程调用bufferevent_write尝试发送一段数据的时候,它会依据以下的调用,通知网络线程:

bufferevent_write-->evbuffer_add-->evbuffer_invoke_callbacks_-->bufferevent_socket_evtbuf_cb_-->bufferevent_add_event_-->event_add-->event_add_nolock_-->evthread_notify_base

以上便是libevent跨线程唤醒的逻辑。

原文地址:https://www.cnblogs.com/adinosaur/p/7113631.html