Jedis实现频道的订阅,取消订阅

 第一步:创建一个发布者

package work;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

/**
 * 发布
 * author:songyan
 * date: 2019/10/17
 **/
public class Publisher  extends Thread  {
    private final JedisPool jedisPool;
    private String chanelName;

    public Publisher(JedisPool jedisPool, String chanelName) {
        this.jedisPool = jedisPool;
        this.chanelName = chanelName;
        System.out.println("【发布者""+chanelName+""初始化成功】");
        System.out.println("请输入要发布的消息:");
    }

    @Override
    public void run() {
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        Jedis jedis = jedisPool.getResource();
        while (true) {
            String line = null;
            try {
                line = reader.readLine();
                if (!"quit".equals(line)) {
                    System.out.println(chanelName+"发布消息成功");
                    jedis.publish(chanelName, line);
                } else {
                    break;
                }
            } catch (IOException e) {
                e.printStackTrace();
            }

        }
    }
    }

第二步:创建一个订阅者的监听器

package work;

import redis.clients.jedis.JedisPubSub;

/**
 * 监听
 * author:songyan
 * date: 2019/10/17
 **/
public class SubscriberListener  extends JedisPubSub {
    private String subName;
    public SubscriberListener(String subName) {
        this.subName = subName;
    }

    // 取得订阅的消息后的处理
    public void onMessage(String channel, String message) {
        System.out.println(String.format("【"+subName + "接收到消息】频道:%s;消息:%s。" , channel , message));
    }

    // 初始化订阅时候的处理
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println(String.format( "【"+subName + "订阅频道成功】频道:%s;频道数:%d。" , channel  , subscribedChannels));
    }

    // 取消订阅时候的处理
    public void onUnsubscribe(String channelName, int subscribedChannels) {
        System.out.println(String.format( "【"+subName + "取消订阅】频道:%s;频道数:%d。",channelName , subscribedChannels));
    }
}

第三步:创建一个订阅者

package work;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

/**
 * 订阅
 * author:songyan
 * date: 2019/10/17
 **/
public class Subscriber extends Thread {
    //jedis连接池
    private final JedisPool jedisPool;

    private final SubscriberListener subscriberListener;

    private String channelName;

    public Subscriber(JedisPool jedisPool, SubscriberListener subscriberListener, String channelName) {
        super();
        this.jedisPool = jedisPool;
        this.subscriberListener = subscriberListener;
        this.channelName = channelName;
    }

    @Override
    public void run() {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(subscriberListener,channelName);// 通过jedis.subscribe()方法去订阅,入参是1.订阅者、2.频道名称
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println(String.format("频道订阅失败:%s",e));
        } finally {
            if (null != jedis)
                jedis.close();
        }
    }
}

第四步:测试(编写客户端)

(1)发布者客户端

package work.test;

import redis.clients.jedis.JedisPool;
import work.Publisher;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 发布测试
 * author:songyan
 * date: 2019/10/17
 **/
public class PublishClient {
    public static void main(String[] args) throws InterruptedException {
        //创建连接池
        JedisPool jedisPool = new JedisPool("192.168.159.133");
        //创建线程池,并设定线程数量
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        //创建一个发布者
        Publisher publisher = new Publisher(jedisPool,"发布者1");
        executorService.submit(publisher);
        executorService.shutdown();
        executorService.awaitTermination(600, TimeUnit.SECONDS);
    }
}

执行main方法,创建一个发布者。

(2)订阅者客户端

package work.test;

import redis.clients.jedis.JedisPool;
import work.Subscriber;
import work.SubscriberListener;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 订阅者客户端
 * author:songyan
 * date: 2019/10/17
 **/
public class SubscriberClient {

    public static void main(String[] args) throws InterruptedException {
        //创建redis连接池
        JedisPool jedisPool = new JedisPool("192.168.159.133");
        //创建线程池
        ExecutorService executorService = Executors.newFixedThreadPool(5);

        //创建订阅者
        final SubscriberListener subscriberListener = new SubscriberListener("订阅者一号");
        //订阅频道
        Subscriber subscriber = new Subscriber(jedisPool, subscriberListener, "发布者1");
        executorService.submit(subscriber);
        executorService.shutdown();
        executorService.awaitTermination(60, TimeUnit.SECONDS);

        //30s后取消订阅
        Thread.sleep(3000);
        subscriberListener.onUnsubscribe("发布者1", 0);
    }
}

执行main方法,创建一个订阅者(订阅上面发布者的频道)。

发布者发布信息:

 订阅者接收到订阅信息:

订阅者取消订阅:

 

扩展:ExecutorService

原文地址:https://www.cnblogs.com/excellencesy/p/11696580.html