redis使用总结

一、基本概念

  1. redis单机使用不同的数据库保存多个键值对,单机redis默认有16个db,但是redis集群中节点只使用第一个数据库db[0]。 redis集群使用分片的方式保存数据库中的键值对,集群的整个数据库被分成16384个槽slot,数据库中每个键都属于这16384个槽的其中一个,集群中每个节点可以处理0个或最多16384个槽。

  2. redis集群把节点分为master和slave,master负责处理客户端的命令(处理负责的槽的命令请求),slave负责复制某个master,并在被复制的master下线的时候,代替master继续处理命令请求。
    在一个集群中,超过半数以上的master认为某个master下线了或者是疑似下线,那么就把这个master标记为已下线(FAIL),广播FAIL消息。这个下线master的所有slave开始故障转移。

  3. 故障转移
    (1)基于Raft算法的领头选举方法实现选举新的主节点(只有master具有投票权,IO速度快的slave就成了新master)。
    (2)新节点把已下线master的所有槽指派给自己负责。
    (3)新节点广播PONG消息,宣布自己成为master并接管了槽。
    (4)新节点开始接收和处理自己负责的槽有关的命令请求,故障转移完成。

  4. 通道pipeline
    (1)通道是接收客户端的命令请求,并直接执行了,但是返回结果是在调用syncAndReturnAll()时,按照命令执行的顺序的返回结果存储的列表。调用sync()没有返回结果,也表示通道结束,需要最后close()。
    (2)pipeline适用于批处理,当有大量的操作需要一次性执行的时候,可以用管道。
    (3)redis集群暂时没有可以执行使用的集群通道,可以在每个master上生成一个通道,并执行这个master上的key,参考下面附件中的代码code1或者自己写程序构建一个redis集群的pipeline,参考下面附件中的代码code2.

  5. 事务transaction
    (1)Redis通过MULTI、EXEC、WATCH等命令来实现事务(transaction)功能。事务提供了
    一种将多个命令请求打包,然后一次性、按顺序地执行多个命令的机制,并且在事务执行期
    间,服务器不会中断事务而改去执行其他客户端的命令请求,它会将事务中的所有命令都执
    行完毕,然后才去处理其他客户端的命令请求。
    (2)事务中的命令要不全部执行,要不就是全部不执行。
    (3)redis集群对于事务的支持只能在一个slot上完成,所以必须对每个节点上每个槽构建一个transaction,对这个槽上的key进行操作。
    (4)mutli和exec必须成对出现,不然会出现:(error) ERR MULTI calls can not be nested.即不允许在存在一个mutli的情况下,再生成新的multi。代码可参考下面附件中的代码code3.

  6. redis集群上的正则表达模式匹配,现在只能在每个节点上单独执行提取符合模式匹配的key。

 获取redis集群上所有符合给定模式pattern(正则表达式)的key
    正则表达模式:
    h?llo 匹配 hello, hallo 和 hxllo
    h*llo 匹配 hllo 和 heeeello
    h[ae]llo 匹配 hello 和 hallo, 但是不匹配 hillo
    h[^e]llo 匹配 hallo, hbllo, … 但是不匹配 hello
    h[a-b]llo 匹配 hallo 和 hbllo

   val keys = new util.TreeSet[java.lang.String]()
    // 1.获得cluster中所有节点
    val clusterNodes:util.Map[String, JedisPool] = cluster.getClusterNodes
    // 3. 对集群中每个节点都执行keys(pattern)
    for(node <- nodes_master){
      val jedisPool:JedisPool = clusterNodes.get(node)
      val jedisConn:Jedis = jedisPool.getResource
      try{
        keys.addAll(jedisConn.keys(pattern))
      }catch{
        case e:Exception => { println("error getting keys: "+e+" !!!") }
      }finally{
        jedisConn.close()
      }
    }

二、性能

  1. Redis在最新的硬件上可以每秒执行100 000个操作,而在高端的硬件上甚至可以每秒执行将近225 000个操作。(来源《Redis实战》-6.2.2)

附件

** 代码1:在每个master上生成一个通道,并执行这个master上的key**

// 从redis上获取全部的Rec5_Pic_<pid>的key
    val pattern = "User_Label_Act_-u[0123456789]*"
    //    println("key pattern: "+pattern)
    var keys:List[AnyRef] = List()
    // 1.获得cluster中所有节点
    val clusterNodes:util.Map[String, JedisPool] = cluster.getClusterNodes
    //     2. 对集群中每个节点都执行keys(pattern)
    for(node <- nodes_master){
      val time_start_key:Long = System.currentTimeMillis()
      println("node: "+node)
      val jedisPool:JedisPool = clusterNodes.get(node)
      val jedisConn:Jedis = jedisPool.getResource
      // 获得集群上每个节点上符合pattern的所有的key
      keys = jedisConn.keys(pattern).toArray().toList
      if(keys.isEmpty){
       ...
      }else{
        val jcp:Pipeline = jedisConn.pipelined()
        for(key <- keys){
           jcp操作(set,rpush等)          
          }
        }
        jcp.sync()
        jcp.close()
      }
      jedisConn.close()
    }

