使用生产者-消费者模式实现数据的下载与存储流程

  最近在做的一个项目涉及数据的下载、解析和存储操作。最初的实现方案是对下载的原始数据逐条处理，即下载、解析和存储操作按顺序串行执行，这种方案显然难以提高效率。

因此后来重新实现了下载与解析存储并行执行的方案：下载线程（生产者）把解析后的数据放入阻塞队列，存储线程（消费者）从队列中取出数据并保存，两者同时运行。

1. 数据输出接口：对"将数据存储到数据库或其他存储介质"这一操作的抽象

/**
 * Output sink abstraction: persists one record to a database or another storage backend.
 */
public interface IDataOut {
    /**
     * Stores a single record.
     *
     * @param data the record to persist
     */
    void save(Object data);
}

2. 参数类

/**
 * Parameter object for {@code DownloadSaver}; its constructor reads every field below.
 */
public class DownloadSaveArg {
    /** Character encoding; {@code DownloadSaver} falls back to "utf-8" when null or empty. */
    public String encode;
    /** Maximum retry count for failed downloads; non-positive values fall back to 3. */
    public int maxRetryCount;
    /** Data outputs (storage targets) for the parsed records. */
    public List<IDataOut> saveList;
    /** Parsing modules. */
    public List<BaseModule> workModules;
    /** Wrapped links of the download endpoints; enqueued by the {@code DownloadSaver} constructor. */
    public List<URLink> links;
    /** Log that records download/store progress. */
    public CrawlerLog crawlerLog;
}

3. DownloadSaver：下载和存储过程的封装（其中 download()、save() 以及 Downloader、Saver 在此处仅为占位，完整实现见下文）

/**
 * Runs a download → parse → save pipeline as a producer/consumer pair.
 * <p>
 * A downloader thread consumes {@code urlQueue}, downloads and parses each URL, and buffers the
 * parsed records in the bounded {@code resultQueue}; a saver thread drains {@code resultQueue}
 * and stores each record. In this listing {@link #download()}, {@link #save()},
 * {@code Downloader} and {@code Saver} are placeholders; their full implementations are shown
 * separately.
 */
public abstract class DownloadSaver {
    private static final Logger log = Logger.getLogger(DownloadSaver.class);

    /** Retry limit used when {@code DownloadSaveArg.maxRetryCount} is not positive. */
    private static final int MAX_RETRY_COUNT = 3;

    private final String encode;
    private final int maxRetryCount;
    private final List<IDataOut> saveList;
    private final List<BaseModule> workModules;
    private final CrawlerLog crawlerLog;

    /**
     * Whether URL submission is complete; once {@code true}, no further URLs will be added.
     * <p>
     * The three flags are {@code volatile} because each is written by one thread and read by
     * another without holding a lock (e.g. the save loop reads {@code downloadFinished}
     * directly in its termination check).
     */
    volatile boolean addUrlFinish = false;
    /** Set when the downloader stops producing results; checked by the saver. */
    volatile boolean downloadFinished = false;
    /** Set when the saver has stored every buffered result. */
    volatile boolean saveFinished = false;

    /** Returns whether URL submission has been marked complete. */
    boolean isAddUrlFinish() {
        return addUrlFinish;
    }

    /** Marks URL submission complete; the download loop stops once the URL queue drains. */
    void setUrlFinish() {
        addUrlFinish = true;
    }

    /** URLs waiting to be downloaded (unbounded). */
    private final BlockingQueue<URLink> urlQueue = new LinkedBlockingQueue<>();

    /**
     * Parsed download results waiting to be saved. Bounded to 1000 entries, so the downloader's
     * {@code put} blocks while the saver is behind.
     */
    private final LinkedBlockingQueue<JSONObject> resultQueue = new LinkedBlockingQueue<>(1000);

