licode学习之erizo篇--Pipeline_handle

erizo的pipeline的handle,是媒体数据处理的基本操作,handle分为3类:IN,OUT,BOTH

IN:数据进入handle,handle需要read数据并传递给下一级

OUT:数据进入handle,handle需要write数据并传递给下一级

BOTH:可以同时进行read和write

在宏观语义上,IN的目标是输出rtp裸数据;OUT的目标是封装rtp裸数据

pipeline的handle的继承体系如下:

handle有三种:InboundHandler、Handler、OutboundHandler;他们分别对应一个Context

InboundHandler==>InboundContextImpl

Handler==>ContextImpl

OutboundHandler==>OutboundContextImpl

他们的工作方式差不多,就挑一个InboundHandler来进行说明。pipeline有addFront方法,该方法是注册Handler的,先看看这个方法的相关实现:

template <class H>
PipelineBase& PipelineBase::addFront(std::shared_ptr<H> handler) {
  typedef typename ContextType<H>::type Context;
  return addHelper(
      std::make_shared<Context>(shared_from_this(), std::move(handler)),
      true);
}
template <class Context>
PipelineBase& PipelineBase::addHelper(
    std::shared_ptr<Context>&& ctx,  // NOLINT
    bool front) {
  ctxs_.insert(front ? ctxs_.begin() : ctxs_.end(), ctx);
  if (Context::dir == HandlerDir_BOTH || Context::dir == HandlerDir_IN) {
    inCtxs_.insert(front ? inCtxs_.begin() : inCtxs_.end(), ctx.get());
  }
  if (Context::dir == HandlerDir_BOTH || Context::dir == HandlerDir_OUT) {
    outCtxs_.insert(front ? outCtxs_.begin() : outCtxs_.end(), ctx.get());
  }
  return *this;
}

addFront的功能,就是创建一个Context的对象,并将之传递给addHelper为参数。Context的构造还被传递了Handler自己的指针,让Context能够知道handler

addHelper将Context存储起来,并且判断类型,分别放到inCtxs_和outCtxs_里面,待用

Context的实际类型是啥呢?

template <class Handler>
struct ContextType {
  typedef typename std::conditional<
    Handler::dir == HandlerDir_BOTH,
    ContextImpl<Handler>,
    typename std::conditional<
      Handler::dir == HandlerDir_IN,
      InboundContextImpl<Handler>,
      OutboundContextImpl<Handler>
    >::type>::type
  type;
};

Context的实际类型,要依托于Handler::dir成员的值,这个值是一个常量,是每个Handler都有的。

HandlerDir_IN:Context为InboundContextImpl

HandlerDir_BOTH:Context为ContextImpl

HandlerDir_其他:Context为OutboundContextImpl

所以,pipeline的addFront方法,实际上是创建HandlerContext的实例,并且将之存储的过程。

pipeline的数据链路建立,是怎么样的呢

void Pipeline::finalize() {
  front_ = nullptr;
  if (!inCtxs_.empty()) {
    front_ = dynamic_cast<InboundLink*>(inCtxs_.front());
    for (size_t i = 0; i < inCtxs_.size() - 1; i++) {
      inCtxs_[i]->setNextIn(inCtxs_[i+1]);
    }
    inCtxs_.back()->setNextIn(nullptr);
  }

  back_ = nullptr;
  if (!outCtxs_.empty()) {
    back_ = dynamic_cast<OutboundLink*>(outCtxs_.back());
    for (size_t i = outCtxs_.size() - 1; i > 0; i--) {
      outCtxs_[i]->setNextOut(outCtxs_[i-1]);
    }
    outCtxs_.front()->setNextOut(nullptr);
  }

  if (!front_) {
    // detail::logWarningIfNotUnit<R>(
    //     "No inbound handler in Pipeline, inbound operations will throw "
    //     "std::invalid_argument");
  }
  if (!back_) {
    // detail::logWarningIfNotUnit<W>(
    //     "No outbound handler in Pipeline, outbound operations will throw "
    //     "std::invalid_argument");
  }

  for (auto it = ctxs_.rbegin(); it != ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }

  for (auto it = service_ctxs_.rbegin(); it != service_ctxs_.rend(); it++) {
    (*it)->attachPipeline();
  }

  notifyUpdate();
}

