UDT源码剖析(四)之Socket函数

UDTSOCKET socket(int af, int type, int protocol)

UDT SOCKET的创建顺序:UDTSOCKET UDT::socket(int af,int type,int protocol) -> UDTSOCKET CUDT::socket(int af,int tyoe,int proctol) -> UDTSOCKET CUDTUnited::newSocket(int af, int type)
  • Order 0:UDTSOCKET CUDTUnited::newSocket(int af, int type):创建UDT SOCKET
UDTSOCKET CUDTUnited::newSocket(int af, int type)
{
   if ((type != SOCK_STREAM) && (type != SOCK_DGRAM))    //如果参数不正确,直接返回
      throw CUDTException(5, 3, 0);

   CUDTSocket* ns = NULL;

   try
   {
      ns = new CUDTSocket;    //new一个CUDTSocket
      ns->m_pUDT = new CUDT;    //紧接着new一个CUDT
      if (AF_INET == af)    //根据IPV4 OR IPV6 ,更新本地地址,并将端口预设为0
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in);
         ((sockaddr_in*)(ns->m_pSelfAddr))->sin_port = 0;
      }
      else
      {
         ns->m_pSelfAddr = (sockaddr*)(new sockaddr_in6);
         ((sockaddr_in6*)(ns->m_pSelfAddr))->sin6_port = 0;
      }
   }
   catch (...)
   {
      delete ns;
      throw CUDTException(3, 2, 0);
   }

   CGuard::enterCS(m_IDLock);
   ns->m_SocketID = -- m_SocketID;    //在初始化的时候会随机一个UDT SOCKET,之后新创建的UDT SOCKET在此基础上累加或者累减就可以了
   CGuard::leaveCS(m_IDLock);

   ns->m_Status = INIT;    //调整UDTSocket的状态为INIT
   ns->m_ListenSocket = 0;    //初始化Listen Socket ID为0
   ns->m_pUDT->m_SocketID = ns->m_SocketID;    //将刚刚获得ID注册到CUDT中
   ns->m_pUDT->m_iSockType = (SOCK_STREAM == type) ? UDT_STREAM : UDT_DGRAM;    //确定CUDT的类型
   ns->m_pUDT->m_iIPversion = ns->m_iIPversion = af;
   ns->m_pUDT->m_pCache = m_pCache;    //CUDT与CUDTUnited共用一个CCache

   // protect the m_Sockets structure.
   CGuard::enterCS(m_ControlLock);
   try
   {
      m_Sockets[ns->m_SocketID] = ns;    //在全局的map中保存CUDTSocket*
   }
   catch (...)
   {
      //failure and rollback
      CGuard::leaveCS(m_ControlLock);
      delete ns;
      ns = NULL;
   }
   CGuard::leaveCS(m_ControlLock);

   if (NULL == ns)
      throw CUDTException(3, 2, 0);

   return ns->m_SocketID;
}

int bind(UDTSOCKET u, const struct sockaddr* name, int namelen)

bind的关联顺序:int UDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::bind(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::open() -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
  • Order 0:int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen)
int CUDTUnited::bind(const UDTSOCKET u, const sockaddr* name, int namelen):将UDT SOCKET与某一个地址相关联
{
   CUDTSocket* s = locate(u);    //获取这个UDT SOCKET的CUDTSocket*
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   // cannot bind a socket more than once
   if (INIT != s->m_Status)
      throw CUDTException(5, 0, 0);

   // check the size of SOCKADDR structure
   if (AF_INET == s->m_iIPversion)
   {
      if (namelen != sizeof(sockaddr_in))
         throw CUDTException(5, 3, 0);
   }
   else
   {
      if (namelen != sizeof(sockaddr_in6))
         throw CUDTException(5, 3, 0);
   }

   s->m_pUDT->open();    //调用CUDT*中的open()并修改CUDT实例的状态
   updateMux(s, name);
   s->m_Status = OPENED;    //调整CUDTSocket的状态为Opened

   // copy address information of local node
   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

   return 0;
}
  • Order 1:void CUDT::open():继续填充CUDT中的选项
