xmemecached中的一致性hash算法

网上有很多文章都是说一致性hash算法的。但是没有啥代码实现,所以一直不是特别清楚。
周末看了一下xmemcached 这个memcached客户端的代码。
其中有一种选取策略就是一致性hash算法的。
别的不说,贴代码才是硬道理。langyu的blog上写了一部分解释,但是有一部分还是没有看特别明白。
和同事讨论一下,得到了一下的结论。自己mark一下,免得以后忘记。ps:xmemcached版本号是1.4.1

public class KetamaMemcachedSessionLocator extends
        AbstractMemcachedSessionLocator {

    static final int NUM_REPS = 160;
    private transient volatile TreeMap<Long, List<Session>> ketamaSessions = new TreeMap<Long, List<Session>>();
    private final HashAlgorithm hashAlg;
    private volatile int maxTries;
    private final Random random = new Random();

    /**
     * compatible with nginx-upstream-consistent,patched by wolfg1969
     */
    static final int DEFAULT_PORT = 11211;
    private final boolean cwNginxUpstreamConsistent;

    public KetamaMemcachedSessionLocator() {
        this.hashAlg = HashAlgorithm.KETAMA_HASH;
        this.cwNginxUpstreamConsistent = false;
    }

    public KetamaMemcachedSessionLocator(boolean cwNginxUpstreamConsistent) {
        this.hashAlg = HashAlgorithm.KETAMA_HASH;
        this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
    }

    public KetamaMemcachedSessionLocator(HashAlgorithm alg) {
        this.hashAlg = alg;
        this.cwNginxUpstreamConsistent = false;
    }

    public KetamaMemcachedSessionLocator(HashAlgorithm alg,
            boolean cwNginxUpstreamConsistent) {
        this.hashAlg = alg;
        this.cwNginxUpstreamConsistent = cwNginxUpstreamConsistent;
    }

    public KetamaMemcachedSessionLocator(List<Session> list, HashAlgorithm alg) {
        super();
        this.hashAlg = alg;
        this.cwNginxUpstreamConsistent = false;
        this.buildMap(list, alg);
    }


    /**
    * 根据,节点生成treeMap hash环
    **/
    private final void buildMap(Collection<Session> list, HashAlgorithm alg) {
        TreeMap<Long, List<Session>> sessionMap = new TreeMap<Long, List<Session>>();

        String sockStr;//memcached服务器ip+端口
        for (Session session : list) {
            if (this.cwNginxUpstreamConsistent) {
                //不知道干啥的,也没看懂,似乎和nginx负载有关,选择性放弃
                InetSocketAddress serverAddress = session
                        .getRemoteSocketAddress();
                sockStr = serverAddress.getAddress().getHostAddress();
                if (serverAddress.getPort() != DEFAULT_PORT) {
                    sockStr = sockStr + ":" + serverAddress.getPort();
                }
            } else {
                sockStr = String.valueOf(session.getRemoteSocketAddress());
            }
            /**
             * Duplicate 160 X weight references
             */
            int numReps = NUM_REPS;//默认是160
            if (session instanceof MemcachedTCPSession) {
                //这个numReps,虚拟节点的数量。如果节点很少,则会发生不均衡的问题。即所有数据都倾斜到了第一个默认节点。
                //后面getSessionByHash方法的时候会看到
                numReps *= ((MemcachedSession) session).getWeight();
            }
            if (alg == HashAlgorithm.KETAMA_HASH) {
                for (int i = 0; i < numReps / 4; i++) {
                    //已经写的比较清楚了,这个/4,每4个一组
                    //其实这里这个算法就是KETAMA_HASH算法的实现,不知道为什么他没有直接调用现成的方法
                    byte[] digest = HashAlgorithm.computeMd5(sockStr + "-" + i);
                    //获取虚拟节点的hash值(如果没有i,那么就无所谓虚拟节点,所有server的hash值都相同)
                    for (int h = 0; h < 4; h++) {
                        long k = (long) (digest[3 + h * 4] & 0xFF) << 24
                                | (long) (digest[2 + h * 4] & 0xFF) << 16
                                | (long) (digest[1 + h * 4] & 0xFF) << 8
                                | digest[h * 4] & 0xFF;
                        //Md5编码后,每个虚拟结点对应Md5码16个字节中的4个,组成一个long型数值,做为这个虚拟结点在环中的惟一key。
                        //langyu 在 http://langyu.iteye.com/blog/684087 中提到,
                        //之所以用long类型是因为long实现了comparable 接口
                        //但是实际上int也同样实现了comparable接口,不知道是不是jdk历史遗留问题。
                        //将一个节点16位转换为4个数值,分布到treemap中。
                        this.getSessionList(sessionMap, k).add(session);
                    }
                }
            } else {
                for (int i = 0; i < numReps; i++) {
                    long key = alg.hash(sockStr + "-" + i);
                    this.getSessionList(sessionMap, key).add(session);
                }
            }
        }
        this.ketamaSessions = sessionMap;
        this.maxTries = list.size();
    }

    //每个map的key上挂载这所有key值相同的虚拟节点。
    //理论上说应该能够避免这种情况,但是实际上不可避免的会有这种情况发生。
    //因为被分配到了2^32个int值中。而不是md5原有的128位
    private List<Session> getSessionList(
            TreeMap<Long, List<Session>> sessionMap, long k) {
        List<Session> sessionList = sessionMap.get(k);
        if (sessionList == null) {
            sessionList = new ArrayList<Session>();
            sessionMap.put(k, sessionList);
        }
        return sessionList;
    }

    // 程序的入口
    public final Session getSessionByKey(final String key) {
        if (this.ketamaSessions == null || this.ketamaSessions.size() == 0) {
            return null;
        }

        long hash = this.hashAlg.hash(key);
        //这个地方是一致性hash的入口
        Session rv = this.getSessionByHash(hash);
        int tries = 0;
        while (!this.failureMode && (rv == null || rv.isClosed())
                && tries++ < this.maxTries) {
            hash = this.nextHash(hash, key, tries);
            rv = this.getSessionByHash(hash);
        }
        return rv;
    }

    //入口参数是缓存对象key的hash值
    public final Session getSessionByHash(final long hash) {
        TreeMap<Long, List<Session>> sessionMap = this.ketamaSessions;
        if (sessionMap.size() == 0) {
            return null;
        }
        Long resultHash = hash;
        if (!sessionMap.containsKey(hash)) {

            //如果key的hash值没有和整个服务器虚拟节点中的hash值产生重合
            SortedMap<Long, List<Session>> tailMap = sessionMap.tailMap(hash);
            //获取这个虚拟节点之后的所有节点
            if (tailMap.isEmpty()) {
                resultHash = sessionMap.firstKey();//如果是最后一个节点,则返回第一个节点。
                //这就是为啥如果一致性hash中服务器节点个数非常少,会产生数据分布不均衡的原因
            } else {
                //如果有则返回第一个节点
                resultHash = tailMap.firstKey();
            }
        }
        List<Session> sessionList = sessionMap.get(resultHash);
        if (sessionList == null || sessionList.size() == 0) {
            return null;
        }
        int size = sessionList.size();
        //随机分配一个值
        //这里和同事在考虑,这样set和get被分配到了不同的节点,那么取的时候怎么办。会产生不命中的问题。
        //我们最后这样认为:首先这个list的存在就是因为hash值不够均匀产生了碰撞。
        //这个概率应该是有,但是不应该很高,也就是说这个list的长度不会很大。
        //另外即使发生了冲突的问题,每次随机选取一个节点,这样最坏情况是同一份数据在多个节点中都存在。
        //这个对于memcached这种缓存场景来说完全是允许的。
        return sessionList.get(this.random.nextInt(size));
    }
}


参考的blog
http://langyu.iteye.com/blog/684087
http://flychao88.iteye.com/blog/1540246
http://blog.chenlb.com/2009/11/memcached-consistent-hashing.html
http://www.iteye.com/topic/611976

原文地址:https://www.cnblogs.com/beanchoc/p/3178273.html