SQL大批量插入数据的方式(多表关联) .

前段时间,在工作中遇到这个需求,ADO.NET需要大批量插入几万条甚至几十万的数据。因为业务特殊,多张表的相互关联,通常做法是先往主表里面插入一条数据,然后获取主表的主键ID,再往其他关联的表里面插入ID的关联数据。刚开始做的时候,想到用事务,把几万条SQL拼装起来,在一个事务里面去执行,结果很壮烈,执行性能非常糟糕。几千条业务数据执行了几分钟。用代码分析工具Dottrace一查,发现单单操作数据库的时间占了99.9%。

(Dottrace,代码性能分析工具,它分dottrace Performancedottrace Memory两个工具,dottrace Performance用来分析代码性能,比如函数执行时间,调用次数,消耗时间比率等,dottrace Memory一般用来分析内存占用情况。大家如果有兴趣的同学可以去下载玩下,对代码优化工作很有帮助的。)

言归正传,去网上搜了很多资料,原来ADO.NET2.0有一个新的特性:SqlBulkCopy,效率还是很高的。然后结合自己的业务需求,修改了下代码。现在跟大家一起来学习下。(因为也是参考前辈写的资料,所以下面的示例都大同小异)

建立测试数据库(BulkTestDB)、主表(BulkTestMain)、从表(BulkTestDetail)

