如何实现分布式的延时队列

借助 Redisson 实现延时队列

/**
 * Minimal distributed delay queue built on Redisson.
 *
 * <p>Producers call {@link #delaySend(String, Long, TimeUnit)}; the element is
 * handed to an {@code RDelayedQueue} that wraps the blocking deque named
 * {@value #QUEUE_NAME}. A background daemon thread blocks on that deque with
 * {@code take()} and prints every element it receives.
 *
 * <p>Call {@link #shutdown()} when the queue is no longer needed so the
 * consumer thread, the delayed-queue handle and the Redisson client are released.
 */
public class RedissonDelayQueue {

    /** Cluster node used by the no-arg constructor. */
    private static final String DEFAULT_NODE_ADDRESS = "redis://10.13.65.178:6390";

    /** Name of the blocking deque the consumer thread reads from. */
    private static final String QUEUE_NAME = "MXZ_DELAY_QUEUE";

    /** Cluster topology scan interval, in milliseconds. */
    private static final int SCAN_INTERVAL_MS = 2000;

    private final RedissonClient redissonClient;
    private final RBlockingDeque<String> rBlockingDeque;
    private final RDelayedQueue<String> rDelayedQueue;
    private final Thread consumerThread;

    /**
     * Schedules {@code jsonObject} for delivery after the given delay.
     *
     * @param jsonObject payload to deliver
     * @param delay      amount of time to wait before delivery; must not be {@code null}
     * @param timeUnit   unit of {@code delay}
     * @throws NullPointerException if {@code delay} is {@code null}
     */
    public void delaySend(String jsonObject, Long delay, TimeUnit timeUnit) {
        // The original unboxed a null Long implicitly and failed with a message-less NPE;
        // keep the exception type but say what was wrong.
        if (delay == null) {
            throw new NullPointerException("delay must not be null");
        }
        this.rDelayedQueue.offer(jsonObject, delay, timeUnit);
    }

    /** Creates a queue connected to the default cluster node and starts the consumer thread. */
    public RedissonDelayQueue() {
        this(DEFAULT_NODE_ADDRESS);
    }

    /**
     * Creates a queue connected to the given cluster node and starts the consumer thread.
     *
     * @param nodeAddress Redis cluster node address, e.g. {@code redis://host:port}
     */
    public RedissonDelayQueue(String nodeAddress) {
        Config config = new Config();
        config.useClusterServers().setScanInterval(SCAN_INTERVAL_MS).addNodeAddress(nodeAddress);
        this.redissonClient = Redisson.create(config);
        // No null checks here: the previous "return on null" branches would only have
        // produced a half-initialised instance that fails later inside delaySend().
        this.rBlockingDeque = this.redissonClient.getBlockingDeque(QUEUE_NAME);
        this.rDelayedQueue = this.redissonClient.getDelayedQueue(this.rBlockingDeque);
        this.consumerThread = this.startConsumerDelayQueue();
        System.out.println("启动时间" + LocalDateTime.now());
    }

    /**
     * Stops the consumer thread and releases the Redisson resources held by this instance.
     * The instance must not be used after this call.
     */
    public void shutdown() {
        this.consumerThread.interrupt();
        this.rDelayedQueue.destroy();
        this.redissonClient.shutdown();
    }

    /**
     * Starts the daemon thread that blocks on the deque and prints each received element.
     *
     * @return the started consumer thread, so that {@link #shutdown()} can interrupt it
     */
    private Thread startConsumerDelayQueue() {
        Thread thread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String jsonObject = this.rBlockingDeque.take();
                    System.out.println("--> 延迟队列获取数据:" + jsonObject);
                } catch (InterruptedException e) {
                    // Interruption is the stop signal: restore the flag and leave the loop
                    // instead of swallowing it and blocking on take() again.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }, "redisson-delay-queue-consumer");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    /** Demo entry point: schedules two messages, 10 and 20 seconds in the future. */
    public static void main(String[] args) {
        RedissonDelayQueue queue = new RedissonDelayQueue();
        // NOTE(review): the consumer is a daemon thread, so it does not keep the JVM alive
        // by itself once main returns; confirm the process stays up long enough to see output.
        queue.delaySend("one", 10L, TimeUnit.SECONDS);

        queue.delaySend("two", 20L, TimeUnit.SECONDS);
    }
}
原文地址:https://www.cnblogs.com/juniorMa/p/14659856.html