UDT源码剖析(六)之EPoll

常用的事件处理体系EPoll在UDT中有一种独特的用法,初见时觉得鸡肋,越到后来越是觉得妙趣横生。

EPoll

  • 基础数据结构:
struct CEPollDesc
{
   int m_iID;                                //    //在map中的索引ID
   std::set<UDTSOCKET> m_sUDTSocksOut;       // UDT关注的写事件
   std::set<UDTSOCKET> m_sUDTSocksIn;        // UDT关注的读事件
   std::set<UDTSOCKET> m_sUDTSocksEx;        // UDT关注的异常事件

   int m_iLocalID;                           //真实的由epoll_create返回的EID
   std::set<SYSSOCKET> m_sLocals;            // UDP/TCP之类的系统描述符

   std::set<UDTSOCKET> m_sUDTWrites;         // 可以提交给用户的UDT写事件
   std::set<UDTSOCKET> m_sUDTReads;          // 可以提交给用户的UDT读事件
   std::set<UDTSOCKET> m_sUDTExcepts;        // 可以提交给用户的UDT异常事件 (包括连接BROKEN之类的事件)
};
class CEPoll    //这个CEpoll使用map同时管理多个epoll实例,每个epoll实例管理1024个描述符
{
private:
   int m_iIDSeed;                            //缓存从map中索引epoll实例的ID,每次+1,到0x7fffffff后回绕,不过肯定没有这么多的epoll实例存在
   udt_pthread_mutex_t m_SeedLock;

   std::map<int, CEPollDesc> m_mPolls;       // 所有的EPoll
   udt_pthread_mutex_t m_EPollLock;
};
  • 初始化:CEPoll::CEPoll()
CEPoll::CEPoll():
m_iIDSeed(0)    //将种子初始化为0
{
   CGuard::createMutex(m_EPollLock);
}
  • 创建真实的EPoll:int CEPoll::create()
int CEPoll::create()
{
   CGuard pg(m_EPollLock);

   int localid = 0;

   #ifdef LINUX
   localid = epoll_create(1024);    //每个创建一个只管理1024个描述符的EPoll
   if (localid < 0)
      throw CUDTException(-1, 0, errno);
   #else
   // on BSD, use kqueue
   // on Solaris, use /dev/poll
   // on Windows, select
   #endif

   if (++ m_iIDSeed >= 0x7FFFFFFF)    //防止种子进行回绕
      m_iIDSeed = 0;

   CEPollDesc desc;    //创建一个Desc初始化后加入EPoll管理的map中
   desc.m_iID = m_iIDSeed;
   desc.m_iLocalID = localid;
   m_mPolls[desc.m_iID] = desc;

   return desc.m_iID;
}
  • 向EPoll中添加UDT SOCKET:int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
int CEPoll::add_usock(const int eid, const UDTSOCKET& u, const int* events)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);      //找到合适的epoll
   if (p == m_mPolls.end())   
      throw CUDTException(5, 13);

   // BARCHART: Manage all event types.
   if (!events || (*events & UDT_EPOLL_IN))     //向Desc的观察描述符set中添加关注的事件
      p->second.m_sUDTSocksIn.insert(u);
   if (!events || (*events & UDT_EPOLL_OUT))
      p->second.m_sUDTSocksOut.insert(u);
   if (!events || (*events & UDT_EPOLL_ERR))
      p->second.m_sUDTSocksEx.insert(u);

   return 0;
}
  • 更新EPoll中的UDT SOCKET关注的事件:int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events)
int CEPoll::update_usock(const int eid, const UDTSOCKET& u, const int* events)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //寻找合适的EPoll
   if (p == m_mPolls.end()){
        throw CUDTException(5, 13);
   }

   if(events){    //根据提供的事件,在Desc的观察set中添加或者删除事件
    if (*events & UDT_EPOLL_IN){    
        p->second.m_sUDTSocksIn.insert(u);
    }else{
        p->second.m_sUDTSocksIn.erase(u);
    }
    if (*events & UDT_EPOLL_OUT){
        p->second.m_sUDTSocksOut.insert(u);
    } else{
        p->second.m_sUDTSocksOut.erase(u);
    }
   }

   return 0;
}
  • 获取UDT SOCKET关注的事件:int CEPoll::verify_usock(const int eid, const UDTSOCKET& u, int* events)
int CEPoll::verify_usock(const int eid, const UDTSOCKET& u, int* events)
{

   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //寻找合适的EPoll
   if (p == m_mPolls.end()){
        throw CUDTException(5, 13);
   }

   if(events){
    if(p->second.m_sUDTSocksIn.find(u) != p->second.m_sUDTSocksIn.end()){    //将关注的事件填充到event中
        *events |= UDT_EPOLL_IN;
    }
    if(p->second.m_sUDTSocksOut.find(u) != p->second.m_sUDTSocksOut.end()){
        *events |= UDT_EPOLL_OUT;
    }
   }

   return 0;

}
  • 向EPoll中添加SYS SOCKET,使用epoll系列函数:int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
