浅析Redis发布订阅机制及其Java实现

  Redis 是一个开源的内存数据库,它以键值对的形式存储数据。由于数据存储在内存中,因此Redis的速度很快,但是每次重启Redis服务时,其中的数据也会丢失,因此,Redis 也提供了持久化存储机制,将数据以某种形式保存在文件中,每次重启时,可以自动从文件加载数据到内存当中

  Redis 的架构包括两个部分:Redis Client 和 Redis Server。Redis 客户端负责向服务器端发送请求并接受来自服务器端的响应。服务器端负责处理客户端请求,例如,存储数据,修改数据等。 Redis通常用作数据库,缓存以及消息系统。

一、Redis 发布订阅机制

1、发布订阅架构

  Redis 提供了发布订阅功能,可以用于消息的传输,Redis 的发布订阅机制包括三个部分:发布者,订阅者和 Channel。

  • PUBLISH 命令向通道发送信息,此客户端称为publisher 发布者;
  • SUBSCRIBE 向命令通道订阅信息,此客户端称为subscriber 订阅者;
  • redis 中 发布订阅模块的名字叫着 PubSub,也就是 PublisherSubscriber;
  • 一个发布者向一个通道发送消息,订阅者可以向多个通道订阅消息;当发布者向通道发布消息后,如果有订阅者订阅该通道,订阅者就会收到消息;这有点像电台,我收听了一个电台的频道,当频道发送消息后,我就能收到消息;

  发布者和订阅者都是 Redis 客户端,Channel 则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel 相当于主题。

2、PUBSub模块命令

  • subscribe: 订阅一个或者多个频道;
  • unsubscribe: 退订一个或者多个频道;
  • publish: 向通道发送消息;
  • psubscribe: 订阅给定模式相匹配的所有频道;
  • punsubscribe: 退订 给定模式所有的频道,若未指定模式,退订所有频道;

  具体的命令使用方式 可以使用 help 命令,示例如下:help subscribe

3、发布订阅功能

(1)发送消息 :Redis 采用PUBLISH命令发送消息,其返回值为接收到该消息的订阅者的数量。

(2)订阅某个频道:Redis采用SUBSCRIBE命令订阅某个频道,其返回值包括客户端订阅的频道,目前已订阅的频道数量,以及接收到的消息,其中subscribe表示已经成功订阅了某个频道。

(3)模式匹配 :模式匹配功能允许客户端订阅符合某个模式的频道,Redis采用 PSUBSCRIBE 订阅符合某个模式所有频道,用“”表示模式,“”可以被任意值代替。

127.0.0.1:6379> publish news.1 first
(integer) 1
127.0.0.1:6379> publish news.2 second
(integer) 1

  假设客户端同时订阅了某种模式和符合该模式的某个频道,那么发送给这个频道的消息将被客户端接收到两次,只不过这两条消息的类型不同,一个是message类型,一个是pmessage类型,但其内容相同

(4)取消订阅 :Redis采用UNSUBSCRIBE和PUNSUBSCRIBE命令取消订阅,其返回值与订阅类似。

  由于Redis的订阅操作是阻塞式的,因此一旦客户端订阅了某个频道或模式,就将会一直处于订阅状态直到退出。在SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE命令中,其返回值都包含了该客户端当前订阅的频道和模式的数量,当这个数量变为 0 时,该客户端会自动退出订阅状态。

4、发布订阅实现

  由于Redis是一个开源的系统,因此我们可以通过其源代码查看内部的实现细节。

(1)SUBSCRIBE

  当客户端订阅某个频道时,Redis需要将该频道和该客户端绑定。

  首先,在客户端结构体client中,有一个属性为pubsub_channels,该属性表明了该客户端订阅的所有频道,它是一个字典类型,通过哈希表实现,其中的每个元素都包含了一个键值对以及指向下一个元素的指针,每次订阅都要向其中插入一个结点,键表示订阅的频道,值为空。

  然后,在表示服务器端的结构体redisServer中,也有一个属性为pubsub_channels,但此处它表示的是该服务器端中的所有频道以及订阅了这个频道的客户端,它也是一个字典类型,插入结点时,键表示频道,值则是订阅了这个频道的所有客户端组成的链表。

  最后Redis通知客户端其订阅成功。

