庖丁解牛-----Live555源码彻底解密(RTP解包)

Live555 客户端解包

以testRTSPClient.cpp为例讲解:

Medium<-MediaSource<-FramedSource<-RTPSource<-MultiFramedRTPSource<-H264VideoRTPSource

其中,我们要重点关注的类是下面几个:

FramedSource,RTPSource,MultiFramedRTPSource。

 

continuePlaying()函数中调用Source类(以MultiFramedRTPSource为例,因为它已实现doGetNextFrame()函数)的getNextFrame()函数以获取数据,而getNextFrame()是通过调用doGetNextFrame(),继而是doGetNextFrame1(),最终在doGetNextFrame1()中由语句fReorderingBuffer->getNextCompletedPacket()将存放在fReorderingBuffer中的数据取出交给Sink类来处理。

// Requests the next frame from the input source.
// Returns False when no source is attached, True once the request has been issued.
Boolean DummySink::continuePlaying() {
  // Without a source there is nothing to request (not expected in practice).
  if (fSource != NULL) {
    // The request is asynchronous: "afterGettingFrame()" is called later, once data
    // has arrived in fReceiveBuffer; "onSourceClosure" is registered as the close callback.
    fSource->getNextFrame(fReceiveBuffer, DUMMY_SINK_RECEIVE_BUFFER_SIZE,
                          afterGettingFrame, this,
                          onSourceClosure, this);
    return True;
  }

  return False;
}

fSource调用的是FramedSource类的getNextFrame函数,源码如下:

void FramedSource::getNextFrame(unsigned char* to, unsigned maxSize,

                   afterGettingFunc* afterGettingFunc,

                   void* afterGettingClientData,

                   onCloseFunc* onCloseFunc,

                   void* onCloseClientData) {

  // Make sure we're not already being read:

  if (fIsCurrentlyAwaitingData) {

    envir() << "FramedSource[" << this << "]::getNextFrame(): attempting to read more than once at the same time! ";

    envir().internalError();

  }

 

  fTo = to;

  fMaxSize = maxSize;

  fNumTruncatedBytes = 0; // by default; could be changed by doGetNextFrame()

  fDurationInMicroseconds = 0; // by default; could be changed by doGetNextFrame()

  fAfterGettingFunc = afterGettingFunc;

  fAfterGettingClientData = afterGettingClientData;

  fOnCloseFunc = onCloseFunc;

  fOnCloseClientData = onCloseClientData;

  fIsCurrentlyAwaitingData = True;

 

  doGetNextFrame();

}

继续跟踪:doGetNextFrame()在FramedSource类中为纯虚函数,在子类MultiFramedRTPSource中实现,源码如下:

void MultiFramedRTPSource::doGetNextFrame() {

 

  if (!fAreDoingNetworkReads) {

// Turn on background read handling of incoming packets:

//将标志位置为TRUE

    fAreDoingNetworkReads = True;

    TaskScheduler::BackgroundHandlerProc* handler

      = (TaskScheduler::BackgroundHandlerProc*)&networkReadHandler;

     //如何获取数据:networkReadHandler1()函数不断从fRTPInterface所指向的socket读入, 最终执行语句fReorderingBuffer->storePacket(bPacket);实现。

   

//开始网络数据包接收

    fRTPInterface.startNetworkReading(handler);

  }

 

  fSavedTo = fTo;

  fSavedMaxSize = fMaxSize;

  fFrameSize = 0; // for now

  fNeedDelivery = True;

  doGetNextFrame1();

}

networkReadHandler源码如下:

void MultiFramedRTPSource::networkReadHandler(MultiFramedRTPSource* source, int /*mask*/) {

  source->networkReadHandler1();

}

networkReadHandler1源码如下:

void MultiFramedRTPSource::networkReadHandler1() {

  BufferedPacket* bPacket = fPacketReadInProgress;

  if (bPacket == NULL) {

    // Normal case: Get a free BufferedPacket descriptor to hold the new network packet:

    bPacket = fReorderingBuffer->getFreePacket(this);

  }

 

  // Read the network packet, and perform sanity checks on the RTP header:

  //读出网络数据包,并检查RTP包头

  Boolean readSuccess = False;

  do {

Boolean packetReadWasIncomplete = fPacketReadInProgress != NULL;

 

    if (!bPacket->fillInData(fRTPInterface, packetReadWasIncomplete)) {

      if (bPacket->bytesAvailable() == 0) {

     envir() << "MultiFramedRTPSource error: Hit limit when reading incoming packet over TCP. Increase "MAX_PACKET_SIZE" ";

      }

      fPacketReadInProgress = NULL;

      break;

    }

    if (packetReadWasIncomplete) {

      // We need additional read(s) before we can process the incoming packet:

      fPacketReadInProgress = bPacket;

      return;

    } else {

      fPacketReadInProgress = NULL;

    }

#ifdef TEST_LOSS

    setPacketReorderingThresholdTime(0);

       // don't wait for 'lost' packets to arrive out-of-order later

    if ((our_random()%10) == 0) break; // simulate 10% packet loss

#endif

 

    // Check for the 12-byte RTP header:

    if (bPacket->dataSize() < 12) break;

    unsigned rtpHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);

    Boolean rtpMarkerBit = (rtpHdr&0x00800000) != 0;

    unsigned rtpTimestamp = ntohl(*(u_int32_t*)(bPacket->data()));ADVANCE(4);

    unsigned rtpSSRC = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);

 

    // Check the RTP version number (it should be 2):

    if ((rtpHdr&0xC0000000) != 0x80000000) break;

 

    // Skip over any CSRC identifiers in the header:

    unsigned cc = (rtpHdr>>24)&0xF;

    if (bPacket->dataSize() < cc) break;

    ADVANCE(cc*4);

 

    // Check for (& ignore) any RTP header extension

    if (rtpHdr&0x10000000) {

      if (bPacket->dataSize() < 4) break;

      unsigned extHdr = ntohl(*(u_int32_t*)(bPacket->data())); ADVANCE(4);

      unsigned remExtSize = 4*(extHdr&0xFFFF);

      if (bPacket->dataSize() < remExtSize) break;

      ADVANCE(remExtSize);

    }

 

    // Discard any padding bytes:

    if (rtpHdr&0x20000000) {

      if (bPacket->dataSize() == 0) break;

      unsigned numPaddingBytes

     = (unsigned)(bPacket->data())[bPacket->dataSize()-1];

      if (bPacket->dataSize() < numPaddingBytes) break;

      bPacket->removePadding(numPaddingBytes);

    }

    // Check the Payload Type.

    if ((unsigned char)((rtpHdr&0x007F0000)>>16)

     != rtpPayloadFormat()) {

      break;

    }

 

    // The rest of the packet is the usable data.  Record and save it:

    if (rtpSSRC != fLastReceivedSSRC) {

      // The SSRC of incoming packets has changed.  Unfortunately we don't yet handle streams that contain multiple SSRCs,

      // but we can handle a single-SSRC stream where the SSRC changes occasionally:

      fLastReceivedSSRC = rtpSSRC;

      fReorderingBuffer->resetHaveSeenFirstPacket();

    }

    unsigned short rtpSeqNo = (unsigned short)(rtpHdr&0xFFFF);

    Boolean usableInJitterCalculation

      = packetIsUsableInJitterCalculation((bPacket->data()),

                              bPacket->dataSize());

    struct timeval presentationTime; // computed by:

    Boolean hasBeenSyncedUsingRTCP; // computed by:

    receptionStatsDB()

      .noteIncomingPacket(rtpSSRC, rtpSeqNo, rtpTimestamp,

                timestampFrequency(),

                usableInJitterCalculation, presentationTime,

                hasBeenSyncedUsingRTCP, bPacket->dataSize());

 

    // Fill in the rest of the packet descriptor, and store it:

    struct timeval timeNow;

    gettimeofday(&timeNow, NULL);

    bPacket->assignMiscParams(rtpSeqNo, rtpTimestamp, presentationTime,

                    hasBeenSyncedUsingRTCP, rtpMarkerBit,

                    timeNow);

//将收到的包插入到fReorderingBuffer中

    if (!fReorderingBuffer->storePacket(bPacket)) break;

 

    readSuccess = True;

  } while (0);

  if (!readSuccess) fReorderingBuffer->freePacket(bPacket);

 

  doGetNextFrame1();

  // If we didn't get proper data this time, we'll get another chance

}

1)fillInData函数源码如下:

