SpringBoot自定义线程池处理异步任务

@Async异步调用

就不解释什么是异步调用了,Spring Boot中进行异步调用很简单

1.通过使用@Async注解就能简单的将原来的同步函数变为异步函数

package com.winner.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

/**
 * @author winner_0715
 * @date 2018/12/06
 */
@Service
public class TaskServer {

    @Async
    public void doTaskA() throws InterruptedException {
        System.out.println("TaskA thread name->" + Thread.currentThread().getName());
        Long startTime = System.currentTimeMillis();
        TimeUnit.SECONDS.sleep(2);

        Long endTime = System.currentTimeMillis();
        System.out.println("TaskA 耗时:" + (endTime - startTime));
    }

    @Async
    public void doTaskB() throws InterruptedException {
        System.out.println("TaskB thread name->" + Thread.currentThread().getName());
        Long startTime = System.currentTimeMillis();
        TimeUnit.SECONDS.sleep(2);
        Long endTime = System.currentTimeMillis();
        System.out.println("TaskB耗时:" + (endTime - startTime));
    }
}

为了让@Async注解能够生效,还需要在Spring Boot的主程序中配置@EnableAsync,如下所示:

package com.winner;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
/**
 * @author winner_0715
 * @date 2018/12/06
 */
@SpringBootApplication
@EnableAsync
public class SpringBootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootAsyncApplication.class, args);
    }
    
}

注: @Async所修饰的函数不要定义为static类型,这样异步调用不会生效

测试

package com.winner.web;

import com.winner.service.TaskServer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author winner_0715
 * @description:
 * @date 2018/12/6
 */
@RestController
public class HelloController {

    @Resource
    private TaskServer taskServer;

    @GetMapping("/async")
    public String testAsync() throws Exception {
        System.out.println("主线程 name -->" + Thread.currentThread().getName());
        taskServer.doTaskA();
        taskServer.doTaskB();
        return "Hello World";
    }
}

任务线程和主线程的名称不同,表明是异步执行的!

自定义线程池

前面介绍使用@Async注解来实现异步调用了。对于这些异步执行的控制是我们保障自身应用健康的基本技能。下面介绍通过自定义线程池的方式来控制异步调用的并发。

定义线程池

第一步,定义一个线程池,比如:

package com.winner.threadpool;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @author winner_0715
 */
@Configuration
public class ThreadPoolExecutorConfig {
    private static final int THREADS = Runtime.getRuntime().availableProcessors() + 1;
    final ThreadFactory threadFactory = new ThreadFactoryBuilder()
            // -%d不要少
            .setNameFormat("async-task-name-%d")
            .setDaemon(true)
            .build();

    @Bean("taskExecutor")
    public Executor taskExecutor() {
        return new ThreadPoolExecutor(THREADS, 2 * THREADS,
                5, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1024),
                threadFactory, (r, executor) -> {
            // 打印日志,添加监控等
            System.out.println("task is rejected!");
        });
    }
}

上面我们通过使用ThreadPoolExecutor创建了一个线程池

使用线程池

在定义了线程池之后,我们如何让异步调用的执行任务使用这个线程池中的资源来运行呢?方法非常简单,我们只需要在@Async注解中指定线程池名即可,比如:

package com.winner.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.concurrent.TimeUnit;

/**
 * @author winner_0715
 * @date 2018/12/06
 */
@Service
public class TaskServer {

    @Async("taskExecutor")
    public void doTaskA() throws InterruptedException {
        System.out.println("MsgServer send A thread name->" + Thread.currentThread().getName());
        Long startTime = System.currentTimeMillis();
        TimeUnit.SECONDS.sleep(2);

        Long endTime = System.currentTimeMillis();
        System.out.println("MsgServer send A 耗时:" + (endTime - startTime));
    }

    @Async("taskExecutor")
    public void doTaskB() throws InterruptedException {
        System.out.println("MsgServer send B thread name->" + Thread.currentThread().getName());
        Long startTime = System.currentTimeMillis();
        TimeUnit.SECONDS.sleep(2);
        Long endTime = System.currentTimeMillis();
        System.out.println("MsgServer send B耗时:" + (endTime - startTime));
    }
}

测试

package com.winner.web;

import com.winner.service.TaskServer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author winner_0715
 * @description:
 * @date 2018/12/6
 */
@RestController
public class HelloController {

    @Resource
    private TaskServer taskServer;

    @GetMapping("/async")
    public String testAsync() throws Exception {
        System.out.println("主线程 name -->" + Thread.currentThread().getName());
        taskServer.doTaskA();
        taskServer.doTaskB();
        return "Hello World";
    }
}

测试结果:

主线程 name -->http-nio-8080-exec-1
MsgServer send A thread name->async-task-name-0
MsgServer send B thread name->async-task-name-1
MsgServer send A 耗时:2001
MsgServer send B耗时:2001

 

原文地址:https://www.cnblogs.com/winner-0715/p/10076832.html