SpringBoot中使用Redis的发布/订阅模式

redis的发布订阅模式,使发布者和订阅者完全解耦 

 1.pom.xml and application.properties

<!-- 引入redis -->
 
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>io.lettuce</groupId>
                    <artifactId>lettuce-core</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
 
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
spring:
   redis:
     #数据库索引
     database: 5
     host: 127.0.0.1
     port: 6379
     password: 123456
     jedis:
       pool:
         #最大连接数
         max-active: 8
         #最大阻塞等待时间(负数表示没限制)
         #最大空闲
         max-idle: 8
         #最小空闲
         min-idle: 0

2.消息发布者、消息处理者POJO、redis消息监听器容器以及redis监听器注入IOC容器

@Configuration //相当于xml中的beans
public class RedisConfig {
 
    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean //相当于xml中的bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter,
                                            MessageListenerAdapter listenerAdapter2) {
 
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //订阅了一个叫chat 的通道
        container.addMessageListener(listenerAdapter, new PatternTopic("chat"));
        container.addMessageListener(listenerAdapter2, new PatternTopic("chat2"));
        //这个container 可以添加多个 messageListener
        return container;
    }
 
    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     * @param receiver
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }
    
    @Bean
    MessageListenerAdapter listenerAdapter2(MessageReceiver receiver) {
        //这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage2”
        //也有好几个重载方法,这边默认调用处理器的方法 叫handleMessage 可以自己到源码里面看
        return new MessageListenerAdapter(receiver, "receiveMessage2");
    }
 
    /**redis 读取内容的template */
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }
    
    @Bean
    RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
 
         Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
         ObjectMapper om = new ObjectMapper();
         om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
         om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
         jackson2JsonRedisSerializer.setObjectMapper(om);
         RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
         template.setConnectionFactory(connectionFactory);
         template.setKeySerializer(jackson2JsonRedisSerializer);
         template.setValueSerializer(jackson2JsonRedisSerializer);
         template.setHashKeySerializer(jackson2JsonRedisSerializer);
         template.setHashValueSerializer(jackson2JsonRedisSerializer);
         template.afterPropertiesSet();
         return template;
 
    }
 
}

MessageListenerAdapter通过反射使普通的POJO就可以处理消息。具体情况见MessageListenerAdapter的onMessage方法。

3.消息发布者

@EnableScheduling //开启定时器功能
@Component
public class MessageSender {
 
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
 
    @Scheduled(fixedRate = 2000) //间隔2s 通过StringRedisTemplate对象向redis消息队列chat频道发布消息
    public void sendMessage(){
        stringRedisTemplate.convertAndSend("chat",String.valueOf(Math.random()));
        stringRedisTemplate.convertAndSend("chat2",String.valueOf(Math.random()));
    }
}

4.普通的消息处理器POJO

@Component
public class MessageReceiver {
 
    /**接收消息的方法*/
    public void receiveMessage(String message){
        System.out.println("收到一条chat的消息:"+message);
    }
    
    /**接收消息的方法*/
    public void receiveMessage2(String message){
        System.out.println("收到一条chat2的消息:"+message);
    }
 
}

MessageListenerAdapter通过反射调用receiveMessage方法处理消息


5.其他方式(参考)

配置

package com.example.day3_30.redisConfiBack;
 
/**
 * redis消息队列配置-订阅者
 */
 
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
 
import java.util.concurrent.CountDownLatch;
 
/**
 * redis消息队列配置-订阅者
 */
@Configuration
public class RedisMessageListener {
    /**
     * 创建连接工厂
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                                   MessageListenerAdapter listenerAdapter,
                                                   MessageListenerAdapter listenerAdapterTest2){
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        //接受消息的key
        container.addMessageListener(listenerAdapter,new PatternTopic("phone"));
        container.addMessageListener(listenerAdapterTest2,new PatternTopic("phoneTest2"));
        return container;
    }
 
 
    /**
     * 绑定消息监听者和接收监听的方法
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(ReceiverRedisMessage  receiver){
        return new MessageListenerAdapter(receiver,"receiveMessage");
    }
    /**
     * 绑定消息监听者和接收监听的方法
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapterTest2(ReceiverRedisMessage  receiver){
        return new MessageListenerAdapter(receiver,"receiveMessage2");
    }
    /**
     * 注册订阅者
     * @param latch
     * @return
     */
    @Bean
    ReceiverRedisMessage receiver(CountDownLatch latch) {
        return new ReceiverRedisMessage(latch);
    }
 
 
    /**
     * 计数器,用来控制线程
     * @return
     */
    @Bean
    public CountDownLatch latch(){
        return new CountDownLatch(1);//指定了计数的次数 1
    }
}

消息处理

package com.example.day3_30.redisConfiBack;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
 
import java.util.concurrent.CountDownLatch;
 
/**
 * 注入消息接受类
 */
public class ReceiverRedisMessage {
    private static final Logger log = LoggerFactory.getLogger(ReceiverRedisMessage.class);
    private CountDownLatch latch;
 
    @Autowired
    public ReceiverRedisMessage(CountDownLatch latch) {
        this.latch = latch;
    }
 
 
    /**
     * 队列消息接收方法
     *
     * @param jsonMsg
     */
    public void receiveMessage(String jsonMsg) {
        log.info("[开始消费REDIS消息队列phone数据...]");
        try {
            System.out.println(jsonMsg);
            log.info("[消费REDIS消息队列phone数据成功.]");
        } catch (Exception e) {
            log.error("[消费REDIS消息队列phone数据失败,失败信息:{}]", e.getMessage());
        }
        latch.countDown();
    }
 
    /**
     * 队列消息接收方法
     *
     * @param jsonMsg
     */
    public void receiveMessage2(String jsonMsg) {
        log.info("[开始消费REDIS消息队列phoneTest2数据...]");
        try {
            System.out.println(jsonMsg);
            /**
             *  此处执行自己代码逻辑 例如 插入 删除操作数据库等
             */
 
            log.info("[消费REDIS消息队列phoneTest2数据成功.]");
        } catch (Exception e) {
            log.error("[消费REDIS消息队列phoneTest2数据失败,失败信息:{}]", e.getMessage());
        }
        latch.countDown();
    }
 
}

测试

package com.example.day3_30.redisConfiBack;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
@RestController
@RequestMapping
public class PublisherController {
    private static final Logger log = LoggerFactory.getLogger(PublisherController.class);
    @Autowired
    private RedisTemplate redisTemplate;
    @GetMapping(value = "pub/{id}")
    public String pubMsg(@PathVariable String id){
        redisTemplate.convertAndSend("phone","223333");
        redisTemplate.convertAndSend("phoneTest2","34555665");
        log.info("Publisher sendes Topic... ");
        return "success";
    }
}
原文地址:https://www.cnblogs.com/xiejn/p/15698826.html