// Reads data from "rtpInterface" into the free space at the tail of this packet's buffer.
//
//   packetReadWasIncomplete - in:  True if this call continues an earlier partial read
//                                  (in which case the buffer is not reset first);
//                             out: set by RTPInterface::handleRead().
//
// Returns False if the buffer has no room left or the read failed; otherwise the tail
// is advanced by the number of bytes read and True is returned.
Boolean BufferedPacket::fillInData(RTPInterface& rtpInterface, Boolean& packetReadWasIncomplete) {
  // A fresh read starts from an empty packet:
  if (!packetReadWasIncomplete) reset();

  unsigned const roomLeft = bytesAvailable();
  if (roomLeft == 0) {
    // exceeded buffer size when reading over TCP
    return False;
  }

  unsigned bytesReceived;
  struct sockaddr_in senderAddress;
  Boolean const readOK = rtpInterface.handleRead(&fBuf[fTail], roomLeft, bytesReceived,
                                                 senderAddress, packetReadWasIncomplete);
  if (!readOK) return False;

  fTail += bytesReceived;
  return True;
}

fillInData调用handleRead实现了将rtpInterface指向的数据存入fBuf中的功能。

源码如下:

// Reads incoming packet data into "buffer" (at most "bufferMaxSize" bytes).
//
// If fNextTCPReadStreamSocketNum is negative, the data is read from the datagram
// 'groupsock' (fGS). Otherwise up to fNextTCPReadSize bytes are read from that TCP
// stream socket, looping while readSocket() keeps returning data.
//
//   bytesRead               - out: number of bytes placed in "buffer"
//   fromAddress             - out: filled in by the underlying read call
//   packetReadWasIncomplete - out: set to True (and True is returned) when the TCP read
//                             delivered fewer bytes than fNextTCPReadSize without an
//                             error; the TCP read state is then kept for the next call
//
// Returns True on success (or on an incomplete-but-error-free TCP read), False on error.
// On success the data is also passed to fAuxReadHandlerFunc, if one is set.
Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,

                    unsigned& bytesRead, struct sockaddr_in& fromAddress, Boolean& packetReadWasIncomplete) {

  packetReadWasIncomplete = False; // by default

  Boolean readSuccess;

  if (fNextTCPReadStreamSocketNum < 0) {

    // Normal case: read from the (datagram) 'groupsock':

    readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);

  } else {

    // Read from the TCP connection:

    bytesRead = 0;

    // Never read more than the caller's buffer can hold:
    unsigned totBytesToRead = fNextTCPReadSize;

    if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;

    unsigned curBytesToRead = totBytesToRead;

    int curBytesRead;

    // Keep reading until the wanted amount has arrived, or readSocket() returns <= 0:
    while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,

                         &buffer[bytesRead], curBytesToRead,

                         fromAddress)) > 0) {

      bytesRead += curBytesRead;

      if (bytesRead >= totBytesToRead) break;

      curBytesToRead -= curBytesRead;

    }

    // Account for what was consumed from the pending TCP-framed packet:
    fNextTCPReadSize -= bytesRead;

    if (fNextTCPReadSize == 0) {

      // We've read all of the data that we asked for

      readSuccess = True;

    } else if (curBytesRead < 0) {

      // There was an error reading the socket

      bytesRead = 0;

      readSuccess = False;

    } else {

      // We need to read more bytes, and there was not an error reading the socket

      packetReadWasIncomplete = True;

      // Return before the socket number is reset below, so the next call continues on TCP:
      return True;

    }

    fNextTCPReadStreamSocketNum = -1; // default, for next time

  }

 

  if (readSuccess && fAuxReadHandlerFunc != NULL) {

    // Also pass the newly-read packet data to our auxilliary handler:

    (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);

  }

  return readSuccess;

}

该函数从socket中读数据包;

2)storePacket函数源码如下:

Boolean ReorderingPacketBuffer::storePacket(BufferedPacket* bPacket) {

  unsigned short rtpSeqNo = bPacket->rtpSeqNo();

 

  if (!fHaveSeenFirstPacket) {

    fNextExpectedSeqNo = rtpSeqNo; // initialization

    bPacket->isFirstPacket() = True;

    fHaveSeenFirstPacket = True;

  }

 

  // Ignore this packet if its sequence number is less than the one

  // that we're looking for (in this case, it's been excessively delayed).

  if (seqNumLT(rtpSeqNo, fNextExpectedSeqNo)) return False;

 

  if (fTailPacket == NULL) {

    // Common case: There are no packets in the queue; this will be the first one:

    bPacket->nextPacket() = NULL;

    fHeadPacket = fTailPacket = bPacket;

    return True;

  }

 

  if (seqNumLT(fTailPacket->rtpSeqNo(), rtpSeqNo)) {

    // The next-most common case: There are packets already in the queue; this packet arrived in order => put it at the tail:

    bPacket->nextPacket() = NULL;

    fTailPacket->nextPacket() = bPacket;

    fTailPacket = bPacket;

    return True;

  }

 

  if (rtpSeqNo == fTailPacket->rtpSeqNo()) {

    // This is a duplicate packet - ignore it

    return False;

  }

 

  // Rare case: This packet is out-of-order.  Run through the list (from the head), to figure out where it belongs:

  BufferedPacket* beforePtr = NULL;

  BufferedPacket* afterPtr = fHeadPacket;

  while (afterPtr != NULL) {

    if (seqNumLT(rtpSeqNo, afterPtr->rtpSeqNo())) break; // it comes here

    if (rtpSeqNo == afterPtr->rtpSeqNo()) {

      // This is a duplicate packet - ignore it

      return False;

    }

 

    beforePtr = afterPtr;

    afterPtr = afterPtr->nextPacket();

  }

 

  // Link our new packet between "beforePtr" and "afterPtr":

  bPacket->nextPacket() = afterPtr;

  if (beforePtr == NULL) {

    fHeadPacket = bPacket;

  } else {

    beforePtr->nextPacket() = bPacket;

  }

 

  return True;

}

前面的networkReadHandler1()函数中有do while(0)的妙用(任何一步检查失败都可以直接break到统一的清理代码),可以参考:

http://www.cnblogs.com/flying_bat/archive/2008/01/18/1044693.html

继续调用doGetNextFrame1()函数,源码如下:

// Delivers queued packet data to the downstream reader's buffer (fTo/fMaxSize).
//
// While delivery is still needed (fNeedDelivery), the next packet is taken from the
// reordering buffer, its payload-specific header is processed on first use, and its
// data is copied out with BufferedPacket::use(). When the packet completes a frame,
// the 'after getting' function is invoked (directly, or via the event loop if more
// packets are queued). Packets that belong to a fragmented frame with preceding
// packet loss are released without being delivered.
void MultiFramedRTPSource::doGetNextFrame1() {

// Keep going for as long as the downstream reader is still waiting for data:

  while (fNeedDelivery) {

    // If we already have packet data available, then deliver it now.

Boolean packetLossPrecededThis;

// Get the next packet that the reordering buffer is willing to hand out:

    BufferedPacket* nextPacket

      = fReorderingBuffer->getNextCompletedPacket(packetLossPrecededThis);

    if (nextPacket == NULL) break;

    

  // Clear fNeedDelivery now; it is set back to True below if more data is still needed:

    fNeedDelivery = False;

 

    if (nextPacket->useCount() == 0) {

      // Before using the packet, check whether it has a special header

      // that needs to be processed:

      unsigned specialHeaderSize;

      // Process the payload-specific header (e.g. the FU-A fragmentation header for H.264):

      if (!processSpecialHeader(nextPacket, specialHeaderSize)) {

     // Something's wrong with the header; reject the packet:

     fReorderingBuffer->releaseUsedPacket(nextPacket);

     fNeedDelivery = True;

     break;

      }

      nextPacket->skip(specialHeaderSize);

    }

 

    // Check whether we're part of a multi-packet frame, and whether

    // there was packet loss that would render this packet unusable:

    if (fCurrentPacketBeginsFrame) {

      if (packetLossPrecededThis || fPacketLossInFragmentedFrame) {

     // We didn't get all of the previous frame.

     // Forget any data that we used from it:

     fTo = fSavedTo; fMaxSize = fSavedMaxSize;

     fFrameSize = 0;

      }

      fPacketLossInFragmentedFrame = False;

    } else if (packetLossPrecededThis) {

      // We're in a multi-packet frame, with preceding packet loss

      fPacketLossInFragmentedFrame = True;

    }

    if (fPacketLossInFragmentedFrame) {

      // This packet is unusable; reject it:

      fReorderingBuffer->releaseUsedPacket(nextPacket);

      fNeedDelivery = True;

      break;

    }

 

    // The packet is usable. Deliver all or part of it to our caller:

    unsigned frameSize;

    nextPacket->use(fTo, fMaxSize, frameSize, fNumTruncatedBytes,

             fCurPacketRTPSeqNum, fCurPacketRTPTimestamp,

             fPresentationTime, fCurPacketHasBeenSynchronizedUsingRTCP,

             fCurPacketMarkerBit);

    fFrameSize += frameSize;

 

    if (!nextPacket->hasUsableData()) {

      // We're completely done with this packet now

      fReorderingBuffer->releaseUsedPacket(nextPacket);

    }

 

// A complete frame has been assembled; notify the downstream reader:

    if (fCurrentPacketCompletesFrame) {

      // We have all the data that the client wants.

      if (fNumTruncatedBytes > 0) {

     envir() << "MultiFramedRTPSource::doGetNextFrame1(): The total received frame size exceeds the client's buffer size ("

         << fSavedMaxSize << ").  "

         << fNumTruncatedBytes << " bytes of trailing data will be dropped! ";

      }

      // Call our own 'after getting' function, so that the downstream object can consume the data:

      if (fReorderingBuffer->isEmpty()) {

     // Common case optimization: There are no more queued incoming packets, so this code will not get

     // executed again without having first returned to the event loop.  Call our 'after getting' function

     // directly, because there's no risk of a long chain of recursion (and thus stack overflow):

     afterGetting(this);

      } else {

     // Special case: Call our 'after getting' function via the event loop.

     nextTask() = envir().taskScheduler().scheduleDelayedTask(0,

                                      (TaskFunc*)FramedSource::afterGetting, this);

      }

    } else {

      // This packet contained fragmented data, and does not complete

      // the data that the client wants.  Keep getting data:

      fTo += frameSize; fMaxSize -= frameSize;

      fNeedDelivery = True;

    }

  }

}

