Lettuce之RedisClusterClient使用以及源码分析

  

  Redis Cluster模式简介

    redis集群并没有使用一致性hash算法而引入了哈希槽概念,Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽.集群的每个节点负责一部分hash槽.也就是说如果key是不变的对应的slot也是不变的

  Redis 服务器命令    

  • cluster info

可以通过cluster info 命令查看集群信息

cluster info
cluster_state:ok
cluster_slots_assigned:16384
cluster_slots_ok:16384
cluster_slots_pfail:0
cluster_slots_fail:0
cluster_known_nodes:12
  • cluster nodes 

通过cluster nodes命令查看当前节点以及该节点分配的slot,如下图可以发现当前redis集群有12个节点,每个节点大约管理1365个slot

xx.xxx.xxx.xx:6959> cluster nodes 45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018 e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652 a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460 1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730 fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383 85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287 c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826 0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095 9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922 274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364 369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556 71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191

  

  • client list

Redis Client List 命令用于返回所有连接到服务器的客户端信息和统计数据。

redis 127.0.0.1:6379> CLIENT LIST 
addr=127.0.0.1:43143 fd=6 age=183 idle=0 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=32768 obl=0 oll=0 omem=0 events=r cmd=client 
addr=127.0.0.1:43163 fd=5 age=35 idle=15 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=ping
addr=127.0.0.1:43167 fd=7 age=24 idle=6 flags=N db=0 sub=0 psub=0 multi=-1 qbuf=0 qbuf-free=0 obl=0 oll=0 omem=0 events=r cmd=get

  

  •  cluster slots 

  Redis Client Slots 命令用于当前的集群状态

redis 127.0.0.1:6379> cluster slots
1) 1) (integer) 0
   2) (integer) 4095
   3) 1) "127.0.0.1"
      2) (integer) 7000
   4) 1) "127.0.0.1"
      2) (integer) 7004
2) 1) (integer) 12288
   2) (integer) 16383
   3) 1) "127.0.0.1"
      2) (integer) 7003
   4) 1) "127.0.0.1"
      2) (integer) 7007
3) 1) (integer) 4096
   2) (integer) 8191
   3) 1) "127.0.0.1"
      2) (integer) 7001
   4) 1) "127.0.0.1"
      2) (integer) 7005
4) 1) (integer) 8192
   2) (integer) 12287
   3) 1) "127.0.0.1"
      2) (integer) 7002
   4) 1) "127.0.0.1"
      2) (integer) 7006

  

  • cluster keyslot

cluster keyslot key  返回一个整数,用于标识指定键所散列到的哈希槽

cluster keyslot test
(integer) 6918

  

请求重定向

由于每个节点只负责部分slot,以及slot可能从一个节点迁移到另一节点,造成客户端有可能会向错误的节点发起请求。因此需要有一种机制来对其进行发现和修正,这就是请求重定向。有两种不同的重定向场景:

  • MOVED

         声明的是slot所有权的转移,收到的客户端需要更新其key-node映射关系

  • ASK

         申明的是一种临时的状态.在重新进行分片期间,源节点向目标节点迁移一个slot过程中,可能会出现这样一种情况:属于被迁移slot的一部分键值对保存在源节点里面,一部分保存在目标节点里面.当客户端向源节点发送一个与键有关的命令,并且这个键企恰好被迁移到目标节点,则向客户端返回一个ASK错误.因为这个节点还在处于迁移过程中,所有权还没有转移,所以客户端在接收到ASK错误后,需要在目标节点执行命令前,先发送一个ASKING命令,如果不发放该命令到话,则会返回MOVED错误,ASKING表示已经知道迁移状态,则会执行该命令.

通过集群查询数据key为test的值 redis-cli为单机模式;如果为集群模式时(redis-cli -c) 接收到MOVED 错误时是不会打印MOVED错误,而是根据MOVED信息自动重定向到正确节点,并打印出重定向信息

xx.xxx.xxx.xx:6959> get test
(error) MOVED 6918 xx.xxx.xx.xxx:6956  

  此时返回的结果表示该key在6956这个实例上,通过这个实例可以获取到缓存值

