【连接池实现】
【一致性hash实现】
项目实践心得。代码一定要剖析到每一行,吸取精华才算凑效。
1.Redis的通信协议
Redis采用自定义的二进制通信协议。有一个基本规范
发送命令规范:
<参数个数>
$<参数1字节数>
<参数1>
...
$<参数n字节数>
<参数n>
响应规范
响应类型有返回数据的第一个字节决定的
+代表一个状态信息,+ok
-代表错误
:返回的是一个整数
$返回一个块数据,跟发送命令的规范一样。$<长度>
<数据>
*返回多个数据块。同上,后面跟着参数的个数和每个参数的数据。
2.jedis的整体架构(核心解决方案)
解剖jedis架构之前,先自己尝试设计一个客户端。如果用最简单的方式,你会如何设计?
用最简单的Socket就可以实现了,socket跟服务端通信后,就可以按上面的格式发命令了。然后阻塞获得服务端返回的数据。听起来很简单吧。
实际上jedis也是这样实现的。客户端连接数很大,怎么办?每次新建一个socket,开销会成线性增长。我们想到了连接池方案。先新建一批连接,使用时从连接池取一个,用完再回收。连接池实现起来有多复杂?用 apache 的连接池接口,可以简化开发成本。
一台机容量有限,想将数据存储到多个节点。有很多种方案。在业务层面,我们可以将不同的业务数据存储在不同的节点(水平分割)。根据用户容量分割到不同的节点(纵向分割)。有没有通用一点的,我们想到的"一致性hash”方案。根据key,定位到目标节点,存储相关的数据。
吹了那么多,其实上面就是jedis的核心了。其他的只是处理各种的命令返回值而已了
3.源代码剖析
连接redis,非常简单,跟传统的socket通信一样。new一个socket,设置相关的参数,connect获取相关的输入输出流进行通信。它的特别之处在于,封装了这些流,方便操作。
1. public void connect() {
2. if (!isConnected()) {
3. try {
4. socket = new Socket();
5. //->@wjw_add
6. socket.setReuseAddress(true);
7. socket.setKeepAlive(true); //Will monitor the TCP connection is valid
8. socket.setTcpNoDelay(true); //Socket buffer Whetherclosed, to ensure timely delivery of data
9. socket.setSoLinger(true,0); //Control calls close () method, the underlying socket is closed immediately
10. //<-@wjw_add
11.
12. socket.connect(new InetSocketAddress(host, port), timeout);
13. socket.setSoTimeout(timeout);
14. outputStream = new RedisOutputStream(socket.getOutputStream());
15. inputStream = new RedisInputStream(socket.getInputStream());
16. } catch (IOException ex) {
17. throw new JedisConnectionException(ex);
18. }
19. }
20. }
发送命令,就是按照上面的所说的通信协议发送相关的命令
要注意的是要将字符串转成二进制流
1. private static void sendCommand(final RedisOutputStream os,
2. final byte[] command, final byte[]... args) {
3. try {
4. os.write(ASTERISK_BYTE);
5. os.writeIntCrLf(args.length + 1);
6. os.write(DOLLAR_BYTE);
7. os.writeIntCrLf(command.length);
8. os.write(command);
9. os.writeCrLf();
10.
11. for (final byte[] arg : args) {
12. os.write(DOLLAR_BYTE);
13. os.writeIntCrLf(arg.length);
14. os.write(arg);
15. os.writeCrLf();
16. }
17. } catch (IOException e) {
18. throw new JedisConnectionException(e);
19. }
20. }
处理服务端返回。Redis本身是单线程阻塞的。所以你可以从socket的InputStream中直接读取数据。
获得二进制流,根据不同的命令,将它(解析)转成相应的数据类型即可。
1. private static Object process(final RedisInputStream is) {
2. try {
3. byte b = is.readByte();
4. if (b == MINUS_BYTE) {
5. processError(is);//报错
6. } else if (b == ASTERISK_BYTE) {
7. return processMultiBulkReply(is);//*多个数据块
8. } else if (b == COLON_BYTE) {
9. return processInteger(is);//:整数
10. } else if (b == DOLLAR_BYTE) {
11. return processBulkReply(is);//$一个数据块
12. } else if (b == PLUS_BYTE) {
13. return processStatusCodeReply(is);//+状态
14. } else {
15. throw new JedisConnectionException("Unknown reply: " + (char) b);
16. }
17. } catch (IOException e) {
18. throw new JedisConnectionException(e);
19. }
20. return null;
21. }
连接池的实现。现在要实现连接池,一般都是用apache的pool接口来实现了。只需要两个步骤。
1.实现Config 就是连接池的配置参数,包括最大连接数,初始连接数,空闲时间.
2.实现 BasePoolableObjectFactory,提供新建连接,废弃连接,测试连接方法
1. public Object makeObject() throws Exception {//产生redis实例
2. final Jedis jedis = new Jedis(this.host, this.port, this.timeout);
3.
4. jedis.connect();
5. if (null != this.password) {
6. jedis.auth(this.password);
7. }
8. if( database != 0 ) {
9. jedis.select(database);
10. }
11.
12. return jedis;
13. }
14.
15. public void destroyObject(final Object obj) throws Exception {//销毁jedis实例
16. if (obj instanceof Jedis) {
17. final Jedis jedis = (Jedis) obj;
18. if (jedis.isConnected()) {
19. try {
20. try {
21. jedis.quit();
22. } catch (Exception e) {
23. }
24. jedis.disconnect();
25. } catch (Exception e) {
26.
27. }
28. }
29. }
30. }
31.
32. public boolean validateObject(final Object obj) {//验证jedis是否有效,简单的ping命令
33. if (obj instanceof Jedis) {
34. final Jedis jedis = (Jedis) obj;
35. try {
36. return jedis.isConnected() && jedis.ping().equals("PONG");
37. } catch (final Exception e) {
38. return false;
39. }
40. } else {
41. return false;
42. }
43. }
一致性hash的实现。关键是如何使得key均匀散列。jedis使用了murmurhash 2.0
1. public R getShard(String key) {
2. return resources.get(getShardInfo(key));
3. }
4.
5. public S getShardInfo(byte[] key) {
6. SortedMap tail = nodes.tailMap(algo.hash(key));//找到key所在的节点
7. if (tail.size() == 0) {
8. return nodes.get(nodes.firstKey());
9. }
10. return tail.get(tail.firstKey());
11. }
4.经验教训
这里谈谈实践中遇到的几个问题,都是血和泪的教训啊。
1.jedis的连接池获得的连接,进行通信时候出错了,一定要记得销毁该连接。因为它的inputstream里面可能还残留数据。下次从连接池获得的时候都是dirty data了。一般采用以下的方案:
1. try{
2. //do something
3. pool.returnConnection;//返回正常连接
4. catch(Exception e){
5. pool.returnBrokenConnection;//销毁连接
2.使用一致性hash的时候,使用批量查询命令mget的时候,ShardedJedis本身不支持的,只能用一个个key去取数据,性能低下。有一个 比较土的办法。先将key对应的节点分类合并,然后单独用mget去获取数据,再将返回值合并给用户。可以显著减少网络连接。
5.总结
jedis简单,高效,本身代码也可以当做一个网络客户端的典型实现范例。