批量数据入库

下面主要介绍数据库批量操作数据(主要是 Insert)的方法,涉及 SQL Server、DB2、MySQL 等。

SQL Server

首先,准备工作,新建一个数据库实例

create database Stu_Sqh

在数据库实例中新建一张数据表:学生信息表

1 CREATE TABLE [dbo].[StudentInfo](
2     [NAME] [varchar](20) NOT NULL,
3     [sex] [varchar](5) NULL,
4     [BIRTH] [date] NOT NULL,
5     [AGE]  AS (datediff(year,[BIRTH],getdate())),
6     [ID] [int] IDENTITY(101,1) NOT NULL,
7     [SchoolID] [varchar](5) NULL
8 )
StudentInfo

为联表查询做准备,另新建一张数据表:学校信息表

1 CREATE TABLE [dbo].[SchoolInfo](
2     [ID] [varchar](5) NOT NULL,
3     [NAME] [varchar](10) NOT NULL,
4     [City] [varchar](10) NOT NULL
5 )
SchoolInfo

同时分别为两张表添加主外键约束

alter table [SchoolInfo]
 add constraint PK_SchoolInfo primary key(ID)

alter table [StudentInfo]
 add constraint PK_StudentInfo primary key(ID), 
     constraint FK_StudentInfo_SchoolInfo foreign key(SchoolID) references SchoolInfo(ID)

定义一个学习信息类:Schoolnfo

1 /// <summary>
2     /// 学校信息
3     /// </summary>
4     public class SchoolInfo
5     {
6         public string ID { get; set; }
7         public string Name { get; set; }
8         public string City { get; set; }
9     }
SchoolInfo

注意,务必带着 { get; set; },用于后面通过反射获取参数的属性相关信息。

在进行插入之前,先封装一个类,实现数据库插入操作

 1 public class DataOperation
 2 {
 3     // 数据库连接串
 4     private const string DBConnectionString = Const.DBConnectionString;
 5     // 数据库连接
 6     private static SqlConnection db_Connection = null;
 7     // 日志记录组件
 8     public static ILog Logger = null;
 9 
10     /// <summary>
11     /// 创建数据库连接
12     /// </summary>
13     public static void CreatDBConnection()
14     {
15         if (db_Connection != null)
16         {
17             Logger.Warn("DB connection already exits");
18             return;
19         }
20         else
21         {
22             db_Connection = new SqlConnection(DBConnectionString);
23             db_Connection.Open();
24             Logger.Warn("DB connection open success");
25         }
26     }
27 
28     /// <summary>
29     /// 释放数据库连接
30     /// </summary>
31     public static void DestroyDBConnection()
32     {
33         if (db_Connection == null)
34         {
35             Logger.Warn("DB connection already close");
36             return;
37         }
38         else
39         {
40             if (db_Connection.State != ConnectionState.Closed)
41             {
42                 db_Connection.Close();
43             }
44             db_Connection.Dispose();
45             Logger.Warn("DB connection close success");
46         }
47     }
48 
49     /// <summary>
50     /// 普通insert方法
51     /// </summary>
52     public static bool Insert(string sql)
53     {
54         try
55         {
56             CreatDBConnection();
57             SqlCommand command = new SqlCommand(sql, db_Connection);
58             command.ExecuteNonQuery();
59         }
60         catch(Exception ex)
61         {
62             string msg = string.Format("Insert() 执行异常: {0} {1}", ex.Message, ex.StackTrace);
63             Logger.Error(msg);
64             return false;
65         }
66 
67         return true;
68     }    
69 }
DataOperation - Insert

其中,数据库连接串设置如下

/// <summary>
/// 数据库连接串:本地
/// </summary>
public const string DBConnectionString = "server=.;database=Stu_Sqh;Trusted_Connection=True";

自然而然,最常用的是普通的 insert 方法,可以插入一条或多条数据

1 // 单条插入
2 insert into SchoolInfo values('10286', 'seu', 'NanJing')
3 // 多条插入
4 insert into SchoolInfo
5 values('108', 'pku', 'BeiJing'),
6       ('109', 'hdu', 'HangZhou'),
7       ... ...
8       ('110', 'dltg', 'DaLian')
Insert - 数据库SQL实现

或者直接通过代码实现

string sqlInsert = string.Format(@"insert into SchoolInfo values('xxx','xxx', 'xxx')");
bool insertFlag = DataOperation.Insert(sqlInsert);

