Springboot整合Redis(默认Lettuce版本)、自定义Reids客户端

----------------------------导航目录--------------------------------

一、简单使用,序列化开启缓存机制使用

二、lua脚本实现Reids分布式锁

三、reids客户端管道使用

四、reids订阅发布应用

-------------------------------------------------------------------

一、简单使用,序列化开启缓存机制使用

1、引入pom文件

  注意版本号的区别哦~,小心采坑,根据需要选型版本号springboot客户端reids版本选择

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    <version>2.1.11.RELEASE</version>
</dependency>

2、添加配置文件application.yml或者properties

spring:
  redis:
    database: 0
    host: localhost
    port: 6379
   # 连接超时时间 单位 ms(毫秒)
    timeout: 3000
    password:
    lettuce:
      pool:
        #连接池中的最大空闲连接,默认值也是0。
        max-idle: 8
        #连接池中的最小空闲连接,默认值也是0。
        min-idle: 0
        # 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
        max-active: 8
        # 等待可用连接的最大时间,单位毫秒,默认值为-1,表示永不超时。如果超过等待时间,则直接抛出异常
        max-wait: -1

单纯使用的话到这里就可以直接注入RedisTemplate对象使用了,我这里对接下来的使用序列化和缓存注解的使用进行了配置

3、创建Reids客户端,并指定序列化方式并且开启缓存注解的使用

  第一种,使用springboot封装的RedisTemplate(推荐)

  这里引入了3个类如下:

  CacheTtl 枚举类型用来定义缓存的时间

  FastJsonRedisSerializer指定序列化方式,注意引入的包用户排除一下问题

  RedisConfig 配置RedisTemplate对象的序列化和cache管理对象 。注解使用:SpringBoot进阶教程(五十三)整合Redis之@Cacheable、@CachePut、@CacheEvict的应用

package com.niu.reids;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/5/19  3:26 PM
 */
public enum CacheTtl {

    /**
     * 1小时
     */
    ONE_HOUR("oneHour", 1),
    /**
     * 1天
     */
    ONE_DAY("oneDay", 24),
    /**
     * 2天
     */
    TWO_DAYS("twoDays", 48),
    /**
     * 1周
     */
    ONE_WEEK("oneWeek", 168);

    private String value;

    private Integer time;

    CacheTtl(String value, Integer time) {
        this.value = value;
        this.time = time;
    }

    public String getValue() {
        return value;
    }

    public Integer getTime() {
        return time;
    }
}
View Code
package com.niu.reids;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.parser.ParserConfig;
import com.alibaba.fastjson.serializer.SerializerFeature;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.SerializationException;
import org.springframework.lang.Nullable;

import java.nio.charset.Charset;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/5/19  3:19 PM
 */
public class FastJsonRedisSerializer<T> implements RedisSerializer<T> {

    public static final Charset DEFAULT_CHARSET = Charset.forName("UTF-8");

    private Class<T> clazz;

    /**
     *  解决fastJson autoType is not support错误
     */
    static {
        ParserConfig.getGlobalInstance().addAccept("com.niu");
    }

    public FastJsonRedisSerializer(Class<T> clazz) {
        super();
        this.clazz = clazz;
    }

    /**
     * 指定序列化方式,用与@Cacheable 注解序列化
     * @param t
     * @return
     * @throws SerializationException
     */
    @Nullable
    @Override
    public byte[] serialize(T t) throws SerializationException {
        if (t == null) {
            return new byte[0];
        }
        return JSON.toJSONString(t, SerializerFeature.WriteClassName).getBytes(DEFAULT_CHARSET);
    }

    /**
     * 反序列化方式
     * @param bytes
     * @return
     * @throws SerializationException
     */
    @Nullable
    @Override
    public T deserialize(byte[] bytes) throws SerializationException {
        if (bytes == null || bytes.length <= 0) {
            return null;
        }
        String str = new String(bytes, DEFAULT_CHARSET);
        return JSON.parseObject(str, clazz);
    }

}
View Code
package com.niu.reids;

