SRS源码——UDP

srs_app_server.cpp 

int SrsServer::listen()
{
    int ret = ERROR_SUCCESS;
    
    if ((ret = listen_rtmp()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_api()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_http_stream()) != ERROR_SUCCESS) {
        return ret;
    }
    
    if ((ret = listen_stream_caster()) != ERROR_SUCCESS) {
        return ret;
    }
    
    return ret;
}

UDP的侦听是在 listen_stream_caster() 中

int SrsServer::listen_stream_caster()
{
    int ret = ERROR_SUCCESS;
    
#ifdef SRS_AUTO_STREAM_CASTER
    close_listeners(SrsListenerMpegTsOverUdp);
    
    std::vector<SrsConfDirective*>::iterator it;
    std::vector<SrsConfDirective*> stream_casters = _srs_config->get_stream_casters();

    for (it = stream_casters.begin(); it != stream_casters.end(); ++it) {
        SrsConfDirective* stream_caster = *it;
        if (!_srs_config->get_stream_caster_enabled(stream_caster)) {
            continue;
        }

        SrsListener* listener = NULL;

        std::string caster = _srs_config->get_stream_caster_engine(stream_caster);
        if (srs_stream_caster_is_udp(caster)) {
            listener = new SrsUdpCasterListener(this, SrsListenerMpegTsOverUdp, stream_caster);
        } else if (srs_stream_caster_is_rtsp(caster)) {
            listener = new SrsRtspListener(this, SrsListenerRtsp, stream_caster);
        } else if (srs_stream_caster_is_flv(caster)) {
            listener = new SrsHttpFlvListener(this, SrsListenerFlv, stream_caster);
        } else {
            ret = ERROR_STREAM_CASTER_ENGINE;
            srs_error("unsupported stream caster %s. ret=%d", caster.c_str(), ret);
            return ret;
        }
        srs_assert(listener != NULL);

        listeners.push_back(listener);
        
        int port = _srs_config->get_stream_caster_listen(stream_caster);
        if (port <= 0) {
            ret = ERROR_STREAM_CASTER_PORT;
            srs_error("invalid stream caster port %d. ret=%d", port, ret);
            return ret;
        }
        
        // TODO: support listen at <[ip:]port>
        if ((ret = listener->listen("0.0.0.0", port)) != ERROR_SUCCESS) {
            srs_error("StreamCaster listen at port %d failed. ret=%d", port, ret);
            return ret;
        }
    }
#endif
    
    return ret;
}

注意首先要在配置文档里有UDP的配置,可以参考 push.mpegts.over.udp.conf

new 了一个 SrsUdpCasterListener 类实例,并调用 listen()

SrsUdpCasterListener 继承自 SrsUdpStreamListener 没有override listen,

int SrsUdpStreamListener::listen(string i, int p)
{
    int ret = ERROR_SUCCESS;

    // the caller already ensure the type is ok,
    // we just assert here for unknown stream caster.
    srs_assert(type == SrsListenerMpegTsOverUdp);
    
    ip = i;
    port = p;

    srs_freep(listener);
    listener = new SrsUdpListener(caster, ip, port);

    if ((ret = listener->listen()) != ERROR_SUCCESS) {
        srs_error("udp caster listen failed. ret=%d", ret);
        return ret;
    }
    
    srs_info("listen thread current_cid=%d, "
        "listen at port=%d, type=%d, fd=%d started success, ep=%s:%d",
        _srs_context->get_id(), p, type, listener->fd(), i.c_str(), p);
    
    // notify the handler the fd changed.
    if ((ret = caster->on_stfd_change(listener->stfd())) != ERROR_SUCCESS) {
        srs_error("notify handler fd changed. ret=%d", ret);
        return ret;
    }

    srs_trace("%s listen at udp://%s:%d, fd=%d", srs_listener_type2string(type).c_str(), ip.c_str(), port, listener->fd());

    return ret;
}

又new了一个SrsUdpListener 调用 listen(),里面具体实现就是绑定socket

srs_app_listener.cpp

SrsUdpListener::cycle() 不停的接受数据,并响应:

int SrsUdpListener::cycle()
{
    int ret = ERROR_SUCCESS;

    // TODO: FIXME: support ipv6, @see man 7 ipv6
    sockaddr_in from;
    int nb_from = sizeof(sockaddr_in);
    int nread = 0;

    if ((nread = st_recvfrom(_stfd, buf, nb_buf, (sockaddr*)&from, &nb_from, ST_UTIME_NO_TIMEOUT)) <= 0) {
        srs_warn("ignore recv udp packet failed, nread=%d", nread);
        return ret;
    }
    
    if ((ret = handler->on_udp_packet(&from, buf, nread)) != ERROR_SUCCESS) {
        srs_warn("handle udp packet failed. ret=%d", ret);
        return ret;
    }

    if (SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS > 0) {
        st_usleep(SRS_UDP_PACKET_RECV_CYCLE_INTERVAL_MS * 1000);
    }

    return ret;
}

其中的hanler就是SrsUdpStreamListener::caster 也即 SrsMpegtsOverUdp 类的实例

srs_app_mpegts_udp.cpp

其中的 on_udp_packet() 响应接收数据事件,并保存到 buffer 变量中,最后调用 on_udp_bytes() 进行处理

int SrsMpegtsOverUdp::on_udp_packet(sockaddr_in* from, char* buf, int nb_buf)
{
    std::string peer_ip = inet_ntoa(from->sin_addr);
    int peer_port = ntohs(from->sin_port);

    // append to buffer.
    buffer->append(buf, nb_buf);

    srs_info("udp: got %s:%d packet %d/%d bytes",
        peer_ip.c_str(), peer_port, nb_buf, buffer->length());
        
    return on_udp_bytes(peer_ip, peer_port, buf, nb_buf);
}

on_udp_bytes 收到数据后进行处理:

int SrsMpegtsOverUdp::on_udp_bytes(string host, int port, char* buf, int nb_buf)
{
    // ...

    // use stream to parse ts packet.
    int nb_packet =  buffer->length() / SRS_TS_PACKET_SIZE;
    for (int i = 0; i < nb_packet; i++) {
        char* p = buffer->bytes() + (i * SRS_TS_PACKET_SIZE);
        if ((ret = stream->initialize(p, SRS_TS_PACKET_SIZE)) != ERROR_SUCCESS) {
            return ret;
        }

        // process each ts packet
        if ((ret = context->decode(stream, this)) != ERROR_SUCCESS) {
            srs_warn("mpegts: ignore parse ts packet failed. ret=%d", ret);
            continue;
        }
        srs_info("mpegts: parse ts packet completed");
    }
    srs_info("mpegts: parse udp packet completed");

    // erase consumed bytes
    if (nb_packet > 0) {
        buffer->erase(nb_packet * SRS_TS_PACKET_SIZE);
    }

    return ret;
}

注意参数里的buf没有用到,而是用的成员变量 buffer

首先,将数据流切成ts组(188字节,sync byte 0x47已经切除)

然后,调用 SrsStream 类来存储每个 ts 分组(其实就是封装一下)

最后,调用 SrsTsContext::decode 解码

int SrsTsContext::decode(SrsStream* stream, ISrsTsHandler* handler)
{
    int ret = ERROR_SUCCESS;

    // parse util EOF of stream.
    // for example, parse multiple times for the PES_packet_length(0) packet.
    while (!stream->empty()) {
        SrsTsPacket* packet = new SrsTsPacket(this);
        SrsAutoFree(SrsTsPacket, packet);

        SrsTsMessage* msg = NULL;
        if ((ret = packet->decode(stream, &msg)) != ERROR_SUCCESS) {
            srs_error("mpegts: decode ts packet failed. ret=%d", ret);
            return ret;
        }

        if (!msg) {
            continue;
        }
        SrsAutoFree(SrsTsMessage, msg);

        if ((ret = handler->on_ts_message(msg)) != ERROR_SUCCESS) {
            srs_error("mpegts: handler ts message failed. ret=%d", ret);
            return ret;
        }
    }

    return ret;
}

参数 handler 就是 SrsMpegtsOverUdp 本身

首先还是调用 SrsTsPacket::decode 进行解码

