Redis subscribe/publish (发布订阅)

redis的发布端

package dubbo.wangbiao.project.pubsub;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Entry point for the publishing side of the Redis pub/sub demo.
 *
 * <p>Builds a Jedis connection pool, runs a single {@link Publisher} on an
 * executor and waits for it to finish.
 */
public class PublishClient {

    /**
     * Starts one publisher and blocks until it terminates or the wait times out.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // Create the connection pool (at most 5 idle connections are kept).
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(5);
        JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379, 1000, "123456");

        // Create the thread pool with a fixed number of worker threads.
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // Create a publisher and hand it to the executor.
        Publisher publisher = new Publisher(jedisPool, "发布者1");
        executorService.submit(publisher);

        // No further tasks will be submitted; wait for the publisher to finish.
        executorService.shutdown();
        boolean finished = executorService.awaitTermination(600, TimeUnit.SECONDS);

        // Release the pool only when the publisher is really done; if the wait
        // timed out the publisher is still running and still needs the pool.
        if (finished) {
            jedisPool.close();
        }
    }
}

 redis订阅端

package dubbo.wangbiao.project.pubsub;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.JedisPool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Entry point for the subscribing side of the Redis pub/sub demo.
 *
 * <p>Subscribes to one channel on a background thread, stays subscribed for
 * a fixed period, then unsubscribes and shuts down.
 */
public class SubscriberClient {

    /** Channel to subscribe to; it has to match the channel the publisher uses. */
    private static final String CHANNEL_NAME = "发布者1";

    /** How long to stay subscribed before unsubscribing, in seconds. */
    private static final long SUBSCRIBE_SECONDS = 30;

    /**
     * Runs the subscriber for {@link #SUBSCRIBE_SECONDS} seconds and then unsubscribes.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping or waiting
     */
    public static void main(String[] args) throws InterruptedException {
        // Create the Redis connection pool (at most 5 idle connections are kept).
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxIdle(5);
        JedisPool jedisPool = new JedisPool(poolConfig, "127.0.0.1", 6379, 1000, "123456");

        // Create the thread pool.
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        // Create the listener that receives the pub/sub callbacks.
        final SubscriberListener subscriberListener = new SubscriberListener("订阅者一号");

        // Subscribe to the channel on a worker thread; the subscribe call blocks
        // that thread for as long as the subscription is active.
        Subscriber subscriber = new Subscriber(jedisPool, subscriberListener, CHANNEL_NAME);
        executorService.submit(subscriber);
        executorService.shutdown();

        // Unsubscribe after 30 seconds. This must happen BEFORE waiting for the
        // executor, otherwise the wait simply blocks on the still-active subscription.
        TimeUnit.SECONDS.sleep(SUBSCRIBE_SECONDS);

        // unsubscribe(...) sends the real UNSUBSCRIBE request; calling the
        // onUnsubscribe callback directly would only print a message.
        // Guarded because unsubscribing a listener that never got subscribed
        // (e.g. the connection failed) is an error.
        if (subscriberListener.isSubscribed()) {
            subscriberListener.unsubscribe(CHANNEL_NAME);
        }

        // The subscriber task ends once the subscription is gone; then release the pool.
        boolean finished = executorService.awaitTermination(60, TimeUnit.SECONDS);
        if (finished) {
            jedisPool.close();
        }
    }
}

 redis的发布功能

package dubbo.wangbiao.project.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * redis的发布
 */
public class Publisher  extends Thread{

        private final JedisPool jedisPool;
        private String chanelName;

        public Publisher(JedisPool jedisPool, String chanelName) {
            this.jedisPool = jedisPool;
            this.chanelName = chanelName;
            System.out.println("【发布者""+chanelName+""初始化成功】");
            System.out.println("请输入要发布的消息:");
        }

        @Override
        public void run() {
            BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
            Jedis jedis = jedisPool.getResource();
            while (true) {
                String line = null;
                try {
                    line = reader.readLine();
                    if (!"quit".equals(line)) {
                        System.out.println(chanelName+"发布消息成功");
                        jedis.publish(chanelName, line);
                    } else {
                        break;
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }
        }
    }

 redis的订阅功能

package dubbo.wangbiao.project.pubsub;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * Subscriber task: borrows a connection from the pool and subscribes the
 * given listener to a single channel.
 */
public class Subscriber extends Thread {

    /** Jedis connection pool the subscription connection is borrowed from. */
    private final JedisPool jedisPool;

    /** Listener that receives the pub/sub callbacks. */
    private final SubscriberListener subscriberListener;

    /** Name of the channel to subscribe to. */
    private String channelName;

    /**
     * @param jedisPool          pool to borrow the connection from
     * @param subscriberListener listener receiving the pub/sub callbacks
     * @param channelName        channel to subscribe to
     */
    public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) {
        this.channelName = channelName;
        this.subscriberListener = subscriberListener;
        this.jedisPool = jedisPool;
    }

    /**
     * Subscribes the listener to the channel; any failure is printed, and the
     * connection is closed afterwards if one was obtained.
     */
    @Override
    public void run() {
        Jedis connection = null;
        try {
            connection = jedisPool.getResource();
            // subscribe(listener, channel): first the listener, then the channel name.
            connection.subscribe(subscriberListener, channelName);
        } catch (Exception failure) {
            failure.printStackTrace();
            System.out.println(String.format("频道订阅失败:%s", failure));
        } finally {
            release(connection);
        }
    }

    /** Closes the connection if one was actually borrowed. */
    private static void release(Jedis connection) {
        if (connection == null) {
            return;
        }
        connection.close();
    }
}

 发布订阅监听端

package dubbo.wangbiao.project.pubsub;

import redis.clients.jedis.JedisPubSub;

/**
 * Pub/sub listener that prints a line for every message, subscribe and
 * unsubscribe callback it receives, prefixed with the subscriber's name.
 */
public class SubscriberListener extends JedisPubSub {

    private final String subName;

    /**
     * @param subName display name of this subscriber, used in the printed messages
     */
    public SubscriberListener(String subName) {
        this.subName = subName;
    }

    /**
     * Handles a message received on a subscribed channel.
     *
     * @param channel channel the message arrived on
     * @param message message payload
     */
    @Override
    public void onMessage(String channel, String message) {
        // subName is passed as an argument, not concatenated into the pattern,
        // so a '%' in the name cannot corrupt the format string.
        System.out.println(String.format("【%s接收到消息】频道:%s;消息:%s。", subName, channel, message));
    }

    /**
     * Handles the confirmation of a new subscription.
     *
     * @param channel            channel that was subscribed
     * @param subscribedChannels channel count reported with the callback
     */
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format("【%s订阅频道成功】频道:%s;频道数:%d。", subName, channel, subscribedChannels));
    }

    /**
     * Handles the confirmation of an unsubscription. Invoking this method
     * directly only prints the message; it does not unsubscribe anything.
     *
     * @param channelName        channel that was unsubscribed
     * @param subscribedChannels channel count reported with the callback
     */
    @Override
    public void onUnsubscribe(String channelName, int subscribedChannels) {
        System.out.println(String.format("【%s取消订阅】频道:%s;频道数:%d。", subName, channelName, subscribedChannels));
    }
}

客户端命令演示:

    • publish/subscribe 是一对多的关系:发布到某个频道的一条消息会被推送给该频道的所有订阅者。
    • Redis Psubscribe 命令订阅一个或多个符合给定模式的频道。
    • 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等)。 news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
一点点学习,一丝丝进步。不懈怠,才不会被时代淘汰
原文地址:https://www.cnblogs.com/wangbiaohistory/p/14854730.html