延迟任务

需求:添加订单后定时发布

考虑过用定时任务每秒去数据库捞取,也考虑过用 MQ 推送,最后采用了 Redis 的 zset(以执行时间戳作为 score)来实现。

Delay.java
package com.zjjw.city.service.delay;

import java.util.Date;

/**
 * Contract for a delay task: entries are scheduled for a point in time and can
 * later be rescheduled or removed.
 * @author tongzuqi
 * @date 2021/6/23 3:03 PM
 */
public interface Delay {

    /**
     * Initializes the delay task.
     */
    void into();

    /**
     * Adds a delayed entry.
     * @param datetime time at which the entry should be executed
     * @param delayVo entry descriptor (carries the primary-key code)
     */
    void add(Date datetime, DelayVo delayVo);


    /**
     * Removes a delayed entry.
     * @param delayVo entry descriptor (carries the primary-key code)
     */
    void delete(DelayVo delayVo);

    /**
     * Changes the execution time of a delayed entry.
     * @param datetime new time at which the entry should be executed
     * @param delayVo entry descriptor (carries the primary-key code)
     */
    void update(Date datetime, DelayVo delayVo);


}
View Code
DelayVo.java
package com.zjjw.city.service.delay;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * Value object describing one delayed entry.
 *
 * <p>Field order matters: the Lombok all-args constructor takes
 * {@code (delayEnum, code)} in declaration order.
 *
 * @author tongzuqi
 * @date 2021/6/25 10:23 AM
 */
@NoArgsConstructor
@AllArgsConstructor
@Data
public class DelayVo {

    /** Enum tag attached to the entry (the original comment calls it the "menu enum"). */
    private DelayEnum delayEnum;

    /** Primary key of the record the entry refers to. */
    private long code;
}
View Code
DelayUtils.java
package com.zjjw.city.service.delay;

import com.zjjw.city.util.utils.RedisUtil;
import java.util.Calendar;
import java.util.Objects;
import java.util.Set;
import javax.annotation.Resource;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;

/**
 * Helper for reading due entries from a Redis sorted set used as a delay queue.
 * Scores are compared against the current time in epoch milliseconds.
 * @author tongzuqi
 * @date 2021/6/23 3:03 PM
 */
@Component
public class DelayUtils {


    @Resource
    private RedisUtil redisUtil;

    /**
     * Returns the first entry of the sorted set whose score is not later than
     * the current time, without removing it.
     *
     * @param redis_key key of the sorted set to inspect
     * @return the first due entry, or {@code null} when nothing is due, the
     *         lookup returned no data, or the entry carries no score
     */
    public DelayVo getDelayVo(String redis_key) {
        long nowMillis = System.currentTimeMillis();
        // Only the first due entry is ever used, so ask Redis for a single
        // element (offset 0, count 1) instead of every due entry.
        Set<ZSetOperations.TypedTuple<Object>> items =
                redisUtil.rangebyscorewithscores(redis_key, 0, nowMillis, 0, 1);
        if (items == null || items.isEmpty()) {
            return null;
        }
        ZSetOperations.TypedTuple<Object> first = items.iterator().next();
        if (first == null) {
            return null;
        }
        Double score = first.getScore();
        if (score == null) {
            // A tuple without a score cannot be judged as due.
            return null;
        }
        // The range query already bounds the score; the comparison is kept as a
        // defensive check against the locally sampled clock value.
        if (nowMillis >= score.longValue()) {
            return (DelayVo) first.getValue();
        }
        return null;
    }

}
  
View Code

 DelayCanteenOrderAuto.java

package com.zjjw.city.service.delay;

import com.zjjw.city.iface.fo.CanteenOrderOperationFo;
import com.zjjw.city.service.CanteenOrderService;
import com.zjjw.city.util.utils.DateUtil;
import com.zjjw.city.util.utils.RedisKeys;
import com.zjjw.city.util.utils.RedisUtil;
import java.util.Date;
import java.util.Objects;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
/**
 * 菜品自动审核
 * @author tongzuqi
 * @date 2021/6/23 3:01 下午
 */
