CQRS学习——Storage实现(EF+Code First+DynamicReponsitory)[其四]

【这里是的实现,指的是针对各个数据访问框架的一个基础实现】

目标

  •   定义仓储/QueryEntry的基本功能
  •   实现仓储的基本功能,以利于复用
  •   实现一些常用的功能
  •   提供一些便利的功能

目标框架

博主使用的ORM框架是EF6.x,使用MAP来配置模型和数据库之间的映射(因为模型是定义在领域层[CQRS]的),所以不打算使用声明式的Attribute。使用code first来生成数据库。

仓储基本功能

使用一个泛型接口定义了一个仓储需要实现的功能:

public interface IBasicReponsitory<T>
    {
        void Insert(T item);
        void Delete(T item);
        void Delete(Guid aggregateId);
        void Update(T category);
        T Fetch(Guid aggregateId);
        T TryFetch(Guid aggregateId);
        bool Exists(Expression<Func<T, bool>> predict);

        /*以下是额外的一些接口方法,待商榷*/
        IQueryable<T> Query();
        Task<T> FetchAsync(Guid id);
        Task<T> TryFetchAsync(Guid id);
        Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict);
    }
View Code

以及一个QueryEntry需要实现的一些基本功能:

public interface IQueryEntry<T> where T : IHasPrimaryKey
    {
        T TryFetch(Guid id);
        Task<T> TryFetchAsync(Guid id);

        bool Exsits(Guid id);
        bool Exsits(Expression<Func<T, bool>> selector);
    }
View Code

随着时间的推移,这个接口会发生变更,添加一些更多的功能。同时,并不要求所有的仓储或者QueryEntry继承接口,基本接口的定义和实现仅仅是为了提供便利。

为了方便QueryEntry的实现,提供了一个抽象类:

public abstract class ReponsitoryBasedQueryEntry<T> : IQueryEntry<T> where T : IHasPrimaryKey
    {
        public abstract IBasicReponsitory<T> BasicReponsitory { get; }

        public T TryFetch(Guid id)
        {
            return BasicReponsitory.TryFetch(id);
        }

        public Task<T> TryFetchAsync(Guid id)
        {
            return BasicReponsitory.TryFetchAsync(id);
        }

        public bool Exsits(Guid id)
        {
            return BasicReponsitory.Query().Any(i => i.Id == id);
        }

        public bool Exsits(System.Linq.Expressions.Expression<Func<T, bool>> selector)
        {
            return BasicReponsitory.Query().Any(selector);
        }
    }
View Code

基本实现

public class BasicEntityFrameworkReponsitory<T> : IBasicReponsitory<T> where T : class, IHasPrimaryKey
    {
        public BasicEntityFrameworkReponsitory()
        {
            Table = StorageConfiguration.DbContext.Set<T>();
        }

        public DbSet<T> Table { get; private set; }

        public virtual void Insert(T item)
        {
            Table.Add(item);
        }

        public virtual void Delete(T item)
        {
            Table.Remove(item);
        }

        public virtual void Delete(Guid aggregateId)
        {
            var item = TryFetch(aggregateId);
            Delete(item);
        }

        public virtual void Update(T category)
        {
            //do nothing...
        }

        public T Fetch(Guid aggregateId)
        {
            var item = TryFetch(aggregateId);
            if (item == null)
            {
                throw new AggregateRootNotFoundException(aggregateId);
            }
            return item;
        }

        public T TryFetch(Guid aggregateId)
        {
            var item = Query().FirstOrDefault(i => i.Id == aggregateId);
            return item;
        }

        public virtual IQueryable<T> Query()
        {
            return Table;
        }

        public async Task<T> FetchAsync(Guid id)
        {
            return await Table.FirstAsync(i => i.Id == id);
        }

        public async Task<T> TryFetchAsync(Guid id)
        {
            return await Table.FirstOrDefaultAsync(i => i.Id == id);
        }

        public bool Exists(Expression<Func<T, bool>> predict)
        {
            return Table.Any(predict);
        }

        public async Task<IEnumerable<T>> RetriveAsync(Expression<Func<T, bool>> predict)
        {
            return await Table.Where(predict).ToArrayAsync();
        }
    }
View Code

这部分代码表达了个人的几个想法:
1.DbContext的生命周期是由Storage自行管理的。当然,可以通过一定的方式指定。

2.提供了基础的Query()方法,并设置为虚方法。个人并不抵制使用IQueryable对象进行查询。我觉得可以把使用IQueryable对象进行查询的代码片段看作匿名方法。

 常用的功能:软删除

这里是继承基本实现的一个实现:

