园 首页 新随笔 联系 管理 订阅 订阅 RTSP协议转换RTMP直播协议

  RTSP协议也是广泛使用的直播/点播流媒体协议,最近实现了一个RTSP协议转换RTMP直播协议的程序,为的是可以接收远端设备或服务器的多路RTSP直播数据,实时转换为RTMP直播协议,推送到FMS、Red5、wowza server等RTMP服务器,以实现flash观看RTSP直播源的需求。程序同时也具备从FLV文件获取输入数据并转换RTMP直播。实现的思路分享如下。

要点分析

  首先,程序的主要目的,是从多路RTSP输入源中提取AAC编码的音频和H.264编码视频数据,并生成RTMP数据包,然后组装RTMP推送协议,并发往RTMP服务器。在发送的过程中,要求可以从RTSP数据源切换到具有相同h.264和aac编码的FLV文件中,并不影响RTMP直播。因此,本程序的关键点有以下部分:

  1. RTSP直播流的读取
  2. H.264和AAC编码数据的分析、处理
  3. FLV文件数据的提取及与RTSP直接的切换和衔接
  4. RTMP数据包封装
  5. RTMP推送协议

  有了关键点,就可以一项一项的去分析。

 

设计思路

  根据上面分析的要点,首先要选择RTSP直播协议的读取。我们不需要从零做起,网络上有很多和RTSP相关的开源项目可以使用或借鉴,我选择了Live555。

  Live555是一个跨平台的流媒体解决方案,主要支持RTSP协议,好像也支持SIP(这个也是我马上研究的重点,之后会写文章研究SIP相关的技术实现)。Live555实现了RTSP包括服务器-客户端的整套结构,是很知名的一个开源项目。网上有很多关于Live555学习和使用的文章,我就不具体介绍了。

  H.264和AAC数据的分析处理,这个对于从没做过相关项目开发的人来说,应该是一个难点,主要是相关概念的理解。好在我一直在做这块,也比较好弄。

  第4和第5点,可以参照我之前的文章“RTMP协议发送H.264编码及AAC编码的音视频,实现摄像头直播”的技术方法,来加以实现。因此,主要需要处理的就是RTSP直播流数据的获取,以及对其中H.264和AAC编码数据的处理。

  于是可以画出大体结构如下:

  

逻辑与实现

1. 程序框架和模块说明

  

2. 主要接口

  

  RtspCapture是我的程序里管理RTSP直播数据流和分析处理的类,接口很重要,基本上这个设计就可以。(当然,全部代码是不会放上来的,有兴趣的可以和我谈,这里只罗列关键的地方。)

  

  他所使用的live555变量,这里ourRTSPClient实际就是RTSPClient的简单继承

  

3.RtspCapture调用live555的主要流程

  1. 首先,在RtspCapture构造函数初始化

  

  2. 在StartRtsp函数中,创建RtspClient,并发送"describe"命令,开始获取sdp。回调函数就是continueAfterDESCRIBE。在这里创建MediaSession,之后再发送"setup"命令,等,这些都可以在live555的例子以及网上的说明中看到。

  

  3. 还有一个关键点,就是要在自己的线程循环中,调用live555 environment的事件循环,就像这样

  

4. 对rtsp回调h264数据的分析处理

  这里演示了,如何从rtsp回调的h264数据中,提取sps和pps信息。里面的parse函数,是live555自带的。

  

调用方式

   

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

  haibindev.cnblogs.com,合作请联系QQ。(转载请注明作者和出处~)

+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

 
 

实现自己的Linq to Sql

之前写过一篇《统一的仓储接口》,为了方便使用不同的仓储。在我们的项目中使用的是EF4.0,但是这个版本的EF有一些性能问题没有解决,又不想升级到EF6,具体EF6有没有解决暂时不清楚。我们的项目之前运行的都不错,突然一天数据库服务器CPU 100%,IIS服务器CPU又正常,过几个小时之后又恢复正常,每个星期一早上都这样,可以肯定就是用户同时操作并发过多造成,查找之后,是一个表的数据被锁住。报错was deadlocked on lock,解决办法就是查询sql上加上 with nolock,但是EF不支持,但有不想放弃linq to sql的优势,无奈只能自己实现,方案是之前已有功能并发不多的地方保持不变,依然使用EF,在并发多的地方使用自己实现的linq to sql。

