文本解析-多线程-分片解析

/**技术:  CyclicBarrier + AtomicLong + BlockingQueue + RandomAccessFile
 */
public class MainTest {
    private static Logger logger = LoggerFactory.getLogger(MainTest.class);

    //给定线程数
    static int threadNum = 4;
    private static String path =      "C:/Users/70403/Downloads/txtTest/txtTest7.txt"; //
    private static String writePath = "C:/Users/70403/Downloads/txtTest/txtTestWriteMe2.txt";

    /**
     *
     */
    public static void main(String[] args) {
        logger.info("开始时间:" + new Date());
        // 声明缓存队列
        BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        BigFileReaderByPart.Builder builder = new BigFileReaderByPart.Builder(path, new IDataHandle() {
            public void handle(String line) {
                System.out.println("--100----->" + Thread.currentThread() + line);
                writeFun(writePath, line);
                //解析执行的数量时间3460ms,解析数量为5000
                //解析执行的数量时间3226ms,解析数量为5000
                //解析执行的数量时间3907ms,解析数量为5000
                //解析执行的数量时间3279ms,解析数量为5000
            }
        }, queue);
//        builder.withCharset("UTF-8").withTreahdSize(threadNum).withBufferSize(524288);
        BigFileReaderByPart bigFileReader = builder.build();
        bigFileReader.start();

//        // 线程写入数据到同一个文件
//        BufferedWriter  fw = new BufferedWriter(new FileWriter(ConfigUtil.getValue("log.target.path"), true));
//        // 线程池
//        ExecutorService executor = Executors.newCachedThreadPool();
//        executor.execute(new QueueResult2File(fw, queue));
//
//        // 关闭线程
//        executor.shutdown();
//        executor.awaitTermination(5, TimeUnit.DAYS);
//
//        // 关闭数据流
//        fw.close();

        System.out.println("结束时间:" + new Date());
        logger.info("app1 Shutdown...");
    }
}
/**
 *  技术实现:  jsoup
 */
public class BigFileReaderByPart {
    /****************** 初始化  *********************/
//    private Logger logger = Logger.getLogger(BigFileReaderByPart.class); // 日志对象
    private Logger logger = LoggerFactory.getLogger(getClass());

    private int threadSize; // 线程大小
    private String charset; // 编码
    private int bufferSize; // 缓存字节大小
    private IDataHandle handle; // 数据返回定义接口
    private ExecutorService  executorService; // 线程池
    private long fileLength; // 文件长度
    private RandomAccessFile rAccessFile; // 文件读写对象
    private Set<StartEndPair> startEndPairs; // 分片
    private CyclicBarrier cyclicBarrier;
    private AtomicLong counter = new AtomicLong(0); // 计数
    private BlockingQueue<String> queue; // 队列
    private Map<String, String> filterMap; // 过滤URL
//    private CheckSensitiveWord checkSensitiveWord;
    /**
     * @param file
     * @param handle
     * @param charset
     * @param bufferSize
     * @param threadSize
     */
    private BigFileReaderByPart(File file,IDataHandle handle, BlockingQueue<String> queue, String charset,int bufferSize,int threadSize){
        this.fileLength = file.length();
        this.handle = handle;
        this.charset = charset;
        this.bufferSize = bufferSize;
        this.threadSize = threadSize;
        try {
            this.rAccessFile = new RandomAccessFile(file,"rw");
        } catch (FileNotFoundException e) {
            e.printStackTrace();
        }
        this.executorService = newFixedThreadPool(threadSize);
        startEndPairs = new HashSet<BigFileReaderByPart.StartEndPair>();
        this.queue = queue;
        this.filterMap =  new HashMap<String, String>();
//        checkSensitiveWord = new CheckSensitiveWord();
    }

    /**
     * 开启任务
     */
    public void start(){
        long everySize = this.fileLength / this.threadSize;
        try {
            calculateStartEnd(0, everySize);
        } catch (IOException e) {
            e.printStackTrace();
            return;
        }

        final long startTime = System.currentTimeMillis();
        cyclicBarrier = new CyclicBarrier(startEndPairs.size(), new Runnable() {
            /* (non-Javadoc)
             * @see java.lang.Runnable#run()
             */
            public void run() {
                logger.info("use time---5--->: " + (System.currentTimeMillis() - startTime));
                logger.info("all line---8--->: " + counter.get());
                logger.info("--50--->解析执行的数量时间{}ms,解析数量为{} : ", System.currentTimeMillis() - startTime, counter.get());

            }
        });
        logger.info("总分配分片数量----10---->:" + startEndPairs.size());
        for (StartEndPair pair : startEndPairs) {
            logger.info("分配分片----13---->:" + pair);
            this.executorService.execute(new SliceReaderTask(pair));
        }
    }

