SQL Server 批量插入数据的两种方法

 

      在SQL Server 中插入一条数据使用Insert语句,但是如果想要批量插入一堆数据的话,循环使用Insert不仅效率低,而且会导致SQL一系统性能问题。下面介绍SQL Server支持的两种批量数据插入方法:Bulk和表值参数(Table-Valued Parameters)。

运行下面的脚本,建立测试数据库和表值参数。

[c-sharp] view plaincopy
 
 
 
  1. --Create DataBase  
  2. create database BulkTestDB;  
  3. go  
  4. use BulkTestDB;  
  5. go  
  6. --Create Table  
  7. Create table BulkTestTable(  
  8. Id int primary key,  
  9. UserName nvarchar(32),  
  10. Pwd varchar(16))  
  11. go  
  12. --Create Table Valued  
  13. CREATE TYPE BulkUdt AS TABLE  
  14.   (Id int,  
  15.    UserName nvarchar(32),  
  16.    Pwd varchar(16))  

下面我们使用最简单的Insert语句来插入100万条数据,代码如下:

[c-sharp] view plaincopy
 
 
 
  1. Stopwatch sw = new Stopwatch();  
  2.   
  3. SqlConnection sqlConn = new SqlConnection(  
  4.     ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);//连接数据库  
  5.   
  6. SqlCommand sqlComm = new SqlCommand();  
  7. sqlComm.CommandText = string.Format("insert into BulkTestTable(Id,UserName,Pwd)values(@p0,@p1,@p2)");//参数化SQL  
  8. sqlComm.Parameters.Add("@p0", SqlDbType.Int);  
  9. sqlComm.Parameters.Add("@p1", SqlDbType.NVarChar);  
  10. sqlComm.Parameters.Add("@p2", SqlDbType.VarChar);  
  11. sqlComm.CommandType = CommandType.Text;  
  12. sqlComm.Connection = sqlConn;  
  13. sqlConn.Open();  
  14. try  
  15. {  
  16.     //循环插入100万条数据,每次插入10万条,插入10次。  
  17.     for (int multiply = 0; multiply < 10; multiply++)  
  18.     {  
  19.         for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)  
  20.         {  
  21.   
  22.             sqlComm.Parameters["@p0"].Value = count;  
  23.             sqlComm.Parameters["@p1"].Value = string.Format("User-{0}", count * multiply);  
  24.             sqlComm.Parameters["@p2"].Value = string.Format("Pwd-{0}", count * multiply);  
  25.             sw.Start();  
  26.             sqlComm.ExecuteNonQuery();  
  27.             sw.Stop();  
  28.         }  
  29.         //每插入10万条数据后,显示此次插入所用时间  
  30.         Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));  
  31.     }  
  32. }  
  33. catch (Exception ex)  
  34. {  
  35.     throw ex;  
  36. }  
  37. finally  
  38. {  
  39.     sqlConn.Close();  
  40. }  
  41.   
  42. Console.ReadLine();  

耗时图如下:

使用Insert语句插入10万数据的耗时图

由于运行过慢,才插入10万条就耗时72390 milliseconds,所以我就手动强行停止了。

下面看一下使用Bulk插入的情况:

bulk方法主要思想是通过在客户端把数据都缓存在Table中,然后利用SqlBulkCopy一次性把Table中的数据插入到数据库

代码如下:

[c-sharp] view plaincopy
 
 
 
  1. public static void BulkToDB(DataTable dt)  
  2. {  
  3.     SqlConnection sqlConn = new SqlConnection(  
  4.         ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);  
  5.     SqlBulkCopy bulkCopy = new SqlBulkCopy(sqlConn);  
  6.     bulkCopy.DestinationTableName = "BulkTestTable";  
  7.     bulkCopy.BatchSize = dt.Rows.Count;  
  8.   
  9.     try  
  10.     {  
  11.         sqlConn.Open();  
  12.     if (dt != null && dt.Rows.Count != 0)  
  13.         bulkCopy.WriteToServer(dt);  
  14.     }  
  15.     catch (Exception ex)  
  16.     {  
  17.         throw ex;  
  18.     }  
  19.     finally  
  20.     {  
  21.         sqlConn.Close();  
  22.         if (bulkCopy != null)  
  23.             bulkCopy.Close();  
  24.     }  
  25. }  
  26.   
  27. public static DataTable GetTableSchema()  
  28. {  
  29.     DataTable dt = new DataTable();  
  30.     dt.Columns.AddRange(new DataColumn[]{  
  31.         new DataColumn("Id",typeof(int)),  
  32.         new DataColumn("UserName",typeof(string)),  
  33.     new DataColumn("Pwd",typeof(string))});  
  34.   
  35.     return dt;  
  36. }  
  37.   
  38. static void Main(string[] args)  
  39. {  
  40.     Stopwatch sw = new Stopwatch();  
  41.     for (int multiply = 0; multiply < 10; multiply++)  
  42.     {  
  43.         DataTable dt = Bulk.GetTableSchema();  
  44.         for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)  
  45.         {  
  46.             DataRow r = dt.NewRow();  
  47.             r[0] = count;  
  48.             r[1] = string.Format("User-{0}", count * multiply);  
  49.             r[2] = string.Format("Pwd-{0}", count * multiply);  
  50.             dt.Rows.Add(r);  
  51.         }  
  52.         sw.Start();  
  53.         Bulk.BulkToDB(dt);  
  54.         sw.Stop();  
  55.         Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));  
  56.     }  
  57.   
  58.     Console.ReadLine();  
  59. }  

