表值参数的使用

/// <summary>
/// Process-wide buffer for product request/download counters.
/// Callers record events with <see cref="Push"/>; once <see cref="StartListen"/>
/// has been called, a background loop periodically sends the buffered rows to
/// the <c>PR_InsertProductStatLog</c> stored procedure as a table-valued
/// parameter of type <c>dbo.ProductStat</c>.
/// </summary>
public class LogStats
    {
        /// <summary>Delay between flush passes, and extra back-off after a failed pass, in milliseconds.</summary>
        private const int FlushIntervalMs = 10000;

        // Second argument handed to LogHelper.WriteCustom for every error message.
        // NOTE(review): presumably a log sub-directory name ("watchErr\") - confirm against LogHelper.
        // The original literal "watchErr\" escaped its own closing quote and did not compile.
        private const string ErrorLogPath = "watchErr\\";

        private static readonly LogStats instance = new LogStats();

        // Column layout of one TVP row. The ordinals used in WriteLog (0..6) index into
        // this array, and the order mirrors the column order of the dbo.ProductStat type.
        private static readonly SqlMetaData[] _metaData =
            new SqlMetaData[] {
                new SqlMetaData("language", SqlDbType.Int),
                new SqlMetaData("productId", SqlDbType.Int),
                new SqlMetaData("branchId", SqlDbType.Int),
                new SqlMetaData("platform", SqlDbType.Int),
                new SqlMetaData("logDate", SqlDbType.Int),
                new SqlMetaData("requestNum", SqlDbType.BigInt),
                new SqlMetaData("downNum", SqlDbType.BigInt),
            };

        // Single queue for the lifetime of the instance. It is drained in place rather than
        // swapped for a fresh queue: with a swap, a producer that had already read the old
        // reference could enqueue after the drain finished and that item would never be written.
        private readonly ConcurrentQueue<ProductStat> _memQueue = new ConcurrentQueue<ProductStat>();

        // 0 = listener not started, 1 = started. Changed only through Interlocked.
        private int _listening;

        /// <summary>Returns the shared <see cref="LogStats"/> instance.</summary>
        public static LogStats GetInstance()
        {
            return instance;
        }

        /// <summary>
        /// Buffers one request event. Each call contributes RequestNum = 1 and
        /// DownNum = 1 when <paramref name="isDown"/> is true, otherwise 0.
        /// Failures are logged and never propagated to the caller.
        /// </summary>
        /// <param name="softId">Stored as <c>ProductStat.ProductId</c>.</param>
        /// <param name="branchId">Stored as <c>ProductStat.BranchId</c>.</param>
        /// <param name="platform">Stored as <c>ProductStat.Platform</c>.</param>
        /// <param name="lan">Stored as <c>ProductStat.Language</c>.</param>
        /// <param name="isDown">True when the event should also count as a download.</param>
        public void Push(int softId, int branchId, int platform ,int lan, bool isDown = false)
        {
            try
            {
                // ConcurrentQueue.Enqueue is thread-safe and non-blocking, so there is no
                // need to hop to the thread pool just to add one item.
                _memQueue.Enqueue(new ProductStat
                                      {
                                          ProductId = softId,
                                          BranchId = branchId,
                                          Platform = platform,
                                          Language = lan,
                                          DownNum = isDown ? 1 : 0,
                                          RequestNum = 1,
                                      });
            }
            catch (Exception er)
            {
                LogHelper.WriteCustom("Push,数据处理失败:" + er, ErrorLogPath, false);
            }
        }

        /// <summary>
        /// Starts the background flush loop. Only the first call has an effect;
        /// later calls return immediately so that a single loop runs per instance.
        /// </summary>
        public void StartListen()
        {
            if (Interlocked.CompareExchange(ref _listening, 1, 0) != 0)
            {
                return;
            }

            // The loop never returns, so it gets its own background thread instead of
            // permanently occupying a thread-pool worker.
            var worker = new Thread(Listening);
            worker.IsBackground = true;
            worker.Name = "LogStats.Listening";
            worker.Start();
        }

        /// <summary>
        /// Flush loop: sleeps for <see cref="FlushIntervalMs"/>, then writes whatever is
        /// buffered. Any exception is logged and followed by one extra interval of back-off.
        /// </summary>
        private void Listening()
        {
            while (true)
            {
                Thread.Sleep(FlushIntervalMs);
                try
                {
                    if (_memQueue.IsEmpty)
                    {
                        continue;
                    }

                    WriteLog(_memQueue);
                }
                catch (Exception ex)
                {
                    LogHelper.WriteCustom("Listening,数据处理失败:" + ex, ErrorLogPath, false);
                    Thread.Sleep(FlushIntervalMs);
                }
            }
        }

        /// <summary>
        /// Dequeues the items currently in <paramref name="xxx"/>, converts them to
        /// <see cref="SqlDataRecord"/> rows and passes them to <c>PR_InsertProductStatLog</c>
        /// in a single call. Runs synchronously on the calling thread.
        /// Exceptions are logged, not rethrown; rows already dequeued for a failed call
        /// are not re-queued.
        /// </summary>
        /// <param name="xxx">Queue to drain.</param>
        private void WriteLog(ConcurrentQueue<ProductStat> xxx)
        {
            try
            {
                // Take at most the number of items present right now, so one pass stays
                // bounded even while producers keep enqueuing.
                int pending = xxx.Count;
                List<SqlDataRecord> records = new List<SqlDataRecord>(pending);
                ProductStat pro;
                while (pending-- > 0 && xxx.TryDequeue(out pro))
                {
                    var record = new SqlDataRecord(_metaData);
                    record.SetInt32(0, pro.Language);
                    record.SetInt32(1, pro.ProductId);
                    record.SetInt32(2, pro.BranchId);
                    record.SetInt32(3, pro.Platform);
                    record.SetInt32(4, pro.LogDate);
                    record.SetInt64(5, pro.RequestNum);
                    record.SetInt64(6, pro.DownNum);
                    records.Add(record);
                }

                // A structured parameter must not be given an empty row set.
                if (records.Count == 0)
                {
                    return;
                }

                SqlParameter[] parameters = new[]
                    {
                        new SqlParameter("@tblPstat", SqlDbType.Structured){ Value = records, TypeName = "dbo.ProductStat" },
                    };

                SqlHelper.ExecuteNonQuery(DbConfig.ProductDb, CommandType.StoredProcedure, "PR_InsertProductStatLog", parameters);
            }
            catch (Exception ex)
            {
                LogHelper.WriteCustom("WriteLog,写入到数据库失败:" + ex, ErrorLogPath, false);
            }
        }
    }

