Synchronizing local caches across the machines of a cluster: Redis publish/subscribe

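Background:

In a cluster, when the local cache on one machine changes, the change has to be propagated to the other machines in the cluster.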
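
The flow can be tried out without a Redis server. The following is a minimal sketch under stated assumptions, not the setup used in this post: `InMemoryChannel` and `CacheNode` are hypothetical names, a shared map stands in for the database, and the in-memory channel only imitates a Redis channel by delivering each message to every subscriber.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class LocalCacheSyncDemo {
    public static void main(String[] args) {
        Map<String, String> database = new ConcurrentHashMap<>();
        database.put("price", "10");
        InMemoryChannel channel = new InMemoryChannel();
        CacheNode a = new CacheNode(database, channel);
        CacheNode b = new CacheNode(database, channel);

        System.out.println(b.get("price")); // prints 10 (b caches the old value)
        a.update("price", "12");            // a writes and notifies the cluster
        System.out.println(b.get("price")); // prints 12 (b reloaded after the message)
    }
}

/** Hypothetical stand-in for a Redis channel: every subscriber gets every message. */
class InMemoryChannel {
    private final List<Consumer<String>> subscribers = new CopyOnWriteArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    void publish(String message) {
        // As with Redis pub/sub, the publisher's own subscription is notified too.
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }
}

/** One machine of the cluster: a local cache in front of a shared store. */
class CacheNode {
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    private final Map<String, String> database;
    private final InMemoryChannel channel;

    CacheNode(Map<String, String> database, InMemoryChannel channel) {
        this.database = database;
        this.channel = channel;
        channel.subscribe(this::reload);
    }

    String get(String key) {
        // Load from the shared store on a cache miss.
        return localCache.computeIfAbsent(key, database::get);
    }

    void update(String key, String value) {
        database.put(key, value); // write to the shared store first
        channel.publish(key);     // then ask every node to refresh that key
    }

    private void reload(String key) {
        String fresh = database.get(key);
        if (fresh == null) {
            localCache.remove(key);
        } else {
            localCache.put(key, fresh);
        }
    }
}
```

The message carries only the key, and each node reloads the value from the shared store, so a node never trusts a value that travelled through the channel.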

Redis subscription setup: a method annotated with @EventListener(ContextRefreshedEvent.class) initializes the Redis subscription automatically when the project starts.

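import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

// MSG_CHANNEL is the channel-name constant; its definition is not shown in the original.
// LOGGER assumes lombok.log.fieldName = LOGGER in lombok.config; Lombok's default field name is "log".
@Slf4j
@Component
public class RedisSubInitializer {
	@Autowired
	private RedisConnection redisConnection;
	@Autowired
	private RedisPubsubListener redisPubsubListener;

	@Async
	@EventListener(ContextRefreshedEvent.class)
	public void onApplicationEvent(ContextRefreshedEvent event) {
		// Subscribe once the application context has been refreshed
		try {
			LOGGER.info("Subscribing to Redis channel, pubSub={}, channel={}", redisPubsubListener, MSG_CHANNEL);
			redisConnection.subscribe(redisPubsubListener, MSG_CHANNEL);
		} catch (Exception e) {
			LOGGER.error("Subscribe failed, channel={}", MSG_CHANNEL, e);
		}
	}
}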

Publishing events

In the business code, trigger the publish at the point where the cache changes:

msgPublisher.publish(MSG_CHANNEL, messageVo);

The publisher class

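import com.alibaba.fastjson.JSON; // assumption: JSON.toJSONString is fastjson's
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class MsgPublisher {
	@Autowired
	private RedisConnection redisConnection;

	/**
	 * Publishes a message to a channel.
	 * @param channel name of the channel to publish to
	 * @param message payload; a String is sent as-is, any other object is serialized to JSON
	 */
	public void publish(String channel, Object message) {
		try {
			String payload = (message instanceof String) ? (String) message : JSON.toJSONString(message);
			redisConnection.publish(channel, payload);
			LOGGER.info("message publish, channel={}, message={}", channel, message);
		} catch (Exception e) {
			// Failures are logged and swallowed, so callers are not notified of a failed publish
			LOGGER.error("message pub error, channel={}, message={}", channel, message, e);
		}
	}
}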

The listener class, which implements the org.springframework.data.redis.connection.MessageListener interface

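import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class RedisPubsubListener implements MessageListener {
	@Autowired
	private RedisConnection redisConnection;

	@Override
	public void onMessage(Message message, byte[] pattern) {
		String channel = redisConnection.deserializekey(message.getChannel());
		String body = redisConnection.deserializekey(message.getBody());
		// Log the decoded body rather than the raw Message object
		LOGGER.info("channel={}, message={}", channel, body);
		try {
			if (MSG_CHANNEL.equals(channel)) {
				// Handle the event: synchronize the cache by reloading from the database.
				// toHandle is not shown in the original.
				toHandle(body);
			}
		} catch (Exception e) {
			LOGGER.error("channel={}, message={}", channel, body, e);
		}
	}
}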
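
The post does not show `toHandle`. As a rough sketch of what such a handler could look like, assuming the message body is simply the key of the changed entry: `CacheReloadHandler`, `databaseLoader` and `cached` are hypothetical names, and the loader function stands in for the real database query.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class CacheReloadHandlerDemo {
    public static void main(String[] args) {
        Map<String, String> database = new ConcurrentHashMap<>();
        CacheReloadHandler handler = new CacheReloadHandler(database::get);

        database.put("user:1", "alice");
        handler.toHandle("user:1");
        System.out.println(handler.cached("user:1")); // prints alice

        database.remove("user:1");
        handler.toHandle("user:1");
        System.out.println(handler.cached("user:1")); // prints null
    }
}

/** Hypothetical handler: the message body is assumed to be the changed key. */
class CacheReloadHandler {
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    private final Function<String, String> databaseLoader;

    CacheReloadHandler(Function<String, String> databaseLoader) {
        this.databaseLoader = databaseLoader;
    }

    /** Reloads one entry from the database. Repeated calls for the same key are harmless. */
    void toHandle(String body) {
        if (body == null || body.isEmpty()) {
            return; // nothing to reload
        }
        String fresh = databaseLoader.apply(body);
        if (fresh == null) {
            localCache.remove(body); // the row is gone, so drop the stale entry
        } else {
            localCache.put(body, fresh);
        }
    }

    String cached(String key) {
        return localCache.get(key);
    }
}
```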

The RedisConnection class, which extends org.redisson.spring.data.connection.RedissonConnection

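import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RedissonClient;
import org.redisson.spring.data.connection.RedissonConnection;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.serializer.RedisSerializer;

// The constructor, the serializer wiring and getRedisKey are not shown in the original.
@Slf4j
public class RedisConnection extends RedissonConnection {
	private RedisSerializer keySerializer;
	private RedisSerializer valueSerializer;
	private RedissonClient client;

	/** Decodes raw channel or body bytes as UTF-8; returns null for null input. */
	public String deserializekey(byte[] bytes) {
		if (bytes == null) {
			return null;
		}
		return new String(bytes, StandardCharsets.UTF_8);
	}

	/** Publishes a String message; returns 0 if the publish fails. */
	public Long publish(String channel, String message) {
		try {
			// Note: the message body goes through the same getKey conversion as the channel name
			return publish(getKey(channel), getKey(message));
		} catch (Exception e) {
			LOGGER.error("Redis publish ops error, channel={}, message={}", channel, message, e);
		}
		return 0L;
	}

	/** Subscribes the listener to the given channels, given as Strings. */
	public void subscribe(MessageListener listener, String... channels) {
		try {
			subscribe(listener, getKeys(channels));
		} catch (Exception e) {
			LOGGER.error("Redis subscribe ops error, listener={}, channels={}", listener, channels, e);
		}
	}

	private byte[] getKey(String key) {
		return keySerializer.serialize(getRedisKey(key));
	}

	private byte[][] getKeys(String... keys) {
		byte[][] bs = new byte[keys.length][];
		for (int i = 0; i < keys.length; i++) {
			bs[i] = getKey(keys[i]);
		}
		return bs;
	}
}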
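
The conversions above only work if the publishing side and the listening side agree on the byte encoding. A minimal sketch of that round trip, assuming plain UTF-8 on both sides and leaving out whatever `getRedisKey` does (it is not shown in this post); `ChannelCodec` is a hypothetical name:

```java
import java.nio.charset.StandardCharsets;

class ChannelCodecDemo {
    public static void main(String[] args) {
        byte[][] encoded = ChannelCodec.encode("cache:sync", "cache:evict");
        for (byte[] raw : encoded) {
            System.out.println(ChannelCodec.decode(raw));
        }
    }
}

/** Hypothetical helper: converts channel names to the byte arrays a subscribe call expects. */
class ChannelCodec {
    static byte[][] encode(String... channels) {
        byte[][] result = new byte[channels.length][];
        for (int i = 0; i < channels.length; i++) {
            result[i] = channels[i].getBytes(StandardCharsets.UTF_8);
        }
        return result;
    }

    /** Mirrors deserializekey: null stays null, everything else is read as UTF-8. */
    static String decode(byte[] bytes) {
        return bytes == null ? null : new String(bytes, StandardCharsets.UTF_8);
    }
}
```

If the sending side transforms the channel name in any other way, for example by adding a prefix, the listener's comparison against the plain channel name has to account for that as well.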

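Summary

Local cache synchronization can therefore be implemented with Redis publish/subscribe. Note that the publishing machine also consumes its own messages, so message handling must be idempotent.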
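
One possible way to deal with self-delivered messages is to tag each message with the ID of the node that sent it. This is only a sketch and not taken from the post: the `nodeId|key` message format and the names `SelfAwareListener`, `buildMessage` and `reloadCount` are assumptions made for illustration, and it presumes the publishing node has already updated its own local cache before publishing.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class SelfAwareListenerDemo {
    public static void main(String[] args) {
        Map<String, String> database = new ConcurrentHashMap<>();
        database.put("k", "v1");
        SelfAwareListener nodeA = new SelfAwareListener("node-a", database);
        SelfAwareListener nodeB = new SelfAwareListener("node-b", database);

        String message = nodeA.buildMessage("k");
        nodeA.onMessage(message); // own message: skipped
        nodeB.onMessage(message); // reloads "k"
        nodeB.onMessage(message); // duplicate delivery: reloads again, same end state

        System.out.println(nodeA.reloadCount()); // prints 0
        System.out.println(nodeB.cached("k"));   // prints v1
    }
}

/** Hypothetical listener that ignores messages published by its own node. */
class SelfAwareListener {
    private final String nodeId; // must not contain the '|' separator
    private final Map<String, String> database;
    private final Map<String, String> localCache = new ConcurrentHashMap<>();
    private int reloads = 0;

    SelfAwareListener(String nodeId, Map<String, String> database) {
        this.nodeId = nodeId;
        this.database = database;
    }

    /** Assumed message format: "<originNodeId>|<cacheKey>". */
    String buildMessage(String key) {
        return nodeId + "|" + key;
    }

    void onMessage(String body) {
        int separator = body.indexOf('|');
        if (separator < 0) {
            return; // malformed message, ignore it
        }
        String origin = body.substring(0, separator);
        String key = body.substring(separator + 1);
        if (nodeId.equals(origin)) {
            return; // published by this node, whose cache is assumed to be current
        }
        // Reloading overwrites with the current database value, so repeats are harmless.
        String fresh = database.get(key);
        if (fresh == null) {
            localCache.remove(key);
        } else {
            localCache.put(key, fresh);
        }
        reloads++;
    }

    String cached(String key) {
        return localCache.get(key);
    }

    int reloadCount() {
        return reloads;
    }
}
```

Skipping own messages is an optimization only. Because the reload always overwrites the entry with the current database value, handling the same message twice leaves the cache in the same state, which is the idempotence the summary asks for.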

Original post: https://www.cnblogs.com/bb-ben99/p/14012536.html