    /**
     * 计算指针读取开始和结尾即分片
     */
    private void calculateStartEnd(long start, long size) throws IOException {
        if (start > fileLength - 1) {
            return;
        }
        StartEndPair pair = new StartEndPair();
        pair.start = start;
        long endPosition = start + size - 1;
        if (endPosition >= fileLength - 1) {
            pair.end = fileLength - 1;
            startEndPairs.add(pair);
            return;
        }
        // 移动指针读取
        rAccessFile.seek(endPosition);
        byte tmp = (byte) rAccessFile.read();
        while (tmp != '
' && tmp != '
') {
            endPosition++;
            if (endPosition >= fileLength - 1) {
                endPosition = fileLength - 1;
                break;
            }
            rAccessFile.seek(endPosition);
            tmp = (byte) rAccessFile.read();
        }
        pair.end = endPosition;
        startEndPairs.add(pair);

        // 迭代计算
        calculateStartEnd(endPosition + 1, size);
    }

    /**
     * 关闭
     */
    public void shutdown(){
        try {
            this.rAccessFile.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        this.executorService.shutdown();
    }

    /**
     * @throws UnsupportedEncodingException
     */
    private void handle(byte[] bytes) throws UnsupportedEncodingException{
        String line = null;
        if (this.charset == null) {
            line = new String(bytes);
        } else {
            line = new String(bytes, charset);
        }
        if (line != null && !"".equals(line)) {
            this.handle.handle(line);

//            String writePath="C:/Users/70403/Downloads/txtTest/txtTestWriteMe.txt";
//            writeFun(writePath, line);

//            // 抓取内容
//            String [] lineArray = line.split("    ");
//            if (null != lineArray && lineArray.length == 3 && null != lineArray[2]) {
//                if (!checkSensitiveWord.isContaintSensitiveWord(lineArray[2])) {
//                    // 如果.html直接访问
//                    if (!filterMap.containsKey(StringUtil.getDomain(lineArray[2]))) {
//                        String contentStr = grabTitle(lineArray[2]);
//                        if (null != contentStr) {
//                            queue.add(contentStr);
//                        }
//                    }
//                    logger.info("第" + counter.get() + "行 : 已请求" + line);
//                } else {
//                    logger.info("第" + counter.get() + "行 : 已过滤URL: " + line);
//                }
//            }

            counter.incrementAndGet(); // 递增 保证数据计数同步
        }
    }


    /**
     * 抓取网站url的title和keywords
     */
    public String grabTitle(String url) {
        Document doc = null;
        try {
            Connection con = Jsoup.connect(url).timeout(5000);
            con.userAgent("Mozilla/5.0 (Windows NT 10.0; WOW64; rv:44.0) Gecko/20100101 Firefox/44.0");
            doc = con.get();
        } catch (Exception e) {
            //logger.info(url);
            String domainStr = StringUtil.getDomain(url);
            if (!filterMap.containsKey(domainStr)) {
                filterMap.put(domainStr, url); //过滤
            }
            return null;
        }
        if (null == doc) {
            return null;
        }


        String title  = doc.title();
        String keywords = null;
        Element el = doc.getElementsByAttributeValue("name", "keywords").first();
        if (null != el) {
            keywords = el.attr("content");
        }
        if (StringUtil.isNullOrEmpty(title) && StringUtil.isNullOrEmpty(keywords)) {
            return null;
        }

        if (StringUtil.isNullOrEmpty(title)) {
            title = "null";
        }
        if (StringUtil.isNullOrEmpty(keywords)) {
            title = "null";
        }
        StringBuilder sb = new StringBuilder();
        sb.append(doc.title()).append("-&-").append(keywords);
        return sb.toString();
    }

    /**
     * </pre>
     */
    private static class StartEndPair{
        public long start;
        public long end;

        /* (non-Javadoc)
         * @see java.lang.Object#toString()
         */
        @Override
        public String toString() {
            return "star----15---->" + start + ";end----18---->" + end;
        }

        /* (non-Javadoc)
         * @see java.lang.Object#hashCode()
         */
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (int) (end ^ (end >>> 32));
            result = prime * result + (int) (start ^ (start >>> 32));
            return result;
        }

        /* (non-Javadoc)
         * @see java.lang.Object#equals(java.lang.Object)
         */
        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null) {
                return false;
            }
            if (getClass() != obj.getClass()) {
                return false;
            }
            StartEndPair other = (StartEndPair) obj;
            if (end != other.end) {
                return false;
            }
            if (start != other.start) {
                return false;
            }
            return true;
        }

    }

    /**
     * </pre>
     */
    private class SliceReaderTask implements Runnable{
        private long start;
        private long sliceSize;
        private byte[] readBuff;
        /**
         * @paramm start     read position (include)
         * @paramm end     the position read to(include)
         */
        public SliceReaderTask(StartEndPair pair) {
            this.start = pair.start;
            this.sliceSize = pair.end-pair.start+1;
            this.readBuff = new byte[bufferSize];
        }

        /* (non-Javadoc)
         * @see java.lang.Runnable#run()
         */
        public void run() {
            try {
                MappedByteBuffer mapBuffer = rAccessFile.getChannel().map(MapMode.READ_ONLY, start, this.sliceSize);
                ByteArrayOutputStream bos = new ByteArrayOutputStream();
                for (int offset = 0; offset < sliceSize; offset += bufferSize) {
                    int readLength;
                    if (offset + bufferSize <= sliceSize) {
                        readLength = bufferSize;
                    } else {
                        readLength = (int) (sliceSize - offset);
                    }
                    mapBuffer.get(readBuff, 0, readLength);
                    for (int i = 0; i < readLength; i++) {
                        byte tmp = readBuff[i];
                        if (tmp == '
' || tmp == '
') {
                            handle(bos.toByteArray());
                            bos.reset();
                        } else {
                            bos.write(tmp);
                        }
                    }
                }

                if (bos.size() > 0) {
                    handle(bos.toByteArray());
                }
                cyclicBarrier.await();// 等待其他线程操作完毕
            }catch (Exception e) {
                e.printStackTrace();
            }
        }

    }

    /**
     * </pre>
     */
    public static class Builder{
        //给定线程4
        private int threadSize = 4;
//        private int threadSize = 1;
        private String charset = null;
        private int bufferSize = 1024 * 1024;
        private IDataHandle handle;
        private File file;
        private BlockingQueue<String> queue;
        public Builder(String file, IDataHandle handle, BlockingQueue<String> queue){
            this.file = new File(file);
            if(!this.file.exists()){
                throw new IllegalArgumentException("文件不存在!");
            }
            this.handle = handle;
            this.queue = queue;
        }

        public Builder withTreahdSize(int size){
            this.threadSize = size;
            return this;
        }

        public Builder withCharset(String charset){
            this.charset= charset;
            return this;
        }

        public Builder withBufferSize(int bufferSize){
            this.bufferSize = bufferSize;
            return this;
        }

        public BigFileReaderByPart build(){
            return new BigFileReaderByPart(this.file,this.handle,this.queue, this.charset,this.bufferSize,this.threadSize);
        }
    }
}
/**
 * Callback that receives lines of text, one call per line.
 */