void CUDT::open()
{
   CGuard cg(m_ConnectionLock);
    
   //初始化有效载荷的大小 
   m_iPktSize = m_iMSS - 28;    //packet size = MSS - 28
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;    //payload size = MSS - 28 - 16

   m_iEXPCount = 1;    //异常计数器设置为1
   m_iBandwidth = 1;    //估计带宽为1 packet / S
   m_iDeliveryRate = 16;    //对方的接收速率为16 packet / S
   m_iAckSeqNo = 0;    //上一次收到的ACK为0
   m_ullLastAckTime = 0;        //上一次接收ACK的事件为0

   m_StartTime = CTimer::getTime();    //初始时间为当前时间
   m_llSentTotal = m_llRecvTotal = m_iSndLossTotal = m_iRcvLossTotal = m_iRetransTotal = m_iSentACKTotal = m_iRecvACKTotal = m_iSentNAKTotal = m_iRecvNAKTotal = 0;    //统计信息初始化
   m_LastSampleTime = CTimer::getTime();    //最后的性能采样时间为现在
   m_llTraceSent = m_llTraceRecv = m_iTraceSndLoss = m_iTraceRcvLoss = m_iTraceRetrans = m_iSentACK = m_iRecvACK = m_iSentNAK = m_iRecvNAK = 0;    //统计信息为0
   m_llSndDuration = m_llSndDurationTotal = 0;    

   if (NULL == m_pSNode)    //初始化发送链表,如果不存在,就new一个
      m_pSNode = new CSNode;
   m_pSNode->m_pUDT = this;    //确定Send List Node中的CUDT*的指向
   m_pSNode->m_llTimeStamp = 1;    //将堆的比较值初始化为1
   m_pSNode->m_iHeapLoc = -1;    //目前不存在于堆中

   if (NULL == m_pRNode)    //初始化接收链表,如果不存在就new一个
      m_pRNode = new CRNode;
   m_pRNode->m_pUDT = this;    //确定Recv List Node中的CUDT*的指向
   m_pRNode->m_llTimeStamp = 1;    //将初始化位置信息为1
   m_pRNode->m_pPrev = m_pRNode->m_pNext = NULL;    //暂时不存在于堆中
   m_pRNode->m_bOnList = false;

   m_iRTT = 10 * m_iSYNInterval;    //RTT为 10 * 10000个CPU时钟周期
   m_iRTTVar = m_iRTT >> 1;    //RTT方差为RTT >> 1
   m_ullCPUFrequency = CTimer::getCPUFrequency();    //获得CPU的时钟周期

   // set up the timers
   m_ullSYNInt = m_iSYNInterval * m_ullCPUFrequency;    //初始化SYN发送的时间间隔为10000个CPU时钟周期

   //设置NAK超时下限与超时下限阀值为100ms,这300000个时钟周期就是100ms??
   m_ullMinNakInt = 300000 * m_ullCPUFrequency;
   m_ullMinExpInt = 300000 * m_ullCPUFrequency;

    //ACK与NAK的发送间隔与SYN的发送间隔相同
   m_ullACKInt = m_ullSYNInt;    
   m_ullNAKInt = m_ullMinNakInt;

   uint64_t currtime;    //获得当前的时间,gettimeofday()
   CTimer::rdtsc(currtime);
   m_ullLastRspTime = currtime;    //上一次的请求连接时间为现在
   m_ullNextACKTime = currtime + m_ullSYNInt;    //确定下一次ACK与SYN的发送时间
   m_ullNextNAKTime = currtime + m_ullNAKInt;

   m_iPktCount = 0;    //收到的Packet计数为0
   m_iLightACKCount = 1;    //收到的ACK计数为1

   m_ullTargetTime = 0;    //下一个数据包的预计发送时间为0
   m_ullTimeDiff = 0;    //两个数据包发送间隔为0,之后要根据RTT估算

   // Now UDT is opened.
   m_bOpened = true;    
}
  • Order 1:void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)
{
   CGuard cg(m_ControlLock);

   if ((s->m_pUDT->m_bReuseAddr) && (NULL != addr))
   {
       //如果还没有关联到某一个CMultiplexer上,先获得想要关联的port 
      int port = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)addr)->sin_port) : ntohs(((sockaddr_in6*)addr)->sin6_port);

      // 根据获得port在CMultiplexer中进行寻找
      for (map<int, CMultiplexer>::iterator i = m_mMultiplexer.begin(); i != m_mMultiplexer.end(); ++ i)
      {
         if ((i->second.m_iIPversion == s->m_pUDT->m_iIPversion) && (i->second.m_iMSS == s->m_pUDT->m_iMSS) && i->second.m_bReusable)
         {
            //获得相应的CMultiplexer
            if (i->second.m_iPort == port)
            {
               // reuse the existing multiplexer
               ++ i->second.m_iRefCount;    //首先叠加引用计数
               s->m_pUDT->m_pSndQueue = i->second.m_pSndQueue;    //享用Send Queue
               s->m_pUDT->m_pRcvQueue = i->second.m_pRcvQueue;    //享用Recv Queue
               s->m_iMuxID = i->second.m_iID;    //顺便填充ID,为下一次查找提供便利
               return;
            }
         }
      }
   }

   // 如果没有找到,就意味着需要创建一个新的CMultiplexer,并将新创建的CMultiplexer与CUDT相关联
   CMultiplexer m;
   m.m_iMSS = s->m_pUDT->m_iMSS;
   m.m_iIPversion = s->m_pUDT->m_iIPversion;
   m.m_iRefCount = 1;
   m.m_bReusable = s->m_pUDT->m_bReuseAddr;
   m.m_iID = s->m_SocketID;

   m.m_pChannel = new CChannel(s->m_pUDT->m_iIPversion);    //由CMultiplexer管理CChannel
   m.m_pChannel->setSndBufSize(s->m_pUDT->m_iUDPSndBufSize);
   m.m_pChannel->setRcvBufSize(s->m_pUDT->m_iUDPRcvBufSize);

   try
   {
      if (NULL != udpsock)
         m.m_pChannel->open(*udpsock);    //打开这个UDP SOCKET,如果提供Port,就与相关的Port关联,如果不提供,就随机选择Port
      else
         m.m_pChannel->open(addr);
   }
   catch (CUDTException& e)
   {
      m.m_pChannel->close();
      delete m.m_pChannel;
      throw e;
   }

   //全部都是初始化,将CUDT与CMultiplexer相关联,并将新创建的CMultiplexer添加到其中
   sockaddr* sa = (AF_INET == s->m_pUDT->m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   m.m_pChannel->getSockAddr(sa);
   m.m_iPort = (AF_INET == s->m_pUDT->m_iIPversion) ? ntohs(((sockaddr_in*)sa)->sin_port) : ntohs(((sockaddr_in6*)sa)->sin6_port);
   if (AF_INET == s->m_pUDT->m_iIPversion) delete (sockaddr_in*)sa; else delete (sockaddr_in6*)sa;

   m.m_pTimer = new CTimer;

   //Send Queue与Recv Queue共享Timer
   m.m_pSndQueue = new CSndQueue;
   m.m_pSndQueue->init(m.m_pChannel, m.m_pTimer);
   m.m_pRcvQueue = new CRcvQueue;
   m.m_pRcvQueue->init(32, s->m_pUDT->m_iPayloadSize, m.m_iIPversion, 1024, m.m_pChannel, m.m_pTimer);

   m_mMultiplexer[m.m_iID] = m;

   s->m_pUDT->m_pSndQueue = m.m_pSndQueue;
   s->m_pUDT->m_pRcvQueue = m.m_pRcvQueue;
   s->m_iMuxID = m.m_iID;
}

int bind2(UDTSOCKET u, UDPSOCKET udpsock)

bind2的关联顺序:int UDT::bind2(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDT::bind(UDTSOCKET u, UDPSOCKET udpsock) -> int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock) -> void CUDTUnited::updateMux(CUDTSocket* s, const sockaddr* addr, const UDPSOCKET* udpsock)

注意:bind的情况基本一样,唯一的不同时UDP端口的选择,如果没有提供UDP端口,会进行随机的选择。

  • Order 0:int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock):将UDT Socket与UDP Socket关联起来