或者通过在代码中以 拼接SQL 的形式的组装要执行的SQL语句

 1 StringBuilder sb = new StringBuilder();
 2 sb.Append("INSERT INTO SchoolInfo(Id,Name,City) VALUES");
 3 using( SqlCommand command = new SqlCommand() )
 4 {
 5     // 拼接SQL
 6     for(int i=0; i<list.count; ++i) {
 7         sb.AppendFormat("('{0}','seu{1}','{2}'),", 
 8              i.ToString(), i.ToString(), NJing);
 9     }
10     
11     CreatDBConnection();
12     command.Connection = db_Connection;
13     command.CommandText = sb.ToString().TrimEnd(',');
14     command.ExecuteNonQuery();
15 }
拼接SQL

但是,该方法在 C# 中有限制,一次性只能批量插入1000条,大数据量只能分批次插入。

综上所述,在大数据量的情况下,普通的 insert 方法插入批量数据执行太慢。

参见.NET批量大数据插入性能分析及比较C#批量插入数据到Sqlserver中的几种方式

下面提供几种方法,提高批量数据插入效率:

1iBatista/MyBatis

// 待更新 ...

参考MyBatis 批量插入数据

2SqlBulkCopy

从 .Net 2.0 提供 SqlBulkCopy 对象,支持一次性批量插入数据,效率高。

首先,封装一个获取 DataTable 对象的方法,采用泛型参数

 1 public class DBHelper
 2 {
 3     public static DataTable GetDataTable<T>(List<T> list) 
 4     {
 5         DataTable dt = new DataTable();
 6 
 7         // 反射获取泛型参数类型、泛型参数的属性名称和属性类型
 8         Type type = typeof(T);
 9         PropertyInfo[] propInfos = type.GetProperties();
10         foreach (var pi in propInfos) {
11             dt.Columns.Add(pi.Name, 
12                 System.Type.GetType(pi.PropertyType.FullName));
13         }
14 
15         // 组装 DataTable
16         foreach (T t in list)
17         {
18             DataRow dr = dt.NewRow();
19             foreach (var pi in propInfos)
20             {
21                 string key = pi.Name;
22                 object value = pi.GetValue(t, null);
23                 dr[key] = value;
24             }
25             dt.Rows.Add(dr);
26         }
27 
28         return dt;
29     }
30 }
GetDataTable

注意,通过反射 Type.GetProperties() 方法,要保证参数 T 中的属性定义包括 { get; set; } 。

然后在 DataOperation 中新增一个 BulkInsert() 方法,利用 SqlBulkCopy 插入批量数据

 1 /// <summary>
 2 /// SqlBulkCopy Insert 方法
 3 /// </summary>
 4 public static bool BulkInsert(DataTable dt)
 5 {
 6     SqlCommand command = null;
 7     SqlTransaction sqlBulkTrans = null;
 8     SqlBulkCopy sqlBulkCopy = null;
 9     try
10     {
11         CreatDBConnection();
12 
13         command = db_Connection.CreateCommand();
14         sqlBulkTrans = db_Connection.BeginTransaction();
15         command.Transaction = sqlBulkTrans;
16 
17         // 插入数据的同时检查约束,如果发生错误则调用sqlbulkTransaction事务
18         sqlBulkCopy = new SqlBulkCopy(db_Connection,
19                             SqlBulkCopyOptions.CheckConstraints, sqlBulkTrans);
20         sqlBulkCopy.DestinationTableName = dt.TableName;
21         foreach (DataColumn dc in dt.Columns) {
22             sqlBulkCopy.ColumnMappings.Add(dc.ColumnName, dc.ColumnName);
23         }
24 
25         sqlBulkCopy.WriteToServer(dt);
26         sqlBulkTrans.Commit();
27     }
28     catch (Exception ex)
29     {
30         string msg = string.Format("BulkInsert() 执行异常: {0} {1}", ex.Message, ex.StackTrace);
31         Logger.Error(msg);
32         if (sqlBulkTrans != null) {
33             sqlBulkTrans.Rollback();
34         }
35 
36         return false;
37     }
38     finally {
39         sqlBulkCopy.Close();
40     }
41     return true;
42 }
BulkInsert

直接按照如下代码调用即可

List<SchoolInfo> list = new List<SchoolInfo>();
list.Add(xxx);  // 组装 list

DataTable dt = DBHelper.GetDataTable<SchoolInfo>(list);
dt.TableName = "SchoolInfo";
bool bulkInsertFlag = DataOperation.BulkInsert(dt);

在使用用遇到以下问题

给定的 ColumnMapping 与源或目标中的任意列均不匹配