public interface IDataHandle {

    /**
     * Processes a single line.
     *
     * @param line the line content
     */
    void handle(String line);
}
/**
 * Static string and URL helper methods.
 */
public final class StringUtil {
    /**
     * A dotted host-like token such as {@code www.example.com}. The {@code (?<=//|)} lookbehind
     * always succeeds because of its empty alternative, so the first dotted word sequence
     * matches. The pattern is compiled once because it is reused on every call.
     */
    private static final Pattern DOMAIN_PATTERN = Pattern.compile("(?<=//|)((\\w)+\\.)+\\w+");

    /**
     * Not instantiable: static helpers only.
     */
    private StringUtil() {
    }

    /**
     * Returns whether {@code value} is {@code null} or has length 0. Whitespace-only strings
     * are not considered empty.
     *
     * @param value string to check, may be null
     * @return true if null or empty
     */
    public static final boolean isNullOrEmpty(String value) {
        return value == null || value.length() == 0;
    }

    /**
     * Returns the host part of {@code curl}, as parsed by {@link URL}.
     *
     * @param curl absolute URL string, e.g. {@code "http://tf.360.cn/e/wb"}
     * @return the host, or {@code ""} if {@code curl} is not a well-formed URL
     */
    public static String getDomain(String curl) {
        try {
            return new URL(curl).getHost();
        } catch (MalformedURLException ignored) {
            // Deliberately best-effort: malformed input yields "" instead of an exception.
            return "";
        }
    }

    /**
     * Extracts the domain from a URL with a regular expression, without full URL parsing.
     *
     * @param url initial request URL, may be null
     * @return the first dotted host-like token, or {@code null} if {@code url} is null/empty
     *     or contains none
     */
    public static String getDomainName(String url) {
        if (isNullOrEmpty(url)) {
            return null;
        }
        Matcher m = DOMAIN_PATTERN.matcher(url);
        return m.find() ? m.group() : null;
    }

    public static void main(String[] args) {
        System.out.println(getDomain("http://tf.360.cn/e/wb?_=76b6fa2e03fe712e&ip=49.69.92.204&reduce=0&width=0&height=0"));
    }
}

GitHub: https://github.com/butter-fly/big_file_read

原文地址:https://www.cnblogs.com/hahajava/p/10265382.html