UDT源码剖析(十一)之SendQueue And RecvQueue

SendQueue与RecvQueue在代码中与UDP SOCKET相关。在用户将想要发送的数据提交给Buffer之后,由Buffer将数据打包,根据拥塞控制提供的时间计算,在合适的时间提交给SendQueue进行发送。在接收到数据包之后,通过事件驱动的模式通知用户从RecvQueue中拿去数据包。
删除了交会连接模式的代码。
SendQueue与RecvQueue依赖于几个通用的数据结构,先列出来哈,可能会与前面的描述有重复,怕来回查找麻烦,索性全部列出来:

CMultiplexer:每个UDP端口对应一个此对象,资源的实际持有者

struct CMultiplexer
{
   CSndQueue* m_pSndQueue;	// The sending queue
   CRcvQueue* m_pRcvQueue;	// The receiving queue
   CChannel* m_pChannel;	// The UDP channel for sending and receiving
   CTimer* m_pTimer;		// The timer

   int m_iPort;			// The UDP port number of this multiplexer
   int m_iIPversion;		// IP version
   int m_iMSS;			// Maximum Segment Size
   int m_iRefCount;		//与此资源复用器相关联的UDT实例的数量
   bool m_bReusable;		//这个资源复用器是否可以被共享

   int m_iID;			// multiplexer ID
};

CUnitQueue

struct CUnit
{
   CPacket m_Packet;		// packet
   int m_iFlag;			        // 0: free 1:occupid, 2: msg已经read,但是还没有被free, 3: msg被丢弃
};
class CUnitQueue
{
private:
   struct CQEntry
   {
      CUnit* m_pUnit;		// unit queue
      char* m_pBuffer;		// data buffer
      int m_iSize;		// size of each queue

      CQEntry* m_pNext;
   }
   *m_pQEntry,			// 指向起始的Entry队列
   *m_pCurrQueue,		// 指向当前的Entry队列
   *m_pLastQueue;		// 指向最后一个Entry队列

   CUnit* m_pAvailUnit;         //最近访问的Unit* 
   int m_iSize;			// 总共的Packets数量
   int m_iCount;		// 已经使用的Packets数量
   int m_iMSS;			// unit buffer size
   int m_iIPversion;		// IP version
};
  • 初始化:int CUnitQueue::init(int size, int mss, int version)
CUnitQueue::CUnitQueue():
m_pQEntry(NULL),
m_pCurrQueue(NULL),
m_pLastQueue(NULL),
m_iSize(0),
m_iCount(0),
m_iMSS(),
m_iIPversion()
{
}

int CUnitQueue::init(int size, int mss, int version)
{
   CQEntry* tempq = NULL;
   CUnit* tempu = NULL;
   char* tempb = NULL;

   try
   {
      tempq = new CQEntry;  //初始化需要管理的内存
      tempu = new CUnit [size];
      tempb = new char [size * mss];
   }
   catch (...)
   {
      delete tempq;     //出现异常后退出
      delete [] tempu;
      delete [] tempb;

      return -1;
   }

   for (int i = 0; i < size; ++ i)
   {
      tempu[i].m_iFlag = 0; //0表示当前CUint是空闲的
      tempu[i].m_Packet.m_pcData = tempb + i * mss; //初始化packet中的数据字段,只有使用权限
   }
   tempq->m_pUnit = tempu;  //初始化队列
   tempq->m_pBuffer = tempb;    //初始化队列
   tempq->m_iSize = size;   //初始化大小

   m_pQEntry = m_pCurrQueue = m_pLastQueue = tempq; //初始化整体的队列
   m_pQEntry->m_pNext = m_pQEntry;  //初始化下一个指向自己

   m_pAvailUnit = m_pCurrQueue->m_pUnit;    //指向首个可用的CUnit

   m_iSize = size;  //整体的packet个数,CUint大小为0
   m_iMSS = mss;    
   m_iIPversion = version;

   return 0;
}
  • 销毁:CUnitQueue::~CUnitQueue()
CUnitQueue::~CUnitQueue()
{
   CQEntry* p = m_pQEntry;  //有size条队列,获取首条队列

   while (p != NULL)    //释放队列等的信息
   {
      delete [] p->m_pUnit;
      delete [] p->m_pBuffer;    //这个Buffer是一个CQEntry中的首部,指向这个CQEntry中所使用Buffer的起始位置,在分配的时候是一整块分配的,删除的时候也是一整块的删除

      CQEntry* q = p;
      if (p == m_pLastQueue)
         p = NULL;
      else
         p = p->m_pNext;
      delete q;
   }
}
  • 增加队列长度:int CUnitQueue::increase()
