线程池使用

自动派单多线程理解

通用外系统自动派单逻辑:

  1. 立单时打标记
  2. Task扫描
    1. 根据省份编码,系统编码,租户ID查询码表中配置的 自动派单量和线程数。
    2. 根据派单量和线程数 计算出每个线程需要派发工单的数量
    3. 循环所有工单,将工单添加至新的集合中,当新集合的派单数到达每个线程的派单量时,将该集合的所有工单打上自动派单标记10,启动线程。清空线程集合。

code:

1.线程入口

TaskNewEngine.getInstance().submit(new AutoToOutSysThread(commonParamMap, provCode, tenantId));

2.获取线程池的东西

public class TaskNewEngine {

private static TaskNewEngine instance new TaskNewEngine();

/**
返回一个线程池实例(单例)
@return instance
*/
public static TaskNewEngine getInstance (){ return instance;}

private ThreadPoolExecutor executor;

/**
无参构造线程池实例
*/
private TaskNewEngine(){
executor new ThreadPoolExecutor(1, 30,180L,
TimeUnit.SECONDSnew SynchronousQueue<Runnable>(), new ThreadFactory() {
final AtomicInteger threadNumber new AtomicInteger(1);

public Thread newThread(Runnable runnable) {
// Use our own naming scheme for the threads.
Thread thread = new Thread(Thread.currentThread().getThreadGroup(), runnable,
"autoToOutSysSend-pool-" threadNumber.getAndIncrement(), 0);
// Make workers daemon threads.
thread.setDaemon(true);
if (thread.getPriority() != Thread.NORM_PRIORITY) {
thread.setPriority(Thread.NORM_PRIORITY);
}
return thread;
}
});
}

/**
* Submits a Runnable task for execution and returns a Future
* representing that task.
*
@param 
task the task to submit.
@return a Future representing pending completion of the task,
* and whose <tt>get()</tt> method will return <tt>null</tt>
* upon completion.
@throws RejectedExecutionException if task cannot be scheduled
* for execution.
@throws NullPointerException if task null.
*/
public Future<?> submit(Runnable task) {
return executor.submit(task);
}
}

3.Thread类 implements Runnable

run方法中写业务逻辑

 

ThreadPoolExecutor参数设置

 

一、ThreadPoolExecutor的重要参数

corePoolSize:核心线程数

核心线程会一直存活,及时没有任务需要执行
当线程数小于核心线程数时,即使有线程空闲,线程池也会优先创建新线程处理
设置allowCoreThreadTimeout=true(默认false)时,核心线程会超时关闭
queueCapacity:任务队列容量(阻塞队列)

当核心线程数达到最大时,新任务会放在队列中排队等待执行
maxPoolSize:最大线程数

当线程数>=corePoolSize,且任务队列已满时。线程池会创建新线程来处理任务
当线程数=maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常
keepAliveTime:线程空闲时间

当线程空闲时间达到keepAliveTime时,线程会退出,直到线程数量=corePoolSize
如果allowCoreThreadTimeout=true,则会直到线程数量=0
allowCoreThreadTimeout:允许核心线程超时
rejectedExecutionHandler:任务拒绝处理器

两种情况会拒绝处理任务:

当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务
线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常
ThreadPoolExecutor类有几个内部实现类来处理这类情况:

AbortPolicy 丢弃任务,抛运行时异常
CallerRunsPolicy 执行任务
DiscardPolicy 忽视,什么都不会发生
DiscardOldestPolicy 从队列中踢出最先进入队列(最后一个执行)的任务
实现RejectedExecutionHandler接口,可自定义处理器

二、ThreadPoolExecutor执行顺序:
线程池按以下行为执行任务

当线程数小于核心线程数时,创建线程。
当线程数大于等于核心线程数,且任务队列未满时,将任务放入任务队列。
当线程数大于等于核心线程数,且任务队列已满
若线程数小于最大线程数,创建线程
若线程数等于最大线程数,抛出异常,拒绝任务

三、如何设置参数

默认值

corePoolSize=1
queueCapacity=Integer.MAX_VALUE
maxPoolSize=Integer.MAX_VALUE
keepAliveTime=60s
allowCoreThreadTimeout=false
rejectedExecutionHandler=AbortPolicy()
如何来设置

需要根据几个值来决定

tasks :每秒的任务数,假设为500~1000
taskcost:每个任务花费时间,假设为0.1s
responsetime:系统允许容忍的最大响应时间,假设为1s
做几个计算

corePoolSize = 每秒需要多少个线程处理?

threadcount = tasks/(1/taskcost) =tasks*taskcout = (500~1000)*0.1 = 50~100 个线程。corePoolSize设置应该大于50
根据8020原则,如果80%的每秒任务数小于800,那么corePoolSize设置为80即可
queueCapacity = (coreSizePool/taskcost)*responsetime

计算可得 queueCapacity = 80/0.1*1 = 80。意思是队列里的线程可以等待1s,超过了的需要新开线程来执行
切记不能设置为Integer.MAX_VALUE,这样队列会很大,线程数只会保持在corePoolSize大小,当任务陡增时,不能新开线程来执行,响应时间会随之陡增。
maxPoolSize = (max(tasks)- queueCapacity)/(1/taskcost)

计算可得 maxPoolSize = (1000-80)/10 = 92
(最大任务数-队列容量)/每个线程每秒处理能力 = 最大线程数
rejectedExecutionHandler:根据具体情况来决定,任务不重要可丢弃,任务重要则要利用一些缓冲机制来处理
keepAliveTime和allowCoreThreadTimeout采用默认通常能满足

参数设置不知道copy谁的,当时觉得写得好就给复制到本地了,现在贴出了,给原作者点赞

 

 

 

原文地址:https://www.cnblogs.com/zhaiyt/p/9469902.html