第11章Java并发编程实战

1.0多线程抽取邮件

  

package com.example.demo.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class QuickEmailToWikiExtractor{
    private ThreadPoolExecutor threadPool;
    private BlockingQueue<ExchangeEmailShallowDTO> emailQueue;
    
    public QuickEmailToWikiExtractor() {
        emailQueue = new LinkedBlockingQueue<ExchangeEmailShallowDTO>();
        int corePoolSize = Runtime.getRuntime().availableProcessors() * 2;
        threadPool = new ThreadPoolExecutor(corePoolSize, corePoolSize,101, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2000));
    }
    // 每五分钟执行一次
    public void exetract(){
        System.out.println("开始");
        long start = System.currentTimeMillis();
        
        //抽取所有的邮件放到队列中
        new ExtractEmailTask().start();
        //把队列的文章插入Wiki
        insertToWiki();
        
        long end = System.currentTimeMillis();
        double cost = (end - start) / 1000;
        System.out.println("完成,花费时间=" + cost + "秒。");
    }
    // 把队列中的文章插入到Wiki
    private void insertToWiki(){
        while(true){
            //2s读取不到就退出
            ExchangeEmailShallowDTO email;
            try {
                email = emailQueue.poll(2,  TimeUnit.SECONDS);
                if(email == null){
                    break;
                }
                threadPool.execute(new InsertToWikiTask(email));
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    protected void extractEmail(){
        List<ExchangeEmailShallowDTO> allEmails = new ArrayList();//这里是抽取出的所有邮件
        int i = 0;
        while(i++<10){
            allEmails.add(new ExchangeEmailShallowDTO());
        }
        if(allEmails == null){
            return;
        }
        for(ExchangeEmailShallowDTO email: allEmails){
            emailQueue.offer(email);
        }
    }
    //抓取邮件线程
    class ExtractEmailTask extends Thread{
        public void run(){
            System.out.println("抽取邮件。。。");
            extractEmail();
        }
    }
    class InsertToWikiTask implements Runnable{
        private ExchangeEmailShallowDTO email;
        public InsertToWikiTask(ExchangeEmailShallowDTO email){
            this.email = email;
        }
        public void run() {
            System.out.println(Thread.currentThread().getName() + "把一个邮件插入到Wiki=" + System.currentTimeMillis());
        }
    }
    
    public static void main(String[] args) {
        QuickEmailToWikiExtractor mail = new QuickEmailToWikiExtractor();
        mail.exetract();
    }
}

class ExchangeEmailShallowDTO{
    
}

2.0

public class Message {

}

public interface IMsgQueue {

}

public class MsgQueueManager implements IMsgQueue{
    public static final BlockingQueue<Message> messageQueue = new LinkedTransferQueue<>();
    
    
    public static void put(Message msg) {
        try {
            messageQueue.put(msg);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    
    public static Message take() {
        try {
            return messageQueue.take();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return null;
    }
}

package com.example.demo.test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
//启动一个消息分发线程。在这个线程里子队列自动去总队列里获取消息
public class Mains {
    static class DispatchMessageTask implements Runnable{

        @Override
        public void run() {
            BlockingQueue<Message> subQueue;
            for(;;) {
                //如果没有数据,那么阻塞在这里
                Message message = MsgQueueManager.take();
                while((subQueue=getSubQueue())==null) {
                    try {
                        Thread.sleep(1000);
                    }catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                try {
                    subQueue.put(message);
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        
    }
     //使用散列(hash)算法获取一个子队列
    public static BlockingQueue<Message> getSubQueue(){
        List<BlockingQueue<Message>> subMsgQueues = new ArrayList();//假装这是一个全局的子队列
        int errorCount = 0;
        for(;;){
            if(subMsgQueues.isEmpty()){
                return null;
            }
            int index = (int)(System.nanoTime() % subMsgQueues.size());
            try{
                return subMsgQueues.get(index);
            }catch(Exception e){
                //出现错误,在获取队列大小之后,队列进行了一次删除操作
                if(++errorCount < 3){
                    continue;
                }
            }
        }
    }
}
原文地址:https://www.cnblogs.com/helloworldmybokeyuan/p/11757319.html