dyad源码分析

背景介绍

dyad是一个异步网络库,目标是可移植、轻量级、易于使用。地址位于https://github.com/rxi/dyad。

echoserver如何写

我们来看看一个简单的echo Server来看看其API的设计

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "dyad.h"
/* An echo server: Echos any data received by a client back to the client */
static void onData(dyad_Event *e) {
  dyad_write(e->stream, e->data, e->size);
}
static void onAccept(dyad_Event *e) { 
    puts("Accept a connection ");
    dyad_addListener(e->remote, DYAD_EVENT_DATA, onData, NULL);

}
static void onError(dyad_Event *e) {
  printf("server error: %s
", e->msg);
}

static void onListen(dyad_Event* e){ 
    fprintf(stdout, "listen success %d", e->type); 
}
int main(void) {
  dyad_Stream *s;
  dyad_init();
  s = dyad_newStream();
  dyad_addListener(s, DYAD_EVENT_LISTEN, onListen, NULL);
  /* 添加错误的回调 */
  dyad_addListener(s, DYAD_EVENT_ERROR,  onError,  NULL);
   /* 添加连接成功后的回调 */
  dyad_addListener(s, DYAD_EVENT_ACCEPT, onAccept, NULL);
  dyad_listen(s, 8000);
  fprintf(stdout, "stream num %d", dyad_getStreamCount());
  while (dyad_getStreamCount() > 0) {
    dyad_update();
  }
  return 0;
}

我们从这个例子中,看到了几个简单的API:

  • dyad_init();
  • dyad_newStream();
  • dyad_addListener
  • dyad_update

dyad_init分析

这个函数非常简单,就是要改变SIGPIPE的默认行为(终止进程),忽略它。

void dyad_init(void) {
#ifdef _WIN32
  WSADATA dat;
  int err = WSAStartup(MAKEWORD(2, 2), &dat);
  if (err != 0) {
    panic("WSAStartup failed (%d)", err);
  }
#else
  /* Stops the SIGPIPE signal being raised when writing to a closed socket */
  signal(SIGPIPE, SIG_IGN);
#endif
}

SIGPIPE信号的产生

当服务器close一个连接时,若client端接着发数据。根据TCP协议的规定,会收到一个RST响应,client再往这个服务器发送数据时,系统会发出一个SIGPIPE信号给进程,告诉进程这个连接已经断开了,不要再写了。

又或者当一个进程向某个已经收到RST的socket执行写操作是,内核向该进程发送一个SIGPIPE信号。该信号的缺省学位是终止进程,因此进程必须捕获它以免不情愿的被终止。

dyad_Stream

dyad_stream 用来表示一个套接字链接,最重要的几个字段分别是listener,linebuffer,和writeBuffer,分别对应为回调函数,读缓冲区,写缓冲区。我们应该注意的是所有的tcp连接都是通过list来连在一起。

struct dyad_Stream {
  int state, flags;
  dyad_Socket sockfd;
  char *address;
  int port;
  int bytesSent, bytesReceived;
  double lastActivity, timeout;
   /* 该连接的回调 */
  Vec(Listener) listeners;
  Vec(char) lineBuffer;
  Vec(char) writeBuffer;
   /* 指向下一个连接 */
  dyad_Stream *next;
};

Vec宏定义

dyad用宏来模拟了C++模板的功能,我们来看看,这里的Vec主要的作用就是模拟C++中的vector也就是动态数组。

#define Vec(T)
  struct { T *data; int length, capacity; }

#define vec_unpack(v)
  (char**)&(v)->data, &(v)->length, &(v)->capacity, sizeof(*(v)->data)

#define vec_init(v)
  memset((v), 0, sizeof(*(v)))

#define vec_deinit(v)
  dyad_free((v)->data)

#define vec_clear(v)
  ((v)->length = 0)

#define vec_push(v, val)
  ( vec_expand(vec_unpack(v)),
    (v)->data[(v)->length++] = (val) )

#define vec_splice(v, start, count)
  ( vec_splice(vec_unpack(v), start, count),
    (v)->length -= (count) )

dyad_newStream函数

我们再来看看dyad_newStream函数,这个函数很简单,就是要分批额一个dyad_Stream内存,并初始化

dyad_Stream *dyad_newStream(void) {
  dyad_Stream *stream = dyad_realloc(NULL, sizeof(*stream));
  memset(stream, 0, sizeof(*stream));
  stream->state = DYAD_STATE_CLOSED;
  // 
  stream->sockfd = INVALID_SOCKET;
  // 上次活动的时间为当前时间
  stream->lastActivity = dyad_getTime();
  /* Add to list and increment count */
  stream->next = dyad_streams;
  dyad_streams = stream;
  // 流数目增加
  dyad_streamCount++;
  return stream;
}

注意到dyad_streams这个变量,其是一个全局的变量,表示当前和服务器请求的TCP连接队列的头部,当我们新分配一个tcp连接的资源的时候,头部发生变化,指向了当前新分配的连接资源(Stream)。

static dyad_Stream *dyad_streams;
static int dyad_streamCount;

dyad_addListener函数

这个函数就比较简单了,就是给当前的套接字,添加相应事件的回调函数,在上面的例子中,我们看到了添加了listen,accept和error三个回调函数。我们来看看事件类型的定义:


enum {
DYAD_EVENT_NULL,
DYAD_EVENT_DESTROY,
DYAD_EVENT_ACCEPT,
DYAD_EVENT_LISTEN,
DYAD_EVENT_CONNECT,
DYAD_EVENT_CLOSE,
DYAD_EVENT_READY,
DYAD_EVENT_DATA,
DYAD_EVENT_LINE,
DYAD_EVENT_ERROR,
DYAD_EVENT_TIMEOUT,
DYAD_EVENT_TICK
};

再看看添加相应的回调函数

typedef void (*dyad_Callback)(dyad_Event*);

/* 每个事件回调的定义 */
typedef struct {
   // 事件的类型
  int event;
  // 事件的回调
  dyad_Callback callback;
  // 参数类型
  void *udata;
} Listener;



void dyad_addListener(
  dyad_Stream *stream, int event, dyad_Callback callback, void *udata
) {
  Listener listener;
  listener.event = event;
  listener.callback = callback;
  listener.udata = udata;
  vec_push(&stream->listeners, listener);
}

dyad_listen

这个函数看命字就知道干什么,就是创建监听套接字,注意到这个套接字是non-block的,注意到。过程就是典型的sock-->bind->listen的过程。

int dyad_listenEx(
  dyad_Stream *stream, const char *host, int port, int backlog
) {
  struct addrinfo hints, *ai = NULL;
  int err, optval;
  char buf[64];
  dyad_Event e;

  /* Get addrinfo */
  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  sprintf(buf, "%d", port);

  // 获取ip地址
  err = getaddrinfo(host,  buf,  &hints, &ai);
  if (err) {
    stream_error(stream, "could not get addrinfo", errno);
    goto fail;
  }
  /* Init socket */
 // 为连接创建套接字描述符
  err = stream_initSocket(stream, ai->ai_family, ai->ai_socktype,
                          ai->ai_protocol);
  if (err) goto fail;

  /* Set SO_REUSEADDR so that the socket can be immediately bound without
   * having to wait for any closed socket on the same port to timeout */
  optval = 1;
 // 能够使用端口地址,
  setsockopt(stream->sockfd, SOL_SOCKET, SO_REUSEADDR,
             &optval, sizeof(optval));
  /* Bind and listen */
  err = bind(stream->sockfd, ai->ai_addr, ai->ai_addrlen);
  if (err) {
    stream_error(stream, "could not bind socket", errno);
    goto fail;
  }
  err = listen(stream->sockfd, backlog);
  if (err) {
    stream_error(stream, "socket failed on listen", errno);
    goto fail;
  }

  stream->state = DYAD_STATE_LISTENING;
  stream->port = port;
  // 第二次绑定地址信息
  stream_initAddress(stream);
  /* Emit listening event */
  e = createEvent(DYAD_EVENT_LISTEN);
  e.msg = "socket is listening";
  // 调用回调
  stream_emitEvent(stream, &e);
  freeaddrinfo(ai);
  return 0;
  fail:
  if (ai) freeaddrinfo(ai);
  return -1;
}
// 监听套接字
int dyad_listen(dyad_Stream *stream, int port) {
  return dyad_listenEx(stream, NULL, port, 511);
}

getaddrinfo系统调用

我们在上面看到了getaddrinfo,这个系统调用的man page 如下:

int getaddrinfo(const char *node, const char *service,
                       const struct addrinfo *hints,
                       struct addrinfo **res);

Given node and service, which identify an Internet host and a service, getaddrinfo() returns one or more addrinfo structures, each of which contains an Internet address that can be specified in a call to bind(2) or connect(2). The getaddrinfo() function combines the functionality provided by the gethostbyname(3) and getservbyname(3) functions into a single interface, but unlike the latter functions, getaddrinfo() is reentrant and allows programs to eliminate IPv4-versus-IPv6 dependencies.

实际上,上面的形参node表示的是主机名称,service表示的是端口号。这个函数目的是获取其internet address,为后面的bind系统调用所使用。注意到在经历socket-->bind--->listen之后,套接字被设置成non-blocking,然后是“发射了”一个事件,这就是listen事件。我们来看看这个发射事件的函数。

static void stream_emitEvent(dyad_Stream *stream, dyad_Event *e) {
  int i;
  e->stream = stream;
  // 遍历整个该socket的回调
  for (i = 0; i < stream->listeners.length; i++) {
    Listener *listener = &stream->listeners.data[i];
    // 如果回调的事件类型为e
    if (listener->event == e->type) {
      e->udata = listener->udata;
      // 调用回调函数
      listener->callback(e);
    }
    /* Check to see if this listener was removed: If it was we decrement `i`
     * since the next listener will now be in this ones place */
    if (listener != &stream->listeners.data[i]) {
      i--;
    }
  }
}

从上面的代码中可以看到,就是遍历该连接所有的回调函数,如果该连接的事件类型满足则应该调用。

dyad_update函数

dyad_update这个函数是非常重要的,这就是这个简单的异步库的核心,相当于Reactor中的Event Loop,所以现在来好好好分析这个函数。

void dyad_update(void) {
  dyad_Stream *stream;
  struct timeval tv;
  destroyClosedStreams();
  updateTickTimer();
  // 处理超时回调
  updateStreamTimeouts();
  /* Create fd sets for select() */
  select_zero(&dyad_selectSet);

  stream = dyad_streams;
  while (stream) {
    switch (stream->state) {
      case DYAD_STATE_CONNECTED:
        select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
        if (!(stream->flags & DYAD_FLAG_READY) ||
            stream->writeBuffer.length != 0
        ) {
          select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        }
        break;
      case DYAD_STATE_CLOSING:
        select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        break;
      case DYAD_STATE_CONNECTING:
        select_add(&dyad_selectSet, SELECT_WRITE, stream->sockfd);
        select_add(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd);
        break;
      case DYAD_STATE_LISTENING:
        select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
        break;
    }
    stream = stream->next;
  }

  /* Init timeout value and do select */
  #ifdef _MSC_VER
    #pragma warning(push)
    /* Disable double to long implicit conversion warning,
     * because the type of timeval's fields don't agree across platforms */
    #pragma warning(disable: 4244)
  #endif
  tv.tv_sec = dyad_updateTimeout;
  tv.tv_usec = (dyad_updateTimeout - tv.tv_sec) * 1e6;
  #ifdef _MSC_VER
    #pragma warning(pop)
  #endif

  select(dyad_selectSet.maxfd + 1,
         dyad_selectSet.fds[SELECT_READ],
         dyad_selectSet.fds[SELECT_WRITE],
         dyad_selectSet.fds[SELECT_EXCEPT],
         &tv);

  /* Handle streams */
  stream = dyad_streams;
  while (stream) {
    switch (stream->state) {

      case DYAD_STATE_CONNECTED:
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_handleReceivedData(stream);
          if (stream->state == DYAD_STATE_CLOSED) {
            break;
          }
        }
        /* Fall through */

      case DYAD_STATE_CLOSING:
        if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
          stream_flushWriteBuffer(stream);
        }
        break;

      case DYAD_STATE_CONNECTING:
        if (select_has(&dyad_selectSet, SELECT_WRITE, stream->sockfd)) {
          /* Check socket for error */
          int optval = 0;
          socklen_t optlen = sizeof(optval);
          dyad_Event e;
          getsockopt(stream->sockfd, SOL_SOCKET, SO_ERROR, &optval, &optlen);
          if (optval != 0) goto connectFailed;
          /* Handle succeselful connection */
          stream->state = DYAD_STATE_CONNECTED;
          stream->lastActivity = dyad_getTime();
          stream_initAddress(stream);
          /* Emit connect event */
          e = createEvent(DYAD_EVENT_CONNECT);
          e.msg = "connected to server";
          stream_emitEvent(stream, &e);
        } else if (
          select_has(&dyad_selectSet, SELECT_EXCEPT, stream->sockfd)
        ) {
          /* Handle failed connection */
connectFailed:
          stream_error(stream, "could not connect to server", 0);
        }
        break;

      case DYAD_STATE_LISTENING:
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_acceptPendingConnections(stream);
        }
        break;
    }

    /* If data was just now written to the stream we should immediately try to
     * send it */
    if (
      stream->flags & DYAD_FLAG_WRITTEN &&
      stream->state != DYAD_STATE_CLOSED
    ) {
      stream_flushWriteBuffer(stream);
    }

    stream = stream->next;
  }
}