int CUnitQueue::increase()
{
   int real_count = 0;
   CQEntry* p = m_pQEntry;  //获取首条队列的指针
   while (p != NULL)
   {
      CUnit* u = p->m_pUnit;    //获取首条队列的首个CUnit
      for (CUnit* end = u + p->m_iSize; u != end; ++ u)
         if (u->m_iFlag != 0)   //如果队列中的Cunit状态不是free
            ++ real_count;  //真实的,使用中的CUnit数量++

      if (p == m_pLastQueue)
         p = NULL;
      else
         p = p->m_pNext;
   }
   m_iCount = real_count;   //更新使用状况
   if (double(m_iCount) / m_iSize < 0.9)    //已经使用的数量/总数量 < 0.9
      return -1; //就是说还有剩余的空间,不进行扩张

   CQEntry* tempq = NULL;   //创建一条新的队列,链接到末尾
   CUnit* tempu = NULL;
   char* tempb = NULL;

   // all queues have the same size
   int size = m_pQEntry->m_iSize;

   try
   {
      tempq = new CQEntry;
      tempu = new CUnit [size];
      tempb = new char [size * m_iMSS];
   }
   catch (...)
   {
      delete tempq;
      delete [] tempu;
      delete [] tempb;

      return -1;
   }

   for (int i = 0; i < size; ++ i)
   {
      tempu[i].m_iFlag = 0;
      tempu[i].m_Packet.m_pcData = tempb + i * m_iMSS;
   }
   tempq->m_pUnit = tempu;
   tempq->m_pBuffer = tempb;
   tempq->m_iSize = size;

   m_pLastQueue->m_pNext = tempq;   //链接到末尾
   m_pLastQueue = tempq;
   m_pLastQueue->m_pNext = m_pQEntry;   //这是一条环形链表

   m_iSize += size; //增加总共的数量

   return 0;
}
  • 获取下一个可用的CUnit:CUnit* CUnitQueue::getNextAvailUnit()
CUnit* CUnitQueue::getNextAvailUnit()
{
   if (m_iCount * 10 > m_iSize * 9) //如果已经使用的比例超过0.9,就增加数量
      increase();

   if (m_iCount >= m_iSize) //如果已经使用的数量超过总数量,就返回
      return NULL;

   CQEntry* entrance = m_pCurrQueue;    //获取当前队列

   do
   {
      for (CUnit* sentinel = m_pCurrQueue->m_pUnit + m_pCurrQueue->m_iSize - 1; m_pAvailUnit != sentinel; ++ m_pAvailUnit)
         if (m_pAvailUnit->m_iFlag == 0)    //在当前的队列中找到第一个free的CUnit
            return m_pAvailUnit;

      if (m_pCurrQueue->m_pUnit->m_iFlag == 0)  //如果当前队列的第一个CUnit为free
      {
         m_pAvailUnit = m_pCurrQueue->m_pUnit;  //更新avail CUnit并返回
         return m_pAvailUnit;
      }

      m_pCurrQueue = m_pCurrQueue->m_pNext; //availCUint一直存在于CurrentQuene
      m_pAvailUnit = m_pCurrQueue->m_pUnit;
   } while (m_pCurrQueue != entrance);

   increase();  //完了立马判断是否需要增加数量

   return NULL;
}

CSndUList

struct CSNode
{
   CUDT* m_pUDT;		        // 指向CUDT*的指针
   uint64_t m_llTimeStamp;     // 堆化时排序的时间戳

   int m_iHeapLoc;		        // 堆的层次,-1意味着暂时不存在与当前堆中
};
class CSndUList
{
private:
   CSNode** m_pHeap;			                // 堆化数组
   int m_iArrayLength;			                // 堆数组长度
   int m_iLastEntry;			                        // 最近一次发送的位置
   udt_pthread_mutex_t m_ListLock;               
   udt_pthread_mutex_t* m_pWindowLock;     
   udt_pthread_cond_t* m_pWindowCond;         
   CTimer* m_pTimer;
};
  • 初始化:CSndUList::CSndUList()
CSndUList::CSndUList():
m_pHeap(NULL),
m_iArrayLength(4096),    //将队列长度预设为4096
m_iLastEntry(-1),
m_ListLock(),
m_pWindowLock(NULL),
m_pWindowCond(NULL),
m_pTimer(NULL)
{
   m_pHeap = new CSNode*[m_iArrayLength];   //创建length条指针队列

   #ifndef WINDOWS
      pthread_mutex_init(&m_ListLock, NULL);
   #else
      m_ListLock = CreateMutex(NULL, false, NULL);
   #endif
}
  • 销毁:CSndUList::~CSndUList()
CSndUList::~CSndUList()
{
   delete [] m_pHeap;   //销毁指针队列

   #ifndef WINDOWS
      pthread_mutex_destroy(&m_ListLock);
   #else
      CloseHandle(m_ListLock);
   #endif
}
  • 向堆中添加CUDT实例:void CSndUList::insert(int64_t ts, const CUDT* u)
void CSndUList::insert(int64_t ts, const CUDT* u)
{
   CGuard listguard(m_ListLock);    //获取锁的guard

   // increase the heap array size if necessary
   if (m_iLastEntry == m_iArrayLength - 1)  //如果上一次使用的是最后一条,增加长度
   {
      CSNode** temp = NULL;

      try   //都是指针,调整起来,消耗也不是很大
      {         
         temp = new CSNode*[m_iArrayLength * 2];    //长度*2,然后将以往的都copy
      }
      catch(...)
      {
         return;
      }

      memcpy(temp, m_pHeap, sizeof(CSNode*) * m_iArrayLength);
      m_iArrayLength *= 2;
      delete [] m_pHeap;    //释放以前的操作
      m_pHeap = temp;
   }

   insert_(ts, u);  //真实的insert
}
  • 向堆中添加CUDT实例:void CSndUList::insert_(int64_t ts, const CUDT* u)
