redis发布订阅

一、前言

        最近由于工作需要,将数据更新到redis之后,系统每一个小时读取一次redis,时效性极差,最多可能需要等待一个小时才能生效,如果减少轮询访问的时间间隔,无形中又会增加redis的压力,而且时间间隔真心不好控制。这时候redis的发布订阅可以在一定程度上解决这个问题

二、流程

    1、消费方需要向redis订阅channel,设置监听

    2、生产者发生数据变更时,向redis发送变更通知

    3、redis向消费方发送变更消息

    4、消费方得知数据发生变化,执行相关操作

三、实现

常量

public class RedisConstant {
 
    /**
     * redis发布订阅channel
     */
    public static final String REDIS_SUB_PUB_CHANNEL = "test_topic";
}
生产方
@Service
public class RedisPubService {
 
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
 
    public void publish(){
 
        for (int i = 0; i < 3; i++) {
            redisTemplate.convertAndSend(RedisConstant.REDIS_SUB_PUB_CHANNEL, createList());
            System.out.println("发送成功!");
            try {
                Thread.sleep(new Random().nextInt(3) * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
 
    private List<String> createList() {
        return Arrays.asList("aaa", "测试消息", "!@#_");
    }
}
消费方监听配置
@Configuration
public class RedisSubConfig {
 
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListener listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //配置要订阅的订阅项,可以有多个
        container.addMessageListener(listenerAdapter, new PatternTopic(RedisConstant.REDIS_SUB_PUB_CHANNEL));
        return container;
    }
}
消费方
@Component
public class RedisSubListener implements MessageListener {
 
    private RedisSerializer<?> serializer;
 
    @Resource
    private RedisTemplate<String, Object> redisTemplate;
 
    @PostConstruct
    private void init(){
        serializer = redisTemplate.getDefaultSerializer();
        //如果没有默认的序列化器,则使用jdk序列化器
        if(serializer == null){
            serializer = new JdkSerializationRedisSerializer();
        }
    }
 
    @Override
    public void onMessage(Message message, byte[] pattern) {
        byte[] body = message.getBody();
        byte[] channel = message.getChannel();
        Object deserialize = serializer.deserialize(body);
        String topic = redisTemplate.getStringSerializer().deserialize(channel);
        System.out.println("我是sub,监听" + topic + ",我收到消息:" + deserialize);
    }
}
执行结果
发送成功!
我是sub,监听test_topic,我收到消息:[aaa, 测试消息, !@#_]
发送成功!
我是sub,监听test_topic,我收到消息:[aaa, 测试消息, !@#_]
发送成功!
我是sub,监听test_topic,我收到消息:[aaa, 测试消息, !@#_]

tips:这里顺便小小吐槽一下spring的RedisTemplate真心没有Jedis好用

 

四、使用场景

        redis的发布订阅适用的场景不算多,但有时候可以在一定程度上提高系统的时效性。这里需要明确一点,redis的通知不一定能够到达,这点和zookeeper的watcher类似(watcher是一次性的,redis可以一直监听),也就是说redis只负责发送变更通知,至于消费方是否接收到,接收到后进行什么操作redis是不关心的,可以理解为MQ中没有ack,所以redis并不能保证100%的通知到达。故redis的发布订阅需要满足以下条件才可使用:

        1)对消息通知要求不是100%的严格触达,如果要求消息一定能触达,请使用mq

        2)变更通知可以丢失,redis是基于内存的Nosql,可能存在数据丢失的风险

        3)变更的数据量较小,redis的发布订阅可以理解为一个事件通知,尽量不要把变更数据放在其中,只需要告诉消费方数据发生变化即可(可以适当加入哪条数据发生了增删改的变化)

五、总结

        有人可能会说,既然redis的发布订阅不是100%通知到,为什么还要使用它?这个问题之前在使用zookeeper的watcher时也考虑过,100% + n%和100%有什么区别,而这个n却需要额外开发。这里举个简单的例子

        银行转账时,提示最多两小时到账,但是有时候几秒钟就到账了,用户体验更好一些

        换句话说就是,告知了下限,却将大部分数据做到了上限。

        写在最后,这篇文章的名称纠结了很久,感觉叫发布订阅其实并不是十分的友好,因为这个词通常与MQ息息相关,如果基于redis做MQ,应该使用list这种数据结构,不过这里redis调用的其实是redis的publish和subscribe命令,直译过来也就是发布订阅了,只是希望不要和MQ弄混淆

原文地址:https://www.cnblogs.com/1ning/p/11212291.html