Redis——发布和订阅

发布与订阅(又称pub/sub),订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二进制字符串消息(binary string message).每当有消息被发送给指定频道的时候,频道都所有订阅者都会收到消息。

Redis提供都5个发布订阅命令:

命令描述
Redis Psubscribe 命令 订阅一个或多个符合给定模式的频道。
Redis Pubsub 命令 查看订阅与发布系统状态。
Redis Publish 命令 将信息发送到指定的频道。
Redis Punsubscribe 命令 退订所有给定模式的频道。
Redis Subscribe 命令 订阅给定的一个或多个频道的信息。
Redis Unsubscribe 命令 指退订给定的频道。

使用实例:

  首先需要一个订阅者(listener)这里建立一个名为Subscriber的类:

public class Subscriber extends JedisPubSub {

    public void onMessage(String channel, String message) {
        System.out.println("onMessage channel = " + channel+ "message =" + message);
    }

    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",
                channel, subscribedChannels));
    }

    public void onUnsubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",
                channel, subscribedChannels));

    }
}

这个类继承自JedisPubSub,其中onMessage负责接收订阅频道消息后,业务处理逻辑,onSubscribe负责初始化订阅时候的处理,onUnsubscribe取消订阅时候的处理。

然后在定义一个类起一个线程来进行subscribe操作,因为我们需要订阅者一直在线,当发布者一发送消息到相应的频道时,能做出反应

public class SubThread extends Thread {
    JedisPool pool;
    private final Subscriber subscriber = new Subscriber();

    private final String channel = "xx";

    public SubThread( JedisPool pool) {
        super("SubThread");
        this.pool = pool;
    }

    @Override
    public void run() {
        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));
        Jedis jedis = pool.getResource();
            try {
                jedis.subscribe(subscriber, channel);
            } catch (Exception e) {
                System.out.println(String.format("subsrcibe channel error, %s", e));
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
    }
}

再然后就是发布者:

public class Publisher {

    JedisPool pool;

    public Publisher( JedisPool pool) {
        this.pool = pool;
    }

    public void start() {
        Jedis jedis = pool.getResource();
        while(true) {
            jedis.publish("xx", "233");
            try{
                Thread.sleep(5000);
            }catch (Exception e){
                e.printStackTrace();
            }

        }
    }
}

再然后就是主函数的调用:

public class test1 {
    public static void main(String[] args) throws Exception{
        //连接本地的 Redis 服务
        Jedis jedis = new Jedis("localhost");
        System.out.println("连接成功");
        //查看服务是否运行
        System.out.println("服务正在运行: "+jedis.ping());


        JedisPool pool = new JedisPool("localhost", 6379);

        SubThread subThread = new SubThread(pool);
        subThread.start();

        Publisher publisher = new Publisher(pool);
        publisher.start();

    }

因为Jedis不是线程安全的,JedisPool是线程安全的,所以这里使用JedisPool。

输出:

连接成功
服务正在运行: PONG
subscribe redis, channel xx, thread will be blocked
subscribe redis channel success, channel xx, subscribedChannels 1
onMessage channel = xxmessage =233
onMessage channel = xxmessage =233
原文地址:https://www.cnblogs.com/xxbbtt/p/7864953.html