void CSndUList::insert_(int64_t ts, const CUDT* u)  //真实的insert
{
   CSNode* n = u->m_pSNode; //反向获取CSNode

   // do not insert repeated node
   //-1意味着,这个CSNode没有在堆中,可以插入.>=1意味着在堆中,直接返回
   if (n->m_iHeapLoc >= 0)
      return;

   m_iLastEntry ++; //修改即将插入的位置
   m_pHeap[m_iLastEntry] = n;   //将这个指针指向的CSNode插入堆中
   n->m_llTimeStamp = ts;   //修改时间为ts

   int q = m_iLastEntry;
   int p = q;
   while (p != 0)   //堆化的过程,调整新插入的位置,根据ts的大小调整位置
   {
      p = (q - 1) >> 1;
      if (m_pHeap[p]->m_llTimeStamp > m_pHeap[q]->m_llTimeStamp) //ts越大,越往上
      {
         CSNode* t = m_pHeap[p];
         m_pHeap[p] = m_pHeap[q];
         m_pHeap[q] = t;
         t->m_iHeapLoc = q;
         q = p;
      }
      else
         break;
   }

   n->m_iHeapLoc = q;   //这个变量还表示在当前堆中的层数吗? 反正大于0,就是存在于堆中了

   // an earlier event has been inserted, wake up sending worker
   if (n->m_iHeapLoc == 0)  //如果当前的CUDT*在最顶层,唤醒发送线程
      m_pTimer->interrupt();

   if (0 == m_iLastEntry)   //如果队列为空,唤醒发送队列??
   {
      #ifndef WINDOWS
         pthread_mutex_lock(m_pWindowLock);
         pthread_cond_signal(m_pWindowCond);
         pthread_mutex_unlock(m_pWindowLock);
      #else
         SetEvent(*m_pWindowCond);
      #endif
   }
}
  • 更新CUDT的发送时间戳:void CSndUList::update(const CUDT* u, bool reschedule)
void CSndUList::update(const CUDT* u, bool reschedule)  //更新CUDT*的发送时间戳
{
   CGuard listguard(m_ListLock);

   CSNode* n = u->m_pSNode; //反向获取指针

   if (n->m_iHeapLoc >= 0)  //>0说明存在于堆中,需要调整
   {
      if (!reschedule)  //如果调整参数为false,直接退出
         return;

      if (n->m_iHeapLoc == 0)   //如果在堆顶部,唤醒发送队列,那还重新调整吗?
      {
         n->m_llTimeStamp = 1;  
         m_pTimer->interrupt();
         return;
      }

      remove_(u);   //删除已经存在的CUDT*
   }

   insert_(1, u);   //重新插入CUDT*
}
  • 取得发送地址中下一个packet和addr,并将CUDT重新加入堆中:int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
int CSndUList::pop(sockaddr*& addr, CPacket& pkt)
{
   CGuard listguard(m_ListLock);

   if (-1 == m_iLastEntry)  //索引有问题,直接返回
      return -1;

   // no pop until the next schedulled time
   uint64_t ts;
   CTimer::rdtsc(ts);   //获取当前时间
   if (ts < m_pHeap[0]->m_llTimeStamp)  //如果发送的时间小于当前的时间(在规定的时间内没有发送出去)
      return -1;

   CUDT* u = m_pHeap[0]->m_pUDT;    //从堆顶部获取并删除
   remove_(u);  //只是将flag调整为-1,不进行直接的删除

   if (!u->m_bConnected || u->m_bBroken)    //和CUDT中的函数挂钩了,回头调整注释
      return -1;

   // pack a packet from the socket
   if (u->packData(pkt, ts) <= 0)
      return -1;

   addr = u->m_pPeerAddr;

   // insert a new entry, ts is the next processing time
   if (ts > 0)
      insert_(ts, u);   //将当前时间设置为下一次的发送时间

   return 1;
}
  • 获取下一次发哦少年宫的事件:uint64_t CSndUList::getNextProcTime()
uint64_t CSndUList::getNextProcTime()
{
   CGuard listguard(m_ListLock);

   if (-1 == m_iLastEntry)
      return 0;

   return m_pHeap[0]->m_llTimeStamp;    //获取下一次的处理时间
}
  • 将CUDT从堆中删除:void CSndUList::remove_(const CUDT* u)