public class SoftDeleteEntityFrameworkReponsitory<T> : BasicEntityFrameworkReponsitory<T>
        where T : class, IHasPrimaryKey, ISoftDelete
    {
        public override IQueryable<T> Query()
        {
            return base.Query().Where(i => !i.IsDeleted);
        }

        public override void Delete(T item)
        {
            item.IsDeleted = true;
            Update(item);
        }
    }
View Code

这里要求仓储对应的模型实现接口ISoftDelete,为软删除提供支持:

public interface ISoftDelete
    {
        bool IsDeleted { get; set; }
    }

同时override了Query()方法,过滤了已删除的内容。

常用的功能:操作跟踪

好吧,这应该是事件溯源干的事,然而事件溯源目前太难了。原理和软删除差不多:

/// <summary>
    /// 既然开启了跟踪,那么这条数据必然是不能硬删除的
    /// </summary>
    /// <typeparam name="T"></typeparam>
    public class TraceEnabledEntityFrameworkReponsitory<T> : SoftDeleteEntityFrameworkReponsitory<T>
        where T : class, ISoftDelete, ITrackLastModifying, IHasPrimaryKey
    {
        /// <summary>
        /// 开启跟踪时,不允许匿名操作
        /// </summary>
        [Dependency]
        public IDpfbSession Session { get; set; }

        public override void Update(T item)
        {
            if (!Session.UserId.HasValue)
                throw new Exception(); //todo 提供一个明确的异常
            item.LastModifiedBy = Session.UserId.Value;
            item.LastModifiedTime = DateTime.Now;
        }

        public override void Insert(T item)
        {
            if (!Session.UserId.HasValue)
                throw new Exception(); //todo 提供一个明确的异常
            item.LastModifiedBy = Session.UserId.Value;
            item.LastModifiedTime = DateTime.Now;
            base.Insert(item);
        }
    }
View Code

不过这个功能的侵入性很强,Storage应该无法感知“用户”这种概念才对。

便利的功能:动态仓储(DynamicReponsitory)

前一篇文章中说过,引入QueryEntry是为了将查询和提交分来,同时为查询操作提供更大的优化空间。在面对数据库的查询中,多表联查是非常普遍的。所以打算针对多表联查提供一个遍历的组件。同时,直接提交语句查询是和数据库相关的,所以要针对不同的数据库提供不同的DynamicReponsitory。

这个组件解决的问题是:直接提交数据库多表联查,查询结果自动转换模型,提供分页支持。

模型转换

先来解决这个比较有趣的问题:将一个DataReader转换为一个值或者一个可枚举的集合。直接上实现代码:

public class DataReaderTransfer<T> : CacheBlock<string, Func<IDataReader, T>> where T : new()
    {
        protected DataReaderTransfer()
        {
        }

        /// <summary>
        /// 
        /// </summary>
        /// <param name="filedsNameArray"></param>
        /// <param name="key">编译缓存所使用的key,建议使用查询字符串的hash</param>
        /// <returns></returns>
        public Func<IDataReader, T> Compile(string[] filedsNameArray, string key)
        {
            var outType = typeof (T);
            var func = ConcurrentDic.GetOrAdd(key, k =>
            {
                var expressions = new List<Expression>();
                //public T xxx(IDataReader reader){
                var param = Expression.Parameter(typeof (IDataReader));

                //var instance = new T();
                var newExp = Expression.New(outType);
                var varExp = Expression.Variable(outType, "instance");
                var varAssExp = Expression.Assign(varExp, newExp);
                expressions.Add(varAssExp);

                var indexProp = typeof (IDataRecord).GetProperties().Last(p => p.Name == "Item"); //表示 reader[""]
                foreach (var fieldName in filedsNameArray)
                {
                    //if(xxx)xxx.xxx=null;else xxx.xxx = (xxx)value;

                    var prop = outType.GetProperty(fieldName);
                    if (prop == null)
                        continue;
                    var propExp = Expression.PropertyOrField(varExp, fieldName);
                    Expression value = Expression.MakeIndex(param, indexProp,
                        new Expression[] {Expression.Constant(fieldName)});

                    //处理空值
                    var defaultExp = Expression.Default(prop.PropertyType);
                    var isDbNullExp = Expression.TypeIs(value, typeof (DBNull));

                    //处理枚举以及可空枚举
                    if (prop.PropertyType.IsEnum ||
                        prop.PropertyType.IsGenericType && prop.PropertyType.GetGenericArguments()[0].IsEnum)
                    {
                        value = Expression.Convert(value, typeof (int));
                    }
                    var convertedExp = Expression.Convert(value, prop.PropertyType);
                    //读取到dbnull的时候,使用一个默认值
                    var condExp = Expression.IfThenElse(isDbNullExp,
                        Expression.Assign(propExp, defaultExp),
                        Expression.Assign(propExp, convertedExp));

                    expressions.Add(condExp);
                }

                //return instance; 
                var retarget = Expression.Label(outType);
                var returnExp = Expression.Return(retarget, varExp);
                expressions.Add(returnExp);

                //}
                var relabel = Expression.Label(retarget, Expression.Default(outType));
                expressions.Add(relabel);

                var blockExp = Expression.Block(new[] {varExp}, expressions);
                var expression = Expression.Lambda<Func<IDataReader, T>>(blockExp, param);
                return expression.Compile();
            });
            return func;
        }

        public Func<IDataReader, T> Compile(IDataReader reader, string key)
        {
            var length = reader.FieldCount;
            var names = Enumerable.Range(1, length).Select(i => reader.GetName(i - 1)).ToArray();
            return Compile(names, key);
        }

        public static DataReaderTransfer<T> Instance = new DataReaderTransfer<T>();

        //基于反射的映射....
        //private static T DynamicMap<T>(IDataReader reader) where T : new()
        //{
        //    var instance = new T();
        //    var count = reader.FieldCount;
        //    while (count-- > 1)
        //    {
        //        object value = reader[count - 1];
        //        var name = reader.GetName(count - 1);
        //        var prop = typeof (T).GetProperty(name);
        //        if (prop == null)
        //        {
        //            continue;
        //        }
        //        if (value is DBNull)
        //        {
        //            value = null;
        //        }
        //        prop.SetValue(instance, value);
        //    }
        //    return instance;
        //}
    }
