UDT源码剖析(五)之Buffer

UDT源码的体系结构中存在两种Buffer,分别是RecvBuffer和SendBuffer。这两种Buffer分别用于UDT套接字的缓冲区,注意了是UDT SOCKET的数据缓冲,不是UDP SOCKET的数据缓冲。UDP SOCKET有自己的SendQueue和RecvQueue。我会挑选一些非常有必要的代码详细的分析,比如说从Send Buffer中取出数据这种,小众操作。详见代码注释:

CSndBuffer

  • 基础数据结构
class CSndBuffer
{
private:
   udt_pthread_mutex_t m_BufLock;           // used to synchronize buffer operation

   struct Block    //为了方便的提交给SendQueue
   {
      char* m_pcData;                   // 指向数据块
      int m_iLength;                       // 数据块的长度
      int32_t m_iMsgNo;                // 数据块的编号
      uint64_t m_OriginTime;         // 原始请求时间
      int m_iTTL;                            // TTL(ms)

      Block* m_pNext;                   // next block
   } *m_pBlock, *m_pFirstBlock, *m_pCurrBlock, *m_pLastBlock;

   // m_pBlock:         The head pointer
   // m_pFirstBlock:    The first block
   // m_pCurrBlock:	The current block
   // m_pLastBlock:     The last block (if first == last, buffer is empty)

   struct Buffer    //真实的存储Buffer
   {
      char* m_pcData;			// buffer
      int m_iSize;			        // size
      Buffer* m_pNext;			// next buffer
   } *m_pBuffer;			        // physical buffer

   int32_t m_iNextMsgNo;                //下一条消息编号

   int m_iSize;				        // 32 (number of packets)
   int m_iMSS;                                  // 1500
   int m_iCount;			        // 已经使用的Blocks
};
  • 初始化:CSndBuffer::CSndBuffer(int size, int mss)
CSndBuffer::CSndBuffer(int size, int mss):
m_BufLock(),
m_pBlock(NULL),
m_pFirstBlock(NULL),
m_pCurrBlock(NULL),
m_pLastBlock(NULL),
m_pBuffer(NULL),
m_iNextMsgNo(1),
m_iSize(size),
m_iMSS(mss),
m_iCount(0)
{
   // 一次性创建一个大的Buffer,以后Buffer不够用的时候也是再次创建一个Buffer
   m_pBuffer = new Buffer;
   m_pBuffer->m_pcData = new char [m_iSize * m_iMSS];        //创建实际存储的Buffer,注意空间
   m_pBuffer->m_iSize = m_iSize;
   m_pBuffer->m_pNext = NULL;

   // 创建size个Block,循环链表a
   m_pBlock = new Block;    
   Block* pb = m_pBlock;
   for (int i = 1; i < m_iSize; ++ i)
   {
      pb->m_pNext = new Block;
      pb->m_iMsgNo = 0;
      pb = pb->m_pNext;
   }
   pb->m_pNext = m_pBlock;  

   //紧接着调整Block中的指针,指向Buffre中相应的位置 
   pb = m_pBlock;
   char* pc = m_pBuffer->m_pcData;
   for (int i = 0; i < m_iSize; ++ i)   
   {
      pb->m_pcData = pc;
      pb = pb->m_pNext;
      pc += m_iMSS;
   }

   m_pFirstBlock = m_pCurrBlock = m_pLastBlock = m_pBlock;    //调整Block指针的位置

   #ifndef WINDOWS
      pthread_mutex_init(&m_BufLock, NULL);    //初始化互斥锁
   #else
      m_BufLock = CreateMutex(NULL, false, NULL);
   #endif
}
  • 销毁:CSndBuffer::~CSndBuffer()