int CUDTUnited::bind(UDTSOCKET u, UDPSOCKET udpsock)
{
   CUDTSocket* s = locate(u);    //获取CUDTSocket实例
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   // cannot bind a socket more than once
   if (INIT != s->m_Status)
      throw CUDTException(5, 0, 0);

   sockaddr_in name4;
   sockaddr_in6 name6;
   sockaddr* name;
   socklen_t namelen;

   if (AF_INET == s->m_iIPversion)
   {
      namelen = sizeof(sockaddr_in);
      name = (sockaddr*)&name4;
   }
   else
   {
      namelen = sizeof(sockaddr_in6);
      name = (sockaddr*)&name6;
   }

   if (-1 == ::getsockname(udpsock, name, &namelen))
      throw CUDTException(5, 3);

   s->m_pUDT->open();
   updateMux(s, name, &udpsock);
   s->m_Status = OPENED;

   // copy address information of local node
   s->m_pUDT->m_pSndQueue->m_pChannel->getSockAddr(s->m_pSelfAddr);

   return 0;
}
  • 没有提供UDP参数:void CChannel::open(const sockaddr* addr)
void CChannel::open(const sockaddr* addr)
{
   // 创建UDP Socket
   m_iSocket = ::socket(m_iIPversion, SOCK_DGRAM, 0);

   #ifdef WINDOWS
      if (INVALID_SOCKET == m_iSocket)
   #else
      if (m_iSocket < 0)
   #endif
      throw CUDTException(1, 0, NET_ERROR);

   //如果有提供地址,将提供的地址与创建的UDT关联。如果没有提供地址,调用函数从本地获取地址,然后与创建的Socket关联 
   if (NULL != addr)
   {
      socklen_t namelen = m_iSockAddrSize;

      //将addr与UDP Socket关联  
      if (0 != ::bind(m_iSocket, addr, namelen))
         throw CUDTException(1, 3, NET_ERROR);
   }
   else
   {
      //sendto or WSASendTo will also automatically bind the socket
      addrinfo hints;
      addrinfo* res;

      memset(&hints, 0, sizeof(struct addrinfo));

      hints.ai_flags = AI_PASSIVE;
      hints.ai_family = m_iIPversion;
      hints.ai_socktype = SOCK_DGRAM;

      if (0 != ::getaddrinfo(NULL, "0", &hints, &res))
         throw CUDTException(1, 3, NET_ERROR);

      if (0 != ::bind(m_iSocket, res->ai_addr, res->ai_addrlen))
         throw CUDTException(1, 3, NET_ERROR);

      ::freeaddrinfo(res);
   }

   setUDPSockOpt();    //设置UDP的接收/发送缓冲区,并将UDP设置为非阻塞
}
  • 提供UDP参数:void CChannel::open(UDPSOCKET udpsock)
void CChannel::open(UDPSOCKET udpsock)
{
   m_iSocket = udpsock;
   setUDPSockOpt();
}

int listen(UDTSOCKET u, int backlog)

listen的启动顺序:void UDT::listen(UDTSOCKET u,int backlog) -> int CUDT::listen(UDTSOCKET u, int backlog) -> int CUDTUnited::listen(const UDTSOCKET u, int backlog) -> void CUDT::listen()
  • Order 0:int CUDTUnited::listen(const UDTSOCKET u, int backlog):将创建的UDTSOCKET作为Listener Socket
int CUDTUnited::listen(const UDTSOCKET u, int backlog)
{
   CUDTSocket* s = locate(u);    //从全局的map中寻找CUDTSocket实例
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   // 如果当前的CUDTSocket已经处于Listening状态,直接返回
   if (LISTENING == s->m_Status)
      return 0;

   // 如果当前的CUDTSocket还没有打开,抛出异常
   if (OPENED != s->m_Status)
      throw CUDTException(5, 5, 0);

   // Listener不支持交会连接模式
   if (s->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   //如果队列长度小于backlog,就直接返回  
   if (backlog <= 0)
      throw CUDTException(5, 3, 0);

   s->m_uiBackLog = backlog;

   try
   {
      s->m_pQueuedSockets = new set<UDTSOCKET>;    //创建连接已完成,未accept()的set
      s->m_pAcceptSockets = new set<UDTSOCKET>;    //创建连接已完成,并且已经accept()的set
   }
   catch (...)
   {
      delete s->m_pQueuedSockets;
      delete s->m_pAcceptSockets;
      throw CUDTException(3, 2, 0);
   }

   s->m_pUDT->listen();    

   s->m_Status = LISTENING;    //调整状态为LISTENING

   return 0;
}
  • Order 1:void CUDT::listen():调整CUDT实例状态
void CUDT::listen()
{
   CGuard cg(m_ConnectionLock);

   //如果状态不正确,就抛出异常
   if (!m_bOpened)    
      throw CUDTException(5, 0, 0);

   if (m_bConnecting || m_bConnected)
      throw CUDTException(5, 2, 0);

   // 如果已经处于LISTENING状态,直接返回
   if (m_bListening)
      return;

   //将这个CUDT实例设置为Listener
   if (m_pRcvQueue->setListener(this) < 0)
      throw CUDTException(5, 11, 0);

   m_bListening = true;
}

UDTSOCKET accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen)

accept的处理顺序:UDTSOCKET UDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDT::accept(UDTSOCKET u, struct sockaddr* addr, int* addrlen) -> UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
  • Order 0:UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
