java 延时队列DelayQueue

今天遇到 支付超时,30min后取消订单

实现方式 :

最简单的方式,定时扫表;例如每分钟扫表一次十分钟之后未支付的订单进行主动支付 ;
优点: 简单
缺点: 每分钟全局扫表,浪费资源,有一分钟延迟

使用RabbitMq 实现 RabbitMq实现延迟队列
优点: 开源,现成的稳定的实现方案;
缺点: RabbitMq是一个消息中间件;延迟队列只是其中一个小功能,如果团队技术栈中本来就是使用RabbitMq那还好,如果不是,那为了使用延迟队列而去部署一套RabbitMq成本有点大;

使用Java中的延迟队列,DelayQueue
优点: java.util.concurrent包下一个延迟队列,简单易用;拿来即用
缺点: 单机、不能持久化、宕机任务丢失等等;

问题: 目前没有用RabbitMq,但是有redis

我的方案:使用Java中的延迟队列,DelayQueue

1. 添加订单到队列,同时添加到redis

2.弹出队列,同时删除redis 中的订单

DelayQueue:简单显例

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

import org.junit.Test;

import com.icil.pinpal.payment.common.utils.NumberUtils;

import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.ToString;

public class OrderPayTimeOut {
    /**
     * 3个线程:
     * 线程1:向队列中添加数据
     * 线程2:从队列中取出数据
     * 线程3:向队列中添加数据
     * @throws Exception
     */
    @Test
    public void testDelayQueue() throws Exception {
            ExecutorService executor = Executors.newFixedThreadPool(3);        // 创建线程池并返回ExecutorService实例  
            DelayQueue<PaylogX> queue = new DelayQueue<>();
            FutureTask<String> add = add(queue,"before",3);
            FutureTask<String> take = take(queue);
            Thread.sleep(100l);
            FutureTask<String> add1 = add(queue,"later",1);
            FutureTask<String> add2 = add(queue,"then",1);
            executor.execute(add);  // 执行任务  
            executor.execute(add1);  // 执行任务  
            executor.execute(add2);  // 执行任务  
            executor.execute(take); 
            while(true) {}
    }
    
    
    
    /**
     * 单线程测试
     * @description: 延时队列测试
     * @author: hh
     */
    @Test
    public  void testRun() throws InterruptedException {
        PaylogX item1 = new PaylogX("item1", 1, TimeUnit.MINUTES);
        PaylogX item2 = new PaylogX("item2",3, TimeUnit.MINUTES);
        PaylogX item3 = new PaylogX("item3",5, TimeUnit.MINUTES);
        DelayQueue<PaylogX> queue = new DelayQueue<>();
        queue.offer(item1);
        queue.offer(item2);
        queue.offer(item3);
        System.out.println("begin time:" + LocalDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME));
        
        for (int i = 0; i < 3; i++) 
        {
            PaylogX paylogX = queue.take();
            System.out.format("name:{%s},time:{%s} date:{%s}
",paylogX.oTitle,paylogX.timeStamp, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
        }
    }
    
    
    
    /**
     * 添加数据到队列
     * @param queue
     * @return
     */
    private  FutureTask<String> add(DelayQueue<PaylogX> queue,String name,long expireTime) {
           Callable<String> callable1=new Callable<String>() {
                @Override
                public String call() throws Exception {
                    System.err.println(name+"开始添加数据!!");
                    IntStream.range(0, 1000).forEach(i->{
                        PaylogX paylogX = new PaylogX(name+i, expireTime+new Random().nextInt(10), TimeUnit.SECONDS);
                         try {
                            Thread.sleep(100l);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                         queue.put(paylogX);
                         
                    });
                    System.err.println(name+"添加数据结束!!");
                    return null;
                }
             };
             FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
             return futureTask1;
    }
    /**
     * 从队列中取出数据
     * @param queue
     * @return
     */
    private  FutureTask<String> take(DelayQueue<PaylogX> queue) {
        
           Callable<String> callable1=new Callable<String>() {
                @Override
                public String call() throws Exception {
                    while (true) {
                        PaylogX paylogX = queue.take();
//                        queue.remove(take);
                        System.out.format("name:{%s},time:{%s} date:{%s}
",paylogX.oTitle,paylogX.timeStamp, LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                    }
                }
             };
             FutureTask<String> futureTask1 = new FutureTask<String>(callable1);
             return futureTask1;
    }
    
    
    
    @NoArgsConstructor
    @Data
    @ToString
    class PaylogX implements Delayed{
        private String     _id=NumberUtils.genNO() ; //uuid
        private String     oTitle              ; 
        private long     timeStamp=new Date().getTime();
        private transient long     delayTime;  //// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。
        
        public PaylogX(String oTitle ,long delayTime,TimeUnit unit) {
            this.oTitle=oTitle;
            this.delayTime = System.currentTimeMillis() + (delayTime > 0? unit.toMillis(delayTime): 0);
        }
        @Override
        public int compareTo(Delayed o) {
            PaylogX delayed=(PaylogX)o;
            long diff = this.delayTime-delayed.delayTime;
            if (diff <= 0) {// 改成>=会造成问题
                return -1;
            }else {
                return 1;
            }
        }
        
        @Override
        public long getDelay(TimeUnit unit) {
             return this.delayTime - System.currentTimeMillis();
        }
        public void setDelayTime(long delayTime) {
            this.delayTime = delayTime;
        }
    }

}
原文地址:https://www.cnblogs.com/lshan/p/14363628.html