在 MongoDB 中查找缺失的数据包

  1 package syncPacker;
  2 
  3 import java.io.BufferedReader;
  4 import java.io.File;
  5 import java.io.FileReader;
  6 import java.io.FileWriter;
  7 import java.io.IOException;
  8 import java.net.UnknownHostException;
  9 import java.util.ArrayList;
 10 import java.util.Date;
 11 import java.util.HashMap;
 12 import java.util.List;
 13 import java.util.Map;
 14 
 15 import org.apache.http.message.BasicNameValuePair;
 16 import org.apache.log4j.Logger;
 17 import org.springframework.beans.factory.annotation.Autowired;
 18 import org.springframework.beans.factory.annotation.Value;
 19 import org.springframework.stereotype.Service;
 20 
 21 import com.mongodb.BasicDBObject;
 22 import com.mongodb.DB;
 23 import com.mongodb.DBCollection;
 24 import com.mongodb.DBObject;
 25 import com.mongodb.Mongo;
 26 import com.pro.framework.action.BaseController;
 27 
 28 import logCenter.SendLog;
 29 import net.sf.json.JSONObject;
 30 import syncPacker.bean.PatentBibliographicChangeBean;
 31 import syncPacker.bean.SyncDataPackageBean;
 32 import utils.DateUtils;
 33 import utils.DatetimeUtils;
 34 import utils.HttpUtils;
 35 
 36 /**
 37  * 增量数据分块打包 全处理
 38  * 
 39  * http://localhost:8080/PatentSearchExtend/syncPacker!pack.action
 40  * 
 41  * http://10.78.2.21:8080/PatentSearchExtend/syncPacker!pack.action?bean.
 42  * tableName=E_BIBLIOGRAPHIC_CHANGE_TEMP&bean.maxRowsPerSyncPackerPackage= 10000
 43  */
 44 @Service
 45 public class SyncPacker_201603 extends BaseController {
 46 
 47     private static final long serialVersionUID = 1L;
 48 
 49     // 初始化:数据库接口
 50     @Autowired
 51     private SyncPackerDao dao;
 52 
 53     // 初始化:发送端地址
 54     @Value("${url_syncSender}")
 55     private String url_syncSender;
 56 
 57     // 初始化:本地文件存储路径
 58     @Value("${path_syncPacker_package}")
 59     private String path_syncPacker_package;
 60 
 61     // 初始化:读取最大数据包名称的地址
 62     @Value("${url_selectMaxPackageNumber}")
 63     private String url_selectMaxPackageNumber;
 64 
 65     // 初始化:存储最大数据包名称的地址
 66     @Value("${url_insertSyncDataPackage}")
 67     private String url_insertSyncDataPackage;
 68 
 69     // 初始化:查询条件Bean
 70     private SyncDataPackageBean bean = new SyncDataPackageBean();
 71     // 初始化:查询结果List
 72     private List<PatentBibliographicChangeBean> pbcList = new ArrayList<PatentBibliographicChangeBean>();
 73     // 初始化:形成的数据包名称
 74     private List<PatentBibliographicChangeBean> pbcList_withPackageName = new ArrayList<PatentBibliographicChangeBean>();
 75     // 初始化:已打包的增量数据ID表单
 76     private List<String> pdaIdList = new ArrayList<String>();
 77     // 初始化:用于删除数据的临时ID List
 78     private List<String> idPartList = new ArrayList<String>();
 79     // 初始化:传输协议
 80     private HttpUtils httpUtils = new HttpUtils();
 81     // 初始化:键值串
 82     private List<BasicNameValuePair> paramList = new ArrayList<BasicNameValuePair>();
 83     // 初始化:历史最大包编号
 84     private String maxPackageNumber;
 85     // 初始化:记录打包完成后的数据版本
 86     private Integer centerNodeDataVersion;
 87     // 发送远程日志
 88     private SendLog sendLog = new SendLog();
 89     // 记录本地日志
 90     private Logger logger = Logger.getLogger(SyncPacker_201603.class);
 91     // 初始化:判断程序是否正在运行
 92     public static boolean isRunning = false;
 93 
 94     // 本次处理完成后的最大包编号
 95     private String packedPackageNumber;
 96     // 用于返回json的成功信息
 97     private String success = "success";
 98     
 99 
100     /** 文件补发用:指定数据重新打包 */
101     // http://localhost:8080/PatentSearchExtend/syncPacker!packByPackageNumber.action?bean.packageNumberStart=000101&bean.packageNumberEnd=000102
102     public String packByPackageNumber() throws Exception {
103 
104         logMemory("本次请求处理开始。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd() );
105 
106         String job_start = DateUtils.getCurrentTimeString(0);
107         
108         try {
109             
110 
111             for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 
112                     i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) {                
113 
114                 // 开始时间
115                 String package_start = DateUtils.getCurrentTimeString(0);
116                 
117                 // 包编号
118                 String packageNumber = String.format( "%06d", i );
119                 
120                 logMemory("开始,包编号:", packageNumber );
121                 
122                 //(1)读历史表
123                 pbcList = selectList_changeHistory( packageNumber ); 
124                 
125                 if( null == pbcList ) {
126                     logMemory("<span style="color:red;">数据为空 !!!</span>", "");
127                     
128                 } else{
129                     logMemory("数据查询完毕,数据量为", String.valueOf( pbcList.size() ));
130 
131                     //(2)插入MongoDB
132                     insertMongoDB( pbcList );
133                     pbcList.clear();
134                 };
135                 
136                 logMemory("传输结束,包编号:" + packageNumber ," 用时: "
137                         + DatetimeUtils.getDistanceTimes_string(package_start, DateUtils.getCurrentTimeString(0)) );
138             }
139         } catch (Exception e) {
140             // 日志输出
141             logMemory("系统发生异常", e.getMessage());
142             e.printStackTrace();
143         }
144         
145         logMemory("本次请求处理完成,程序结束。", bean.getPackageNumberStart() + " - " + bean.getPackageNumberEnd()
146                 + " 用时: " + DatetimeUtils.getDistanceTimes_string( job_start, DateUtils.getCurrentTimeString(0)) );
147         
148         return SUCCESS;
149     }
150     
151 
152     /**
153      * 读历史表
154      * @param packageNumber
155      * @return
156      */
157     private List<PatentBibliographicChangeBean> selectList_changeHistory( String packageNumber ){
158         
159         for ( int i = 0; i < 100; i++ ) {
160             
161             try {
162                 return dao.selectList_changeHistory( packageNumber );                
163             } catch (Exception e) {
164                 // TODO Auto-generated catch block
165 //                e.printStackTrace();
166                 logMemory("系统发生异常", e.getMessage());
167                 try {
168                     Thread.sleep(500);
169                     logMemory("暂停 0.5 秒", "" );
170                 } catch (InterruptedException e1) {
171                     // TODO Auto-generated catch block
172                     e1.printStackTrace();
173                 }
174             } 
175         }// loop end
176         return null ;
177         
178     }
179     
180     private PatentBibliographicChangeBean select_changeHistory( String packageNumber ){
181         
182         for ( int i = 0; i < 100; i++ ) {
183             
184             try {
185                 return dao.select_changeHistory( packageNumber );                
186             } catch (Exception e) {
187                 // TODO Auto-generated catch block
188 //                e.printStackTrace();
189                 logMemory("系统发生异常", e.getMessage());
190                 try {
191                     Thread.sleep(500);
192                     logMemory("暂停 0.5 秒", "" );
193                 } catch (InterruptedException e1) {
194                     // TODO Auto-generated catch block
195                     e1.printStackTrace();
196                 }
197             } 
198         }// loop end
199         return null ;
200         
201     }
202     
203     
204     
205 
206     /**
207      * 插入 MongoDB
208      */
209     public void insertMongoDB( List<PatentBibliographicChangeBean> pbcList ) {    
210 
211         //# MongoDB(数据加载目标)
212         String syncLoadIntoMongoDbService = "10.78.2.23:27017";
213         String syncLoadIntoMongoDbName = "patent_search_extend";
214         String syncLoadIntoMongoTable = "patent_bibliographic_20160319";
215 
216         // 加载开始
217 //        logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );
218 
219         Mongo m = null;
220         try {
221             m = new Mongo( syncLoadIntoMongoDbService );
222         } catch (UnknownHostException e) {
223             e.printStackTrace();
224             logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());
225         }
226 
227         // 库名
228         DB db = m.getDB( syncLoadIntoMongoDbName );
229 //        logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );
230 
231         // 表名
232         DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
233 //        logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable );
234         
235         // 循环列表,将每个元素插入数据库
236         for( PatentBibliographicChangeBean pbcBean : pbcList ){
237             
238             
239             
240             //(1)读取一条著录数据
241 //            JSONObject json = packByPackageNumber_readBibl( pbcBean.getId() );
242             
243 //            if( null == json ) return ;
244 
245             // 列,值
246             BasicDBObject insDoc = new BasicDBObject();
247             
248             insDoc.put("abstract_No"    , pbcBean.getAbstract_No()    );
249             insDoc.put("app_Addr"       , pbcBean.getApp_Addr()       );
250             insDoc.put("app_Cn"         , pbcBean.getApp_Cn()         );
251             insDoc.put("app_Country"    , pbcBean.getApp_Country()    );
252             insDoc.put("app_Date"       , pbcBean.getApp_Date()       );
253             insDoc.put("app_Name"       , pbcBean.getApp_Name()       );
254             insDoc.put("app_Sn"         , pbcBean.getApp_Sn()         );
255             insDoc.put("app_Type"       , pbcBean.getApp_Type()       );
256             insDoc.put("app_Zip"        , pbcBean.getApp_Zip()        );
257             insDoc.put("ecla"           , pbcBean.getEcla()           );
258             insDoc.put("fi"             , pbcBean.getFi()             );
259             insDoc.put("ft"             , pbcBean.getFt()             );
260             insDoc.put("id"             , pbcBean.getId()             );
261             insDoc.put("inv_Title"      , pbcBean.getInv_Title()      );
262             insDoc.put("invent_Type"    , pbcBean.getInvent_Type()    );
263             insDoc.put("inventor"       , pbcBean.getInventor()       );
264             insDoc.put("ipc_Standard"   , pbcBean.getIpc_Standard()   );
265             insDoc.put("locarno"        , pbcBean.getLocarno()        );
266             insDoc.put("operation_Time" , pbcBean.getOperation_Time() );
267             insDoc.put("operation_Type" , pbcBean.getOperation_Type() );
268             insDoc.put("package_Number" , pbcBean.getPackage_Number() );
269             insDoc.put("pct_App_Cn"     , pbcBean.getPct_App_Cn()     );
270             insDoc.put("pct_App_Date"   , pbcBean.getPct_App_Date()   );
271             insDoc.put("pct_App_Sn"     , pbcBean.getPct_App_Sn()     );
272             insDoc.put("pct_Date"       , pbcBean.getPct_Date()       );
273             insDoc.put("pct_Pub_Cn"     , pbcBean.getPct_Pub_Cn()     );
274             insDoc.put("pct_Pub_Date"   , pbcBean.getPct_Pub_Date()   );
275             insDoc.put("pct_Pub_Lang"   , pbcBean.getPct_Pub_Lang()   );
276             insDoc.put("pct_Pub_Sn"     , pbcBean.getPct_Pub_Sn()     );
277             insDoc.put("prn"            , pbcBean.getPrn()            );
278             insDoc.put("prn_Cn"         , pbcBean.getPrn_Cn()         );
279             insDoc.put("prn_Date"       , pbcBean.getPrn_Date()       );
280             insDoc.put("prn_Sn"         , pbcBean.getPrn_Sn()         );
281             insDoc.put("prn_Type"       , pbcBean.getPrn_Type()       );
282             insDoc.put("pub_Cn"         , pbcBean.getPub_Cn()         );
283             insDoc.put("pub_Date"       , pbcBean.getPub_Date()       );
284             insDoc.put("pub_Sn"         , pbcBean.getPub_Sn()         );
285             insDoc.put("pub_Type"       , pbcBean.getPub_Type()       );
286             insDoc.put("uc"             , pbcBean.getUc()             );
287             
288             collection.insert(insDoc);
289             insDoc.clear();
290             
291         }
292 
293         // 循环遍历pdaBeanList
294 //        System.out.println("loading ...");
295 //        logger.info(DateUtils.getNow() + " rows:" + pdaBeanList.size());
296 
297         // 当前记录编号
298         // int currentRowNumber = 0;
299 
300         if ( m != null) m.close();
301         
302 //        System.out.println("Load finished.");
303     }
304     
305 
306     
307     /**
308      * 在 MongoDB 中查找缺失的数据包
309      * @return
310      * @throws Exception
311      */
312     public String findLostPackages_from_mongoDB() throws Exception {
313         
314 
315         //# MongoDB(数据加载目标)
316         String syncLoadIntoMongoDbService = "10.78.2.23:27017";
317         String syncLoadIntoMongoDbName = "patent_search_extend";
318         String syncLoadIntoMongoTable = "patent_bibliographic_20160319";
319 
320         // 加载开始
321 //        logger.info(DateUtils.getNow() + " Load start: " + syncLoadIntoMongoDbService );
322 
323         Mongo m = null;
324         try {
325             m = new Mongo( syncLoadIntoMongoDbService );
326         } catch (UnknownHostException e) {
327             e.printStackTrace();
328             logger.info(DateUtils.getNow() + " UnknownHostException:" + e.getMessage());
329         }
330 
331         // 库名
332         DB db = m.getDB( syncLoadIntoMongoDbName );
333 //        logger.info( DateUtils.getNow() + " Db:" + syncLoadIntoMongoDbName );
334 
335         // 表名
336         DBCollection collection = db.getCollection( syncLoadIntoMongoTable );
337 //        logger.info(DateUtils.getNow() + " Table:" + syncLoadIntoMongoTable );        
338         
339         PatentBibliographicChangeBean pbcBean = new PatentBibliographicChangeBean();
340         
341 
342         for ( int i = Integer.valueOf(bean.getPackageNumberStart()); 
343                 i <= Integer.valueOf(bean.getPackageNumberEnd()); i++) {    
344             
345             //(1)从 Oracle 取一个著录ID
346             pbcBean = select_changeHistory( String.format( "%06d", i ));
347             
348             if( null == pbcBean ) {
349                 logMemory("数据为空!  ", "包编号:"+i+",著录ID:" + pbcBean.getId()  ); continue ;
350             }
351             
352 //            logMemory("包编号:"+i+",著录ID:", pbcBean.getId() );
353             
354             //(2)查询 MongoDB 中是否存在        
355             BasicDBObject docFind = new BasicDBObject( "id", pbcBean.getId() );
356             DBObject findResult = collection.findOne( docFind );
357             
358             if( null == findResult || "".equals( findResult )){
359                 logMemory("缺失  !!! ", "包编号:"+i+",著录ID:" + pbcBean.getId()  );
360             }
361         }
362         
363         // retrieve
364 
365 
366         if ( m != null) m.close();
367         
368         
369         return SUCCESS ;
370     }
371     
372     
373     
374     
375     
376     
377     
378     
379     
380     
381     
382     
383     
384     
385 
386     /** 记录日志 */
387     private void logMemory(String behavior, String content) {
388         // 向服务器发送日志
389 //        sendLog.send("syncPacker", behavior, content);
390         // 记录本地日志
391         logger.info(DateUtils.getNow() + " " + behavior + " :" + content);
392         // 控制台输出日志
393 //        System.out.println("syncPacker : " + DateUtils.getNow() + " " + behavior + " :" + content);
394     }
395 
396     @Override
397     public String insert() throws Exception {
398         return null;
399     }
400 
401     @Override
402     public String update() throws Exception {
403         return null;
404     }
405 
406     @Override
407     public String selectList() throws Exception {
408         return null;
409     }
410 
411     @Override
412     public String delete() throws Exception {
413         return null;
414     }
415 
416     public static boolean isRunning() {
417         return isRunning;
418     }
419 
420     public static void setRunning(boolean isRunning) {
421         SyncPacker_201603.isRunning = isRunning;
422     }
423 
424     public Integer getCenterNodeDataVersion() {
425         return centerNodeDataVersion;
426     }
427 
428     public void setCenterNodeDataVersion(Integer centerNodeDataVersion) {
429         this.centerNodeDataVersion = centerNodeDataVersion;
430     }
431 
432     public String getSuccess() {
433         return success;
434     }
435 
436     public void setSuccess(String success) {
437         this.success = success;
438     }
439 
440     public String getMaxPackageNumber() {
441         return maxPackageNumber;
442     }
443 
444     public void setMaxPackageNumber(String maxPackageNumber) {
445         this.maxPackageNumber = maxPackageNumber;
446     }
447 
448     public String getPackedPackageNumber() {
449         return packedPackageNumber;
450     }
451 
452     public void setPackedPackageNumber(String packedPackageNumber) {
453         this.packedPackageNumber = packedPackageNumber;
454     }
455 
456     public SyncDataPackageBean getBean() {
457         return bean;
458     }
459 
460     public void setBean(SyncDataPackageBean bean) {
461         this.bean = bean;
462     }
463 }
原文地址:https://www.cnblogs.com/livon/p/5302164.html