void CSndUList::remove_(const CUDT* u)
{
   CSNode* n = u->m_pSNode;     //反向获取CUDT*在堆中的表现形式

   if (n->m_iHeapLoc >= 0)  //如果存在于堆中,进行删除,堆的删除操作
   {
      // remove the node from heap
      m_pHeap[n->m_iHeapLoc] = m_pHeap[m_iLastEntry];
      m_iLastEntry --;
      m_pHeap[n->m_iHeapLoc]->m_iHeapLoc = n->m_iHeapLoc;

      int q = n->m_iHeapLoc;
      int p = q * 2 + 1;
      while (p <= m_iLastEntry)
      {
         if ((p + 1 <= m_iLastEntry) && (m_pHeap[p]->m_llTimeStamp > m_pHeap[p + 1]->m_llTimeStamp))
            p ++;

         if (m_pHeap[q]->m_llTimeStamp > m_pHeap[p]->m_llTimeStamp)
         {
            CSNode* t = m_pHeap[p];
            m_pHeap[p] = m_pHeap[q];
            m_pHeap[p]->m_iHeapLoc = p;
            m_pHeap[q] = t;
            m_pHeap[q]->m_iHeapLoc = q;

            q = p;
            p = q * 2 + 1;
         }
         else
            break;
      }

      n->m_iHeapLoc = -1;   //此时,这个CSNode不存在于堆中,只调整指针位置,不进行实际的删除
   }

   // the only event has been deleted, wake up immediately
   if (0 == m_iLastEntry)   //如果队列已经empty,唤醒队列
      m_pTimer->interrupt();
}

CSndQueue:Send Queue

class CSndQueue
{
private:
   static void* worker(void* param);        //发送线程
   udt_pthread_t m_WorkerThread;

private:
   CSndUList* m_pSndUList;		    // 堆化的Send List
   CChannel* m_pChannel;                 // The UDP channel for data sending
   CTimer* m_pTimer;			    // 定时器设施

   udt_pthread_mutex_t m_WindowLock;
   udt_pthread_cond_t m_WindowCond;

   volatile bool m_bClosing;		// 发送线程是否启动
   udt_pthread_cond_t m_ExitCond;
};
  • 初始化:void CSndQueue::init(CChannel* c, CTimer* t)
CSndQueue::CSndQueue():
m_WorkerThread(),
m_pSndUList(NULL),
m_pChannel(NULL),
m_pTimer(NULL),
m_WindowLock(),
m_WindowCond(),
m_bClosing(false),
m_ExitCond()
{
   #ifndef WINDOWS
      pthread_cond_init(&m_WindowCond, NULL);
      pthread_mutex_init(&m_WindowLock, NULL);
   #else
      m_WindowLock = CreateMutex(NULL, false, NULL);
      m_WindowCond = CreateEvent(NULL, false, false, NULL);
      m_ExitCond = CreateEvent(NULL, false, false, NULL);
   #endif
}

void CSndQueue::init(CChannel* c, CTimer* t)    //讲真,SendQueue和RecvQueue共用UDP SOCKET的Timer
{
   m_pChannel = c;
   m_pTimer = t;
   m_pSndUList = new CSndUList;
   m_pSndUList->m_pWindowLock = &m_WindowLock;  //初始化SendList控制变量
   m_pSndUList->m_pWindowCond = &m_WindowCond;
   m_pSndUList->m_pTimer = m_pTimer;

   #ifndef WINDOWS
      if (0 != pthread_create(&m_WorkerThread, NULL, CSndQueue::worker, this))
      { //启动工作线程
         m_WorkerThread = 0;
         throw CUDTException(3, 1);
      }
   #else
      DWORD threadID;
      m_WorkerThread = CreateThread(NULL, 0, CSndQueue::worker, this, 0, &threadID);
      if (NULL == m_WorkerThread)
         throw CUDTException(3, 1);
   #endif
}
  • 工作线程:
void* CSndQueue::worker(void* param) //工作线程
{
   CSndQueue* self = (CSndQueue*)param; //获取处理的Queue

   while (!self->m_bClosing)    //控制发送线程是否继续发送
   {
      uint64_t ts = self->m_pSndUList->getNextProcTime();   //获得发送队列下一个将要发送的包的具体信息

      if (ts > 0)   //如果还没有到发送时间
      {
         // wait until next processing time of the first socket on the list
         uint64_t currtime;
         CTimer::rdtsc(currtime);
         if (currtime < ts) //如果当前的时间小鱼发送时间,小睡一会
            self->m_pTimer->sleepto(ts);

         // it is time to send the next pkt
         sockaddr* addr;    //已经到了发送时间
         CPacket pkt;   
         if (self->m_pSndUList->pop(addr, pkt) < 0) //从发送队列中获取包和发送地址
            continue;

         self->m_pChannel->sendto(addr, pkt);   //调用Channel的UDP的封装发送
      }
      else
      {
          //如果没有包需要发送的时候,就在这块休眠
         // wait here if there is no sockets with data to be sent
         #ifndef WINDOWS
            pthread_mutex_lock(&self->m_WindowLock);
            if (!self->m_bClosing && (self->m_pSndUList->m_iLastEntry < 0))
               pthread_cond_wait(&self->m_WindowCond, &self->m_WindowLock);
            pthread_mutex_unlock(&self->m_WindowLock);
         #else
            WaitForSingleObject(self->m_WindowCond, INFINITE);
         #endif
      }
   }

   #ifndef WINDOWS
      return NULL;
   #else
      SetEvent(self->m_ExitCond);
      return 0;
   #endif
}
  • 发送数据包:int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
