Springboot定时任务@Scheduled,异步任务@Async

Springboot定时任务

一、注解

@EnableScheduling和@Scheduled

定时任务Schedule,Spring调度默认则是顺序执行的, 使用场景适用于定时任务为固定周期。(如果要改变周期需要重启项目)

eg:

@Scheduled(cron = "0/5 * * * * ?")
public void test(){
    
}

二、基于接口

适用场景为任务周期经常变化,cron表达式来自于数据库获取。

@Slf4j
@Configuration
public class AsyncAndScheduleConf implements SchedulingConfigurer {
    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        taskRegistrar.addTriggerTask(
                //1.添加任务内容(Runnable)
                () -> System.out.println("执行动态定时任务: " + LocalDateTime.now().toLocalTime()),
                //2.设置执行周期(Trigger)
                triggerContext -> {
                    //2.1 从数据库获取执行周期
                    String cron = cronMapper.getCron();
                    //2.2 合法性校验.
                    if (StringUtils.isEmpty(cron)) {
                        // Omitted Code ..
                    }
                    //2.3 返回执行周期(Date)
                    return new CronTrigger(cron).nextExecutionTime(triggerContext);
                }
        );
    }
}

三、异步任务

@EnableAsync和@Async
默认情况下,Spring 使用SimpleAsyncTaskExecutor去执行这些异步方法(此执行器没有限制线程数)。此默认值可以从两个层级进行覆盖。

如果方法级别或应用级别未配置线程池,在使用SimpleAsyncTaskExecutor因为创建了大量线程极有可能造成OOM,以下贴出部分SimpleAsyncTaskExecutor源码会说明为何创建无限制的线程。

public void execute(Runnable task) {
        this.execute(task, 9223372036854775807L);
}

public void execute(Runnable task, long startTimeout) {
        Assert.notNull(task, "Runnable must not be null");
        Runnable taskToUse = this.taskDecorator != null ? this.taskDecorator.decorate(task) : task;
    //判断是否开启限流
        if (this.isThrottleActive() && startTimeout > 0L) {
            //限流入口
            this.concurrencyThrottle.beforeAccess();
            this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(taskToUse));
        } else {
            this.doExecute(taskToUse);
        }

    }

是否限流判断isThrottleActive()中属性concurrencyLimit是否大于0,默认为-1,所以未进行限流。

如果开启了限流,进入beforeAccess() 方法会判断线程数是否超过concurrencyLimit,若超过则当前线程wait,其他线程执行完成后当前线程则会notify。

所以,使用@Async从以下几点入手:

Case1:使用默认的SimpleAsyncTaskExecutor

为了防止创建过多线程,配置线程的限制数

@Configuration
public class ThreadLimitConfig extends AsyncConfigurerSupport {
    @Override
    public Executor getAsyncExecutor() {
        SimpleAsyncTaskExecutor executor = new SimpleAsyncTaskExecutor();
        //设置允许同时执行的线程数为10
 executor.setConcurrencyLimit(10);
        return executor;
    }
}
Case2:使用指定的线程池
@Async("myThreadPoolConfig")
public void mehtod1(){
    //todo
}
Case3:全局配置

@Configuration
@EnableAsync
public class AsyncConfiguration implements AsyncConfigurer {
    // 声明一个线程池(并指定线程池的名字,默认是方法名称)
    @Bean("taskExecutor")
    public Executor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程数5:线程池创建时候初始化的线程数
        executor.setCorePoolSize(5);
        //最大线程数5:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
        executor.setMaxPoolSize(5);
        //缓冲队列大小:用来缓冲执行任务的队列
        executor.setQueueCapacity(500);
        //允许线程的空闲时间60秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
        executor.setKeepAliveSeconds(60);
        //线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
        executor.setThreadNamePrefix("线程名-");
 
          //不在新线程中执行任务,而是用调用者所在的线程来执行
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //执行初始化
        executor.initialize();
 
        return executor;
 
    }
    //异常处理
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new MyAsyncUncaughtExceptionHandler();
    }
}
class MyAsyncUncaughtExceptionHandler implements AsyncUncaughtExceptionHandler {
    @Override
    public void handleUncaughtException(Throwable ex, Method method, Object... params) {
        System.out.println("class#method: " + method.getDeclaringClass().getName() + "#" + method.getName());
        System.out.println("type        : " + ex.getClass().getName());
        System.out.println("exception   : " + ex.getMessage());
 
    }
}

当异步方法有返回值时,可以捕获到异常信息,当无返回值时想要捕获异常需要使用以上配置追踪异常信息。

@Async使用注意:方法不能使用static修饰符,否则注解失效。

关于@EnableAsync

默认启动流程:
1 AnnotationAsyncExecutionInterceptor#getDefaultExecutor方法

搜索关联的线程池定义:上下文中唯一的 TaskExecutor 实例,或一个名为 taskExecutorjava.util.concurrent.Executor 实例;
2 如果以上都没找到,则会使用 SimpleAsyncTaskExecutor 处理异步方法调用

关于springframework提供的类:ThreadPoolTaskScheduler与ThreadPoolTaskExecutor

一个ThreadPoolTaskExecutor通过它的corePoolSize , maxPoolSize , keepAliveSeconds和queueCapacity属性在线程池中提供细粒度的配置。 诸如ThreadPoolTaskScheduler这样的调度器不提供这样的配置。
spring中的线程调度类也是juc包中的间接实现。
因此,在两者之间进行选择归结为以下问题:是否需要执行或计划执行任务?根据不同的用途去选择就可以

参考:https://stackoverflow.com/questions/33453722/spring-threadpooltaskscheduler-vs-threadpooltaskexecutor

juc包中还提供了ScheduledThreadPoolExecutor使用,内部包含一个无界阻塞队列,类似这种代码如果没有进行控制 一定会导致oom。

原文地址:https://www.cnblogs.com/mzc1997/p/14357011.html