Redis监听事件

  有时候我们希望监听某个key的删除或者其他事件,来做一些自己的业务操作。redis 的pub/sub 提供了这个能力。

  参考官网:https://redis.io/topics/notifications

1. redis 服务端和客户端测试

  redis 官网说了,默认的话事件通知是关闭的。如果需要开启可以修改redis.conf 文件中 notify-keyspace-events 配置。 或者用CONFIG SET 命令修改(只针对当前进程有效,重启失效)。

官网提供的事件类型如下:(每个字母代表一个事件类型的缩写)

K     Keyspace events, published with __keyspace@<db>__ prefix.
E     Keyevent events, published with __keyevent@<db>__ prefix.
g     Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$     String commands
l     List commands
s     Set commands
h     Hash commands
z     Sorted set commands
t     Stream commands
d     Module key type events
x     Expired events (events generated every time a key expires)
e     Evicted events (events generated when a key is evicted for maxmemory)
m     Key miss events (events generated when a key that doesn't exist is accessed)
A     Alias for "g$lshztxed", so that the "AKE" string means all the events except "m".

  K或者E至少有一个存在。如果需要监听所有的事件可以订阅 'KEA' 事件。 

1. 测试监听失效事件

(1) 修改redis.conf

#
notify-keyspace-events Ex
#
#  By default all notifications are disabled because most users don't need
#  this feature and the feature has some overhead. Note that if you don't
#  specify at least one of K or E, no events will be delivered.
# notify-keyspace-events ""

(2) 服务器启动后查看配置

127.0.0.1:6379> config get notify-keyspace-events
1) "notify-keyspace-events"
2) "xE"

(3) 启动一客户端监听所有数据库的失效事件

127.0.0.1:6379> psubscribe __keyevent@*__:expired
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__keyevent@*__:expired"
3) (integer) 1
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "test"
1) "pmessage"
2) "__keyevent@*__:expired"
3) "__keyevent@0__:expired"
4) "mykey"

  __keyevent@*__:expired    中的* 代表任意库,可以指定0-15库中的任意一个库, 也可以用通配符。

2. 监听所有事件

1. 修改配置通知所有事件

127.0.0.1:6379> config set notify-keyspace-events KEA
OK
127.0.0.1:6379> config get notify-keyspace-events
1) "notify-keyspace-events"
2) "AKE"

2.  客户端进行监听(psubsribe 后面的参数是可变数组,可以一次就监听多个事件)

127.0.0.1:6379> psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__key*__:*"
3) (integer) 1

3. 重启一个客户端进行操作数据

127.0.0.1:6379> set key1 value1
OK
127.0.0.1:6379> expire key1 9000
(integer) 1
127.0.0.1:6379> del key1
(integer) 1

4. 查看上面监测的控制台

127.0.0.1:6379> psubscribe '__key*__:*'
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "__key*__:*"
3) (integer) 1
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "set"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:set"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "expire"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:expire"
4) "key1"
1) "pmessage"
2) "__key*__:*"
3) "__keyspace@0__:key1"
4) "del"
1) "pmessage"
2) "__key*__:*"
3) "__keyevent@0__:del"
4) "key1"

  可以看到每次操作之后,订阅者可以收到消息的相关信息:注册的事件类型、发生的事件类型、操作的key 名称。

  这里需要注意,如果注册了并且服务宕机了,或者某种原因客户端下线了。这时候再次上线不会收到, 也就是下线期间的事件不会进行记录。

2. Springboot 项目监听事件

1. 编写listener

package com.xm.ggn.config.redis;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {

    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    @Override
    public void onMessage(Message message, byte[] pattern) {
        // 获取过期的key
        String expireKey = message.toString();
        System.out.println("终于失效了");
        System.out.println("key is:" + expireKey);
    }

}

2. 源码查看

(1) org.springframework.data.redis.listener.KeyExpirationEventMessageListener 

package org.springframework.data.redis.listener;

import org.springframework.context.ApplicationEventPublisher;
import org.springframework.context.ApplicationEventPublisherAware;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.core.RedisKeyExpiredEvent;
import org.springframework.lang.Nullable;

public class KeyExpirationEventMessageListener extends KeyspaceEventMessageListener implements ApplicationEventPublisherAware {
    private static final Topic KEYEVENT_EXPIRED_TOPIC = new PatternTopic("__keyevent@*__:expired");
    @Nullable
    private ApplicationEventPublisher publisher;