(2)PSUBSCRIBE

  当客户端订阅某个模式时,Redis同样需要将该模式和该客户端绑定。

  首先,在结构体client中,有一个属性为pubsub_patterns,该属性表示该客户端订阅的所有模式,它是一个链表类型,每个结点包括了订阅的模式和指向下一个结点的指针,每次订阅某个模式时,都要向其中插入一个结点。

  然后,在结构体redisServer中,有一个属性也叫pubsub_patterns,它表示了该服务器端中的所有模式和订阅了这些模式的客户端,它也是一个链表类型,插入结点时,每个结点都要包含订阅的模式,以及订阅这个模式的客户端,和指向下一个结点的指针。

(3)PUBLISH

  当客户端向某个频道发送消息时,Redis首先在结构体redisServer中的pubsub_channels中找出键为该频道的结点,遍历该结点的值,即遍历订阅了该频道的所有客户端,将消息发送给这些客户端。

  然后,遍历结构体redisServer中的pubsub_patterns,找出包含该频道的模式的结点,将消息发送给订阅了该模式的客户端。

5、发布订阅在 Redis 中的应用

  Redis的发布订阅功能与Redis中的数据存储是无关的,它不会影响Redis的key space,即不会影响Redis中存储的数据,但通过发布订阅机制,Redis还提供了另一个功能,即Keyspace Notification,允许客户端通过订阅特定的频道,从而得知是否有改变Redis中的数据的事件。

  例如,有一个客户端删除了Redis中键为mykey的数据,该操作会触发两条消息,mykey del和del mykey,前者属于频道keysapce,表示keyspace发生的变化,后者属于频道keyevent,表示执行的操作。

6、Redis发布订阅与ActiveMQ的比较

(1)ActiveMQ支持多种消息协议,包括AMQP,MQTT,Stomp等,并且支持JMS规范,但Redis没有提供对这些协议的支持;

(2)ActiveMQ提供持久化功能,但Redis无法对消息持久化存储,一旦消息被发送,如果没有订阅者接收,那么消息就会丢失;

(3)ActiveMQ提供了消息传输保障,当客户端连接超时或事务回滚等情况发生时,消息会被重新发送给客户端,Redis没有提供消息传输保障。

  总之,ActiveMQ所提供的功能远比Redis发布订阅要复杂,毕竟Redis不是专门做发布订阅的,但是如果系统中已经有了Redis,并且需要基本的发布订阅功能,就没有必要再安装ActiveMQ了,因为可能ActiveMQ提供的功能大部分都用不到,而Redis的发布订阅机制就能满足需求。

二、Java 实现

  定义 2 个订阅者用于订阅频道的消息,在使用 Jedis 时需要继承 JedisPubSub 类, 并重写 onMessage 方法; 订阅者可以在该方法里面进行消息的业务逻辑处理;

  注意 redis 的 发布订阅模式 是阻塞模式 ,一个订阅者需要 重新起一个线程

  缺点:

(1)PubSub 的生产者来一个消息会直接传递给消费者。如果没有消费者,消息会直接丢弃。如果有多个消费者,一个消费者突然挂掉,生产者会继续发送消息,另外的消费者可以持续收到消息。但是挂掉的消费者重新连上后,断连期间的消息会彻底丢失;

(2)如果 Redis 停机重启,PubSub 的消息是不会持久化的。

  首先我们创建两个客户端执行体:第 2 个是一样的创建即可

package com.example.redisdemo.service;
import redis.clients.jedis.JedisPubSub;

// 订阅消息消费体
public class OneJedisPubSub extends JedisPubSub {
   //接收到消息时执行
    @Override
    public void  onMessage(String channel, String message){
        System.out.println("oneJedisPubSub message is" + message);
    }
    //接收到模式消息时执行
    @Override
    public void onPMessage(String pattern, String channel, String message){
        System.out.println("oneJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
    }
    //订阅时执行
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("oneJedisPubSub订阅成功");
    }
    //取消订阅时执行
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels){
        System.out.println("oneJedisPubSub取消订阅"+channel);
    }
    //取消模式订阅时执行
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("oneJedisPubSub取消多订阅"+pattern);
    }
}

  然后我们开始给这两个客户端订阅消息

