UDT源码剖析(二)之启动与结束

UDT的启动例程

启动例程顺序:void UDT::startup() -> void CUDT::startup() -> void CUDTUnited::startup() -> void* CUDTUnited::garbageCollect(void* p) -> void CUDTUnited::checkBrokenSockets() -> void CUDTUnited::removeSocket(const UDTSOCKET u) -> void CUDT::close() -> void CUDT::flush()

  • 对于一个完整的UDT SOCKET的清理六部曲:
    • 将CUDT*的状态设置为BROKEN
    • 调用CUDT中的close()
    • 更新UDT SOCKET的关闭时间
    • 将UDT SOCKET设置为Closed
    • 在Closed Array中添加当前UDT SOCKET,在GC线程中进行处理
    • 从全局的MAP中删除

Order 0:void CUDTUnited::startup()

  • 功能A:真正干活的启动函数
    int CUDTUnited::startup()
    {
        CGuard gcinit(m_InitLock);    //拿住启动锁

        if (m_iInstanceCount++ > 0)    //不需要重复启动
            return 0;

        // Global initialization code
        #ifdef WINDOWS
            WORD wVersionRequested;
            WSADATA wsaData;
            wVersionRequested = MAKEWORD(2, 2);

            if (0 != WSAStartup(wVersionRequested, &wsaData))
                throw CUDTException(1, 0,  WSAGetLastError());
        #endif

        //init CTimer::EventLock

        if (m_bGCStatus)    //设置GC线程的状态
            return true;

        m_bClosing = false;    //设置全局的状态
        #ifndef WINDOWS
            pthread_mutex_init(&m_GCStopLock, NULL);
            pthread_cond_init(&m_GCStopCond, NULL);
            pthread_create(&m_GCThread, NULL, garbageCollect, this);    //启动GC线程
        #else
            m_GCStopLock = CreateMutex(NULL, false, NULL);
            m_GCStopCond = CreateEvent(NULL, false, false, NULL);
            DWORD ThreadID;
            m_GCThread = CreateThread(NULL, 0, garbageCollect, this, 0, &ThreadID);
        #endif

        m_bGCStatus = true;    //设置GC线程状态

        return 0;
    }

Order 1:void* CUDTUnited::garbageCollect(void* p)

  • 功能:GC线程(注:unet的实现中使用事件的模式清理资源)
    #ifndef WINDOWS
        void* CUDTUnited::garbageCollect(void* p)
    #else
        DWORD WINAPI CUDTUnited::garbageCollect(LPVOID p)
    #endif
    {
        CUDTUnited* self = (CUDTUnited*)p;    //获取CUDTUnited实例

        CGuard gcguard(self->m_GCStopLock);    //拿到垃圾清理的锁

        while (!self->m_bClosing)    //如果调整目前的CUDTUnited的状态为关闭状态,退出垃圾清理的无限循环
        {
            self->checkBrokenSockets();    //当UDT协议判断某一个UDT SOCKET的状态不正确时,会将其状态设置为BROKEN,并在这个函数中进行处理

            #ifdef WINDOWS
                self->checkTLSValue();
            #endif

            #ifndef WINDOWS
                timeval now;
                timespec timeout;
                gettimeofday(&now, 0);
                timeout.tv_sec = now.tv_sec + 1;    //进入睡眠等待,然后等待下一次可以清理出现BROKEN状态的UDT SOCKET。具体的睡眠为时间为:1S
                timeout.tv_nsec = now.tv_usec * 1000;

                pthread_cond_timedwait(&self->m_GCStopCond, &self->m_GCStopLock, &timeout);    //然后调用当前的GC线程陷入睡眠等待被唤醒
            #else
                WaitForSingleObject(self->m_GCStopCond, 1000);
           #endif
        }

        //剩下的步骤负责清理目前依旧残余的资源 
        // remove all sockets and multiplexers(复用器)
        CGuard::enterCS(self->m_ControlLock);
        for (map<UDTSOCKET, CUDTSocket*>::iterator i = self->m_Sockets.begin(); i != self->m_Sockets.end(); ++ i)    .//逐步遍历CUDTUnited中保留的UTD SOCKET
        {
            i->second->m_pUDT->m_bBroken = true;    //将CUDT*描述的状态设置为BROKEN,后续进行处理,然后调用CUDT中的close函数清理保存连接所消耗的资源
            i->second->m_pUDT->close();
            i->second->m_Status = CLOSED;    //设置状态为关闭
            i->second->m_TimeStamp = CTimer::getTime();    //调整最后一次操作UDT SOCKET的时间
            self->m_ClosedSockets[i->first] = i->second;    //将当前描述连接的CUDT*保存至CloseMap

            // 清理Lintener Queue
            map<UDTSOCKET, CUDTSocket*>::iterator ls = self->m_Sockets.find(i->second->m_ListenSocket);    //从当前的UDT SOCKET中寻找自己的Listener
            if (ls == self->m_Sockets.end())    //如果没有找到Listener
            {
                ls = self->m_ClosedSockets.find(i->second->m_ListenSocket);    //在存活的UDT Map中没有找到的话,就在Close UDT Map中寻找
                if (ls == self->m_ClosedSockets.end())    //如果没有找到,就不再处理存储在Listener两个队列中的资源
                    continue;
            }

            CGuard::enterCS(ls->second->m_AcceptLock);    //获取Listener中的锁
            ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);    //从连接完成但是取出UDT SOCKET的队列中清理
            ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);    //从已完成连接的队列清理
            CGuard::leaveCS(ls->second->m_AcceptLock);    
        }
        self->m_Sockets.clear();    //最后清理资源

        for (map<UDTSOCKET, CUDTSocket*>::iterator j = self->m_ClosedSockets.begin(); j != self->m_ClosedSockets.end(); ++ j)    //最后再次遍历Close UDT SOCKET队列
        {
            j->second->m_TimeStamp = 0;    //将状态设置为0
        }
        CGuard::leaveCS(self->m_ControlLock);

        while (true)
        {
            self->checkBrokenSockets();    //之前遍历依旧存活的队列时,只是将即将清理的UDT SOCKET状态设置为BROKEN,此时,对上述为BROKEN状态的UDT SOCKET进行清理

            CGuard::enterCS(self->m_ControlLock);
            bool empty = self->m_ClosedSockets.empty();    //判断是否为空
            CGuard::leaveCS(self->m_ControlLock);

            if (empty)    //如果为empty,就可以直接退出
                break;

            CTimer::sleep();    //不行的话,再歇一会,再次进行处理
        }

        #ifndef WINDOWS
            return NULL;
        #else
            return 0;
        #endif
    }