我们来看一下,这个事件循环的思路,首先destroyClosedStreams遍历整个stream,将死连接释放掉,当然会调用起注册的回调函数,如果有的话。然后更新循环中的一个滴答数。接着初始化Set中文件描述集合为全部为0,要注意使用select函数时,每次都要重新设置关注的文件描述符事件。

我们分几种情况,来跟踪以下源码:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "dyad.h"
/* An echo server: Echos any data received by a client back to the client */
static void onData(dyad_Event *e) {
  dyad_write(e->stream, e->data, e->size);
}
static void onAccept(dyad_Event *e) { 
    puts("Accept a connection ");
    dyad_addListener(e->remote, DYAD_EVENT_DATA, onData, NULL);
    dyad_writef(e->remote, "echo server
");
}
static void onError(dyad_Event *e) {
  printf("server error: %s
", e->msg);
}

static void onListen(dyad_Event* e){ 
    fprintf(stdout, "listen success %d", e->type); 
}
int main(void) {
  dyad_Stream *s;
  dyad_init();
  s = dyad_newStream();
  dyad_addListener(s, DYAD_EVENT_LISTEN, onListen, NULL);
  /* 添加错误的回调 */
  dyad_addListener(s, DYAD_EVENT_ERROR,  onError,  NULL);
   /* 添加连接成功后的回调 */
  dyad_addListener(s, DYAD_EVENT_ACCEPT, onAccept, NULL);
  dyad_listen(s, 8000);
  fprintf(stdout, "stream num %d", dyad_getStreamCount());
  while (dyad_getStreamCount() > 0) {
    dyad_update();
  }
  return 0;
}

(1) 没有任何客户端尝试进行连接
我们在24行, s = dyad_newStream();,这行打下断点,发现

注意到这里的创建的listen fd的值为3,因为0, 1, 2都被标准I/O占用,这也说明每个进程能打开的文件描述符都是独立的,不会因为别的进程占用了3,你的进程就所能够打开的文件描述符必须从4开始。这里我们应该注意state为4, 对应的状态为DYAD_EVENT_LISTEN,在dyad_listen那句打上断点,我们看看s的状态:

这个时候,address和port都已经初始化好了。断点现在设置到dyad_getStreamCount调用,我们发现这里返回的只有1,只有一个listen socket 值为3,s的next指向NULL,现在进入到dyad_update(),因为s的状态是DYAD_EVENT_LISTEN同样对应的DYAD_STATE_LISTENING的值,此时将调用:

select_add(&dyad_selectSet, SELECT_READ, stream->sockfd);
break;

意思很明显,将3的读事件加入到关注列表中。因为此时整个服务器后台处理的连接只有一个,s的next为NULL,所以马上跳出while循环。接着就是设置超时的时间为1s:

tv.tv_sec = dyad_updateTimeout;
tv.tv_usec = (dyad_updateTimeout - tv.tv_sec) * 1e6;

然后调用select进行多路I/O复用,由前面可知,select最多1秒后,返回。

select(dyad_selectSet.maxfd + 1,
         dyad_selectSet.fds[SELECT_READ],
         dyad_selectSet.fds[SELECT_WRITE],
         dyad_selectSet.fds[SELECT_EXCEPT],
         &tv);

从select返回后,然后就对整个stream链表进行遍历,因为只有一个listen fd 所以遍历一次,就结束了,遍历的时候,会根据当前fd的状态来进行不同的操作,因为listen fd 此时的状态为DYAD_STATE_LISTENING,那么就会进入:


if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
    stream_acceptPendingConnections(stream);
}

显然判断,是否是当前的套接字3,是否有读事件发生,如果有读事件,那么if里面为true。在这里因为没有客户端端接入,所以并没有进入到这if里面。然后就是判断是否该套接字是可写的,如果是的,那么就应该将数据发送出去。

 if (
      stream->flags & DYAD_FLAG_WRITTEN &&
      stream->state != DYAD_STATE_CLOSED
    ) {
      stream_flushWriteBuffer(stream);
    }

没有任何客户端连接就这样结束了。
(2)有客户端接入
首先,还是stream的listen sock fd为3,状态还是DYAD_STATE_LISTENING,所以,首先添加的select set集合中,将其读事件注册,然后调用select等待超时,或者是listen fd读事件发生,因为这里有客户端接入,那么就应该返回listen fd 可读。然后进入到下面的代码:

 case DYAD_STATE_LISTENING:
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_acceptPendingConnections(stream);
        }
        break;