int SrsTsPacket::decode(SrsStream* stream, SrsTsMessage** ppmsg)
{
    int ret = ERROR_SUCCESS;

    int pos = stream->pos();

    // 4B ts packet header.
    if (!stream->require(4)) {
        ret = ERROR_STREAM_CASTER_TS_HEADER;
        srs_error("ts: demux header failed. ret=%d", ret);
        return ret;
    }

    sync_byte = stream->read_1bytes();
    if (sync_byte != 0x47) {
        ret = ERROR_STREAM_CASTER_TS_SYNC_BYTE;
        srs_error("ts: sync_bytes must be 0x47, actual=%#x. ret=%d", sync_byte, ret);
        return ret;
    }
    
    int16_t pidv = stream->read_2bytes();
    transport_error_indicator = (pidv >> 15) & 0x01;
    payload_unit_start_indicator = (pidv >> 14) & 0x01;
    transport_priority = (pidv >> 13) & 0x01;
    pid = (SrsTsPid)(pidv & 0x1FFF);

    int8_t ccv = stream->read_1bytes();
    transport_scrambling_control = (SrsTsScrambled)((ccv >> 6) & 0x03);
    adaption_field_control = (SrsTsAdaptationFieldType)((ccv >> 4) & 0x03);
    continuity_counter = ccv & 0x0F;

    // TODO: FIXME: create pids map when got new pid.
    
    srs_info("ts: header sync=%#x error=%d unit_start=%d priotiry=%d pid=%d scrambling=%d adaption=%d counter=%d",
        sync_byte, transport_error_indicator, payload_unit_start_indicator, transport_priority, pid,
        transport_scrambling_control, adaption_field_control, continuity_counter);

    // optional: adaptation field
    if (adaption_field_control == SrsTsAdaptationFieldTypeAdaptionOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) {
        srs_freep(adaptation_field);
        adaptation_field = new SrsTsAdaptationField(this);

        if ((ret = adaptation_field->decode(stream)) != ERROR_SUCCESS) {
            srs_error("ts: demux af faield. ret=%d", ret);
            return ret;
        }
        srs_verbose("ts: demux af ok.");
    }

    // calc the user defined data size for payload.
    int nb_payload = SRS_TS_PACKET_SIZE - (stream->pos() - pos);

    // optional: payload.
    if (adaption_field_control == SrsTsAdaptationFieldTypePayloadOnly || adaption_field_control == SrsTsAdaptationFieldTypeBoth) {
        if (pid == SrsTsPidPAT) {
            // 2.4.4.3 Program association Table
            srs_freep(payload);
            payload = new SrsTsPayloadPAT(this);
        } else {
            SrsTsChannel* channel = context->get(pid);
            if (channel && channel->apply == SrsTsPidApplyPMT) {
                // 2.4.4.8 Program Map Table
                srs_freep(payload);
                payload = new SrsTsPayloadPMT(this);
            } else if (channel && (channel->apply == SrsTsPidApplyVideo || channel->apply == SrsTsPidApplyAudio)) {
                // 2.4.3.6 PES packet
                srs_freep(payload);
                payload = new SrsTsPayloadPES(this);
            } else {
                // left bytes as reserved.
                stream->skip(nb_payload);
            }
        }

        if (payload && (ret = payload->decode(stream, ppmsg)) != ERROR_SUCCESS) {
            srs_error("ts: demux payload failed. ret=%d", ret);
            return ret;
        }
    }

    return ret;
}

先分析数据得到 Packet ID ,然后new出对应的解码类。

解码得到 SrsTsMessage,再回到 SrsMpegtsOverUdp::on_ts_message 进行处理

int SrsMpegtsOverUdp::on_ts_message(SrsTsMessage* msg)
{ 
    // ...

    // parse the stream.
    SrsStream avs;
    if ((ret = avs.initialize(msg->payload->bytes(), msg->payload->length())) != ERROR_SUCCESS) {
        srs_error("mpegts: initialize av stream failed. ret=%d", ret);
        return ret;
    }

    // publish audio or video.
    if (msg->channel->stream == SrsTsStreamVideoH264) {
        return on_ts_video(msg, &avs);
    }
    if (msg->channel->stream == SrsTsStreamAudioAAC) {
        return on_ts_audio(msg, &avs);
    }

    // TODO: FIXME: implements it.
    return ret;
}

其重点是把 ts message 中的 payload 数据封装在 SrsStream 中,

然后调用 on_ts_video 和 on_ts_audio 分别进行处理。

对于video来说已经解码的是H.264流,所以进行了frame提取,并重新封装成flv格式,再通过rtmp转发出去。

对于audio,是对AAC数据提取frame,再封装成ADTS进行传输。

原文地址:https://www.cnblogs.com/zoneofmine/p/10893497.html