原因及解决方法:表的列名是区分大小写的。务必保证数据库表的列名和 DataTable 的列名完全一致,最好能保证顺序也一致。

此外, SqlBulkCopyOptions.KeepIdentity 这个参数允许自增型主键导入数据。

局限性

  • 不支持将多条数据传给存储过程
  • 数据库表列名务必与 dt 的列名(大小写敏感)、顺序高度一致

3表值参数

如(1)所述,SqlBulkCopy 对象能够将多条数据一次性传送给SQL Server,但是多条数据仍然无法一次性传给存储过程。

SQL Server 2008 增加的新特性:表值参数(Table-Valued Parameter,TVP),可以将 DataTable 作为参数传递给函数或存储过程。

不必在存储过程中创建临时表或许多参数,即可向存储过程发送多行数据,节省自定义SQL语句。

表值参数通过自定义的表类型来声明,首先,需要自定义一个表类型

IF NOT EXISTS (SELECT * FROM sys.types AS t WHERE t.name='LocalSchoolTVP')  
	BEGIN  
		CREATE TYPE dbo.LocalSchoolTVP AS TABLE(   
			ID varchar(5)  primary key NOT NULL,
			Name varchar(10) NOT NULL,
			City varchar(10) NOT NULL  
		)  
	END;  
GO  

在数据库路径 DbSqh --> 可编程性 --> 类型 --> 用户定义表类型 中可以查看新增的表类型 LocalSchoolTVP。

然后定义一个存储过程,接受一个 LocalSchoolTVP 的表类型参数

 1 USE [DbSqh]
 2 GO
 3 SET ANSI_NULLS ON
 4 GO
 5 SET QUOTED_IDENTIFIER ON
 6 GO
 7 
 8 /*
 9    此处用户自定义表类型  
10 */
11 
12 if exists(select 1 from sysobjects where id=object_id('AddSchoolInfoBatchByTVP') and xtype='P')
13     drop proc AddSchoolInfoBatchByTVP
14 GO
15 
16 create proc AddSchoolInfoBatchByTVP(
17     @TVP LocalSchoolTVP READONLY
18 )
19 as  
20 begin
21     insert into SchoolInfo
22     select ID, Name, City from @TVP
23 end;
24 
25 RETURN 0;
Proc AddSchoolInfoBatchByTVP

注意,存储过程中表值参数声明为 readonly,只读属性,不能对表值参数执行 UPDATE、DELETE 或 INSERT等 DML 操作。

在 SQL Server 中创建表值参数和存储过程的相关信息参考:http://www.cnblogs.com/isfish/p/4337488.html

首先在 SQL Server 中对创建的表类型和存储过程进行简单的测试

 1 declare @TVP as LocalSchoolTVP
 2 
 3 insert into @TVP
 4 values
 5 ('z01','tinghua','BeiJing'),
 6 ('z02','bjlg','BeiJing'),
 7 ('z03','seu','NanJing'),
 8 ('z04','nju','NanJing'),
 9 ('z05','whut','WuHan');
10 
11 EXEC AddSchoolInfoBatchByTVP @TVP
12 GO
TestTvpToProc

在调用之前,首先封装一个方法执行存储过程

 1 /// <summary>
 2 /// TVP表值参数  Insert 方法(方法体其实同 SqlXmlInsertByProc 完全一样)
 3 /// </summary>
 4 public static bool TVPInsertByProc(string ProcName, SqlParameter[] sqlParameters)
 5 {
 6     SqlTransaction sqlTrans = null;
 7     try
 8     {
 9         CreatDBConnection();
10         sqlTrans = db_Connection.BeginTransaction();
11         using (SqlCommand command = new SqlCommand())
12         {
13             command.Connection = db_Connection;
14             command.Transaction = sqlTrans;
15             command.CommandType = CommandType.StoredProcedure;
16             command.CommandText = ProcName;
17             command.Parameters.AddRange(sqlParameters);
18 
19             command.ExecuteNonQuery();
20             sqlTrans.Commit();
21         }
22     }
23     catch (Exception ex)
24     {
25         string msg = string.Format("TVPInsertByProc() 执行异常: {0} {1}", ex.Message, ex.StackTrace);
26         Logger.Error(msg);
27         if (sqlTrans != null)
28         {
29             sqlTrans.Rollback();
30         }
31         return false;
32     }
33     return true;
34 }
TVPInsertByProc

下面在代码中,调用存储过程执行数据批量入库

List<SchoolInfo> list = new List<SchoolInfo>();
list.Add(xxx);  // 组装 list