因为这里的stream sockfd 可读,那么将会调用stream_acceptPendingConnections(stream)。下面是这个函数的关键字代码:

// 创建accept 套接字
dyad_Socket sockfd = accept(stream->sockfd, NULL, NULL);
// Create client stream 
remote = dyad_newStream();
remote->state = DYAD_STATE_CONNECTED;
/* Set stream's socket */
stream_setSocket(remote, sockfd);
 e = createEvent(DYAD_EVENT_ACCEPT);
 e.msg = "accepted connection";
 e.remote = remote;
 stream_emitEvent(stream, &e);

这个里面最重要的是创建remote Stream,创建后其会加入到全局的stream链表,设置remote的 sockfd为accept套接字。然后调用accept事件的回调。此时,sockef为4,注意:我们的echoSever中有onAccept回调函数,这个函数将创建remote stream读事件回调。

static void onAccept(dyad_Event *e) { 
    puts("Accept a connection ");
    // 添加对remote stream中套接字的回调
    dyad_addListener(e->remote, DYAD_EVENT_DATA, onData, NULL);
}

stream_acceptPendingConnections调用完成后,这时也设置当前socket为4的回调也就是onData,同样也是调用select函数,那么这个时候,同样遍历整个stream 流,包含socket值为3和4,根据stream的状态来选择相应的状态,现在4套接字排在了3,而且,4的状态为DYAD_STATE_CONNECTED,所以进入到:

