MIT 2012分布式课程基础源码解析-事件管理封装

这部分的内容主要包括Epoll/select的封装,在封装好相应函数后,再使用一个类来管理相应事件,实现的文件为pollmgr.{h, cc}。

事件函数封装

可看到pollmgr.h文件下定一个了一个虚基类aio_mgr

1 class aio_mgr {
2     public:
3         virtual void watch_fd(int fd, poll_flag flag) = 0;
4         virtual bool unwatch_fd(int fd, poll_flag flag) = 0;
5         virtual bool is_watched(int fd, poll_flag flag) = 0;
6         virtual void wait_ready(std::vector<int> *readable, std::vector<int> *writable) = 0;
7         virtual ~aio_mgr() {}
8 };
View Code

这便是具体事件类实现的基类,可看到文件末尾处的继承关系

 1 class SelectAIO : public aio_mgr {
 2     public :
 3 
 4         SelectAIO();
 5         ~SelectAIO();
 6         void watch_fd(int fd, poll_flag flag);
 7         bool unwatch_fd(int fd, poll_flag flag);
 8         bool is_watched(int fd, poll_flag flag);
 9         void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
10 
11     private:
12 
13         fd_set rfds_;
14         fd_set wfds_;
15         int highfds_;
16         int pipefd_[2];
17 
18         pthread_mutex_t m_;
19 
20 };
21 
22 #ifdef __linux__ 
23 class EPollAIO : public aio_mgr {
24     public:
25         EPollAIO();
26         ~EPollAIO();
27         void watch_fd(int fd, poll_flag flag);
28         bool unwatch_fd(int fd, poll_flag flag);
29         bool is_watched(int fd, poll_flag flag);
30         void wait_ready(std::vector<int> *readable, std::vector<int> *writable);
31 
32     private:
33         int pollfd_;
34         struct epoll_event ready_[MAX_POLL_FDS];
35         int fdstatus_[MAX_POLL_FDS];
36 
37 };
38 #endif /* __linux */
View Code

相应是使用select和epoll分别实现的事件管理类,其中最主要的方法是wait_ready,这个方法实现了具体的事件查询,其余几个函数用于管理套接字,如增加套接字,删除套接字以及判断套接字是否还存活着。这里我们主要看下epoll实现部分,select实现部分类似。epoll的详解可看这里

  1 EPollAIO::EPollAIO()
  2 {
  3     pollfd_ = epoll_create(MAX_POLL_FDS);
  4     VERIFY(pollfd_ >= 0);
  5     bzero(fdstatus_, sizeof(int)*MAX_POLL_FDS);
  6 }
  7 
  8 EPollAIO::~EPollAIO()
  9 {
 10     close(pollfd_);
 11 }
 12 
 13 //状态转换
 14 static inline
 15 int poll_flag_to_event(poll_flag flag)
 16 {
 17     int f;
 18     if (flag == CB_RDONLY) {
 19         f = EPOLLIN;
 20     }else if (flag == CB_WRONLY) {
 21         f = EPOLLOUT;
 22     }else { //flag == CB_RDWR
 23         f = EPOLLIN | EPOLLOUT;
 24     }
 25     return f;
 26 }
 27 /*
 28  *   这个函数就相当于:准备下一个监听事件的类型
 29  */
 30 void
 31 EPollAIO::watch_fd(int fd, poll_flag flag)
 32 {
 33     VERIFY(fd < MAX_POLL_FDS);
 34 
 35     struct epoll_event ev;
 36     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_ADD;
 37     fdstatus_[fd] |= (int)flag;
 38 
 39     //边缘触发模式
 40     ev.events = EPOLLET;
 41     ev.data.fd = fd;
 42     //注册读事件
 43     if (fdstatus_[fd] & CB_RDONLY) {
 44         ev.events |= EPOLLIN;
 45     }//注册写事件
 46     if (fdstatus_[fd] & CB_WRONLY) {
 47         ev.events |= EPOLLOUT;
 48     }
 49 
 50     if (flag == CB_RDWR) {
 51         VERIFY(ev.events == (uint32_t)(EPOLLET | EPOLLIN | EPOLLOUT));
 52     }
 53     //更改
 54     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
 55 }
 56 
 57 bool 
 58 EPollAIO::unwatch_fd(int fd, poll_flag flag)
 59 {
 60     VERIFY(fd < MAX_POLL_FDS);
 61     fdstatus_[fd] &= ~(int)flag;
 62 
 63     struct epoll_event ev;
 64     int op = fdstatus_[fd]? EPOLL_CTL_MOD : EPOLL_CTL_DEL;
 65 
 66     ev.events = EPOLLET;
 67     ev.data.fd = fd;
 68 
 69     if (fdstatus_[fd] & CB_RDONLY) {
 70         ev.events |= EPOLLIN;
 71     }
 72     if (fdstatus_[fd] & CB_WRONLY) {
 73         ev.events |= EPOLLOUT;
 74     }
 75 
 76     if (flag == CB_RDWR) {
 77         VERIFY(op == EPOLL_CTL_DEL);
 78     }
 79     VERIFY(epoll_ctl(pollfd_, op, fd, &ev) == 0);
 80     return (op == EPOLL_CTL_DEL);
 81 }
 82 
 83 bool
 84 EPollAIO::is_watched(int fd, poll_flag flag)
 85 {
 86     VERIFY(fd < MAX_POLL_FDS);
 87     return ((fdstatus_[fd] & CB_MASK) == flag);
 88 }
 89 /**
 90  *  事件循环,查看有哪些事件已经准备好,准备好的事件则插入相应列表中
 91  */
 92 void
 93 EPollAIO::wait_ready(std::vector<int> *readable, std::vector<int> *writable)
 94 {
 95     //得到已准备好的事件数目
 96     int nfds = epoll_wait(pollfd_, ready_,    MAX_POLL_FDS, -1);
 97     //遍历套接字数组,将可读/可写套接字添加到readable/writable数组中,便于后面处理
 98     for (int i = 0; i < nfds; i++) {
 99         if (ready_[i].events & EPOLLIN) {
100             readable->push_back(ready_[i].data.fd);
101         }
102         if (ready_[i].events & EPOLLOUT) {
103             writable->push_back(ready_[i].data.fd);
104         }
105     }
106 }
View Code

