Notes on Using RedisTemplate with Redis Pipeline

Background

The requirement: take a filtered data file and import it into Redis in a given format, to be used afterwards as a data resource pool. The file is fairly large, around 10 million lines, so Redis pipelining is used to write it into Redis in batches.

What is a Pipeline?

First, a quick introduction to pipelining:
A pipeline lets the client send multiple requests to the server one after another without waiting for each reply, and then read all of the results back in one go at the end.


The diagram below (borrowed from another article) illustrates why pipelining is so much faster.

Put simply, in the normal request flow every command makes its own trip to the redis-server: after the client sends a request it blocks and waits for the server's response before handling the next one. When the data volume is large and network latency is noticeable, the accumulated time becomes severe.

A pipeline, on the other hand, sends many requests to the server in one batch; the server executes all of the commands and returns the replies together, which greatly reduces the network overhead of repeated round trips.

(Diagram: per-command round trips vs. pipelined requests)
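As a rough illustration of the difference in code (my own sketch, not from the original post; it assumes a stringRedisTemplate bean and throwaway key names):

// Normal flow: one network round trip per SET.
for (int i = 0; i < 10_000; i++) {
    stringRedisTemplate.opsForValue().set("demo:key:" + i, "value" + i);
}

// Pipelined flow: commands are buffered on the client, sent in one batch,
// and all replies are read back together once the callback returns.
List<Object> replies = stringRedisTemplate.executePipelined((RedisCallback<Object>) connection -> {
    for (int i = 0; i < 10_000; i++) {
        connection.stringCommands().set(("demo:key:" + i).getBytes(), ("value" + i).getBytes());
    }
    return null; // the callback must return null; executePipelined collects the replies
});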

Redis setup

Add the Redis dependencies

I'm using Spring Boot here.

<!-- Spring Boot Redis starter -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- required by the Lettuce connection pool -->
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-pool2</artifactId>
    <version>2.6.0</version>
</dependency>

Configure the Redis parameters and connection settings

The Lettuce connection pool is used here.
Spring Boot is quite smart about connection pooling: once the lettuce.pool settings are added to the configuration file, the Lettuce pool is used and those settings become the pool parameters. This is where the commons-pool2 dependency added above comes in.

spring:
  redis:
    lettuce:
      pool:
        maxTotal: 50
        minIdle: 1
        maxWaitMillis: 5000
        maxIdle: 5
        testOnBorrow: true
        testOnReturn: true
        testWhileIdle: true
    mac-resource:
      database: 19
      hostName: r-uf6rnu5b4xxxxxxxapd.redis.rds.aliyuncs.com
      port: 6379
      timeout: 5000
      password: kxohxxxxxxxxyL
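Note that most of these pool keys (maxTotal, maxWaitMillis, testOnBorrow, ...) are the property names of commons-pool2's GenericObjectPoolConfig rather than Spring Boot's standard spring.redis.lettuce.pool keys; they work here because the RedisConfig class below binds everything under that prefix onto a GenericObjectPoolConfig. As a sketch of what that binding amounts to (my illustration, not code from the project):

// Roughly what the @ConfigurationProperties binding in RedisConfig produces
GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
poolConfig.setMaxTotal(50);        // maxTotal: at most 50 connections in the pool
poolConfig.setMinIdle(1);          // minIdle: keep at least 1 idle connection
poolConfig.setMaxIdle(5);          // maxIdle: keep at most 5 idle connections
poolConfig.setMaxWaitMillis(5000); // maxWaitMillis: wait up to 5 s for a free connection
poolConfig.setTestOnBorrow(true);  // validate connections when borrowed
poolConfig.setTestOnReturn(true);  // validate connections when returned
poolConfig.setTestWhileIdle(true); // validate idle connections in the background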

Configure the RedisTemplate

package com.hao.redistest.redistemplatetest.config;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import java.time.Duration;

/**
 * @author hao
 * @date 2020/4/27
 */
@Configuration
public class RedisConfig {

    // Redis connection pool parameters, bound from spring.redis.lettuce.pool
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.lettuce.pool")
    public GenericObjectPoolConfig redisPoolConfig() {
        return new GenericObjectPoolConfig();
    }

    // standalone Redis connection settings, bound from spring.redis.mac-resource in the config file
    @Bean
    @ConfigurationProperties(prefix = "spring.redis.mac-resource")
    public RedisStandaloneConfiguration macResourceConfiguration() {
        return new RedisStandaloneConfiguration();
    }

    // build a LettuceConnectionFactory that connects to Redis through the pool
    @Bean
    public LettuceConnectionFactory macResourceFactory() {
        GenericObjectPoolConfig redisPoolConfig = redisPoolConfig();
        LettuceClientConfiguration clientConfiguration = LettucePoolingClientConfiguration.builder()
                .poolConfig(redisPoolConfig).commandTimeout(Duration.ofMillis(redisPoolConfig.getMaxWaitMillis())).build();
        return new LettuceConnectionFactory(macResourceConfiguration(), clientConfiguration);
    }

    // RedisTemplate with String serializers for keys, hash keys and values
    @Bean("macResourceRedisTemplate")
    public RedisTemplate<String,String> macResourceRedisTemplate() {
        LettuceConnectionFactory macResourcePoolFactory = macResourceFactory();
        RedisTemplate<String,String> redisTemplate = new RedisTemplate<>();
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        redisTemplate.setKeySerializer(stringRedisSerializer);
        redisTemplate.setHashKeySerializer(stringRedisSerializer);
        redisTemplate.setValueSerializer(stringRedisSerializer);
        redisTemplate.setConnectionFactory(macResourcePoolFactory);
        return redisTemplate;
    }
}
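Before pushing 10 million rows through it, it can be worth a quick smoke test that the named template really reaches the target instance. A minimal sketch (the RedisSmokeTest runner and the key name are my own assumptions, not part of the original project):

import org.springframework.boot.CommandLineRunner;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

@Component
public class RedisSmokeTest implements CommandLineRunner {

    @Resource(name = "macResourceRedisTemplate")
    private RedisTemplate<String, String> macResourceRedisTemplate;

    @Override
    public void run(String... args) {
        // write and read back a throwaway key to confirm connectivity and serialization
        macResourceRedisTemplate.opsForValue().set("pipeline:smoke-test", "ok");
        System.out.println("smoke test: " + macResourceRedisTemplate.opsForValue().get("pipeline:smoke-test"));
        macResourceRedisTemplate.delete("pipeline:smoke-test");
    }
}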

Using the pipeline

The batchSave method is where the pipelined write to Redis happens.

import com.alibaba.fastjson.JSON;          // JSON.toJSONString below suggests fastjson
import org.apache.commons.io.FileUtils;    // writeStringToFile below suggests commons-io
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

@Service
public class TestServiceImpl implements TestService {

    @Resource(name = "macResourceRedisTemplate")
    private RedisTemplate<String, String> macResourceRedisTemplate;

    /**
     * Read the file line by line and write its contents to Redis.
     * @param path path of the file to import
     */
    @Override
    public void writeToRedis(String path) {
        try {
            // open the file to import from the path argument
            FileReader fr = new FileReader(path);
            BufferedReader br = new BufferedReader(fr);
            String line = "";
            String[] resourceArr = null;
            AtomicInteger inc = new AtomicInteger();
            // write to Redis in batches of batchSize
            int batchSize = 20000;
            List<Map<String,String>> batch = new ArrayList<>(batchSize);
            while ((line = br.readLine()) != null) {
                // this is the parsing rule I used for my file format; adapt it to your own
                resourceArr = line.split(",");
                String key = resourceArr[0] + "_" + resourceArr[4];
                Map<String,Object> element = new HashMap<>(3);
                element.put("param1", resourceArr[1]);
                element.put("param2", resourceArr[2]);
                element.put("param3", resourceArr[7]);
                Map<String,String> kv = new HashMap<>();
                kv.put(key, JSON.toJSONString(element));
                batch.add(kv);
                if(batch.size() % batchSize == 0 ){
                    List<Map<String,String>> toSaveBatch = new ArrayList<>(batch);
                    try {
                        // batchSize reached: write this batch through the pipeline
                        batchSave(toSaveBatch, inc);
                        batch = new ArrayList<>(batchSize);
                    } catch (Exception ex) {
                        for (Map<String, String> m : toSaveBatch) {
                            for (Map.Entry<String, String> entry : m.entrySet()) {
                                FileUtils.writeStringToFile(new File("tmp/mac_error.txt"),
                                        entry.getKey() + "@" + entry.getValue() + "\n", StandardCharsets.UTF_8, true);
                            }
                        }
                        throw new RuntimeException(ex);
                    }
                }
            }
            br.close();
            fr.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * Write one batch to Redis through a pipeline.
     * @param batch key/value pairs to push
     * @param inc   running counter of pushed entries
     */
    private void batchSave(List<Map<String, String>> batch, AtomicInteger inc) {
        // call executePipelined on the RedisTemplate, overriding its doInRedis callback (written here as a lambda)
        macResourceRedisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
            // open the pipeline (executePipelined already opens one, so this call is effectively a no-op)
            redisConnection.openPipeline();
            for(Map<String,String> e : batch){
                for(Map.Entry<String,String> entry : e.entrySet() ){
                    try {
                        // queue each entry onto the pipeline with LPUSH
                        redisConnection.lPush(entry.getKey().getBytes(),entry.getValue().getBytes());
                    }catch (Exception ex){
                        System.out.println("key:" + entry.getKey() + ",value: " + entry.getValue());
                        throw new RuntimeException(ex);
                    }
                    System.out.println(inc.incrementAndGet());
                }
            }
            return null;
        });
    }
}
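One detail worth knowing: executePipelined collects the reply of every queued command and returns them as a List<Object> (for LPUSH that is the new length of each list). The code above discards that list; if you wanted a per-batch sanity check, batchSave could be adjusted roughly like this (my sketch, not the original code; batch, inc and macResourceRedisTemplate are the same fields and parameters as above):

List<Object> replies = macResourceRedisTemplate.executePipelined((RedisCallback<Object>) redisConnection -> {
    for (Map<String, String> e : batch) {
        for (Map.Entry<String, String> entry : e.entrySet()) {
            redisConnection.lPush(entry.getKey().getBytes(), entry.getValue().getBytes());
        }
    }
    return null;
});
// one reply per queued command, each the Long returned by LPUSH (list length after the push)
System.out.println("commands acknowledged in this batch: " + replies.size());
inc.addAndGet(replies.size());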

Start the project and hit the entry point

http://localhost:9999/api/write-redis?path=/Users/hao/Desktop/xxxxx.txt
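The controller behind this URL is not shown in the post; a minimal sketch of what it could look like (the class name and mappings are guesses inferred from the URL, and server.port is presumably set to 9999 in the application config):

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
@RequestMapping("/api")
public class TestController {

    @Resource
    private TestService testService;

    // kicks off the import; the file path is passed as a query parameter
    @GetMapping("/write-redis")
    public String writeRedis(@RequestParam String path) {
        testService.writeToRedis(path);
        return "done";
    }
}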

In about 10 seconds, more than 200,000 entries had already been written.

(Screenshot: console counter showing the write progress)

Checking Redis again, the data is indeed there.

(Screenshot: imported keys visible in Redis)

Project repository

github: redis-pipeline-demo

References

https://www.cnblogs.com/littleatp/p/8419796.html
https://blog.csdn.net/ouyang111222/article/details/50942893

Original post: https://www.cnblogs.com/zhushenghao/p/12795411.html