DataTable dt = DBHelper.GetDataTable<SchoolInfo>(list);
//dt.TableName = "SchoolInfo"; // 用不到
SqlParameter[] sqlParams = new SqlParameter[1];
sqlParams[0] = new SqlParameter("@TVP", SqlDbType.Structured);
sqlParams[0].Value = dt;
sqlParams[0].TypeName = "LocalSchoolTVP";
string procName = "AddSchoolInfoBatchByTVP";
DataOperation.SqlXmlInsertByProc(procName, sqlParams);

其中,参数 sqlParams[0] 可以简写为如下形式

sqlParams[0] = new SqlParameter("@TVP", SqlDbType.Structured) { 
                Value = dt, TypeName = "LocalSchoolTVP"
            };

局限性

  • 表值参数仅仅在插入数目少于 1000 行时具有很好的执行性能
  • SQL Server 不维护表值参数列的统计信息
  • 表值参数需要预先定义与数据表相对应的表类型,略显麻烦

参考Using SQL Server’s Table Valued Parameters

4SQL XML

将 XML 字符串形式的数据,批量插入到数据库。通过将 XML 字符串作为参数,传给存储过程,存储过程解析 XML 字符串

先来了解下从 SQL Server 2005 新增的 SqlDbType.xml 数据库类型,最大可支持 2GB 大小,新建一张数据表

1 CREATE TABLE [dbo].[CityInfo](
2     ID varchar(5) NOT NULL,
3     CityXml xml NOT NULL
4 )
5 
6 alter table [CityInfo]
7  add constraint PK_CityInfo primary key(ID)
CityInfo with SqlDbType.xml

其中,可以用 varchar(max) 或 nvarchar(max) 代替 xml 类型,效果是一样的。

采用普通的数据库SQL语句,直接插入一条信息

insert into CityInfo
values('1','<City><ID>1</ID><Name>北京</Name></City>')

或者采用代码的形式实现,首先封装一个支持插入 xml 数据库类型的插入方法

 1 /// <summary>
 2 /// 普通insert方法, 数据表包含SqlDbType.Xml数据库类型
 3 /// </summary>
 4 public static bool InsertWithXml(string sql, SqlParameter[] sqlParameters)
 5 {
 6     try 
 7     {
 8         CreatDBConnection();
 9         using (SqlCommand command = new SqlCommand(sql, db_Connection))
10         {
11             command.Parameters.AddRange(sqlParameters);
12             command.ExecuteNonQuery();          
13         }
14     }
15     catch (Exception ex) 
16     {
17         string msg = string.Format("InsertWithXml() 执行异常: {0} {1}", ex.Message, ex.StackTrace);
18         Logger.Error(msg);
19         return false;
20     }
21     return true;
22 }
InsertWithXml

调用该方法插入一条数据(目前只允许一条)

string sql = "insert into CityInfo(ID,CityXml) VALUES(@id,@xml)";
StringBuilder sb = new StringBuilder();
sb.Append("<City><ID>9</ID><Name>广州</Name></City>");

XmlTextReader xtr = new XmlTextReader(sb.ToString(), XmlNodeType.Document, null);
SqlXml sqlXml = new SqlXml(xtr);
SqlParameter[] sqlParams = new SqlParameter[2];
sqlParams[0] = new SqlParameter("@id", id.ToString());
sqlParams[1] = new SqlParameter("@xml", SqlDbType.Xml, sqlXml.Value.Length);
sqlParams[1].Value = sqlXml;
bool insertWithXmlFlag = DataOperation.InsertWithXml(sql, sqlParams);

再或者,直接拼接 insert into TableName values(),() ... () 语句,调用普通的 DataOperation.Insert 方法亦可。

相关信息可参见:在C#中使用SqlDbType.Xml类型参数使用 SQL XML 数据类型 - Java(推荐)

下面开始研究传 XML字符串 存储过程,实现批量数据入库

首先了解 SQLServer 数据库对 XML 的各种操作

  • OpenXml
  • SQL For XML

具体的相关信息可以参见另一篇博客:SQL Server For XML - sqh

