Redis的Java客户端Jedis的八种调用方式(事务、管道、分布式)介绍

jedis是一个著名的key-value存储系统,而作为其官方推荐的java版客户端jedis也非常强大和稳定,支持事务、管道及有jedis自身实现的分布式。

在这里对jedis关于事务、管道和分布式的调用方式做一个简单的介绍和对比:

一、普通同步方式

最简单和基础的调用方式:

 1 @Test
 2 public void test1Normal() {
 3     Jedis jedis = new Jedis("localhost");
 4     long start = System.currentTimeMillis();
 5     for (int i = 0; i < 100000; i++) {
 6         String result = jedis.set("n" + i, "n" + i);
 7     }
 8     long end = System.currentTimeMillis();
 9     System.out.println("Simple SET: " + ((end - start)/1000.0) + " seconds");
10     jedis.disconnect();
11 }

很简单吧,每次set之后都可以返回结果,标记是否成功。

二、事务方式(Transactions)

redis的事务很简单,他主要目的是保障,一个client发起的事务中的命令可以连续的执行,而中间不会插入其他client的命令。

看下面例子:

 1 @Test
 2 public void test2Trans() {
 3     Jedis jedis = new Jedis("localhost");
 4     long start = System.currentTimeMillis();
 5     Transaction tx = jedis.multi();
 6     for (int i = 0; i < 100000; i++) {
 7         tx.set("t" + i, "t" + i);
 8     }
 9     List<Object> results = tx.exec();
10     long end = System.currentTimeMillis();
11     System.out.println("Transaction SET: " + ((end - start)/1000.0) + " seconds");
12     jedis.disconnect();
13 }

我们调用jedis.watch(…)方法来监控key,如果调用后key值发生变化,则整个事务会执行失败。另外,事务中某个操作失败,并不会回滚其他操作。这一点需要注意。还有,我们可以使用discard()方法来取消事务。

三、管道(Pipelining)

有时,我们需要采用异步方式,一次发送多个指令,不同步等待其返回结果。这样可以取得非常好的执行效率。这就是管道,调用方法如下:

 1 @Test
 2 public void test3Pipelined() {
 3     Jedis jedis = new Jedis("localhost");
 4     Pipeline pipeline = jedis.pipelined();
 5     long start = System.currentTimeMillis();
 6     for (int i = 0; i < 100000; i++) {
 7         pipeline.set("p" + i, "p" + i);
 8     }
 9     List<Object> results = pipeline.syncAndReturnAll();
10     long end = System.currentTimeMillis();
11     System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");
12     jedis.disconnect();
13 }

四、管道中调用事务

就Jedis提供的方法而言,是可以做到在管道中使用事务,其代码如下:

 1 @Test
 2 public void test4combPipelineTrans() {
 3     jedis = new Jedis("localhost"); 
 4     long start = System.currentTimeMillis();
 5     Pipeline pipeline = jedis.pipelined();
 6     pipeline.multi();
 7     for (int i = 0; i < 100000; i++) {
 8         pipeline.set("" + i, "" + i);
 9     }
10     pipeline.exec();
11     List<Object> results = pipeline.syncAndReturnAll();
12     long end = System.currentTimeMillis();
13     System.out.println("Pipelined transaction: " + ((end - start)/1000.0) + " seconds");
14     jedis.disconnect();
15 }

但是经测试(见本文后续部分),发现其效率和单独使用事务差不多,甚至还略微差点。

五、分布式直连同步调用

 1 Test
 2 public void test5shardNormal() {
 3     List<JedisShardInfo> shards = Arrays.asList(
 4             new JedisShardInfo("localhost",6379),
 5             new JedisShardInfo("localhost",6380));
 6 
 7     ShardedJedis sharding = new ShardedJedis(shards);
 8 
 9     long start = System.currentTimeMillis();
10     for (int i = 0; i < 100000; i++) {
11         String result = sharding.set("sn" + i, "n" + i);
12     }
13     long end = System.currentTimeMillis();
14     System.out.println("Simple@Sharing SET: " + ((end - start)/1000.0) + " seconds");
15 
16     sharding.disconnect();
17 }