--Create DataBase     
create database BulkTestDB;    
go    
use BulkTestDB;    
go    
--Create Table     
Create table BulkTestMain(    
Id int primary key,    
GuidId long,--辅助的唯一标识   
Batch long,--导入的批次标识   
Name nvarchar(32)    
go    
  
Create table BulkTestDetail(    
Id int primary key,    
PId int,  
Lesson nvarchar(32)   
go

数据库建立完毕,开始编写后台代码

View Code
public void TestMain()
        {
            using (SqlConnection connection = new SqlConnection("你的链接字符串"))
            {
                connection.Open();
                SqlTransaction transaction = connection.BeginTransaction("Transaction1");
                DataTable dtTestMain= GetTableSchema("BulkTestMain");//构建BulkTestMain表结构
                DataTable dtTestDetail = GetTableSchema("BulkTestDetail");//构建BulkTestDetail表结构
                Guid Batch = Guid.NewGuid();//插入的批次,为后面查询dtTestMainTmp 做条件
                for (int i = 0; i < 1000000; i++)//测试100w条数据
                {
                    DataRow dr= dtTestMain.NewRow();
                    Guid newGuid = Guid.NewGuid();
                    dr["_GuidId"] = newGuid;
                    dr["_Batch"] = Batch;
                    dr["_UserName"] = "测试" + i.ToString();
                    dtTestMain.Rows.Add(dr);

                    for(int j = 0;j<10;j++)//给从表每次插入10条数据
                    {
                        DataRow dr1 = dtTestDetail.NewRow();
                        dr1["_GuidId"]= newGuid;
                        dr1["_Lesson"]="课程"+j.ToString();
                        dtTestDetail.Rows.Add(dr1);
                    }
                    //这样做的目的,让主表与从表可以临时通过GuidId关联起来
                }
                BulkToDB(dtTestMain, "BulkTestMain", connection, transaction);//先让BulkTestMain插入了大量的数据,注意这些数据是临时的,在SqlTransaction提交之前,查询时要用with(nolock)
                
                DataSet dtTestMainTmp = GetNewImportData(Batch.ToString());//好吧,我们来查询下,刚才大量插入的10w条数据,这里只需要查询标识的2列字段
                Dictionary<string, long> dicGuidToID = new Dictionary<string, long>();
                foreach (DataRow dr in dtTestMainTmp.Tables[0].Rows)
                {
                    dicGuidToID.Add(dr[1].ToString(), Convert.ToInt64(dr[0]));
                }//dicGuidToID:guid字段与插入的主表ID字段关联起来成字典,用字典是为了访问起来效率(为什么获取字典key的值效率很高,有兴趣的可以去研究“散列表”的概念)

                foreach (DataRow dr in dtTestDetail.Rows)//现在给dtTestDetail的PId字段赋值(PId字段与主表Id外键关联)
                {
                    dr["_PId"] = dicGuidToID[dr["_GuidId"].ToString()].ToString();
                }
                dtTestDetail.Columns.Remove("_GuidId");//移除dtTestDetail的GuidId字段,使它与数据库列匹配
                BulkToDB(dtTestDetail,"BulkTestDetail",connection, transaction);//给从表插入数据
                transaction.Commit();      
                connection.Close();
            }

        }
          /// <summary>
        /// 根据批次Batch获取导进来的临时数据
        /// </summary>
        /// <returns></returns>
        public static DataSet GetNewImportData(string batch)
        {
            StringBuilder strSql = new StringBuilder();
            strSql.Append("SELECT [Id],[GuidId]")
                .Append(" FROM ContactInfo WITH (NOLOCK) WHERE Batch=@batch");
            SqlParameter[] parameters = {
                    new SqlParameter("@Batch", SqlDbType.BigInt){Value = batch}
            };
            DataSet ds = SqlHelper.ExecuteDataset(strSql.ToString(), parameters);
            return ds;
        }

        public static void BulkToDB(DataTable dtSource, string TableName,SqlConnection connection, SqlTransaction transaction)
        {
            using (SqlBulkCopy sqlBulkCopy = new SqlBulkCopy(connection, SqlBulkCopyOptions.KeepIdentity, transaction))
            {
                sqlBulkCopy.DestinationTableName = TableName;//要插入数据的表的名称
                sqlBulkCopy.BatchSize = dtSource.Rows.Count;//数据的行数

                List<SqlBulkCopyColumnMapping> mpList = getMapping(TableName);//获取表映射关系

                foreach (SqlBulkCopyColumnMapping mp in mpList)
                {
                    sqlBulkCopy.ColumnMappings.Add(mp);
                }

                if (dtSource != null && dtSource.Rows.Count != 0)
                {
                    sqlBulkCopy.WriteToServer(dtSource);//插入数据
                }
            }
        }

        public static List<SqlBulkCopyColumnMapping> getMapping(string TableName)
        {
            List<SqlBulkCopyColumnMapping> mpList = new List<SqlBulkCopyColumnMapping>();
            switch(TableName)
            {
                case "BulkTestMain":{
                            mpList.Add(new SqlBulkCopyColumnMapping("_Id", "Id"));
                            mpList.Add(new SqlBulkCopyColumnMapping("_GuidId", "GuidId"));
                            mpList.Add(new SqlBulkCopyColumnMapping("_Batch","Batch"));
                            mpList.Add(new SqlBulkCopyColumnMapping("_UserName", "UserName"));
                }break;
                case "BulkTestDetail":{
                            mpList.Add(new SqlBulkCopyColumnMapping("_Id", "Id"));
                            mpList.Add(new SqlBulkCopyColumnMapping("_PId", "PId"));
                            mpList.Add(new SqlBulkCopyColumnMapping("_Lesson", "Lesson"));
                }break;
            }
            return mpList;
        }
        private static DataTable GetTableSchema(string TableName)
        {
            DataTable dataTable = new DataTable();
            switch(TableName)
            {
               case "BulkTestMain" :{ 
                   dataTable.Columns.AddRange(new DataColumn[] {  
                                        new DataColumn("_Id",typeof(Int32)),
                                        new DataColumn("_GuidId",typeof(Int64)),
                                        new DataColumn("_Batch",typeof(Int64)),
                                        new DataColumn("_UserName",typeof(String))
                                    });}break;
               case "BulkTestDetail":{
                                        dataTable.Columns.AddRange(new DataColumn[] {  
                                        new DataColumn("_Id",typeof(Int32)),
                                        new DataColumn("_PId",typeof(Int32)),
                                        new DataColumn("_GuidId",typeof(Int64)),
                                        new DataColumn("_Lesson",typeof(String))});
               }break;
            }
           return dataTable;
        }

总算把代码copy完了,任务完成。

多表关联批量插入数据库的方法,欢迎大家批评指正。

原文地址:https://www.cnblogs.com/contraII/p/2548519.html