多线程异步调度任务

业务场景:Controller接受一个参数Object,调到业务层,把Object加入队列,成功就返回,同时开启线程做异步的消息发送。

目录结构:

CreatThreadPool.java

 1 public class CreatThreadPool {
 2 
 3     private ThreadPoolExecutor pool;
 4 
 5     public CreateThreadPool(long maxAlive, int maxSize, int minSize, int queueSize) {
 6         pool = new ThreadPoolExecutor(minSize, maxSize, maxAlive, TimeUnit.SECONDS, new LinkedBlockingQueue(queueSize));
 7     }
 8 
 9     public Future<String> submit(Runnable task) {
10         return pool.submit(task, null);
11     }
12 
13     @ManagedAttribute
14     public String getPoolStatus() {
15         JSONObject json = new JSONObject();
16         json.put("activeCount", pool.getActiveCount());
17         json.put("taskCount", pool.getTaskCount());
18         json.put("completedTaskCount", pool.getCompletedTaskCount());
19         return json.toJSONString();
20     }
21 }

CreatThread.java

 1 public class CreatThread implements Runnable{
 2 
 3     private Object obj;
 4     
 5     private ApplicationContext context;
 6 
 7     @Override
 8     public void run() {
 9 
10         try {
11             sendMessage();
12         } catch (Exception e) {
13             e.getMessage();
14         }
15     }
16 
17     public void sendMessage() {
18           //因为此处是线程类所以spring的bean加载不进来,需要使用ApplicationContext来获取
19         MQPublisher mQPublisher=(MQPublisher) context.getBean(MQPublisher.class);
20 
21         Object obj=(MQLinkDTO) context.getBean(Object.class);
22         //此处是为了把对象转换为JSON格式,所以用了阿里的fastjson
23         JSONObject json=(JSONObject) JSON.toJSON(noticeDTO);
24         
25         mQPublisher.sendMessage(mQLinkDTO.getName(), mQLinkDTO.getKey(),json);
26     }
27 
28     public void setObject(Object obj) {
29         this.obj = obj;
30     }
31     
32     public void setContext(ApplicationContext context) {
33         this.context = context;
34     }
35 }

Disptch.java

 1 public class Dispatch implements Runnable,ApplicationContextAware{
 2 
 3     
 4     private LinkedBlockingQueue<Object> queue;
 5     
 6     private ApplicationContext context;
 7     //注入一个空的线程池
 8     @Autowired
 9     private CreatThreadPool pool;
10     
11     public Dispatcher(int queueSize) {
12         //初始化队列
13         queue=new LinkedBlockingQueue<>(queueSize);
14     }
15 
16     @Override
17     public void run() {
18         while (true) {
19             Object obj = null;
20             try {
21                 obj = queue.take();
22                 
23                 if (obj != null) {
24                //创建线程,把object传过去,并且把context传过去,把线程放到线程池,就绪
25                     CreatThread thread = new CreatThread();
26                     thread.setObject(obj);
27                     thread.setContext(context);
28                     pool.submit(thread);
29                 }
30             } catch (Exception e) {
31                 e.printStackTrace();
32             }
33         }
34         
35     }
36 
37     public boolean offer(Object obj) {
38         //将obj加入队列,成功返回true
39         boolean result = queue.offer(obj);
40         return result;
41     }
42 
43     @Override
44     public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
45        //获取上下文对象
46         this.context=applicationContext;
47     }
48 }

LoadedConfiguration.java

@Configuration
public class LoadedConfiguration {

//读取配置文件,生产Pool、Dispatcher、MQLinkDTO的bean
    @Bean(name="CreatThreadPool")
    public CreatThreadPool loadThreadPool() {
        return new CreatThreadPool(
                Long.parseLong((String) System.getProperty("threadpool.maxAliveTime")),
                Integer.parseInt((String) System.getProperty("maxSize")),
                Integer.parseInt((String) System.getProperty("threadpool.minSize")),
                Integer.parseInt((String) System.getProperty("threadpool.queueSize")));
    }

    @Bean(name="Dispatcher")
    public CreatDispatcher creatDispatcher() {
        Dispatcher dispatcher = new Dispatcher(Integer.parseInt((String) System.getProperty("blockingqueue.size")));
        Thread thread = new Thread(creatDispatcher);
        thread.setName("create.dispatcher");
        thread.start();
        return dispatcher;
    }
    
    @Bean(name="MQLinkDTO")
    public MQLinkDTO creatMQLinkDTO() {
        MQLinkDTO mQLinkDTO = new MQLinkDTO(System.getProperty("topic_Name"),System.getProperty("topic_Key"));
        return mQLinkDTO;
    }
}

Service.java

 1 @Service
 2  public class Service {
 3       
 4       @Autowired
 5       private Dispatcher dispatcher;
 6       
 7       public boolean sendMQ(Object obj){
 8           return dispatcher.offer(obj);
 9       }
10  }

Controller.java

 1 @RestController
 2 public class Controller {
 3 
 4     @Autowired
 5     private Service service;
 6     
 7     @ApiOperation(value = "获取消息提醒", notes = "获取消息提醒")
 8     @RequestMapping(value = "/get", method = RequestMethod.POST)
 9     public Result<NoticeDTO> get(@RequestBody Object obj){
10         if(service.sendMQ(obj)){
11             return new Result<Object>(obj, ResponseCode.SUCCESS);
12         }else{
13                     return new Result<Object>(obj, ResponseCode.Fail);
14                 }    
15     }
16 }

 顺序:JVM首先加载LoadedConfiguration.calss 生成Bean,然后客户端发送请求,经过controller->service->dispatch(调度线程,加入线程池等待就绪)

tip:

1.线程中不支持使用spring的依赖注入

2.在线程中想要获取其他的Bean,需要用到ApplicationContext对象。context.getBean(object.class)

3.做异步处理所以不需要对传入参数加锁,反之需要使用synchronized (obj) {} 对obj加锁,并且使用wait阻塞线程,等待唤醒。

4.使用队列(LinkedBlockingQueue)时,使用offer方法判断是否加入队列,并且在取对象时(take)记得判空。

5.JSON.toJSONString方法可以转Object为Json串,但是会产生转义字符,所以要加入到JSONObject中。

原文地址:https://www.cnblogs.com/wangzun/p/7170273.html