#pragma once #include <iostream> #include <string> #include <thread> #include <mutex> //过于简单了 // readindex he writeindex class CGBlockRingBuffer { public: CGBlockRingBuffer() :buffer(nullptr) , bufferSize(0) , writeIndex(0) , readIndex(0) , bytesCanRead(0) { } void Init(int buffSize = 1024*1024) { printf("block init--%d-->", buffSize); buffer = new uint8_t[buffSize]; if (!buffer) return; this->bufferSize = buffSize; } void Write(uint8_t* src, int size) { printf("want write %d ,%d %d, bsize %d \n",size,readIndex,writeIndex, bufferSize); if (!src || size == 0) return; std::lock_guard<std::mutex> lk(lock); if (size >= bufferSize - bytesCanRead) { std::this_thread::sleep_for(std::chrono::microseconds(50)); printf("buffer now is full %d,%d,%d!", size, bufferSize, bytesCanRead); return; } int bytesCanWrite = size < bufferSize - bytesCanRead ? size : (bufferSize - bytesCanRead); //if(bytesCanWrite) 如果可写的小于--- 则丢弃-- //这个 if (bytesCanWrite <= bufferSize - writeIndex) { memcpy(buffer + writeIndex, src, bytesCanWrite); writeIndex += bytesCanWrite; if (writeIndex == bufferSize) { writeIndex = 0; } } else { int room = bufferSize - writeIndex; memcpy(buffer + writeIndex, src, room); int left = bytesCanWrite - room; memcpy(buffer, src + room, left); writeIndex = left; } bytesCanRead += bytesCanWrite; printf("real write %d \n", bytesCanWrite); } //先这样--to thi // 如果没有找到的话sleep一会儿---,to thi //1、先直接换ring 2、加这个优化 int getNotRead(uint8_t* dst, int size) { printf("getNotRead %d\n", size); //printf("dst %d\n", dst); if (size > bytesCanRead - 1024 * 50) { std::this_thread::sleep_for(std::chrono::microseconds(50)); printf("getNotRead Buffer isn't enough!---->"); return 0; } std::lock_guard<std::mutex> lk(lock); int bytesRead = size < bytesCanRead ? size : bytesCanRead; if (bytesRead <= bufferSize - readIndex) { memcpy(dst, buffer + readIndex, bytesRead); //readIndex += bytesRead; //if (readIndex == bufferSize) readIndex = 0; } else { int bytesHead = bufferSize - readIndex; memcpy(dst, buffer + readIndex, bytesHead); int bytesTail = bytesRead - bytesHead; memcpy(dst + bytesHead, buffer, bytesTail); //readIndex = bytesTail; } //bytesCanRead -= bytesRead; return bytesRead; } int Read(uint8_t* dst, int size) { if (5120 == size) { int i = 0; i++; } // std::this_thread::sleep_for(std::chrono::microseconds(30)); //read小于则等待 //就看这两个的-- printf("want Read %d ,%d %d \n", size, readIndex, writeIndex); if (!dst || size == 0) return 0; std::lock_guard<std::mutex> lk(lock); //必须50k缓冲 if (size > bytesCanRead-1024*10) { std::this_thread::sleep_for(std::chrono::microseconds(50)); printf("Buffer is empty!---->"); return 0; } int bytesRead = size < bytesCanRead ? size : bytesCanRead; if (bytesRead <= bufferSize - readIndex) { memcpy(dst, buffer + readIndex, bytesRead); readIndex += bytesRead; if (readIndex == bufferSize) readIndex = 0; } else { int bytesHead = bufferSize - readIndex; memcpy(dst, buffer + readIndex, bytesHead); int bytesTail = bytesRead - bytesHead; memcpy(dst + bytesHead, buffer, bytesTail); readIndex = bytesTail; } bytesCanRead -= bytesRead; printf("real Read %d\n", bytesRead); return bytesRead; } private: uint8_t* buffer; int bufferSize; int readIndex; int writeIndex; int bytesCanRead; std::mutex lock; };
#ifndef _CRT_SECURE_NO_WARNINGS #define _CRT_SECURE_NO_WARNINGS #endif /* #include "xop/HttpFlvServer.h" #include "xop/HttpFlvServer.h" */ #include "xop/RtmpServer.h" #include "xop/RtmpPublisher.h" #include "xop/RtmpClient.h" #include "xop/H264Parser.h" #include "net/EventLoop.h" #include "CGBlockRingBuffer.h" #define TEST_RTMP_PUSHER 1 //#define TEST_RTMP_CLIENT 0 //#define TEST_MULTI_THREAD 0 #define RTMP_URL "rtmp://127.0.0.1:1936/live/02" //#define RTMP_URL "rtmp://192.168.5.100:1935/live/01" #define PUSH_FILE "./test.h264" //#define PUSH_FILE "./mcodec1214.h264" //#define PUSH_FILE "./oneFrame.h264" //#define HTTP_URL "http://127.0.0.1:8080/live/01.flv" int TestRtmpPublisher(xop::EventLoop *event_loop); void TestMyRtmpPublisher(); xop::EventLoop* g_event_loop = NULL; CGBlockRingBuffer* pRingBuffer; #define INIT_BUFFER_SIZE 1*1024*1024 void TestMyWriteData(); int main(int argc, char **argv) { //----- pRingBuffer = new CGBlockRingBuffer(); pRingBuffer->Init(INIT_BUFFER_SIZE); //----- int count = 1; /* #if TEST_MULTI_THREAD count = std::thread::hardware_concurrency(); #endif */ xop::EventLoop event_loop(count); /* rtmp server example */ auto rtmp_server = xop::RtmpServer::Create(&event_loop); rtmp_server->SetChunkSize(60000); //rtmp_server->SetGopCache(); /* enable gop cache */ rtmp_server->SetEventCallback([](std::string type, std::string stream_path) { printf("[Event] %s, stream path: %s\n\n", type.c_str(), stream_path.c_str()); }); if (!rtmp_server->Start("0.0.0.0", 1936)) { printf("RTMP Server listen on 1935 failed.\n"); } g_event_loop = &event_loop; #if TEST_RTMP_PUSHER // rtmp pusher example std::thread t([&event_loop] () { //TestRtmpPublisher(&event_loop); TestMyRtmpPublisher(); }); t.detach(); #endif std::thread t1([&event_loop]() { //TestRtmpPublisher(&event_loop); TestMyWriteData(); }); t1.detach(); /* #if TEST_RTMP_CLIENT auto rtmp_client = xop::RtmpClient::Create(&event_loop); rtmp_client->SetFrameCB([](uint8_t* payload, uint32_t length, uint8_t codecId, uint32_t timestamp) { printf("recv frame, type:%u, size:%u,\n", codecId, length); }); std::string status; if (rtmp_client->OpenUrl(RTMP_URL, 3000, status) != 0) { printf("Open url %s failed, status: %s\n", RTMP_URL, status.c_str()); } #endif */ while (1) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); } rtmp_server->Stop(); //http_flv_server.Stop(); return 0; } class H264File { public: H264File(int bufSize = 5000000); ~H264File(); bool open(const char *path); void Close(); bool isOpened() const { return (m_file != NULL); } int readFrame(char *inBuf, int inBufSize, bool *bEndOfFrame); int readFrameFromGbuf(char* inBuf, int inBufSize, bool* bEndOfFrame); int readfileToGBbuf(); private: FILE *m_file = NULL; char *m_buf = NULL; int m_bufSize = 0; int m_bytesUsed = 0; int m_count = 0; }; H264File::H264File(int bufSize) : m_bufSize(bufSize) { m_buf = new char[m_bufSize]; } H264File::~H264File() { delete m_buf; } bool H264File::open(const char *path) { m_file = fopen(path, "rb"); if (m_file == NULL) { return false; } return true; } void H264File::Close() { if (m_file) { fclose(m_file); m_file = NULL; m_count = 0; m_bytesUsed = 0; } } /* int pushOneFrame(char* inBuf, int inBufSize) { pRingBuffer->Write((unsigned char*)inBuf, inBufSize); return 1; } */ #define GBUFF_MAX_SIZE 10*1024 * 1024 char gbuf[GBUFF_MAX_SIZE] = { 0 }; int gbufDatasize = 0; int gBytesRead = 0; // ---- 模拟流 void TestMyWriteData() { while (1) { if (gbufDatasize <= 0) continue; int tmpOneStepRead = (5*1024); if (gBytesRead >= gbufDatasize) gBytesRead = 0; if (gbufDatasize - gBytesRead < (5 * 1024)) tmpOneStepRead = gbufDatasize - gBytesRead; pRingBuffer->Write((unsigned char*)(gbuf + gBytesRead), tmpOneStepRead); gBytesRead += tmpOneStepRead; //printf("write %d", tmpOneStepRead); std::this_thread::sleep_for(std::chrono::milliseconds(30)); } } int H264File::readfileToGBbuf() { fseek(m_file, 0, SEEK_SET); //fseek(m_file, 31, SEEK_SET); int bytesRead = (int)fread(gbuf, 1, GBUFF_MAX_SIZE, m_file); gbufDatasize = bytesRead; fclose(m_file); return 0; } //--- int H264File::readFrameFromGbuf(char* inBuf, int inBufSize, bool* bEndOfFrame) { int tmpOneStepRead = pRingBuffer->getNotRead((UINT8*)m_buf, inBufSize); int bytesRead = tmpOneStepRead; bool bFindStart = false, bFindEnd = false; int i = 0, startCode = 3; *bEndOfFrame = false; for (i = 0; i < bytesRead - 5; i++) { if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 1) { startCode = 3; } else if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 0 && m_buf[i + 3] == 1) { startCode = 4; } else { continue; } if (((m_buf[i + startCode] & 0x1F) == 0x5 || (m_buf[i + startCode] & 0x1F) == 0x1) && ((m_buf[i + startCode + 1] & 0x80) == 0x80)) { bFindStart = true; i += 4; break; } } for (; i < bytesRead - 5; i++) { if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 1) { startCode = 3; } else if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 0 && m_buf[i + 3] == 1) { startCode = 4; } else { continue; } if (((m_buf[i + startCode] & 0x1F) == 0x7) || ((m_buf[i + startCode] & 0x1F) == 0x8) || ((m_buf[i + startCode] & 0x1F) == 0x6) || (((m_buf[i + startCode] & 0x1F) == 0x5 || (m_buf[i + startCode] & 0x1F) == 0x1) && ((m_buf[i + startCode + 1] & 0x80) == 0x80))) { bFindEnd = true; break; } } bool flag = false; if (bFindStart && !bFindEnd && m_count > 0) { flag = bFindEnd = true; i = bytesRead; *bEndOfFrame = true; printf("! find\n"); } if (!bFindStart || !bFindEnd) { printf("!not find\n"); this->Close(); return -1; } int size = (i <= inBufSize ? i : inBufSize); int ret = pRingBuffer->Read((unsigned char*)inBuf, size); if (ret != size) { printf("ret %d != size %d\n", ret, size); } if (!flag) { m_count += 1; m_bytesUsed += i; } else { m_count = 0; m_bytesUsed = 0; //????? } //fseek(m_file, m_bytesUsed, SEEK_SET); //gBytesRead = m_bytesUsed; //--- ��� 20211222 18:01 return size; } int H264File::readFrame(char *inBuf, int inBufSize, bool *bEndOfFrame) { if (m_file == NULL) { return -1; } //if(0 == gi) fseek(m_file, 31, SEEK_SET); int bytesRead = (int)fread(m_buf, 1, m_bufSize, m_file); if (bytesRead == 0) { //-- printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); //return -2; //-- fseek(m_file, 0, SEEK_SET); //fseek(m_file, 31, SEEK_SET); m_count = 0; m_bytesUsed = 0; bytesRead = (int)fread(m_buf, 1, m_bufSize, m_file); if (bytesRead == 0) { this->Close(); return -1; } } bool bFindStart = false, bFindEnd = false; int i = 0, startCode = 3; *bEndOfFrame = false; for (i = 0; i < bytesRead - 5; i++) { if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 1) { startCode = 3; } else if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 0 && m_buf[i + 3] == 1) { startCode = 4; } else { continue; } if (((m_buf[i + startCode] & 0x1F) == 0x5 || (m_buf[i + startCode] & 0x1F) == 0x1) && ((m_buf[i + startCode + 1] & 0x80) == 0x80)) { bFindStart = true; i += 4; break; } } for (; i < bytesRead - 5; i++) { if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 1) { startCode = 3; } else if (m_buf[i] == 0 && m_buf[i + 1] == 0 && m_buf[i + 2] == 0 && m_buf[i + 3] == 1) { startCode = 4; } else { continue; } if (((m_buf[i + startCode] & 0x1F) == 0x7) || ((m_buf[i + startCode] & 0x1F) == 0x8) || ((m_buf[i + startCode] & 0x1F) == 0x6) || (((m_buf[i + startCode] & 0x1F) == 0x5 || (m_buf[i + startCode] & 0x1F) == 0x1) && ((m_buf[i + startCode + 1] & 0x80) == 0x80))) { bFindEnd = true; break; } } bool flag = false; if (bFindStart && !bFindEnd && m_count > 0) { flag = bFindEnd = true; i = bytesRead; *bEndOfFrame = true; } if (!bFindStart || !bFindEnd) { this->Close(); return -1; } int size = (i <= inBufSize ? i : inBufSize); memcpy(inBuf, m_buf, size); if (!flag) { m_count += 1; m_bytesUsed += i; } else { m_count = 0; m_bytesUsed = 0; } fseek(m_file, m_bytesUsed, SEEK_SET); return size; } // std::shared_ptr<xop::RtmpPublisher> publisher; xop::MediaInfo media_info; bool has_sps_pps = false; int InitPub() { if (NULL == g_event_loop) return -1; printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); H264File h264_file; if (!h264_file.open(PUSH_FILE)) { printf("Open %s failed.\n", PUSH_FILE); return -1; } /* push stream to local rtmp server */ xop::MediaInfo media_info; //auto publisher publisher = xop::RtmpPublisher::Create(g_event_loop); publisher->SetChunkSize(60000); std::string status; if (publisher->OpenUrl(RTMP_URL, 3000, status) < 0) { printf("Open url %s failed, status: %s\n", RTMP_URL, status.c_str()); return -1; } return 0; } void procFrame(char* inBuf, int inBufSize, bool* bEndOfFrame) { //int buf_size = 500000; //uint8_t* frame_buf = new uint8_t[buf_size]; uint8_t* frame_buf = (uint8_t*)inBuf; bool end_of_frame = *bEndOfFrame; if (!publisher->IsConnected()) { printf("return\n"); return; } //printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); int frameSize = inBufSize;//h264_file.readFrame((char*)frame_buf, buf_size, &end_of_frame); if (frameSize > 0) { if (!has_sps_pps) { if (frame_buf[3] == 0x67 || frame_buf[4] == 0x67) { xop::Nal sps = xop::H264Parser::findNal(frame_buf, frameSize); if (sps.first != nullptr && sps.second != nullptr && *sps.first == 0x67) { media_info.sps_size = (uint32_t)(sps.second - sps.first + 1); media_info.sps.reset(new uint8_t[media_info.sps_size], std::default_delete<uint8_t[]>()); memcpy(media_info.sps.get(), sps.first, media_info.sps_size); xop::Nal pps = xop::H264Parser::findNal(sps.second, frameSize - (int)(sps.second - frame_buf)); if (pps.first != nullptr && pps.second != nullptr && *pps.first == 0x68) { media_info.pps_size = (uint32_t)(pps.second - pps.first + 1); media_info.pps.reset(new uint8_t[media_info.pps_size], std::default_delete<uint8_t[]>()); memcpy(media_info.pps.get(), pps.first, media_info.pps_size); has_sps_pps = true; publisher->SetMediaInfo(media_info); /* set sps pps */ //printf("Start rtmp pusher, rtmp url: %s , http-flv url: %s \n\n", RTMP_URL, HTTP_URL); printf("Start rtmp pusher, rtmp url: %s \n\n", RTMP_URL); } } } } if (has_sps_pps) { publisher->PushVideoFrame(frame_buf, frameSize); /* send h.264 frame */ } } else { printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); printf("get end!"); //break; } std::this_thread::sleep_for(std::chrono::milliseconds(40)); } void TestMyRtmpPublisher() { int ret = InitPub(); if (ret < 0) return; int buf_size = 100000; bool end_of_frame = false; bool has_sps_pps = false; uint8_t* frame_buf = new uint8_t[buf_size]; H264File h264_file; if (!h264_file.open(PUSH_FILE)) { printf("Open %s failed.\n", PUSH_FILE); return ; } h264_file.readfileToGBbuf(); while (1) { //printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); int frameSize = h264_file.readFrameFromGbuf((char*)frame_buf, buf_size, &end_of_frame); //int frameSize = h264_file.readOneFrameFile((char*)frame_buf, buf_size, &end_of_frame); procFrame((char*)frame_buf, frameSize, &end_of_frame); } } int TestRtmpPublisher(xop::EventLoop *event_loop) { printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); H264File h264_file; if (!h264_file.open(PUSH_FILE)) { printf("Open %s failed.\n", PUSH_FILE); return -1; } /* push stream to local rtmp server */ xop::MediaInfo media_info; auto publisher = xop::RtmpPublisher::Create(event_loop); publisher->SetChunkSize(60000); std::string status; if (publisher->OpenUrl(RTMP_URL, 3000, status) < 0) { printf("Open url %s failed, status: %s\n", RTMP_URL, status.c_str()); return -1; } int buf_size = 500000; bool end_of_frame = false; bool has_sps_pps = false; uint8_t *frame_buf = new uint8_t[buf_size]; while (publisher->IsConnected()) { printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); int frameSize = h264_file.readFrame((char*)frame_buf, buf_size, &end_of_frame); if (frameSize > 0) { if (!has_sps_pps) { if (frame_buf[3] == 0x67 || frame_buf[4] == 0x67) { xop::Nal sps = xop::H264Parser::findNal(frame_buf, frameSize); if (sps.first != nullptr && sps.second != nullptr && *sps.first == 0x67) { media_info.sps_size = (uint32_t)(sps.second - sps.first + 1); media_info.sps.reset(new uint8_t[media_info.sps_size], std::default_delete<uint8_t[]>()); memcpy(media_info.sps.get(), sps.first, media_info.sps_size); xop::Nal pps = xop::H264Parser::findNal(sps.second, frameSize - (int)(sps.second - frame_buf)); if (pps.first != nullptr && pps.second != nullptr && *pps.first == 0x68) { media_info.pps_size = (uint32_t)(pps.second - pps.first + 1); media_info.pps.reset(new uint8_t[media_info.pps_size], std::default_delete<uint8_t[]>()); memcpy(media_info.pps.get(), pps.first, media_info.pps_size); has_sps_pps = true; publisher->SetMediaInfo(media_info); /* set sps pps */ //printf("Start rtmp pusher, rtmp url: %s , http-flv url: %s \n\n", RTMP_URL, HTTP_URL); printf("Start rtmp pusher, rtmp url: %s \n\n", RTMP_URL); } } } } if (has_sps_pps) { publisher->PushVideoFrame(frame_buf, frameSize); /* send h.264 frame */ } } else { printf("[%s %s] %s: %s: %d\n", __DATE__, __TIME__, __FILE__, __func__, __LINE__); printf("push end!"); //break; } std::this_thread::sleep_for(std::chrono::milliseconds(40)); } delete frame_buf; return 0; } /* char gbuf[1024 * 1024] = { 0 }; int gbufsize = 0; BOOLEAN bAlereadyRead = FALSE; int H264File::readOneFrameFile(char* inBuf, int inBufSize, bool* bEndOfFrame) { fseek(m_file, 0, SEEK_SET); int size = 0; if (m_file == NULL) { return -1; } //if(0 == gi) fseek(m_file, 31, SEEK_SET); if (FALSE == bAlereadyRead) { int bytesRead = (int)fread(m_buf, 1, m_bufSize, m_file); gbufsize = bytesRead; memcpy(gbuf, m_buf, gbufsize); memcpy(inBuf, gbuf, gbufsize); inBufSize = gbufsize; *bEndOfFrame = FALSE; bAlereadyRead = TRUE; printf("bAlereadyRead = true ok \n"); return gbufsize; } else { memcpy(inBuf, gbuf, gbufsize); inBufSize = gbufsize; *bEndOfFrame = FALSE; return gbufsize; } return 0; } */