@RestController
@RequestMapping("test")
@Slf4j
public class TestController {
    @Autowired
    private RedisClient redisClient;
    private final OneJedisPubSub oneJedisPubSub = new OneJedisPubSub();
    private final SecondJedisPubSub secondJedisPubSub = new SecondJedisPubSub();

    @PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.subscribe(oneJedisPubSub,"topic1","topic2");
                    }
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

  请求情况如下图:

  请求结果如图:

  可以看出我们将两个客户端都订阅了一定channel,此时OneJedisPubSub订阅了topic1和topic2,SecondJedisPubSub订阅了topic2,我们尝试推送消息,demo如下:

    @PostMapping("push")
    public void push(@RequestBody QueueTest queueTest){
        log.info("发布一条消息");
        Long publish = redisClient.publish(queueTest.getTopic(), queueTest.getName());
        System.out.println("消费者数量"+publish);
    }

  可以看到我们往topic1发布了消息只有OneJedisPubSub接收到了消息,接下来我们往topic2发布消息

  可以看到此时两个客户端都接收到了消息。

  在测试完毕客户端接收消息的能力,我们这时取消SecondJedisPubSub订阅topic2,demo如下:

    @PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消订阅消息");
        try {
            secondJedisPubSub.unsubscribe(queueTest.getTopic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

  在取消后我们再往topic2推送消息,可以看到只有一个客户端接收消息。

  至此我们实验了大部分场景,至于模式订阅由于贴图太麻烦,我就将代码提供出来,大家可以自己实验:

@PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.pubsubPattern(oneJedisPubSub,"topic*");
                    }
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
@PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消模式订阅消息");
        try {
            secondJedisPubSub.punsubscribe(queueTest.getTopic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

  最后将redisclient的代码提供给大家

@Component("redisClient")
@Slf4j
public class RedisClient {
    @Resource
    private JedisPool jedisPool;

       /**
     * 发布消息
     * @param topic
     * @param message
     */
    public Long publish(String topic,String message){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.publish(topic, message);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    }

    /**
     * 订阅消息
     * @param jedisPubSub
     * @param topics
     */
    public void subscribe(JedisPubSub jedisPubSub, String... topics) throws Exception {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(jedisPubSub,topics);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }

        }
    }

    /**
     * 模式匹配订阅消息
     * @param topic
     */
    public void pubsubPattern(JedisPubSub jedisPubSub,String topic){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.psubscribe(jedisPubSub,topic);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    }
}

  以上就是reids的发布订阅功能,代码部分来自文章:https://zhuanlan.zhihu.com/p/136484218

三、主要命令及其原理

  首先介绍一下实现功能的主要几个命令:

  1. SUBSCRIBE 命令,这个命令可以让我们订阅任意数量的频道
  2. PUBLISH 命令,此命令是用来发布消息
  3. PSUBSCRIBE命令,此命令用来支持模糊订阅的功能

  在展示具体的demo之前,我们先简单了解下这其中的原理:

  在redisServer结构中的其中一个属性pubsub_channels是用来记录channel和客户端之间的关系,是使用key-->List的数据格式。如图:

  在我们使用SUBSCRIBE 命令在客户端client10086订阅了channel1 channel2,channel3

1、订阅:SUBSCRIBE channel1 channel2,channel3

  这时pubsub_channels的数据将会变为,如图:

  这就可以看出来执行SUBSCRIBE 命令就是将客户端信息添加到对应的channel对应列表的尾部。

2、模式订阅: 模式订阅设计到redisServer的另一个属性pubsub_patterns,也是一个链表,里面存储着客户端订阅的所有模式。结构如下图:

  当客户端订阅了一个模式,此时结构变为:

3、发布:PUBLISH 命令发布消息将消息推送到对应的客户端

  在执行PUBLISH 命令发布消息的时候,首先会在pubsub_channels上找到对应的channel,遍历其中所有的client信息,将消息发送到所有client;同时也会在pubsub_patterns上遍历找到匹配的模式,发给对应的客户端

4、取消订阅:UNSUBSCRIBE命令取消对应客户端的订阅

  当执行UNSUBSCRIBE命令时则将对应的client从channel列表中移除。

原文地址:https://www.cnblogs.com/goloving/p/15247144.html