import com.alibaba.fastjson.support.spring.FastJsonRedisSerializer;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.cache.RedisCacheConfiguration;
import org.springframework.data.redis.cache.RedisCacheManager;
import org.springframework.data.redis.cache.RedisCacheWriter;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/5/19  3:19 PM
 */
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

    /**
     * 配置reids 对象
     *
     * @param factory
     * @return
     */
    @Bean
    public RedisTemplate redisTemplate(LettuceConnectionFactory factory) {
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(getJsonRedisSerializer());
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    /**
     * 创建缓存管理器对象
     *
     * @param factory
     * @return
     */
    @Bean
    public CacheManager cacheManager(LettuceConnectionFactory factory) {
        // 默认过期时间1小时
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig().disableKeyPrefix()
                .entryTtl(Duration.ofHours(1))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getJsonRedisSerializer()));
        return RedisCacheManager
                .builder(RedisCacheWriter.nonLockingRedisCacheWriter(factory))
                .cacheDefaults(redisCacheConfiguration)
                .withInitialCacheConfigurations(getRedisCacheConfigurationMap())
                .build();
    }

    /**
     * 创建缓存注解中的value属性集合 @Cacheable(value = "oneDay", key = "'b_v'", unless = "#result == null")
     *
     * @return
     */
    private Map<String, RedisCacheConfiguration> getRedisCacheConfigurationMap() {
        Map<String, RedisCacheConfiguration> redisCacheConfigurationMap = new HashMap<>(CacheTtl.values().length);
        for (CacheTtl cacheTtl : CacheTtl.values()) {
            redisCacheConfigurationMap.put(cacheTtl.getValue(), this.getRedisCacheConfigurationWithTtl(1));
        }
        return redisCacheConfigurationMap;
    }

    /**
     * 通过事件创建缓存配置对象,并且指定序列化方式
     *
     * @param hours
     * @return
     */
    private RedisCacheConfiguration getRedisCacheConfigurationWithTtl(Integer hours) {
        RedisCacheConfiguration redisCacheConfiguration = RedisCacheConfiguration.defaultCacheConfig().disableKeyPrefix()
                .entryTtl(Duration.ofHours(hours))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(getJsonRedisSerializer()));
        return redisCacheConfiguration;
    }

    /**
     * 创建序列化对象使用fastJson
     *
     * @return
     */
    private FastJsonRedisSerializer getJsonRedisSerializer() {
        return new FastJsonRedisSerializer<>(Object.class);
    }

}
View Code

  第二种、自定义客户端,也是基于RedisTemplate自定义。

  由于自定义需要引入commons-pool2 创建链接池,广告->Apache Commons Lang 3.10使用简介 

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
            <version>2.5.0</version>
        </dependency>
View Code

  然后创建MyRedisTemplate类创建客户端,当然这是简单配置 

package com.niu.reids;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/5/19  3:50 PM
 */
@Configuration
public class MyRedisTemplate {

    @Value("${spring.redis.database}")
    Integer database;
    @Value("${spring.redis.host}")
    String host;
    @Value("${spring.redis.port}")
    Integer port;
    @Value("${spring.redis.password}")
    String password;
    @Value("${spring.redis.lettuce.pool.max-active}")
    Integer maxActive;
    @Value("${spring.redis.lettuce.pool.max-wait}")
    Long maxWait;
    @Value("${spring.redis.lettuce.pool.max-idle}")
    Integer maxIdle;
    @Value("${spring.redis.lettuce.pool.min-idle}")
    Integer minIdle;
    @Value("${spring.redis.timeout}")
    String timeout;

    @Bean("backupRedisTemplate")
    public RedisTemplate backupRedisTemplate() {
        //定义redis链接池
        GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
        poolConfig.setMaxTotal(maxActive);
        poolConfig.setMaxWaitMillis(maxWait);
        poolConfig.setMaxIdle(maxIdle);
        poolConfig.setMinIdle(minIdle);
        //配置对象
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setDatabase(database);
        config.setHostName(host);
        config.setPort(port);
        config.setPassword(password);
        //创建Lettuce工厂
        LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder().poolConfig(poolConfig).build();
        LettuceConnectionFactory factory = new LettuceConnectionFactory(config, clientConfiguration);
        factory.afterPropertiesSet();
        //创建客户端并指定序列化方式
        RedisTemplate redisTemplate = new RedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        //这里序列化方式自己定义,
        redisTemplate.setHashValueSerializer(new FastJsonRedisSerializer<>(Object.class));
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}
View Code

4、这样就可以使用了,自定义也可以添加 @Qualifier("myRedisTemplate")指定加载的客户端对象

   @Autowired
    private RedisTemplate redisTemplate;

二、lua脚本实现Reids分布式锁

分布式锁就需要保持其原子性,这里使用lua脚本完成,Lua脚本在redis分布式锁场景的运用

package com.niu.reids;

import org.springframework.data.redis.connection.RedisStringCommands;
import org.springframework.data.redis.connection.ReturnType;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.types.Expiration;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/5/19  4:03 PM
 */
public class RedisLockService {

    private RedisTemplate<String, String> redisTemplate;

    private static final String UNLOCK_LUA;

    private final static int NUM_KEYS = 1;

    static {
        StringBuilder sb = new StringBuilder();
        sb.append("if redis.call('get', KEYS[1]) == ARGV[1] then");
        sb.append(" return redis.call('del', KEYS[1])");
        sb.append(" else return 0 end");
        UNLOCK_LUA = sb.toString();
    }