事件管理

在pollmgr.h中还有个重要的类

class aio_callback {
    public:
        virtual void read_cb(int fd) = 0;
        virtual void write_cb(int fd) = 0;
        virtual ~aio_callback() {}
};
View Code

这是一个回调虚基类,里面两个函数可从函数名猜到功能,即从对应的套接字读取/写入数据。该基类在后面底层通信中扮演着重要的角色。

然后我们再看后面的PollMgr类,这便是事件管理类,同时它还使用了单例模式。

 1 class PollMgr {
 2     public:
 3         PollMgr();
 4         ~PollMgr();
 5 
 6         static PollMgr *Instance();
 7         static PollMgr *CreateInst();
 8         //在对应的套接字上添加事件
 9         void add_callback(int fd, poll_flag flag, aio_callback *ch);
10         //删除套接字上的所有事件
11         void del_callback(int fd, poll_flag flag);
12         bool has_callback(int fd, poll_flag flag, aio_callback *ch);
13         //阻塞删除套接字,为何阻塞呢?因为删除时,其它线程正在使用该套接字
14         void block_remove_fd(int fd);
15         //主要事件循环方法
16         void wait_loop();
17 
18         static PollMgr *instance;
19         static int useful;
20         static int useless;
21 
22     private:
23         pthread_mutex_t m_;
24         pthread_cond_t changedone_c_;
25         pthread_t th_;
26 
27         aio_callback *callbacks_[MAX_POLL_FDS]; //事件数组,即数组下标为相应的套接字
28         aio_mgr *aio_;   //具体的事件函数类,可实现为epoll/select
29         bool pending_change_;
30 };
View Code

其中最主要的函数是wait_loop

接下来我们看具体实现。

 1 PollMgr *PollMgr::instance = NULL;
 2 static pthread_once_t pollmgr_is_initialized = PTHREAD_ONCE_INIT;
 3 
 4 void
 5 PollMgrInit()
 6 {
 7     PollMgr::instance = new PollMgr();
 8 }
 9 
10 PollMgr *
11 PollMgr::Instance()
12 {
13     //保证PollMgrInit在本线程内只初始化一次
14     pthread_once(&pollmgr_is_initialized, PollMgrInit);
15     return instance;
16 }
View Code