View Code

主要的思想是:在运行期间对一个特定的模型分析一次,分析构造这个模型需要如何访问DataReader,并将访问操作编译为Func<>,通过一个静态字典缓存。下一次构造的时候,直接访问静态字典的Func<>,将DataReader的行转换为模型。这个耗时,大概是硬编码转换的2倍,可以获得比反射好的性能受益。

链式调用以及延时查询

先来看一段调用代码:

[TestClass]
    public class DynamicReponsitorySamples
    {
        static DynamicReponsitorySamples()
        {
            //DbContext 配置
            StorageConfiguration.Config.Use<DbContext, ObjectManageContext>(new ContainerControlledLifetimeManager());
            //无法使用.UseDbContext<ObjectManageContext>(),因为无法提供基于HTTP生命周期的管理对象
            DynamicReponsitory = new DynamicReponsitory();
        }

        public static DynamicReponsitory DynamicReponsitory { get; set; }

        [TestMethod]
        public void Query()
        {
            //直接提交一个SQL查询,并映射到实体
            var queryText =
                "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code";
            var query = DynamicReponsitory.Query<AdminListItem>(queryText);
            //QueryResult对象遵循延时查询的规则,直到执行枚举才会执行查询操作
            query.Foreach(i => Trace.WriteLine(string.Format("{0}	{1}", i.UserName, i.RealName)));
        }

        [TestMethod]
        public void Count()
        {
            //可以直接执行一个COUNT(*)语句
            var countQueryText = "SELECT COUNT(*) FROM [ADMIN]";
            var countQuery = DynamicReponsitory.Count(countQueryText);
            Trace.WriteLine("Count:" + countQuery.Value);
            //可以提供一个SELECT * 语句
            countQueryText = "SELECT * FROM [ADMIN]";
            //但是需要将重载的第二个参数置为true
            countQuery = DynamicReponsitory.Count(countQueryText, true);
            Trace.WriteLine("Count:" + countQuery.Value);
            //可以对一个query对象执行CmountAmount()扩展方法,但是这个query对象代表的查询必须很普通
            var query = DynamicReponsitory.Query<AdminListItem>(
                "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code");
            countQuery = query.CountAmount();
            //Value的值同样遵循延时查询的规则,但是重复访问会导致访问内存中缓存的数据
            Trace.WriteLine("Count:" + countQuery.Value);
            Trace.WriteLine("Count:" + countQuery.Value);
            //如果需要重新查询,可以调用Result.ReQuery()方法
            var reQuery = countQuery.ReQuery();
            Trace.WriteLine("Count:" + reQuery.Value);
        }

        /// <summary>
        /// 分页调用,支持分页信息和分页列表信息的无序访问
        /// </summary>
        [TestMethod]
        public void Page()
        {
            //可以对所有的query对象执行Page()扩展方法,从而进行分页
            //必须执行要求OrderBy参数的重载,否则会进行内存分页(加载所有行)
            var query = DynamicReponsitory.Query<AdminListItem>(
                "SELECT A.*,D.Name AS DepartmentName FROM [ADMIN] A LEFT JOIN [Department] D ON A.DepartmentCode = D.Code");
            var paged = query.Page("ORDER BY DepartmentName", 1, 2);

            /*
             * 以下表示支持分页信息和分页列表信息的无序访问
             * 如果使用一条sql同时返回这些信息,必须先枚举集合才能继续访问分页信息
             */

            Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount));
            paged.Foreach(i => Trace.WriteLine(string.Format("{0}	{1}", i.UserName, i.RealName)));
            //重复访问会导致访问内存中缓存的数据
            var resultArray = paged.Take(1);
            Trace.WriteLine(string.Format("从{0}行到{1}行,在所有的{2}行中", paged.From, paged.To, paged.Amount));
            resultArray.Foreach(i => Trace.WriteLine(string.Format("{0}	{1}", i.UserName, i.RealName))); 
        }
    }