xx.xxx.xx.xxx:6956> get test
"cluster"

  通过上文的示例可以发现获取缓存值的过程需要访问cluster两次,既然key到slot值的算法是已知的,如果可以通过key直接计算slot,在通过每个节点的管理的slot范围就可以知道这个key对应哪个节点了,这样不就可以一次获取到了吗?其实lettuce中就是这样处理的.下文会有详细介绍

    如果mget操作值跨slot时会怎样呢? 

mget test test1
(error) CROSSSLOT Keys in request don't hash to the same slot

Lettuce使用

    @Bean(name="clusterRedisURI")
    RedisURI clusterRedisURI(){
        return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build();
    }
  //配置集群选项,自动重连,最多重定型1次
    @Bean
    ClusterClientOptions clusterClientOptions(){
        return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build();
    }

//创建集群客户端 @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } /** * 集群连接 */ @Bean(destroyMethod = "close") StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }

  Lettuce在Spring 中的使用通过上文中的配置方式进行配置后就可以使用了

  1. 通过StatefulRedisClusterConnection获取命令处理方式,同步,异步以及响应式
  2. 执行redis相关命令

  

Lettuce相关源码

     lettuce的使用方式还是很简单的那么它的处理过程到底是怎样的呢?下面将通过源码进行解析.