case DYAD_STATE_CONNECTED:
        if (select_has(&dyad_selectSet, SELECT_READ, stream->sockfd)) {
          stream_handleReceivedData(stream);
          if (stream->state == DYAD_STATE_CLOSED) {
            break;
          }
        }

首先判断当前套接字4是否可读,如果可读,那么进入到stream_handleReceivedData(stream),这里我们假设有数据读入,那么select_has将会返回true,我们看看stream_handleReceivedData读入:

  char data[8192];
  int size = recv(stream->sockfd, data, sizeof(data) - 1, 0);
    if (size <= 0) {
      // 这样size为0、可能
      if (size == 0 || errno != EWOULDBLOCK) {
        /* Handle disconnect */
        dyad_close(stream);
        return;
      } else {
        /* No more data */
        return;
      }
    }
    data[size] = 0;

关键recv是用来读数据,应该注意的是最多读取8192-1个数据,这次获取数据后,将会产生事件。产生事件后,调用回调函数。

data[size] = 0;
/* Update status */
stream->bytesReceived += size;
stream->lastActivity = dyad_getTime();
/* Emit data event */
e = createEvent(DYAD_EVENT_DATA);
e.msg = "received data";
e.data = data;
e.size = size;
stream_emitEvent(stream, &e);

destroyClosedStreams()