CSndBuffer::~CSndBuffer()
{
   //逐个删除Block,此时还没有销毁Block中指针指向的实际的Buffer 
   Block* pb = m_pBlock->m_pNext;   
   while (pb != m_pBlock)
   {
      Block* temp = pb;
      pb = pb->m_pNext;
      delete temp;
   }
   delete m_pBlock;

   //逐个删除Buffer,初始化时只创建了一个Buffer,increase()时会增加多个Buffer 
   while (m_pBuffer != NULL)    
   {
      Buffer* temp = m_pBuffer;
      m_pBuffer = m_pBuffer->m_pNext;
      delete [] temp->m_pcData;
      delete temp;
   }

   #ifndef WINDOWS
      pthread_mutex_destroy(&m_BufLock);    //销毁锁
   #else
      CloseHandle(m_BufLock);
   #endif
}
  • 向Buffer添加数据:void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
void CSndBuffer::addBuffer(const char* data, int len, int ttl, bool order)
{
   int size = len / m_iMSS;     //根据MSS判断需要多少个Block
   if ((len % m_iMSS) != 0)
      size ++;

   //如果已经使用的Blocks和即将要使用的Blocks不能满足需求,增加
   while (size + m_iCount >= m_iSize)  
      increase();

   //获取当前的时间 
   uint64_t time = CTimer::getTime();   
   int32_t inorder = order;
   inorder <<= 29;

   //获得最后使用的Block
   Block* s = m_pLastBlock; 
   for (int i = 0; i < size; ++ i)
   {
      int pktlen = len - i * m_iMSS;    //在这块判断当前剩余的len
      if (pktlen > m_iMSS)
         pktlen = m_iMSS;    //如果长度不够MSS时,注意填充

      memcpy(s->m_pcData, data + i * m_iMSS, pktlen);   //将数据逐步拷贝到Block中
      s->m_iLength = pktlen;    //调整Block的长度

      s->m_iMsgNo = m_iNextMsgNo | inorder; //Message Number并判断是否需要交付给用户
      if (i == 0)
         s->m_iMsgNo |= 0x80000000; //如果是这一个数据流的起始,加上起始标志
      if (i == size - 1)    //如果是这个数据流的结尾,加上结尾标志
         s->m_iMsgNo |= 0x40000000;

      s->m_OriginTime = time;   //原始请求时间
      s->m_iTTL = ttl;  //更新TTL

      s = s->m_pNext;   //调整至下一个Block
   }
   m_pLastBlock = s;    //调整上一个访问的Block

   CGuard::enterCS(m_BufLock);
   m_iCount += size;    //更新目前已经使用的Block Count
   CGuard::leaveCS(m_BufLock);

   m_iNextMsgNo ++; //更新信息号(如果数据较大,n个数据报共享同一个MsgNo)
   if (m_iNextMsgNo == CMsgNo::m_iMaxMsgNo) //如果发生回绕,调整MsgNo
      m_iNextMsgNo = 1;
}
  • 确认数据:void CSndBuffer::ackData(int offset):在收到ACK对发送缓冲区中的数据进行确认,类似于TCP的Buffer
void CSndBuffer::ackData(int offset)
{
   CGuard bufferguard(m_BufLock);
    
   //根据offset更新收到的数据
   for (int i = 0; i < offset; ++ i)
      m_pFirstBlock = m_pFirstBlock->m_pNext;    //此处First指向的是已发送,但是还没有确认的数据

   m_iCount -= offset;      //因为已经对数据确认了一部分数据,所以调整已经用过的Block

   CTimer::triggerEvent();
}
  • 获取当前已经使用的数据量:int CSndBuffer::getCurrBufSize() const
int CSndBuffer::getCurrBufSize() const
{
   return m_iCount; 
}
  • 增加Buffer:void CSndBuffer::increase()