int CEPoll::add_ssock(const int eid, const SYSSOCKET& s, const int* events)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //寻找合适的EPoll
   if (p == m_mPolls.end())
      throw CUDTException(5, 13);

#ifdef LINUX
   epoll_event ev;    //获取事件

   if (NULL == events)
      ev.events = EPOLLIN | EPOLLOUT | EPOLLERR;    //如果没有提供关注的事件,则SYS默认关注所有的事件
   else
   {
      ev.events = 0;    //否则根据提供的事件列表决定关注哪些事件
      if (*events & UDT_EPOLL_IN)
         ev.events |= EPOLLIN;
      if (*events & UDT_EPOLL_OUT)
         ev.events |= EPOLLOUT;
      if (*events & UDT_EPOLL_ERR)
         ev.events |= EPOLLERR;
   }

   ev.data.fd = s;
   if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_ADD, s, &ev) < 0)
      throw CUDTException();
#endif

   p->second.m_sLocals.insert(s);    //向描述的SYS SOCKET中添加事件

   return 0;
}
  • 删除EPoll中关注的UDT SOCKET:int CEPoll::remove_usock(const int eid, const UDTSOCKET& u)
int CEPoll::remove_usock(const int eid, const UDTSOCKET& u)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //寻找EPoll Desc
   if (p == m_mPolls.end())
      throw CUDTException(5, 13);

   p->second.m_sUDTSocksIn.erase(u);    //从关注读写的set中删除UDT SOCKET
   p->second.m_sUDTSocksOut.erase(u);

   p->second.m_sUDTReads.erase(u);    //从读写已发送的set中删除UDT SOCKET
   p->second.m_sUDTWrites.erase(u);

   return 0;
}
  • 删除EPoll关注的SYS SOCKET,使用epoll系列函数:int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
int CEPoll::remove_ssock(const int eid, const SYSSOCKET& s)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //获取EPoll Desc
   if (p == m_mPolls.end())
      throw CUDTException(5, 13);

#ifdef LINUX
   epoll_event ev;     //删除关注的SYS SOCKET
   if (::epoll_ctl(p->second.m_iLocalID, EPOLL_CTL_DEL, s, &ev) < 0)
      throw CUDTException();
#endif

   p->second.m_sLocals.erase(s);

   return 0;
}
int CEPoll::wait(const int eid, set<UDTSOCKET>* readfds, set<UDTSOCKET>* writefds, int64_t msTimeOut, set<SYSSOCKET>* lrfds, set<SYSSOCKET>* lwfds)
{
   if (!readfds && !writefds && !lrfds && lwfds && (msTimeOut < 0))    // 如果提供的参数有问题,就抛出异常
      throw CUDTException(5, 3, 0);

   //因为是填充,先清除提供的参数
   if (readfds) readfds->clear();
   if (writefds) writefds->clear();
   if (lrfds) lrfds->clear();
   if (lwfds) lwfds->clear();

   int total = 0;

   int64_t entertime = CTimer::getTime();   //先获取一手时间,看你怎么说
   while (true)
   {
      CGuard::enterCS(m_EPollLock);

      map<int, CEPollDesc>::iterator p = m_mPolls.find(eid);    //找到相应的epoll实例,没找到就抛异常,在抛出异常前先释放锁
      if (p == m_mPolls.end())
      {
         CGuard::leaveCS(m_EPollLock);
         throw CUDTException(5, 13);
      }

      //如果找到的epoll观察的各种参数都是空的,抛出异常
      if (p->second.m_sUDTSocksIn.empty() && p->second.m_sUDTSocksOut.empty() && p->second.m_sLocals.empty() && (msTimeOut < 0))
      {
         CGuard::leaveCS(m_EPollLock);
         throw CUDTException(5, 3);
      }

      // 将已发生的读事件和异常事件(UDTSOCKET)填充到用户提供的容器中,读事件是直接获得描述符set,异常事件是加入,然后更新total.写事件做同样的处理
      if ((NULL != readfds) && (!p->second.m_sUDTReads.empty() || !p->second.m_sUDTExcepts.empty()))
      {
         *readfds = p->second.m_sUDTReads;    //不进行复制,直接将用户提供的指针指向UDT的可读set,然后将异常UDT添加到可读set中
         for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
            readfds->insert(*i);
         total += p->second.m_sUDTReads.size() + p->second.m_sUDTExcepts.size();    //调整返回的事件发生的数量
      }
      if ((NULL != writefds) && (!p->second.m_sUDTWrites.empty() || !p->second.m_sUDTExcepts.empty()))
      {
         *writefds = p->second.m_sUDTWrites;    //直接调整写事件的指向,并将异常事件添加到写事件中,并更新事件数量
         for (set<UDTSOCKET>::const_iterator i = p->second.m_sUDTExcepts.begin(); i != p->second.m_sUDTExcepts.end(); ++ i)
            writefds->insert(*i);
         total += p->second.m_sUDTWrites.size() + p->second.m_sUDTExcepts.size();
      }

      // 因为异常事件的描述符已经添加,清除当前epoll实例中的异常事件
      if(total > 0 && !p->second.m_sUDTExcepts.empty()){
        p->second.m_sUDTExcepts.clear();
      }

      //处理系统事件  
      if (lrfds || lwfds)
      {
         #ifdef LINUX
         const int max_events = p->second.m_sLocals.size();       //获取EPoll Desc中系统描述符数量,并对系统描述符做epoll_wait处理
         epoll_event ev[max_events];
         int nfds = ::epoll_wait(p->second.m_iLocalID, ev, max_events, 0);    //调用wait函数处理这些事件
        
         //填充系统描述符的返回列表,累加total
         for (int i = 0; i < nfds; ++ i)
         {
            if ((NULL != lrfds) && (ev[i].events & EPOLLIN))
           {
               lrfds->insert(ev[i].data.fd);
               ++ total;
            }
            if ((NULL != lwfds) && (ev[i].events & EPOLLOUT))
            {
               lwfds->insert(ev[i].data.fd);
               ++ total;
            }
         }
         #else
         //currently "select" is used for all non-Linux platforms.
         //faster approaches can be applied for specific systems in the future.

         //"select" has a limitation on the number of sockets

         fd_set readfds2;
         fd_set writefds2;
         FD_ZERO(&readfds2);
         FD_ZERO(&writefds2);

         for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
         {
            if (lrfds)
               FD_SET(*i, &readfds2);
            if (lwfds)
               FD_SET(*i, &writefds2);
         }

         timeval tv;
         tv.tv_sec = 0;
         tv.tv_usec = 0;
         if (::select(0, &readfds2, &writefds2, NULL, &tv) > 0)
         {
            for (set<SYSSOCKET>::const_iterator i = p->second.m_sLocals.begin(); i != p->second.m_sLocals.end(); ++ i)
            {
               if (lrfds && FD_ISSET(*i, &readfds2))
               {
                  lrfds->insert(*i);
                  ++ total;
               }
               if (lwfds && FD_ISSET(*i, &writefds2))
               {
                  lwfds->insert(*i);
                  ++ total;
               }
            }
         }
         #endif
      }

      CGuard::leaveCS(m_EPollLock);

      if (total > 0)    //如果已经获得事件,直接返回就可以了
         return total;

      if ((msTimeOut >= 0) && (int64_t(CTimer::getTime() - entertime) >= msTimeOut * 1000LL))    //没有事件发生并且已经超时,抛出异常
         throw CUDTException(6, 3, 0);

      CTimer::waitForEvent();   //如果无事发生就等待一会
   }

   return 0;
}
  • 释放某一个EPoll:int CEPoll::release(const int eid)