在dyad_update中,首先调用的是destroyClosedStreams这个函数的作用是,在遍历整个系统接收到的请求队列中,释放掉已经关闭的请求连接。应该注意到的是,所有的socket连接是拉了一个链表串起来的,所以,就是遍历整个链表,将处于close状态的连接删掉

static void destroyClosedStreams(void) {
  dyad_Stream *stream = dyad_streams;
  while (stream) {
    if (stream->state == DYAD_STATE_CLOSED) {
      dyad_Stream *next = stream->next;
      stream_destroy(stream);
      stream = next;
    } else {
      stream = stream->next;
    }
  }
}

注意到这里的删掉,除了是释放掉相应的dyad_Stream的内存,还有就是要对回调函数进行调用,这个过程在stream_destroy中实现。

stream_destroy

static void stream_destroy(dyad_Stream *stream) {
  dyad_Event e;
  dyad_Stream **next;
  /* Close socket */
  if (stream->sockfd != INVALID_SOCKET) {
    close(stream->sockfd);
  }
  /* Emit destroy event */
  e = createEvent(DYAD_EVENT_DESTROY);
  e.msg = "the stream has been destroyed";
  stream_emitEvent(stream, &e);
  /* Remove from list and decrement count */
  next = &dyad_streams;
  while (*next != stream) {
    next = &(*next)->next;
  }
  *next = stream->next;
  dyad_streamCount--;
  /* Destroy and free */
  vec_deinit(&stream->listeners);
  vec_deinit(&stream->lineBuffer);
  vec_deinit(&stream->writeBuffer);
  dyad_free(stream->address);
  dyad_free(stream);
}