int CSndQueue::sendto(const sockaddr* addr, CPacket& packet)
{
   m_pChannel->sendto(addr, packet);    //调用Channel发送
   return packet.getLength();
}

CRcvUList

struct CRNode
{
   CUDT* m_pUDT;                // Pointer to CUDT*
   uint64_t m_llTimeStamp;      // Time Stamp

   CRNode* m_pPrev;             // previous link
   CRNode* m_pNext;             // next link

   bool m_bOnList;              // 当前节点是否在双向链表上
};
class CRcvUList    //用于接收数据的双向链表
{
public:
   CRNode* m_pUList;		// the head node
private:
   CRNode* m_pLast;		// the last node
};
  • 向双向链表中插入CUDT实例:void CRcvUList::insert(const CUDT* u)
void CRcvUList::insert(const CUDT* u)
{
   CRNode* n = u->m_pRNode; //反向获取在双向链表中的表现形式
   CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间,gettimeofday()

   if (NULL == m_pUList)    //如果双向链表为空
   {
      // empty list, insert as the single node
      n->m_pPrev = n->m_pNext = NULL;
      m_pLast = m_pUList = n;

      return;
   }

   // always insert at the end for RcvUList
   n->m_pPrev = m_pLast;    //插入双向链表的末尾
   n->m_pNext = NULL;
   m_pLast->m_pNext = n;
   m_pLast = n;
}
  • 向双向链表中移除CUDT实例:void CRcvUList::remove(const CUDT* u)
void CRcvUList::remove(const CUDT* u)
{
   CRNode* n = u->m_pRNode; //方向获取在链表中的表现形式

   if (!n->m_bOnList)   //如果不在链表中,直接返回
      return;

   if (NULL == n->m_pPrev)  //如果需要删除的结点是首部结点
   {
      // n is the first node
      m_pUList = n->m_pNext;
      if (NULL == m_pUList)
         m_pLast = NULL;
      else
         m_pUList->m_pPrev = NULL;
   }
   else     //反正不是真正的删除,只是在链表中摘除,然后调整是否在链表中的标志
   {
      n->m_pPrev->m_pNext = n->m_pNext;
      if (NULL == n->m_pNext)
      {
         // n is the last node
         m_pLast = n->m_pPrev;
      }
      else
         n->m_pNext->m_pPrev = n->m_pPrev;
   }

   n->m_pNext = n->m_pPrev = NULL;
}
  • 更新CUDT在双向链表中的位置:void CRcvUList::update(const CUDT* u)
void CRcvUList::update(const CUDT* u)
{
   CRNode* n = u->m_pRNode; //反向获取在链表中的表现形式

   if (!n->m_bOnList)
      return;

   CTimer::rdtsc(n->m_llTimeStamp); //获取当前时间

   // if n is the last node, do not need to change
   if (NULL == n->m_pNext)  //如果n是末尾的结点,不调整
      return;

   if (NULL == n->m_pPrev)  //如果n是头部结点
   {
      m_pUList = n->m_pNext;    //将下一个结点调整为头部结点
      m_pUList->m_pPrev = NULL;
   }
   else
   {
      n->m_pPrev->m_pNext = n->m_pNext; //否则在链表中剔除这个结点
      n->m_pNext->m_pPrev = n->m_pPrev;
   }

   n->m_pPrev = m_pLast;    //然后把这个结点添加到末尾
   n->m_pNext = NULL;
   m_pLast->m_pNext = n;
   m_pLast = n;
}

CHash

class CHash
{
private:
   struct CBucket
   {
      int32_t m_iID;		// Socket ID
      CUDT* m_pUDT;		// Socket instance

      CBucket* m_pNext;		// next bucket
   } **m_pBucket;		// list of buckets (the hash table)

   int m_iHashSize;		// size of hash table
};
  • 初始化:void CHash::init(int size)
CHash::CHash():
m_pBucket(NULL),
m_iHashSize(0)
{
}

void CHash::init(int size)
{
   m_pBucket = new CBucket* [size]; //创建size个HASH ENTRY

   for (int i = 0; i < size; ++ i)
      m_pBucket[i] = NULL;  //每个HASH ENTRY指向空

   m_iHashSize = size;  //调整HASH SIZE
}
  • 销毁:CHash::~CHash()
CHash::~CHash()
{
   for (int i = 0; i < m_iHashSize; ++ i)   //删除所有的Buckets
   {
      CBucket* b = m_pBucket[i];
      while (NULL != b)
      {
         CBucket* n = b->m_pNext;
         delete b;
         b = n;
      }
   }

   delete [] m_pBucket;
}
  • 查找:CUDT* CHash::lookup(int32_t id)
CUDT* CHash::lookup(int32_t id)
{
   // simple hash function (% hash table size); suitable for socket descriptors
   CBucket* b = m_pBucket[id % m_iHashSize];

   while (NULL != b)    //寻找CUDT*
   {
      if (id == b->m_iID)
         return b->m_pUDT;
      b = b->m_pNext;
   }

   return NULL;
}
  • 插入:void CHash::insert(int32_t id, CUDT* u)
