C#使用Parallel处理数据同步写入Datatable并使用BulkInsert批量导入数据库

谢大哥  https://www.cnblogs.com/wdw984/p/10968808.html

项目需要,几十万张照片需要计算出每个照片的特征值(调用C++编写的DLL)。

业务流程:选择照片文件夹,分别访问照片-->调用DLL接口传递照片路径-->接收处理返回值-->写入数据库。

前期使用的for循环来处理,几十万张照片处理起来差不多10个小时。速度太慢,后面改进使用Parallel来进行平行计算(调用DLL处理照片),统一写入Datatable,然后使用BulkInsert批量把Datatable写入数据库,目前测试8万张照片并行计算速度30分钟,速度提高约30%-40%左右。

代码示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
private static SqlConnection sqlconn;
private static ConcurrentDictionary<stringint> currInts = new ConcurrentDictionary<stringint>();
private void Button1_Click(object sender, EventArgs e)
        {          
            var dirPath = "";
            using (var folderBrowser = new FolderBrowserDialog())
            {
                if (folderBrowser.ShowDialog() != DialogResult.OK) return;
                dirPath = folderBrowser.SelectedPath;
                if (!Directory.Exists(dirPath))
                {
                    MessageBox.Show(@"所选路径不存在或无权访问"@"错误", MessageBoxButtons.OK, MessageBoxIcon.Error);
                    return;
                }
            }
  
            BeginInvoke(new Action(async () =>
            {
                button1.Enabled = false;
                var sw = new Stopwatch();
                sw.Start();
  
                //检测服务器链接
                Log.WriteLine(@"尝试连接数据库服务器");
  
                sqlconn = new SqlConnection(
                    $"Data Source={txt_serverIP.Text},{txt_ServerPort.Text};User ID={txt_User.Text};Password={txt_Pwd.Text};Initial Catalog={txt_DB.Text};Persist Security Info=False;Pooling=true;Min Pool Size=30;Max Pool Size=200;");
                if (sqlconn.State == ConnectionState.Closed)
                {
                    try
                    {
                        sqlconn.Open();
                    }
                    catch (Exception exception)
                    {
                        Log.WriteLine($@"连接数据库服务器【失败】-->{exception.Message}");
                        button1.Enabled = true;
                        return;
                    }
                }
  
                Log.WriteLine($@"连接数据库服务器【成功】{Environment.NewLine}获取未转换图片数据。。。");
                var ds = new DataSet();
                int.TryParse(txt_start.Text, out var start);
                int.TryParse(txt_end.Text, out var end);
                var sqlstrALL = "";
                if (start == 0 || end == 0)
                {
                    sqlstrALL = "SELECT * FROM ViewWeiZhuanHuan";
                }
                else
                {
                    sqlstrALL = $"SELECT * FROM ViewWeiZhuanHuan WHERE {txt_mastKey.Text} BETWEEN {start} AND {end}";
                }
  
                var sqlcmd = new SqlCommand(sqlstrALL, sqlconn);
                DataAdapter da = new SqlDataAdapter(sqlcmd);
                da.Fill(ds);
                if (ds.Tables.Count == 0 || ds.Tables[0].Rows.Count == 0)
                {
                    Log.WriteLine("所有图片都已经转换完毕。");
                    sqlconn.Close();
                    return;
                }
  
                Log.WriteLine($"{ds.Tables[0].Rows.Count}个图片需要转换。");
  
                var total = ds.Tables[0].Rows.Count;
                var rowkey = comboBox1.SelectedValue.ToString();
                var splitkey = txt_split.Text.Trim();
  
                #region 定义数据保存
                var dt = new DataTable();
                dt.Columns.Add("zd1"typeof(int));
                dt.Columns.Add("zd2"typeof(int));
                dt.Columns.Add("zd3"typeof(string));
                dt.Columns.Add("zd4"typeof(string));
                dt.Columns.Add("zd5"typeof(string));
                dt.Columns.Add("zd6"typeof(string));
                #endregion
  
                #region 并行执行
                currInts.TryAdd("currInts", 1);//初始化进度数字为1
                await Task.Run(() =>
               {
                  //使用8个CPU核心来运行
                   var result = Parallel.For(0, ds.Tables[0].Rows.Count, new ParallelOptions { MaxDegreeOfParallelism = 8}, (rotIndex, state) =>
                   {
                       BeginInvoke(new Action(() =>
                       {
                           currInts.TryGetValue("currInts"out var currValue);
                           lb_process.Text = $@"{currValue}/{total}";//显示进度
                           var nextValue = currValue + 1;
                           currInts.TryUpdate("currInts", nextValue, currValue);//加1
                       }));                     
                       
                       var fileDirPath = "";//根据选择的文件名格式,用填写的规则生成路径                     
  
                       var file = new List<string>{
                            $"{dirPath}\{fileDirPath}\{ksno}_fp1.jpg",
                            $"{dirPath}\{fileDirPath}\{ksno}_fp2.jpg",
                            $"{dirPath}\{fileDirPath}\{ksno}_fp3.jpg"};
  
                       foreach (var zwzp in file)
                       {
                           try
                           {
                               var model = ZwHelper.zwzhAsync($"{zwzp}").Result;//调用C++转换
                               if (model != null)
                               {
//并行计算下写入Datatable需要锁定才可以,否则会提示datatable索引损坏                           
                                   lock (dt.Rows.SyncRoot)
                                   {
                                       var dr = dt.NewRow();
                                       dr["zd1"] = Convert.ToInt32(filexe);
                                       dr["zd2"] = Convert.ToInt32(ds.Tables[0].Rows[rotIndex]["zd1"]);
                                       dr["zd3"] = model.zhtz;
                                       dr["zd4"] = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");
                                       dr["zd5"] = "";
                                       dr["zd6"] = "";
                                       dt.Rows.Add(dr);
                                   }
                                   
                               }
                               else
                               {
                                   Log.WriteLine($@"{ksno}转换失败");
                                   Log.Log.Error($"{ksno}转换失败。");
                               }
                           }
                           catch (Exception exception)
                           {
                               Log.Log.Error($"学号{ksno},图片路径{zwzp}转换失败。{exception}");
                           }
                       }
  
                   });
                   sw.Stop();
                   Log.WriteLine($"转换耗时:{sw.ElapsedMilliseconds}毫秒");
                   Log.WriteLine($@"开始写入数据库,数量{dt.Rows.Count}");
  
                   #region 批量写入
  
                   if (dt.Rows.Count ==0)
                   {
                       Log.WriteLine(@"没有要写入的数据。");
                       return;
                   }
                   sw.Restart();
                   var sucess = false;
                   if (SqlHelper.BulkInsert(sqlconn, txt_TableName.Text.Trim(), dt, out var err))
                   {
                       sucess = true;
                   }
                   else
                   {
                       Log.Log.Error($"写入数据库失败==》{err}");
                   }
                   sw.Stop();
                   Log.WriteLine($"写入数据库是否成功=>{sucess},耗时{sw.ElapsedMilliseconds}毫秒");
                   #endregion
               });
                #endregion
               
               
                if (sqlconn.State == ConnectionState.Open)
                {
                    sqlconn.Close();
                }
                button1.Enabled = true;
            }));
        }

  SQL批量写入函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/// <summary>
