不同数据模型之间的同步

最近,在做新旧数据库之间的迁移。碰到了一个不同数据模型的数据库之间的历史数据迁移问题。

1、背景介绍

旧数据库位于一台winserver2012服务器上,新数据库在阿里云RDS上。源数据表的结构与目标数据表的结构不同。

1)旧表:

2)新表:

2、解决思路

在新的解决方案下,新建一个控制台应用服务。使用ADO.NET获取旧数据库上的历史数据,然后讲这些数据处理,转换成新的数据模型,利用现有的ORM实体映射框架,将新数据插入到新的数据库中来。

3、主要代码

  1         public void MainJob()
  2         {
  3             try
  4             {
  5                 using (var connection = new SqlConnection(_connName))
  6                 {
  7                     connection.Open();
  8 
  9 
 10                     //开启小时数据同步
 11                     var _timer = Config.GetConfig("Timer", 5);
 12                     if (HourTimer == null)
 13                         HourTimer = new TimerX(obj =>
 14                     {
 15 #if DEBUG
 16                         var watch = new Stopwatch();
 17                         watch.Start();
 18 #endif
 19                         if (IsWork)
 20                             SyncHourData();
 21 #if DEBUG
 22                         watch.Stop();
 23                         Container.Debug("同步小时数据耗时:" + watch.ElapsedMilliseconds + "ms");
 24 #endif
 25 
 26 
 27                     }, null, 1000, _timer * 1000);
 28 
 29                     //开启自然日数据同步
 30                     if (DayTimer == null)
 31                         DayTimer = new TimerX(obj =>
 32                         {
 33 #if DEBUG
 34                             var watch = new Stopwatch();
 35                             watch.Start();
 36 #endif
 37                             if (IsWork2)
 38                                 SyncDayData();
 39 #if DEBUG
 40                             watch.Stop();
 41                             Container.Debug("同步自然日数据耗时:" + watch.ElapsedMilliseconds + "ms");
 42 #endif
 43 
 44 
 45                         }, null, 1000, _timer * 1000);
 46 
 47 
 48                     //开启工作日数据同步
 49                     if (ShiftTimer == null)
 50                         ShiftTimer = new TimerX(obj =>
 51                         {
 52 #if DEBUG
 53                             var watch = new Stopwatch();
 54                             watch.Start();
 55 #endif
 56                             if (IsWork3)
 57                                 SyncShiftData();
 58 #if DEBUG
 59                             watch.Stop();
 60                             Container.Debug("同步工作日数据耗时:" + watch.ElapsedMilliseconds + "ms");
 61 #endif
 62 
 63 
 64                         }, null, 1000, _timer * 1000);
 65 
 66                     connection.Close();
 67                 }
 68 
 69             }
 70             catch (Exception ex)
 71             {
 72                 Container.Exception(ex);
 73             }
 74 
 75         }
 76 
 77         /// <summary>
 78         /// 同步小时统计数据(皮带机)
 79         /// </summary>
 80         /// <remarks>存储于分时数据表</remarks>
 81         private bool SyncHourData()
 82         {
 83 
 84             try
 85             {
 86                 if (startTime.CompareTo(endTime) > 0)
 87                 {
 88                     IsWork = false;
 89                     HourTimer?.Dispose();
 90                     Container.Debug("同步小时数据结束!");
 91                     return false;
 92                 }
 93 
 94                 //每次取1天数据
 95                 var end = startTime.AddDays(1);
 96                 var selectSql = "select * from " + "ConveyorByHour" + " where Time >= '" + startTime.ToString() + "' and Time < '" + end.ToString() + "' order by [Time]";
 97                 var adapter = new SqlDataAdapter(selectSql, _connName);
 98                 var dataSet = new DataSet();
 99                 adapter.Fill(dataSet, "Table");
100 
101                 if (dataSet != null && dataSet.Tables.Count > 0)
102                 {
103                     if (dataSet.Tables[0].Rows != null && dataSet.Tables[0].Rows.Count > 0)
104                     {
105                         var rows = dataSet.Tables[0].Rows;
106                         foreach (DataRow row in rows)
107                         {
108                             try
109                             {
110                                 var hour = ObjectExtension.ConvertToModel<ConveyorByHour>(row);
111                                 if (hour != null)
112                                 {
113                                     Container.cyhourContainer.InsertHourEntity(hour);//直接加入SQL数据库
114                                     LastSyncTime = hour.Time;//记录上次同步最后一条记录时间
115                                 }
116 
117                             }
118                             catch (Exception ex)
119                             {
120                                 Container.Exception(ex);
121                                 return false;
122                             }
123 
124                         }
125                     }
126 
127                 }
128                 startTime = startTime.AddDays(1);
129 
130                 return true;
131             }
132             catch (Exception ex)
133             {
134                 Container.Exception(ex);
135                 return false;
136             }
137 
138         }
139 
140 
141         /// <summary>
142         /// 同步自然日统计数据(皮带机)
143         /// </summary>
144         private bool SyncDayData()
145         {
146 
147             try
148             {
149                 if (dayStartTime.CompareTo(dayEndTime) > 0)
150                 {
151                     IsWork2 = false;
152                     DayTimer?.Dispose();
153                     Container.Debug("同步自然日数据结束!");
154                     return false;
155                 }
156                 //每次取1个月数据
157                 var end = dayStartTime.AddMonths(1);
158                 var selectSql = "select * from " + "ConveyorByDay" + " where Time >= '" + dayStartTime.ToString() + "' and Time < '" + end.ToString() + "' order by [Time]";
159                 var adapter = new SqlDataAdapter(selectSql, _connName);
160                 var dataSet = new DataSet();
161                 adapter.Fill(dataSet, "Table2");
162 
163                 if (dataSet != null && dataSet.Tables.Count > 0)
164                 {
165                     if (dataSet.Tables[0].Rows != null && dataSet.Tables[0].Rows.Count > 0)
166                     {
167                         var rows = dataSet.Tables[0].Rows;
168                         foreach (DataRow row in rows)
169                         {
170                             try
171                             {
172                                 var day = ObjectExtension.ConvertToModel<ConveyorByDay>(row);
173                                 if (day != null)
174                                 {
175                                     Container.cydayContainer.InsertDayEntity(day);//直接加入SQL数据库                                                                                  
176                                 }
177 
178                             }
179                             catch (Exception ex)
180                             {
181                                 Container.Exception(ex);
182                                 return false;
183                             }
184 
185                         }
186                     }
187 
188                 }
189                 dayStartTime = dayStartTime.AddMonths(1);
190                 return true;
191             }
192             catch (Exception ex)
193             {
194                 Container.Exception(ex);
195                 return false;
196             }
197 
198         }
199 
200 
201         /// <summary>
202         /// 同步工作日统计数据(皮带机)
203         /// </summary>
204         private bool SyncShiftData()
205         {
206 
207             try
208             {
209                 if (shiftStartTime.CompareTo(shiftEndTime) > 0)
210                 {
211                     IsWork3 = false;
212                     ShiftTimer?.Dispose();
213                     Container.Debug("同步工作日数据结束!");
214                     return false;
215                 }
216                 //每次取1个月数据
217                 var end = shiftStartTime.AddDays(1);//.AddMonths(1);
218                 var selectSql = "select * from " + "ConveyorByWorkShift" + " where Time >= '" + shiftStartTime.ToString() + "' and Time < '" + end.ToString() + "' order by [Time]";
219                 var adapter = new SqlDataAdapter(selectSql, _connName);
220                 var dataSet = new DataSet();
221                 adapter.Fill(dataSet, "Table3");
222 
223                 if (dataSet != null && dataSet.Tables.Count > 0)
224                 {
225                     if (dataSet.Tables[0].Rows != null && dataSet.Tables[0].Rows.Count > 0)
226                     {
227                         var rows = dataSet.Tables[0].Rows;
228                         foreach (DataRow row in rows)
229                         {
230                             try
231                             {
232                                 var shift = ObjectExtension.ConvertToModel<ConveyorByWorkShift>(row);
233                                 if (shift != null)
234                                 {
235                                     Container.cyShiftContainer.InsertSql(shift);//直接加入SQL数据库                                                                                  
236                                 }
237                             }
238                             catch (Exception ex)
239                             {
240                                 Container.Exception(ex);
241                                 return false;
242                             }
243 
244                         }
245                     }
246 
247                 }
248                 shiftStartTime = shiftStartTime.AddDays(1);//.AddMonths(1);
249                 return true;
250             }
251             catch (Exception ex)
252             {
253                 Container.Exception(ex);
254                 return false;
255             }
256 
257         }
View Code