    public RedisLockService(RedisTemplate<String, String> redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    /**
     * 尝试获取分布式锁
     *
     * @param key        锁名称
     * @param val        请求标识
     * @param expireTime 超期时间 秒
     * @return 是否获取成功
     */
    public boolean tryLock(String key, String val, int expireTime) {
        RedisCallback<Boolean> callback = (connection) ->
                connection.set(key.getBytes(), val.getBytes(), Expiration.seconds(expireTime), RedisStringCommands.SetOption.SET_IF_ABSENT);
        boolean result = redisTemplate.execute(callback);
        return result;
    }

    /**
     * 释放分布式锁
     *
     * @param key 锁名称
     * @param val 请求标识 只有标识相同才能解锁
     * @return 是否释放成功
     */
    public boolean releaseLock(String key, String val) {
        RedisCallback<Long> callback = (connection) ->
                connection.eval(UNLOCK_LUA.getBytes(), ReturnType.INTEGER, NUM_KEYS, key.getBytes(), val.getBytes());
        Long result = redisTemplate.execute(callback);
        return result != null && result > 0;
    }
}

三、reids客户端管道使用

  使用executePipelined方法实现RedisCallback接口,并且指定泛型为Integer,具体业务根据业务对象来实现

    /**
     * 插入多条数据
     *
     * @param saveList
     * @param unit
     * @param timeout
     */
    public void batchInsert(List<Map<String, String>> saveList, TimeUnit unit, int timeout) {
        redisTemplate.executePipelined(new SessionCallback<Object>() {
            @Override
            public <K, V> Object execute(RedisOperations<K, V> redisOperations) throws DataAccessException {
                ValueOperations<String, String> value = (ValueOperations<String, String>) redisOperations.opsForValue();
                for (Map<String, String> needSave : saveList) {
                    value.set(needSave.get("key"), needSave.get("value"), timeout, unit);
                }
                return null;
            }
        });
    }

    /**
     * 批量获取多条数据
     *
     * @param keyList
     * @return
     */
    public List<String> batchGet(List<String> keyList) {
        List<Object> objects = redisTemplate.executePipelined(new RedisCallback<String>() {
            @Override
            public String doInRedis(RedisConnection redisConnection) throws DataAccessException {
                StringRedisConnection stringRedisConnection = (StringRedisConnection) redisConnection;
                for (String key : keyList) {
                    stringRedisConnection.get(key);
                }
                return null;
            }
        }, redisTemplate.getValueSerializer());

        List<String> collect = objects.stream().map(val -> String.valueOf(val)).collect(Collectors.toList());
        return collect;
    }

  异常问题:java.lang.ClassCastException: com.sun.proxy.$Proxy77 cannot be cast to org.springframework.data.redis.connection.StringRedisConnection。

  原因是:RedisTemplate的泛型不正确,导致没办法强转。

  处理方案:如下面一样泛型的key和value为String类型。

  @Autowired
    private RedisTemplate<String,String> redisTemplate;

四、redis订阅与发布

1、 会用到的方法与类

  *  RedisMessageListenerContainer Redis订阅发布的监听容器,你的消息发布、订阅配置都必须在这里面实现

  * addMessageListener(MessageListenerAdapter,PatternTopic) 新增订阅频道及订阅者,订阅者必须有相关方法处理收到的消息。
  * setTopicSerializer(RedisSerializer) 对频道内容进行序列化解析

  MessageListenerAdapter 监听适配器

  MessageListenerAdapter(Object , defaultListenerMethod) 订阅者及其方法

  redisTemplate redis模版类中:convertAndSend(String channel, Object message) 消息发布

 2、创建两个Bean,一个是注册监听适配器订阅频道,一个创建监听适配器

    /**
     * 注册监听适配器,订阅频道
     * @param connectionFactory
     * @param listenerAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 订阅频道
        container.addMessageListener(listenerAdapter, new PatternTopic("topic_test"));
        //指定频道内容序列化
        container.setTopicSerializer(getJsonRedisSerializer());
        return container;
    }

    /**
     * 创建监听适配器
     * @param receiver
     * @return
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(MessageReceiver receiver) {
        //创建监听适配器,指定订阅者及其方法
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

3、创建订阅者

package com.niu.reids;

import org.springframework.stereotype.Component;

/**
 * @author niunafei
 * @function
 * @email niunafei0315@163.com
 * @date 2020/5/19  6:17 PM
 */
@Component
public class MessageReceiver {

    public void receiveMessage(String message, String channel) {
        System.out.println(Thread.currentThread().getName()+" topic: "+channel + " 收到消息: " + message);
    }
}

4、进行发送消息,这里引入springboot的web包使用http请求添加信息,也可以使用单元测试。重点是for循环中的代码,指定频道进行发送

  @GetMapping("/send")
    public Object send() {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            redisTemplate.convertAndSend("topic_test", "测试"+i);
        }
        return "ok";
    }

看看结果吧

maven配置阿里云仓库镜像,加速下载maven依赖

SpringBoot进阶教程(五十四)整合Redis之共享Session

原文地址:https://www.cnblogs.com/niunafei/p/12917701.html