@Slf4j
@Component
public class DelayCanteenOrderAuto extends DelayUtils implements Delay , Runnable{

    private static final String redis_key = RedisKeys.CITY_DELAY_CANTEENIRDERAUTOTIME;
    private static final String redis_key_sign = RedisKeys.CITY_DELAY_SIGN_ORDER;
    private static final String log_name = DelayEnum.CANTEENORDERAUTOTHREAD.getName();

    @Resource
    private RedisUtil redisUtil ;

    @Resource
    private CanteenOrderService canteenOrderService ;

    @PostConstruct
    @Override
    public void into() {
        log.info("延时任务:{} ,开始加载",log_name);
        new Thread(this).start();
    }

    @Override
    public void add(Date datetime,DelayVo delayVo) {
        redisUtil.zadd(redis_key, datetime.getTime(),delayVo);
        log.info("延时任务[{}]添加处理时间:{},vo:{}",log_name , DateUtil.format(datetime, DateUtil.DATE_TIME_PATTERN) ,delayVo );
    }

    @Override
    public void update(Date datetime,DelayVo delayVo) {
        redisUtil.zadd(redis_key, datetime.getTime(),delayVo);
        log.info("延时任务[{}]变更处理时间:{},vo:{}",log_name , DateUtil.format(datetime, DateUtil.DATE_TIME_PATTERN) ,delayVo );
    }

    @Override
    public void delete(DelayVo delayVo) {
        redisUtil.zrem(redis_key, delayVo);
        log.info("延时任务[{}]删除code:{}" ,log_name,delayVo.getCode());
    }

    @Override
    public void run(){
        while(true){
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            DelayVo delayVo = this.getDelayVo(redis_key);
            if(Objects.nonNull(delayVo)){
                Long code = delayVo.getCode();
                String sign = redis_key_sign + code ;
                boolean fag  = redisUtil.execute(sign,code.toString());
                if(fag) {
                    DelayVo delayNewVo = this.getDelayVo(redis_key);
                    if(Objects.isNull(delayNewVo) || delayNewVo.getCode() != code){
                        log.info("延时任务[{}]code:{},已经被处理了" ,log_name,code.toString());
                        continue;
                    }
                    //调用业务
                    canteenOrderService.operationCanteenOrder(new CanteenOrderOperationFo(1,code,1L));
                    redisUtil.zrem(redis_key, delayVo);
                    log.info("延时任务[{}],处理了code:{}" ,log_name,code);
                    redisUtil.del(sign);
                }
            }
        }
    }
}
View Code

 RedisUtil.java(节选,仅列出延迟任务用到的方法)

    /**
     * Adds a member to a sorted set, using a timestamp as its score.
     * @param key   sorted-set key
     * @param time  timestamp used as the score
     * @param value member to store
     * @return {@code true} only when the underlying add call reports {@code true};
     *         {@code false} when it reports otherwise, yields no result, or throws
     */
    public boolean zadd(String key, long time, Object value) {
        try {
            // A missing (null) result maps to false, as does any exception.
            return Boolean.TRUE.equals(redisTemplate.opsForZSet().add(key, value, time));
        } catch (Exception ignored) {
            return false;
        }
    }

    /**
     * Removes a member from a sorted set.
     * @param key   sorted-set key
     * @param value member to remove
     * @return the count reported by the underlying remove call, or {@code 0}
     *         when the call throws
     */
    public Long zrem(String key, Object value) {
        Long removed;
        try {
            removed = redisTemplate.opsForZSet().remove(key, value);
        } catch (Exception ignored) {
            removed = 0L;
        }
        return removed;
    }


    /**
     * Reads the members of a sorted set whose score lies between two bounds.
     * @param key sorted-set key
     * @param min lower score bound
     * @param max upper score bound
     * @return the matching members, or {@code null} when the call throws
     */
    public Set<Object> rangeByScore(String key, double min, double max) {
        Set<Object> members;
        try {
            members = redisTemplate.opsForZSet().rangeByScore(key, min, max);
        } catch (Exception ignored) {
            members = null;
        }
        return members;
    }


    /**
     * Reads members together with their scores from a sorted set, limited to a
     * score range and a window of results.
     * @param key   sorted-set key
     * @param min   lower score bound
     * @param max   upper score bound
     * @param start forwarded as the fourth argument (offset) of {@code rangeByScoreWithScores}
     * @param end   forwarded as the fifth argument (count) of {@code rangeByScoreWithScores}
     * @return the matching member/score tuples, or {@code null} when the call throws
     */
    public Set<ZSetOperations.TypedTuple<Object>> rangebyscorewithscores(String key, double min, double max, long start, long end) {
        Set<ZSetOperations.TypedTuple<Object>> tuples;
        try {
            tuples = redisTemplate.opsForZSet().rangeByScoreWithScores(key, min, max, start, end);
        } catch (Exception ignored) {
            tuples = null;
        }
        return tuples;
    }

    /**
     * Tries to take a lock: per the original contract, returns true when the key
     * did not exist and was set, false when it already existed.
     * @param key   lock key
     * @param value value to store under the key
     * @return whether the lock was taken
     */
    public Boolean execute(String key, String value) {
        // NOTE(review): this number is handed to the script as its only argument;
        // if the script is checkandset.lua it goes to EXPIRE, which counts in
        // seconds — confirm 5000 is the intended lifetime.
        final int expire = 5000;
        return redisTemplate.execute(redisScript, Arrays.asList(key, value), expire);
    }