Order 2:void CUDTUnited::checkBrokenSockets()

  • 功能A:真正干活的函数,用于清理处于BROKEN状态的UDT SOCKET
    void CUDTUnited::checkBrokenSockets()
    {
        CGuard cg(m_ControlLock);    //获取GC锁

        // set of sockets To Be Closed and To Be Removed
        vector<UDTSOCKET> tbc;    //收集处于Closeed状态的UDT SOCKET
        vector<UDTSOCKET> tbr;    //收集处于Removed状态的UDT SOCKET

        for (map<UDTSOCKET, CUDTSocket*>::iterator i = m_Sockets.begin(); i != m_Sockets.end(); ++ i)    //从当前的UDT SOCKET MAP中进行检索
        {
            // 检查处于BROKEN状态的UDT SOCKET
            if (i->second->m_pUDT->m_bBroken)    //如果处于BROKEN状态
            {
                if (i->second->m_Status == LISTENING)    //如果是LISTENING UDT SOCKET
                {
                    if (CTimer::getTime() - i->second->m_TimeStamp < 3000000)    //如果有客户端连接,等待额外的3S,等待下次处理
                        continue;
                }
                else if ((i->second->m_pUDT->m_pRcvBuffer != NULL) && (i->second->m_pUDT->m_pRcvBuffer->getRcvDataSize() > 0) && (i->second->m_pUDT->m_iBrokenCounter -- > 0))
                {    //如果缓冲区中依旧有数据,应该等待更长的时间
                    continue;
                }

                //关闭断开的连接并启动清除计时器
                i->second->m_Status = CLOSED;    //将状态设置为CLOSED
                i->second->m_TimeStamp = CTimer::getTime();    //设置UDT SOCKET的关闭时间
                tbc.push_back(i->first);    //将这个UDT SOCKET添加进Closed Array,稍后处理
                m_ClosedSockets[i->first] = i->second;    //将这个UDT SOCKET添加进CLOSED UDT SOCKET MAP

                // 清理Listener Queue
                map<UDTSOCKET, CUDTSocket*>::iterator ls = m_Sockets.find(i->second->m_ListenSocket);
                if (ls == m_Sockets.end())
                {
                    ls = m_ClosedSockets.find(i->second->m_ListenSocket);
                    if (ls == m_ClosedSockets.end())
                       continue;
                }

                CGuard::enterCS(ls->second->m_AcceptLock);
                ls->second->m_pQueuedSockets->erase(i->second->m_SocketID);
                ls->second->m_pAcceptSockets->erase(i->second->m_SocketID);
                CGuard::leaveCS(ls->second->m_AcceptLock);
            }
        }

        for (map<UDTSOCKET, CUDTSocket*>::iterator j = m_ClosedSockets.begin(); j != m_ClosedSockets.end(); ++ j)    //清理CLOSED UDT SOCKET MAP中的实例
        {
            if (j->second->m_pUDT->m_ullLingerExpiration > 0)    //如果还没有到等待关闭时间
            {
                //异步关闭:
                if ((NULL == j->second->m_pUDT->m_pSndBuffer) || (0 == j->second->m_pUDT->m_pSndBuffer->getCurrBufSize()) || (j->second->m_pUDT->m_ullLingerExpiration <= CTimer::getTime()))
                {    //如果发送缓冲区为空,接收缓冲区为空或者等待关闭时间小于0,调整状态为CLOSED直接关闭
                    j->second->m_pUDT->m_ullLingerExpiration = 0;
                    j->second->m_pUDT->m_bClosing = true;    //更新Closed状态,由下一次启动的GC线程回收
                    j->second->m_TimeStamp = CTimer::getTime();    //更新关闭的时间
                }
            }

            //如果已经超时1S,或者这个UDT SOCKET已经从接收链表中移除,就将其添加到Remove Array中 
            if ((CTimer::getTime() - j->second->m_TimeStamp > 1000000) && ((NULL == j->second->m_pUDT->m_pRNode) || !j->second->m_pUDT->m_pRNode->m_bOnList))
            {
                tbr.push_back(j->first);
            }
        }

        //在全局Map中删除处于Closed状态的实例
        for (vector<UDTSOCKET>::iterator k = tbc.begin(); k != tbc.end(); ++ k)
            m_Sockets.erase(*k);

        // 删除处于超时状态的UDT SOCKET
            for (vector<UDTSOCKET>::iterator l = tbr.begin(); l != tbr.end(); ++ l)
                removeSocket(*l);
    }