UDTSOCKET CUDTUnited::accept(const UDTSOCKET listener, sockaddr* addr, int* addrlen)
{
   if ((NULL != addr) && (NULL == addrlen))
      throw CUDTException(5, 3, 0);

   CUDTSocket* ls = locate(listener);    //首先寻找Listener的CUDTSocket实例

   if (ls == NULL)
      throw CUDTException(5, 4, 0);

   // 如果Listener的状态不正确,直接返回
   if (LISTENING != ls->m_Status)
      throw CUDTException(5, 6, 0);

   // Listener不可能存在交会连接模式
   if (ls->m_pUDT->m_bRendezvous)
      throw CUDTException(5, 7, 0);

   UDTSOCKET u = CUDT::INVALID_SOCK;
   bool accepted = false;

   // !!only one conection can be set up each time!!
   #ifndef WINDOWS
      while (!accepted)
      {
         pthread_mutex_lock(&(ls->m_AcceptLock));
        
         //再次判断连接状态,如果不正确,退出循环   
         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // This socket has been closed.
            accepted = true;
         }
         else if (ls->m_pQueuedSockets->size() > 0)    //如果此时有存在的连接,等到accept()进行处理。取出连接并将其加入已连接队列,退出循环
         {
            u = *(ls->m_pQueuedSockets->begin());    
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());
            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
         {
            accepted = true;
         }
            
         //如果状态正确,但是没有等到连接,进入等待状态,等待事件发生时被唤醒
         if (!accepted && (LISTENING == ls->m_Status))
            pthread_cond_wait(&(ls->m_AcceptCond), &(ls->m_AcceptLock));
        
         //如果等待accept()队列为空,取消关注这个Listener的可读事件   
         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);

         pthread_mutex_unlock(&(ls->m_AcceptLock));
      }
   #else
      while (!accepted)
      {
         WaitForSingleObject(ls->m_AcceptLock, INFINITE);

         if (ls->m_pQueuedSockets->size() > 0)
         {
            u = *(ls->m_pQueuedSockets->begin());
            ls->m_pAcceptSockets->insert(ls->m_pAcceptSockets->end(), u);
            ls->m_pQueuedSockets->erase(ls->m_pQueuedSockets->begin());

            accepted = true;
         }
         else if (!ls->m_pUDT->m_bSynRecving)
            accepted = true;

         ReleaseMutex(ls->m_AcceptLock);

         if  (!accepted & (LISTENING == ls->m_Status))
            WaitForSingleObject(ls->m_AcceptCond, INFINITE);

         if ((LISTENING != ls->m_Status) || ls->m_pUDT->m_bBroken)
         {
            // Send signal to other threads that are waiting to accept.
            SetEvent(ls->m_AcceptCond);
            accepted = true;
         }

         if (ls->m_pQueuedSockets->empty())
            m_EPoll.update_events(listener, ls->m_pUDT->m_sPollID, UDT_EPOLL_IN, false);
      }
   #endif

   //处理收到了一个无效的SOCKET的情况 
   if (u == CUDT::INVALID_SOCK)
   {
      // non-blocking receiving, no connection available
      if (!ls->m_pUDT->m_bSynRecving)
         throw CUDTException(6, 2, 0);

      // listening socket is closed
      throw CUDTException(5, 6, 0);
   }

   //否则的话,本次获得了一个有效的UDT SOCKET,然后填充用户提供的参数 
   if ((addr != NULL) && (addrlen != NULL))
   {
      if (AF_INET == locate(u)->m_iIPversion)
         *addrlen = sizeof(sockaddr_in);
      else
         *addrlen = sizeof(sockaddr_in6);

      // copy address information of peer node
      memcpy(addr, locate(u)->m_pPeerAddr, *addrlen);
   }

   return u;
}

int connect(UDTSOCKET u, const struct sockaddr* name, int namelen)

connect的处理顺序:int UDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDT::connect(UDTSOCKET u, const struct sockaddr* name, int namelen) -> int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen) -> void CUDT::connect(const sockaddr* serv_addr)
  • Order 0:int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
int CUDTUnited::connect(const UDTSOCKET u, const sockaddr* name, int namelen)
{
   CUDTSocket* s = locate(u);    //获取CUDTSocket实例
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard cg(s->m_ControlLock);

   //检查CUDTSocket*实例状态
   if (AF_INET == s->m_iIPversion)
   {
      if (namelen != sizeof(sockaddr_in))
         throw CUDTException(5, 3, 0);
   }
   else
   {
      if (namelen != sizeof(sockaddr_in6))
         throw CUDTException(5, 3, 0);
   }

   //一个UDTSocket实例只有在INIT的状态下才能进行Connect操作
   if (INIT == s->m_Status)
   {
      if (!s->m_pUDT->m_bRendezvous)    //假设不执行交汇连接操作
      {
         s->m_pUDT->open();    //首先打开,其次将这个UDT SOCKET关联到UDP资源复用器上,具体的代码分析见上述
         updateMux(s);
         s->m_Status = OPENED;    //在成功之后,将这个UDT SOCKET的状态设置为OPENED
      }
      else
         throw CUDTException(5, 8, 0);
   }
   else if (OPENED != s->m_Status)
      throw CUDTException(5, 2, 0);
    
   //然后将UDT SOCKET实例的状态更新为:CONNECTING
   s->m_Status = CONNECTING;
   try
   {
      s->m_pUDT->connect(name);    //调用CUDT中的中的实例,正式进行连接
   }
   catch (CUDTException e)
   {
      s->m_Status = OPENED;
      throw e;
   }
    
   //重新记录对方的地址 
   delete s->m_pPeerAddr;
   if (AF_INET == s->m_iIPversion)
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in));
   }
   else
   {
      s->m_pPeerAddr = (sockaddr*)(new sockaddr_in6);
      memcpy(s->m_pPeerAddr, name, sizeof(sockaddr_in6));
   }

   return 0;
}
  • Order 1:void CUDT::connect(const sockaddr* serv_addr):调整CUDT实例的资源