View Code
RedisConfig
@Configuration
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * Builds the {@code RedisTemplate} used by the application: string keys,
     * Jackson JSON for values and hash values.
     *
     * @param factory connection factory to bind the template to
     * @return the configured template
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {
        // Mapper that sees every field and embeds type information for non-final types.
        // NOTE(review): enableDefaultTyping is deprecated in newer Jackson releases
        // in favour of activateDefaultTyping with a type validator — check the
        // Jackson version in use before switching.
        ObjectMapper mapper = new ObjectMapper();
        mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        mapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);

        Jackson2JsonRedisSerializer<Object> jsonSerializer = new Jackson2JsonRedisSerializer<>(Object.class);
        jsonSerializer.setObjectMapper(mapper);

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        // Keys are stored as plain strings.
        template.setKeySerializer(new StringRedisSerializer());
        // Values and hash values are stored as JSON.
        template.setValueSerializer(jsonSerializer);
        template.setHashValueSerializer(jsonSerializer);
        return template;
    }

    /**
     * Exposes the {@code checkandset.lua} classpath script as a Redis script
     * returning a Boolean.
     *
     * @return the script definition
     */
    @Bean
    public DefaultRedisScript<Boolean> redisScript() {
        DefaultRedisScript<Boolean> script = new DefaultRedisScript<>();
        script.setResultType(Boolean.class);
        script.setScriptSource(new ResourceScriptSource(new ClassPathResource("checkandset.lua")));
        return script;
    }

}
View Code
checkandset.lua
--- Set-if-absent helper: stores KEYS[2] under KEYS[1] only when KEYS[1] does
--- not exist yet, optionally with an expiry taken from ARGV[1].
--- Returns true when the key was set, false otherwise.
local lockKey = KEYS[1]
local lockValue = KEYS[2]
local ttl = ARGV[1]

--- Key already present: report failure without touching it.
if redis.call("get", lockKey) ~= false then
    return false
end

--- Store the value; bail out if the write did not succeed.
if not redis.call("set", lockKey, lockValue) then
    return false
end

--- Script arguments arrive as strings, so convert before comparing.
if tonumber(ttl) > 0 then
    --- EXPIRE interprets its argument as seconds.
    redis.call("expire", lockKey, ttl)
end
return true
View Code

 支持分布式部署,支持多个延时任务同时执行

原文地址:https://www.cnblogs.com/mytzq/p/14945217.html