【spring boot】【redis】spring boot 集成redis的发布订阅机制

一.简单介绍

1.redis的发布订阅功能,很简单。
  消息发布者和消息订阅者互相不认得,也不关心对方有谁。
  消息发布者,将消息发送给频道(channel)。
  然后是由 频道(channel)将消息发送给对自己感兴趣的 消息订阅者们,进行消费。


2.redis的发布订阅和专业的MQ相比较

  1>redis的发布订阅只是最基本的功能,不支持持久化,消息发布者将消息发送给频道。如果没有订阅者消费,消息就丢失了。
  2>在消息发布过程中,如果客户端和服务器连接超时,MQ会有重试机制,事务回滚等。但是Redis没有提供消息传输保障。
  3>简单的发布订阅可以使用redis,根据业务需求选择。

二.spring boot 集成[spring boot 2.x]

1.pom.xml文件

<!-- redis -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!--spring2.0集成redis所需common-pool2-->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.4.2</version>
        </dependency>
        <!-- 使用redis的LUA脚本 需要序列化操作的jar-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-annotations</artifactId>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

2.redis的config 

为redis添加消息适配器,绑定消息处理器

消息适配器 可以添加多个 

package com.sxd.swapping.config;

import com.sxd.swapping.redisReceiver.RedisReceiver;
import com.sxd.swapping.redisReceiver.RedisReceiver2;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.data.redis.RedisAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;



/**
 * @author sxd
 * @date 2019/5/27 16:13
 */
@Configuration
@AutoConfigureAfter(RedisAutoConfiguration.class)
public class RedisConfig {



    /**
     * redis消息监听器容器
     * 可以添加多个监听不同话题的redis监听器,只需要把消息监听器和相应的消息订阅处理器绑定,该消息监听器
     * 通过反射技术调用消息订阅处理器的相关方法进行一些业务处理
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter,
                                            MessageListenerAdapter listenerAdapter2)
    {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);

        //可以添加多个 messageListener
        //可以对 messageListener对应的适配器listenerAdapter  指定本适配器 适配的消息类型  是什么
        //在发布的地方 对应发布的redisTemplate.convertAndSend("user",msg);  那这边的就对应的可以消费到指定类型的 订阅消息
        container.addMessageListener(listenerAdapter, new PatternTopic("user"));
        container.addMessageListener(listenerAdapter2, new PatternTopic("goods"));

        return container;
    }


    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     *
     * receiveMessage 是默认监听方法 一般不变
     * @param redisReceiver redis消息处理器,自定义的
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter(RedisReceiver redisReceiver) {
        System.out.println("消息适配器1进来了");
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }


    /**
     * 消息监听器适配器,绑定消息处理器,利用反射技术调用消息处理器的业务方法
     *
     * receiveMessage 是默认监听方法 一般不变
     * @param redisReceiver2 redis消息处理器,自定义的
     * @return
     */
    @Bean
    MessageListenerAdapter listenerAdapter2(RedisReceiver2 redisReceiver2) {
        System.out.println("消息适配器2进来了");
        return new MessageListenerAdapter(redisReceiver2, "receiveMessage");
    }



    //使用默认的工厂初始化redis操作模板
    @Bean
    StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
        return new StringRedisTemplate(connectionFactory);
    }


    @Bean
    public RedisTemplate<String, String> redisTemplate(RedisConnectionFactory factory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        RedisSerializer keySerializer = new StringRedisSerializer();
//        RedisSerializer valueSerializer = new GenericJackson2JsonRedisSerializer();
        //key采用字符串反序列化对象
        redisTemplate.setKeySerializer(keySerializer);
        //value也采用字符串反序列化对象
        //原因:管道操作,是对redis命令的批量操作,各个命令返回结果可能类型不同
        //可能是 Boolean类型 可能是String类型 可能是byte[]类型 因此统一将结果按照String处理
        redisTemplate.setValueSerializer(keySerializer);
        return redisTemplate;
    }









}
View Code

3.创建几个消息处理器[处理消息订阅者  接收到消息后的业务]

package com.sxd.swapping.redisReceiver;

import org.springframework.stereotype.Service;

/**
 *
 * redis 订阅发布  消息接收器/处理器
 * @author sxd
 * @date 2019/5/30 17:12
 */
@Service
public class RedisReceiver {

    public void receiveMessage(String message) {
        System.out.println("消息处理器1>我处理用户信息:"+message);
        //这里是收到通道的消息之后执行的方法
        //此处执行接收到消息后的 业务逻辑
    }
}
View Code
package com.sxd.swapping.redisReceiver;

import org.springframework.stereotype.Service;

/**
 * redis 订阅发布  消息接收器/处理器2
 * @author sxd
 * @date 2019/5/30 17:15
 */
@Service
public class RedisReceiver2 {

    public void receiveMessage(String message) {
        System.out.println("消息处理器2>我处理商品信息:"+message);
        //这里是收到通道的消息之后执行的方法
        //此处执行接收到消息后的 业务逻辑
    }
}
View Code

4.消息发布controller

@Autowired
    RedisTemplate redisTemplate;


    /**
     * redis 发布订阅pubsub
     */
    @RequestMapping(value = "/redisPubSub")
    public void redisPubSub(String msg){
        if (msg.contains("用户")){
            redisTemplate.convertAndSend("user",msg);
        }else {
            redisTemplate.convertAndSend("goods",msg);
        }
    }
View Code

测试:

发送请求:http://localhost:9666/redistest/redisPubSub?msg=用户---德玛西亚的用户

结果:

消息处理器1>我处理用户信息:用户---德玛西亚的用户

发送请求:http://localhost:9666/redistest/redisPubSub?msg=goods---德玛西亚的用商品

结果:

消息处理器2>我处理商品信息:goods---德玛西亚的用商品

原文地址:https://www.cnblogs.com/sxdcgaq8080/p/10953693.html