    public KeyExpirationEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }

    protected void doRegister(RedisMessageListenerContainer listenerContainer) {
        listenerContainer.addMessageListener(this, KEYEVENT_EXPIRED_TOPIC);
    }

    protected void doHandleMessage(Message message) {
        this.publishEvent(new RedisKeyExpiredEvent(message.getBody()));
    }

    protected void publishEvent(RedisKeyExpiredEvent event) {
        if (this.publisher != null) {
            this.publisher.publishEvent(event);
        }

    }

    public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
        this.publisher = applicationEventPublisher;
    }
}

  可以看到是借助于Spring的事件机制来完成的。

(2) org.springframework.data.redis.listener.KeyspaceEventMessageListener

package org.springframework.data.redis.listener;

import java.util.Properties;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import org.springframework.util.StringUtils;

public abstract class KeyspaceEventMessageListener implements MessageListener, InitializingBean, DisposableBean {
    private static final Topic TOPIC_ALL_KEYEVENTS = new PatternTopic("__keyevent@*");
    private final RedisMessageListenerContainer listenerContainer;
    private String keyspaceNotificationsConfigParameter = "EA";

    public KeyspaceEventMessageListener(RedisMessageListenerContainer listenerContainer) {
        Assert.notNull(listenerContainer, "RedisMessageListenerContainer to run in must not be null!");
        this.listenerContainer = listenerContainer;
    }

    public void onMessage(Message message, @Nullable byte[] pattern) {
        if (message != null && !ObjectUtils.isEmpty(message.getChannel()) && !ObjectUtils.isEmpty(message.getBody())) {
            this.doHandleMessage(message);
        }
    }

    protected abstract void doHandleMessage(Message var1);

    public void init() {
        if (StringUtils.hasText(this.keyspaceNotificationsConfigParameter)) {
            RedisConnection connection = this.listenerContainer.getConnectionFactory().getConnection();

            try {
                Properties config = connection.getConfig("notify-keyspace-events");
                if (!StringUtils.hasText(config.getProperty("notify-keyspace-events"))) {
                    connection.setConfig("notify-keyspace-events", this.keyspaceNotificationsConfigParameter);
                }
            } finally {
                connection.close();
            }
        }

        this.doRegister(this.listenerContainer);
    }

    protected void doRegister(RedisMessageListenerContainer container) {
        this.listenerContainer.addMessageListener(this, TOPIC_ALL_KEYEVENTS);
    }

    public void destroy() throws Exception {
        this.listenerContainer.removeMessageListener(this);
    }

    public void setKeyspaceNotificationsConfigParameter(String keyspaceNotificationsConfigParameter) {
        this.keyspaceNotificationsConfigParameter = keyspaceNotificationsConfigParameter;
    }

    public void afterPropertiesSet() throws Exception {
        this.init();
    }
}

  可以看到启动后悔修改redis 的事件机制,同时注册监听过期事件。

3. 自己实现类似于Spring的机制

  借助于netty 和 spring 的事件机制实现。

1. com.xm.ggn.test.springevent.RedisConnection

  启动后利用netty 建立一个链接并且发送订阅事件

package com.xm.ggn.test.springevent;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

@Component
@Slf4j
public class RedisConnection {

    private static final String HOST = System.getProperty("host", "192.168.145.139");

    private static final int PORT = Integer.parseInt(System.getProperty("port", "6379"));

    @PostConstruct
    public void init() {
        try {
            Bootstrap b = new Bootstrap();
            EventLoopGroup group = new NioEventLoopGroup();
            b.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RedisClientHandler());
                        }
                    });

            // Start the connection attempt.
            Channel ch = b.connect(HOST, PORT).sync().channel();
            String sendCmd = "psubscribe __key*__:*";
            ch.writeAndFlush(sendCmd);
            log.info("已经建立redis 连接, 并且发送订阅事件命令");
        } catch (Exception e) {
            log.error("redis handler error", e);
        }
    }

}

2.  RedisClientHandler

package com.xm.ggn.test.springevent;