调用方法
LogStats.GetInstance().Push(softId, branchId, platform, lan, isDown);

监听方法(应用启动时调用一次即可)

LogStats.GetInstance().StartListen();

数据库设计

-- Table type for passing a batch of product statistics rows as a
-- table-valued parameter (it is the parameter type of
-- dbo.PR_InsertProductStatLog). Clients that build rows by ordinal
-- depend on the column order, so keep it unchanged.
CREATE TYPE dbo.ProductStat AS TABLE
(
    [language]   INT    NOT NULL,
    [productId]  INT    NOT NULL,
    [branchId]   INT    NOT NULL,
    [platform]   INT    NOT NULL,
    [logDate]    INT    NOT NULL,
    [requestNum] BIGINT NOT NULL,
    [downNum]    BIGINT NOT NULL
);
GO
-- Adds a batch of request/download counters to the ProductStat table.
--   @tblPstat : rows of type dbo.ProductStat. The incoming [logDate] value is
--               ignored; every row is booked on the server's current date.
-- The batch is first aggregated per (language, productId, branchId, platform),
-- then existing rows for today are incremented and missing rows are inserted,
-- both inside one transaction.
CREATE PROCEDURE [dbo].[PR_InsertProductStatLog]
    @tblPstat dbo.ProductStat READONLY
AS 
    BEGIN
        -- Roll the whole transaction back on any run-time error instead of
        -- leaving it open with only part of the work applied.
        SET XACT_ABORT ON;

        DECLARE @now DATETIME;
        DECLARE @chToday INT;
        SET @now = GETDATE();
        -- Today's date as an 8-digit yyyymmdd integer, used as the logDate key.
        SET @chToday = DATEPART(yyyy, @now) * 10000 + DATEPART(m, @now) * 100
            + DATEPART(d, @now);

        -- Aggregated batch.
        DECLARE @tblTmp TABLE
            (
                [language] [int] NOT NULL,
                [productId] [int] NOT NULL,
                [branchId] [int] NOT NULL,
                [platform] [int] NOT NULL,
                [logDate] [int] NOT NULL,
                [requestNum] [bigint] NOT NULL,
                [downNum] [bigint] NOT NULL
            );

        -- Aggregate first. [logDate] is deliberately NOT part of the GROUP BY:
        -- the output date is always @chToday, so grouping by the incoming value
        -- could emit several rows with the same key, which would make the
        -- UPDATE below apply only one of them and the INSERT create duplicates.
        INSERT  INTO @tblTmp
                (
                  [language],
                  [productId],
                  [branchId],
                  [platform],
                  [logDate],
                  [requestNum],
                  [downNum]
                )
                SELECT  [language],
                        [productId],
                        [branchId],
                        [platform],
                        @chToday,
                        SUM([requestNum]),
                        SUM([downNum])
                FROM    @tblPstat
                GROUP BY [language],
                        [productId],
                        [branchId],
                        [platform];

        BEGIN TRAN;

        -- Increment counters of rows that already exist for today.
        -- UPDLOCK + HOLDLOCK keep the matched keys (and the gaps for keys that
        -- do not exist yet) locked until COMMIT, so a concurrent call cannot
        -- insert the same key between this UPDATE and the INSERT below.
        -- NOTE(review): assumes ProductStat has an index on the key columns so
        -- that the range locks stay narrow - confirm.
        UPDATE a
        SET
            a.[requestNum] = a.[requestNum] + b.[requestNum],
            a.[downNum] = a.[downNum] + b.[downNum]
        FROM
            ProductStat a WITH (UPDLOCK, HOLDLOCK)
            INNER JOIN @tblTmp b
                ON  a.[logDate] = b.[logDate] AND
                    a.[platform] = b.[platform] AND
                    a.[language] = b.[language] AND
                    a.[productId] = b.[productId] AND
                    a.[branchId] = b.[branchId];

        -- Insert the keys that have no row yet (anti-join on the full key).
        INSERT  INTO ProductStat
                (
                  [language],
                  [productId],
                  [branchId],
                  [platform],
                  [logDate],
                  [requestNum],
                  [downNum]
                )
        SELECT
                 b.[language],
                 b.[productId],
                 b.[branchId],
                 b.[platform],
                 b.[logDate],
                 b.[requestNum],
                 b.[downNum]
        FROM @tblTmp b LEFT JOIN ProductStat a
        ON  a.[logDate] = b.[logDate] AND
            a.[platform] = b.[platform] AND
            a.[language] = b.[language] AND
            a.[productId] = b.[productId] AND
            a.[branchId] = b.[branchId]
        WHERE a.autoID IS NULL;

        COMMIT TRAN;
    END
原文地址:https://www.cnblogs.com/contraII/p/3240619.html