配置2个redis 分别用户缓存和队列

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.stereotype.Component;


/**
 * @author 
 * redis配置类
 */
@Configuration
@EnableCaching
@Component
public class RedisMainConfig extends CachingConfigurerSupport{
    /**
     * 配置lettuce连接池
     * @return
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
    public GenericObjectPoolConfig redisPool() {
        return new GenericObjectPoolConfig<>();
    }

    /**
     * 配置第一个数据源的
     * @return
     */
    @Bean
    @Primary
    public RedisStandaloneConfiguration redisConfig(@Value("${spring.redis.host}") String host, @Value("${spring.redis.port}") int port
            , @Value("${spring.redis.database}") int db, @Value("${spring.redis.password}") String password) {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
        redisStandaloneConfiguration.setDatabase(db);
        redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
        return redisStandaloneConfiguration;
    }
    /**
     * 配置第一个数据源的连接工厂
     * 这里注意:需要添加@Primary 指定bean的名称,目的是为了创建两个不同名称的LettuceConnectionFactory
     *
     * @param config
     * @param redisConfig
     * @return
     */
    @Bean("cachefactory")
    @Primary
    public LettuceConnectionFactory cachefactory(GenericObjectPoolConfig config, RedisStandaloneConfiguration redisConfig) {
        LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build();
        return new LettuceConnectionFactory(redisConfig, clientConfiguration);
    }
    /**
     * 配置第一个数据源的RedisTemplate
     * 注意:这里指定使用名称=factory 的 RedisConnectionFactory
     * 并且标识第一个数据源是默认数据源 @Primary
     *
     * @param cachefactory
     * @return
     */
    @Bean("redisTemplate")
    @Primary
    public RedisTemplate<String, String> redisTemplate(@Qualifier("cachefactory") RedisConnectionFactory cachefactory) {
        return getStringStringRedisTemplate(cachefactory);
    }


    /**
     * 配置第二个数据源
     * @return
     */
    @Bean
    public RedisStandaloneConfiguration mqRedisConfig(@Value("${spring.mq.host}") String host, @Value("${spring.mq.port}") int port
            , @Value("${spring.mq.database}") int db, @Value("${spring.mq.password}") String password) {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
        redisStandaloneConfiguration.setDatabase(db);
        redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
        return redisStandaloneConfiguration;
    }
    /**
     * 配置第二个数据源的连接工厂
     * @param config
     * @param mqRedisConfig
     * @return
     */
    @Bean("mqfactory")
    public LettuceConnectionFactory mqfactory(GenericObjectPoolConfig config, @Qualifier("mqRedisConfig")RedisStandaloneConfiguration mqRedisConfig) {
        LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(config).build();
        return new LettuceConnectionFactory(mqRedisConfig, clientConfiguration);
    }
    /**
     * 配置第二个数据源的RedisTemplate
     * 注意:这里指定使用名称=mqfactory 的 RedisConnectionFactory
     *
     * @param mqfactory
     * @return
     */
    @Bean("mqRedisTemplate")
    public StringRedisTemplate mqRedisTemplate(@Qualifier("mqfactory") RedisConnectionFactory mqfactory) {
        StringRedisTemplate template = new StringRedisTemplate(mqfactory);
        return template;
    }



    /**
     * 设置序列化方式 (这一步不是必须的)
     * @param factory
     * @return
     */
    private RedisTemplate<String, String> getStringStringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate template = new StringRedisTemplate(factory);
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

配置文件 是yml文件

spring:
  redis:
    database: 0
    host: 10.100.2.246
    port: 6379
    password: 123456
    lettuce:
      pool:
        max-active: 8
        max-wait: -1
        max-idle: 8
        min-idle: 0
        timeout: 30000
  mq:
    database: 0
    host: 10.100.2.246
    port: 6380
    password: 123456
消息队列生产者
import lombok.extern.log4j.Log4j;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;

/**
 * ClassName:@Publisher
 * Description:消息队列生产者
 **/
@Component("mqPublisher")
@Slf4j
public class Publisher {

    @Resource(name = "mqRedisTemplate")
    private StringRedisTemplate redisTemplate;

    /**
     * 发送消息
     * @param topic
     * @param msg
     * @return
     */
    public boolean sendMessage(String topic,String msg) {
        log.info("publisher msg,topic:" + topic + ",msg:" + msg);
        try {
            redisTemplate.convertAndSend(topic, msg);
            return true;
        } catch (Exception e) {
            log.error("publisher msg,error:" + e.toString() );
            return false;
        }
    }

}

监听队列

import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;

@Configuration
@AutoConfigureAfter({Receiver.class})//主要是控制类的加载顺序,即 指定的类加载完了,再加载本类
public class SubscriberConfig {