import com.xm.ggn.utils.system.SpringBootUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.util.CharsetUtil;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class RedisClientHandler extends ChannelDuplexHandler {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        // 转换发出去的数据格式
        msg = rehandleRequest(msg);
        ctx.writeAndFlush(Unpooled.copiedBuffer(msg.toString(), CharsetUtil.UTF_8));
    }

    /**
     * 重新处理消息,处理为 RESP 认可的数据
     * set foo bar
     * 对应下面数据
     * *3
$3
SET
$3
foo
$3
bar

     */
    private String rehandleRequest(Object msg) {
        String result = msg.toString().trim();
        String[] params = result.split(" ");
        List<String> allParam = new ArrayList<>();
        Arrays.stream(params).forEach(s -> {
            allParam.add("$" + s.length() + "
" + s + "
"); // 参数前$length
, 参数后增加 

        });
        allParam.add(0, "*" + allParam.size() + "
");
        StringBuilder stringBuilder = new StringBuilder();
        allParam.forEach(p -> {
            stringBuilder.append(p);
        });
        return stringBuilder.toString();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf byteBuf = (ByteBuf) msg;
        byte[] bytes = new byte[byteBuf.readableBytes()];
        byteBuf.readBytes(bytes);
        String result = new String(bytes);// 转换接受到的数据格式
        result = rehandleResponse(result).toString();

        try {
            // 接收到消息之后用Spring 的事件机制发送消息
            SpringBootUtils.applicationContext.publishEvent(new RedisEvent(this, result));
        } catch (Exception exception) {
            exception.printStackTrace();
        }
    }

    /**
     * 重新处理响应消息
     */
    private Object rehandleResponse(String result) {
        // 状态恢复 - “+OK
”
        if (result.startsWith("+")) {
            return result.substring(1, result.length() - 2);
        }

        // 错误回复(error reply)的第一个字节是 "-"。例如 `flushallE` 返回的 `-ERR unknown command 'flushallE'
`
        if (result.startsWith("-")) {
            return result.substring(1, result.length() - 2);
        }

        // 整数回复(integer reply)的第一个字节是 ":"。 例如 `llen mylist` 查看list 大小返回的 `:3
`
        if (result.startsWith(":")) {
            return result.substring(1, result.length() - 2);
        }

        // 批量回复(bulk reply)的第一个字节是 "$", 例如:  `get foo` 返回的结果为 `$3
bar
`
        if (result.startsWith("$")) {
            result = StringUtils.substringAfter(result, "
");
            return StringUtils.substringBeforeLast(result, "
");
        }

        // 多条批量回复(multi bulk reply)的第一个字节是 "*", 例如: *2
$3
foo
$4
name

        if (result.startsWith("*")) {
            result = StringUtils.substringAfter(result, "
");
            String[] split = result.split("\$\d
");
            List<String> collect = Arrays.stream(split).filter(tmpStr -> StringUtils.isNotBlank(tmpStr)).collect(Collectors.toList());
            List<String> resultList = new ArrayList<>(collect.size());
            collect.forEach(str1 -> {
                resultList.add(StringUtils.substringBeforeLast(str1, "
"));
            });
            return resultList;
        }

        return "unknow result";
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        System.err.print("exceptionCaught: ");
        cause.printStackTrace(System.err);
        ctx.close();
    }

}

3.  RedisEvent

package com.xm.ggn.test.springevent;

import org.springframework.context.ApplicationEvent;

public class RedisEvent extends ApplicationEvent {

    private static final long serialVersionUID = -9184671635725233773L;

    private Object msg;

    public RedisEvent(Object source, final String msg) {
        super(source);
        this.msg = msg;
    }

    public Object getMsg() {
        return msg;
    }

    public void setMsg(Object msg) {
        this.msg = msg;
    }
}

4. RedisEventListener 事件监听器,用于直接打印消息

package com.xm.ggn.test.springevent;

import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class RedisEventListener implements ApplicationListener<RedisEvent> {

    @Override
    public void onApplicationEvent(RedisEvent applicationEvent) {
        // handle event
        System.out.println("收到事件,消息为:" + applicationEvent.getMsg());
    }
}

5. 测试结果:

(1)在redis 客户进行一系列操作

127.0.0.1:6379> set mykey myvalue
OK
127.0.0.1:6379> del mykey
(integer) 1

(2) 控制台打印如下:

收到事件,消息为:[pmessage
$10
__key*__:*
$18
__keyevent@0__:set, mykey]
收到事件,消息为:[pmessage
$10
__key*__:*
$18
__keyevent@0__:del, mykey]
【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
原文地址:https://www.cnblogs.com/qlqwjy/p/15132798.html