这个是分布式直接连接,并且是同步调用,每步执行都返回执行结果。类似地,还有异步管道调用。

六、分布式直连异步调用

 1 @Test
 2 public void test6shardpipelined() {
 3     List<JedisShardInfo> shards = Arrays.asList(
 4             new JedisShardInfo("localhost",6379),
 5             new JedisShardInfo("localhost",6380));
 6 
 7     ShardedJedis sharding = new ShardedJedis(shards);
 8 
 9     ShardedJedisPipeline pipeline = sharding.pipelined();
10     long start = System.currentTimeMillis();
11     for (int i = 0; i < 100000; i++) {
12         pipeline.set("sp" + i, "p" + i);
13     }
14     List<Object> results = pipeline.syncAndReturnAll();
15     long end = System.currentTimeMillis();
16     System.out.println("Pipelined@Sharing SET: " + ((end - start)/1000.0) + " seconds");
17 
18     sharding.disconnect();
19 }

七、分布式连接池同步调用

如果,你的分布式调用代码是运行在线程中,那么上面两个直连调用方式就不合适了,因为直连方式是非线程安全的,这个时候,你就必须选择连接池调用。

 1 @Test
 2 public void test7shardSimplePool() {
 3     List<JedisShardInfo> shards = Arrays.asList(
 4             new JedisShardInfo("localhost",6379),
 5             new JedisShardInfo("localhost",6380));
 6 
 7     ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards);
 8 
 9     ShardedJedis one = pool.getResource();
10 
11     long start = System.currentTimeMillis();
12     for (int i = 0; i < 100000; i++) {
13         String result = one.set("spn" + i, "n" + i);
14     }
15     long end = System.currentTimeMillis();
16     pool.returnResource(one);
17     System.out.println("Simple@Pool SET: " + ((end - start)/1000.0) + " seconds");
18 
19     pool.destroy();
20 }

上面是同步方式,当然还有异步方式。

八、分布式连接池异步调用

 1 @Test
 2 public void test8shardPipelinedPool() {
 3     List<JedisShardInfo> shards = Arrays.asList(
 4             new JedisShardInfo("localhost",6379),
 5             new JedisShardInfo("localhost",6380));
 6 
 7     ShardedJedisPool pool = new ShardedJedisPool(new JedisPoolConfig(), shards);
 8 
 9     ShardedJedis one = pool.getResource();
10 
11     ShardedJedisPipeline pipeline = one.pipelined();
12 
13     long start = System.currentTimeMillis();
14     for (int i = 0; i < 100000; i++) {
15         pipeline.set("sppn" + i, "n" + i);
16     }
17     List<Object> results = pipeline.syncAndReturnAll();
18     long end = System.currentTimeMillis();
19     pool.returnResource(one);
20     System.out.println("Pipelined@Pool SET: " + ((end - start)/1000.0) + " seconds");
21     pool.destroy();
22 }