** 代码2:构建redis集群的pipeline**

import java.io.Closeable
import java.io.IOException
import java.lang.reflect.Field

import scala.util.control.Breaks._
import redis.clients.jedis.{Client, Jedis, JedisCluster, JedisClusterInfoCache, JedisPool, JedisSlotBasedConnectionHandler, PipelineBase}
import redis.clients.jedis.exceptions.{JedisMovedDataException, JedisRedirectionException}
import redis.clients.jedis.util.JedisClusterCRC16
import redis.clients.jedis.util.SafeEncoder

class JedisClusterPipeline extends PipelineBase with Closeable{
  private var connectionHandler:JedisSlotBasedConnectionHandler = _
  private val clients:collection.mutable.Queue[Client] = collection.mutable.Queue[Client]() // 根据顺序存储每个命令对应的Client
  private val jedisMap:collection.mutable.Map[JedisPool, Jedis] = collection.mutable.Map[JedisPool, Jedis]() // 用于缓存连接
  val FIELD_CONNECTION_HANDLER:Field = getField(Class.forName("redis.clients.jedis.BinaryJedisCluster"), "connectionHandler")  // 构建属性反射对象
  val FIELD_CACHE:Field = getField(Class.forName("redis.clients.jedis.JedisClusterConnectionHandler"), "cache")
  println("JedisClusterPipeline clients: "+clients.length)

  /* @Description: 根据jedisCluster实例生成对应的JedisClusterPipeline
     * @Param: [jedisCluster]
     * @return: _root_.com.boe.recommend.JedisClusterPipeline
     */
  def pipelined(jedisCluster:JedisCluster):JedisClusterPipeline={
//    println("pipelined(jedisCluster:JedisCluster)!!!")
    val pipeline:JedisClusterPipeline = new JedisClusterPipeline()
    pipeline.setJedisCluster(jedisCluster)
    pipeline
  }

  /* @Description: 获得JedisCluster实例的connectionHandler属性值
   * @Param: [jedis]
   * @return: Unit
   */
  def setJedisCluster(jedis:JedisCluster): Unit ={
//    println("setJedisCluster(jedis:JedisCluster)!!!")
    val ch:JedisSlotBasedConnectionHandler = getValue(jedis, FIELD_CONNECTION_HANDLER).asInstanceOf[JedisSlotBasedConnectionHandler]
    if(ch == null){
      throw new RuntimeException("error: cannot get JedisSlotBasedConnectionHandler from JedisCluster!!!")
    }
    connectionHandler = ch
  }

  /* @Description: 获得JedisSlotBasedConnectionHandler实例的cache属性值JedisClusterInfoCache,
  再根据JedisClusterInfoCache获得处理特定槽slot的JedisPool,
  再根据JedisPool,获得对应的Jedis
   * @Param: [slot]
   * @return: _root_.redis.clients.jedis.Jedis
   */
  def getJedis(slot:Int):Jedis={
//    println("getJedis(slot:Int)!!!")
    val cache:JedisClusterInfoCache = getValue(connectionHandler, FIELD_CACHE).asInstanceOf[JedisClusterInfoCache]
    val pool:JedisPool = cache.getSlotPool(slot)
    // 根据pool从缓存中获取Jedis
//    var jedis:Jedis = jedisMap(pool)
//    if(jedis == null){
//      jedis = pool.getResource
//      jedisMap += (pool -> jedis)
//    }
    var jedis:Jedis = null
    if(jedisMap.keys.exists(x=>x==pool)){
      jedis = jedisMap(pool)
    }else{
      jedis = pool.getResource
      jedisMap += (pool -> jedis)
    }
    jedis
  }

  // 必须覆盖父类PipelineBase的抽象方法
  def getClient(key: String):Client={
//    println("getClient(key: String)!!!")
    val bKey:Array[Byte] = SafeEncoder.encode(key)
    getClient(bKey)
  }

  // 必须覆盖父类PipelineBase的抽象方法
  def getClient(key: Array[Byte]): Client={
//    println("getClient(key: Array[Byte])!!!")
    val jedis:Jedis = getJedis(JedisClusterCRC16.getSlot(key))  // 求出key对应的槽slot,再获取处理这个槽的Jedis
    val client = jedis.getClient  // 获得Jedis的client
    clients += client
    client
  }