其中几个重要的函数解析如下:

1)  getNextCompletedPacket

 

BufferedPacket* ReorderingPacketBuffer

::getNextCompletedPacket(Boolean& packetLossPreceded) {

  if (fHeadPacket == NULL) return NULL;

 

  // Check whether the next packet we want is already at the head

  // of the queue:

  // ASSERT: fHeadPacket->rtpSeqNo() >= fNextExpectedSeqNo

  if (fHeadPacket->rtpSeqNo() == fNextExpectedSeqNo) {

    packetLossPreceded = fHeadPacket->isFirstPacket();

        // (The very first packet is treated as if there was packet loss beforehand.)

    return fHeadPacket;

  }

 

  // We're still waiting for our desired packet to arrive.  However, if

  // our time threshold has been exceeded, then forget it, and return

  // the head packet instead:

  Boolean timeThresholdHasBeenExceeded;

  if (fThresholdTime == 0) {

    timeThresholdHasBeenExceeded = True; // optimization

  } else {

    struct timeval timeNow;

    gettimeofday(&timeNow, NULL);

    unsigned uSecondsSinceReceived

      = (timeNow.tv_sec - fHeadPacket->timeReceived().tv_sec)*1000000

      + (timeNow.tv_usec - fHeadPacket->timeReceived().tv_usec);

    timeThresholdHasBeenExceeded = uSecondsSinceReceived > fThresholdTime;

  }

  if (timeThresholdHasBeenExceeded) {

    fNextExpectedSeqNo = fHeadPacket->rtpSeqNo();

        // we've given up on earlier packets now

    packetLossPreceded = True;

    return fHeadPacket;

  }

 

  // Otherwise, keep waiting for our desired packet to arrive:

  return NULL;

}

2)  processSpecialHeader

H264的特殊头处理函数如下:

Boolean H264VideoRTPSource