void CUDT::connect(const sockaddr* serv_addr)
{
   CGuard cg(m_ConnectionLock);

   if (!m_bOpened)    //如果目前的状态不正确,直接返回
      throw CUDTException(5, 0, 0);

   if (m_bListening)    //Listener不能调用connect
      throw CUDTException(5, 2, 0);

   if (m_bConnecting || m_bConnected)    //只调整了CUDT SOCKET的Status为CONNECTING,还没有调整CUDT实例的状态
      throw CUDTException(5, 2, 0);

   // 记录对方地址
   delete m_pPeerAddr;
   m_pPeerAddr = (AF_INET == m_iIPversion) ? (sockaddr*)new sockaddr_in : (sockaddr*)new sockaddr_in6;
   memcpy(m_pPeerAddr, serv_addr, (AF_INET == m_iIPversion) ? sizeof(sockaddr_in) : sizeof(sockaddr_in6));

   //这个作用是在交会连接队列中等待HandleShake Packet,等待事件是1S。在我们的实现中可以不考虑
   uint64_t ttl = 3000000;
   if (m_bRendezvous)
      ttl *= 10;
   ttl += CTimer::getTime();
   m_pRcvQueue->registerConnector(m_SocketID, this, m_iIPversion, serv_addr, ttl);

   //因为是主动发起连接,填充想要发送的握手包
   m_ConnReq.m_iVersion = m_iVersion;    
   m_ConnReq.m_iType = m_iSockType;
   m_ConnReq.m_iMSS = m_iMSS;
   m_ConnReq.m_iFlightFlagSize = (m_iRcvBufSize < m_iFlightFlagSize)? m_iRcvBufSize : m_iFlightFlagSize;
   m_ConnReq.m_iReqType = (!m_bRendezvous) ? 1 : 0;
   m_ConnReq.m_iID = m_SocketID;
   CIPAddress::ntop(serv_addr, m_ConnReq.m_piPeerIP, m_iIPversion);

   //在发送握手包的时候随机化一个ISN
   srand((unsigned int)CTimer::getTime());
   m_iISN = m_ConnReq.m_iISN = (int32_t)(CSeqNo::m_iMaxSeqNo * (double(rand()) / RAND_MAX));

   //根据这个ISN,初始化CUDT实例中的信息
   m_iLastDecSeq = m_iISN - 1;    //最后一次发送的序列号
   m_iSndLastAck = m_iISN;    //上一次收到的ACK
   m_iSndLastDataAck = m_iISN;        //最后一次用于更新发送缓冲区的ACK
   m_iSndCurrSeqNo = m_iISN - 1;    //已发送的最大的ACK
   m_iSndLastAck2 = m_iISN;    //最后送回的ACK2
   m_ullSndLastAck2Time = CTimer::getTime();    //最后送回的ACK2的时间

   //打包一个握手包,ID:0是握手包的标志
   CPacket request;
   char* reqdata = new char [m_iPayloadSize];
   request.pack(0, NULL, reqdata, m_iPayloadSize);
   request.m_iID = 0;

   int hs_size = m_iPayloadSize;
   m_ConnReq.serialize(reqdata, hs_size);    //在向已经打包的Packet中添加握手的数据
   request.setLength(hs_size);    //更新握手包的长度
   m_pSndQueue->sendto(serv_addr, request);    //调用发送队列,发送握手包
   m_llLastReqTime = CTimer::getTime();    //修改最后发送数据的时间

   m_bConnecting = true;    //此时还没有收到回应的数据包,所以处于连接建立过程

   // 如果是异步连接,直接返回,在收到回应包的时候对状态进行调整
   if (!m_bSynRecving)
   {
      delete [] reqdata;
      return;
   }

   // 同步连接,等待来自对方的回应,直到连接完成才能返回。不过在这块将回应包进行打包是什么意思??
   CPacket response;
   char* resdata = new char [m_iPayloadSize];
   response.pack(0, NULL, resdata, m_iPayloadSize);

   CUDTException e(0, 0);

   //在这个循环中等待对方的回应
   while (!m_bClosing)    
   {
      //如果距离上一次发送请求的时间已经过去了250ms,再次发送请求 
      if (CTimer::getTime() - m_llLastReqTime > 250000)
      {
         m_ConnReq.serialize(reqdata, hs_size);
         request.setLength(hs_size);
         if (m_bRendezvous)
            request.m_iID = m_ConnRes.m_iID;
         m_pSndQueue->sendto(serv_addr, request);
         m_llLastReqTime = CTimer::getTime();
      }

      response.setLength(m_iPayloadSize);
      if (m_pRcvQueue->recvfrom(m_SocketID, response) > 0)
      {
         if (connect(response) <= 0)    //用于三次握手的第二次,处理收到的响应SYN的数据包
            break;    //返回0是处理成功,<0是处理错误,>0是包丢失需要重新处理

         //新的请求或者回应在收到回复后立即发出 
         m_llLastReqTime = 0;
      }

      if (CTimer::getTime() > ttl)    //如果处理这个Connection花费了太多的时间,抛出异常
      {
         e = CUDTException(1, 1, 0);
         break;
      }
   }

   delete [] reqdata;
   delete [] resdata;

   //根据不同的情况处理异常 
   if (e.getErrorCode() == 0)
   {
      if (m_bClosing)                                                 // if the socket is closed before connection...
         e = CUDTException(1);
      else if (1002 == m_ConnRes.m_iReqType)                          // connection request rejected
         e = CUDTException(1, 2, 0);
      else if ((!m_bRendezvous) && (m_iISN != m_ConnRes.m_iISN))      // secuity check
         e = CUDTException(1, 4, 0);
   }

   if (e.getErrorCode() != 0)
      throw e;
}
  • Order 2:int CUDT::connect(const CPacket& response):发送三次握手的第二次Packet,相应收到的SYN。但是,没见发送ACK2 Packet啊...不过肯定会有,之后在处理Packet的模块进行详解..