int CEPoll::release(const int eid)    
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator i = m_mPolls.find(eid);    //获取EPoll Desc
   if (i == m_mPolls.end())
      throw CUDTException(5, 13);

   #ifdef LINUX
   // release local/system epoll descriptor
   ::close(i->second.m_iLocalID);    //直接关闭eid
   #endif

   m_mPolls.erase(i);    //从map中移除Epoll Desc

   return 0;
}
  • 更新事件:int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
int CEPoll::update_events(const UDTSOCKET& uid, std::set<int>& eids, int events, bool enable)
{
   CGuard pg(m_EPollLock);

   map<int, CEPollDesc>::iterator p;    //获取EPoll Desc

   vector<int> lost;
   for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)    //在所有的EPoll Desc中进行轮询
   {   
      p = m_mPolls.find(*i);    //获取一个EPoll Dssc
      if (p == m_mPolls.end())  //如果没有找到这个epoll,就把这个ID加入lost中
      {
         lost.push_back(*i);
      }
      else
      {    //找到的话,根据参数决定是否将事件从watch中添加到result中
         if ((events & UDT_EPOLL_IN) != 0)
            update_epoll_sets(uid, p->second.m_sUDTSocksIn, p->second.m_sUDTReads, enable);
         if ((events & UDT_EPOLL_OUT) != 0)
            update_epoll_sets(uid, p->second.m_sUDTSocksOut, p->second.m_sUDTWrites, enable);
         if ((events & UDT_EPOLL_ERR) != 0)
            update_epoll_sets(uid, p->second.m_sUDTSocksEx, p->second.m_sUDTExcepts, enable);
      }
   }
    
   //如果没有找到合适的EPoll,移除这些丢失的EPoll
   for (vector<int>::iterator i = lost.begin(); i != lost.end(); ++ i)
      eids.erase(*i);

   return 0;
}
  • 独立于EPoll的函数:void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
void update_epoll_sets(const UDTSOCKET& uid, const set<UDTSOCKET>& watch, set<UDTSOCKET>& result, bool enable)
{
   if (enable && (watch.find(uid) != watch.end()))    //从关注的事件中根据参数,决定是否从结果中移除
   {
      result.insert(uid);
   }
   else if (!enable)
   {
      result.erase(uid);
   }
}
原文地址:https://www.cnblogs.com/ukernel/p/9191055.html