耗时图如下:

使用Bulk插入100万数据的耗时图

可见,使用Bulk后,效率和性能明显上升。使用Insert插入10万数据耗时72390,而现在使用Bulk插入100万数据才耗时17583。

最后再看看使用表值参数的效率,会另你大为惊讶的。

表值参数是SQL Server 2008新特性,简称TVPs。对于表值参数不熟悉的朋友,可以参考最新的book online,我也会另外写一篇关于表值参数的博客,不过此次不对表值参数的概念做过多的介绍。言归正传,看代码:

[c-sharp] view plaincopy
 
 
 
  1. public static void TableValuedToDB(DataTable dt)  
  2. {  
  3.     SqlConnection sqlConn = new SqlConnection(  
  4.       ConfigurationManager.ConnectionStrings["ConnStr"].ConnectionString);  
  5.     const string TSqlStatement =  
  6.      "insert into BulkTestTable (Id,UserName,Pwd)" +  
  7.      " SELECT nc.Id, nc.UserName,nc.Pwd" +  
  8.      " FROM @NewBulkTestTvp AS nc";  
  9.     SqlCommand cmd = new SqlCommand(TSqlStatement, sqlConn);  
  10.     SqlParameter catParam = cmd.Parameters.AddWithValue("@NewBulkTestTvp", dt);  
  11.     catParam.SqlDbType = SqlDbType.Structured;  
  12.     //表值参数的名字叫BulkUdt,在上面的建立测试环境的SQL中有。  
  13.     catParam.TypeName = "dbo.BulkUdt";  
  14.     try  
  15.     {  
  16.       sqlConn.Open();  
  17.       if (dt != null && dt.Rows.Count != 0)  
  18.       {  
  19.           cmd.ExecuteNonQuery();  
  20.       }  
  21.     }  
  22.     catch (Exception ex)  
  23.     {  
  24.       throw ex;  
  25.     }  
  26.     finally  
  27.     {  
  28.       sqlConn.Close();  
  29.     }  
  30. }  
  31.   
  32. public static DataTable GetTableSchema()  
  33. {  
  34.     DataTable dt = new DataTable();  
  35.     dt.Columns.AddRange(new DataColumn[]{  
  36.       new DataColumn("Id",typeof(int)),  
  37.       new DataColumn("UserName",typeof(string)),  
  38.       new DataColumn("Pwd",typeof(string))});  
  39.   
  40.     return dt;  
  41. }  
  42.   
  43. static void Main(string[] args)  
  44. {  
  45.     Stopwatch sw = new Stopwatch();  
  46.     for (int multiply = 0; multiply < 10; multiply++)  
  47.     {  
  48.         DataTable dt = TableValued.GetTableSchema();  
  49.         for (int count = multiply * 100000; count < (multiply + 1) * 100000; count++)  
  50.         {          
  51.             DataRow r = dt.NewRow();  
  52.             r[0] = count;  
  53.             r[1] = string.Format("User-{0}", count * multiply);  
  54.             r[2] = string.Format("Pwd-{0}", count * multiply);  
  55.             dt.Rows.Add(r);  
  56.         }  
  57.         sw.Start();  
  58.         TableValued.TableValuedToDB(dt);  
  59.         sw.Stop();  
  60.         Console.WriteLine(string.Format("Elapsed Time is {0} Milliseconds", sw.ElapsedMilliseconds));  
  61.     }  
  62.   
  63.     Console.ReadLine();  
  64. }  

