Live555 Analysis (2): The Server

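live555 supports both unicast and multicast. We analyze the unicast streaming server first, then the multicast streaming server.

I. Unicast streaming server: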

     // Create the RTSP server:
        RTSPServer* rtspServer = NULL;
        // Normal case: Streaming from a built-in RTSP server:
        rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
        if (rtspServer == NULL) {
                *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
                exit(1);
        }

        *env << "...done initializing \n";

        if( streamingMode == STREAMING_UNICAST )
        {
                ServerMediaSession* sms = ServerMediaSession::createNew(*env, 
                                                                        H264StreamName[video_type], 
                                                                        H264StreamName[video_type], 
                                                                        streamDescription,
                                                                        streamingMode == STREAMING_MULTICAST_SSM);
                sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type], H264VideoBitrate));
                sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(sms->envir(), *H264InputDevice[video_type]));
                
                rtspServer->addServerMediaSession(sms);

                char *url = rtspServer->rtspURL(sms);
                *env << "Play this stream using the URL:\t" << url << "\n";
                delete[] url;  
        }
      

      // Begin the LIVE555 event loop:
      env->taskScheduler().doEventLoop(&watchVariable); // does not return

Let's go through it step by step:

1>  rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

RTSPServer*
RTSPServer::createNew(UsageEnvironment& env, Port ourPort,
              UserAuthenticationDatabase* authDatabase,
              unsigned reclamationTestSeconds)
{
        int ourSocket = -1;

        do {
                int ourSocket = setUpOurSocket(env, ourPort);
                if (ourSocket == -1) break;

                return new RTSPServer(env, ourSocket, ourPort, authDatabase, reclamationTestSeconds);
        } while (0);

        if (ourSocket != -1)  ::closeSocket(ourSocket);
        
        return NULL;
}

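  This function first creates the socket for the RTSP protocol and listens on port rtspServerPortNum, then creates an instance of the RTSPServer class. Next, let's look at the RTSPServer constructor: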

RTSPServer::RTSPServer(UsageEnvironment& env,
               int ourSocket, Port ourPort,
               UserAuthenticationDatabase* authDatabase,
               unsigned reclamationTestSeconds)
                : Medium(env),
                fServerSocket(ourSocket), fServerPort(ourPort),
                fAuthDB(authDatabase), fReclamationTestSeconds(reclamationTestSeconds),
                fServerMediaSessions(HashTable::create(STRING_HASH_KEYS)), 
                fSessionIdCounter(0) 
{
#ifdef USE_SIGNALS
        // Ignore the SIGPIPE signal, so that clients on the same host that are killed
        // don't also kill us:
        signal(SIGPIPE, SIG_IGN);
#endif

        // Arrange to handle connections from others:
        env.taskScheduler().turnOnBackgroundReadHandling(fServerSocket, (TaskScheduler::BackgroundHandlerProc*)&incomingConnectionHandler, this);
}

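  The RTSPServer constructor initializes fServerMediaSessions to a newly created HashTable, fServerSocket to the TCP socket created above, and fServerPort to the listening port rtspServerPortNum. It also registers incomingConnectionHandler with the taskScheduler as the task function for fServerSocket. That task function watches for new client connections (accept); when a new client connects, it creates an RTSPClientSession instance.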
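The registration step can be illustrated without live555. The following is a minimal C++11 sketch, not the library's implementation: `MiniScheduler`, `dispatchReadable` and `ToyServer` are hypothetical names, and the real scheduler decides readiness with select() rather than being told which socket is readable.

```cpp
#include <cassert>
#include <map>

// Hypothetical stand-in for a task scheduler: it only remembers which handler
// belongs to which socket number and calls it on demand.
typedef void BackgroundHandlerProc(void* clientData, int mask);

class MiniScheduler {
public:
    void turnOnBackgroundReadHandling(int socketNum, BackgroundHandlerProc* proc, void* clientData) {
        fHandlers[socketNum] = Entry{proc, clientData};
    }
    // Called by the event loop when socketNum is reported readable.
    // Returns false if nobody registered for that socket.
    bool dispatchReadable(int socketNum) {
        std::map<int, Entry>::iterator it = fHandlers.find(socketNum);
        if (it == fHandlers.end()) return false;
        (*it->second.proc)(it->second.clientData, 1 /* "readable" */);
        return true;
    }
private:
    struct Entry { BackgroundHandlerProc* proc; void* clientData; };
    std::map<int, Entry> fHandlers;
};

// A toy "server" whose handler just counts incoming connections.
struct ToyServer {
    int connectionsSeen = 0;
    // Static member, so it can be passed as a plain function pointer;
    // the instance travels in clientData.
    static void incomingConnectionHandler(void* instance, int /*mask*/) {
        static_cast<ToyServer*>(instance)->connectionsSeen++;
    }
};
```

Passing `this` as the client data and recovering it inside a static handler is the same trick the constructor above relies on when it registers incomingConnectionHandler.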

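  What does RTSPClientSession need to provide? As one would expect: it must listen for the client's RTSP requests and answer them, return information about the requested stream in response to DESCRIBE, set up the RTP session on SETUP, close the RTP session on TEARDOWN, and so on.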

RTSPServer::RTSPClientSession::RTSPClientSession(RTSPServer& ourServer, unsigned sessionId, int clientSocket, struct sockaddr_in clientAddr)
                                                   : fOurServer(ourServer), fOurSessionId(sessionId),
                                                    fOurServerMediaSession(NULL),
                                                    fClientSocket(clientSocket), fClientAddr(clientAddr),
                                                    fLivenessCheckTask(NULL),
                                                    fIsMulticast(False), fSessionIsActive(True), fStreamAfterSETUP(False),
                                                    fTCPStreamIdCount(0), fNumStreamStates(0), fStreamStates(NULL) 
{
        // Arrange to handle incoming requests:
        resetRequestBuffer();
        envir().taskScheduler().turnOnBackgroundReadHandling(fClientSocket,(TaskScheduler::BackgroundHandlerProc*)&incomingRequestHandler, this);
        noteLiveness();
}

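  The function above is the RTSPClientSession constructor. It initializes fOurSessionId from ++fSessionIdCounter, fClientSocket to the socket returned by accept (clientSocket), and fClientAddr to the client address received by accept. It also registers incomingRequestHandler with the taskScheduler as the task function for fClientSocket.

  incomingRequestHandler calls incomingRequestHandler1, which is defined as follows: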

void RTSPServer::RTSPClientSession::incomingRequestHandler1() 
{
        noteLiveness();

        struct sockaddr_in dummy; // 'from' address, meaningless in this case
        Boolean endOfMsg = False;
        unsigned char* ptr = &fRequestBuffer[fRequestBytesAlreadySeen];

        int bytesRead = readSocket(envir(), fClientSocket, ptr, fRequestBufferBytesLeft, dummy);
        if (bytesRead <= 0 || (unsigned)bytesRead >= fRequestBufferBytesLeft) {
                // Either the client socket has died, or the request was too big for us.
                // Terminate this connection:
#ifdef DEBUG
                fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes (of %d); terminating connection!\n", this, bytesRead, fRequestBufferBytesLeft);
#endif
                delete this;
                return;
        }
#ifdef DEBUG
        ptr[bytesRead] = '\0';
        fprintf(stderr, "RTSPClientSession[%p]::incomingRequestHandler1() read %d bytes:%s\n", this, bytesRead, ptr);
#endif

        // Look for the end of the message: <CR><LF><CR><LF>
        unsigned char *tmpPtr = ptr;
        if (fRequestBytesAlreadySeen > 0) --tmpPtr;
        // in case the last read ended with a <CR>
        while (tmpPtr < &ptr[bytesRead-1]) {
                if (*tmpPtr == '\r' && *(tmpPtr+1) == '\n') {
                        if (tmpPtr - fLastCRLF == 2) { // This is it:
                                endOfMsg = 1;
                                break;
                        }
                        fLastCRLF = tmpPtr;
                }
                ++tmpPtr;
        }

        fRequestBufferBytesLeft -= bytesRead;
        fRequestBytesAlreadySeen += bytesRead;

        if (!endOfMsg) return; // subsequent reads will be needed to complete the request

        // Parse the request string into command name and 'CSeq',
        // then handle the command:
        fRequestBuffer[fRequestBytesAlreadySeen] = '\0';
        char cmdName[RTSP_PARAM_STRING_MAX];
        char urlPreSuffix[RTSP_PARAM_STRING_MAX];
        char urlSuffix[RTSP_PARAM_STRING_MAX];
        char cseq[RTSP_PARAM_STRING_MAX];
        if (!parseRTSPRequestString((char*)fRequestBuffer, fRequestBytesAlreadySeen,
                                                        cmdName, sizeof cmdName,
                                                        urlPreSuffix, sizeof urlPreSuffix,
                                                        urlSuffix, sizeof urlSuffix,
                                                        cseq, sizeof cseq)) 
        {
#ifdef DEBUG
                fprintf(stderr, "parseRTSPRequestString() failed!\n");
#endif
                handleCmd_bad(cseq);
        } else {
#ifdef DEBUG
                fprintf(stderr, "parseRTSPRequestString() returned cmdName \"%s\", urlPreSuffix \"%s\", urlSuffix \"%s\"\n", cmdName, urlPreSuffix, urlSuffix);
#endif
                if (strcmp(cmdName, "OPTIONS") == 0) {
                        handleCmd_OPTIONS(cseq);
                } else if (strcmp(cmdName, "DESCRIBE") == 0) {
                        printf("incomingRequestHandler1 ~~~~~~~~~~~~~~\n");
                        handleCmd_DESCRIBE(cseq, urlSuffix, (char const*)fRequestBuffer);
                } else if (strcmp(cmdName, "SETUP") == 0) {
                        handleCmd_SETUP(cseq, urlPreSuffix, urlSuffix, (char const*)fRequestBuffer);
                } else if (strcmp(cmdName, "TEARDOWN") == 0
                                || strcmp(cmdName, "PLAY") == 0
                                || strcmp(cmdName, "PAUSE") == 0
                                || strcmp(cmdName, "GET_PARAMETER") == 0) {
                        handleCmd_withinSession(cmdName, urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
                } else {
                        handleCmd_notSupported(cseq);
                }
        }

#ifdef DEBUG
        fprintf(stderr, "sending response: %s", fResponseBuffer);
#endif
        send(fClientSocket, (char const*)fResponseBuffer, strlen((char*)fResponseBuffer), 0);

        if (strcmp(cmdName, "SETUP") == 0 && fStreamAfterSETUP) {
                // The client has asked for streaming to commence now, rather than after a
                // subsequent "PLAY" command.  So, simulate the effect of a "PLAY" command:
                handleCmd_withinSession("PLAY", urlPreSuffix, urlSuffix, cseq, (char const*)fRequestBuffer);
        }

        resetRequestBuffer(); // to prepare for any subsequent request
        if (!fSessionIsActive) delete this;
}

  In this function we can see how each RTSP command is received, handled, and answered.
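The end-of-message search is the subtle part of that handler, because the terminating blank line may be split across several reads. Here is a simplified sketch of the idea using std::string instead of the fixed request buffer; `RequestAccumulator`, `append` and `commandName` are hypothetical names, not live555 API.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Accumulates bytes from successive reads and reports when a complete RTSP
// request (terminated by <CR><LF><CR><LF>) has arrived.
class RequestAccumulator {
public:
    // Returns true once the blank line ending the request headers has been seen.
    bool append(const std::string& chunk) {
        // Re-scan the last 3 old bytes too, in case the terminator straddles reads.
        std::size_t from = fBuffer.size() >= 3 ? fBuffer.size() - 3 : 0;
        fBuffer += chunk;
        return fBuffer.find("\r\n\r\n", from) != std::string::npos;
    }
    // First space-delimited token of the request line, e.g. "DESCRIBE".
    std::string commandName() const {
        return fBuffer.substr(0, fBuffer.find(' '));
    }
private:
    std::string fBuffer;
};
```

A caller would keep invoking `append()` with whatever each read returned and only dispatch on `commandName()` after it reports true, which mirrors the early `return` on `!endOfMsg` in the listing above.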

2> ServerMediaSession* sms = ServerMediaSession::createNew(... ...)

   Creates an instance of the ServerMediaSession class, initializing fStreamName to "h264_ch1", fInfoSDPString to "h264_ch1", fDescriptionSDPString to "RTSP/RTP stream from NETRA", fMiscSDPLines to NULL, fCreationTime to the current time, and fIsSSM to False.

3> sms->addSubsession(WISH264VideoServerMediaSubsession::createNew(... ...);

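  WISH264VideoServerMediaSubsession::createNew(): the main purpose of this function is to create an instance of OnDemandServerMediaSubsession (through its subclass). That class was analyzed earlier and is required for unicast. fWISInput is initialized to *H264InputDevice[video_type].

  sms->addSubsession() appends the WISH264VideoServerMediaSubsession instance to the session's subsession list. The list is still empty at this point, so the instance becomes the first node and fSubsessionsTail points to it.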

4> sms->addSubsession(WISPCMAudioServerMediaSubsession::createNew(... ...);

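  WISPCMAudioServerMediaSubsession::createNew(): again, the main purpose is to create an instance of OnDemandServerMediaSubsession (through its subclass), as required for unicast. fWISInput is initialized to *H264InputDevice[video_type].

  sms->addSubsession() links the WISPCMAudioServerMediaSubsession instance in as fSubsessionsTail->fNext, that is, after the video subsession.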
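The two addSubsession() calls amount to appending to a singly linked list that keeps both a head and a tail pointer. A minimal sketch of that bookkeeping follows; `ToySession` and `ToySubsession` are hypothetical stand-ins, and ownership is simplified so that the session deletes its nodes.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

struct ToySubsession {
    explicit ToySubsession(const std::string& n) : name(n), next(NULL) {}
    std::string name;
    ToySubsession* next;
};

class ToySession {
public:
    ToySession() : fHead(NULL), fTail(NULL) {}
    ~ToySession() {
        while (fHead != NULL) { ToySubsession* n = fHead->next; delete fHead; fHead = n; }
    }
    // Appends in O(1): the first node becomes the head, later ones hang off the tail.
    void addSubsession(ToySubsession* s) {
        if (fTail == NULL) fHead = s; else fTail->next = s;
        fTail = s;
    }
    const ToySubsession* head() const { return fHead; }
private:
    ToySubsession* fHead;
    ToySubsession* fTail;
};
```

Insertion order matters here because it determines the order of the media sections a client sees, so video added first stays first.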

5> rtspServer->addServerMediaSession(sms)

  Adds sms to the rtspServer's fServerMediaSessions hash table, keyed by its stream name.
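The purpose of this table is to let the server find a session from the stream name in the request URL. A rough sketch with std::map is shown below; `ToyRegistry`, `ToyMediaSession` and `lookup` are hypothetical names, and replacing a same-named entry is an assumption of this sketch rather than a statement about live555.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

struct ToyMediaSession { std::string streamName; };

class ToyRegistry {
public:
    // Registers a session under its stream name; a same-named entry is replaced
    // (assumption made for this sketch).
    void addServerMediaSession(ToyMediaSession* s) { fSessions[s->streamName] = s; }
    // Returns NULL when no session uses that name.
    ToyMediaSession* lookup(const std::string& name) const {
        std::map<std::string, ToyMediaSession*>::const_iterator it = fSessions.find(name);
        return it == fSessions.end() ? NULL : it->second;
    }
private:
    std::map<std::string, ToyMediaSession*> fSessions;
};
```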

6> env->taskScheduler().doEventLoop(&watchVariable); 

  doEventLoop was analyzed earlier; it mainly handles socket tasks and delayed tasks.

II. Multicast streaming server:
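As a reminder of how the delayed-task half of such a loop behaves, here is a toy C++11 version driven by virtual time. `ToyEventLoop`, `Trace` and the three task functions are hypothetical; unlike a real server loop, this one also stops when its queue is empty, and it ignores sockets entirely.

```cpp
#include <cassert>
#include <map>
#include <utility>
#include <vector>

typedef void TaskFunc(void* clientData);

// Toy scheduler with virtual time: delayed tasks are kept sorted by due time.
class ToyEventLoop {
public:
    ToyEventLoop() : fNow(0) {}
    void scheduleDelayedTask(long microseconds, TaskFunc* proc, void* clientData) {
        fQueue.insert(std::make_pair(fNow + microseconds, Task{proc, clientData}));
    }
    // Runs due tasks in time order until *watchVariable becomes non-zero
    // or the queue is empty.
    void doEventLoop(char volatile* watchVariable) {
        while (*watchVariable == 0 && !fQueue.empty()) {
            std::multimap<long, Task>::iterator first = fQueue.begin();
            fNow = first->first;   // "sleep" until the task is due
            Task t = first->second;
            fQueue.erase(first);
            (*t.proc)(t.clientData);
        }
    }
private:
    struct Task { TaskFunc* proc; void* clientData; };
    long fNow;
    std::multimap<long, Task> fQueue;
};

// Example tasks used to exercise the loop.
struct Trace {
    Trace() : stop(0) {}
    std::vector<int> order;
    char volatile stop;
};
void taskA(void* p)    { static_cast<Trace*>(p)->order.push_back(1); }
void taskB(void* p)    { static_cast<Trace*>(p)->order.push_back(2); }
void stopTask(void* p) { static_cast<Trace*>(p)->stop = 1; }
```

Setting the watch variable from inside a task is how code running within the loop can ask it to return, which is why the listing passes `&watchVariable`.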

        // Create the RTSP server:
        RTSPServer* rtspServer = NULL;
        // Normal case: Streaming from a built-in RTSP server:
        rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);
        if (rtspServer == NULL) {
                *env << "Failed to create RTSP server: " << env->getResultMsg() << "\n";
                exit(1);
        }

        *env << "...done initializing \n";

        if( streamingMode == STREAMING_UNICAST )
        {
        ... ...
        }
        else
        {
                if (streamingMode == STREAMING_MULTICAST_SSM) 
                {
                        if (multicastAddress == 0) 
                                multicastAddress = chooseRandomIPv4SSMAddress(*env);
                } else if (multicastAddress != 0) {
                        streamingMode = STREAMING_MULTICAST_ASM;
                }

                struct in_addr dest; 
                dest.s_addr = multicastAddress;
        
                const unsigned char ttl = 255;

                // For RTCP:
                const unsigned maxCNAMElen = 100;
                unsigned char CNAME[maxCNAMElen + 1];
                gethostname((char *) CNAME, maxCNAMElen);
                CNAME[maxCNAMElen] = '\0';      // just in case

                ServerMediaSession* sms;
                sms = ServerMediaSession::createNew(*env, H264StreamName[video_type], H264StreamName[video_type], streamDescription,streamingMode == STREAMING_MULTICAST_SSM);
               
                /* VIDEO Channel initial */
                if(1)
                {
                        // Create 'groupsocks' for RTP and RTCP:
                    const Port rtpPortVideo(videoRTPPortNum);
                    const Port rtcpPortVideo(videoRTPPortNum+1);
                    
                    rtpGroupsockVideo = new Groupsock(*env, dest, rtpPortVideo, ttl);
                    rtcpGroupsockVideo = new Groupsock(*env, dest, rtcpPortVideo, ttl);
                    
                    if (streamingMode == STREAMING_MULTICAST_SSM) {
                            rtpGroupsockVideo->multicastSendOnly();
                            rtcpGroupsockVideo->multicastSendOnly();
                    }
                    
                    setVideoRTPSinkBufferSize();
                    sinkVideo = H264VideoRTPSink::createNew(*env, rtpGroupsockVideo,96, 0x42, "h264");    
                    
                    // Create (and start) a 'RTCP instance' for this RTP sink:
                    unsigned totalSessionBandwidthVideo = (Mpeg4VideoBitrate+500)/1000; // in kbps; for RTCP b/w share
                    rtcpVideo = RTCPInstance::createNew(*env, rtcpGroupsockVideo,
                                                         totalSessionBandwidthVideo, CNAME,
                                                         sinkVideo, NULL /* we're a server */ ,
                                                         streamingMode == STREAMING_MULTICAST_SSM);
                        
                    // Note: This starts RTCP running automatically
                    sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkVideo, rtcpVideo));

                    sourceVideo = H264VideoStreamFramer::createNew(*env, H264InputDevice[video_type]->videoSource());  
                    // Start streaming:
                    sinkVideo->startPlaying(*sourceVideo, NULL, NULL);
                }

                /* AUDIO Channel initial */
                if(1)
                {
                    // there's a separate RTP stream for audio
                    // Create 'groupsocks' for RTP and RTCP:
                    const Port rtpPortAudio(audioRTPPortNum);
                    const Port rtcpPortAudio(audioRTPPortNum+1);

                    rtpGroupsockAudio = new Groupsock(*env, dest, rtpPortAudio, ttl);
                    rtcpGroupsockAudio = new Groupsock(*env, dest, rtcpPortAudio, ttl);

                    if (streamingMode == STREAMING_MULTICAST_SSM) {
                            rtpGroupsockAudio->multicastSendOnly();
                            rtcpGroupsockAudio->multicastSendOnly();
                    }

                    if( audioSamplingFrequency == 16000 )
                            sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 96, audioSamplingFrequency, "audio", "PCMU", 1);
                    else
                            sinkAudio = SimpleRTPSink::createNew(*env, rtpGroupsockAudio, 0, audioSamplingFrequency, "audio", "PCMU", 1);

                    // Create (and start) a 'RTCP instance' for this RTP sink:
                    unsigned totalSessionBandwidthAudio = (audioOutputBitrate+500)/1000; // in kbps; for RTCP b/w share
                    rtcpAudio = RTCPInstance::createNew(*env, rtcpGroupsockAudio,
                                                         totalSessionBandwidthAudio, CNAME,
                                                         sinkAudio, NULL /* we're a server */,
                                                         streamingMode == STREAMING_MULTICAST_SSM);

                    // Note: This starts RTCP running automatically
                    sms->addSubsession(PassiveServerMediaSubsession::createNew(*sinkAudio, rtcpAudio));

                    sourceAudio = H264InputDevice[video_type]->audioSource();

                    // Start streaming:
                    sinkAudio->startPlaying(*sourceAudio, NULL, NULL);
                }

                rtspServer->addServerMediaSession(sms);

                {
                    struct in_addr dest;
                    dest.s_addr = multicastAddress;
                    char *url = rtspServer->rtspURL(sms);
                    //char *url2 = inet_ntoa(dest);
                    *env << "Mulicast Play this stream using the URL: " << url << "\n";
                    //*env << "2 Mulicast addr: " << url2 << "\n";
                    delete[] url;
                }
        }

        // Begin the LIVE555 event loop:
        env->taskScheduler().doEventLoop(&watchVariable); // does not return

1> rtspServer = RTSPServer::createNew(*env, rtspServerPortNum, NULL);

  Same as in the unicast analysis above.

2> sms = ServerMediaSession::createNew(... ...)

  Same as in the unicast analysis above.

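3> Video

  1. Create the Groupsock instances for video RTP and RTCP, which provide the UDP sockets used for RTP and RTCP. It helps to understand ASM (any-source multicast) and SSM (source-specific multicast) here.

  2. Create the RTPSink instance, which packetizes the video data into RTP and sends it.

  3. Create the RTCPInstance instance, which packetizes and sends RTCP.

  4. Create the PassiveServerMediaSubsession instance and add it as the first node of the subsession list (fSubsessionsTail).

  5. Create the FramedSource instance, which fetches one frame of video data at a time.

  6. Start sending RTP and RTCP data to the multicast address.

4> Audio

  1. Create the Groupsock instances for audio RTP and RTCP, which provide the UDP sockets used for RTP and RTCP. It helps to understand ASM and SSM here.

  2. Create the RTPSink instance, which packetizes the audio data into RTP and sends it.

  3. Create the RTCPInstance instance, which packetizes and sends RTCP.

  4. Create the PassiveServerMediaSubsession instance and add it as the next node of the subsession list (after fSubsessionsTail).

  5. Create the FramedSource instance, which fetches one frame of audio data at a time.

  6. Start sending RTP and RTCP data to the multicast address.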

5> rtspServer->addServerMediaSession(sms)

  Same as in the unicast analysis above.

6> env->taskScheduler().doEventLoop(&watchVariable)

  Same as in the unicast analysis above.
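Two details of the multicast setup are easy to check in code: whether an address falls in the source-specific multicast block, and how the RTP and RTCP ports are paired. The sketch below assumes addresses in host byte order (the listing stores `multicastAddress` in `s_addr`, which is network byte order) and uses the standard IPv4 ranges 224.0.0.0/4 for multicast and 232.0.0.0/8 for SSM. The helper names are hypothetical.

```cpp
#include <cassert>
#include <cstdint>

// addr is an IPv4 address in host byte order, e.g. 0xE8010203 for 232.1.2.3.
inline bool isMulticast(std::uint32_t addr) { return (addr >> 28) == 0xE; }  // 224.0.0.0/4
inline bool isSSM(std::uint32_t addr)       { return (addr >> 24) == 232; }  // 232.0.0.0/8

// RTP uses the given port and RTCP the next one, as in the listing above
// (videoRTPPortNum and videoRTPPortNum+1).
struct PortPair { unsigned short rtp; unsigned short rtcp; };
inline PortPair rtpRtcpPorts(unsigned short rtpPort) {
    PortPair p = { rtpPort, static_cast<unsigned short>(rtpPort + 1) };
    return p;
}
```

With SSM, receivers subscribe to a (source, group) pair, which is consistent with the listing marking its groupsocks as send-only in that mode.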

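III. Differences between unicast and multicast

1> Socket creation: multicast creates its sockets at startup, whereas unicast creates the corresponding sockets only when a "SETUP" command is received.

2> startPlaying: multicast starts sending data to the multicast address at startup, whereas unicast calls startPlaying only when a "PLAY" command is received.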

IV. Analysis of startPlaying

  We start with multicast:

  sinkVideo->startPlaying() is implemented neither in the H264VideoRTPSink class nor in the RTPSink class, but in the MediaSink class:

Boolean MediaSink::startPlaying(MediaSource& source,
                afterPlayingFunc* afterFunc,
                void* afterClientData) 
{
    // Make sure we're not already being played:
    if (fSource != NULL) {
        envir().setResultMsg("This sink is already being played");
        return False;
    }

    // Make sure our source is compatible:
    if (!sourceIsCompatibleWithUs(source)) {
        envir().setResultMsg("MediaSink::startPlaying(): source is not compatible!");
        return False;
    }
    fSource = (FramedSource*)&source;

    fAfterFunc = afterFunc;
    fAfterClientData = afterClientData;
    
    return continuePlaying();
}

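  Here we see a call to continuePlaying(). Where is that function implemented? sinkVideo was created by H264VideoRTPSink::createNew(), which returns an instance of H264VideoRTPSink, so we can conclude that this continuePlaying() is the one implemented in the H264VideoRTPSink class.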

Boolean H264VideoRTPSink::continuePlaying() 
{
    // First, check whether we have a 'fragmenter' class set up yet.
    // If not, create it now:
    if (fOurFragmenter == NULL) {
        fOurFragmenter = new H264FUAFragmenter(envir(), fSource, OutPacketBuffer::maxSize, ourMaxPacketSize() - 12/*RTP hdr size*/);
        fSource = fOurFragmenter;
    }
    
    //printf("function=%s line=%d\n",__func__,__LINE__);
    // Then call the parent class's implementation:
    return MultiFramedRTPSink::continuePlaying();
}

  At this point we see that it calls the continuePlaying member function of the MultiFramedRTPSink class. Here is its implementation:

Boolean MultiFramedRTPSink::continuePlaying()
{
    // Send the first packet.
    // (This will also schedule any future sends.)
    buildAndSendPacket(True);
    return True;
}
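The chain traced so far (base-class startPlaying() calling a virtual continuePlaying(), which the most derived class overrides and then forwards to its parent) can be reproduced in a few lines. The class names below are hypothetical stand-ins for MediaSink, MultiFramedRTPSink and H264VideoRTPSink; they only record the call order.

```cpp
#include <cassert>
#include <string>
#include <vector>

struct ToyMediaSink {
    virtual ~ToyMediaSink() {}
    // Non-virtual entry point: does the common checks, then dispatches virtually.
    bool startPlaying() {
        trace.push_back("MediaSink::startPlaying");
        return continuePlaying();
    }
    std::vector<std::string> trace;
protected:
    virtual bool continuePlaying() = 0;  // resolved at run time
};

struct ToyMultiFramedSink : ToyMediaSink {
protected:
    bool continuePlaying() {
        trace.push_back("MultiFramed::continuePlaying");
        return true;
    }
};

struct ToyH264Sink : ToyMultiFramedSink {
protected:
    bool continuePlaying() {
        trace.push_back("H264::continuePlaying");      // e.g. install the fragmenter
        return ToyMultiFramedSink::continuePlaying();  // then defer to the parent
    }
};
```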

  This leads us to buildAndSendPacket(), which is implemented as follows:

void MultiFramedRTPSink::buildAndSendPacket(Boolean isFirstPacket) 
{
    // This function mainly prepares the RTP packet header, leaving room for fields that depend on the actual data.
    fIsFirstPacket = isFirstPacket;


    // Set up the RTP header:
    unsigned rtpHdr = 0x80000000; // RTP version 2; marker ('M') bit not set (by default; it can be set later)
    rtpHdr |= (fRTPPayloadType << 16);
    rtpHdr |= fSeqNo; // sequence number
    fOutBuf->enqueueWord(rtpHdr); // append one 32-bit word to the packet


    // Note where the RTP timestamp will go.
    // (We can't fill this in until we start packing payload frames.)
    fTimestampPosition = fOutBuf->curPacketSize();
    fOutBuf->skipBytes(4); // leave a hole for the timestamp


    fOutBuf->enqueueWord(SSRC()); 


    // Allow for a special, payload-format-specific header following the
    // RTP header:
    fSpecialHeaderPosition = fOutBuf->curPacketSize();
    fSpecialHeaderSize = specialHeaderSize();
    fOutBuf->skipBytes(fSpecialHeaderSize);


    // Begin packing as many (complete) frames into the packet as we can:
    fTotalFrameSpecificHeaderSizes = 0;
    fNoFramesLeft = False;
    fNumFramesUsedSoFar = 0; // number of frames packed into this packet so far
    // The header is ready; now pack the frame data:
    packFrame();
}
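The header layout that buildAndSendPacket() prepares is the 12-byte fixed RTP header: one word holding version, marker bit, payload type and sequence number, then the timestamp, then the SSRC. The standalone sketch below builds it into a byte vector; `buildRtpHeader` and this `enqueueWord` are hypothetical helpers, and the timestamp is written immediately here, whereas the listing leaves a hole and fills it in later.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Appends a 32-bit word in network (big-endian) byte order.
inline void enqueueWord(std::vector<std::uint8_t>& buf, std::uint32_t word) {
    buf.push_back(static_cast<std::uint8_t>(word >> 24));
    buf.push_back(static_cast<std::uint8_t>(word >> 16));
    buf.push_back(static_cast<std::uint8_t>(word >> 8));
    buf.push_back(static_cast<std::uint8_t>(word));
}

// Builds the 12-byte fixed RTP header: V=2, no padding, no extension, no CSRCs.
inline std::vector<std::uint8_t> buildRtpHeader(std::uint8_t payloadType, std::uint16_t seqNo,
                                                std::uint32_t timestamp, std::uint32_t ssrc,
                                                bool marker) {
    std::uint32_t rtpHdr = 0x80000000u;                              // RTP version 2
    if (marker) rtpHdr |= 0x00800000u;                               // marker ('M') bit
    rtpHdr |= static_cast<std::uint32_t>(payloadType & 0x7F) << 16;  // 7-bit payload type
    rtpHdr |= seqNo;                                                 // sequence number
    std::vector<std::uint8_t> buf;
    enqueueWord(buf, rtpHdr);
    enqueueWord(buf, timestamp);
    enqueueWord(buf, ssrc);
    return buf;
}
```

For payload type 96 with the marker bit set, the second byte comes out as 0xE0 (0x80 for the marker plus 0x60 for the type).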

  Next, packFrame():

void MultiFramedRTPSink::packFrame()
{
    // First, see if we have an overflow frame that was too big for the last pkt
    if (fOutBuf->haveOverflowData()) {
        // If overflow data exists, use it. Overflow data is frame data left over from the previous packet, since one packet may not hold a whole frame.
        // Use this frame before reading a new one from the source
        unsigned frameSize = fOutBuf->overflowDataSize();
        struct timeval presentationTime = fOutBuf->overflowPresentationTime();
        unsigned durationInMicroseconds =fOutBuf->overflowDurationInMicroseconds();
        fOutBuf->useOverflowData();


        afterGettingFrame1(frameSize, 0, presentationTime,durationInMicroseconds);
    } else {
        // No frame data is pending, so ask the source for some.
        // Normal case: we need to read a new frame from the source
        if (fSource == NULL)
            return;

        // Update a few positions in the buffer
        fCurFrameSpecificHeaderPosition = fOutBuf->curPacketSize();
        fCurFrameSpecificHeaderSize = frameSpecificHeaderSize();
        fOutBuf->skipBytes(fCurFrameSpecificHeaderSize);
        fTotalFrameSpecificHeaderSizes += fCurFrameSpecificHeaderSize;

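        // Get the next frame from the source:
        fSource->getNextFrame(fOutBuf->curPtr(), // where the new data should be stored
                fOutBuf->totalBytesAvailable(), // free space left in the buffer
                afterGettingFrame,    // the source may defer its read to the task scheduler, so pass it the function to call once a frame is available
                this,
                ourHandleClosure, // called when the source closes (e.g. the file has been read to the end)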
                this);
    }
}

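  fSource is defined in the MediaSink class; its startPlaying() function assigns the sourceVideo argument to fSource. The getNextFrame() function that sourceVideo uses is implemented in FramedSource. It is a non-virtual function that ends by calling the virtual doGetNextFrame():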

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,
                afterGettingFunc* afterGettingFunc,
                void* afterGettingClientData,
                onCloseFunc* onCloseFunc,
                void* onCloseClientData) 
{
    // Make sure we're not already being read:
    if (fIsCurrentlyAwaitingData) {
        envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time!\n";
        exit(1);
    }

    fTo = to;
    fMaxSize = maxSize;
    fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()
    fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()
    fAfterGettingFunc = afterGettingFunc;
    fAfterGettingClientData = afterGettingClientData;
    fOnCloseFunc = onCloseFunc;
    fOnCloseClientData = onCloseClientData;
    fIsCurrentlyAwaitingData = True;

    doGetNextFrame();
}

  sourceVideo is instantiated through H264VideoStreamFramer::createNew(), so the doGetNextFrame() in question is the one implemented in the H264VideoStreamFramer class:

void H264VideoStreamFramer::doGetNextFrame()
{

  //fParser->registerReadInterest(fTo, fMaxSize);
  //continueReadProcessing();
  fInputSource->getNextFrame(fTo, fMaxSize,
                             afterGettingFrame, this,
                             FramedSource::handleClosure, this);
}

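  fInputSource is initialized in the framer's base class (FramedFilter in stock live555) from the constructor argument H264InputDevice[video_type]->videoSource(). That source is a VideoOpenFileSource, which inherits from OpenFileSource, so this doGetNextFrame() once again enters FramedSource::getNextFrame(). This time the doGetNextFrame() it calls is the one implemented by OpenFileSource, whose read logic is in incomingDataHandler1():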

void OpenFileSource::incomingDataHandler1() {
    int ret;

    if (!isCurrentlyAwaitingData()) return; // we're not ready for the data yet

    ret = readFromFile();
    if (ret < 0) {
        handleClosure(this);
        fprintf(stderr,"In Grab Image, the source stops being readable!!!!\n");
    }
    else if (ret == 0) 
    {
        if( uSecsToDelay >= uSecsToDelayMax )
        {
            uSecsToDelay = uSecsToDelayMax;
        }else{
            uSecsToDelay *= 2; 
        }
        nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecsToDelay, (TaskFunc*)incomingDataHandler, this);
    }
    else {
        nextTask() = envir().taskScheduler().scheduleDelayedTask(0, (TaskFunc*)afterGetting, this);
    }
}
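The `ret == 0` branch above is a polling backoff: each empty read doubles the retry delay until a ceiling is reached. A small sketch of that rule follows. `nextPollDelay` is a hypothetical helper, and unlike the listing it also clamps the doubled value, so the result never exceeds the maximum.

```cpp
#include <cassert>

// Doubles the polling delay after each empty read, capped at maxDelay.
inline unsigned nextPollDelay(unsigned currentDelay, unsigned maxDelay) {
    if (currentDelay >= maxDelay) return maxDelay;
    unsigned doubled = currentDelay * 2;
    return doubled > maxDelay ? maxDelay : doubled;
}
```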

  After a frame has been read, the afterGetting() function placed in the delay queue is executed. It is implemented in the parent class FramedSource:

void FramedSource::afterGetting(FramedSource* source) 
{
    source->fIsCurrentlyAwaitingData = False;
    // indicates that we can be read again
    // Note that this needs to be done here, in case the "fAfterFunc"
    // called below tries to read another frame (which it usually will)

    if (source->fAfterGettingFunc != NULL) {
        (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,
                                   source->fFrameSize, 
                                   source->fNumTruncatedBytes,
                                   source->fPresentationTime,
                                   source->fDurationInMicroseconds);
    }
}

  The fAfterGettingFunc function pointer is assigned in getNextFrame(); the value passed in by MultiFramedRTPSink::packFrame() is MultiFramedRTPSink::afterGettingFrame():

void MultiFramedRTPSink::afterGettingFrame(void* clientData, unsigned numBytesRead,
                                        unsigned numTruncatedBytes,
                                        struct timeval presentationTime,
                                        unsigned durationInMicroseconds) 
{
      MultiFramedRTPSink* sink = (MultiFramedRTPSink*)clientData;
      sink->afterGettingFrame1(numBytesRead, numTruncatedBytes,
                           presentationTime, durationInMicroseconds);
}
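The pairing of a static callback with a `void*` client pointer is what lets a C-style function pointer reach a member function. Below is a compact sketch of the source/sink handshake under simplifying assumptions: `ToySource` delivers a fixed three-byte frame synchronously, whereas the source in the article delivers through the scheduler. All names are hypothetical.

```cpp
#include <cassert>
#include <cstring>

typedef void AfterGettingFunc(void* clientData, unsigned frameSize);

// Toy source: copies a fixed "frame" into the caller's buffer, then calls back.
class ToySource {
public:
    ToySource() : fAwaiting(false) {}
    bool getNextFrame(unsigned char* to, unsigned maxSize, AfterGettingFunc* after, void* clientData) {
        if (fAwaiting) return false;  // only one outstanding read at a time
        fAwaiting = true;
        static const unsigned char frame[] = { 0x65, 0x88, 0x84 };
        unsigned frameLen = static_cast<unsigned>(sizeof frame);
        unsigned n = frameLen < maxSize ? frameLen : maxSize;
        std::memcpy(to, frame, n);
        fAwaiting = false;            // cleared before the callback, so it may read again
        (*after)(clientData, n);
        return true;
    }
private:
    bool fAwaiting;
};

// Toy sink: the static function recovers the object from clientData and
// forwards to the member function that does the real work.
class ToySink {
public:
    ToySink() : bytesReceived(0) {}
    void pull(ToySource& src) { src.getNextFrame(fBuf, sizeof fBuf, afterGettingFrame, this); }
    unsigned bytesReceived;
private:
    static void afterGettingFrame(void* clientData, unsigned frameSize) {
        static_cast<ToySink*>(clientData)->afterGettingFrame1(frameSize);
    }
    void afterGettingFrame1(unsigned frameSize) { bytesReceived += frameSize; }
    unsigned char fBuf[1500];
};
```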

  Next, the implementation of afterGettingFrame1:

void MultiFramedRTPSink::afterGettingFrame1(
        unsigned frameSize,
        unsigned numTruncatedBytes,
        struct timeval presentationTime,
        unsigned durationInMicroseconds)
{
    if (fIsFirstPacket) {
        // Record the fact that we're starting to play now:
        gettimeofday(&fNextSendTime, NULL);
    }


    // If the buffer offered for a frame is too small, the frame gets truncated; all we can do is warn the user.
    if (numTruncatedBytes > 0) {


        unsigned const bufferSize = fOutBuf->totalBytesAvailable();
        envir()
                << "MultiFramedRTPSink::afterGettingFrame1(): The input frame data was too large for our buffer size ("
                << bufferSize
                << ").  "
                << numTruncatedBytes
                << " bytes of trailing data was dropped!  Correct this by increasing \"OutPacketBuffer::maxSize\" to at least "
                << OutPacketBuffer::maxSize + numTruncatedBytes
                << ", *before* creating this 'RTPSink'.  (Current value is "
                << OutPacketBuffer::maxSize << ".)\n";
    }
    unsigned curFragmentationOffset = fCurFragmentationOffset;
    unsigned numFrameBytesToUse = frameSize;
    unsigned overflowBytes = 0;


    // If the packet already holds frame data and nothing more may be added to it, save the newly obtained frame for later.
    // If we have already packed one or more frames into this packet,
    // check whether this new frame is eligible to be packed after them.
    // (This is independent of whether the packet has enough room for this
    // new frame; that check comes later.)
    if (fNumFramesUsedSoFar > 0) {
        // The packet already holds a frame and no further frame is allowed in it, so just record the new frame.
        if ((fPreviousFrameEndedFragmentation && !allowOtherFramesAfterLastFragment())
                || !frameCanAppearAfterPacketStart(fOutBuf->curPtr(), frameSize))
        {
            // Save away this frame for next time:
            numFrameBytesToUse = 0;
            fOutBuf->setOverflowData(fOutBuf->curPacketSize(), frameSize,
                    presentationTime, durationInMicroseconds);
        }
    }
    
    // Records whether the data just packed was the last fragment of a frame.
    fPreviousFrameEndedFragmentation = False;


    // Work out how much of the frame fits in the current packet; the rest is saved as overflow data.
    if (numFrameBytesToUse > 0) {
        // Check whether this frame overflows the packet
        if (fOutBuf->wouldOverflow(frameSize)) {
            // Don't use this frame now; instead, save it as overflow data, and
            // send it in the next packet instead.  However, if the frame is too
            // big to fit in a packet by itself, then we need to fragment it (and
            // use some of it in this packet, if the payload format permits this.)
            if (isTooBigForAPacket(frameSize)
                    && (fNumFramesUsedSoFar == 0 || allowFragmentationAfterStart())) {
                // We need to fragment this frame, and use some of it now:
                overflowBytes = computeOverflowForNewFrame(frameSize);
                numFrameBytesToUse -= overflowBytes;
                fCurFragmentationOffset += numFrameBytesToUse;
            } else {
                // We don't use any of this frame now:
                overflowBytes = frameSize;
                numFrameBytesToUse = 0;
            }
            fOutBuf->setOverflowData(fOutBuf->curPacketSize() + numFrameBytesToUse,
                    overflowBytes, presentationTime, durationInMicroseconds);
        } else if (fCurFragmentationOffset > 0) {
            // This is the last fragment of a frame that was fragmented over
            // more than one packet.  Do any special handling for this case:
            fCurFragmentationOffset = 0;
            fPreviousFrameEndedFragmentation = True;
        }
    }


    
    if (numFrameBytesToUse == 0 && frameSize > 0) {
        // The packet has data and none of the new frame can be used, so send it. (This case seems unlikely to occur.)
        // Send our packet now, because we have filled it up:
        sendPacketIfNecessary();
    } else {
        // Frame data needs to be packed into the packet.
        
        // Use this frame in our outgoing packet:
        unsigned char* frameStart = fOutBuf->curPtr();
        fOutBuf->increment(numFrameBytesToUse);
        // do this now, in case "doSpecialFrameHandling()" calls "setFramePadding()" to append padding bytes


        // Here's where any payload format specific processing gets done:
        doSpecialFrameHandling(curFragmentationOffset, frameStart,
                numFrameBytesToUse, presentationTime, overflowBytes);


        ++fNumFramesUsedSoFar;


        // Update the time at which the next packet should be sent, based
        // on the duration of the frame that we just packed into it.
        // However, if this frame has overflow data remaining, then don't
        // count its duration yet.
        if (overflowBytes == 0) {
            fNextSendTime.tv_usec += durationInMicroseconds;
            fNextSendTime.tv_sec += fNextSendTime.tv_usec / 1000000;
            fNextSendTime.tv_usec %= 1000000;
        }


        // Send the packet if necessary; otherwise keep packing data.
        // Send our packet now if (i) it's already at our preferred size, or
        // (ii) (heuristic) another frame of the same size as the one we just
        //      read would overflow the packet, or
        // (iii) it contains the last fragment of a fragmented frame, and we
        //      don't allow anything else to follow this or
        // (iv) one frame per packet is allowed:
        if (fOutBuf->isPreferredSize()
                || fOutBuf->wouldOverflow(numFrameBytesToUse)
                || (fPreviousFrameEndedFragmentation
                        && !allowOtherFramesAfterLastFragment())
                || !frameCanAppearAfterPacketStart(
                        fOutBuf->curPtr() - frameSize, frameSize)) {
            // The packet is ready to be sent now
            sendPacketIfNecessary();
        } else {
            // There's room for more frames; try getting another:
            packFrame();
        }
    }
}
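The overflow bookkeeping in afterGettingFrame1() is easier to follow with the special cases stripped away. The sketch below only shows how one oversized frame is spread over several packets, with the remainder carried forward each time. `fragmentSizes` is a hypothetical helper that ignores headers, multi-frame packets and payload-format rules.

```cpp
#include <cassert>
#include <vector>

// Splits one frame into per-packet payload sizes; whatever does not fit in the
// current packet is carried over as "overflow" to the next one.
inline std::vector<unsigned> fragmentSizes(unsigned frameSize, unsigned maxPayload) {
    std::vector<unsigned> sizes;
    if (maxPayload == 0) return sizes;  // nothing can be sent
    unsigned overflow = frameSize;
    while (overflow > 0) {
        unsigned use = overflow > maxPayload ? maxPayload : overflow;
        sizes.push_back(use);
        overflow -= use;
    }
    return sizes;
}
```

A 3000-byte frame with a 1400-byte payload limit yields fragments of 1400, 1400 and 200 bytes; only after the last one would the frame's duration be added to the next send time, matching the `overflowBytes == 0` check in the listing.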

Now the function that sends the data:

void MultiFramedRTPSink::sendPacketIfNecessary()
{
    // Send the packet
    if (fNumFramesUsedSoFar > 0) {
        // Send the packet:
#ifdef TEST_LOSS
        if ((our_random()%10) != 0) // simulate 10% packet loss #####
#endif
        if (!fRTPInterface.sendPacket(fOutBuf->packet(),fOutBuf->curPacketSize())) {
            // if failure handler has been specified, call it
            if (fOnSendErrorFunc != NULL)
                (*fOnSendErrorFunc)(fOnSendErrorData);
        }
        ++fPacketCount;
        fTotalOctetCount += fOutBuf->curPacketSize();
        fOctetCount += fOutBuf->curPacketSize() - rtpHeaderSize
                - fSpecialHeaderSize - fTotalFrameSpecificHeaderSizes;


        ++fSeqNo; // for next time
    }


    // If overflow data remains, adjust the buffer
    if (fOutBuf->haveOverflowData()
            && fOutBuf->totalBytesAvailable() > fOutBuf->totalBufferSize() / 2) {
        // Efficiency hack: Reset the packet start pointer to just in front of
        // the overflow data (allowing for the RTP header and special headers),
        // so that we probably don't have to "memmove()" the overflow data
        // into place when building the next packet:
        unsigned newPacketStart = fOutBuf->curPacketSize()- 
                (rtpHeaderSize + fSpecialHeaderSize + frameSpecificHeaderSize());
        fOutBuf->adjustPacketStart(newPacketStart);
    } else {
        // Normal case: Reset the packet start pointer back to the start:
        fOutBuf->resetPacketStart();
    }
    fOutBuf->resetOffset();
    fNumFramesUsedSoFar = 0;


    if (fNoFramesLeft) {
        // No more data is left, so finish up
        // We're done:
        onSourceClosure(this);
    } else {
        // More data is left, so pack and send again at the next scheduled send time.
        // We have more frames left to send.  Figure out when the next frame
        // is due to start playing, then make sure that we wait this long before
        // sending the next packet.
        struct timeval timeNow;
        gettimeofday(&timeNow, NULL);
        int secsDiff = fNextSendTime.tv_sec - timeNow.tv_sec;
        int64_t uSecondsToGo = secsDiff * 1000000
                + (fNextSendTime.tv_usec - timeNow.tv_usec);
        if (uSecondsToGo < 0 || secsDiff < 0) { // sanity check: Make sure that the time-to-delay is non-negative:
            uSecondsToGo = 0;
        }


        // Delay this amount of time:
        nextTask() = envir().taskScheduler().scheduleDelayedTask(uSecondsToGo,
                (TaskFunc*) sendNext, this);
    }
}

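  After one frame's data has been sent, the doEventLoop() function runs the task function sendNext(), which sends the next packet and starts the next cycle. Audio data is sent in the same way.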
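
The delay calculation at the end of sendPacketIfNecessary() can be tried in isolation. The sketch below is not live555 code: `makeTimeval()` and `microsecondsUntil()` are hypothetical helpers written for this illustration. They reproduce the "difference in microseconds, clamped to zero" logic, using 64-bit arithmetic throughout so that the multiplication by 1000000 cannot overflow.

```cpp
#include <cassert>
#include <cstdint>
#include <sys/time.h>  // struct timeval (POSIX)

// Hypothetical helper: build a timeval from seconds and microseconds.
timeval makeTimeval(long sec, long usec) {
    timeval tv;
    tv.tv_sec = sec;
    tv.tv_usec = usec;
    return tv;
}

// Hypothetical helper: microseconds from 'now' until 'next',
// clamped to 0 when 'next' is already in the past.
int64_t microsecondsUntil(const timeval& next, const timeval& now) {
    const int64_t secsDiff =
        static_cast<int64_t>(next.tv_sec) - static_cast<int64_t>(now.tv_sec);
    const int64_t usecsDiff =
        static_cast<int64_t>(next.tv_usec) - static_cast<int64_t>(now.tv_usec);
    const int64_t uSecondsToGo = secsDiff * 1000000 + usecsDiff;
    return uSecondsToGo < 0 ? 0 : uSecondsToGo;
}
```

In the real sink, a value computed this way is what gets passed to scheduleDelayedTask(), so a late sender sends the next packet immediately, while an early sender waits until fNextSendTime.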

To summarize the call sequence (with reference to the analysis by 牛搞):

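  Sending unicast data:
  In unicast mode, the server starts sending data only after it receives the "PLAY" command from the client. The call is made in the handleCmd_PLAY() function of the RTSPClientSession class: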
void RTSPServer::RTSPClientSession 
::handleCmd_PLAY(ServerMediaSubsession* subsession, char const* cseq,
          char const* fullRequestStr) 
{

      ... ...


    fStreamStates[i].subsession->startStream(fOurSessionId,
                           fStreamStates[i].streamToken,
                           (TaskFunc*)noteClientLiveness,
                           this,
                           rtpSeqNum,
                           rtpTimestamp);
     ... ...
}
  The startStream() function is defined in the OnDemandServerMediaSubsession class:
void OnDemandServerMediaSubsession::startStream(unsigned clientSessionId,
                        void* streamToken,
                        TaskFunc* rtcpRRHandler,
                        void* rtcpRRHandlerClientData,
                        unsigned short& rtpSeqNum,
                        unsigned& rtpTimestamp) 
{
    StreamState* streamState = (StreamState*)streamToken;
    Destinations* destinations = (Destinations*)(fDestinationsHashTable->Lookup((char const*)clientSessionId));
    if (streamState != NULL) {
        streamState->startPlaying(destinations, rtcpRRHandler, rtcpRRHandlerClientData);
        if (streamState->rtpSink() != NULL) {
            rtpSeqNum = streamState->rtpSink()->currentSeqNo();
            rtpTimestamp = streamState->rtpSink()->presetNextTimestamp();
        }
    }
}

  The startPlaying() function is implemented in the StreamState class:

void StreamState::startPlaying(Destinations* dests,
           TaskFunc* rtcpRRHandler, void* rtcpRRHandlerClientData) 
{
    if (dests == NULL) return;
    
    if (!fAreCurrentlyPlaying && fMediaSource != NULL) {
        if (fRTPSink != NULL) {
            fRTPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
            fAreCurrentlyPlaying = True;
        } else if (fUDPSink != NULL) {
            fUDPSink->startPlaying(*fMediaSource, afterPlayingStreamState, this);
            fAreCurrentlyPlaying = True;
        }
    }

    if (fRTCPInstance == NULL && fRTPSink != NULL) {
        // Create (and start) a 'RTCP instance' for this RTP sink:
        fRTCPInstance = RTCPInstance::createNew(fRTPSink->envir(), fRTCPgs,
                                            fTotalBW, (unsigned char*)fMaster.fCNAME,
                                            fRTPSink, NULL /* we're a server */);
        // Note: This starts RTCP running automatically
    }

    if (dests->isTCP) {
        // Change RTP and RTCP to use the TCP socket instead of UDP:
        if (fRTPSink != NULL) {
            fRTPSink->addStreamSocket(dests->tcpSocketNum, dests->rtpChannelId);
        }
        if (fRTCPInstance != NULL) {
            fRTCPInstance->addStreamSocket(dests->tcpSocketNum, dests->rtcpChannelId);
            fRTCPInstance->setSpecificRRHandler(dests->tcpSocketNum, dests->rtcpChannelId,
                                            rtcpRRHandler, rtcpRRHandlerClientData);
        }
    } else {
        // Tell the RTP and RTCP 'groupsocks' about this destination
        // (in case they don't already have it):
        if (fRTPgs != NULL) fRTPgs->addDestination(dests->addr, dests->rtpPort);
        if (fRTCPgs != NULL) fRTCPgs->addDestination(dests->addr, dests->rtcpPort);
        if (fRTCPInstance != NULL) {
            fRTCPInstance->setSpecificRRHandler(dests->addr.s_addr, dests->rtcpPort,
                                            rtcpRRHandler, rtcpRRHandlerClientData);
        }
    }
}

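  This function then calls startPlaying() on the RTPSink. RTPSink does not implement it, so the call goes straight to startPlaying() in the parent class MediaSink. From there on, capturing, packetizing, and sending the data work the same way as in the multicast case.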
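
The inheritance dispatch described here can be shown with a toy model. The classes below are stand-ins written for this illustration: their names mirror the live555 classes, but they are not the real ones, and the `trace` vector exists only to record the call order. The sketch assumes the usual pattern in which the base class's startPlaying() hands over to a virtual continuePlaying() that a concrete sink overrides.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Toy stand-in for MediaSink: owns startPlaying() and delegates the
// actual work to a virtual hook.
struct ToyMediaSink {
    std::vector<std::string> trace;  // illustration only: records call order
    virtual ~ToyMediaSink() = default;

    bool startPlaying() {
        trace.push_back("MediaSink::startPlaying");
        return continuePlaying();
    }

protected:
    virtual bool continuePlaying() = 0;
};

// Toy stand-in for RTPSink: deliberately does NOT define startPlaying(),
// so calls resolve to the base-class version.
struct ToyRTPSink : ToyMediaSink {};

// Toy stand-in for MultiFramedRTPSink: supplies the hook that would
// build and send packets in the real library.
struct ToyMultiFramedRTPSink : ToyRTPSink {
protected:
    bool continuePlaying() override {
        trace.push_back("MultiFramedRTPSink::continuePlaying");
        return true;
    }
};
```

Calling startPlaying() on a `ToyMultiFramedRTPSink` records the base-class entry first and the derived hook second, which is the order the text describes.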


Original article: https://www.cnblogs.com/cslunatic/p/3767355.html