Redis学习笔记四:独立功能之发布与订阅

客户端可以通过执行 subscribe 命令订阅一个或多个频道,每当有其他客户端向被订阅的频道发送消息时,频道所有的订阅者都会收到这条消息。
客户端还可以通过执行 psubscribe 命令订阅一个或多个模式,消息也会被发送给与频道相匹配模式的订阅者。

频道的订阅与退订

当一个客户端执行 subscribe 命令定于某个频道的时候,就与被订阅频道之间建立起了一种订阅关系。所有频道的订阅关系都保存在服务器状态的 pubsub_channels 字典中,字典的键是被订阅的频道,值是记录了所有订阅这个频道的客户端。

struct redisServer {
 // ...
 dict *pubsub_channels;  /* Map channels to list of subscribed clients */
 // ...
}

订阅频道
每当执行订阅命令时,服务器会将客户端与被订阅的频道在 pubsub_channels 字典中进行关联。频道中已经有其他订阅者,将客户端添加到订阅者链表的末尾;如果还没有订阅者,那么程序首先在 pubsub_channels 字典中为频道创建一个键,并为这个键的值设置为空链表,然后再将客户端添加到链表。

退订频道
订阅频道的逆向过程。从链表中删除客户端,如果链表为空了,则删除这个频道。

模式的订阅与退订

模式的订阅关系保存在 pubsub_patterns 链表属性里。

struct redisServer {
 // ...
 dict *pubsub_channels;  /* Map channels to list of subscribed clients */
 // ...
}

每个节点都包含着一个 pubsubPattern 结构:

typedef struct pubsubPattern {
    redisClient *client;
    robj *pattern;
} pubsubPattern;

client 属性记录了订阅模式的客户端,parttern 属性记录了被订阅的模式。

订阅模式
当客户端执行 psubscribe 命令订阅某个模式时,服务器会对每个订阅的模式新建一个 pubsubPattern 结构,将结构的 client 属性设置为订阅模式的客户端, pattern 设置为订阅的模式。然后将pubsubPattern 结构 添加到 pubsub_patterns 链表的表尾。

退订模式
订阅模式的反操作。在 pubsub_patterns 链表中查找并删除指定 pattern 和 client 的pubsubPattern 结构。

发送消息

当一个 Redis 客户端执行 publish channel message 命令发送给频道 channel 的时候,服务器会将消息 发送给 channel 频道的订阅者,然后发送给模式的订阅者。

发送给频道订阅者
在 pubsub_channels 字典里找到频道 channel 的订阅者名单,并将消息依次发送给所有客户端。

发送给模式订阅者
遍历整个 pubsub_patterns 链表,查找与 channel 频道想匹配的模式,并将消息发送给这些模式的客户端。

查看订阅消息

客户端可以通过 pubsub 这个命令查看频道或模式的相关信息,比如某个频道有多少订阅者,某个模式有多少订阅者。它有三个子命令:

pubsub channels
pubsub channels [pattern],用于返回服务器当前被订阅的频道。其中 pattern 是可选参数,不选默认为查询所有。

127.0.0.1:6379> pubsub channels
1) "liushijie"

pubsub numsub
pubsub numsub [channel-1 channel-2 ... channel-n],用于接收任意多个频道作为输入参数并返回这些频道订阅者的数量。必须要有一个频道,否则查出的是空值。

127.0.0.1:6379> pubsub numsub liushijie
1) "liushijie"
2) (integer) 4

pubsub numpat
pubsub numpat 命令用户返回服务器当前被订阅模式的数量。

127.0.0.1:6379> pubsub numpat
(integer) 0

java 实现

// 发布者
import redis.clients.jedis.Jedis;

public class Publish {
    public static void main(String[] args) throws InterruptedException {
        Jedis jedis = new Jedis("127.0.0.1", 6379);

        while (true) {
            jedis.publish("liushijie", System.currentTimeMillis() + "");
            Thread.sleep(2000);
        }

    }
}

// 订阅者
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class SubScribe {
    public static void main(String[] args) {

        new Thread(new SubscribeRunnable()).start();
        new Thread(new SubscribeRunnable()).start();
        new Thread(new SubscribeRunnable()).start();

    }

    private static class SubscribeRunnable implements Runnable {

        @Override
        public void run() {
            Jedis receiver = new Jedis("127.0.0.1", 6379);
            receiver.subscribe(new JedisPubSub() {
                @Override
                public void onMessage(String channel, String message) {
                    System.out.println(channel + " got message " + message);
                }

                @Override
                public void onPMessage(String pattern, String channel, String message) {

                }

                @Override
                public void onSubscribe(String channel, int subscribedChannels) {

                }

                @Override
                public void onUnsubscribe(String channel, int subscribedChannels) {

                }

                @Override
                public void onPUnsubscribe(String pattern, int subscribedChannels) {

                }

                @Override
                public void onPSubscribe(String pattern, int subscribedChannels) {

                }
            }, "liushijie");
        }
    }

}
原文地址:https://www.cnblogs.com/liushijie/p/5094047.html