    /**
     * Copies the settings from {@code arg}, applying defaults, and enqueues {@code arg.links}.
     *
     * @param arg pipeline settings; {@code encode} defaults to "utf-8" and {@code maxRetryCount}
     *            to 3 ({@code MAX_RETRY_COUNT}) when unset
     */
    public DownloadSaver(DownloadSaveArg arg) {
        this.encode = Strings.isNullOrEmpty(arg.encode) ? "utf-8" : arg.encode;
        this.maxRetryCount = arg.maxRetryCount > 0 ? arg.maxRetryCount : MAX_RETRY_COUNT;
        this.saveList = arg.saveList;
        this.addUrls(arg.links);
        this.workModules = arg.workModules;
        this.crawlerLog = arg.crawlerLog;
    }

    /**
     * Enqueues a URL for download.
     * <p>
     * Uses {@code add} instead of {@code put}: the queue is unbounded, so {@code add} never
     * blocks, and unlike {@code put} it does not throw {@code InterruptedException} when the
     * caller's interrupt flag is set (which previously dropped the URL silently).
     *
     * @param url link to download; must not be null
     */
    public void addUrl(URLink url) {
        urlQueue.add(url);
    }

    /** Enqueues every link in {@code links}; a null list is ignored. */
    private void addUrls(List<URLink> links) {
        if (links == null) {
            return;
        }
        urlQueue.addAll(links);
    }

    /**
     * Download operation (placeholder in this listing; the full implementation is shown
     * separately). Consumes the URL queue, downloads and parses each URL into
     * {@code resultQueue}, and finishes when {@code addUrlFinish} is true and the URL queue
     * is empty.
     */
    public void download() {
    }

    /**
     * Save operation (placeholder in this listing; the full implementation is shown
     * separately). Consumes {@code resultQueue}, stores each record, and finishes when the
     * download is done and the result queue is empty.
     */
    public void save() {
    }

    /** Downloader thread wrapper (placeholder; the full implementation is shown separately). */
    public static class Downloader {}

    /** Saver thread wrapper (placeholder; the full implementation is shown separately). */
    public static class Saver {}

    /**
     * Runs the download and save loops in parallel and waits for both to finish.
     * <p>
     * NOTE(review): the download loop only terminates after {@link #setUrlFinish()} has been
     * called; nothing visible here calls it, so callers presumably must call it before or
     * during {@code execute()} — confirm.
     */
    public void execute() {
        ExecutorService service = Executors.newCachedThreadPool();

        Downloader downloader = new Downloader("downloader01", this);
        Saver saver = new Saver("saver01", this);

        Future<?> fs1 = service.submit(downloader);
        Future<?> fs2 = service.submit(saver);

        try {
            // Futures of Runnables complete with null, so there is no result worth logging.
            fs1.get();
            log.debug("downloader finished!");
            fs2.get();
            log.debug("saver finished!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("interrupted while waiting for download/save tasks", e);
        } catch (ExecutionException e) {
            log.error("download/save task failed", e.getCause());
        } finally {
            // shutdown() (not shutdownNow) lets a still-running task complete, e.g. the saver
            // draining buffered results after the downloader failed.
            service.shutdown();
        }
    }
}

4. Downloader 下载器

    /**
     * Runnable wrapper that drives {@link DownloadSaver#download()} on a worker thread.
     */
    public static class Downloader implements Runnable {
        private final String instance;
        private final DownloadSaver downloadSaver;

        /**
         * @param instance      label printed when the runnable starts
         * @param downloadSaver owner whose download loop this runnable executes
         */
        public Downloader(String instance, DownloadSaver downloadSaver) {
            this.downloadSaver = downloadSaver;
            this.instance = instance;
        }

        /** Prints a start banner, then runs the owner's download loop on this thread. */
        @Override
        public void run() {
            String banner = instance + " running...";
            System.out.println(banner);
            downloadSaver.download();
        }
    }

5. Saver 存储器

    /**
     * Runnable wrapper that drives {@link DownloadSaver#save()} on a worker thread.
     */
    public static class Saver implements Runnable {
        private final String instance;
        private final DownloadSaver downloadSaver;

