IBMMQ之取发文件

package com.citic.main;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.PrintStream;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.regex.Pattern;

import com.citic.util.*;
import com.citic.util.comm.*;
import com.citic.msgutil.ProImpl;

/**
 * 本类功能:作为二级主类,上接MAIN方法,下接其它操作子类,主要对报文进行发送接收
 * 落脚点在发送MSG方法上:sendmsgs,最终由此方法提交并关闭MQ队列管理器
 * 不管发送还是接收报文,最终都要保证会调用sendmsgs方法
 * @author db2admin
 *
 */
public class MessageProcess implements IConstants{
    private static String[] MQString = null;
    private static int readcnt=5;
    private static String dval=ConfigFileUtil.getValue("debuglevel");
    private static int debuglevel=("".equals(dval)?ALL:Integer.parseInt(dval));
    private static MQUtil mqutil=new MQUtil();
    private int i=100;
    /*
    public static void main(String[] args) throws Exception{
        //log2file(ConfigFileUtil.getInstance().getPathName(".."));
        boolean fileflag=false;
        processOutMessage("d:\data\supis\data\datalist.txt");
    }*/
    
    //0.日志处理
    private static void log2file(String filepth){
        try {
            File tfile = new File(filepth + "logs/");
            if (!tfile.exists()) {
                tfile.mkdir();
            }

            FileOutputStream out = new FileOutputStream(tfile.getPath()
                    + "/systemout" + CommFun.strNow().substring(0, 8) + ".txt", true);
            FileOutputStream errout = new FileOutputStream(tfile.getPath()
                    + "/systemerrout" + CommFun.strNow().substring(0, 8) + ".txt", true);
            
            PrintStream ps = new PrintStream(out);
            PrintStream pserr = new PrintStream(errout);
            System.setOut(ps);
            System.setErr(pserr);
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
    }
    
    public static void processInMessage(String dataDate, final String filepatten,String sendpath) {
        String filePath = ((sendpath==null || "".equals(sendpath))?ConfigFileUtil.getInstance().getPathName(dataDate)+"send":sendpath);
        CommFun.log(debuglevel, "filePath:"+filePath+",dataDate:"+dataDate+",filepatten:"+filepatten);
        File file = new File(filePath);
        File[] filelist = file.listFiles(new FilenameFilter() {
            private Pattern pattern = Pattern.compile(filepatten.replace("_R",
                    "") + ".*\.xml");

            @Override
            public boolean accept(File dir, String name) {
                return pattern.matcher(name).matches();
            }
        });
        String[] files=new String[filelist.length];
        int i=0;
        for (File f : filelist) {
            // processInMessage(f.getAbsolutePath(),0);
            String fapth=f.getAbsolutePath();
            CommFun.log(debuglevel, "添加文件:"+fapth);
            files[i++]=fapth;
        }
        processInMessage(files,0,false);
        
    }
    
    /**
     * 1.3 2017-03-27 增加MQ的处理部分,如果传入readcnt无效,则需要读取config中的值
     * 接收报文:接收后需要分析报文入库,并产生回执和相应业务报文
     * 是1.1,1.2的主要逻辑实现
     * @param filename如果解析已有文件,则传入文件名,否则""
     * @param readcnt一次读取MQ的条数(不含重复条数)
     * @param redoflag是否重做标志,重做意味着不需要进行解析等流程,直接发送
     * @throws Exception
     */
    public static void processInMessage(String[] filename,int readcnt,boolean redoflag){
        CommFun.log(INFO,filename);
        boolean mqflag=false; //说明是MQ取的文件
        HashMap<String,String> hmmq=null;
        if (!"".equals(filename) && filename != null) {
            ConfigFileUtil.setValue("fileFlag", "true");
            //MQString = new String[] { filename };
            MQString = filename;
        } else {
            String rcnt = (readcnt > 0 ? String.valueOf(readcnt)
                    : ConfigFileUtil.getValue("readcnt"));
            //rcnt="";
            if (rcnt != null && !"".equals(rcnt)) {
                readcnt = Integer.parseInt(rcnt);
            }
            CommFun.log(INFO, "每次读取MQ条数为:"+readcnt+"!");
//            MQFileReceiver mqFileReceiver = new MQFileReceiver(readcnt);
            hmmq=mqutil.runGoupReceier(readcnt);
            
//            MQString = mqFileReceiver.getMQFileArray();
            if (hmmq==null || hmmq.size()<=0) {
                CommFun.log("没有取到MQ数据!退出!");
            }else{
                Object[] oba=hmmq.keySet().toArray();
                MQString=new String[oba.length];
                for(int i=0;i<oba.length;i++){
                    MQString[i]=(String) oba[i];
//                    System.out.println(MQString[i]+":"+MQString[i].getClass());
                }
            }
            mqflag=true;
        }
        if(MQString!=null){
            CommFun.log(debuglevel, "共获取MQ条数为:"+MQString.length);    
        }else{
            CommFun.log(debuglevel, "获取MQ条数为:0");
        }
        
        ProImpl pil=new ProImpl();
        HashMap<String,String> hm0=new HashMap<String, String>();
        HashMap<String,String> hm=null;
        for (int i = 0; MQString != null && i < MQString.length; i++) {
            hm=new HashMap<String, String>();
            CommFun.log(INFO,
                    "解析字串["+MQString.length+"-"+i+":原文件名为:"+(hmmq!=null && hmmq.size()>0?hmmq.get(MQString[i]):filename)+"]前300个字符:["
                            + (mqflag ? MQString[i].substring(0, 200)
                                    : MQString[i]) + "]");
            //对于重新处理标志,不进行解析程序,直接发送
            if(redoflag){
                hm.put(MQString[i], "");
            }else{
                CommFun.log(0,"解析文件:"+MQString[i]+"开始!");
                hm=pil.parserXml(MQString[i]);
                CommFun.log(0,"解析文件:"+MQString[i]+"完毕!");
            }
            
            StringBuffer[] stb = new StringBuffer[hm.size()];
            
            int i1 = 0;
            for (String s : hm.keySet()) {
                stb[i1++] = new StringBuffer(s + ":" + hm.get(s));
                hm0.put(s, "");
            }
            //0.对MQ提取的文件进行记载
            if (mqflag && hm.size() > 0) {
                CommFun.log(debuglevel, "MQ处理文件!");
                String tmpfilename = ConfigFileUtil.getInstance().getPathName()
                        + "docs" + File.separator + CommFun.strNowRand().substring(0, 8)
                        + ".txt";
                CommFun.log(INFO, tmpfilename);
                FileOperation.stringbuffer2file(stb, tmpfilename);
            }else{
                CommFun.log(debuglevel, "此次无文件发送,继续循环!");
                continue;
            }
        }
        //1.发送
        sendmsgs(hm0);
        CommFun.log(debuglevel, "处理完毕!");
    }
    
    public static void processOutMessage(String dataDate, final String filepatten){
        processOutMessage(dataDate,filepatten,"");
    }
    
    /**
     * 2.1对指定日期下的按模式匹配文件列表进行批量发送
     * @param dataDate
     * @param filepatten
     * @throws Exception
     */
    public static void processOutMessage(String dataDate, final String filepatten,String sendpath) {
        String filePath = ((sendpath==null || "".equals(sendpath))?ConfigFileUtil.getInstance().getPathName(dataDate)+"send":sendpath);
        CommFun.log(debuglevel, "filePath:"+filePath+",dataDate:"+dataDate+",filepatten:"+filepatten);
        File file = new File(filePath);
        File[] filelist = file.listFiles(new FilenameFilter() {
            private Pattern pattern = Pattern.compile(filepatten.replace("_S",
                    "").replace(".xml", "") + ".*\.xml");

            @Override
            public boolean accept(File dir, String name) {
                return pattern.matcher(name).matches();
            }
        });
        HashMap<String, String> hm = new HashMap<String, String>();
        for (File f : filelist) {
            // processInMessage(f.getAbsolutePath(),0);
            String fapth=f.getAbsolutePath();
            CommFun.log(debuglevel, "添加文件:"+fapth);
            hm.put(fapth, "");
        }
        sendmsgs(hm);
        CommFun.log(debuglevel, "通过通配符处理列表文件发送完毕,日期为:" + dataDate
                + ",filepattern:" + filepatten);
    }
    
    /**
     * 2.2输入文件,根据文件列表进行发送
     * @param filename
     */
    public static void processOutMessage(String filename) {
        if (filename == null || "".equals(filename)) {
            CommFun.log(debuglevel, "传入参数无效");
            return;
        }
        
        File file = new File(filename);
        if (!file.exists()) {
            CommFun.log(debuglevel, "[" + filename + "]不存在");
            throw new RuntimeException("[" + filename + "]不存在");
        }

        try {
            FileReader filereader = new FileReader(file);
            BufferedReader bf = new BufferedReader(filereader);
            String st;
            HashMap<String,String> hm=new HashMap<String, String>();
            while ((st = bf.readLine()) != null && !"".equals(st)) {
                CommFun.log(debuglevel, st);
                if(new File(st).exists()){
                    hm.put(st, "");    
                }else{
                    CommFun.log(debuglevel, "["+st+"]不存在!");
                }
            }
            sendmsgs(hm);
            CommFun.log(debuglevel, "通过列表文件发送完毕");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 2.3产生相应报文并发送
     * @param hm
     */
    public static void processOutMessage(String dateString, String msgtype,
            String sendType, String appendString, String sender) {
        ProImpl pil=new ProImpl();
        HashMap<String,String> hm0=new HashMap<String, String>();
        HashMap<String,String> hm=null;
        String[] msgtypes=msgtype.split(",");
        for(int i=0;i<msgtypes.length;i++){
            CommFun.log(ALL, "生成第[" + i + "]个报文:[" + msgtypes[i] + "]");
            hm=new HashMap<String, String>();
            hm=pil.createXml(Integer.parseInt(dateString),msgtypes[i],sendType,appendString,sender);
            //0.对MQ提取的文件进行记载
            if (hm.size() > 0) {
                CommFun.log(debuglevel, "MQ处理文件!");
                StringBuffer[] stb = new StringBuffer[hm.size()];
                int i1 = 0;
                CommFun.log(DATA, "需要发送以下["+hm.size()+"]个文件!");
                for (String s : hm.keySet()) {
                    String tmpmsgid=hm.get(s);
                    stb[i1++] = new StringBuffer(s + ":" + tmpmsgid);
                    hm0.put(s, tmpmsgid);
                    CommFun.log(DATA, s + ":" + tmpmsgid);
                }
                String tmpfilename = ConfigFileUtil.getInstance().getPathName()
                        + "docs" + File.separator + CommFun.strNowRand().substring(0, 8)
                        + ".txt";
                CommFun.log(DATA, tmpfilename);
                FileOperation.stringbuffer2file(stb, tmpfilename);
            }else{
                CommFun.log(debuglevel, "此次无文件发送,继续坛坛循环!");
                continue;
            }
        }
        
        String flagstr=ConfigFileUtil.getValue("sendflag");
        boolean sendflag=false;
        if(flagstr.length()>0 && flagstr.toUpperCase().startsWith("Y")){
            sendflag=true;
        }
        
        if(sendflag){
            CommFun.log(DATA,"发送文件:"+hm0.toString());
        }else{
            hm0=new HashMap<String, String>();
            CommFun.log(DATA,"只生成数据,不发送"+hm0.toString());
        }
        
        //1.发送
        sendmsgs(hm0);
        try {
            DBOperation.updateMsgHeadSend(hm0.values().toArray());
        } catch (SQLException e) {
            CommFun.log(ERR, e.getMessage());
            e.printStackTrace();
        }
        
        CommFun.log(debuglevel, "处理完毕!");
    }
    
    
    /**
     * 提取公共方法,发送并对MQ进行善后
     * @param hm
     */
    private static void sendmsgs(HashMap<String,String> hm){
        //1.发送
        if(hm!=null && hm.size()>0){
            CommFun.log("MQ共需要发送文件["+hm.size()+"]个");
            int i=0;
            String[] strs=new String[hm.size()];
            for (String s : hm.keySet()) {
                if (s == null || "".equals(s)) {
                    CommFun.log(debuglevel, "[" + s + "]" + "文件为空,不发送!");
                } else {
                    CommFun.log(debuglevel, "[" + s + "]" + "文件不为空,发送!");
                    strs[i++]=s;
                }
            }
            mqutil.runGoupSender(strs);
            CommFun.log("MQ已发送文件["+hm.size()+"]个");
        }
        // 2.对MQ队列管理器进行相应管理,最终的MQ处理
        mqutil.commit();
    }
}
View Code
原文地址:https://www.cnblogs.com/silencemaker/p/12632254.html