springboot多线程TaskExecutor的使用,以及使用@Async实现异步调用

@Async实现异步调用

pom.xml

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

启动类

@EnableAsync
@SpringBootApplication
public class LearnutilsApplication {
 
   public static void main(String[] args) {
      SpringApplication.run(LearnutilsApplication.class, args);
   }
 
   /**
    * 核心线程数10:线程池创建时初始化的线程数
    * 最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
    * 缓冲队列200:用来缓冲执行任务的队列
    * 允许线程的空闲时间60秒:超过了核心线程数之外的线程,在空闲时间到达之后会被销毁
    * 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
    * 线程池对拒绝任务的处理策略:此处采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在execute方法的调用线程中运行被拒绝的任务;如果执行程序已被关闭,则会丢弃该任务
    * 设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
    * 设置线程池中任务的等待时间,如果超过这个时候还没有销毁就强制销毁,以确保应用最后能够被关闭,而不是阻塞住
    */
   @EnableAsync
   @Configuration
   class TaskPoolConfig{
      @Bean("taskExecutor")
      public Executor taskExecutor(){
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(10);
         executor.setMaxPoolSize(20);
         executor.setQueueCapacity(200);
         executor.setKeepAliveSeconds(60);
         executor.setThreadNamePrefix("TaskExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         executor.setWaitForTasksToCompleteOnShutdown(true);
         executor.setAwaitTerminationSeconds(60);
         return executor;
      }
   }
 
}

定义controller

@RequestMapping(value = "/AsyncController")
@RestController
public class AsyncController {
 
    @Autowired
    private AsyncService asyncService;
 
    @Autowired
    private AsyncService2 asyncService2;
 
    @Autowired
    private AsyncService3 asyncService3;
 
    @GetMapping(value = "/sendSms")
    public String sendSms() throws Exception{
       
        Future<String> sms = asyncService.sendSms();
        Future<String> sms2 = asyncService2.sendSms();
        Future<String> sms3 = asyncService3.sendSms();
        int i = 0;
        for (;;) {
            //如果都执行完就跳出循环,isDone方法,如果此线程执行完,true
            if (sms.isDone() && sms2.isDone() && sms3.isDone()) {
                break;
            }
        }
        //get是获取结果集
        return sms.get()+sms2.get()+sms3.get();
    }
}

定义接口

public interface AsyncService {
    Future<String> sendSms();
}

实现类

@Service
public class AsyncServiceImpl implements AsyncService {
    //Future<String> 返回结果 AsyncResult<String>
    @Async("taskExecutor")
    @Override
    public Future<String> sendSms() {
        return new AsyncResult<>("000000");
    }
}

将isDone换程CountDownLatch来判断线程是否执行完实例化CountDownLatch并且制定线程个数,线程个数就是从本地异步调用的方法个输,并且传入线程任务中,每个线程执行完毕就调用countDown()方法。最后在调用await()方法。这样在线程计数为零之前,线程就会一直等待。

AsyncResult用来封装结果集,否则结果集无法返回

@GetMapping(value = "/sendSms2")
public String sendSms2() throws Exception{
    CountDownLatch downLatch = new CountDownLatch(3);
    Future<String> s = asyncService4.sendSms(downLatch);
    Future<String> s1 = asyncService5.sendSms(downLatch);
    Future<String> s2 = asyncService6.sendSms(downLatch);
    downLatch.await();
    return s.get()+s1.get()+s2.get();
}

将CountDownLatch传给方法

public interface AsyncService4 {
    Future<String> sendSms(CountDownLatch downLatch);
}

方法

@Service
public class AsyncService4Impl implements AsyncService4 {

    @Async("taskExecutor")
    @Override
    public Future<String> sendSms(CountDownLatch downLatch) {
        downLatch.countDown();
        return new AsyncResult<>("11111");
    }
}

TaskExecutor的使用

注册TaskExecutor

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * @author yanjun
 * @date 2019/8/1 16:04
 **/
@Configuration
public class MainConfiguration {
    @Bean
    public TaskExecutor getTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 设置核心线程数
        executor.setCorePoolSize(5);
        // 设置最大线程数
        executor.setMaxPoolSize(10);
        // 设置队列容量
        executor.setQueueCapacity(20);
        // 设置线程活跃时间(秒)
        executor.setKeepAliveSeconds(60);
        // 设置默认线程名称
        executor.setThreadNamePrefix("post-lending-");
        // 设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任务结束后再关闭线程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

使用TaskExecutor

@Autowired
private TaskExecutor taskExecutor;
 
public ResultVO findHandlingRecordByAssociationId(Integer associationId) throws InterruptedException{
    Map<String, Object> map = new HashMap<>(2);
   //线程计数器(等待所有线程执行完统一返回)
    CountDownLatch countDownLatch = new CountDownLatch(10);
    taskExecutor.execute(() -> {
        try {
            //service调用
            map.put("HandlingRecord", legalLitigationService.findHandlingRecordByAssociationId(associationId));
        }finally {
            countDownLatch.countDown();
        }
    });
    taskExecutor.execute(() -> {
        try {
            map.put("CaseBasic", legalLitigationService.findCaseDetailsById(associationId));
        }finally {
            countDownLatch.countDown();
        }
    });
    countDownLatch.await();
    return ResultVO.putSuccess(map);
}
定位问题原因* 根据原因思考问题解决方案* 实践验证方案有效性* 提交验证结果
原文地址:https://www.cnblogs.com/jimoliunian/p/13740010.html