void CSndBuffer::increase()
{
   int unitsize = m_pBuffer->m_iSize;    //获取Buffer的大小,MSS 

   Buffer* nbuf = NULL;
   try
   {
      nbuf  = new Buffer;
      nbuf->m_pcData = new char [unitsize * m_iMSS];    //每次新增加和之前一样的Size
   }
   catch (...)
   {
      delete nbuf;
      throw CUDTException(3, 2, 0);
   }
   nbuf->m_iSize = unitsize;    //调整新增加的Buffer
   nbuf->m_pNext = NULL;

   // insert the buffer at the end of the buffer list
   Buffer* p = m_pBuffer;   //将新创建的Buffer加入到之前Buffer List的Tail
   while (NULL != p->m_pNext)
      p = p->m_pNext;
   p->m_pNext = nbuf;

   // new packet blocks
   Block* nblk = NULL;  //创建Blocks,并调整Blocks中data*的位置
   try
   {
      nblk = new Block;
   }
   catch (...)
   {
      delete nblk;
      throw CUDTException(3, 2, 0);
   }
   Block* pb = nblk;
   for (int i = 1; i < unitsize; ++ i)
   {
      pb->m_pNext = new Block;
      pb = pb->m_pNext;
   }

   // insert the new blocks onto the existing one
   pb->m_pNext = m_pLastBlock->m_pNext;
   m_pLastBlock->m_pNext = nblk;

   pb = nblk;
   char* pc = nbuf->m_pcData;
   for (int i = 0; i < unitsize; ++ i)
   {
      pb->m_pcData = pc;
      pb = pb->m_pNext;
      pc += m_iMSS;
   }

   m_iSize += unitsize; //增加Block整体的数量
}

CRcvBuffer

  • 数据结构:
class CRcvBuffer
{
private:
   CUnit** m_pUnit;                           // 数据缓冲Buffer
   int m_iSize;                                   // 65536(bytes)
   CUnitQueue* m_pUnitQueue;      // 共享的接收队列
   int m_iStartPos;                            // 开始读取data的位置
   int m_iLastAckPos;                       // 上一次被确认的位置,start~lastpos之间的数据可读
                                                        // EMPTY: m_iStartPos = m_iLastAckPos   FULL: m_iStartPos = m_iLastAckPos + 1
   int m_iMaxPos;			        // 数据存在的最大的位置,还没有被确认
   int m_iNotch;			        // 第一个CUnit的读取点
};
struct CUnit
{
   CPacket m_Packet;		// packet
   int m_iFlag;			        // 0: free 1:occupid, 2: msg已经read,但是还没有被free, 3: msg被丢弃
};
class CUnitQueue
{
private:
   struct CQEntry
   {
      CUnit* m_pUnit;		// unit queue
      char* m_pBuffer;		// data buffer
      int m_iSize;		// size of each queue

      CQEntry* m_pNext;
   }
   *m_pQEntry,			// 指向起始的Entry队列
   *m_pCurrQueue,		// 指向当前的Entry队列
   *m_pLastQueue;		// 指向最后一个Entry队列

   CUnit* m_pAvailUnit;         //最近访问的Unit* 
   int m_iSize;			// 总共的Packets数量
   int m_iCount;		// 已经使用的Packets数量
   int m_iMSS;			// unit buffer size
   int m_iIPversion;		// IP version
};
class CPacket
{
public:
   int32_t& m_iSeqNo;                       // sequence number
   int32_t& m_iMsgNo;                       // message number
   int32_t& m_iTimeStamp;                // timestamp
   int32_t& m_iID;			          // socket ID
   char*& m_pcData;                          // data/control information

   static const int m_iPktHdrSize;	  // packet header size = 16

protected:
   uint32_t m_nHeader[4];               // The 128-bit header field
   iovec m_PacketVector[2];             // The 2-demension vector of UDT packet [header, data]

   int32_t __pad;
};
  • 初始化:CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize)
CRcvBuffer::CRcvBuffer(CUnitQueue* queue, int bufsize):
m_pUnit(NULL),
m_iSize(bufsize),
m_pUnitQueue(queue),
m_iStartPos(0),
m_iLastAckPos(0),
m_iMaxPos(0),
m_iNotch(0)
{
   m_pUnit = new CUnit* [m_iSize];  //创建CUnit* Array(65536),一次创建这么多,反正没有实际的数据存在,不揪心
   for (int i = 0; i < m_iSize; ++ i)   //每个CUnit都设置为NULL
      m_pUnit[i] = NULL;
}
  • 销毁:CRcvBuffer::~CRcvBuffer()