    //先同步数据 根据同步的返回结果 再视情况推送

    /**
     * 创建消息监听容器
     * @param redisConnectionFactory
     * @param messageListenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer getRedisMessageListenerContainer(@Qualifier("mqfactory") RedisConnectionFactory redisConnectionFactory,
                                                                          MessageListenerAdapter messageListenerAdapter,MessageListenerAdapter synMessageListenerAdapter) {
        RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();

        redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);

        //监听TOPIC_ORDER的广播 messageListenerAdapter注入接受消息方法名 用于推送
        redisMessageListenerContainer.addMessageListener(messageListenerAdapter, new PatternTopic("TOPIC_ORDER"));

        //监听SYN_ORDER的广播 messageListenerAdapter注入接受消息方法名 用于同步
        redisMessageListenerContainer.addMessageListener(synMessageListenerAdapter, new PatternTopic("SYN_ORDER"));

        return redisMessageListenerContainer;
    }


    /**
     * 消息监听适配器,注入接受消息方法,输入方法名字 同步数据
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter synMessageListenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver,"synSeveiver"); //当没有继承MessageListener时需要写方法名字  synSeveiver方法名
    }

    /**
     * 消息监听适配器,注入接受消息方法,输入方法名字 推送数据
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter messageListenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver,"reveiver"); //当没有继承MessageListener时需要写方法名字  reveiver方法名
    }

}

消费者

import com.newflows.sync.service.IAsyncService;import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;

/**
 * ClassName:@Publisher
 * Description:消息队列消费者 用户推送 监听处理业务
 **/
@Component
public class Receiver{
    private static final Logger logger = LoggerFactory.getLogger(Receiver.class);

    @Resource(name = "mqRedisTemplate")
    private StringRedisTemplate mqRedisTemplate;
    @Autowired
    private IAsyncService asyncService;    

    /**
     * 用于实现同步业务
     * @param message
     */
    public void synSeveiver(String message) {
        Map<String,Object> paramMap = new ConcurrentHashMap<>();
        for (int i=0;i<1000;i++){
            asyncService.executeAsync(paramMap);
        }
    }

    /**
     * 用于实现推送
     * @param message
     */
    public void reveiver(String message){
        Log.info("进入监听mq消息队列==》推送业务==》message:" + message.toString());
        Map<String,Object> paramMap = new ConcurrentHashMap<>();
        try {
            Map<String, Object> map = ComJsonUtils.jsonToMap(message.toString());           
        }catch (Exception e){
            logger.error("系统异常",e.getMessage());
        }
    }
}
IAsyncService 接口
import java.util.Map;
public interface IAsyncService {
    public void executeAsync(Map<String, Object> paramMap);
}
AsyncServiceImpl实现类
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.Map;

/**
 * 用于异步
 * ClassName:@Publisher
 * Description:单线程进入线程池执行
 **/
@Service
public class AsyncServiceImpl implements IAsyncService {
    private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class);
  
  @Override public void executeAsync(Map<String, Object> paramMap) {
    System.out.println(paramMap);
  }
}
IRedisService接口
import java.util.List;
import java.util.Set;

public interface IRedisService {
    public boolean set(final String key, Object value) ;
    public boolean set(final String key, Object value, Long expireTime) ;
    public boolean exists(final String key);    
    public Object get(final String key) ;
    public void remove(final String key);
    public void remove(final String... keys) ;
    public void removePattern(final String pattern) ;
    public void hashSet(String key, Object hashKey, Object value);
    public Object hashGet(String key, Object hashKey);
    public void push(String k, Object v);
    public List<Object> range(String k, long l, long l1);
    public void setAdd(String key, Object value);
    public Set<Object> setMembers(String key);
    public void zAdd(String key, Object value, double scoure);
    public Set<Object> rangeByScore(String key, double scoure, double scoure1);
}

redisService实现类

import com.newflows.sync.service.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.data.redis.core.*;
import org.springframework.stereotype.Service;

import java.io.Serializable;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;

@Service("redisService")
public class RedisServiceImpl implements IRedisService {
    private Logger logger = LoggerFactory.getLogger(RedisServiceImpl.class);

    @Autowired
    @Qualifier("redisTemplate")
    private RedisTemplate redisTemplate;

