Redis(四)——消息队列

Redis不仅可作为缓存服务器,还可用作消息队列。它的列表类型天生支持用作消息队列。

性质:由于Redis的列表是使用双向链表实现的,保存了头尾节点,所以在列表头尾两边插取元素都是非常快的。

所以可以直接使用Redis的List实现消息队列,只需简单的两个指令lpush和rpop或者rpush和lpop。

(列表常用命令)

RPUSH : RPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表右端

LPUSH : LPUSH key-name value [value1 value2,...] ------------将一个或多个值推入列表左端

RPOP : RPOP key-name----------移除并返回列表最右端元素

LPOP :LPOP key-name----------移除并返回列表最左端元素

LINDEX : LINDEX key-name offset --------------返回列表中偏移量为offset的元素

LRANGE : LRANGE key-name start end -------------返回列表中偏移量从start到end范围内的元素

LTRIM : LTRIM key-name start end ----------------对列表进行修剪,只保留偏移量从start到end范围内的元素

其中简单示例如下:
首先连接redis服务器,其中我应用了Jedispool,代码如下:

package redis;

import java.io.IOException;
import java.util.Properties;

import org.springframework.core.io.support.PropertiesLoaderUtils;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
/**
 * redis单例连接池
 * @author admin
 *
 */
public class RedisPool {

    private static  int TIMEOUT = 1000*30;
    private static  int MAXTOTAL = 1024;
    private static  int MAXIDLE = 100;
    private static  String REDISIP = "bei1";
    private static  int PORT = 6379;
    private static  String PASSWORD ="default";

    static {
        try {
            Properties prop = PropertiesLoaderUtils.loadAllProperties("redis.properties");
            TIMEOUT = Integer.parseInt(prop.getProperty("TIMEOUT","300000"));
            MAXTOTAL = Integer.parseInt(prop.getProperty("MAXTOTAL","1024"));
            MAXIDLE = Integer.parseInt(prop.getProperty("MAXIDLE","100"));
            REDISIP = prop.getProperty("REDISIP","127.0.0.1");
            PORT = Integer.parseInt(prop.getProperty("PORT","6379"));
            PASSWORD = prop.getProperty("PASSWORD","default");
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private static JedisPool[] pool = new JedisPool[10];

    private  RedisPool() {}

    private static JedisPool getPool(int database) {
        if(database>10) {
            return null;
        }
        if(pool[database] == null) {
            JedisPoolConfig config = new JedisPoolConfig();
            config.setMaxTotal(MAXTOTAL);
            config.setMaxIdle(MAXIDLE);
            config.setMaxWaitMillis(TIMEOUT);
            config.setTestOnBorrow(true);
            pool[database] = new JedisPool(config,REDISIP,PORT,TIMEOUT,PASSWORD,database);
        }
        return pool[database];
    }
    //单例获取redis连接资源
    public static Jedis getResource(int database) {
        if(database>10) {
            return null;
        }
        Jedis jedis = null;
        if(pool[database] == null) {
            synchronized(RedisPool.class) {
                try {
                    if(pool[database] == null) {
                        pool[database] = getPool(database);
                        try {
                            if (pool[database] != null) {
                                jedis = pool[database].getResource();
                            }
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }else {
            jedis = pool[database].getResource();
        }
        return jedis;
    }

}

定义一个生产者,代码:

package RedisMq;

import com.sun.deploy.util.StringUtils;
import redis.RedisPool;
import redis.clients.jedis.Jedis;

import java.util.concurrent.TimeUnit;

/**
 * <p>  </p>
 *
 * @author ly
 * @since 2019/1/5
 */
public class Producer extends Thread{

    public static final String MESSAGE_KEY = "queue";
    private Jedis jedis;
    private String produceName;
    private volatile int count;

    public Producer(String name){
        this.produceName = name;
        init();
    }
    private void init(){
        jedis = RedisPool.getResource(1);

    }
    public void putMessage(String message) {
        Long size = jedis.lpush(MESSAGE_KEY, message);
        System.out.println(produceName + ": 当前未被处理消息条数为:" + size);
        count++;
    }

    public int getCount() {
        return count;
    }
    @Override
    public void run() {
        try {
            while (true) {
                putMessage("hello world");
                TimeUnit.SECONDS.sleep(1);
            }
        } catch (InterruptedException e) {

        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Producer producer = new Producer("myProducer");
        producer.start();

        for (; ; ) {
            System.out.println("main : 已存储消息条数:" + producer.getCount());
            TimeUnit.SECONDS.sleep(10);
        }
    }
}

再定义一个消费者

package RedisMq;

import redis.RedisPool;
import redis.clients.jedis.Jedis;
/**
 * <p>  </p>
 *
 * @author ly
 * @since 2019/1/7
 */


    /**
     * 消息消费者
     * @author yamikaze
     */
    public class Customer extends Thread{

        private String customerName;
        private volatile int count;
        private Jedis jedis;

        public Customer(String name) {
            this.customerName = name;
            init();
        }

        private void init() {
            jedis = RedisPool.getResource(1);
        }

        public void processMessage() {
            String message = jedis.rpop(Producer.MESSAGE_KEY);
            if(message != null) {
                count++;
                handle(message);
            }
        }

        public void handle(String message) {
            System.out.println(customerName + " 正在处理消息,消息内容是: " + message + " 这是第" + count + "条");
        }

        @Override
        public void run() {
            while (true) {
                processMessage();
            }
        }

        public static void main(String[] args) {
            Customer customer = new Customer("小花");
            customer.start();
        }
    }

运行后 生产者和消费者控制台信息分别如下:

Redis 发布与订阅

redis 支持消息队列。发布订阅即是一种消息通信模式:发送者发送消息,订阅者订阅消息。

redis 客户端可以订阅任意数量的频道

(一)发布订阅
使用 publish 指令,格式为 publish channel message

127.0.0.1:6379> publish fruit "apple"
(integer) 0
 

该返回值为0,说明没有人订阅

(二)订阅消息
使用subscribe指令接受消息,格式为 subscribe channel

127.0.0.1:6379> subscribe fruit
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "fruit"
3) (integer) 1

可以看到使用SUBSCRIBE指令后进入了订阅模式,但没有接收到publish发送的消息,这是因为只有在消息发出去前订阅才会接收到。在这个模式下其他指令,只能看到回复。
回复信息分为3类:
1 如果为subscribe,第二个值表示订阅的频道,如上述代码

2 如果为message(消息),第二个值为产生该消息的频道,第三个值为消息,如图:

3 如果退订消息 ,第二个值表示取消订阅的频道,第三个值表示当前客户端的订阅数量。则接受信息如下

(三)取消订阅
使用Unsubscribe 指令,格式为 UNSUBSCRIBE channel [channel ...]

127.0.0.1:6379>  unsubscribe fruit
1) "unsubscribe"
2) "fruit"
3) (integer) 0

参考文章https://blog.csdn.net/qq_34212276/article/details/78455004

原文地址:https://www.cnblogs.com/gloria-liu/p/10232455.html