自定义线程池实践 不影响主程序的情况下, 调用第三方接口,超时停止访问 实践

情景: 在高并发的请求中,  立刻返回响应消息,  调用第三方接口 继续执行, 且超时调用停止该线程,  

package com.cjcx.inter.utils;

import java.util.concurrent.*;

/**
 *
 */
public class ThreadPoolTest {

    // 线程池维护线程的最少数量
    private static Integer corePoolSize = 2;

    // 线程池维护线程的最大数量
    private static Integer maxPoolSize = 10;

    // 线程池维护线程所允许的空闲时间
    private static Integer keepAliveTime = 90;

    // 线程池所使用的缓冲队列大小
    private static Integer workQueueSize = 1024;


    public static void main(String[] args) {
        try {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(corePoolSize,
                    maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(),
                    new ThreadPoolExecutor.CallerRunsPolicy());

            // 每秒输出线程池情况
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        while (true) {
                            Executors.newFixedThreadPool(2);
                            Thread.sleep(3000);
                            System.out.println("============================================活跃的线程数:" + pool.getActiveCount() + ",核心线程数:" + pool.getCorePoolSize() + ",线程池大小:" + pool.getPoolSize() + ",队列的大小" + pool.getQueue().size() + "==============================================");
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();

            // 多任务执行
            for (int i = 0; i < 2000; i++) {
                final int k = i;
                System.out.println("i:" + i);
                /*
                可丢弃任务(超出队列) 后台定时冲DB拉取再处理的场景,
                if(pool.getQueue().size() > 1000){
                    return;
                }*/
                
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        // 任务 小票接口 上传给IBC
                        try {
                            long s = System.currentTimeMillis();
                            System.out.println(Thread.currentThread().getName() + "===================== 任务" + k + " 开始");
                            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                                public Integer call() throws Exception {
                                    return executeTask(k);
                                }
                            });
                            try {
                                Integer result = future.get(1, TimeUnit.SECONDS);
                                System.out.println("设备ID: " + k + ", 结果:" + result + " 上传接口程序 正常完成");
                                UploadDescribe.getInstance().record(System.currentTimeMillis() - s);
                            } catch (InterruptedException | ExecutionException | TimeoutException e) {
                                future.cancel(true);
                                UploadDescribe.getInstance().cancel("DV_" + k);
                                System.out.println("设备ID: " + k + " 上传接口程序 1秒内未完成, 停止上传.退出");
                            } finally {
                                System.out.println(Thread.currentThread().getName() + "清理资源===================== 任务" + k + " 结束, 用时:" + (System.currentTimeMillis() - s));
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // 上传IBC任务
    static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);
    private static Integer executeTask(Integer k) {
        Integer r = -1;
        try {
            int s = (int) (Math.random() * 2000);
            System.out.println("k:" + k + ", sleep:" + s);
            Thread.sleep(s);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        r = 0;
        return r;
    }
}

 // 编写一个类记录 执行的成功,失败次数,记录成功的平均耗时

package com.cjcx.inter.utils;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 接口上传情况
 * @author e58
 *
 */
public class UploadDescribe {
    
    private Logger logger = LoggerFactory.getLogger(UploadDescribe.class);
    
    private static UploadDescribe describe;
    
    private UploadDescribe() {}
    
    public synchronized static UploadDescribe getInstance() {
        if (describe == null) {
            describe = new UploadDescribe();
        }
        return describe;
    }
    
    private ReentrantLock put = new ReentrantLock();
    
    // 上传超时时间ms
    private Long connectTimeOut = 5000L;
    
    // 上传总次数
    private Integer uploadTimes = 0; 
    // 上传平均耗时ms
    private Long uploadAverageTime = 0L;
    // 超时总次数
    private Integer cancelTimes = 0;
    // 超时的设备ID
    private Set<String> sets = new HashSet<String>();
    
    /**
     * 请求成功,记录次数和耗时
     */
    public void record(Long increTime){
        try {
            put.lock();
            // 次数+1
            uploadTimes = uploadTimes + 1;
            // 平均时间
            if(uploadTimes != 0)
                uploadAverageTime = (uploadAverageTime + increTime ) / uploadTimes;
            
            desc();
        } catch (Exception e) {
            logger.error("UploadDescribe record:", e);
        } finally{
            put.unlock();
        }
    }
    
    /**
     * 设备请求超时耗时
     */
    public void cancel(String deviceId){
        try {
            put.lock();
            // 次数+1
            cancelTimes = cancelTimes + 1;
            // 平均时间
            sets.add(deviceId);
            
            desc();
        } catch (Exception e) {
            logger.error("UploadDescribe cancel:", e);
        } finally{
            put.unlock();
        }
    }
    
    /**
     * 清空数据
     */
    public void clear(){
        try {
            put.lock();
            uploadTimes = 0; 
            uploadAverageTime = 0L;
            cancelTimes = 0;
            sets.clear();
        } catch (Exception e) {
            logger.error("UploadDescribe clear:", e);
        } finally{
            put.unlock();
        }
    }
    
    /**
     * 输出描述信息
     */
    public void desc(){
        logger.info("uploadTimes=" + uploadTimes + ", uploadAverageTime=" + uploadAverageTime + ", cancelTimes=" + cancelTimes + "]");
    }
    
}
原文地址:https://www.cnblogs.com/eason-d/p/11496943.html