目前并没有完整实现linq to sql,只是实现了单表的情况,对于有关联引用由于时间限制并未实现。

实现仓储,需要实现两个对象,[IObjectContext、IOrderedQueryable]

上篇说到的上下文对象IObjectContext

1
2
3
4
5
6
public interface IObjectContext:IDisposable
    {
        string Name { get; set; }
        IDbConnection CreateConnection();
        bool SaveChanges();
    }

考虑到项目中有些特殊情况需要使用sql,所以增加了CreateConnection方法返回一个IDbConnection,当然在逼不得已的情况下使用。

DbContext的实现比较简单,重点管理一下连接池,配置,大概如下,具体的实现由于篇幅省略,源码会在篇尾附上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class DbContext : IDisposable, IObjectContext
    {
        public EntityStateManager Manager { get; private set; }
        public string DbType { get; private set; }
        public string ConnectionString { get; set; }
        public int PoolSize { get; set; }
        public bool AllowUpdateWithNoExp { get; set; }
        public bool AllowDeleteWithNoExp { get; set; }
 
        List<System.Data.IDbConnection> connPool = new List<System.Data.IDbConnection>();
 
        public DbContext();
 
        public DbContext(string connectionString, string dbType);
 
        public DbContext(string connectionString, string dbType, bool allowUpdateWithNoExp, bool allowDeleteWithNoExp);
 
        void InitContext(string connectionString, string dbType, int poolSize, bool allowUpdateWithNoExp, bool allowDeleteWithNoExp);
 
        public EntitySet<T> GetEntitySet<T>(string tableName = null, bool noLock = false, bool noTracking = false);
 
        public int SubmitChange();
 
        void IDisposable.Dispose();
 
        string IObjectContext.Name { get; set; }
 
        bool IObjectContext.SaveChanges();
 
        public System.Data.IDbConnection CreateConnection();
 
        System.Data.IDbConnection IObjectContext.CreateConnection();

另一个对象则是仓储EntitySet<T>,大概代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public class EntitySet<T> : IOrderedQueryable<T>
    {
        public DbContext Context { get; private set; }
        string TableName { get; set; }
        bool NoLock { get; set; }
        bool NoTracking { get; set; }
 
        DbQueryProvider MyProvider { get; set; }
        Expression MyExpression { get; set; }
 
        public EntitySet(string tableName, bool noLock, bool noTracking, DbContext context);
 
        public EntitySet(string tableName, bool noLock, bool noTracking, Expression expression, DbContext context);
 
        void InitEntitySet(string tableName, bool noLock, bool noTracking, Expression expression, DbContext context);
 
        public Expression Expression
        {
            get { return this.MyExpression; }
        }
 
        public Type ElementType
        {
            get { return typeof(T); }
        }
 
        public IQueryProvider Provider
        {
            get { return this.MyProvider; }
        }
 
        public IEnumerator<T> GetEnumerator();
 
        IEnumerator IEnumerable.GetEnumerator();
 
        void MyProvider_OnExecuted(object sender, EventArgs e);
 
        void TrackEntity(object result);
 
        void notifyT_PropertyChanged(object sender, PropertyChangedEventArgs e);
 
        public void Add(T item, bool INSERT_IDENTITY = false);
 
        public void Update(Expression<Func<T, bool>> exp, object obj);
 
        public void Remove(T item);
 
        public void Remove(Expression<Func<T, bool>> exp);

在仓储里面我增加了我需要的东西,比如Add可以插入标识,Update可以根据表达式更新对象,而不需要把所需要的对象先取出来,修改再保存,数据量大的时候EF性能有问题,Remove也同样如此,为了防止误操作,所以在之前DbContext中增加了配置,是否允许无条件删除、更新数据。另外一个重点也就是增加nolock支持,当然这个在生成sql的时候加上就行,非常简单。

说到这里其实只不过是个大概,这里面的操作无非就是四种CRUD。我设计了四个Command来解决InsertCommand、UpdateCommand、SelectCommand、DeleteCommand,他们都继承EntityCommand只要实现一个方法public abstract int Execute(IDbConnection conn,IDbTransaction tran);

到这里其实InsertCommand、UpdateCommand、DeleteCommand实现都非常简单,因为有了实现SelectCommand的基础代码,解析Expression就简单的多了,可以说解析Expression才是整个的关键。

解析的代码没有那么多复杂的东西,我个人的原则就是尽量简单,不为了追求设计而增加多余的东西,虽然我对设计模式也很痴迷。

上图就是解析的全部代码,这其中代码其实并不重要,重要的是解决的思路,这里面有两个重要的对象QueryExpressionClosure(查询表达式闭包)、QueryCommnClosure(查询通用闭包)。

我们知道IQueryable其实就是expression tree,我记得以前很早的时候有人实现解析表达式的时候,是一边解析一边生产sql,这样的做法非常不科学,会造成很多不必要的sql闭包,

一个简单的查询:比如q.Where(c=>c.Age>20).OrderBy(c=>c.Id);就会生成最少三个闭包大概sql是这样 select xxx from (select * from (select xxx from tablex) as t1 where t1.Age>20) as t2 order by t2.Id。但是正常来说应该是select xxx from tablex where Age>20 order by Id。

所以正确的应该是先将表达式树解析成表达式闭包,在将表达式闭包解析成sql。

如何将表达式解析成表达式闭包?

稍微分析一下就可以看出,当遇到take、SKIP、sum、max、min、average、any、contains、distinct、first、firstordefault、longcount、count的时候就是另外一个sql闭包了,

大概代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
protected override Expression VisitMethodCall(MethodCallExpression m)
        {
            var methodName = m.Method.Name.ToLower();
            if (this.CurrQueryExpressionClosure == null && this.QueryExpressionClosures.Count == 0 && Utility.ToListMethods.Contains(methodName))
            {
                this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                this.CurrQueryExpressionClosure.MethodName = "tolist";
                this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
            }
 
            switch (methodName)
            {
                case "orderbydescending":
                case "orderby":
                case "thenby":
                case "thenbydescending":
                    {
                        if (!Utility.IgnoreOrderByMethods.Contains(this.CurrQueryExpressionClosure.MethodName))
                        {
                            this.CurrQueryExpressionClosure.OrderByExpressions.Add(m);
                        }
                        break;
                    }
                case "groupby":
                    {
                        this.CurrQueryExpressionClosure.GroupByExpressions.Add(m.Arguments[1]);
                        break;
                    }
                case "where":
                    {
                        var l = ((m.Arguments[1] as UnaryExpression).Operand as LambdaExpression);
                        this.CurrQueryExpressionClosure.WhereExpressions.Add(l);
                        break;
                    }
                case "take":
                    {
                        if (this.CurrQueryExpressionClosure.MethodName != "skip")
                        {
                            this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                            this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                            this.CurrQueryExpressionClosure.MethodName = methodName;
                        }
                        this.CurrQueryExpressionClosure.Take = System.Convert.ToInt32((m.Arguments[1] as ConstantExpression).Value);
                        break;
                    }
                case "skip":
                    {
                        if (this.CurrQueryExpressionClosure.MethodName != "take")
                        {
                            this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                            this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                            this.CurrQueryExpressionClosure.MethodName = methodName;
                        }
                        this.CurrQueryExpressionClosure.Skip += System.Convert.ToInt32((m.Arguments[1] as ConstantExpression).Value);
                        break;
                    }
                case "sum":
                case "max":
                case "min":
                case "average":
                    {
                        this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                        this.CurrQueryExpressionClosure.MethodName = methodName;
                        this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                        if (m.Arguments.Count > 1)
                        {
                            this.CurrQueryExpressionClosure.EvalNumericExpression = m.Arguments[1];
                        }
                        break;
                    }
                case "any":
                    {
                        this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                        this.CurrQueryExpressionClosure.MethodName = methodName;
                        this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                        if (m.Arguments.Count > 1)
                        {
                            var l = ((m.Arguments[1] as UnaryExpression).Operand as LambdaExpression);
                            this.CurrQueryExpressionClosure.WhereExpressions.Add(l);
                        }
                        break;
                    }
                case "contains":
                    {
                        this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                        this.CurrQueryExpressionClosure.MethodName = methodName;
                        this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                        if (m.Arguments.Count > 1)
                        {
                            this.CurrQueryExpressionClosure.WhereExpressions.Add(m.Arguments[1]);
                        }
                        break;
                    }
                case "distinct":
                    {
                        this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                        this.CurrQueryExpressionClosure.MethodName = methodName;
                        this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                        break;
                    }
                case "select":
                    {
                        var lambdaExp = ((m.Arguments[1] as UnaryExpression).Operand as LambdaExpression);
                        this.CurrQueryExpressionClosure.QuerySelectExpressions.Add(lambdaExp);
                        break;
                    }
                case "first":
                case "firstordefault":
                    {
                        this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                        this.CurrQueryExpressionClosure.MethodName = methodName;
                        this.CurrQueryExpressionClosure.Take = 1;
                        this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                        if (m.Arguments.Count > 1)
                        {
                            this.CurrQueryExpressionClosure.WhereExpressions.Add(m.Arguments[1]);
                        }
                        break;
                    }
                case "longcount":
                case "count":
                    {
                        this.CurrQueryExpressionClosure = new QueryExpressionClosure();
                        this.CurrQueryExpressionClosure.MethodName = methodName;
                        this.QueryExpressionClosures.Add(this.CurrQueryExpressionClosure);
                        if (m.Arguments.Count > 1)
                        {
                            this.CurrQueryExpressionClosure.WhereExpressions.Add(m.Arguments[1]);
                        }
                        break;
                    }
            }
            return base.VisitMethodCall(m);
        }

有了表达式闭包之后,这个时候理解起来就清晰多了,就可以通过一个ParserContext梳理一遍表达式闭包,生成一个通用闭包,并且得到需要的信息。

通用闭包大概:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class QueryCommnClosure
    {
        public int Take { get; set; }
        public int Skip { get; set; }
        public bool NoLock { get; set; }
        public string MethodName { get; set; }
        public string TableName { get; set; }
        public List<string> SelectColumns { get; set; }
        //public List<SelectTypeConstructor> SelectTypes { get; set; }
        public Dictionary<string, string> OrderBys { get; set; }
        public List<string> GroupBys { get; set; }
        public List<string> WhereSQLs { get; set; }
        public string EvalNumericSQL { get; set; }
        public string TableAlias { get; set; }
 
        public QueryCommnClosure()
        {
            this.SelectColumns = new List<string>();
            //this.SelectTypes = new List<SelectTypeConstructor>();
            this.OrderBys = new Dictionary<string, string>();
            this.GroupBys = new List<string>();
            this.WhereSQLs = new List<string>();
        }
 
        public void Generate(ParserContext context)
        {
            ...篇幅限制省略
        }
    }

另外一个元数据,其实这个非常简单,我为了灵活,支持解析EF的edmx(msl、csdl)、Attribute(松散灵活的,实体上可以加Attribute,也可以不加)两种。有了这个元数据就可以做到实体、表的映射。

 源码下载

人快30了,成家却未能立业,做了一年多的项目因为省领导政策的原因失败,说实话干这个行当不知道对不对,可能是有着一张不老的脸,在别人眼里,都以为是才24、5岁,对我也是不够信任,但是实际干起来别人才知道我实力如何,但老板不知道。总是干的最多,拿的只能算个一般,呵呵...。

昆明有看上俺的可以联系下我,求出路,目前公司也不是说要倒闭什么的,其实也很稳定,但是这个项目完完了,另一个稳定gps是其他人做的,感觉在公司已经多余了,工资也不是看涨的样子,毕竟要买房,养家糊口。

 
 
标签: Linq to Sql
 
标签: 视频c++流媒体直播
原文地址:https://www.cnblogs.com/Leo_wl/p/3435466.html