Consistent Hashing

Consistent hashing is very useful when you cache large volumes of data and scalability and availability also matter.

From the original paper:

"Thus, it is critical for our hash function to map items consistently: regardless of the existence of multiple, changing views of the system, each item should be mapped to only a small number of machines, and in such a way that all machines get roughly the same load of items."

Source: http://www8.org/w8-papers/2a-webserver/caching/paper2.html

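What sets consistent hashing apart is that it hashes the target cache servers as well as the keys. It then compares a key's hash with the servers' hashes to decide which cache server stores the object.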

Note: "object" here means the item being cached, not a Java object.

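Before looking at what consistent hashing is good for, it helps to see how we would cache across several machines without it. Suppose there are two cache servers and we want to spread the objects evenly over both. We might use an algorithm like this: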

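1. Determine the key/value pair;

2. Hash the key to get an unsigned long value;

3. Take the key's hash mod 2;

4. If the result is 0, put the object on the first machine; if it is 1, put it on the second machine.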
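The four steps above can be sketched in Java as follows. This is a minimal illustration, not the author's code: the class name ModuloPlacement is hypothetical, and String.hashCode() stands in for the key hash (an assumption), widened to an unsigned long as in step 2.

```java
/** Hypothetical helper: places a key on one of n servers using hash mod n. */
final class ModuloPlacement {
    /** Step 2: hash the key; String.hashCode() is only a stand-in, read as unsigned. */
    static long unsignedHash(String key) {
        return Integer.toUnsignedLong(key.hashCode());
    }

    /** Steps 3 and 4: the remainder is the index of the server (0 = first machine). */
    static int serverFor(String key, int serverCount) {
        return (int) (unsignedHash(key) % serverCount);
    }

    public static void main(String[] args) {
        for (String key : new String[] {"apple", "banana", "cherry"}) {
            System.out.println(key + " -> server " + serverFor(key, 2));
        }
    }
}
```

Note that serverCount appears directly in the formula, so every key's placement depends on the size of the cluster.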

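Here is the problem: if two machines no longer have enough capacity and we want to add a third, what do we do? We change the algorithm to take the hash mod 3 instead of mod 2, so it becomes: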

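1. Determine the key/value pair;

2. Hash the key to get an unsigned long value;

3. Take the key's hash mod 3;

4. If the result is 0, put the object on the first machine; if it is 1, on the second machine; if it is 2, on the third machine.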

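The pseudocode above shows that, as machines are added, the mapping from keys to machines has to change (the modulus is the number of machines), so the placement is not consistent. Going from mod 2 to mod 3, a key stays on the same machine only if its hash leaves the same remainder under both moduli, i.e. only if hash mod 6 is 0 or 1. With a uniform hash that is about one key in three, so roughly two thirds of the cached objects end up on the wrong machine.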
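To put a number on this, the sketch below counts how many keys change server when the modulus grows from 2 to 3. It reuses String.hashCode() as a stand-in hash (an assumption), and the class name RemapCount and the keys key0, key1, ... are hypothetical.

```java
/** Hypothetical demo: how many keys change server when "mod 2" becomes "mod 3". */
final class RemapCount {
    static int server(String key, int serverCount) {
        // Stand-in hash: String.hashCode() read as an unsigned 32-bit value
        return (int) (Integer.toUnsignedLong(key.hashCode()) % serverCount);
    }

    /** Fraction of the keys key0 .. key(n-1) whose server differs between the two cluster sizes. */
    static double movedFraction(int keyCount, int before, int after) {
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String key = "key" + i;
            if (server(key, before) != server(key, after)) {
                moved++;
            }
        }
        return (double) moved / keyCount;
    }

    public static void main(String[] args) {
        System.out.printf("moved when going from 2 to 3 servers: %.1f%%%n",
                100 * movedFraction(10_000, 2, 3));
    }
}
```

The printed share should land near two thirds; the exact figure depends on the stand-in hash.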

Consistent hashing was proposed to solve this problem.

How consistent hashing solves the problem

First, consistent hashing also hashes the targets, for example from each target machine's IP address, so that every cache server gets its own unsigned long hash value.

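Then all machines are placed on a ring, i.e. the hash space bent into a circle so that its largest value wraps around to 0. For example: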

[Figure 1: cache servers placed on a hash ring]

In Figure 1, A, B and C are cache servers.

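Finally, the key's hash is placed on the same ring and compared with the servers' hashes. Moving clockwise (toward larger hash values) from the key's position, the object is cached on the first server encountered; if no server hash is greater than or equal to the key's hash, the search wraps around to the server with the smallest hash. This is the rule the ConsistentHash code below implements with tailMap. For example:

If the key's hash is 123, machine A's hash on the ring is 100 and machine B's hash is 200, the object is stored on machine B.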
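The clockwise lookup can be sketched with java.util.TreeMap, whose ceilingEntry returns the entry with the smallest key greater than or equal to the argument. This is a minimal illustration, not the article's class: RingLookup is a hypothetical name, the hashes 100 and 200 come from the example, and C's hash of 300 is an assumed value.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the ring lookup: hash positions on the ring map to server names. */
final class RingLookup {
    private final TreeMap<Long, String> ring = new TreeMap<>();

    void addServer(long hash, String name) {
        ring.put(hash, name);
    }

    /** Clockwise walk: first server hash >= key hash, wrapping to the smallest hash at the end of the ring. */
    String serverFor(long keyHash) {
        if (ring.isEmpty()) {
            throw new IllegalStateException("no servers on the ring");
        }
        Map.Entry<Long, String> entry = ring.ceilingEntry(keyHash);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    public static void main(String[] args) {
        RingLookup ring = new RingLookup();
        ring.addServer(100, "A");   // hash values from the example in the text
        ring.addServer(200, "B");
        ring.addServer(300, "C");   // assumed hash for C
        System.out.println(ring.serverFor(123)); // B
        System.out.println(ring.serverFor(350)); // A (wraps around)
    }
}
```

ceilingEntry plays the same role as tailMap(hash).firstKey() in the article's code; the wrap-around branch covers keys that hash past the last server.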

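Now consider servers being added to or removed from the ring. When a server is added, the hash function for keys does not change: only the keys in the arc of the ring that the new server takes over move, all of them to the new server, and every other key stays where it was. Removing a server likewise reassigns only that server's keys to its neighbour on the ring. This is why the hashing is called consistent.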
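The claim that adding a server only moves keys onto that new server can be checked with a small simulation. This is a sketch under stated assumptions: it uses an MD5-based 32-bit hash modelled on HashFunctionImpl further down, one ring position per server (like numberOfReplicas = 1), and hypothetical server names 127.0.0.1:7770 and so on, with keys key0, key1, ...

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical simulation: which keys move when one server joins the ring. */
final class AddServerDemo {
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            // First four digest bytes, little-endian, as an unsigned 32-bit value
            return ((d[3] & 0xFFL) << 24) | ((d[2] & 0xFFL) << 16) | ((d[1] & 0xFFL) << 8) | (d[0] & 0xFFL);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // every Java platform must provide MD5
        }
    }

    static String lookup(TreeMap<Long, String> ring, String key) {
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    /** Counts keys whose owner changes when newServer joins; fails if any key moves between old servers. */
    static int movedKeys(String[] servers, String newServer, int keyCount) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String s : servers) {
            ring.put(hash(s), s);
        }
        String[] before = new String[keyCount];
        for (int i = 0; i < keyCount; i++) {
            before[i] = lookup(ring, "key" + i);
        }
        ring.put(hash(newServer), newServer);
        int moved = 0;
        for (int i = 0; i < keyCount; i++) {
            String after = lookup(ring, "key" + i);
            if (!after.equals(before[i])) {
                if (!after.equals(newServer)) {
                    throw new IllegalStateException("key" + i + " moved between old servers");
                }
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        String[] servers = {"127.0.0.1:7770", "127.0.0.1:7771", "127.0.0.1:7772"};
        int moved = movedKeys(servers, "127.0.0.1:7773", 10_000);
        System.out.println(moved + " of 10000 keys moved, all to the new server");
    }
}
```

With a single ring position per server, the size of the moved slice varies a lot with where the new server happens to hash; the property that every moved key lands on the new server always holds.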

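Below is a simulation of consistent hashing over several cache servers. In the simulation, all servers run on localhost and I use port numbers to tell them apart.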

1. Consistent hash ring (ConsistentHash)

package art.programming.cache;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Scanner;
import java.util.SortedMap;
import java.util.TreeMap;

import art.programming.cache.CacheClient.ServerNotAliveException;

public class ConsistentHash {
	private final int numberOfReplicas;
	// The hash ring: server hash value -> client of that cache server
	private final SortedMap<Long, CacheClient> circle = new TreeMap<Long, CacheClient>();

	private HashFunction hashFunction;

	public ConsistentHash(HashFunction hashFunction, int numberOfReplicas,
			Collection<CacheClient> nodes) {
		this.hashFunction = hashFunction;
		this.numberOfReplicas = numberOfReplicas;

		for (CacheClient node : nodes) {
			add(node);
		}
	}

	public void add(CacheClient node) {
		for (int i = 0; i < numberOfReplicas; i++) {
			circle.put(hashFunction.hash(node.getCacheServerName() + i), node);
		}
	}

	public void remove(String cacheServerName) {
		// Removes every ring position that add() created for this server
		for (int i = 0; i < numberOfReplicas; i++) {
			circle.remove(hashFunction.hash(cacheServerName + i));
		}
	}

	public void put(String key, String value) {
		if (circle.isEmpty()) {
			throw new IllegalStateException(
					"No cache server is available for now!");
		}
		long hash = hashFunction.hash(key);
		System.out.println("The key has been hashed " + hash);
		
		SortedMap<Long, CacheClient> tailMap = circle.tailMap(hash);

		// The hash code of the cache server that stores the key/value
		long cacheServerHash = tailMap.isEmpty() ? circle.firstKey() : tailMap
				.firstKey();

		CacheClient cacheClient = circle.get(cacheServerHash);

		if (!putValue(cacheClient, key, value)) {
			// Recursively find the alive server
			removeCacheServerFromCycle(cacheServerHash);
			put(key, value);
		}
	}

	private boolean putValue(CacheClient cacheClient, String key, String value) {
		try {
			cacheClient.put(key, value);
			System.out.println(" and it is stored in " + cacheClient.getCacheServerName());
		} catch (ServerNotAliveException e) {
			return false;
		}
		return true;
		/**
		 * java art.programming.cache.CacheServer
		 * 
		 */
	}

	private void removeCacheServerFromCycle(long cacheServerHash) {
		circle.remove(cacheServerHash);
	}

	public String get(String key) {
		if (circle.isEmpty()) {
			return null;
		}
		long hash = hashFunction.hash(key);
		System.out.println("The key has been hashed " + hash);
		
		SortedMap<Long, CacheClient> tailMap = circle.tailMap(hash);

		// The hash code of the cache server that stores the key/value:
		// the first server clockwise from the key, wrapping to the start of the ring
		long cacheServerHash = tailMap.isEmpty() ? circle.firstKey() : tailMap
				.firstKey();
		
		CacheClient cacheClient = circle.get(cacheServerHash);
		try {
			System.out.println("Going to server " + cacheClient.getCacheServerName() + " to fetch the value");
			return get(cacheClient, key);
		} catch (ServerNotAliveException e) {
			System.out.println("The server is not available : " +  cacheClient.getCacheServerName());
			// Drop the dead server from the ring before retrying; otherwise the retry
			// hits the same server again and recurses until the stack overflows
			removeCacheServerFromCycle(cacheServerHash);
		}
		return get(key);
	}

	private String get(CacheClient cacheClient, String key)
			throws ServerNotAliveException {
		return cacheClient.get(key);
	}
	
	
	public static void main(String... args){
		List<CacheClient> cacheClients = new ArrayList<CacheClient>();
		for (int i =0; i < 10; i++){
			int port = 7770 + i;
			cacheClients.add(new CacheClient(String.valueOf(port)));
		}
		
		ConsistentHash consistentHash = new ConsistentHash(new HashFunctionImpl(), 1, cacheClients);
		Scanner scanner = new Scanner(System.in);
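		while(true){
			// Commands: "put,key,value" or "get,key"
			String command = scanner.nextLine();
			String[] commandArr = command.split(",");
			if (commandArr[0].equals("put") && commandArr.length == 3){
				consistentHash.put(commandArr[1], commandArr[2]);
			}else if (commandArr[0].equals("get") && commandArr.length == 2){
				String result = consistentHash.get(commandArr[1]);
				System.out.println("The get result is " + result);
			}else{
				System.out.println("Usage: put,key,value or get,key");
			}
		}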
		
	}
}
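The numberOfReplicas parameter above puts each server on the ring several times (add() hashes the server name plus a counter). The in-memory sketch below, which needs no sockets, shows what that buys: with many positions per server the keys spread much more evenly. Assumptions: the class name ReplicaLoad is hypothetical, the MD5-based hash mirrors HashFunctionImpl, and the "#" separator between name and counter is a deliberate change from the article's name + i so that virtual-node labels stay unambiguous even if server names differ in length.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HashMap;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;

/** Hypothetical in-memory sketch: how numberOfReplicas spreads keys over servers. */
final class ReplicaLoad {
    static long hash(String s) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
            // First four digest bytes, little-endian, as an unsigned 32-bit value
            return ((d[3] & 0xFFL) << 24) | ((d[2] & 0xFFL) << 16) | ((d[1] & 0xFFL) << 8) | (d[0] & 0xFFL);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Builds a ring with `replicas` positions per server and counts how many keys each server owns. */
    static Map<String, Integer> load(int servers, int replicas, int keyCount) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (int s = 0; s < servers; s++) {
            String name = String.valueOf(7770 + s); // port numbers as server names, as in the article
            for (int i = 0; i < replicas; i++) {
                ring.put(hash(name + "#" + i), name);
            }
        }
        Map<String, Integer> counts = new HashMap<>();
        for (int k = 0; k < keyCount; k++) {
            // Same lookup as ConsistentHash.put/get: tailMap, wrapping to firstKey
            SortedMap<Long, String> tail = ring.tailMap(hash("key" + k));
            String owner = ring.get(tail.isEmpty() ? ring.firstKey() : tail.firstKey());
            counts.merge(owner, 1, Integer::sum);
        }
        return counts; // servers that own no key are absent from the map
    }

    public static void main(String[] args) {
        for (int replicas : new int[] {1, 200}) {
            System.out.println(replicas + " replica(s): " + new TreeMap<>(load(10, replicas, 10_000)));
        }
    }
}
```

The trade-off is a larger ring (servers × replicas entries) in exchange for a more even load, and a failed server's keys being spread over many neighbours instead of one.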

2. Cache client

package art.programming.cache;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.Scanner;


public class CacheClient {
	private String cacheServerName;
	private Socket socket;
	public CacheClient(String cacheServerName){
		this.cacheServerName = cacheServerName;
		try {
			// The server name doubles as the port of a cache server on localhost
			this.socket = new Socket("localhost", Integer.parseInt(cacheServerName));
		} catch (IOException e) {
			// The server is not running: socket stays null, and put/get
			// are expected to report this as ServerNotAliveException
			System.out.println("Cannot connect to cache server " + cacheServerName);
		}
	}
	
	public String getCacheServerName(){
		return cacheServerName;
	}
	
	public void put(String key, String value) throws ServerNotAliveException{
		if (socket == null) {
			// The constructor could not connect; without this check the call
			// below fails with a NullPointerException instead
			throw new ServerNotAliveException();
		}
		// Wire format: "put,key,value" (keys and values must not contain commas)
		StringBuilder operationKeyValue = new StringBuilder("put,").append(key).append(",").append(value);
		try {
			DataOutputStream output = new DataOutputStream(socket.getOutputStream());
			output.writeUTF(operationKeyValue.toString());
		} catch (IOException e) {
			throw new ServerNotAliveException();
		}
	}
	
	public String get(String key) throws ServerNotAliveException{
		if (socket == null) {
			throw new ServerNotAliveException();
		}
		// Wire format: "get,key"; the server answers with a single UTF string
		StringBuilder operationKeyValue = new StringBuilder("get,").append(key);
		try {
			DataOutputStream output = new DataOutputStream(socket.getOutputStream());
			output.writeUTF(operationKeyValue.toString());
			DataInputStream in = new DataInputStream(socket.getInputStream());
			return in.readUTF();
		} catch (IOException e) {
			throw new ServerNotAliveException();
		}
	}
	
	
	public static void main(String... args) throws ServerNotAliveException{
		Scanner scanner = new Scanner(System.in);
		CacheClient cacheClient = new CacheClient("7777");
		while(true){
			String command = scanner.nextLine();
			System.out.println("Command: " + command);
			cacheClient.put("name", "alex");
		}
	}
	
	public static class ServerNotAliveException extends Exception{
		
	}
}
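The client and server exchange single strings framed by DataOutputStream.writeUTF, which writes a 2-byte length followed by the string in modified UTF-8, and DataInputStream.readUTF, which reads it back. The sketch below replays that framing over in-memory byte streams instead of a socket; the class name ProtocolDemo is hypothetical. It also shows a limitation of the article's protocol: commas are not escaped, so a value containing one is cut short when the server splits the message and reads element [2].

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Hypothetical demo of the client/server message framing, using in-memory streams. */
final class ProtocolDemo {
    /** Encodes a request the way CacheClient.put does: writeUTF("put,key,value"). */
    static byte[] encodePut(String key, String value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        new DataOutputStream(bytes).writeUTF("put," + key + "," + value);
        return bytes.toByteArray();
    }

    /** Decodes it the way CacheServer does: readUTF, then split on commas. */
    static String[] decode(byte[] message) throws IOException {
        return new DataInputStream(new ByteArrayInputStream(message)).readUTF().split(",");
    }

    public static void main(String[] args) throws IOException {
        byte[] msg = encodePut("name", "alex");
        System.out.println(msg.length);                    // 15: 2-byte length prefix + 13 ASCII bytes
        System.out.println(String.join("|", decode(msg)));  // put|name|alex
        // The comma inside "a,b" is not escaped, so the server would store only "a"
        System.out.println(String.join("|", decode(encodePut("k", "a,b")))); // put|k|a|b
    }
}
```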

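3. Cache server (start one instance per port before running ConsistentHash, e.g. java art.programming.cache.CacheServer 7770, for each of the ports 7770 to 7779 that ConsistentHash.main connects to)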

package art.programming.cache;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;


public class CacheServer {
	private static Map<String, String> cache = new HashMap<String, String>();
	
	public static void main(String... args) throws NumberFormatException, IOException{
		String port = args[0];
		System.out.println("Binding to the port: " + port);
		ServerSocket socketServer = new ServerSocket();
		socketServer.bind(new InetSocketAddress("localhost", Integer.valueOf(port)) );
		while(true){
			// Serves one client connection at a time; the connection stays
			// open until the client disconnects
			Socket socket = socketServer.accept();
			try{
				DataInputStream  in = new DataInputStream (socket.getInputStream());
				DataOutputStream out = new DataOutputStream(socket.getOutputStream());
				System.out.println("Serving the request");
				while(true){
					// Request format: "get,key" or "put,key,value"
					String opAndKeyValue = in.readUTF();
					System.out.println("Client request " + opAndKeyValue);
					String[] opAndKeyValueArr = opAndKeyValue.split(",");
					String op = opAndKeyValueArr[0];
					String key = opAndKeyValueArr[1];
					System.out.println("The requested operation is " + op + ", the key is " + key);
					
					if (op.equals("get")){
						String value = cache.get(key);
						// writeUTF(null) throws NullPointerException, which would leave the
						// client blocked in readUTF, so a missing key is sent as ""
						out.writeUTF(value == null ? "" : value);
						System.out.println("Returned the value of key " + key + " to the client");
					}else{
						System.out.println("Put the key " + key + ", value " + opAndKeyValueArr[2]);
						cache.put(key, opAndKeyValueArr[2]);
					}
				}
			}catch(Exception ex){
				// An EOFException here just means the client closed the connection
				ex.printStackTrace();
			}finally{
				socket.close();
			}
		}
	}
}

4. Hash function

package art.programming.cache;


public interface HashFunction {
	long hash(String node);
}

// HashFunctionImpl.java
package art.programming.cache;

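import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

public class HashFunctionImpl implements HashFunction {

	/**
	 * Takes the first four bytes of the key's MD5 digest (little-endian)
	 * as an unsigned 32-bit value, so every hash lies in [0, 2^32 - 1].
	 */
	public long hash(final String k) {
		byte[] bKey = computeMd5(k);
		long rv = ((long) (bKey[3] & 0xFF) << 24)
				| ((long) (bKey[2] & 0xFF) << 16)
				| ((long) (bKey[1] & 0xFF) << 8) | (bKey[0] & 0xFF);
		return rv & 0xffffffffL; /* Truncate to 32 bits */
	}

	/**
	 * Get the MD5 digest of the given key.
	 */
	public byte[] computeMd5(String k) {
		MessageDigest md5;
		try {
			md5 = MessageDigest.getInstance("MD5");
		} catch (NoSuchAlgorithmException e) {
			// Every Java platform is required to support MD5; failing here beats
			// the NullPointerException the old code hit on the next line
			throw new IllegalStateException(e);
		}
		// A fixed charset keeps hashes identical across platforms
		md5.update(k.getBytes(StandardCharsets.UTF_8));
		return md5.digest();
	}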
}
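The shift-and-OR in hash() simply reads digest bytes 0 to 3 as a little-endian integer. The hypothetical check below (class name Md5HashCheck is an assumption) recomputes the same value with java.nio.ByteBuffer and compares the two.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

/** Hypothetical check: HashFunctionImpl's arithmetic equals a little-endian read of the first 4 MD5 bytes. */
final class Md5HashCheck {
    static byte[] md5(String k) {
        try {
            return MessageDigest.getInstance("MD5").digest(k.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Same arithmetic as HashFunctionImpl.hash. */
    static long shiftHash(String k) {
        byte[] b = md5(k);
        long rv = ((long) (b[3] & 0xFF) << 24) | ((long) (b[2] & 0xFF) << 16)
                | ((long) (b[1] & 0xFF) << 8) | (b[0] & 0xFF);
        return rv & 0xffffffffL;
    }

    /** The same value via ByteBuffer: bytes 0..3 as a little-endian int, widened as unsigned. */
    static long bufferHash(String k) {
        int le = ByteBuffer.wrap(md5(k), 0, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        return Integer.toUnsignedLong(le);
    }

    public static void main(String[] args) {
        for (String k : new String[] {"", "7770", "name"}) {
            System.out.println("'" + k + "' -> " + shiftHash(k) + " / " + bufferHash(k));
        }
    }
}
```

For the empty string, whose MD5 digest starts with the bytes d4 1d 8c d9, both methods give 0xD98C1DD4 (3649838548).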


Original article: https://www.cnblogs.com/cando/p/2658438.html