SpringBoot:整合线程池

hello world

pom依赖:(我这里用的boot版本是2.4.2)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

测试Controller:

@RestController
public class TestController {

    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @RequestMapping("test")
    public String test() throws InterruptedException {

        threadPoolTaskExecutor.execute(()->{
            System.out.println("开始");
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("结束");
        });
        return "hello";
    }
}

启动项目后,我们发现ThreadPoolTaskExecutor可以注入到TestController,说明这个类也是被boot自动配置的,并且该方法execute确实是异步执行的

@EnableAsync和@Async

无返回值异步

我们在启动类上标注@EnableAsync注解

@EnableAsync
@SpringBootApplication
public class BootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootAsyncApplication.class, args);
    }

}

编写controller和service

@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @Async
    @RequestMapping("test")
    public void test() throws InterruptedException {
        System.out.println("开始");
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("结束");
    }

    @RequestMapping("test2")
    public String test2(){
        testService.ok();
        return "ok";
    }
}

@Service
public class TestService {

    @Async
    public void ok() {
        System.out.println("service开始");
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("service结束");

    }
}

我在controller的一个方法和service的一个方法上加上了@Async注解,调用接口观察效果

访问接口后,立马返回结果,后台线程异步执行并打印结果。

有返回值异步

上面测试的是无返回值异步调用。

下面介绍有返回值异步调用,并接受返回值

我们知道在juc中通过Future和Callable可以获取异步调用返回值,spring中支持了AsyncResult类支持获取方法返回值,但其实该类实现的也是Future。

修改test2方法:

    @RequestMapping("test2")
    public String test2() throws ExecutionException, InterruptedException {
        Future<String> ok = testService.ok();
        //Future的get会被阻塞,需要注意
        return ok.get();
    }

@Service
public class TestService {

    @Async
    public Future<String> ok() {
        System.out.println("Execute method asynchronously - "
                + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
            return new AsyncResult<>("hello world !!!!");
        } catch (InterruptedException ignored) {
        }
        return null;
    }
}

测试效果:

自动配置类:TaskExecutionAutoConfiguration

image-20210128093624185

大致就是先创建一个TaskExecutorBuilder,然后通过它去创建一个ThreadPoolTaskExecutor。

注意了,如果容器中有Executor,那么默认的ThreadPoolTaskExecutor就不会自动创建了。

注意:这里有一个大坑!!!我们注意到,默认的ThreadPoolTaskExecutor的参数是从TaskExecutionProperties.Pool中取得的,点进去。

image-20210128094032767

竟然发现阻塞队列大小和线程数最大值都是Integer.MAX_VALUE,这种配置极其容易造成生产事故(生产任务大量堆积,OOM异常等)

所以一定不要使用springboot默认配置的线程池!!!

自定义springboot线程池

springboot给了我们两种解决方案

配置参数

image-20210128094649536

我们可以通过配置spring.task.execution来修改配置

spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=100
spring.task.execution.pool.queue-capacity=30

自定义线程池

这种方式比上一种更好,我们可以自定义线程池,定制化更好,包括我们可以自定义线程异常处理器

我们可以实现AsyncConfigurer接口,并重写接口中的方法

@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {

    @Bean("WjTaskExecutor")
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setMaxPoolSize(50);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        threadPoolTaskExecutor.setQueueCapacity(20);
        threadPoolTaskExecutor.setKeepAliveSeconds(300);
        threadPoolTaskExecutor.setThreadNamePrefix("wj-");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //设置线程池关闭的时候等待所有任务都完成再继续销毁其他的bean
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //设置线程池中任务的等待时间,如果超过这个时候还没有就强制销毁,以确保应用最后能够关闭而不是阻塞住
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        return threadPoolTaskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params)->{
            log.error("调用异步方法异常: 
" + method, ex);
        };
    }
}

修改controller的test方法:

    @Async
    @RequestMapping("test")
    public void test() throws InterruptedException {
        System.out.println("开始");
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int i = 1/0;
        System.out.println("结束");
    }

测试:

原文地址:https://www.cnblogs.com/wwjj4811/p/14338597.html