Thread management with a thread pool (BlockingQueue producer-consumer approach): an example

1. Thread pool manager class:

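import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Eagerly initialized singleton that owns one thread pool per log type.
public class ThreadPoolManager {
    private static final ThreadPoolManager instance = new ThreadPoolManager();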

    private ExecutorService secLogThreadPool;
    private ExecutorService sysLogThreadPool;

    public ExecutorService getSysLogThreadPool() {
        return sysLogThreadPool;
    }

    public void setSysLogThreadPool(ExecutorService sysLogThreadPool) {
        this.sysLogThreadPool = sysLogThreadPool;
    }

    public ExecutorService getSecLogThreadPool() {
        return secLogThreadPool;
    }

    public void setSecLogThreadPool(ExecutorService secLogThreadPool) {
        this.secLogThreadPool = secLogThreadPool;
    }

    public static ThreadPoolManager getInstance(){
        return instance;
    }

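    private ThreadPoolManager() {
        // 1 core thread, at most 3 threads, bounded queue of 2000 tasks. Extra threads are
        // started only once the queue is full; rejected tasks run on the caller's thread.
        secLogThreadPool = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
        // Fixed pool of 3 threads backed by an unbounded queue.
        sysLogThreadPool = Executors.newFixedThreadPool(3);
    }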
}

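Note:

The thread pool class is java.util.concurrent.ThreadPoolExecutor. A commonly used constructor is:

ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

The parameters mean:
corePoolSize: the number of threads kept in the pool even when they are idle
maximumPoolSize: the maximum number of threads allowed in the pool
keepAliveTime: how long threads beyond corePoolSize may stay idle before they are terminated
unit: the time unit of keepAliveTime
workQueue: the queue that holds tasks before they are executed
handler: the policy applied to rejected tasks (for example when workQueue is full and the pool has reached maximumPoolSize)

Threads beyond corePoolSize are created only when workQueue is full.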
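
To make the interplay of corePoolSize, maximumPoolSize, workQueue and handler concrete, here is a minimal sketch. It assumes a pool shaped like secLogThreadPool but with a queue capacity of 2 so that the queue fills quickly; the class PoolGrowthDemo and its method observe are hypothetical names introduced only for this illustration.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class PoolGrowthDemo {

    // Returns the pool size seen after 1, 3, 4 and 5 submitted tasks, followed by
    // 1 if the 6th (rejected) task ran on the calling thread, else 0.
    static int[] observe() {
        // Assumption: same settings as secLogThreadPool, except a queue capacity of 2.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        int[] seen = new int[5];
        pool.execute(blocker);        // starts the single core thread
        seen[0] = pool.getPoolSize();
        pool.execute(blocker);        // queued
        pool.execute(blocker);        // queued; the queue is now full
        seen[1] = pool.getPoolSize();
        pool.execute(blocker);        // queue full: a second thread is created
        seen[2] = pool.getPoolSize();
        pool.execute(blocker);        // queue full: a third thread, the maximum
        seen[3] = pool.getPoolSize();

        Thread caller = Thread.currentThread();
        boolean[] ranOnCaller = new boolean[1];
        // Queue full and pool at maximum: CallerRunsPolicy runs the task on this thread.
        pool.execute(() -> ranOnCaller[0] = (Thread.currentThread() == caller));
        seen[4] = ranOnCaller[0] ? 1 : 0;

        release.countDown();          // let all blocked tasks finish
        pool.shutdown();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [1, 1, 2, 3, 1]
    }
}
```

With the original capacity of 2000, the second and third threads would appear only after 2000 tasks are waiting, so in the test below secLogThreadPool effectively runs on a single thread.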

2. Producer class:

public class SecLogProduceThread implements Runnable {
    SecLogEntity entity = null;

    public SecLogProduceThread(SecLogEntity entity) {
        this.entity = entity;
    }

    @Override
    public void run() {
        SecLogStorage.getInstance().produce(entity);
    }
}

3. Consumer class:

public class SecLogConsumeThread implements Runnable {
    @Override
    public void run() {
        while(true){
            //TODO do something here
            SecLogStorage.getInstance().consume();
        }
    }
}
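
The loop above never returns, so the pool thread running it is occupied for good and the pool cannot shut down cleanly. One common way to let a consumer stop is a sentinel ("poison pill") placed on the queue. The sketch below is self-contained and uses a plain BlockingQueue<String> rather than SecLogStorage; the class PoisonPillDemo, the STOP sentinel and the method runOnce are hypothetical names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class PoisonPillDemo {
    // Sentinel compared by identity, so no real item can be mistaken for it.
    static final String STOP = new String("STOP");

    // Produces two items plus the sentinel and returns what the consumer received.
    static List<String> runOnce() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(100);
        List<String> consumed = new ArrayList<>();
        ExecutorService pool = Executors.newSingleThreadExecutor();

        pool.execute(() -> {
            try {
                while (true) {
                    String item = queue.take(); // blocks while the queue is empty
                    if (item == STOP) {
                        return;                 // sentinel seen: leave the loop
                    }
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // an interrupt also ends the loop
            }
        });

        try {
            queue.put("test1");
            queue.put("test2");
            queue.put(STOP);
            pool.shutdown();
            // Waiting for termination also makes the consumer's writes visible here.
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumer did not stop");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints [test1, test2]
    }
}
```

Note that a sentinel stops exactly one consumer; with several consumers, one sentinel per consumer would be needed.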

4. Log storage class (BlockingQueue approach):

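import java.util.concurrent.LinkedBlockingDeque;

import net.sf.json.JSONObject; // assumed: json-lib, which provides JSONObject.fromObject

public class SecLogStorage {
    private static final int MAX_SIZE = 100;
    // Bounded queue shared by producers and consumers; put() and take() do the blocking.
    private final LinkedBlockingDeque<SecLogEntity> list = new LinkedBlockingDeque<SecLogEntity>(MAX_SIZE);
    private static final SecLogStorage instance = new SecLogStorage();

    private SecLogStorage() {
    }

    public static SecLogStorage getInstance() {
        return instance;
    }

    public void produce(SecLogEntity seclog) {
        // Informational only: this check is not atomic with put(), which simply
        // blocks while the queue is full.
        if (list.size() == MAX_SIZE) {
            System.out.println("seclog stock is " + MAX_SIZE + "; cannot produce any more!");
        }
        try {
            list.put(seclog);
            System.out.println("Produced SecLog: " + JSONObject.fromObject(seclog));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public SecLogEntity consume() {
        SecLogEntity entity = null;
        // Informational only: take() blocks while the queue is empty.
        if (list.isEmpty()) {
            System.out.println("seclog stock is 0; cannot consume any more!");
        }
        try {
            entity = list.take();
            System.out.println("Consumed SecLog: " + JSONObject.fromObject(entity));
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return entity;
    }
}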
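
The storage class relies on put() and take(), which wait indefinitely when the queue is full or empty. If a caller would rather give up after a while, LinkedBlockingDeque also offers timed variants, offer(e, timeout, unit) and poll(timeout, unit), that report failure instead of blocking forever. The following is a minimal sketch with a capacity of 1 and 50 ms timeouts chosen purely for illustration; TimedQueueDemo and observe are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class TimedQueueDemo {

    // Returns, in order: offer into a free slot succeeded, offer into a full queue
    // succeeded, poll returned the stored item, poll on an empty queue returned null.
    static boolean[] observe() {
        LinkedBlockingDeque<String> queue = new LinkedBlockingDeque<>(1); // capacity 1
        boolean[] result = new boolean[4];
        try {
            // Free slot: accepted immediately.
            result[0] = queue.offer("test1", 50, TimeUnit.MILLISECONDS);
            // Queue full: waits up to 50 ms, then returns false instead of blocking.
            result[1] = queue.offer("test2", 50, TimeUnit.MILLISECONDS);
            // Item available: returned immediately.
            result[2] = "test1".equals(queue.poll(50, TimeUnit.MILLISECONDS));
            // Queue empty: waits up to 50 ms, then returns null.
            result[3] = queue.poll(50, TimeUnit.MILLISECONDS) == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [true, false, true, true]
    }
}
```

Because the return value tells the caller whether the operation succeeded, a separate size() or isEmpty() check beforehand is not needed with these methods.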

5. Log bean:

public class SecLogEntity {
    private String logName;
    private String logSrc;

    public String getLogName() {
        return logName;
    }

    public void setLogName(String logName) {
        this.logName = logName;
    }

    public String getLogSrc() {
        return logSrc;
    }

    public void setLogSrc(String logSrc) {
        this.logSrc = logSrc;
    }
}

6. Test class:

public class ThreadPoolTest {
    public static void main(String[] args) {
        SecLogEntity log1 = new SecLogEntity();
        log1.setLogName("test1");
        log1.setLogSrc("seclog1");

        SecLogEntity log2 = new SecLogEntity();
        log2.setLogName("test2");
        log2.setLogSrc("seclog2");

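        // SysLogEntity, SysLogProduceThread and SysLogConsumeThread are not listed in this post;
        // they are assumed to mirror the SecLog classes above.
        SysLogEntity log3 = new SysLogEntity();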
        log3.setLogName("test3");
        log3.setLogSrc("syslog1");

        SysLogEntity log4 = new SysLogEntity();
        log4.setLogName("test4");
        log4.setLogSrc("syslog2");

        ThreadPoolManager.getInstance().getSecLogThreadPool().execute(new SecLogProduceThread(log1));
        ThreadPoolManager.getInstance().getSecLogThreadPool().execute(new SecLogProduceThread(log2));
        ThreadPoolManager.getInstance().getSecLogThreadPool().execute(new SecLogConsumeThread());

        ThreadPoolManager.getInstance().getSysLogThreadPool().execute(new SysLogProduceThread(log3));
        ThreadPoolManager.getInstance().getSysLogThreadPool().execute(new SysLogProduceThread(log4));
        ThreadPoolManager.getInstance().getSysLogThreadPool().execute(new SysLogConsumeThread());
    }
}
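
The submission order in this test matters for secLogThreadPool. With corePoolSize 1 and a queue of 2000, a second thread is not started until the queue is full, so a never-ending consumer submitted before the producers would hold the only thread while the producers wait in the pool's queue. The sketch below illustrates this under stated assumptions: it uses a plain BlockingQueue<String> in place of SecLogStorage, and SubmissionOrderDemo and consumerFirst are hypothetical names.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original post.
class SubmissionOrderDemo {

    // Returns {pool size, tasks waiting in the pool's queue, producer tasks that ran}.
    static int[] consumerFirst() {
        // Same settings as secLogThreadPool in ThreadPoolManager.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(2000), new ThreadPoolExecutor.CallerRunsPolicy());
        BlockingQueue<String> storage = new LinkedBlockingQueue<>(100);
        AtomicInteger produced = new AtomicInteger();

        // Consumer first: it occupies the only core thread and blocks in take().
        pool.execute(() -> {
            try {
                while (true) {
                    storage.take();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // ends the loop on shutdownNow()
            }
        });
        // Producers afterwards: they are queued behind the consumer and never start.
        for (String name : new String[] {"test1", "test2"}) {
            pool.execute(() -> {
                storage.offer(name);
                produced.incrementAndGet();
            });
        }

        try {
            Thread.sleep(200); // waiting longer would not change the outcome
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        int[] seen = {pool.getPoolSize(), pool.getQueue().size(), produced.get()};
        pool.shutdownNow(); // interrupt the consumer and discard the queued producers
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(consumerFirst())); // prints [1, 2, 0]
    }
}
```

The test class above avoids this because both producers are submitted before the consumer. Raising corePoolSize, or running consumers on a dedicated pool, would remove the dependence on submission order.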

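7. Test results (the order of the lines varies between runs, because each message is printed only after its queue operation has completed):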

  Produced SecLog: {"logName":"test1","logSrc":"seclog1"}
  Produced syslog: {"logName":"test3","logSrc":"syslog1"}
  Consumed syslog: {"logName":"test3","logSrc":"syslog1"}
  Produced SecLog: {"logName":"test2","logSrc":"seclog2"}
  Consumed syslog: {"logName":"test4","logSrc":"syslog2"}
  syslog stock is 0; cannot consume any more syslog!
  Produced syslog: {"logName":"test4","logSrc":"syslog2"}
  Consumed SecLog: {"logName":"test1","logSrc":"seclog1"}
  Consumed SecLog: {"logName":"test2","logSrc":"seclog2"}
  seclog stock is 0; cannot consume any more!

Original article: https://www.cnblogs.com/hunterCecil/p/6094533.html