耗时图如下:

使用表值参数插入100万数据的耗时图

比Bulk还快5秒。

SQLBulkCopy,用于数据库之间大批量的数据传递。通常用于新,旧数据库之间数据的更新。即使表结构完全不同,也可以通过字段间的对应关系,顺利的将数据导过来。

首先,SQLBulkCopy需要2个连接。分别连接到不同的旧表所在的数据库,新表所在的数据库。

其次,我们要从旧数据库中,把导出的字段读取出来。用什么读呢?可以用Datatable,也可以用SqlDataReader。因为SqlDataReader不占用内存,对大批量的数据复制,不需要事先导入到系统。所以就用SqlDataReader了。

读出后,设定对应关系,设定目标表名,写入。就这么简单。速度非常快!

初始化Connection对象

SqlConnection ConnectionNew=new SqlConnection("连接信息");

SqlConnection ConnectionOld=new SqlConnection("连接信息");

            try
            {

//1.在旧表中,用SqlDataReader读取出信息
                SqlCommand cmd = new SqlCommand(SQL, ConnectionOld);
                sdr = cmd.ExecuteReader();

               //2.初始化SqlBulkCopy对象,用新的连接作为参数。

                SqlBulkCopy bulkCopy = new SqlBulkCopy(ConnectionNew);

              //3.写对应关系。如旧表的People列的数据,对应新表Human列,那么就写bulkCopy.ColumnMappings.Add("People","Human")

              //如果两张表的结构一样,那么对应关系就不用写了。

             //我是用哈希表存储对应关系的,哈希表作为参数到传入方法中,key的值用来存储旧表的字段名,VALUE的值用来存储新表的值
                foreach (string str in HTDuiYing.Keys)
                {
                    bulkCopy.ColumnMappings.Add(str, HTDuiYing[str].ToString());
                }

               //4.设置目标表名
                bulkCopy.DestinationTableName = TableNmae;

                //额外,可不写:设置一次性处理的行数。这个行数处理完后,会激发SqlRowsCopied()方法。默认为1
                bulkCopy.NotifyAfter = 10;

               //额外,可不写:设置激发的SqlRowsCopied()方法,这里为bulkCopy_SqlRowsCopied
                bulkCopy.SqlRowsCopied += new SqlRowsCopiedEventHandler(bulkCopy_SqlRowsCopied);

               //OK,开始传数据!
                bulkCopy.WriteToServer(sdr);
            }

          //激发的方法写在外头

         private void bulkCopy_SqlRowsCopied(object sender, SqlRowsCopiedEventArgs e)
        {
            执行的内容。
            这里有2个元素值得拿来用
            e.RowsCopied,返回数值类型,表示当前已经复制的行数
            e.Abort,用于赋值true or false,用于停止赋值的操作 

        }

 由于不同批次在不同事务中执行,因此,如果在批量复制操作期间发生错误,则当前批次中的所有行都将被回滚,但以前批次中的行将保留在数据库中。

比如:批量复制100条数据到数据库汇总,batchsize设置为10.则没10条数据复制作为一个事务,整个100条数据的复制操作被分割为10个独立的事务。如果复制到第56条数据时(第6个事务),出错了。则前50条数据提交到数据库中,只回滚出错的事务。

  如果由于发生错误而需要回滚整个批量复制操作,或者批量复制应作为更大的可回滚进程的一部分执行,则可以将 SQLTransaction 对象提供给 SqlBulkCopy 构造函数.

  示例:

 using (SqlConnection destinationConnection = new SqlConnection(connectionString))

            {
                destinationConnection.Open();
                using (SqlTransaction transaction = destinationConnection.BeginTransaction())
                {
                    using (SqlBulkCopy bulkCopy = new SqlBulkCopy( destinationConnection, SqlBulkCopyOptions.Default, transaction))
                    {
                        bulkCopy.BatchSize = 10;
                        bulkCopy.DestinationTableName = "dbo.BulkCopyDemoMatchingColumns";

                        try
                        {
                            bulkCopy.WriteToServer(reader);
                            transaction.Commit();
                        }
                        catch (Exception ex)
                        {
                            Console.WriteLine(ex.Message);
                            transaction.Rollback();
                        }
                        finally
                        {
                            reader.Close();
                        }
                    }
                }
            }

原文地址:https://www.cnblogs.com/zyy1688/p/10002661.html