CRcvBuffer::~CRcvBuffer()
{
   for (int i = 0; i < m_iSize; ++ i)
   {
      if (NULL != m_pUnit[i])
      {
         m_pUnit[i]->m_iFlag = 0;   //逐个将CUnit的标志设置为Free,并减少使用的数量
         -- m_pUnitQueue->m_iCount; //减少Queue中已使用的数量
      }
   }

   delete [] m_pUnit;
}
  • 读取数据:int CRcvBuffer::readBuffer(char* data, int len)
int CRcvBuffer::readBuffer(char* data, int len)
{
   int p = m_iStartPos;     //先前被确认的数据位置
   int lastack = m_iLastAckPos;     //上一次被确认的数据位置
   int rs = len;    //计划读取的数据量

   while ((p != lastack) && (rs > 0))   //有空间可以读取数据并且想要读取的数据>0
   {
      int unitsize = m_pUnit[p]->m_Packet.getLength() - m_iNotch;//第一个CUnit中剩多少data
      if (unitsize > rs)    //如果剩余的data>需要读取的data
         unitsize = rs; //多读取一点也无妨哈

      //将buffer中的数据copy进user data
      memcpy(data, m_pUnit[p]->m_Packet.m_pcData + m_iNotch, unitsize);
      data += unitsize; //更新用户data的位置

      //如果用户还需要读取数据
      if ((rs > unitsize) || (rs == m_pUnit[p]->m_Packet.getLength() - m_iNotch))
      {
         CUnit* tmp = m_pUnit[p];   //将刚刚读取完的CUnit置为NULL,并调整Queue已使用CUnit的数量
         m_pUnit[p] = NULL;    
         tmp->m_iFlag = 0;    //调整为free
         -- m_pUnitQueue->m_iCount;

         if (++ p == m_iSize) //循环的嘛,你懂吧?不过要保证小于ACKED
            p = 0;

         m_iNotch = 0;  //调整下一个在CUnit中读取的起始位置为0
      }
      else
         m_iNotch += rs;    //否则调整在本格CUnit中的位置

      rs -= unitsize;   //调整已经读取的位置
   }

   m_iStartPos = p; //更新可以开始读取的位置
   return len - rs;    //返回已经读取的数据量
}
  • 确认数据:void CRcvBuffer::ackData(int len)
void CRcvBuffer::ackData(int len)
{
   m_iLastAckPos = (m_iLastAckPos + len) % m_iSize; //更新已经被确认的数据的位置
   m_iMaxPos -= len;    //还没有被确认的数据有这么多
   if (m_iMaxPos < 0)   
      m_iMaxPos = 0;

   CTimer::triggerEvent();
}
  • 返回剩余空间(未被确认的数据也被计算在内):int CRcvBuffer::getAvailBufSize() const
int CRcvBuffer::getAvailBufSize() const
{
   // One slot must be empty in order to tell the difference between "empty buffer" and "full buffer"
   return m_iSize - getRcvDataSize() - 1;
}
  • 返回已经确认空间:int CRcvBuffer::getRcvDataSize() const
int CRcvBuffer::getRcvDataSize() const
{
   if (m_iLastAckPos >= m_iStartPos)    //防止回绕    
      return m_iLastAckPos - m_iStartPos;

   return m_iSize + m_iLastAckPos - m_iStartPos;
}
  • 获得一个数据流中的所有数据:int CRcvBuffer::readMsg(char* data, int len)