/// 批量插入
/// </summary>
/// <param name="conn">连接对象</param>
/// <param name="tableName">将泛型集合插入到本地数据库表的表名</param>
/// <param name="dataTable">要批量写入的Datatable</param>
/// <param name="err">错误时返回的信息</param>
public static bool BulkInsert(SqlConnection conn, string tableName, DataTable dataTable, out string err)
{
    err = "";
    if (dataTable == null || dataTable.Rows.Count == 0)
    {
        err = "要写入的数据为空";
        return false;
    }
    var tran = conn.BeginTransaction();//开启事务
    var bulkCopy = new SqlBulkCopy(conn, SqlBulkCopyOptions.KeepNulls, tran);
    try
    {
        if (conn.State == ConnectionState.Closed)
        {
            conn.Open();
        }
        bulkCopy.BatchSize = 1000;
        bulkCopy.DestinationTableName = tableName;
        bulkCopy.WriteToServer(dataTable);
        tran.Commit();
        return true;
    }
    catch (Exception e)
    {
        err = e.ToString();
        tran.Rollback();
        return false;
    }
    finally
    {
        bulkCopy.Close();
        if (conn.State == ConnectionState.Open)
        {
            conn.Close();
        }
    }
}
原文地址:https://www.cnblogs.com/ning-xiaowo/p/13153717.html