上面是主要代码,这里还有以下关键性代码,比如,获取数据架构,表名以及DataRow与实体对象Object之间的转换:

 1 public class SqlHelper
 2     {
 3         private String _SchemaSql = "";
 4         private const bool IsSQL2005 = false;
 5         /// <summary>
 6         /// 获取所有表名
 7         /// </summary>
 8         private readonly string _GetTableNames = "Select Name FROM SysObjects Where XType='U' and left(Name,1)='M' and left(name,2)<>'MH' and LEN(name)=6 orDER BY Name";
 9         /// <summary>
10         /// 获取数据库所有表名
11         /// </summary>
12         public virtual String GetTableNames => _GetTableNames;
13 
14         /// <summary>
15         /// 获取表中所有字段
16         /// </summary>
17         /// <param name="tableName"></param>
18         /// <returns></returns>
19         public static String GetTableMember(string tableName)
20         {
21             return "SELECT Name FROM SysColumns WHERE id=Object_Id('" + tableName + "')";
22         }
23         private readonly String _DescriptionSql2000 = "select b.name n, a.value v from sysproperties a inner join sysobjects b on a.id=b.id where a.smallid=0";
24         private readonly String _DescriptionSql2005 = "select b.name n, a.value v from sys.extended_properties a inner join sysobjects b on a.major_id=b.id and a.minor_id=0 and a.name = 'MS_Description'";
25         /// <summary>取表说明SQL</summary>
26         public virtual String DescriptionSql { get { return IsSQL2005 ? _DescriptionSql2005 : _DescriptionSql2000; } }
27         /// <summary>构架SQL</summary>
28         public virtual String SchemaSql
29         {
30             get
31             {
32                 if (String.IsNullOrEmpty(_SchemaSql))
33                 {
34                     var sb = new StringBuilder();
35                     sb.Append("SELECT ");
36                     sb.Append("表名=d.name,");
37                     sb.Append("字段序号=a.colorder,");
38                     sb.Append("字段名=a.name,");
39                     sb.Append("标识=case when COLUMNPROPERTY( a.id,a.name,'IsIdentity')=1 then Convert(Bit,1) else Convert(Bit,0) end,");
40                     sb.Append("主键=case when exists(SELECT 1 FROM sysobjects where xtype='PK' and name in (");
41                     sb.Append("SELECT name FROM sysindexes WHERE id = a.id AND indid in(");
42                     sb.Append("SELECT indid FROM sysindexkeys WHERE id = a.id AND colid=a.colid");
43                     sb.Append("))) then Convert(Bit,1) else Convert(Bit,0) end,");
44                     sb.Append("类型=b.name,");
45                     sb.Append("占用字节数=a.length,");
46                     sb.Append("长度=COLUMNPROPERTY(a.id,a.name,'PRECISION'),");
47                     sb.Append("小数位数=isnull(COLUMNPROPERTY(a.id,a.name,'Scale'),0),");
48                     sb.Append("允许空=case when a.isnullable=1 then Convert(Bit,1)else Convert(Bit,0) end,");
49                     sb.Append("默认值=isnull(e.text,''),");
50                     sb.Append("字段说明=isnull(g.[value],'')");
51                     sb.Append("FROM syscolumns a ");
52                     sb.Append("left join systypes b on a.xtype=b.xusertype ");
53                     sb.Append("inner join sysobjects d on a.id=d.id  and d.xtype='U' ");
54                     sb.Append("left join syscomments e on a.cdefault=e.id ");
55                     if (IsSQL2005)
56                     {
57                         sb.Append("left join sys.extended_properties g on a.id=g.major_id and a.colid=g.minor_id and g.name = 'MS_Description'  ");
58                     }
59                     else
60                     {
61                         sb.Append("left join sysproperties g on a.id=g.id and a.colid=g.smallid  ");
62                     }
63                     sb.Append("order by a.id,a.colorder");
64                     _SchemaSql = sb.ToString();
65                 }
66                 return _SchemaSql;
67             }
68         }
69 
70         //private static String _IndexSql;
71         //public static String IndexSql
72         //{
73         //    get
74         //    {
75         //        if (_IndexSql == null)
76         //        {
77         //            if (IsSQL2005)
78         //                _IndexSql = "select ind.* from sys.indexes ind inner join sys.objects obj on ind.object_id = obj.object_id where obj.type='u'";
79         //            else
80         //                _IndexSql = "select IndexProperty(obj.id, ind.name,'IsUnique') as is_unique, ObjectProperty(object_id(ind.name),'IsPrimaryKey') as is_primary_key,ind.* from sysindexes ind inner join sysobjects obj on ind.id = obj.id where obj.type='u'";
81         //        }
82         //        return _IndexSql;
83         //    }
84         //}
85     }
View Code
 1     public static class ObjectExtension
 2     {
 3         public static T ConvertToModel<T>(this DataRow dr)
 4         {
 5             T t = Activator.CreateInstance<T>(); //创建实例            
 6             PropertyInfo[] pi = t.GetType().GetProperties();//取类的属性
 7             //属性赋值
 8             foreach (PropertyInfo p in pi)
 9             {
10                 if (p.Name.Equals("Id") || p.Name.Equals("Guid")) continue;
11                 if(p.Name.Equals("WorkShiftId")&& dr.Table.Columns.Contains("WorkShift_Id") && !string.IsNullOrWhiteSpace(dr["WorkShift_Id"].ToString()))
12                 {
13                     p.SetValue(t, Convert.ChangeType(dr["WorkShift_Id"], p.PropertyType), null);
14                 }
15                 if (dr.Table.Columns.Contains(p.Name) && !string.IsNullOrWhiteSpace(dr[p.Name].ToString()))
16                 {
17                     p.SetValue(t, Convert.ChangeType(dr[p.Name], p.PropertyType), null);
18                 }
19             }
20 
21             return t; //Return
22         }
23     }
View Code

 欢迎各路大神提出更好的解决方案并指正!

原文地址:https://www.cnblogs.com/gbat/p/6780318.html