聊一聊面试中常问的延时队列

  记得去年面试阿里的时候,就问到了一个问题,延时队列是怎么实现的,我当时对这个理解的不是很深,就回答了我们java中会用到DelayQueue实现,说了一下使用PriorityQueue队列实现,他可能也没用过,而我理解也不到位,在这个问题上聊了半天也没聊到点子上,也就不了了之了,过完年第一天上班没啥事,结合之前写的一个延时队列分布式组件和刚刚写的一个单系统延时队列,谈谈我们的延时队列一般都如何实现。

1.什么是延时队列

  我们所有的组件和功能,都是为了解决问题的,一般解决我们自己的问题的会直接在代码中实现,而解决一类问题的我们会将其抽成一个组件封装起来,就像我们今天聊的延时队列,他就是为了解决我们经常会遇到的比如我想给你一个任务,我要延迟一段时间去执行这样一系列的问题,我们会将这些任务和执行时间给封装起来,可以根据时间定时轮询去查找可执行任务并执行,而我想要定制化时间,可配置化之类的,将这些都实现出来,就可以称之为一个中间件了,可以给所有人用的。

2.延时队列如何实现

  我第一次听说这个概念的时候,我心里产生了几个疑惑,1.延时队列如何知道何时执行 ,2.延时队列如何保证在很多个任务时,能及时执行。我想,这也是很多人都会好奇的问题,带着问题我们去思考:

  1.延时队列如何知道何时执行:延时队列保存一个个的任务,任务对象封装的属性需要包含时间,我们根据这个时间去判断是否可执行,但是这样也会有一个问题,我如果每次都遍历队列中所有值,则会导致性能太差,因此我们必须根据时间去排序,保证只要当前任务执行不了,后面的就不需要管。

  2.时队列如何保证在很多个任务时,能及时执行:其实这个我们是没办法去彻底解决的,因为我们的性能不可能满足那么多任务同时可以执行,但是延时任务一般是不会有那么高的时效性的,当然我们也需要根据业务需要设置线程池去执行,提高线程数量,增加机器数量,经过压测,保证可以达到在一定范围内的偏差。

  其实解决了这两个问题,基本延时队列的模型也出来:一个队列里面放置一个个包含所需执行时间的顺序务节点,定时轮询获取可执行任务放入线程池执行,理论已经有了,我们先看一个单系统代码实现:

 1 public class DelayService implements Delayed, Runnable {
 2     private volatile AtomicLong delayTime = new AtomicLong(0);
 3     private String content;
 4     
 5     DelayService(String content) {
 6         this.content = content;
 7     }
 8 
 9     @Override
10     public long getDelay(TimeUnit unit) {
11         return this.delayTime.get() - System.currentTimeMillis();
12     }
13 
14     public void setDelayTime(TimeUnit unit, Long time) {
15         delayTime.set(unit.convert(time, unit));
16     }
17 
18     @Override
19     public int compareTo(Delayed o) {
20         if (o instanceof DelayService) {
21             DelayService test = (DelayService) o;
22 
23             long time = this.delayTime.get() - test.delayTime.get();
24             return time < 0 ? -1 : ((time > 0) ? 1 : 0);
25         }
26         return 0;
27     }
28 
29     @Override
30     public void run() {
31         System.out.println(content);
32     }
33 }
public class DelayClient {
    public static final DelayQueue<DelayService> delayQueue = new DelayQueue<>();

    public static void main(String[] args) throws InterruptedException {
        for(int i=0; i<10; i++){
            long ra = (long) (Math.random()*10000) + 10000;
            long time = ra+System.currentTimeMillis();
            DelayService delayService = new DelayService(String.valueOf(time));
            delayService.setDelayTime(TimeUnit.MILLISECONDS, time);

            delayQueue.add(delayService);
        }

        while(delayQueue.size() > 0){
            System.out.println("开始了");
            DelayService poll = delayQueue.take();
            if(poll == null){
                continue;
            }
            new Thread(poll).start();

        }
    }
}

Service实现一个Delayed接口,实现getDelay方法和compareTo方法,分别用来判断是否可执行和队列排序,我们可以自行实现不同条件的排序。Client主要轮询队列,这里写了一个main方法模拟业务场景将任务塞入队列,注意DelayQueue有两个取任务方法,take是阻塞式的poll是非阻塞的,我们可根据业务场景需要做不同实现,DelayQueue的实现原理我会新开一章来写。上面简单的几十行代码,便实现了延迟阻塞队列的实现,但是这里我们只能在单系统里使用,若我们需要分布式场景下的延时任务队列,则必须要借助分布式组件如redis来实现,我们之前写过一个分布式延时重试队列,用于解决高峰期失败任务延时重试,由于代码过多,我会上传到github上面。其实实现没啥区别,只是将DelayQueue换成redis的zset,定时查询所有当前时间之前执行的任务并执行,流程架构图如下:

 看起来复杂很多,但实质是一样的,当时做这个的时候,由于需要自定义配置重试时间,对外使用只需要添加注解解析,添加指定错误码重试,因此看起来相对复杂很多。做的时候也借鉴了spring-retry的实现原理。

总结:

  其实延迟队列的原理就是一个按执行时间排序的队列,每次取出所有达到时间的任务节点,执行任务,而我们业务中需要重试,可配置化等各种需求,再定制化开发,其本质还是延时队列而已。

原文地址:https://www.cnblogs.com/gmt-hao/p/14412834.html