    /**
     * set value
     * @param key
     * @param value
     * @return
     */
    public boolean set(final String key, Object value) {
        boolean result = false;
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            result = true;
        } catch (Exception e) {
            logger.error("set error: key {}, value {}",key,value,e);
        }
        return result;
    }

    /**
     * set value with expireTime
     * @param key
     * @param value
     * @param expireTime
     * @return
     */
    public boolean set(final String key, Object value, Long expireTime) {
        boolean result = false;
        try {
            ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
            operations.set(key, value);
            redisTemplate.expire(key, expireTime, TimeUnit.SECONDS);
            result = true;
        } catch (Exception e) {
            logger.error("set error: key {}, value {},expireTime {}",key,value,expireTime,e);
        }
        return result;
    }

    /**
     * @param key
     * @return
     */
    public boolean exists(final String key) {
        return redisTemplate.hasKey(key);
    }

    /**
     * @param key
     * @return
     */
    public Object get(final String key) {
        Object result = null;
        ValueOperations<Serializable, Object> operations = redisTemplate.opsForValue();
        result = operations.get(key);
        return result;
    }

    /**
     * remove single key
     * @param key
     */
    public void remove(final String key) {
        if (exists(key)) {
            redisTemplate.delete(key);
        }
    }

    /**
     * batch delete
     * @param keys
     */
    public void remove(final String... keys) {
        for (String key : keys) {
            remove(key);
        }
    }

    /**
     * batch delete with pattern
     * @param pattern
     */
    public void removePattern(final String pattern) {
        Set<Serializable> keys = redisTemplate.keys(pattern);
        if (keys.size() > 0)
            redisTemplate.delete(keys);
    }

    /**
     * hash set
     * @param key
     * @param hashKey
     * @param value
     */
    public void hashSet(String key, Object hashKey, Object value){
        HashOperations<String, Object, Object> hash = redisTemplate.opsForHash();
        hash.put(key,hashKey,value);
    }

    /**
     * hash get
     * @param key
     * @param hashKey
     * @return
     */
    public Object hashGet(String key, Object hashKey){
        HashOperations<String, Object, Object>  hash = redisTemplate.opsForHash();
        return hash.get(key,hashKey);
    }

    /**
     *  list push
     * @param k
     * @param v
     */
    public void push(String k,Object v){
        ListOperations<String, Object> list = redisTemplate.opsForList();
        list.rightPush(k,v);
    }

    /**
     *  list range
     * @param k
     * @param l
     * @param l1
     * @return
     */
    public List<Object> range(String k, long l, long l1){
        ListOperations<String, Object> list = redisTemplate.opsForList();
        return list.range(k,l,l1);
    }

    /**
     *  set add
     * @param key
     * @param value
     */
    public void setAdd(String key,Object value){
        SetOperations<String, Object> set = redisTemplate.opsForSet();
        set.add(key,value);
    }

    /**
     * set get
     * @param key
     * @return
     */
    public Set<Object> setMembers(String key){
        SetOperations<String, Object> set = redisTemplate.opsForSet();
        return set.members(key);
    }

    /**
     * ordered set add
     * @param key
     * @param value
     * @param scoure
     */
    public void zAdd(String key,Object value,double scoure){
        ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
        zset.add(key,value,scoure);
    }

    /**
     * rangeByScore
     * @param key
     * @param scoure
     * @param scoure1
     * @return
     */
    public Set<Object> rangeByScore(String key, double scoure, double scoure1){
        ZSetOperations<String, Object> zset = redisTemplate.opsForZSet();
        return zset.rangeByScore(key, scoure, scoure1);
    }
}

 controller层

import com.example.copydemo.mq.Publisher;
import com.example.copydemo.service.IRedisService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 * @program: copy-demo
 * @description: 推送测试
 * @author: Gaojq
 * @create: 2020-01-15 09:05
 **/
@Controller
@RequestMapping("test")
public class PushController {

    private static final Logger logger = LoggerFactory.getLogger(PushController.class);

    @Autowired
    private IRedisService redisService;

    @Autowired
    private Publisher publisher;

    @RequestMapping(value = "/publish")
    @ResponseBody
    public String publish() {
        boolean flag = publisher.sendMessage("SYN_ORDER", "1");
        return "ok";
    }

    @RequestMapping("redis")
    @ResponseBody
    public String redisCache() {
        redisService.set("name","gaojq");
        String name = (String) redisService.get("name");
        logger.info("name="+name);
        return name;
    }

}

 pom.xml

 
原文地址:https://www.cnblogs.com/gjq1126-web/p/12193489.html