Order 3:void CUDTUnited::removeSocket(const UDTSOCKET u)

  • 功能A:清理处于Closed状态的UDT SOCKET
    void CUDTUnited::removeSocket(const UDTSOCKET u)
    {
        map<UDTSOCKET, CUDTSocket*>::iterator i = m_ClosedSockets.find(u);

        // 如果是一个失效的UDT SCOKET,直接返回
        if (i == m_ClosedSockets.end())    r
            return;

        // 由于是多个UDT实例共享一个资源复用器,销毁时要减少引用计数
        const int mid = i->second->m_iMuxID;

        if (NULL != i->second->m_pQueuedSockets)    //如果是一个Listener
        {
            CGuard::enterCS(i->second->m_AcceptLock);

            //如果是一个Listener,关闭连接队列中所有没有accept()的UDT SOCKET,并在稍后删除它
            for (set<UDTSOCKET>::iterator q = i->second->m_pQueuedSockets->begin(); q != i->second->m_pQueuedSockets->end(); ++ q)
            {    //删除六部曲
                m_Sockets[*q]->m_pUDT->m_bBroken = true;    //将CUDT*的状态设置为BROKEN
                m_Sockets[*q]->m_pUDT->close();    //调用CUDT中的close()
                m_Sockets[*q]->m_TimeStamp = CTimer::getTime();    //更新UDT SOCKET的关闭时间
                m_Sockets[*q]->m_Status = CLOSED;    //将UDT SOCKET设置为Closed
                m_ClosedSockets[*q] = m_Sockets[*q];    //在Closed Array中添加当前UDT SOCKET,在GC线程中进行处理
                m_Sockets.erase(*q);    //从全局的MAP中删除
            }

            CGuard::leaveCS(i->second->m_AcceptLock);
        }

        // 从保存连接的map中删除连接
        map<int64_t, set<UDTSOCKET> >::iterator j = m_PeerRec.find((i->second->m_PeerID << 30) + i->second->m_iISN);
        if (j != m_PeerRec.end())
        {
            j->second.erase(u);
            if (j->second.empty())
                m_PeerRec.erase(j);
        }

        // 删除当前的UDT SOCKET
        i->second->m_pUDT->close();
        delete i->second;
        m_ClosedSockets.erase(i);    //在CLOSED Map中也一并删除

        map<int, CMultiplexer>::iterator m;
        m = m_mMultiplexer.find(mid);
        if (m == m_mMultiplexer.end())    //如果这个资源复用器不存在,直接返回
        {
            //something is wrong!!!
            return;
        }

        m->second.m_iRefCount --;    //否则的话,减少这个资源复用器的引用计数
        if (0 == m->second.m_iRefCount)    //如果目前没有UDT SOCKET使用这个资源复用器
        {
            m->second.m_pChannel->close();    //与UDP SOCKET关联的Channel直接关闭
            delete m->second.m_pSndQueue;    //清理资源复用器的资源
            delete m->second.m_pRcvQueue;
            delete m->second.m_pTimer;
            delete m->second.m_pChannel;
            m_mMultiplexer.erase(m);
        }
    }