  // 必须实现Closeable接口的方法
  @throws[IOException]
  override def close(): Unit={
    println("close()!!!")
    clean()
    for(client <- clients){
      client.close()
    }
    clients.clear()
    for(jedis <- jedisMap.values){  // collection.mutable.Map[JedisPool, Jedis]
      jedis.close()
    }
    jedisMap.clear()
  }

  /* @Description: 构建cls类的fieldName属性的反射对象
    val cluster = RedisUtil.createJedisCluster()
    val FIELD_CONNECTION_HANDLER:Field = Class.forName("redis.clients.jedis.BinaryJedisCluster").getDeclaredField("connectionHandler")
    FIELD_CONNECTION_HANDLER.setAccessible(true)  // 成员变量为procted,故必须进行此操作
    val ch:JedisSlotBasedConnectionHandler = FIELD_CONNECTION_HANDLER.get(cluster).asInstanceOf[JedisSlotBasedConnectionHandler]
    println("ch: "+ch)  // ch: redis.clients.jedis.JedisSlotBasedConnectionHandler@1d082e88
   * @Param: [cls, fieldName]
   * @return: _root_.java.lang.reflect.Field
   */
  def getField(cls:Class[_], fieldName:String): Field ={
    try{
      val field:Field = cls.getDeclaredField(fieldName)
      field.setAccessible(true)
      field
    }catch {
      case e1:NoSuchFieldException =>
        throw new RuntimeException("error1: cannot find or access field '" + fieldName + "' from " + cls.getName, e1)
      case e2:SecurityException =>
        throw new RuntimeException("error2: cannot find or access field '" + fieldName + "' from " + cls.getName, e2)
      case e:Exception =>
        throw new RuntimeException("error: getField, ", e)
    }
  }

  /* @Description: 获得obj对象中定义的发射field的属性值
   * @Param: [obj, field]
   * @return: Any
   */
  def getValue(obj:Object, field:Field): Any ={
    try {
      field.get(obj)
    }catch {
      case e1:IllegalArgumentException =>
        println("error1: get value fail "+e1)
        throw new RuntimeException(e1)
      case e2:IllegalAccessException =>
        println("error2: get value fail "+e2)
        throw new RuntimeException(e2)
      case e:Exception =>
        println("error: get value fail "+e)
        throw new RuntimeException(e)
    }
  }

  // 刷新集群信息,当集群信息发生变更时调用
  def refreshCluster(): Unit ={
    connectionHandler.renewSlotCache()
  }

// Pipeline.java源码
//  public void sync() {
      // 判断消息队列是否为空,是否发出请求
//    if (getPipelinedResponseLength() > 0) {
        // 从InputStream中获取回复消息,逐个将消息塞回消息队列的Response中
//      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
//      for (Object o : unformatted) {
//        generateResponse(o);
//      }
//    }
//  }
//  public List<Object> syncAndReturnAll() {
//    if (getPipelinedResponseLength() > 0) {
//      List<Object> unformatted = client.getMany(getPipelinedResponseLength());
//      List<Object> formatted = new ArrayList<Object>();
//      for (Object o : unformatted) {
//        try {
//          formatted.add(generateResponse(o).get());
//        } catch (JedisDataException e) {
//          formatted.add(e);
//        }
//      }
//      return formatted;
//    } else {
//      return java.util.Collections.<Object> emptyList();
//    }
//  }

  // 同步读取所有数据. 与syncAndReturnAll()相比,sync()只是没有对数据做反序列化
  def sync(): Unit ={
//    println("sync()")
    innerSync(null)
  }