通过上文可以知道连接是通过RedisClusterClient创建的,它默认使用了StringCodec(LettuceCharsets.UTF8)作为编码器创建连接

 public StatefulRedisClusterConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }

  

     在创建连接时就会主动发现集群拓扑信息,在第一次创建的时候partitions一定为null则此时需要初始化分区信息

  <K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) {
         //如果分区信息为null则初始化分区信息
        if (partitions == null) {
            initializePartitions();
        }
        //如果需要就激活拓扑刷新
        activateTopologyRefreshIfNeeded();

 初始化集群分片信息,就是将加载分片信息赋值给partitions属性 

 protected void initializePartitions() {
        this.partitions = loadPartitions();
    }

  具体加载分片信息处理过程如下:

  protected Partitions loadPartitions() {
        //获取拓扑刷新信息,
        Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource();

        String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource;
        try {
            //加载拓扑信息
            Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());

第一次可以知道partitions为null则此时需要初始化种子节点的,那么它的种子节点又是什么呢?通过代码可以发现种子节点就是初始化的URI,那么它又是什么时候设置的呢?

protected Iterable<RedisURI> getTopologyRefreshSource() {

        //是否初始化种子节点
        boolean initialSeedNodes = !useDynamicRefreshSources();

        Iterable<RedisURI> seed;
        //如果需要初始化种子节点或分区信息为null或分区信息为空 则将初始URI赋值给种子
        if (initialSeedNodes || partitions == null || partitions.isEmpty()) {
            seed = RedisClusterClient.this.initialUris;
        } else {//不需要初始化种子节点
            List<RedisURI> uris = new ArrayList<>();
            for (RedisClusterNode partition : TopologyComparators.sortByUri(partitions)) {
                uris.add(partition.getUri());
            }
            seed = uris;
        }
        return seed;
    }

  通过如下代码可以发现种子节点是在创建redisClusterClient的时候指定的

 protected RedisClusterClient(ClientResources clientResources, Iterable<RedisURI> redisURIs) {

        super(clientResources);

        assertNotEmpty(redisURIs);
        assertSameOptions(redisURIs);
        //初始化节点
        this.initialUris = Collections.unmodifiableList(LettuceLists.newList(redisURIs));
         //根据第一个URI的超时时间作为默认超时时间
        setDefaultTimeout(getFirstUri().getTimeout());
        setOptions(ClusterClientOptions.builder().build());
    }

  默认使用动态刷新

 protected boolean useDynamicRefreshSources() {

        //如果集群客户端选项不为null
        if (getClusterClientOptions() != null) {
            //获取集群拓扑刷新选项
            ClusterTopologyRefreshOptions topologyRefreshOptions = getClusterClientOptions().getTopologyRefreshOptions();
            //返回集群拓扑刷新选项中配置到是否使用动态刷新
            return topologyRefreshOptions.useDynamicRefreshSources();
        }
        //默认动态刷新
        return true;
    }

  下面看看加载分区信息的处理过程,第一次则根据种子节点的连接获取整个集群的拓扑信息

 public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) {

        //获取超时时间,默认60秒
        long commandTimeoutNs = getCommandTimeoutNs(seed);

        Connections connections = null;
        try {
            //获取所有种子连接
            connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS);
            //Requests将异步执行命令封装到多个节点
   //cluster nodes Requests requestedTopology = connections.requestTopology();
//client list Requests requestedClients = connections.requestClients(); //获取节点拓扑视图 NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); if (discovery) {//是否查找额外节点 //获取集群节点 Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes(); //排除种子节点,得到需要发现节点 Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed)); //如果需要发现节点不为空 if (!discoveredNodes.isEmpty()) { //需要发现节点连接 Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS); //合并连接 connections = connections.mergeWith(discoveredConnections); //合并请求 requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology()); requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients()); //获取节点视图 nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); //返回uri对应分区信息 return nodeSpecificViews.toMap(); } } return nodeSpecificViews.toMap(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RedisCommandInterruptedException(e); } finally { if (connections != null) { connections.close(); } } }

  

     这样在创建connection的时候就已经知道集群中的所有有效节点.根据之前的文章可以知道对于集群命令的处理是在ClusterDistributionChannelWriter中处理的.其中有一些信息在初始化writer的时候就初始化了

class ClusterDistributionChannelWriter implements RedisChannelWriter {
    //默认写入器
    private final RedisChannelWriter defaultWriter;
    //集群事件监听器
    private final ClusterEventListener clusterEventListener;
    private final int executionLimit;
    //集群连接提供器
    private ClusterConnectionProvider clusterConnectionProvider;
    //异步集群连接提供器
    private AsyncClusterConnectionProvider asyncClusterConnectionProvider;
    //是否关闭
    private boolean closed = false;
    //分区信息
    private volatile Partitions partitions;

  写命令的处理如下,会根据key计算出slot,进而找到这个slot对应的node,直接访问这个node,这样可以有效减少访问cluster次数

public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) {

        LettuceAssert.notNull(command, "Command must not be null");
        //如果连接已经关闭则抛出异常
        if (closed) {
            throw new RedisException("Connection is closed");
        }
        //如果是集群命令且命令没有处理完毕
        if (command instanceof ClusterCommand && !command.isDone()) {
            //类型转换, 转换为ClusterCommand
            ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command;
            if (clusterCommand.isMoved() || clusterCommand.isAsk()) {

                HostAndPort target;
                boolean asking;
                //如果集群命令已经迁移,此时通过ClusterCommand中到重试操作进行到此
                if (clusterCommand.isMoved()) {
                    //获取命令迁移目标节点
                    target = getMoveTarget(clusterCommand.getError());
                    //触发迁移事件
                    clusterEventListener.onMovedRedirection();
                    asking = false;
                } else {//如果是ask
                    target = getAskTarget(clusterCommand.getError());
                    asking = true;
                    clusterEventListener.onAskRedirection();
                }

                command.getOutput().setError((String) null);
                //连接迁移后的目标节点
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider
                        .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort());
                //成功建立连接,则向该节点发送命令
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(command, asking, connectFuture.join(), null);
                } else {
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable));
                }

                return command;
            }
        }
        //不是集群命令就是RedisCommand,第一个请求命令就是非ClusterCommand
         //将当前命令包装为集群命令
        ClusterCommand<K, V, T> commandToSend = getCommandToSend(command);
        //获取命令参数
        CommandArgs<K, V> args = command.getArgs();

        //排除集群路由的cluster命令
        if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) {
            //获取第一个编码后的key
            ByteBuffer encodedKey = args.getFirstEncodedKey();
            //如果encodedKey不为null
            if (encodedKey != null) {
                //获取slot值
                int hash = getSlot(encodedKey);
                //根据命令类型获取命令意图 是读还是写
                ClusterConnectionProvider.Intent intent = getIntent(command.getType());
                //根据意图和slot获取连接
                CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider)
                        .getConnectionAsync(intent, hash);
                //如果成功获取连接
                if (isSuccessfullyCompleted(connectFuture)) {
                    writeCommand(commandToSend, false, connectFuture.join(), null);
                } else {//如果连接尚未处理完,或有异常,则添加完成处理器
                    connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection,
                            throwable));
                }

                return commandToSend;
            }
        }

        writeCommand(commandToSend, defaultWriter);

        return commandToSend;
    }

  但是如果计算出的slot因为集群扩展导致这个slot已经不在这个节点上lettuce是如何处理的呢?通过查阅ClusterCommand源码可以发现在complete方法中对于该问题进行了处理;如果响应是MOVED则会继续访问MOVED目标节点,这个重定向的此时可以指定的,默认为5次,通过上文的配置可以发现,在配置中只允许一次重定向

 @Override
    public void complete() {
        //如果响应是MOVED或ASK
        if (isMoved() || isAsk()) {
            //如果最大重定向次数大于当前重定向次数则可以进行重定向
            boolean retryCommand = maxRedirections > redirections;
            //重定向次数自增
            redirections++;

            if (retryCommand) {
                try {
                    //重定向
                    retry.write(this);
                } catch (Exception e) {
                    completeExceptionally(e);
                }
                return;
            }
        }
        super.complete();
        completed = true;
    }

  如果是ask向重定向目标发送命令前需要同步发送asking

 private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking,
            StatefulRedisConnection<K, V> connection, Throwable throwable) {

        if (throwable != null) {
            command.completeExceptionally(throwable);
            return;
        }

        try {
            //如果需要发送asking请求,即接收到ASK错误消息,则在重定向到目标主机后需要发送asking命令
            if (asking) {
                connection.async().asking();
            }
            //发送命令
            writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter());
        } catch (Exception e) {
            command.completeExceptionally(e);
        }
    }

  

  上文主要介绍了lettuce对于单个key的处理,如果存在多个key,如mget lettuce又是如何处理的呢?其主要思路是将key根据slot进行分组,将在同一个slot的命令一起发送到对应的节点,再将所有请求的返回值合并作为最终结果.源码如下:

  @Override
    public RedisFuture<List<KeyValue<K, V>>> mget(Iterable<K> keys) {
        //获取分区和key的映射关系
        Map<Integer, List<K>> partitioned = SlotHash.partition(codec, keys);
        //如果分区数小于2也就是只有一个分区即所有key都落在一个分区就直接获取
        if (partitioned.size() < 2) {
            return super.mget(keys);
        }
        //每个key与slot映射关系
        Map<K, Integer> slots = SlotHash.getSlots(partitioned);

        Map<Integer, RedisFuture<List<KeyValue<K, V>>>> executions = new HashMap<>();
        //遍历分片信息,逐个发送
        for (Map.Entry<Integer, List<K>> entry : partitioned.entrySet()) {
            RedisFuture<List<KeyValue<K, V>>> mget = super.mget(entry.getValue());
            executions.put(entry.getKey(), mget);
        }

        //恢复key的顺序
        return new PipelinedRedisFuture<>(executions, objectPipelinedRedisFuture -> {
            List<KeyValue<K, V>> result = new ArrayList<>();
            for (K opKey : keys) {
                int slot = slots.get(opKey);

                int position = partitioned.get(slot).indexOf(opKey);
                RedisFuture<List<KeyValue<K, V>>> listRedisFuture = executions.get(slot);
                result.add(MultiNodeExecution.execute(() -> listRedisFuture.get().get(position)));
            }

            return result;
        });
    }

  

  

  

原文地址:https://www.cnblogs.com/wei-zw/p/9193630.html