使用Redis和定时实现延时消费

背景:

项目业务上需要实现延时发送消息的需求。最开始想到的就是消息中间件,公司统一用的RocketMq,于是开始整。。。但是,业务需求要求的延时消息时间是可自由指定的,但是公司居然用的是开源的RocketMq,开源的只支持18个固定级别的延时,

我们这里不重点说RocketMq,有兴趣的自己查吧。结论就是开源的RocketMq没法实现现在的需求,要不就用阿里云的,公司也不愿出那份钱吧。哈哈哈。于是想别的方法。

办法二。那就只能定时轮询配合实现了,查数据库的话,会增加数据库的压力,效率也不好。于是使用Redis配合定时实现延时消费。

Redis实现代码

使用zSet数据结构

生产者代码:

//key:redis,zSet集合key
//msg:key的value值,存储消息对象
//execTime:执行时间,时间戳
public void producerRedisDelayMsg(String key, Object msg, long execTime) { redisService.zSetAdd(key, msg, execTime); }

调用的zSetAdd方法实际就是对redis的操作,重点就是时间戳为score这个值,集合就是通过这个值进行排序的

 public boolean zSetAdd(String key, Object value, long score) {
        boolean re;
        try {
            re = redisTemplate.opsForZSet().add(key, value, score);
            return re;
        } catch (Exception e) {
            return false;
        }
    }

至此,生产者就写完了。接下来是消费者的实现

//key:key值,就是生产者的那个key值
//startTime,endTime:操作时间范围,时间戳

public void consumerRedisDelayMsg(String key, long startTime, long endTime) {

    //查询符合时间条件下的集合
    Set<Object> set = redisService.zSetRangeByScore(key, startTime, endTime);
    if (CollectionUtils.isNotEmpty(set)) {
    //具体业务操作

    //移除集合
    redisService.zSetRemoveRangeByScore(key, startTime, endTime);
    }

}

具体看一下redis的两个操作

zSetRangeByScore,实际就是查询key集合下min-max范围内的数据
public Set<Object> zSetRangeByScore(String key, double min, double max) {
        try {
            Set<Object> value = redisTemplate.opsForZSet().rangeByScore(key, min, max);
            return value;
        } catch (Exception e) {
            return null;
        }
    }
zSetRemoveRangeByScore,实际就是移除key集合下min-max范围内的数据
 public Long zSetRemoveRangeByScore(String key, double min, double max) {
        try {
            Long value = redisTemplate.opsForZSet().removeRangeByScore(key, min, max);
            return value;
        } catch (Exception e) {
            return null;
        }
    }

至此,有关redis的操作就结束了。定时的代码,我们这里就不写了,1秒轮询一次。

最后总结一下,整体的流程。定时每秒轮询一次,通过consumerRedisDelayMsg方法,首先查询符合时间范围内的集合数据,查询出来进行相应的业务实现,然后将查出来的集合数据移除。这里有个重要的问题,就是时间范围,开始值一般确定为0即可,结束值为当前系统时间。

生产者方法producerRedisDelayMsg比较重要的就是对execTime参数的理解,执行时间,即为redis集合中的score值,排序依据。

至此可以实现定时,Redis延时消费。

原文地址:https://www.cnblogs.com/feiyangbahu/p/13084025.html