::processSpecialHeader(BufferedPacket* packet,

                       unsigned& resultSpecialHeaderSize) {

  unsigned char* headerStart = packet->data();

  unsigned packetSize = packet->dataSize();

 

  // The header has a minimum size of 0, since the NAL header is used

  // as a payload header

  unsigned expectedHeaderSize = 0;

 

  // Check if the type field is 28 (FU-A) or 29 (FU-B)

  fCurPacketNALUnitType = (headerStart[0]&0x1F);

  switch (fCurPacketNALUnitType) {

  case 24: { // STAP-A

    expectedHeaderSize = 1; // discard the type byte

    break;

  }

  case 25: case 26: case 27: { // STAP-B, MTAP16, or MTAP24

    expectedHeaderSize = 3; // discard the type byte, and the initial DON

    break;

  }

  case 28: case 29: { // // FU-A or FU-B

    // For these NALUs, the first two bytes are the FU indicator and the FU header.

    // If the start bit is set, we reconstruct the original NAL header:

    unsigned char startBit = headerStart[1]&0x80;

    unsigned char endBit = headerStart[1]&0x40;

    if (startBit) {

      expectedHeaderSize = 1;

      if (packetSize < expectedHeaderSize) return False;

 

      headerStart[1] = (headerStart[0]&0xE0)+(headerStart[1]&0x1F);

      fCurrentPacketBeginsFrame = True;

    } else {

      // If the startbit is not set, both the FU indicator and header

      // can be discarded

      expectedHeaderSize = 2;

      if (packetSize < expectedHeaderSize) return False;

      fCurrentPacketBeginsFrame = False;

    }

    fCurrentPacketCompletesFrame = (endBit != 0);

    break;

  }

  default: {

    // This packet contains one or more complete, decodable NAL units

    fCurrentPacketBeginsFrame = fCurrentPacketCompletesFrame = True;

    break;

  }

  }

 

  resultSpecialHeaderSize = expectedHeaderSize;

  return True;

}

3)use将解析的内容放到to的Buff中;

void BufferedPacket::use(unsigned char* to, unsigned toSize,

               unsigned& bytesUsed, unsigned& bytesTruncated,

               unsigned short& rtpSeqNo, unsigned& rtpTimestamp,

               struct timeval& presentationTime,

               Boolean& hasBeenSyncedUsingRTCP,

               Boolean& rtpMarkerBit) {

  unsigned char* origFramePtr = &fBuf[fHead];

  unsigned char* newFramePtr = origFramePtr; // may change in the call below

  unsigned frameSize, frameDurationInMicroseconds;

  getNextEnclosedFrameParameters(newFramePtr, fTail - fHead,

                    frameSize, frameDurationInMicroseconds);

  if (frameSize > toSize) {

    bytesTruncated += frameSize - toSize;

    bytesUsed = toSize;

  } else {

    bytesTruncated = 0;

    bytesUsed = frameSize;

  }

 

  memmove(to, newFramePtr, bytesUsed);

  fHead += (newFramePtr - origFramePtr) + frameSize;

  ++fUseCount;

 

  rtpSeqNo = fRTPSeqNo;

  rtpTimestamp = fRTPTimestamp;

  presentationTime = fPresentationTime;

  hasBeenSyncedUsingRTCP = fHasBeenSyncedUsingRTCP;

  rtpMarkerBit = fRTPMarkerBit;

 

  // Update "fPresentationTime" for the next enclosed frame (if any):

  fPresentationTime.tv_usec += frameDurationInMicroseconds;

  if (fPresentationTime.tv_usec >= 1000000) {

    fPresentationTime.tv_sec += fPresentationTime.tv_usec/1000000;

    fPresentationTime.tv_usec = fPresentationTime.tv_usec%1000000;

  }

}

其中getNextEnclosedFrameParameters函数源码如下:

void BufferedPacket

::getNextEnclosedFrameParameters(unsigned char*& framePtr, unsigned dataSize,

                    unsigned& frameSize,

                    unsigned& frameDurationInMicroseconds) {

  // By default, use the entire buffered data, even though it may consist

  // of more than one frame, on the assumption that the client doesn't

  // care.  (This is more efficient than delivering a frame at a time)

 

  // For backwards-compatibility with existing uses of (the now deprecated)

  // "nextEnclosedFrameSize()", call that function to implement this one:

  frameSize = nextEnclosedFrameSize(framePtr, dataSize);

 

  frameDurationInMicroseconds = 0; // by default.  Subclasses should correct this.

}

nextEnclosedFrameSize函数源码如下:

 

unsigned H264BufferedPacket