在stream_destroy中,我们可以看到,首先要关闭的是套接字描述符,然后是调用相应的回调函数,最后从连接链表中删除连接,最后是释放相应的内存。

updateTickTimer();

这个函数主要的作用就是更新上次遍历整个循环的时间,然后对整个已经建立连接进行遍历,如果你关注了从上次loop时间到现在loop时间的每一秒事件(),那么就会调用该事件的回调函数。

static void updateTickTimer(void) {
  /* Update tick timer */
  if (dyad_lastTick == 0) {
    dyad_lastTick = dyad_getTime();
  }
  // 如果上次遍历的时间小于当前时间
  // 那么关注了DYAD_EVENT_TICK事件
  // 对其中的每一秒都调用相应的回调函数
  while (dyad_lastTick < dyad_getTime()) {
    /* Emit event on all streams */
    dyad_Stream *stream;
    dyad_Event e = createEvent(DYAD_EVENT_TICK);
    e.msg = "a tick has occured";
    stream = dyad_streams;
    while (stream) {
        // 调用回调
      stream_emitEvent(stream, &e);
      stream = stream->next;
    }
    dyad_lastTick += dyad_tickInterval;
  }
}

select_zero

这个函数就是要初始化select 使用的fd_set为0。

static void select_zero(SelectSet *s) {
  int i;
  if (s->capacity == 0) return;
  s->maxfd = 0;
  for (i = 0; i < SELECT_MAX; i++) {
#if _WIN32
    s->fds[i]->fd_count = 0;
#else
    memset(s->fds[i], 0, s->capacity * sizeof(fd_set));
#endif
  }
}

dyad_getStreamCount

这个函数是返回当前有多少个套接字流要处理。

原文地址:https://www.cnblogs.com/bofengqiye/p/7353194.html