这里实现单例,pthread_once保证了线程中只初始化一次PollMgrInit()函数,所以在具体使用时,只需调用PollMgr::Instance()即可获得该管理类,再在其上处理各种各种事件。这里有个小疑问是:instance变量不应该是私有变量吗?

接下来我们看构造析构函数:

PollMgr::PollMgr() : pending_change_(false)
{
    bzero(callbacks_, MAX_POLL_FDS*sizeof(void *));
    //aio_ = new SelectAIO();
    aio_ = new EPollAIO();
    VERIFY(pthread_mutex_init(&m_, NULL) == 0);
    VERIFY(pthread_cond_init(&changedone_c_, NULL) == 0);
    //this表示本类,wait_loop是本类中的一个方法,false表示不分离(detach)
    VERIFY((th_ = method_thread(this, false, &PollMgr::wait_loop)) != 0);
}

PollMgr::~PollMgr()
{
    //never kill me!!!
    VERIFY(0);
}
View Code

构造函数中初始化了事件类,使用了EpollAIO类,初始化了互斥量和条件变量,然后创建了一个线程调用wait_loop。有意思的是析构函数(never kill me)

 接下来是几个管理函数,管理套接字和回调的函数 

 1 void
 2 PollMgr::add_callback(int fd, poll_flag flag, aio_callback *ch)
 3 {
 4     VERIFY(fd < MAX_POLL_FDS);
 5 
 6     ScopedLock ml(&m_);
 7     aio_->watch_fd(fd, flag);
 8 
 9     VERIFY(!callbacks_[fd] || callbacks_[fd]==ch);
10     callbacks_[fd] = ch;
11 }
12 
13 //remove all callbacks related to fd
14 //the return guarantees that callbacks related to fd
15 //will never be called again
16 void
17 PollMgr::block_remove_fd(int fd)
18 {
19     ScopedLock ml(&m_);
20     aio_->unwatch_fd(fd, CB_RDWR);
21     pending_change_ = true;
22     VERIFY(pthread_cond_wait(&changedone_c_, &m_)==0);
23     callbacks_[fd] = NULL;
24 }
25 
26 //删除相应的回调函数
27 void
28 PollMgr::del_callback(int fd, poll_flag flag)
29 {
30     ScopedLock ml(&m_);
31     if (aio_->unwatch_fd(fd, flag)) {
32         callbacks_[fd] = NULL;
33     }
34 }
35 
36 //
37 bool
38 PollMgr::has_callback(int fd, poll_flag flag, aio_callback *c)
39 {
40     ScopedLock ml(&m_);
41     if (!callbacks_[fd] || callbacks_[fd]!=c)
42         return false;
43 
44     return aio_->is_watched(fd, flag);
45 }
View Code

下面便是循环的主方法,该方法一直循环获取相应的事件,但此方法有个问题是,当某个回调读取需要长时间阻塞时,

会耽误后续事件的读取或写入。

//循环的主方法
void
PollMgr::wait_loop()
{

    std::vector<int> readable;  //可读套接字的vector
    std::vector<int> writable;  //可写套接字的vector
    //
    while (1) {
        {
            ScopedLock ml(&m_);
            if (pending_change_) {
                pending_change_ = false;
                VERIFY(pthread_cond_broadcast(&changedone_c_)==0);
            }
        }
        //首先清空两个vector
        readable.clear();
        writable.clear();
        //这里便监听了事件,读或写事件,有时间发生便将事件的fd插入相应的vector
        aio_->wait_ready(&readable,&writable);
        //如果这次没有可读和可写事件,则继续下一次循环
        if (!readable.size() && !writable.size()) {
            continue;
        } 
        //no locking of m_
        //because no add_callback() and del_callback should 
        //modify callbacks_[fd] while the fd is not dead
        for (unsigned int i = 0; i < readable.size(); i++) {
            int fd = readable[i];
            if (callbacks_[fd]) //相应的回调函数读取套接字上的数据
                callbacks_[fd]->read_cb(fd);
        }

        for (unsigned int i = 0; i < writable.size(); i++) {
            int fd = writable[i];
            if (callbacks_[fd])
                callbacks_[fd]->write_cb(fd);
        }
    }
}
View Code

具体使用时,只需获得单例类即可,然后再添加相应的套接字及回调函数,添加都是线程安全的,因为在相应的实现上都会阻塞在内部互斥变量m_上

原文地址:https://www.cnblogs.com/fwensen/p/5778134.html