::nextEnclosedFrameSize(unsigned char*& framePtr, unsigned dataSize) {

  unsigned resultNALUSize = 0; // if an error occurs

 

  switch (fOurSource.fCurPacketNALUnitType) {

  case 24: case 25: { // STAP-A or STAP-B

    // The first two bytes are NALU size:

    if (dataSize < 2) break;

    resultNALUSize = (framePtr[0]<<8)|framePtr[1];

    framePtr += 2;

    break;

  }

  case 26: { // MTAP16

    // The first two bytes are NALU size.  The next three are the DOND and TS offset:

    if (dataSize < 5) break;

    resultNALUSize = (framePtr[0]<<8)|framePtr[1];

    framePtr += 5;

    break;

  }

  case 27: { // MTAP24

    // The first two bytes are NALU size.  The next four are the DOND and TS offset:

    if (dataSize < 6) break;

    resultNALUSize = (framePtr[0]<<8)|framePtr[1];

    framePtr += 6;

    break;

  }

  default: {

    // Common case: We use the entire packet data:

    return dataSize;

  }

  }

 

  return (resultNALUSize <= dataSize) ? resultNALUSize : dataSize;

}

4)afterGetting

afterGetting表示一个Loop结束,调用的是FramedSource的afterGetting

void FramedSource::afterGetting(FramedSource* source) {

  source->fIsCurrentlyAwaitingData = False;

      // indicates that we can be read again

      // Note that this needs to be done here, in case the "fAfterFunc"

      // called below tries to read another frame (which it usually will)

 

// fAfterGettingFunc就是afterGettingFrame函数

  if (source->fAfterGettingFunc != NULL) {

    (*(source->fAfterGettingFunc))(source->fAfterGettingClientData,

                      source->fFrameSize, source->fNumTruncatedBytes,

                      source->fPresentationTime,

                      source->fDurationInMicroseconds);

  }

}

最终调用的是DummySink的afterGettingFrame(),表示将Source的数据传递到Sink,由Sink进行处理:

void DummySink::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,

                     struct timeval presentationTime, unsigned durationInMicroseconds) {

  DummySink* sink = (DummySink*)clientData;

  sink->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);

}

void DummySink::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,

                     struct timeval presentationTime, unsigned /*durationInMicroseconds*/) {

  // We've just received a frame of data.  (Optionally) print out information about it:

#ifdef DEBUG_PRINT_EACH_RECEIVED_FRAME

  if (fStreamId != NULL) envir() << "Stream "" << fStreamId << ""; ";

  envir() << fSubsession.mediumName() << "/" << fSubsession.codecName() << ": Received " << frameSize << " bytes";

  if (numTruncatedBytes > 0) envir() << " (with " << numTruncatedBytes << " bytes truncated)";

  char uSecsStr[6+1]; // used to output the 'microseconds' part of the presentation time

  sprintf(uSecsStr, "%06u", (unsigned)presentationTime.tv_usec);

  envir() << ". Presentation time: " << (int)presentationTime.tv_sec << "." << uSecsStr;

  if (fSubsession.rtpSource() != NULL && !fSubsession.rtpSource()->hasBeenSynchronizedUsingRTCP()) {

    envir() << "!"; // mark the debugging output to indicate that this presentation time is not RTCP-synchronized

  }

#ifdef DEBUG_PRINT_NPT

  envir() << " NPT: " << fSubsession.getNormalPlayTime(presentationTime);

#endif

  envir() << " ";

#endif

 

  // Then continue, to request the next frame of data:

 //再次调用continuePlaying()获取下一帧数据

  continuePlaying();

}

其中还有一个很重要的解析sps/pps函数,源码如下:

解析Sps/Pps函数:

SPropRecord* parseSPropParameterSets(char const* sPropParameterSetsStr,

                                     // result parameter:

                                     unsigned& numSPropRecords) {

  // Make a copy of the input string, so we can replace the commas with ''s:

  char* inStr = strDup(sPropParameterSetsStr);

  if (inStr == NULL) {

    numSPropRecords = 0;

    return NULL;

  }

 

  // Count the number of commas (and thus the number of parameter sets):

  numSPropRecords = 1;

  char* s;

  for (s = inStr; *s != ''; ++s) {

    if (*s == ',') {

      ++numSPropRecords;

      *s = '';

    }

  }

 

  // Allocate and fill in the result array:

  SPropRecord* resultArray = new SPropRecord[numSPropRecords];

  s = inStr;

  for (unsigned i = 0; i < numSPropRecords; ++i) {

    resultArray[i].sPropBytes = base64Decode(s, resultArray[i].sPropLength);

    s += strlen(s) + 1;

  }

 

  delete[] inStr;

  return resultArray;

}

 

from:http://blog.csdn.net/smilestone_322/article/details/18940005

原文地址:https://www.cnblogs.com/lidabo/p/4483524.html