void CHash::insert(int32_t id, CUDT* u) //新加入的Bucket加入到最接近数组的底部
{
   CBucket* b = m_pBucket[id % m_iHashSize];

   CBucket* n = new CBucket;
   n->m_iID = id;
   n->m_pUDT = u;
   n->m_pNext = b;

   m_pBucket[id % m_iHashSize] = n;
}
  • 删除:void CHash::remove(int32_t id)
void CHash::remove(int32_t id)
{
   CBucket* b = m_pBucket[id % m_iHashSize];    //找到Bucket
   CBucket* p = NULL;   

   while (NULL != b)    //在桶中的链表中删除一个结点
   {
      if (id == b->m_iID)
      {
         if (NULL == p)
            m_pBucket[id % m_iHashSize] = b->m_pNext;
         else
            p->m_pNext = b->m_pNext;

         delete b;

         return;
      }

      p = b;
      b = b->m_pNext;
   }
}

CRcvQueue

class CRcvQueue
{
private:
   static void* worker(void* param);    //接收线程
   udt_pthread_t m_WorkerThread;

private:
   CUnitQueue m_UnitQueue;		// The received packet queue(就是那个类似于Hash的组织)

   CRcvUList* m_pRcvUList;		// 这个List中的UDT实例准备从Queue中读取数据
   CHash* m_pHash;			// HASH可以加速在List中寻找UDT实例
   CChannel* m_pChannel;		// UDP channel for receving packets
   CTimer* m_pTimer;			// 与发送队列共享Timer

   int m_iPayloadSize;                      // packet中的有效载荷

   volatile bool m_bClosing;              // 接收线程是否启动
   udt_pthread_cond_t m_ExitCond;

private:
   udt_pthread_mutex_t m_LSLock;
   CUDT* m_pListener;                                   // pointer to the (unique, if any) listening UDT entity
   CRendezvousQueue* m_pRendezvousQueue;                // 汇合模式中的UDT SOCKET列表

   std::vector<CUDT*> m_vNewEntry;                                  // 新添加的条目
   udt_pthread_mutex_t m_IDLock;

   std::map<int32_t, std::queue<CPacket*> > m_mBuffer;	// 用于集合连接请求的临时缓冲区
   udt_pthread_mutex_t m_PassLock;
   udt_pthread_cond_t m_PassCond;
};
  • 初始化:void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
CRcvQueue::CRcvQueue():
m_WorkerThread(),
m_UnitQueue(),
m_pRcvUList(NULL),
m_pHash(NULL),
m_pChannel(NULL),
m_pTimer(NULL),
m_iPayloadSize(),
m_bClosing(false),
m_ExitCond(),
m_LSLock(),
m_pListener(NULL),
m_pRendezvousQueue(NULL),
m_vNewEntry(),
m_IDLock(),
m_mBuffer(),
m_PassLock(),
m_PassCond()
{
   #ifndef WINDOWS
      pthread_mutex_init(&m_PassLock, NULL);
      pthread_cond_init(&m_PassCond, NULL);
      pthread_mutex_init(&m_LSLock, NULL);
      pthread_mutex_init(&m_IDLock, NULL);
   #else
      m_PassLock = CreateMutex(NULL, false, NULL);
      m_PassCond = CreateEvent(NULL, false, false, NULL);
      m_LSLock = CreateMutex(NULL, false, NULL);
      m_IDLock = CreateMutex(NULL, false, NULL);
      m_ExitCond = CreateEvent(NULL, false, false, NULL);
   #endif
}

void CRcvQueue::init(int qsize, int payload, int version, int hsize, CChannel* cc, CTimer* t)
{
   m_iPayloadSize = payload;

   m_UnitQueue.init(qsize, payload, version);   

   m_pHash = new CHash;
   m_pHash->init(hsize);

   m_pChannel = cc;
   m_pTimer = t;

   m_pRcvUList = new CRcvUList;
   m_pRendezvousQueue = new CRendezvousQueue;

   #ifndef WINDOWS
      if (0 != pthread_create(&m_WorkerThread, NULL, CRcvQueue::worker, this))
      { //启动接收线程
         m_WorkerThread = 0;
         throw CUDTException(3, 1);
      }
   #else
      DWORD threadID;
      m_WorkerThread = CreateThread(NULL, 0, CRcvQueue::worker, this, 0, &threadID);
      if (NULL == m_WorkerThread)
         throw CUDTException(3, 1);
   #endif
}
  • 销毁:CRcvQueue::~CRcvQueue()