封装一个方法,调用存储过程

 1 /// <summary>
 2 /// SQL XML Insert 方法
 3 /// </summary>
 4 public static bool SqlXmlInsertByProc(string ProcName, SqlParameter[] sqlParameters)
 5 {   
 6     SqlTransaction sqlTrans = null;
 7     try
 8     {
 9         CreatDBConnection();
10         sqlTrans = db_Connection.BeginTransaction();
11         using (SqlCommand command = new SqlCommand())
12         {                   
13             command.Connection = db_Connection;
14             command.Transaction = sqlTrans;
15             command.CommandType = CommandType.StoredProcedure;
16             command.CommandText = ProcName;
17             command.Parameters.AddRange(sqlParameters);
18 
19             command.ExecuteNonQuery();
20             sqlTrans.Commit();
21         }
22     }
23     catch(Exception ex)
24     {
25         string msg = string.Format("SqlXmlInsertByProc() 执行异常: {0} {1}", ex.Message, ex.StackTrace);
26         Logger.Error(msg);
27         if (sqlTrans != null) {
28             sqlTrans.Rollback();
29         }
30         return false;
31     }
32     return true;
33 }
SqlXmlInsertByProc

相应的存储过程,接受一个 xml 数据库类型的参数

 1 USE [DbSqh]
 2 GO
 3 SET ANSI_NULLS ON
 4 GO
 5 SET QUOTED_IDENTIFIER ON
 6 GO
 7 
 8 if exists(select 1 from sysobjects where id=object_id('AddSchoolInfoBatch') and xtype='P')
 9     drop proc AddSchoolInfoBatch
10 GO
11 
12 create proc AddSchoolInfoBatch(
13     @SchoolXml xml
14 )
15 as 
16  
17 begin
18     declare @ErrMsg nvarchar(100);
19     declare @XmlHandle int;  
20     EXEC sp_xml_preparedocument @XmlHandle OUTPUT, @SchoolXml 
21 
22     insert into SchoolInfo(ID, Name, City)
23     select ID, Name, City
24     from OPENXML(@XmlHandle, '/Schools/School', 2)
25     with(
26         ID varchar(5),
27         Name varchar(10),
28         City varchar(10)
29     );
30 
31     EXEC sp_xml_removedocument @XmlHandle
32 end;
33 
34 RETURN 0;
PROC AddSchoolInfoBatch

其中,sp_xml_preparedocumentsp_xml_removedocument 用于获取和删除一个指向内存中XML文档的句柄。

关于在存储过程中解析 XML 字符串,参见:http://www.cnblogs.com/JuneZhang/p/5142772.htmlhttp://www.xuebuyuan.com/1210868.html

首先在 SQL Server 中对该存储过程的正确性作简单测试

 1 DECLARE @idoc int
 2 DECLARE @doc xml
 3 
 4 SET @doc ='<Schools>
 5 <School><ID>121</ID><Name>nju</Name><City>NJ</City></School>
 6 </Schools>'
 7 
 8 EXEC sp_xml_preparedocument @idoc OUTPUT, @doc
 9 
10 
11 SELECT *
12 FROM      
13 OPENXML (@idoc, '/Schools/School',2)
14 WITH (        
15     ID varchar(5),
16     Name varchar(10),
17     City varchar(10)
18 )
19 
20 EXEC sp_xml_removedocument @idoc
TestXmlToProc

下面给出在代码中的调用方法

List<SchoolInfo> list = new List<SchoolInfo>();
list.Add(xxx);  // 组装 list

// xml字符串拼接, 以结点形式, 不包括属性
StringBuilder sb = new StringBuilder();
sb.Append("<Schools>");
foreach (var si in list) {
	sb.AppendFormat("<School><ID>{0}</ID><Name>{1}</Name><City>{2}</City></School>", 
	//sb.AppendFormat("<School ID="{0}" Name="{1}" City="{2}"></School>",
		si.ID, si.Name, si.City);
}
sb.Append("</Schools>");

XmlTextReader xtr = new XmlTextReader(sb.ToString(), XmlNodeType.Document, null);
SqlXml sqlXml = new SqlXml(xtr);
SqlParameter[] sqlParams = new SqlParameter[1];
sqlParams[0] = new SqlParameter("@SchoolXml", SqlDbType.Xml, sqlXml.Value.Length);
sqlParams[0].Value = sqlXml;
string procName = "AddSchoolInfoBatch";
DataOperation.SqlXmlInsertByProc(procName, sqlParams);

上述代码中,入参类型是 SqlDbType.Xml 与存储过程入参类型完全匹配,其实此处设置为 SqlDbType.Varchar,直接传参 sb.ToString() 也可以。

务必注意,传递的XML字符串不要包含 <?xml version="1.0"?> ,否则报错。再一个问题,XML 字符串长度最好不要超过 8000 个字符。

参考Sql Server参数化查询之where in和like实现之xml和DataTable传参(TVP + SQL XML)

DB2

// 待更新 ...

MySQL

// 待更新 ...

原文地址:https://www.cnblogs.com/wjcx-sqh/p/7092116.html