pipeline里面的finalize方法为In和Out的Context设置任务链,并且设置头结点(front_,back_)之后,每个HandlerContext就知道自己的下一级任务是什么了

还需要了解一下触发方式:

MediaStream的OnTransportData获得packet数据

void MediaStream::onTransportData(std::shared_ptr<DataPacket> incoming_packet, Transport *transport) {
  if ((audio_sink_ == nullptr && video_sink_ == nullptr && fb_sink_ == nullptr)) {
    return;
  }

  std::shared_ptr<DataPacket> packet = std::make_shared<DataPacket>(*incoming_packet);

  if (transport->mediaType == AUDIO_TYPE) {
    packet->type = AUDIO_PACKET;
  } else if (transport->mediaType == VIDEO_TYPE) {
    packet->type = VIDEO_PACKET;
  }
  auto stream_ptr = shared_from_this();

  worker_->task([stream_ptr, packet]{
    if (!stream_ptr->pipeline_initialized_) {
      ELOG_DEBUG("%s message: Pipeline not initialized yet.", stream_ptr->toLog());
      return;
    }

    char* buf = packet->data;
    RtpHeader *head = reinterpret_cast<RtpHeader*> (buf);
    RtcpHeader *chead = reinterpret_cast<RtcpHeader*> (buf);
    if (!chead->isRtcp()) {
      uint32_t recvSSRC = head->getSSRC();
      if (stream_ptr->isVideoSourceSSRC(recvSSRC)) {
        packet->type = VIDEO_PACKET;
      } else if (stream_ptr->isAudioSourceSSRC(recvSSRC)) {
        packet->type = AUDIO_PACKET;
      }
    }

    if (stream_ptr->pipeline_) {
      stream_ptr->pipeline_->read(std::move(packet));
    }
  });
}

在里面调用了pipeline的read方法,还需要知道read里面做什么了

void Pipeline::read(std::shared_ptr<DataPacket> packet) {
  if (!front_) {
    return;
  }
  front_->read(std::move(packet));
}

调用了front的read,front是一个HandlerContext对象,并且是处理链的头结点

  void read(std::shared_ptr<DataPacket> packet) override {
    auto guard = this->pipelineWeak_.lock();
    this->handler_->read(this, std::move(packet));
  }

在HandlerContext里面调用了handler_的read方法,并且把自己作为参数也同时传递给了handler。

之后找一个真正的handler对象,看一下它的read实现:

void LayerDetectorHandler::read(Context *ctx, std::shared_ptr<DataPacket> packet) {
  RtcpHeader *chead = reinterpret_cast<RtcpHeader*>(packet->data);
  if (!chead->isRtcp() && enabled_ && packet->type == VIDEO_PACKET) {
    if (packet->codec == "VP8") {
      parseLayerInfoFromVP8(packet);
    } else if (packet->codec == "VP9") {
      parseLayerInfoFromVP9(packet);
    } else if (packet->codec == "H264") {
      parseLayerInfoFromH264(packet);
    }
  }
  ctx->fireRead(std::move(packet));
}

在handler的read里面,再次调用了ctx的fireRead方法,并且把packet进行传递

  void fireRead(std::shared_ptr<DataPacket> packet) override {
    auto guard = this->pipelineWeak_.lock();
    if (this->nextIn_) {
      this->nextIn_->read(std::move(packet));
    }
  }

在fireRead中,调用了nextIn_的read方法,nextIn_是下一个HandlerContext。

这样就形成了一个任务链。

ContextRead-->HanderRead-->Context fireRead-->Context next read-->ContextRead.........

形成一个read的任务链。

write的基本原理也是相似的。

总结:erizo的pipeline的handler是负责实际数据处理的,通过处理链路,将之串联起来

原文地址:https://www.cnblogs.com/limedia/p/licode_erizo_pipeline_handler.html