九、需要注意的地方

  1. 事务和管道都是异步模式。在事务和管道中不能同步查询结果。比如下面两个调用,都是不允许的:

  2.  1 Transaction tx = jedis.multi();
     2  for (int i = 0; i < 100000; i++) {
     3      tx.set("t" + i, "t" + i);
     4  }
     5  System.out.println(tx.get("t1000").get());  //不允许
     6 
     7  List<Object> results = tx.exec();
     8 
     9 10 11 
    12  Pipeline pipeline = jedis.pipelined();
    13  long start = System.currentTimeMillis();
    14  for (int i = 0; i < 100000; i++) {
    15      pipeline.set("p" + i, "p" + i);
    16  }
    17  System.out.println(pipeline.get("p1000").get()); //不允许
    18 
    19  List<Object> results = pipeline.syncAndReturnAll();
    1. 事务和管道都是异步的,个人感觉,在管道中再进行事务调用,没有必要,不如直接进行事务模式。

    2. 分布式中,连接池的性能比直连的性能略好(见后续测试部分)。

    3. 分布式调用中不支持事务。

      因为事务是在服务器端实现,而在分布式中,每批次的调用对象都可能访问不同的机器,所以,没法进行事务。

    十、测试

    运行上面的代码,进行测试,其结果如下:

  3. Simple SET: 5.227 seconds
    
    Transaction SET: 0.5 seconds
    Pipelined SET: 0.353 seconds
    Pipelined transaction: 0.509 seconds
    
    Simple@Sharing SET: 5.289 seconds
    Pipelined@Sharing SET: 0.348 seconds
    
    Simple@Pool SET: 5.039 seconds
    Pipelined@Pool SET: 0.401 seconds

    另外,经测试分布式中用到的机器越多,调用会越慢。上面是2片,下面是5片:

  4. Simple@Sharing SET: 5.494 seconds
    Pipelined@Sharing SET: 0.51 seconds
    Simple@Pool SET: 5.223 seconds
    Pipelined@Pool SET: 0.518 seconds

    下面是10片:

  5. Simple@Sharing SET: 5.9 seconds
    Pipelined@Sharing SET: 0.794 seconds
    Simple@Pool SET: 5.624 seconds
    Pipelined@Pool SET: 0.762 seconds

    下面是100片:

  6. Simple@Sharing SET: 14.055 seconds
    Pipelined@Sharing SET: 8.185 seconds
    Simple@Pool SET: 13.29 seconds
    Pipelined@Pool SET: 7.767 seconds

    分布式中,连接池方式调用不但线程安全外,根据上面的测试数据,也可以看出连接池比直连的效率更好。

    十一、完整的测试代码

  7.   1 package com.example.nosqlclient;
      2 
      3 import java.util.Arrays;
      4 import java.util.List;
      5 
      6 import org.junit.AfterClass;
      7 import org.junit.BeforeClass;
      8 import org.junit.Test;
      9 
     10 import redis.clients.jedis.Jedis;
     11 import redis.clients.jedis.JedisPoolConfig;
     12 import redis.clients.jedis.JedisShardInfo;
     13 import redis.clients.jedis.Pipeline;
     14 import redis.clients.jedis.ShardedJedis;
     15 import redis.clients.jedis.ShardedJedisPipeline;
     16 import redis.clients.jedis.ShardedJedisPool;
     17 import redis.clients.jedis.Transaction;
     18 
     19 import org.junit.FixMethodOrder;
     20 import org.junit.runners.MethodSorters;
     21 
     22 @FixMethodOrder(MethodSorters.NAME_ASCENDING)
     23 public class TestJedis {
     24 
     25     private static Jedis jedis;
     26     private static ShardedJedis sharding;
     27     private static ShardedJedisPool pool;
     28 
     29     @BeforeClass
     30     public static void setUpBeforeClass() throws Exception {
     31         List<JedisShardInfo> shards = Arrays.asList(
     32                 new JedisShardInfo("localhost",6379),
     33                 new JedisShardInfo("localhost",6379)); //使用相同的ip:port,仅作测试
     34 
     35 
     36         jedis = new Jedis("localhost"); 
     37         sharding = new ShardedJedis(shards);
     38 
     39         pool = new ShardedJedisPool(new JedisPoolConfig(), shards);
     40     }
     41 
     42     @AfterClass
     43     public static void tearDownAfterClass() throws Exception {
     44         jedis.disconnect();
     45         sharding.disconnect();
     46         pool.destroy();
     47     }
     48 
     49     @Test
     50     public void test1Normal() {
     51         long start = System.currentTimeMillis();
     52         for (int i = 0; i < 100000; i++) {
     53             String result = jedis.set("n" + i, "n" + i);
     54         }
     55         long end = System.currentTimeMillis();
     56         System.out.println("Simple SET: " + ((end - start)/1000.0) + " seconds");
     57     }
     58 
     59     @Test
     60     public void test2Trans() {
     61         long start = System.currentTimeMillis();
     62         Transaction tx = jedis.multi();
     63         for (int i = 0; i < 100000; i++) {
     64             tx.set("t" + i, "t" + i);
     65         }
     66         //System.out.println(tx.get("t1000").get());
     67 
     68         List<Object> results = tx.exec();
     69         long end = System.currentTimeMillis();
     70         System.out.println("Transaction SET: " + ((end - start)/1000.0) + " seconds");
     71     }
     72 
     73     @Test
     74     public void test3Pipelined() {
     75         Pipeline pipeline = jedis.pipelined();
     76         long start = System.currentTimeMillis();
     77         for (int i = 0; i < 100000; i++) {
     78             pipeline.set("p" + i, "p" + i);
     79         }
     80         //System.out.println(pipeline.get("p1000").get());
     81         List<Object> results = pipeline.syncAndReturnAll();
     82         long end = System.currentTimeMillis();
     83         System.out.println("Pipelined SET: " + ((end - start)/1000.0) + " seconds");
     84     }
     85 
     86     @Test
     87     public void test4combPipelineTrans() {
     88         long start = System.currentTimeMillis();
     89         Pipeline pipeline = jedis.pipelined();
     90         pipeline.multi();
     91         for (int i = 0; i < 100000; i++) {
     92             pipeline.set("" + i, "" + i);
     93         }
     94         pipeline.exec();
     95         List<Object> results = pipeline.syncAndReturnAll();
     96         long end = System.currentTimeMillis();
     97         System.out.println("Pipelined transaction: " + ((end - start)/1000.0) + " seconds");
     98     }
     99 
    100     @Test
    101     public void test5shardNormal() {
    102         long start = System.currentTimeMillis();
    103         for (int i = 0; i < 100000; i++) {
    104             String result = sharding.set("sn" + i, "n" + i);
    105         }
    106         long end = System.currentTimeMillis();
    107         System.out.println("Simple@Sharing SET: " + ((end - start)/1000.0) + " seconds");
    108     }
    109 
    110     @Test
    111     public void test6shardpipelined() {
    112         ShardedJedisPipeline pipeline = sharding.pipelined();
    113         long start = System.currentTimeMillis();
    114         for (int i = 0; i < 100000; i++) {
    115             pipeline.set("sp" + i, "p" + i);
    116         }
    117         List<Object> results = pipeline.syncAndReturnAll();
    118         long end = System.currentTimeMillis();
    119         System.out.println("Pipelined@Sharing SET: " + ((end - start)/1000.0) + " seconds");
    120     }
    121 
    122     @Test
    123     public void test7shardSimplePool() {
    124         ShardedJedis one = pool.getResource();
    125 
    126         long start = System.currentTimeMillis();
    127         for (int i = 0; i < 100000; i++) {
    128             String result = one.set("spn" + i, "n" + i);
    129         }
    130         long end = System.currentTimeMillis();
    131         pool.returnResource(one);
    132         System.out.println("Simple@Pool SET: " + ((end - start)/1000.0) + " seconds");
    133     }
    134 
    135     @Test
    136     public void test8shardPipelinedPool() {
    137         ShardedJedis one = pool.getResource();
    138 
    139         ShardedJedisPipeline pipeline = one.pipelined();
    140 
    141         long start = System.currentTimeMillis();
    142         for (int i = 0; i < 100000; i++) {
    143             pipeline.set("sppn" + i, "n" + i);
    144         }
    145         List<Object> results = pipeline.syncAndReturnAll();
    146         long end = System.currentTimeMillis();
    147         pool.returnResource(one);
    148         System.out.println("Pipelined@Pool SET: " + ((end - start)/1000.0) + " seconds");
    149     }
    150 }
原文地址:https://www.cnblogs.com/link1988/p/5503692.html