int CUDT::connect(const CPacket& response)
{
   // 如果处理成功,就返回0;失败返回-1;返回1 OR 2意味着连接正在进行,但是出现了丢包,需要更多的握手包

   if (!m_bConnecting)    //如果此时的状态没有处于Connecting状态,直接返回错误
      return -1;

   if (m_bRendezvous && ((0 == response.getFlag()) || (1 == response.getType())) && (0 != m_ConnRes.m_iType))
   {
      // 数据包或者保活包的到来,意味着连接已经完成,在这种情况中,意味着连接已经完成
      goto POST_CONNECT;
   }
    
   //如果收到的回应包的类型不正确,直接返回错误 
   if ((1 != response.getFlag()) || (0 != response.getType()))
      return -1;

   m_ConnRes.deserialize(response.m_pcData, response.getLength());    //从回应包中将回应的数据填充到本地

   //判断连接模式,反正正常的连接不会在这个步骤进行额外的操作 
   if (m_bRendezvous)
   {
      // rendezvous connect require 3-way handshake
      if (1 == m_ConnRes.m_iReqType)
         return -1;

      if ((0 == m_ConnReq.m_iReqType) || (0 == m_ConnRes.m_iReqType))
      {
         m_ConnReq.m_iReqType = -1;
         // the request time must be updated so that the next handshake can be sent out immediately.
         m_llLastReqTime = 0;
         return 1;
      }
   }
   else
   {
      // set cookie
      if (1 == m_ConnRes.m_iReqType)    //如果这是一个Keep-Alive Packet,意味着包已经丢失,需要重新发送请求包
      {
         m_ConnReq.m_iReqType = -1;
         m_ConnReq.m_iCookie = m_ConnRes.m_iCookie;
         m_llLastReqTime = 0;
         return 1;
      }
   }

POST_CONNECT:
   //从交会连接队列中移除这个UDT SOCKET
   m_pRcvQueue->removeConnector(m_SocketID);

   // 根据协商的值重新填充数据,因为收到了一个SYN,就意味着需要调整ISN
   m_iMSS = m_ConnRes.m_iMSS;
   m_iFlowWindowSize = m_ConnRes.m_iFlightFlagSize;
   m_iPktSize = m_iMSS - 28;
   m_iPayloadSize = m_iPktSize - CPacket::m_iPktHdrSize;
   m_iPeerISN = m_ConnRes.m_iISN;
   m_iRcvLastAck = m_ConnRes.m_iISN;
   m_iRcvLastAckAck = m_ConnRes.m_iISN;
   m_iRcvCurrSeqNo = m_ConnRes.m_iISN - 1;
   m_PeerID = m_ConnRes.m_iID;
   memcpy(m_piSelfIP, m_ConnRes.m_piPeerIP, 16);

   //连接将要完成,创建所有需要的数据结构
   try
   {
      m_pSndBuffer = new CSndBuffer(32, m_iPayloadSize);
      m_pRcvBuffer = new CRcvBuffer(&(m_pRcvQueue->m_UnitQueue), m_iRcvBufSize);
      m_pSndLossList = new CSndLossList(m_iFlowWindowSize * 2);
      m_pRcvLossList = new CRcvLossList(m_iFlightFlagSize);
      m_pACKWindow = new CACKWindow(1024);
      m_pRcvTimeWindow = new CPktTimeWindow(16, 64);
      m_pSndTimeWindow = new CPktTimeWindow();
   }
   catch (...)
   {
      throw CUDTException(3, 2, 0);
   }
    
   //在Cache中记录这个连接的信息 
   CInfoBlock ib;
   ib.m_iIPversion = m_iIPversion;
   CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
   if (m_pCache->lookup(&ib) >= 0)
   {
      m_iRTT = ib.m_iRTT;
      m_iBandwidth = ib.m_iBandwidth;
   }

   //针对这个连接提供拥塞控制算法,以后详谈
   m_pCC = m_pCCFactory->create();
   m_pCC->m_UDT = m_SocketID;
   m_pCC->setMSS(m_iMSS);
   m_pCC->setMaxCWndSize(m_iFlowWindowSize);
   m_pCC->setSndCurrSeqNo(m_iSndCurrSeqNo);
   m_pCC->setRcvRate(m_iDeliveryRate);
   m_pCC->setRTT(m_iRTT);
   m_pCC->setBandwidth(m_iBandwidth);
   m_pCC->init();

   //根据拥塞控制类型填充拥塞窗口大小
   m_ullInterval = (uint64_t)(m_pCC->m_dPktSndPeriod * m_ullCPUFrequency);
   m_dCongestionWindow = m_pCC->m_dCWndSize;

   // 此时正式进入连接状态
   m_bConnecting = false;    
   m_bConnected = true;

   //将这个UDT SOCKET置于接收队列上面,之后就可以从连接队列上接收数据了
   m_pRNode->m_bOnList = true;
   m_pRcvQueue->setNewEntry(this);

   //更新与这个SOCKET ID相关的事件
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, true);

   // 调整UDT SOCKET实例的状态为:Connected
   s_UDTUnited.connect_complete(m_SocketID);

   return 0;
}

int flush(UDTSOCKET u)

flush的处理流程:int UDT::flush(UDTSOCKET u) -> int CUDT::flush(UDTSOCKET u) -> int CUDTUnited::flush(const UDTSOCKET u) -> void CUDT::flush()
  • Order 0:int CUDTUnited::flush(const UDTSOCKET u):交由GC线程回收发送缓存区与接收缓冲区的数据,因为设置了Linger,所以需要判断是否需要一定的延迟
int CUDTUnited::flush(const UDTSOCKET u)
{
   CUDTSocket* s = locate(u);
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   s->m_pUDT->flush();

   return 0;
}
  • Order 1:void CUDT::flush()
void CUDT::flush()
{
   uint64_t entertime = CTimer::getTime();

   while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
   {
      // linger has been checked by previous close() call and has expired
      if (m_ullLingerExpiration >= entertime)
         break;

      if (!m_bSynSending)
      {
         // if this socket enables asynchronous sending, return immediately and let GC to close it later
         if (0 == m_ullLingerExpiration)
            m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;

         return;
      }

      #ifndef WINDOWS
         timespec ts;
         ts.tv_sec = 0;
         ts.tv_nsec = 1000000;
         nanosleep(&ts, NULL);
      #else
         Sleep(1);
      #endif
   }
}

int close(UDTSOCKET u)

close的处理流程: int UDT::close(UDTSOCKET u) -> int CUDT::close(UDTSOCKET u) -> int CUDTUnited::close(const UDTSOCKET u) -> void CUDT::close()
  • Order 0:int CUDTUnited::close(const UDTSOCKET u)