  // 同步读取所有数据 并按命令顺序返回一个列表
  def syncAndReturnAll(): collection.mutable.ListBuffer[Any] ={
    println("syncAndReturnAll(): collection.mutable.ListBuffer[Any]!!!")
    val responseList:collection.mutable.ListBuffer[Any] = collection.mutable.ListBuffer[Any]()
    innerSync(responseList)
    responseList
  }

//  sync()
//  innerSync(formatted:collection.mutable.ListBuffer[Any])!!!
//    innerSync client: redis.clients.jedis.Client@2c767a52
//    getPipelinedResponseLength: 101
//  client get: ()
//  data: OK
//  innerSync client: redis.clients.jedis.Client@708f5957
//  getPipelinedResponseLength: 1
//  client get: ()
//  data: OK
//  close()!!!
  def innerSync(formatted:collection.mutable.ListBuffer[Any]): Unit ={
    println("innerSync(formatted:collection.mutable.ListBuffer[Any])!!!")
    val clientSet:collection.mutable.HashSet[Client] = collection.mutable.HashSet[Client]()
    try{
      println("clients innerSync: "+clients.size)
//      println(clients)
      breakable{
        for(client <- clients) { // 队列
          if (getPipelinedResponseLength > 0) {
//            println("innerSync client: " + client)
//            println("getPipelinedResponseLength: " + getPipelinedResponseLength)
            //        println("client get: "+client.get("testk1"))
            val data: Any = generateResponse(client.getOne).get
//            println("data: " + data)
            // 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
            //        val unformatted: util.List[AnyRef] = client.getMany(getPipelinedResponseLength)
            //        println("unformatted: "+unformatted)
            // 从inputStream中读取所有回复
            //        println("client.getObjectMultiBulkReply: "+client.getObjectMultiBulkReply)
            //        val unformatted: Array[AnyRef] = client.getObjectMultiBulkReply.toArray
            if (formatted != null) {
              formatted += data
              //          for (o <- unformatted.toArray) {
              //            try{
              //              val rep = generateResponse(o).get
              //              formatted += rep
              //            }
              //            catch {
              //              case e: JedisDataException =>
              //                formatted += e
              //            }
              //          }
            }
            // size相同说明所有的client都已经添加,就不用再调用add方法了
            if (clientSet.size != jedisMap.size) {
              clientSet.add(client)
            }
          }else{break}
        }
      }
    }catch {
      case e1:JedisRedirectionException =>{
        if(e1.isInstanceOf[JedisMovedDataException]){
          // if MOVED redirection occurred, rebuilds cluster's slot cache,
          // recommended by Redis cluster specification
          refreshCluster()
        }
        println("error: JedisRedirectionException "+e1)
        throw e1
      }
      case e:Exception => println("error: generateResponse "+e)
    }finally {
      if(clientSet.size != jedisMap.size){
        // 所有还没有执行过的client要保证执行(flush),防止放回连接池后后面的命令被污染
        for(jedis <- jedisMap.values){
          breakable{
            if(clientSet.contains(jedis.getClient)) {break}
            try {
              // 清空inputstream里面的所有数据,忽略QUEUED or ERROR回复
              jedis.getClient.getMany(getPipelinedResponseLength)
            }catch {
              case ex:RuntimeException => println("error: RuntimeException "+ex)
            }
          }
        }
      }
      close()
    }
  }
  //查看数据类型方法-manOf(data)
  def manOf[T:Manifest](t:T):Manifest[T]=manifest[T]
}

** 代码3:在每个主节点上每个槽构建一个transaction,对这个槽上的key进行操作.**

// 从redis上获取全部符合正则匹配的key
    val pattern = "User_Label_Act_-u[0123456789]*"
    var keys:List[AnyRef] = List()
    // 1.获得cluster中所有节点
    val clusterNodes:util.Map[String, JedisPool] = cluster.getClusterNodes
    //     2. 对集群中每个节点都执行keys(pattern)
    for(node <- nodes_master){
      val time_start_key:Long = System.currentTimeMillis()
      println("node: "+node)
      val jedisPool:JedisPool = clusterNodes.get(node)
      val jedisConn:Jedis = jedisPool.getResource
      // 获得集群上每个节点上符合pattern的所有的key
      keys = jedisConn.keys(pattern).toArray().toList
      if(keys.isEmpty){
        ...
      }else{
        // redis cluster对于事务的支持只能在也一个slot上完成
        val slot_multi:collection.mutable.HashMap[String,collection.mutable.ListBuffer[String]] = collection.mutable.HashMap[String,collection.mutable.ListBuffer[String]]()
        for(key <- keys){
          val bKey:Array[Byte] = SafeEncoder.encode(key.toString)
          val slot:String = JedisClusterCRC16.getSlot(bKey).toString
          if(slot_multi.keys.exists(x=>x==slot)){
            slot_multi(slot).append(key.toString)
          }else{
            slot_multi.put(slot,collection.mutable.ListBuffer(key.toString))
          }
        }
        println("slot_multi: "+slot_multi.size+"  "+slot_multi.keys)
        for(slot_key <- slot_multi.keys){
          println("slot_key keys.size: "+slot_key+"  "+slot_multi(slot_key).length)
          val tx:Transaction = jedisConn.multi()
          for(key <- slot_multi(slot_key)){
              tx操作(set,rpush等)
          }
          tx.exec()
          tx.clear()
          tx.close()
         }
      }
      jedisConn.close()
     }

原文地址:https://www.cnblogs.com/xl717/p/12896178.html