View Code

链式调用是指,我调用了DynamicReponsitory.Query()方法之后,可以紧接着调用Page()或者Count()方法。那么,显而易见,如果查询不是延时的,很容易导致这个问题:我把服务器上1W条数据全down下来了,然后在内存里面数数或者分页。
为了实现延时查询的目标,引入了这几个类型:

public class SqlQueryExpression : ICloneable
    {
        public SqlQueryExpression()
        {
            Parameters = new List<object>();
        }

        public SqlQueryExpression(string expressionText) : this()
        {
            ExpressionText = expressionText;
        }

        public string ExpressionText { get; set; }
        public IList<object> Parameters { get; private set; }

        public IDataReader Read(DbConnection connection)
        {
            var parameters = Parameters.ToArray();
            if (connection.State != ConnectionState.Open)
                connection.Open();
            //查询,开启最低级别的事务隔离,防止默认事务产生争用锁
            var trans = connection.BeginTransaction(IsolationLevel.ReadUncommitted);
            var command = connection.CreateCommand();
            command.CommandType = CommandType.Text;
            command.CommandText = ExpressionText;
            command.Parameters.AddRange(parameters);
            command.Transaction = trans;
            return command.ExecuteReader(CommandBehavior.CloseConnection);
        }

        public virtual object Clone()
        {
            //实现拷贝接口
            var cloned = new SqlQueryExpression(ExpressionText);
            Parameters.Foreach(i =>
            {
                var parameter = (SqlParameter) i;
                var clonedParameter = new SqlParameter(parameter.ParameterName, parameter.Value);
                clonedParameter.Direction = parameter.Direction;
                cloned.Parameters.Add(clonedParameter);
            });
            return cloned;
        }
    }
View Code
 public class SqlQueryResult
    {
        public SqlQueryExpression SqlQueryExpression { get; private set; }
        public DbConnection DbConnection { get; private set; }
        public virtual bool Enumerated { get; protected set; }
        protected IDataReader DataReader;

        protected void Query()
        {
            DataReader = DataReader ?? SqlQueryExpression.Read(DbConnection);
        }

        public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)
        {
            SqlQueryExpression = expression;
            DbConnection = connection;
        }
    }

    /// <summary>
    /// 代表DynamicReponsitory的查询结果
    /// </summary>
    /// <typeparam name="T">代表需要构造的类型</typeparam>
    public class SqlQueryResult<T> : SqlQueryResult, IEnumerable<T> where T : new()
    {
        public SqlQueryResult(SqlQueryExpression expression, DbConnection connection)
            : base(expression, connection)
        {

        }

        public IEnumerator<T> GetEnumerator()
        {
            //对于一个Query对象,在第一次访问的时候,要求加载所有数据,防止Skip与Take导致数据丢失
            if (!Enumerated)
            {
                Query();
                using (DataReader)
                {
                    Enumerated = true;
                    var uniqueKey = typeof (T).FullName + SqlQueryExpression.ExpressionText;
                    var func = DataReaderTransfer<T>.Instance.Compile(DataReader, uniqueKey);
                    while (DataReader.Read())
                    {
                        var item = func(DataReader);
                        ResultSet.Add(item);
                        //yield return item;
                    }
                }
            }
            return ResultSet.GetEnumerator();
            //return ((IEnumerable<T>) ResultSet).GetEnumerator();
        }

        protected List<T> ResultSet = new List<T>();

        IEnumerator IEnumerable.GetEnumerator()
        {
            return GetEnumerator();
        }

        public SqlQueryResult<T> ReQuery()
        {
            var exp = SqlQueryExpression.Clone() as SqlQueryExpression;
            return new SqlQueryResult<T>(exp, DbConnection);
        }
    }
View Code

SqlQueryExpression存储了将要执行的查询,而SqlQueryResult则存储了查询返回的结果。同时,SqlQueryExpression实现了拷贝,以支持ReQuery()。
关于具体的分页支持,实际上是使用了一个开窗函数,通过注入子查询的方式,从而支持了各种查询的分页(不奇葩的查询)。

为了防止查询被锁住,默认开启了最低的事务隔离级别。

...

【想到什么再补充】

原文地址:https://www.cnblogs.com/lightluomeng/p/4922671.html