int CUDTUnited::close(const UDTSOCKET u)
{
   CUDTSocket* s = locate(u);    //获取CUDTSocket实例
   if (NULL == s)
      throw CUDTException(5, 4, 0);

   CGuard socket_cg(s->m_ControlLock);

   if (s->m_Status == LISTENING)    //如果是Listener
   {
      if (s->m_pUDT->m_bBroken)    //如果目前CUDT实例的状态已经损害,直接退出,等待GC线程回收资源
         return 0;

      s->m_TimeStamp = CTimer::getTime();    //更新UDT SOCKET关闭时间
      s->m_pUDT->m_bBroken = true;    //然后还是调整CUDT的状态为损坏

      // broadcast all "accept" waiting
      #ifndef WINDOWS
         pthread_mutex_lock(&(s->m_AcceptLock));
         pthread_cond_broadcast(&(s->m_AcceptCond));    //唤醒所有等待accept()的线程
         pthread_mutex_unlock(&(s->m_AcceptLock));
      #else
         SetEvent(s->m_AcceptCond);
      #endif

      return 0;    //调整Listener之后就可以直接退出了
   }

   s->m_pUDT->close();    //调用连接CUDT中的close()调整CUDT的状态

   // synchronize with garbage collection.
   CGuard manager_cg(m_ControlLock);

   // since "s" is located before m_ControlLock, locate it again in case it became invalid
   map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.find(u);    //在全局的map中寻找UDTSocket实例
   if ((i == m_Sockets.end()) || (i->second->m_Status == CLOSED))    //如果没有找到或者状态为已经关闭,也可以直接退出
      return 0;
   s = i->second;    //获得CUDTSocket实例

   s->m_Status = CLOSED;    //调整CUDTSocket的状态为CLOSED

   // UDT SOCKET在关闭的时候不会立刻被移除,以防止其他的回调函数访问无效地址,定时器启动,资源会在1S之后被删除
   s->m_TimeStamp = CTimer::getTime();    //调整UDTSocket关闭的事件

   m_Sockets.erase(s->m_SocketID);    //从全局的map中删除
   m_ClosedSockets.insert(pair<UDTSOCKET, CUDTSocket*>(s->m_SocketID, s));    //将这个UDTSocket加入Closed队列中等待删除

   CTimer::triggerEvent();

   return 0;
}
  • Order 1:void CUDT::close():调整CUDT实例的状态
void CUDT::close()
{
   if (!m_bOpened)    //如果CUDT已经关闭,直接返回
      return;

   if (0 != m_Linger.l_onoff)    //如果设置了稍后关闭,那么在稍后的时候再进行处理
   {
      flush();
   }

   //从接收数据的队列中移除数据
   if (m_bConnected)
      m_pSndQueue->m_pSndUList->remove(this);

   // 清理与这个UDTSocket实例相关的事件
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
   s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);

   // 将这个UDT SOCKET ID从所有注册的Epoll中移除
   try
   {
      for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
         s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
   }
   catch (...)
   {
   }

   if (!m_bOpened)    //再次检查状态
      return;

   // 目前正处于关闭过程中
   m_bClosing = true;

   CGuard cg(m_ConnectionLock);

   // 如果发送者和接受者还在等待数据,发送信号通知他们退出,pthread_cond_signal
   releaseSynch();

   if (m_bListening)    //如果是Listener
   {
      m_bListening = false;    //调整Listener的状态
      m_pRcvQueue->removeListener(this);    //从接收队列中移除这个Listener,不再负责接收的事务处理
   }
   else if (m_bConnecting)    //如果正处于CONNETING
   {
      m_pRcvQueue->removeConnector(m_SocketID);    //从交汇连接队列中移除这个UDT SOCKET
   }

   if (m_bConnected)    //如果连接已经完成
   {
      if (!m_bShutdown)    //如果对方还没有发送ShutDown
         sendCtrl(5);    //name我们就向对方发送ShutDown

      m_pCC->close();    //关闭拥塞控制

      // 在Cache中存储并更新这个连接的信息
      CInfoBlock ib;
      ib.m_iIPversion = m_iIPversion;
      CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
      ib.m_iRTT = m_iRTT;
      ib.m_iBandwidth = m_iBandwidth;
      m_pCache->update(&ib);

      m_bConnected = false;    //调整状态为关闭
   }

   // 等待Send和Recv的停止
   CGuard sendguard(m_SendLock);    //如果在发送数据或者接收数据时,Buffer不够用的时候,一般会使用pthread_cond_wait睡眠一会
   CGuard recvguard(m_RecvLock);

   // CLOSED.
   m_bOpened = false;    //这个UDT SOCKET已经关闭
}

int sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false)

send的处理流程:int UDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(UDTSOCKET u, const char* buf, int len, int ttl = -1, bool inorder = false) -> int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
  • Order 0:int CUDT::send(const char* data, int len):处理发送数据报的情况,仅适用于UDP
