一致性哈希算法(consistent hashing)

概述:

如果采用常用的hash(object)%N算法,那么在有机器添加或者删除后,映射关系就变了,很多原有的缓存就无法找到了

一致性hash:添加删除机器前后映射关系一致,当然,不是严格一致。实现的关键是环形Hash空间。将数据和机器都hash到环上,数据映射到顺时针离自己最近的机器中。

 一致性hash单调性体现在: 
无论是新增主机还是删除主机,被影响的都是离那台主机最近的那些节点,其他节点映射关系没有影响

    一致性哈希算法在1997年由麻省理工学院提出的一种分布式哈希(DHT)实现算法,设计目标是为了解决因特网中的热点(Hot spot)问题,初衷和CARP十分类似。一致性哈希修正了CARP使用的简 单哈希算法带来的问题,使得分布式哈希(DHT)可以在P2P环境中真正得到应用。 

    一致性hash算法提出了在动态变化的Cache环境中,判定哈希算法好坏的四个定义:

1、平衡性(Balance):平衡性是指哈希的结果能够尽可能分布到所有的缓冲中去,这样可以使得所有的缓冲空间都得到利用。很多哈希算法都能够满足这一条件。(虚拟节点解决

2、单调性(Monotonicity):单调性是指如果已经有一些内容通过哈希分派到了相应的缓冲中,又有新的缓冲加入到系统中。哈希的结果应能够保证原有已分配的内容可以被映射到原有的或者新的缓冲中去,而不会被映射到旧的缓冲集合中的其他缓冲区。 

3、分散性(Spread):在分布式环境中,终端有可能看不到所有的缓冲,而是只能看到其中的一部分。当终端希望通过哈希过程将内容映射到缓冲上时,由于不同终端所见的缓冲范围有可能不同,从而导致哈希的结果不一致,最终的结果是相同的内容被不同的终端映射到不同的缓冲区中。这种情况显然是应该避免的,因为它导致相同内容被存储到不同缓冲中去,降低了系统存储的效率。分散性的定义就是上述情况发生的严重程度。好的哈希算法应能够尽量避免不一致的情况发生,也就是尽量降低分散性。 

4、负载(Load):负载问题实际上是从另一个角度看待分散性问题。既然不同的终端可能将相同的内容映射到不同的缓冲区中,那么对于一个特定的缓冲区而言,也可能被不同的用户映射为不同 的内容。与分散性一样,这种情况也是应当避免的,因此好的哈希算法应能够尽量降低缓冲的负荷。

我对一致性的理解是添加删除机器前后映射关系一致,当然,不是严格一致。实现的关键是环形Hash空间

    在分布式集群中,对机器的添加删除,或者机器故障后自动脱离集群这些操作是分布式集群管理最基本的功能。如果采用常用的hash(object)%N算法,那么在有机器添加或者删除后,映射关系就变了,很多原有的缓存数据就无法找到了,这样严重的违反了单调性原则(为什么使用一致性hash)一致性hash将数据和机器都hash到环上,数据映射到顺时针离自己最近的机器中解决了在这个问题

接下来主要讲解一下一致性哈希算法是如何设计的:

环形Hash空间

按照常用的hash算法来将对应的key哈希到一个具有2^32次方个桶的空间中,即0~(2^32)-1的数字空间中。现在我们可以将这些数字头尾相连,想象成一个闭合的环形。如下图

                                                                         

把数据通过一定的hash算法处理后映射到环上

现在我们将object1、object2、object3、object4四个对象通过特定的Hash函数计算出对应的key值,然后散列到Hash环上。如下图:

    Hash(object1) = key1;

    Hash(object2) = key2;

    Hash(object3) = key3;

    Hash(object4) = key4;

                                                           

将机器通过hash算法映射到环上

在采用一致性哈希算法的分布式集群中将新的机器加入,其原理是通过使用与对象存储一样的Hash算法将机器也映射到环中(一般情况下对机器的hash计算是采用机器的IP或者机器唯一的别名作为输入值),然后以顺时针的方向计算,将所有对象存储到离自己最近的机器中。(将数据和机器都hash到环上,数据映射到顺时针离自己最近的机器中。

假设现在有NODE1,NODE2,NODE3三台机器,通过Hash算法得到对应的KEY值,映射到环中,其示意图如下:

Hash(NODE1) = KEY1;

Hash(NODE2) = KEY2;

Hash(NODE3) = KEY3;

                                                             

通过上图可以看出对象与机器处于同一哈希空间中,这样按顺时针转动object1存储到了NODE1中,object3存储到了NODE2中,object2、object4存储到了NODE3中。在这样的部署环境中,hash环是不会变更的,因此,通过算出对象的hash值就能快速的定位到对应的机器中,这样就能找到对象真正的存储位置了。

机器的删除与添加

普通hash求余算法最为不妥的地方就是在有机器的添加或者删除之后会照成大量的对象存储位置失效,这样就大大的不满足单调性了。下面来分析一下一致性哈希算法是如何处理的。

一致性hash单调性体现在: 
无论是新增主机还是删除主机,被影响的都是离那台主机最近的那些节点,其他节点映射关系没有影响(无论添加删除只有一个节点的映射关系被破坏,新增的服务器顺时针最近的服务器的一部分请求会被新增的服务器所替代,删除的服务器其负责处理的请求被顺时针下一个节点委托处理)。

1. 节点(机器)的删除

    以上面的分布为例,如果NODE2出现故障被删除了,那么按照顺时针迁移的方法,object3将会被迁移到NODE3中,这样仅仅是object3的映射位置发生了变化,其它的对象没有任何的改动。如下图:

                                                              

2. 节点(机器)的添加 

    如果往集群中添加一个新的节点NODE4,通过对应的哈希算法得到KEY4,并映射到环中,如下图:

                                                              

    通过按顺时针迁移的规则,那么object2被迁移到了NODE4中,其它对象还保持这原有的存储位置。通过对节点的添加和删除的分析,一致性哈希算法在保持了单调性的同时,还是数据的迁移达到了最小,这样的算法对分布式集群来说是非常合适的,避免了大量数据迁移,减小了服务器的的压力。

平衡性

根据上面的图解分析,一致性哈希算法满足了单调性和负载均衡的特性以及一般hash算法的分散性,但这还并不能当做其被广泛应用的原由,因为还缺少了平衡性。下面将分析一致性哈希算法是如何满足平衡性的。hash算法是不保证平衡的,如上面只部署了NODE1和NODE3的情况(NODE2被删除的图),object1存储到了NODE1中,而object2、object3、object4都存储到了NODE3中,这样就照成了非常不平衡的状态。在一致性哈希算法中,为了尽可能的满足平衡性,其引入了虚拟节点。

    ——“虚拟节点”( virtual node )是实际节点(机器)在 hash 空间的复制品( replica ),一实际个节点(机器)对应了若干个“虚拟节点”,这个对应个数也成为“复制个数”,“虚拟节点”在 hash 空间中以hash值排列。

以上面只部署了NODE1和NODE3的情况(NODE2被删除的图)为例,之前的对象在机器上的分布很不均衡,现在我们以2个副本(复制个数)为例,这样整个hash环中就存在了4个虚拟节点,最后对象映射的关系图如下:

                                                                 

根据上图可知对象的映射关系:object1->NODE1-1,object2->NODE1-2,object3->NODE3-2,object4->NODE3-1。通过虚拟节点的引入,对象的分布就比较均衡了。那么在实际操作中,正真的对象查询是如何工作的呢?对象从hash到虚拟节点到实际节点的转换如下图:

                                         

“虚拟节点”的hash计算可以采用对应节点的IP地址加数字后缀的方式。例如假设NODE1的IP地址为192.168.1.100。引入“虚拟节点”前,计算 cache A 的 hash 值:

Hash(“192.168.1.100”);

引入“虚拟节点”后,计算“虚拟节”点NODE1-1和NODE1-2的hash值:

Hash(“192.168.1.100#1”); // NODE1-1

Hash(“192.168.1.100#2”); // NODE1-2

 具体算法见下方源码获 //取提供者host:port形式地址,挺复杂

Dubbo一致性hash具体代码实现:

/**
 * ConsistentHashLoadBalance
 *
 * @author william.liangf
 */
public class ConsistentHashLoadBalance extends AbstractLoadBalance {
    //接口名.方法名称为key hash选择器为value的map.每个方法一个选择器。
    private final ConcurrentMap<String, ConsistentHashSelector<?>> selectors = new ConcurrentHashMap<String, ConsistentHashSelector<?>>();

    @SuppressWarnings("unchecked")
    @Override
    protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
        //接口名.方法名称为key
        String key = invokers.get(0).getUrl().getServiceKey() + "." + invocation.getMethodName();
        //取的invokers对象的hashcode,验证对象变化
        int identityHashCode = System.identityHashCode(invokers);
        //根据key获取hash选择器
        ConsistentHashSelector<T> selector = (ConsistentHashSelector<T>) selectors.get(key);
        if (selector == null || selector.identityHashCode != identityHashCode) {
            //选择器为null或者,对象已变化,就创建新选择器放入map中
            selectors.put(key, new ConsistentHashSelector<T>(invokers, invocation.getMethodName(), identityHashCode));
            selector = (ConsistentHashSelector<T>) selectors.get(key);
        }
        //通过选择器的select方法,返回选中的invoker
        return selector.select(invocation);
    }

    private static final class ConsistentHashSelector<T> {
        //hash 环(值域)中,某些值(所有虚拟节点数)到虚拟节点的映射。
        private final TreeMap<Long, Invoker<T>> virtualInvokers;
        //每个invoker 需要虚拟的节点数
        private final int replicaNumber;

        private final int identityHashCode;

        private final int[] argumentIndex;

        ConsistentHashSelector(List<Invoker<T>> invokers, String methodName, int identityHashCode) {
            //基于红黑树实现的有序map,有序很重要。
            this.virtualInvokers = new TreeMap<Long, Invoker<T>>();
            this.identityHashCode = identityHashCode;
            URL url = invokers.get(0).getUrl();
            //获取虚拟节点数,默认160个节点,配置例子 <dubbo:parameter key="hash.nodes" value="320" />
            this.replicaNumber = url.getMethodParameter(methodName, "hash.nodes", 160);
            //获取需要hash的参数位置。配置例子<dubbo:parameter key="hash.arguments" value="0,1" /> 默认只hash第一个,0位置参数
            String[] index = Constants.COMMA_SPLIT_PATTERN.split(url.getMethodParameter(methodName, "hash.arguments", "0"));
            argumentIndex = new int[index.length];
            for (int i = 0; i < index.length; i++) {
                argumentIndex[i] = Integer.parseInt(index[i]);
            }
            for (Invoker<T> invoker : invokers) {
                //获取提供者host:port形式地址,以160个虚拟节点为例
                String address = invoker.getUrl().getAddress();
                for (int i = 0; i < replicaNumber / 4; i++) {
                    byte[] digest = md5(address + i);//host:port(0,1,2,3.....39),40份
                    //每个再分别,hash 4次,(这样每个机器就虚拟了160份)
                    for (int h = 0; h < 4; h++) {
                        long m = hash(digest, h);
                        //160份虚拟节点,每一份都映射同一个实际节点
                        virtualInvokers.put(m, invoker);
                    }
                }
            }
        }

        public Invoker<T> select(Invocation invocation) {
            String key = toKey(invocation.getArguments());
            //对拼接后的参数做MD5指纹摘要
            byte[] digest = md5(key);
           //摘要后,hash计算
            return selectForKey(hash(digest, 0));
        }

        /***
         * 把参数直接拼接。
         * @param args
         * @return
         */
        private String toKey(Object[] args) {
            StringBuilder buf = new StringBuilder();
            for (int i : argumentIndex) {
                if (i >= 0 && i < args.length) {
                    buf.append(args[i]);
                }
            }
            return buf.toString();
        }
        //根据hash值,选择invoker方法的核心方法
        private Invoker<T> selectForKey(long hash) {
            Invoker<T> invoker;
            Long key = hash;
        //如果key不在map里,也有可能key已在map里,直接走下面流程
            if (!virtualInvokers.containsKey(key)) {
                //方法加参数hash 参数的key,没在已近映射的ma中,
                //返回比key 大的map值
                SortedMap<Long, Invoker<T>> tailMap = virtualInvokers.tailMap(key);
                if (tailMap.isEmpty()) {//如果key最大,就取虚拟节点第一个(key值最小的节点)
                    key = virtualInvokers.firstKey();
                } else {
                    //取比key大的keys中,最小的一个节点,(离key最近的,大于key的节点)
                    key = tailMap.firstKey();
                }
            }
            //获取key映射的实际invoker
            invoker = virtualInvokers.get(key);
            return invoker;
        }
//hash 算法,也很关键
        private long hash(byte[] digest, int number) {
            //number可以是,0,1,2,3 long 类型64 bit位
            //最后0xFFFFFFFFL;保证4字节位表示数值。相当于Ingter型数值。所以hash环的值域是[0,Integer.max_value]
            //每次取digest4个字节(|操作),组成4字节的数值。
            //当number 为 0,1,2,3时,分别对应digest第
            // 1,2,3,4;
            // 5,6,7,8;
            // 9,10,11,12;
            // 13,14,15,16;字节
            //4批
            return (
                    (
                     //digest的第4(number 为 0时),8(number 为 1),12(number 为 2),16(number 为 3)字节,&0xFF后,左移24位
                     (long) (digest[3 + number * 4] & 0xFF) << 24
                    )
                    |(
                     //digest的第3,7,11,15字节,&0xFF后,左移16位
                     (long) (digest[2 + number * 4] & 0xFF) << 16
                    )
                    |(
                     //digest的第2,6,10,14字节,&0xFF后,左移8位
                      (long) (digest[1 + number * 4] & 0xFF) << 8
                    )
                    |(
                    //digest的第1,5,9,13字节,&0xFF
                      digest[number * 4] & 0xFF
                    )
                    )
                    & 0xFFFFFFFFL;
        }
        //返回16字节总共128bit位的MD5指纹签名byte[]。
        private byte[] md5(String value) {
            MessageDigest md5;
            try {
                md5 = MessageDigest.getInstance("MD5");
            } catch (NoSuchAlgorithmException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.reset();
            byte[] bytes;
            try {
                bytes = value.getBytes("UTF-8");
            } catch (UnsupportedEncodingException e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
            md5.update(bytes);
            return md5.digest();
        }

    }

}

转载请说明出处:http://blog.csdn.net/cywosp/article/details/23397179

原文地址:https://www.cnblogs.com/twoheads/p/10135896.html