Order 4:void CUDT::close()

  • 功能A:调用这个函数清理CUDT(保存连接的Class)中的资源
    void CUDT::close()
    {
        if (!m_bOpened)    //如果这个没有处于Opened状态直接返回
            return;

        if (0 != m_Linger.l_onoff)    //如果还没有到连接关闭时间
        {
            flush();
        }

        // 从SendQueue的发送队列中移除这个对象(CUDT)
        if (m_bConnected)
            m_pSndQueue->m_pSndUList->remove(this);

        // 清理IO事件
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_IN, false);
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_OUT, false);
        s_UDTUnited.m_EPoll.update_events(m_SocketID, m_sPollID, UDT_EPOLL_ERR, true);

        // 从所有的Epoll中删除
        try
        {
            for (set<int>::iterator i = m_sPollID.begin(); i != m_sPollID.end(); ++ i)
                s_UDTUnited.m_EPoll.remove_usock(*i, m_SocketID);
        }
        catch (...)
        {
        }

        if (!m_bOpened)
            return;

        //当前处于Closing
        m_bClosing = true;

        CGuard cg(m_ConnectionLock);

        // 如果接收者与发送者还在等待数据,则发送信号
        releaseSynch();    //唤醒所有的发送/接收线程

        //如果是Listener并Listening状态,关闭Listening状态并从Recv Queue中移除 
        if (m_bListening)    
        {
            m_bListening = false;
            m_pRcvQueue->removeListener(this);
        }
        else if (m_bConnecting)    //如果处于连接状态,直接移除
        {
            m_pRcvQueue->removeConnector(m_SocketID);
        }

        if (m_bConnected)    //如果处于连接完成状态
        {
            if (!m_bShutdown)    //并且对对方没有发送shutdown,那么由我们发送shutdown
                sendCtrl(5);

            m_pCC->close();    //并调用CCC的close()清理拥塞控制资源

            // 存储当前连接的状态
            CInfoBlock ib;
            ib.m_iIPversion = m_iIPversion;
            CInfoBlock::convert(m_pPeerAddr, m_iIPversion, ib.m_piIP);
            ib.m_iRTT = m_iRTT;
            ib.m_iBandwidth = m_iBandwidth;
            m_pCache->update(&ib);

            m_bConnected = false;    //关闭连接
        }

        // waiting all send and recv calls to stop
        CGuard sendguard(m_SendLock);
        CGuard recvguard(m_RecvLock);

        // CLOSED.
        m_bOpened = false;
}

Order 5:void CUDT::flush()

  • 功能A:当UDT SOCKET的Linger不为0时,调用这个函数
    void CUDT::flush()
    {
        uint64_t entertime = CTimer::getTime();    //获取当前时间

        while (!m_bBroken && m_bConnected && (m_pSndBuffer->getCurrBufSize() > 0) && (CTimer::getTime() - entertime < m_Linger.l_linger * 1000000ULL))
        {
            // 先前调用的close()已经检查了延迟,并且已经过期
            if (m_ullLingerExpiration >= entertime)
                break;

            if (!m_bSynSending)
            {
                // 如果该SOCKET启用了异步关闭机制,稍后使用GC清理资源,立即返回
                if (0 == m_ullLingerExpiration)
                    m_ullLingerExpiration = entertime + m_Linger.l_linger * 1000000ULL;

                return;
            }

            #ifndef WINDOWS
                timespec ts;    //在这块延迟1S
                ts.tv_sec = 0;
                ts.tv_nsec = 1000000;
                nanosleep(&ts, NULL);
            #else
                Sleep(1);
            #endif
        }
    }

UDT的清理例程

清理例程顺序:void UDT::cleanup() -> void CUDT::cleanup() -> int CUDTUnited::cleanup()

Order 0:int CUDTUnited::cleanup()

  • 功能A:在全局CUDTUnited中的清理例程
    int CUDTUnited::cleanup()
    {
        CGuard gcinit(m_InitLock);

        if (--m_iInstanceCount > 0)    //判断引用计数,如果>0,暂时不关闭
            return 0;

        if (!m_bGCStatus)
            return 0;
       
        //调整CUDTUnited状态,终止GC线程 
            m_bClosing = true;
        #ifndef WINDOWS
            pthread_cond_signal(&m_GCStopCond);
            pthread_join(m_GCThread, NULL);    //清理GC线程的资源
            pthread_mutex_destroy(&m_GCStopLock);
            pthread_cond_destroy(&m_GCStopCond);
        #else
            SetEvent(m_GCStopCond);
            WaitForSingleObject(m_GCThread, INFINITE);
            CloseHandle(m_GCThread);
            CloseHandle(m_GCStopLock);
            CloseHandle(m_GCStopCond);
        #endif

        m_bGCStatus = false;    //调整CUDTUnited状态

        // Global destruction code
        #ifdef WINDOWS
            WSACleanup();
        #endif

         return 0;
    }
原文地址:https://www.cnblogs.com/ukernel/p/8993078.html