CRcvQueue::~CRcvQueue()
{
   m_bClosing = true;

   #ifndef WINDOWS
      if (0 != m_WorkerThread)  //终止接受线程
         pthread_join(m_WorkerThread, NULL);
      pthread_mutex_destroy(&m_PassLock);
      pthread_cond_destroy(&m_PassCond);
      pthread_mutex_destroy(&m_LSLock);
      pthread_mutex_destroy(&m_IDLock);
   #else
      if (NULL != m_WorkerThread)
         WaitForSingleObject(m_ExitCond, INFINITE);
      CloseHandle(m_WorkerThread);
      CloseHandle(m_PassLock);
      CloseHandle(m_PassCond);
      CloseHandle(m_LSLock);
      CloseHandle(m_IDLock);
      CloseHandle(m_ExitCond);
   #endif

   delete m_pRcvUList;
   delete m_pHash;
   delete m_pRendezvousQueue;

   // 删除队列中所有没有处理的信息
   for (map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.begin(); i != m_mBuffer.end(); ++ i)
   {
      while (!i->second.empty())
      {
         CPacket* pkt = i->second.front();
         delete [] pkt->m_pcData;
         delete pkt;
         i->second.pop();
      }
   }
}
  • 发送工作线程: void* CRcvQueue::worker(void* param)
{
   CRcvQueue* self = (CRcvQueue*)param; //反向获取接收队列对象

   //准备好ID,地址之类的东西
   sockaddr* addr = (AF_INET == self->m_UnitQueue.m_iIPversion) ? (sockaddr*) new sockaddr_in : (sockaddr*) new sockaddr_in6;
   CUDT* u = NULL;
   int32_t id;

   while (!self->m_bClosing)
   {
      #ifdef NO_BUSY_WAITING
         self->m_pTimer->tick();
      #endif

      // check waiting list, if new socket, insert it to the list
      while (self->ifNewEntry())    //如果有新的CUDT*
      {
         CUDT* ne = self->getNewEntry();    //将新的CUDT*加入到合适的队列中
         if (NULL != ne)
         {
            self->m_pRcvUList->insert(ne);
            self->m_pHash->insert(ne->m_SocketID, ne);
         }
      }

      // 为了接收packet,获取一个可用的slot
      CUnit* unit = self->m_UnitQueue.getNextAvailUnit();
      if (NULL == unit) //如果获取失败
      {
         // 没有足够的空间,读取这个packet后直接drop掉
         CPacket temp;
         temp.m_pcData = new char[self->m_iPayloadSize];
         temp.setLength(self->m_iPayloadSize);
         self->m_pChannel->recvfrom(addr, temp);
         delete [] temp.m_pcData;
         goto TIMER_CHECK;
      }

      unit->m_Packet.setLength(self->m_iPayloadSize);//设置packet的有效载荷

      //recv一个packet.如果返回-1,就相当于什么都没有收到 
      if (self->m_pChannel->recvfrom(addr, unit->m_Packet) < 0)
         goto TIMER_CHECK;

      id = unit->m_Packet.m_iID;    //获取收到的packet的ID

      // ID 0 is for connection request, which should be passed to the listening socket or rendezvous sockets
      if (0 == id)//Connect request
      {
         if (NULL != self->m_pListener)
            self->m_pListener->listen(addr, unit->m_Packet);//通过这个CUDT*处理这个packet
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {  //同样是处理new connect
            // asynchronous connect: call connect here
            // otherwise wait for the UDT socket to retrieve this packet
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);
            else
               self->storePkt(id, unit->m_Packet.clone());
         }
      }
      else if (id > 0)  //发送往一个socket的数据包
      {
         if (NULL != (u = self->m_pHash->lookup(id)))//找到UDT对应的CUDT*
         {
            if (CIPAddress::ipcmp(addr, u->m_pPeerAddr, u->m_iIPversion))
            {//对比地址
               if (u->m_bConnected && !u->m_bBroken && !u->m_bClosing)
               {//如果这个CUDT*的状态正常
                  if (0 == unit->m_Packet.getFlag())//如果这是一个数据包
                     u->processData(unit);  //处理数据
                  else//如果这是一个控制包
                     u->processCtrl(unit->m_Packet);//处理控制信息

                  u->checkTimers();//检查定时器
                  self->m_pRcvUList->update(u);//将这个处理过的CUDT*插入List末尾
               }
            }
         }
         else if (NULL != (u = self->m_pRendezvousQueue->retrieve(addr, id)))
         {//如果是一个新的连接状态
            if (!u->m_bSynRecving)
               u->connect(unit->m_Packet);//建立稳定的连接状态
            else
               self->storePkt(id, unit->m_Packet.clone());//连接状态还没有稳定,先存储数据,稍后处理
         }
      }

      //当drop packet时或者没有free CUnit时,跳转到这块
TIMER_CHECK:
      // take care of the timing event for all UDT sockets

      uint64_t currtime;
      CTimer::rdtsc(currtime);  //获取当前的时间

      CRNode* ul = self->m_pRcvUList->m_pUList;//获取头部的CUDT*
      uint64_t ctime = currtime - 100000 * CTimer::getCPUFrequency();
      while ((NULL != ul) && (ul->m_llTimeStamp < ctime))
      {
         CUDT* udt = ul->m_pUDT;    //获取CUDT*

         if (udt->m_bConnected && !udt->m_bBroken && !udt->m_bClosing)
         {//如果这个CUDT*的状态正常,更新到Recv List的后面就行了
            udt->checkTimers(); 
            self->m_pRcvUList->update(udt);
         }
         else
         {//如果这个CUDT*的状态出现差错,直接删除
            // the socket must be removed from Hash table first, then RcvUList
            self->m_pHash->remove(udt->m_SocketID);
            self->m_pRcvUList->remove(udt);
            udt->m_pRNode->m_bOnList = false;
         }

         ul = self->m_pRcvUList->m_pUList;
      }

      //还没有进入正常的连接状态的CUDT*,发送探测包
      // Check connection requests status for all sockets in the RendezvousQueue.
      self->m_pRendezvousQueue->updateConnStatus();
   }

   //收尾工作
   if (AF_INET == self->m_UnitQueue.m_iIPversion)
      delete (sockaddr_in*)addr;
   else
      delete (sockaddr_in6*)addr;

   #ifndef WINDOWS
      return NULL;
   #else
      SetEvent(self->m_ExitCond);
      return 0;
   #endif
}
  • 从Queue中获取一个Packet:int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