        /**
         * @param instance      label logged when the runnable starts
         * @param downloadSaver owner whose save loop this runnable executes
         */
        public Saver(String instance, DownloadSaver downloadSaver) {
            this.downloadSaver = downloadSaver;
            this.instance = instance;
        }

        /** Logs a start banner at debug level, then runs the owner's save loop on this thread. */
        @Override
        public void run() {
            String banner = instance + " running...";
            log.debug(banner);
            downloadSaver.save();
        }
    }

6. 下载操作

/**
     * Download loop: the producer side of the pipeline, run on the downloader thread.
     * <p>
     * Takes URLs from {@code urlQueue}, downloads each via {@code startDownload}, parses the
     * response with {@code parseContent2Json}, and puts every parsed record into
     * {@code resultQueue}, counting it in {@code crawlerLog}. Stops once URL submission is
     * marked finished and the URL queue is empty.
     * <p>
     * Uses a timed {@code poll} instead of {@code take()}: with {@code take()} the thread blocks
     * forever if the URL queue is drained (or starts empty) before submission is marked
     * finished, because the termination check is never reached again. An interrupt stops the
     * loop instead of being swallowed. {@code downloadFinished} is set in a {@code finally}
     * block so the saver can still terminate if this loop throws or is interrupted.
     */
    public void download() {
        this.crawlerLog.setCrawlerStartTime();
        try {
            // isAddUrlFinish() is read through the accessor so the flag written by the
            // submitting thread is visible here.
            while (!(isAddUrlFinish() && urlQueue.isEmpty())) {
                URLink link = urlQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                if (link == null) {
                    continue; // nothing queued yet; re-check whether submission has finished
                }
                String result = startDownload(link.realLink);

                List<JSONObject> results = parseContent2Json(result, link);
                if (results != null) {
                    for (JSONObject j : results) {
                        resultQueue.put(j); // blocks while the bounded result queue is full
                        crawlerLog.incrCrawlerNum();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Stop downloading and restore the interrupt flag for the owning thread/executor.
            Thread.currentThread().interrupt();
        } finally {
            downloadFinished = true;
            this.crawlerLog.setCrawlerEndTime();
        }
    }

7. 存储操作

/**
     * Save loop: the consumer side of the pipeline, run on the saver thread.
     * <p>
     * Takes parsed records from {@code resultQueue}, stores each with {@code parseSaveItem},
     * and logs the running count every 100 records. Stops once the downloader has finished
     * ({@code downloadFinished}) and the result queue is empty.
     * <p>
     * Uses a timed {@code poll} instead of {@code take()}: with {@code take()}, if the saver
     * drains the queue just before the downloader sets {@code downloadFinished} (e.g. the last
     * URL produced no records), it blocks forever and never re-checks the termination
     * condition. An interrupt stops the loop instead of being swallowed.
     */
    public void save() {
        this.crawlerLog.setStoreStartTime();
        try {
            // Check downloadFinished before isEmpty(): the download loop sets the flag only after
            // its last put, so once it is seen as true an empty queue means all work is done.
            // NOTE(review): this relies on downloadFinished being visible across threads —
            // it should be declared volatile.
            while (!(downloadFinished && resultQueue.isEmpty())) {
                JSONObject item = resultQueue.poll(100, java.util.concurrent.TimeUnit.MILLISECONDS);
                if (item == null) {
                    continue; // nothing buffered yet; re-check whether the download has finished
                }
                parseSaveItem(item);
                int saveCount = this.incSaveCount();
                if (saveCount % 100 == 0) {
                    log.debug("saveCount: " + saveCount);
                }
            }
            saveFinished = true;
        } catch (InterruptedException e) {
            // Stop saving and restore the interrupt flag for the owning thread/executor.
            Thread.currentThread().interrupt();
        } finally {
            this.crawlerLog.setStoreEndTime();
        }
    }
原文地址:https://www.cnblogs.com/taich-flute/p/7144387.html