int CUDT::sendmsg(const char* data, int len, int msttl, bool inorder)
{
   if (UDT_STREAM == m_iSockType)    //如果不是UDP类型直接返回
      throw CUDTException(5, 9, 0);

   // 如果当前的状态不正确,要记得退出
   if (m_bBroken || m_bClosing)
      throw CUDTException(2, 1, 0);
   else if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   if (len <= 0)    //需要发送的数据长度不正确
      return 0;

   if (len > m_iSndBufSize * m_iPayloadSize)    //如果需要发送的数据  > 发送缓冲×有效载荷,记得退出
      throw CUDTException(5, 12, 0);

   CGuard sendguard(m_SendLock);

   if (m_pSndBuffer->getCurrBufSize() == 0)    //如果当前的缓冲区中没有空间,稍微延迟一会
   {
      uint64_t currtime;
      CTimer::rdtsc(currtime);
      m_ullLastRspTime = currtime;    //记录延迟计时器,避免延迟严重
   }

   //如果需要发送的数据 > 已经使用的空间,也就是说空间还是不够用  
   if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)
   {
      if (!m_bSynSending)    //如果没有设置异步发送标志,要抛出异常
         throw CUDTException(6, 1, 0);
      else
      {
         // wait here during a blocking sending
         #ifndef WINDOWS
            pthread_mutex_lock(&m_SendBlockLock);    //拿到睡眠阻塞锁
            if (m_iSndTimeOut < 0)    //如果此时还没有到超时时间
            {    //再次经过判断空间还是不够用,那就得睡眠一会喽
               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))    
                  pthread_cond_wait(&m_SendBlockCond, &m_SendBlockLock);
            }
            else    //如果已经超时
            {
               uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;    再给一次机会睡眠一会
               timespec locktime;

               locktime.tv_sec = exptime / 1000000;
               locktime.tv_nsec = (exptime % 1000000) * 1000;

               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                  pthread_cond_timedwait(&m_SendBlockCond, &m_SendBlockLock, &locktime);
            }
            pthread_mutex_unlock(&m_SendBlockLock);
         #else
            if (m_iSndTimeOut < 0)
            {
               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len))
                  WaitForSingleObject(m_SendBlockCond, INFINITE);
            }
            else
            {
               uint64_t exptime = CTimer::getTime() + m_iSndTimeOut * 1000ULL;

               while (!m_bBroken && m_bConnected && !m_bClosing && ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len) && (CTimer::getTime() < exptime))
                  WaitForSingleObject(m_SendBlockCond, DWORD((exptime - CTimer::getTime()) / 1000));
            }
         #endif

         // check the connection status
         if (m_bBroken || m_bClosing)
            throw CUDTException(2, 1, 0);
         else if (!m_bConnected)
            throw CUDTException(2, 2, 0);
      }
   }

   if ((m_iSndBufSize - m_pSndBuffer->getCurrBufSize()) * m_iPayloadSize < len)    //如果Buffer还是不够用,并且已经超时,要抛出异常
   {
      if (m_iSndTimeOut >= 0)
         throw CUDTException(6, 3, 0);

      return 0;    //空间不够用但是没有超时,返回0,告诉用户数据没有发送
   }

   // 记录这一次发送数据的时间
   if (0 == m_pSndBuffer->getCurrBufSize())    //此时已经有了可以发送数据的空间
      m_llSndDurationCounter = CTimer::getTime();

   // 将用户提供的数据添加到UDT SOCKET的空间中
   m_pSndBuffer->addBuffer(data, len, msttl, inorder);

   // 如果这个UDT SOCKET的发送链表中,将这个UDT添加到UDP的发送链表中
   m_pSndQueue->m_pSndUList->update(this, false);

   //如果发送缓冲区中没有足够可用的数据,取消可以写入事件标志 
   if (m_iSndBufSize <= m_pSndBuffer->getCurrBufSize())
   {
      s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
   }

   return len;
}

int recvmsg(UDTSOCKET u, char* buf, int len)

recvmsg的处理流程:int UDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(UDTSOCKET u, char* buf, int len) -> int CUDT::recvmsg(char* data, int len)
  • Order 1:int CUDT::recvmsg(char* data, int len):处理UDP数据报的接收请求
int CUDT::recvmsg(char* data, int len)
{
   if (UDT_STREAM == m_iSockType)    //如果是TCP类型的数据报,直接退出
      throw CUDTException(5, 9, 0);

   //如果状态不正确,直接退出
   if (!m_bConnected)
      throw CUDTException(2, 2, 0);

   //如果想要获取len<=0,直接返回
   if (len <= 0)
      return 0;

   CGuard recvguard(m_RecvLock);

   if (m_bBroken || m_bClosing)    //如果目前处于关闭/损坏的状态
   {
      int res = m_pRcvBuffer->readMsg(data, len);    //依旧尝试从Buffer中读取数据

      if (m_pRcvBuffer->getRcvMsgNum() <= 0)    //如果UDT Queue中已经没有数据了,取消此UDT的可读取事件
      {
         // read is not available any more
         s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
      }

      if (0 == res)    //如果读取失败,抛出异常,否则返回读取的字节数
         throw CUDTException(2, 1, 0);
      else
         return res;
   }

   if (!m_bSynRecving)    //如果没有设置异步读取机制,没有读取到数据时要抛出异常,读到了就直接返回
   {
      int res = m_pRcvBuffer->readMsg(data, len);
      if (0 == res)
         throw CUDTException(6, 2, 0);
      else
         return res;
   }

   int res = 0;    //异步读取的话可以延迟一小会
   bool timeout = false;

   do
   {
      #ifndef WINDOWS
         pthread_mutex_lock(&m_RecvDataLock);

         if (m_iRcvTimeOut < 0)    //还没有到超时时间,那么小憩一会
         {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
               pthread_cond_wait(&m_RecvDataCond, &m_RecvDataLock);
         }
         else    //已经到了超时时间,但是再给你一次机会,等待一会
         {
            uint64_t exptime = CTimer::getTime() + m_iRcvTimeOut * 1000ULL;
            timespec locktime;

            locktime.tv_sec = exptime / 1000000;
            locktime.tv_nsec = (exptime % 1000000) * 1000;

            if (pthread_cond_timedwait(&m_RecvDataCond, &m_RecvDataLock, &locktime) == ETIMEDOUT)
               timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);    //等待一会之后,再次尝试读取数据
         }
         pthread_mutex_unlock(&m_RecvDataLock);
      #else
         if (m_iRcvTimeOut < 0)
         {
            while (!m_bBroken && m_bConnected && !m_bClosing && (0 == (res = m_pRcvBuffer->readMsg(data, len))))
               WaitForSingleObject(m_RecvDataCond, INFINITE);
         }
         else
         {
            if (WaitForSingleObject(m_RecvDataCond, DWORD(m_iRcvTimeOut)) == WAIT_TIMEOUT)
               timeout = true;

            res = m_pRcvBuffer->readMsg(data, len);
         }
      #endif

      if (m_bBroken || m_bClosing)    //如果状态不正确,抛出异常
         throw CUDTException(2, 1, 0);
      else if (!m_bConnected)
         throw CUDTException(2, 2, 0);
   } while ((0 == res) && !timeout);    //今天读不到数据,我就赖着不走了

   //如果接收缓冲区中确实没有数据可读,那么必须要取消关注的可读事件 
   if (m_pRcvBuffer->getRcvMsgNum() <= 0)
   {
      // read is not available any more
      s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
   }

   if ((res <= 0) && (m_iRcvTimeOut >= 0))    //如果没有读到数据,还超时了,抛出异常
      throw CUDTException(6, 3, 0);

   return res;
}
原文地址:https://www.cnblogs.com/ukernel/p/9191048.html