int CRcvBuffer::readMsg(char* data, int len)
{
   int p, q;
   bool passack;
   if (!scanMsg(p, q, passack)) //如果没有找到一个流的合适位置
      return 0;

   int rs = len;
   while (p != (q + 1) % m_iSize)   //逐个将流中的数据copy进user data
   {
      int unitsize = m_pUnit[p]->m_Packet.getLength();  //获取起始CUnit的数据
      if ((rs >= 0) && (unitsize > rs)) //如果获取的数据小于一个CUnit的中的数据
         unitsize = rs; //调整获取数据的大小

      if (unitsize > 0)
      {
         memcpy(data, m_pUnit[p]->m_Packet.m_pcData, unitsize);//将数据进行copy
         data += unitsize;  //调整数据copy的位置
         rs -= unitsize;    //调整下一步需要获取数据的length
      }

      if (!passack) //如果已经被ack确认过了
      {
         CUnit* tmp = m_pUnit[p];   //可以在读取之后调整数据指针的位置
         m_pUnit[p] = NULL;
         tmp->m_iFlag = 0;
         -- m_pUnitQueue->m_iCount;
      }
      else
         m_pUnit[p]->m_iFlag = 2;   //没有的话,标记为一斤被读取,但是没有free

      if (++ p == m_iSize)      //防止回绕
         p = 0;
   }

   if (!passack)    //如果已经被确认过了,调整起始的位置
      m_iStartPos = (q + 1) % m_iSize;

   return len - rs; //返回读取的数据量
}
  • 判断已经确认的空间中是否存在一个完成的数据流:bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
bool CRcvBuffer::scanMsg(int& p, int& q, bool& passack)
{
   //如果为空,或者最大位置<=0,返回错误
   if ((m_iStartPos == m_iLastAckPos) && (m_iMaxPos <= 0))
      return false;

   while (m_iStartPos != m_iLastAckPos) //从可以读取的位置到确认的位置进行搜索
   {
      if (NULL == m_pUnit[m_iStartPos]) //如果中间出现的NULL之类的,跳过就行,防止回绕哈
      {
         if (++ m_iStartPos == m_iSize)
            m_iStartPos = 0;
         continue;
      }

      if ((1 == m_pUnit[m_iStartPos]->m_iFlag) && (m_pUnit[m_iStartPos]->m_Packet.getMsgBoundary() > 1))    //如果不是独立的报文流,后续还有报文
      {
         bool good = true;

         //就往后搜索这一系列的报文 
         for (int i = m_iStartPos; i != m_iLastAckPos;)
         {
            if ((NULL == m_pUnit[i]) || (1 != m_pUnit[i]->m_iFlag))
            {
               good = false;
               break;
            }

            if ((m_pUnit[i]->m_Packet.getMsgBoundary() == 1) || (m_pUnit[i]->m_Packet.getMsgBoundary() == 3))
               break;

            if (++ i == m_iSize)
               i = 0;
         }

         if (good)  //说明没有啥问题,已经找到一个流的边界,退出循环
            break;
      }

      CUnit* tmp = m_pUnit[m_iStartPos];    //如果出现了NULL之类的报文,不做任何保存,直接丢弃
      m_pUnit[m_iStartPos] = NULL;
      tmp->m_iFlag = 0;
      -- m_pUnitQueue->m_iCount;

      if (++ m_iStartPos == m_iSize)
         m_iStartPos = 0;
   }

   //真的琐碎..反正就是需要判断是否读取了一个完整的数据流
   p = -1;                  // message head
   q = m_iStartPos;         // message tail
   passack = m_iStartPos == m_iLastAckPos;
   bool found = false;

   //找到一个流的起始位置和结束为止
   for (int i = 0, n = m_iMaxPos + getRcvDataSize(); i <= n; ++ i)
   {
      if ((NULL != m_pUnit[q]) && (1 == m_pUnit[q]->m_iFlag))
      {
         switch (m_pUnit[q]->m_Packet.getMsgBoundary())
         {
         case 3: // 11
            p = q;
            found = true;
            break;

         case 2: // 10
            p = q;
            break;

         case 1: // 01
            if (p != -1)
               found = true;
         }
      }
      else
      {
         // a hole in this message, not valid, restart search
         p = -1;
      }

      if (found)
      {
         //MSG必须被确认并且之前没有读出来
         if (!passack || !m_pUnit[q]->m_Packet.getMsgOrderFlag())
            break;

         found = false;
      }

      if (++ q == m_iSize)
         q = 0;

      if (q == m_iLastAckPos)
         passack = true;
   }

   if (!found)
   {
      if ((p != -1) && ((q + 1) % m_iSize == p))
         found = true;
   }

   return found;
}
原文地址:https://www.cnblogs.com/ukernel/p/9191054.html