线程池的创建以及@Async 注解的使用

1.说在前边

在同一个类中,一个方法调用另外一个有注解(比如@Async,@Transational)的方法,注解是不会生效的。

2. SpringBoot自定义线程池

2.1 修改application.properties
task.pool.corePoolSize=20
task.pool.maxPoolSize=40
task.pool.keepAliveSeconds=300
task.pool.queueCapacity=50
2.2 线程池配置属性类
package com.dyaqi.async.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

/**
 * @author: dongyq
 * @date: 2021/12/24 15:59
 * @since:
 * @功能描述: 线程池配置属性类
 */
@ConfigurationProperties(prefix = "task.pool")
public class TaskThreadPoolConfig {

    private int corePoolSize;

    private int maxPoolSize;

    private int keepAliveSeconds;

    private int queueCapacity;

    //...getter and setter methods...
    
}

2.3 创建线程池
package com.dyaqi.async.config;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: dongyq
 * @date: 2021/12/24 16:02
 * @since:
 * @功能描述: 自定义线程池
 */
@Configuration
@EnableAsync
public class TaskExecutePool {

    @Autowired
    private TaskThreadPoolConfig config;

    @Bean("myTaskAsyncPool")
    public Executor myTaskAsyncPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        //活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //线程名字前缀
        executor.setThreadNamePrefix("TaskPool-");

        // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

上面我们通过使用ThreadPoolTaskExecutor创建了一个线程池,同时设置了以下这些参数:

  • 核心线程数20:线程池创建时候初始化的线程数
  • 最大线程数40:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
  • 缓冲队列50:用来缓冲执行任务的队列
  • 允许线程的空闲时间300秒:当超过了核心线程出之外的线程在空闲时间到达之后会被销毁
  • 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
  • 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务

注:处理策略

ThreadPoolExecutor.AbortPolicy 丢弃任务并抛出RejectedExecutionException异常(默认)。
ThreadPoolExecutor.DiscardPolic 丢弃任务,但是不抛出异常。
ThreadPoolExecutor.DiscardOldestPolicy 丢弃队列最前面的任务,然后重新尝试执行任务
ThreadPoolExecutor.CallerRunsPolic 由调用线程处理该任务(常用)

说明:setWaitForTasksToCompleteOnShutdown(true)该方法就是这里的关键,用来设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean,这样这些异步任务的销毁就会先于Redis线程池的销毁。同时,这里还设置了setAwaitTerminationSeconds(60),该方法用来设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住。

2.4 创建异步线程任务
package com.dyaqi.async.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author: dongyq
 * @date: 2021/12/24 16:05
 * @since:
 * @功能描述: 异步线程调用(不能和调用方法同类)
 */
@Component
public class AsyncTask {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async("myTaskAsyncPool") /*使用指定的线程池*/
    public void doTaskPool(int i) {
        logger.info("Task" + i + " started.");
    }

}

2.5 修改启动类

给启动类添加注解

@EnableAsync
@EnableConfigurationProperties({TaskThreadPoolConfig.class} ) // 开启配置属性支持
2.6 测试
package com.dyaqi.async.service.impl;

import com.dyaqi.async.config.AsyncTask;
import com.dyaqi.async.service.IService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * @author: dongyq
 * @date: 2021/12/24 15:43
 * @since:
 * @功能描述: ServiceImpl
 */
@Service
public class ServiceImpl implements IService {

    @Autowired
    private AsyncTask asyncTask;

    @Override
    public String get1() {
        for (int i = 0; i < 100; i++) {
            asyncTask.doTaskPool(i);
        }
        return "he";
    }

}

注:Controller和IService自行创建

3. 修改SpringBoot默认线程池

因为上面的那个线程池使用时候总要加注解@Async("myTaskAsyncPool"),(业务系统中的多处需要修改)如果我们想使用默认的线程池,即使用异步线程池时还是使用@Async的注解。但是只是想修改默认线程池的配置,将默认的异步线程池的参数可配置化,方便系统的调优。

具体实现有以下方案:

  • 重新实现接口AsyncConfigurer
  • 继承AsyncConfigurerSupport
  • 配置由自定义的TaskExecutor替代内置的任务执行器

接下来我们介绍两种。

3.1实现AsyncConfigurer类

源码如下:

public interface AsyncConfigurer {
    @Nullable
    default Executor getAsyncExecutor() {
        return null;
    }

    @Nullable
    default AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

说明:

Executor : 处理异步方法调用时要使用的实例,

AsyncUncaughtExceptionHandler :在使用void返回类型的异步方法执行期间抛出异常时要使用的实例。

3.1.1 获取属性配置类

这个和上面的TaskThreadPoolConfig类相同,这里不重复。

3.1.2 装配线程池
package com.dyaqi.async.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: dongyq
 * @date: 2021/12/24 16:32
 * @since:
 * @功能描述: 修改SpringBoot默认线程池
 */
@Configuration
public class ImplAsyncTaskExecutePool implements AsyncConfigurer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    TaskThreadPoolConfig config;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        //活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //线程名字前缀
        executor.setThreadNamePrefix("ImplTaskPool-");

        // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                logger.error("==========================" + ex.getMessage() + "=======================", ex);
                logger.error("exception method:" + method.getName());
            }
        };
    }
}

3.1.3 创建异步线程任务
package com.dyaqi.async.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

/**
 * @author: dongyq
 * @date: 2021/12/24 16:05
 * @since:
 * @功能描述: 异步线程调用(不能和调用方法同类)
 */
@Component
public class AsyncTask {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Async /*使用默认的线程池(已被我们修改)*/
    public void doNativeTaskPool(int i) {
        logger.info("Task" + i + " started.");
    }
}

3.1.4 测试

ServiceImpl添加方法

	@Override
    public String get2() {
        for (int i = 0; i < 100; i++) {
            asyncTask.doNativeTaskPool(i);
        }
        return "he";
    }

3.2继承AsyncConfigurerSupport类

源码如下:

public class AsyncConfigurerSupport implements AsyncConfigurer {
    public AsyncConfigurerSupport() {
    }

    public Executor getAsyncExecutor() {
        return null;
    }

    @Nullable
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

说明:

Executor : 处理异步方法调用时要使用的实例,

AsyncUncaughtExceptionHandler :在使用void返回类型的异步方法执行期间抛出异常时要使用的实例。

3.2.1 获取属性配置类

这个和上面的TaskThreadPoolConfig类相同,这里不重复。

3.2.2 装配线程池
package com.dyaqi.async.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.lang.reflect.Method;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @author: dongyq
 * @date: 2021/12/24 16:48
 * @since:
 * @功能描述: 修改SpringBoot默认线程池
 */
@Configuration
public class ExtendsAsyncTaskExecutePool extends AsyncConfigurerSupport {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    TaskThreadPoolConfig config;

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心线程池大小
        executor.setCorePoolSize(config.getCorePoolSize());
        //最大线程数
        executor.setMaxPoolSize(config.getMaxPoolSize());
        //队列容量
        executor.setQueueCapacity(config.getQueueCapacity());
        //活跃时间
        executor.setKeepAliveSeconds(config.getKeepAliveSeconds());
        //线程名字前缀
        executor.setThreadNamePrefix("ExtTaskPool-");

        // setRejectedExecutionHandler:当pool已经达到max size的时候,如何处理新任务
        // CallerRunsPolicy:不在新线程中执行任务,而是由调用者所在的线程来执行
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncUncaughtExceptionHandler() {
            @Override
            public void handleUncaughtException(Throwable ex, Method method, Object... params) {
                logger.error("==========================" + ex.getMessage() + "=======================", ex);
                logger.error("exception method:" + method.getName());
            }
        };
    }
}

3.2.3 接下来和上面相同,这里不重复。

4.有返回值Future调用

4.1 创建异步线程任务
	@Async
    public Future<String> doNativeTaskPoolRe1() throws InterruptedException {
        logger.info("开始执行任务一");
        long l1 = System.currentTimeMillis();
        Thread.sleep(2000);
        long l2 = System.currentTimeMillis();
        logger.info("任务一用时:" + (l2 - l1));
        return new AsyncResult<>("任务一完成");
    }

    @Async
    public Future<String> doNativeTaskPoolRe2() throws InterruptedException {
        logger.info("开始执行任务二");
        long l1 = System.currentTimeMillis();
        Thread.sleep(2000);
        long l2 = System.currentTimeMillis();
        logger.info("任务二用时:" + (l2 - l1));
        return new AsyncResult<>("任务二完成");
    }

    @Async
    public Future<String> doNativeTaskPoolRe3() throws InterruptedException {
        logger.info("开始执行任务三");
        long l1 = System.currentTimeMillis();
        Thread.sleep(2000);
        long l2 = System.currentTimeMillis();
        logger.info("任务三用时:" + (l2 - l1));
        return new AsyncResult<>("任务三完成");
    }
4.2 测试
	@Override
    public String get3() {
        try {
            logger.info("开始访问");
            long l1 = System.currentTimeMillis();
            Future<String> poolRe1 = asyncTask.doNativeTaskPoolRe1();
            Future<String> poolRe2 = asyncTask.doNativeTaskPoolRe2();
            Future<String> poolRe3 = asyncTask.doNativeTaskPoolRe3();
            while (true) {//死循环,每隔2000ms执行一次,判断一下这三个异步调用的方法是否全都执行完了。
                if (poolRe1.isDone() && poolRe2.isDone() && poolRe3.isDone()) {//使用Future的isDone()方法返回该方法是否执行完成
                    //如果异步方法全部执行完,跳出循环
                    break;
                }
                Thread.sleep(2000);//每隔2000毫秒判断一次
            }
            long l2 = System.currentTimeMillis();//跳出while循环时说明此时三个异步调用的方法都执行完成了,此时得到当前时间
            String result = poolRe1.get();
            logger.info("结束访问,用时:" + (l2 - l1));
            logger.info("使用get方法获得的返回内容:" + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return "he";
    }
4.3 结论

原文地址:https://www.cnblogs.com/dyaqi/p/15728550.html