int CRcvQueue::recvfrom(int32_t id, CPacket& packet)
{
   CGuard bufferlock(m_PassLock);

   map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);

   if (i == m_mBuffer.end())    //如果描述这个UDT的Packet Queue为空
   {
      #ifndef WINDOWS
         uint64_t now = CTimer::getTime();
         timespec timeout;

         timeout.tv_sec = now / 1000000 + 1;
         timeout.tv_nsec = (now % 1000000) * 1000;

         pthread_cond_timedwait(&m_PassCond, &m_PassLock, &timeout);//就睡一会
      #else
         ReleaseMutex(m_PassLock);
         WaitForSingleObject(m_PassCond, 1000);
         WaitForSingleObject(m_PassLock, INFINITE);
      #endif

      i = m_mBuffer.find(id);   //被唤醒后还没有packet可读的话,设置packet,并返回-1
      if (i == m_mBuffer.end())
      {
         packet.setLength(-1);
         return -1;
      }
   }

   // retrieve the earliest packet
   CPacket* newpkt = i->second.front(); //获取一个包

   if (packet.getLength() < newpkt->getLength())//如果两个包的数据量有差错,返回-1
   {
      packet.setLength(-1);
      return -1;
   }

   // copy packet content
   // 将首部的packet拷贝出来
   memcpy(packet.m_nHeader, newpkt->m_nHeader, CPacket::m_iPktHdrSize);
   memcpy(packet.m_pcData, newpkt->m_pcData, newpkt->getLength());
   packet.setLength(newpkt->getLength());

   delete [] newpkt->m_pcData;
   delete newpkt;

   // remove this message from queue,
   // if no more messages left for this socket, release its data structure
   i->second.pop();
   if (i->second.empty())   //如果队列为空的话,删除这个队列。对应上文,如果在m_buffer中没有找到Queue,就意味着没有Packet可读
      m_mBuffer.erase(i);

   return packet.getLength();
}
  • 设置与取消Listener:int CRcvQueue::setListener(CUDT* u)void CRcvQueue::removeListener(const CUDT* u)
int CRcvQueue::setListener(CUDT* u)
{
   CGuard lslock(m_LSLock);

   if (NULL != m_pListener)
      return -1;

   m_pListener = u;
   return 0;
}

void CRcvQueue::removeListener(const CUDT* u)
{
   CGuard lslock(m_LSLock);

   if (u == m_pListener)
      m_pListener = NULL;
}
  • 从队列中删除UDT SOCKET以及为其维护的信息:void CRcvQueue::removeConnector(const UDTSOCKET& id)
void CRcvQueue::removeConnector(const UDTSOCKET& id)    //删除和控制包有关的所有信息,这个队列中维护着和连接相关的信息
{
   m_pRendezvousQueue->remove(id);

   CGuard bufferlock(m_PassLock);

   map<int32_t, std::queue<CPacket*> >::iterator i = m_mBuffer.find(id);
   if (i != m_mBuffer.end())
   {
      while (!i->second.empty())
      {
         delete [] i->second.front()->m_pcData;
         delete i->second.front();
         i->second.pop();
      }
      m_mBuffer.erase(i);
   }
}
  • 处理连接尚未完全建立的情况:void CRcvQueue::setNewEntry(CUDT* u) bool CRcvQueue::ifNewEntry() CUDT* CRcvQueue::getNewEntry()
void CRcvQueue::setNewEntry(CUDT* u)        //加入新的CUDT*
{
   CGuard listguard(m_IDLock);
   m_vNewEntry.push_back(u);
}

bool CRcvQueue::ifNewEntry()        //判断是否有新的entry
{
   return !(m_vNewEntry.empty());
}

CUDT* CRcvQueue::getNewEntry()        //获取新的Entry
{
   CGuard listguard(m_IDLock);

   if (m_vNewEntry.empty())
      return NULL;

   CUDT* u = (CUDT*)*(m_vNewEntry.begin());
   m_vNewEntry.erase(m_vNewEntry.begin());

   return u;
}
原文地址:https://www.cnblogs.com/ukernel/p/9191068.html