1.0多线程抽取邮件
package com.example.demo.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; public class QuickEmailToWikiExtractor{ private ThreadPoolExecutor threadPool; private BlockingQueue<ExchangeEmailShallowDTO> emailQueue; public QuickEmailToWikiExtractor() { emailQueue = new LinkedBlockingQueue<ExchangeEmailShallowDTO>(); int corePoolSize = Runtime.getRuntime().availableProcessors() * 2; threadPool = new ThreadPoolExecutor(corePoolSize, corePoolSize,101, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000)); } // 每五分钟执行一次 public void exetract(){ System.out.println("开始"); long start = System.currentTimeMillis(); //抽取所有的邮件放到队列中 new ExtractEmailTask().start(); //把队列的文章插入Wiki insertToWiki(); long end = System.currentTimeMillis(); double cost = (end - start) / 1000; System.out.println("完成,花费时间=" + cost + "秒。"); } // 把队列中的文章插入到Wiki private void insertToWiki(){ while(true){ //2s读取不到就退出 ExchangeEmailShallowDTO email; try { email = emailQueue.poll(2, TimeUnit.SECONDS); if(email == null){ break; } threadPool.execute(new InsertToWikiTask(email)); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } protected void extractEmail(){ List<ExchangeEmailShallowDTO> allEmails = new ArrayList();//这里是抽取出的所有邮件 int i = 0; while(i++<10){ allEmails.add(new ExchangeEmailShallowDTO()); } if(allEmails == null){ return; } for(ExchangeEmailShallowDTO email: allEmails){ emailQueue.offer(email); } } //抓取邮件线程 class ExtractEmailTask extends Thread{ public void run(){ System.out.println("抽取邮件。。。"); extractEmail(); } } class InsertToWikiTask implements Runnable{ private ExchangeEmailShallowDTO email; public InsertToWikiTask(ExchangeEmailShallowDTO email){ this.email = email; } public void run() { System.out.println(Thread.currentThread().getName() + "把一个邮件插入到Wiki=" + System.currentTimeMillis()); } } public static void main(String[] args) { QuickEmailToWikiExtractor mail = new QuickEmailToWikiExtractor(); mail.exetract(); } } class ExchangeEmailShallowDTO{ }
2.0
public class Message { } public interface IMsgQueue { } public class MsgQueueManager implements IMsgQueue{ public static final BlockingQueue<Message> messageQueue = new LinkedTransferQueue<>(); public static void put(Message msg) { try { messageQueue.put(msg); } catch (InterruptedException e) { e.printStackTrace(); } } public static Message take() { try { return messageQueue.take(); } catch (InterruptedException e) { e.printStackTrace(); } return null; } } package com.example.demo.test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.BlockingQueue; //启动一个消息分发线程。在这个线程里子队列自动去总队列里获取消息 public class Mains { static class DispatchMessageTask implements Runnable{ @Override public void run() { BlockingQueue<Message> subQueue; for(;;) { //如果没有数据,那么阻塞在这里 Message message = MsgQueueManager.take(); while((subQueue=getSubQueue())==null) { try { Thread.sleep(1000); }catch (Exception e) { e.printStackTrace(); } } try { subQueue.put(message); }catch (Exception e) { e.printStackTrace(); } } } } //使用散列(hash)算法获取一个子队列 public static BlockingQueue<Message> getSubQueue(){ List<BlockingQueue<Message>> subMsgQueues = new ArrayList();//假装这是一个全局的子队列 int errorCount = 0; for(;;){ if(subMsgQueues.isEmpty()){ return null; } int index = (int)(System.nanoTime() % subMsgQueues.size()); try{ return subMsgQueues.get(index); }catch(Exception e){ //出现错误,在获取队列大小之后,队列进行了一次删除操作 if(++errorCount < 3){ continue; } } } } }