DelayedOperationPurgatory之purgatory的实现

purgatory的超时检测

当一个DelayedOpeartion超时(timeout)时,它需要被检测出来,然后调用它的回调方法。这个事情看起来很简单,但做好也并不容易。

0.8.x的Kafka的实现简单明了,但是效率不高。这些版本的Kafka的delayed request实现了java.util.concurrent.DelayQueue要求的DelayedItem接口。这些请求被放个DelayQueue, 然后有一个专门的线程从DelayQueue里poll这些请求出来,所以被poll出来的元素也就是已过期的元素。

这样做的坏处是DelayQueue里插入和删除的时间复杂度为O(logn),当其中元素很多时,还是挺费CPU。而且这个实现中,当一个请求被satisfied以后,它并没有被立即从DelayedQueue里移除(因为删除特定的元素的时间复杂度为O(n)),而是隔一定数目的请求就遍历这个Queue,从中移除元素(这样做的开销会小于单独移除特定的元素)。所以,如果这个purge interval设置不好的话,就可能会OOM~

0.9.0开始,purgatory采用了新的基于timing wheel的实现,可以参见之前的blog中翻译的Kafka之Purgatory Redesign Proposal (翻译)

下面看一下0.9.0的源码里对于超时检测的实现。

TimingWheel

TimingWheel的原理可以看一下这篇文章惊艳的时间轮定时器Kafka之Purgatory Redesign Proposal (翻译)也有提到其实现的原理。

Kafka的实现大体上跟通用的Timing Wheel差不多,但是结合了一些Kafka使用的特点,比如使用DelayQueue来驱动时间轮,以及bucket采用双端链表的实现。

注释里讲的例子

TimingWheel类的注释里也讲到了它的原理,并且举了一个例子,可以方便理解它的运作。下面介绍一下注释里提到的例子。不过大体看下是怎么回事就行了,这个注释本身的逻辑就有些问题(如果不是我理解错的话)。而且这个例子只是说明了一下概念。

u代表最小的时间粒度,n代表时间轮的大小,即一个时间轮有多少个桶。设u等于1,等于3,起始时间是c。那么此时,不同级别的桶是下面这样的:

* level    buckets
* 1 [c,c] [c+1,c+1] [c+2,c+2]
* 2 [c,c+2] [c+3,c+5] [c+6,c+8]
* 3 [c,c+8] [c+9,c+17] [c+18,c+26]

bucket超时(expire)的时间,依据于bucket的起始时间。所以在c+1时刻,[c, c], [c, c+2]和[c, c+8]就都超时了。

级别1的时钟移动到到c+1, 并且创建了[c+3, c+3]

级别2和级别3的时钟停在c,因为它们的时钟移动的单位分别是3和9。所以,级别2和3并不会有新的桶创建。

需要说明的是,级别2的[c, c+2]不会收到任何task,因为这个区间已经被级别1覆盖了。对于级别3的[c, c+8]也是一样,因为它的这个区间被级别2覆盖了。这样做有些浪费,但是却简化了实现。 

* 1        [c+1,c+1]  [c+2,c+2]  [c+3,c+3]
* 2 [c,c+2] [c+3,c+5] [c+6,c+8]
* 3 [c,c+8] [c+9,c+17] [c+18,c+26]

在c+2时刻,[c+1, c+1]变得超时。级别1的时钟走到了c+2, 并且创建了[c+4, c+4]。

* 1        [c+2,c+2]  [c+3,c+3]  [c+4,c+4]
* 2 [c,c+2] [c+3,c+5] [c+6,c+8]
* 3 [c,c+8] [c+9,c+17] [c+18,c+18]

在c+3时刻,[c+2, c+2]变得超时,级别2移动到了c+3,并且创建了[c+5, c+5]和[c+9, c+11]

* 1        [c+3,c+3]  [c+4,c+4]  [c+5,c+5]
* 2 [c+3,c+5] [c+6,c+8] [c+9,c+11]
* 3 [c,c+8] [c+9,c+17] [c+8,c+11]

设计TimingWheel应考虑的问题

整体的设计就是按照proposal提到的来,但是有些细节需要考虑一下

先得明确一些东西的定义:

  • time unit 对一个TimingWheel,它走一个格对应的物理时间定义为这个TimingWheel的time unit。在源码中,tickMs这个TimingWheel的构造器参数决定了它的time unit。
  • bucket 一个bucket代表了一个时间段,它的结束时间 - 开始时间 + 1  = time unit,开始时间(物理时间)总是time unit的整数倍。一个TimingWheel由固定数量的bucket组成,这些bucket的代表的时间段互不重叠,并且完全覆盖了这个TimingWheel当前代表的整个时间段。
  • size 一个TimingWheel的大小即是它包含多少个bucket,也就是它走一圈的时间/time unit
  • current time 当前这个TimgWheel的指针指向的那个bucket的开始时间。因此current time也总是time unit的整数倍。

实际上,上述的概念可以有不同的定义,这些定义也决定了TimingWheel的一个特定实现。上边提到的定义,是跟源码里的TimingWheel的行为一致的。

当前时间的bucket的expire time

有了这些概念,就可确定一个事情,这个事情就是上边提到的注释里有些混乱的一个概念:是否认为current time所指的那个bucket是expire的?这个概念也决定了当TimingWheel tick一次的时候,是新指向的bucket变得过期,还是之前的bucket变得过期。在一个hierachical timing wheel中,高级的bucket在过期以后,需要把它里边的元素重新插入低级的timing wheel, 以保证整个timer(所以这些timing wheel构成一个timer)的计时精度。这就决定了,一个bucket的expire time就是这个bucket的start time。根据current time的定义,这也就决定了一个TimingWheel当前指向的那个bucket是一个已经expire的bucket,也就是在这个wheel 走动的时候,它新指向的那个bucket变得过期。

在Kafka中,bucket对应于TimerTaskList类。

何时会overflow

一个timing wheel有它可以host的时间段。这个时间段由几个参数决定

  • currenTime 当前时间
  • tickMs 每个bucket的大小,在Kafka中物理时间的单位是毫秒。所以,tickMs是指一个桶有多少毫秒
  • wheelSize 这个timing wheel有多少个bucket

根据这些参数,一个timing wheel可以存放的request的过期时间分布在[currentTime, currentTime + tickMs * wheelSize - 1], 包含两端的时间。

当把一个request加到一个timing wheel,而这个request的expire time超过了上边时间段的右端,就需要把它溢出到更高级timing wheel。

根据expire time把元素放在适当的桶内

这个操作的关键在于控制它的时间复杂度,它实际上可以分成两步(这个有点像把大象放进冰箱要几步……):

  1. 找到合适的桶
  2. 把item放到桶里

找到合适的桶

每个timing wheel的桶的数目是固定的,这比较适于构建一个bucket array来保存桶,况且数组的寻址更放一些。Kafka里的实现是使用的数组。

那么已知一个item, 已知它的expire time(并且没有溢出),它应该放在哪个桶里呢?

如果我们把整个时间域按照这个timing wheel的tickMs无数多的桶,第一个桶的开始时间是Unix epoch的0毫秒,那么可以根据expire time求出这个item应该放入的桶的编号,也就是expire time/tickMs。

例如,当前的timing wheel里的桶的编号为0, 1, 2, 3。 currentTime为0。

在tick一次以后,0号桶就可以被重用了,我们自然可以把4号桶放进去。这时,这个timing wheel的桶编号为4, 1, 2, 3。仍然符合我们前边提到的一个timing wheel的边界。同样,再tick一次以后,这个timing wheel的桶的编号变成了4, 5, 2, 3。这种方式,相当于每个桶能放的值为它在数组里的index + n * wheelSize (n为非负整数)。

这样就有了根据expireTime确定桶的index的公式: (expireTime/tickMs) % wheelSize。

      // Put in its own bucket
      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)

      // Set the bucket expiration time
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }

这里也可以看出来一个bucket的expirationTime是virtualId * tickMs, 也就是它的起始时间。

注意,只有保证每个timing wheel里所有元素的时间都在[currentTime, currentTime + tickMs * wheelSize - 1],这个公式才不违反我们之前对timing wheel规则的定义。

放在桶里

在Kafka里,bucket是用双端链表实现的,因此insert一个元素的开销是O(1)的。

Timer的运转

以上对于Timing Wheel的规则的约定,只是定义了一种数据结构和它的运转规则。但是Timing Wheel自身是不会随着物理时间的前进而改变的,也是就说它需要外部驱动。

如何驱动 

对于Timing Wheel而言,这走动就是它的current time前进。随着current time的改变,会有bucket变得过期,其它人可以从Timing Wheel中取出这些桶处理(这里需要考虑到同步的问题,即tick这个动作需要持有timing wheel的锁)。改变current time,对于Kafka的实现对应于Timing Wheel的advance方法。

现在有两个问题需要解决:

  1. 如何根据使得TimingWheel根据物理时间走动。前边的那个purgatory redesign proposal中提到过,一个简单的方式是使用一个线程周期性地醒来,驱动TimingWheel前进,但这样做的坏处是当Timing Wheel里的元素(准确是说是非空的桶)很稀疏时,周期性地唤醒线程检查的是一种浪费。所以Kafka使用了一种“按需”唤醒线程的方式,也就是DelayQueue。每个bucket的实现了Delay接口,getDelay(获取超时时间)返回这个bucket的expire time,也就是它的开始时间。ExpiredOperationReaper线程会通过DelayQueue的poll方法阻塞自己(当前的实现是最多阻塞200毫秒,因为默认的最低级的wheel的tickMs为1ms, 所以这个timeout时间是可以接受的),当有bucket expire,它会从DelayQueue里取出它来。Kafka的ExpiredOperationReaper的doWork方法(它会被线程一执调用)是这样的
        override def doWork() {
          timeoutTimer.advanceClock(200L)
            estimatedTotalOperations.getAndSet(delayed)
            debug("Begin purging watch lists")
            val purged = allWatchers.map(_.purgeCompleted()).sum
            debug("Purged %d elements from watch lists.".format(purged))
          }

    它会调用timeoutTimer的advanceClock方法,来改变timer的当前时间(不大对呀,还有很多事要做的)。是的,不只是改变当前时间这会简单,改变timer的当前时间一定和处理超时的bucket是一体的。而且advanceClock(200L)并不是把Timing wheel前进200ms的意思,而是传进去了一个200ms的超时时间……。Timer的advanceClock方法实际上长这样,感觉叫做"processExpiredBucketsAndAdvanceClock"更靠谱点~

      def advanceClock(timeoutMs: Long): Boolean = {
        var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS) //取出超时的bucket,最多阻塞timeoutMs,当前的实现中就是200ms
        if (bucket != null) {
          writeLock.lock() //持有写锁(ReentrantReadWriteLock里的写锁),开始处理超时的bucket
          try {
            while (bucket != null) {
              timingWheel.advanceClock(bucket.getExpiration()) //改变timing wheel的当前时间至这个超时的bucket的expire time
              bucket.flush(reinsert)//处理bucket里的元素
              bucket = delayQueue.poll()//继续poll其它超时的元素
            }
          } finally {
            writeLock.unlock()//释放锁
          }
          true
        } else {
          false
        }
      }

    这里边的点让人困惑的是timingWheel.advanceClock(bucket.getExpiration())这一句,它把timing wheel的当前时间设成了bucket的expire time。为什么是这个时间呢?因为bucket的expire time就是它的起始时间,由于poll总是取出最新过期的bucket,所以poll出来的bucket的expire time基本就是当前的物理时间。所以这么设置以后,当writeLock被释放后,往timer插入新的元素时,才能根据这个元素的物理时间放入到合适的桶里。如果bucket的curernt time跟物理时间差距太大,那么timer的计时就会不准(timer的准确性在后边会分析一下)。

  2. 过期的桶如何处理?由于Kafka的bucket是一个数组里的元素,因此除非在过期后复制出里边的所有元素,否则就需要在持有Timing Wheel的锁期间完成所有元素在过期后的回调函数,清空这个bucket,然后才能释放锁。因此,如果在驱动器timer前进的线程里调用这些这些元素的回调函数(比如返回响应啥的),那么这个驱动器线程持有锁的时间可能会相当长,而且在它处理完所有过期元素之前,是不能往Timer中加入新的TimerTask的,所以Reaper不能阻塞在这些元素的回调上,否则可能无法及时处理后边的已过期的元素。所以Kafka使用了一个单独的线程池来执行回调。
     private def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry): Unit = {
        if (!timingWheel.add(timerTaskEntry)) {
          // Already expired or cancelled
          if (!timerTaskEntry.cancelled)
            taskExecutor.submit(timerTaskEntry.timerTask)
        }
      }

    当timingWheel.add返回false时,代表这个timerTaskEntry(也就是bucket里的一个元素,它持有一个TimerTask,DelayedOperation继承了TimerTask)要不是已期expire了,要不是被cancel了。如果是expire了,就把它持有的TimerTask提交到taskExecutor这个ExecutorSerivce中(TimerTask实现了runnable接口)。对于DelayedOperation,也就是是它的forceComplete回调会被执行,对于DelayedFetch和DelayedOperation,也就是会产生和发送响应。

    需要注意的是,这个taskExecutor是一个FixedThreadPool,

      // timeout timer
      private[this] val executor = Executors.newFixedThreadPool(1, new ThreadFactory() {
        def newThread(runnable: Runnable): Thread =
          Utils.newThread("executor-"+purgatoryName, runnable, false)
      })
      private[this] val timeoutTimer = new Timer(executor)

    由于FixedThreadPool使用一个unbound queue,因此可以认为submit是非阻塞的。但是这样带来的问题是这个线程池的Queue有可能会积压元素。(它貌似无法产生back-pressure给产生TimerTask的源头)

    对于过期的bucket,分成两类:1.里边的item都过期了,需要提交给taskExecutor执行 2. 这是一个高级的Timing Wheel,因此里边的元素需要放入低级别的Timing Wheel。Kafka对二者进行了统一抽象,即用一个reinsert方法完成上边两种处理,而reinsert调用的也就是上边的addTimerTaskEntry方法。

  3. private[this] val reinsert = (timerTaskEntry: TimerTaskEntry) => addTimerTaskEntry(timerTaskEntry)

    addTimerTaskEntry方法会调用TimingWheel的add方法,它会根据TaskEntry的不同状态,进行不同的处理

      def add(timerTaskEntry: TimerTaskEntry): Boolean = {
        val expiration = timerTaskEntry.timerTask.expirationMs
    
        if (timerTaskEntry.cancelled) {
          // Cancelled
          false
        } else if (expiration < currentTime + tickMs) {
          // Already expired
          false
        } else if (expiration < currentTime + interval) {
          // Put in its own bucket
          ...
          true
        } else {
          // Out of the interval. Put it into the parent timer
          if (overflowWheel == null) addOverflowWheel()
          overflowWheel.add(timerTaskEntry)
        }
      } 

 Timer的准确性

理想情况下,Timer应该在一个TimerTask的expiration time检测到它超时,然后执行回调。但是,实际情况是这样的吗? 之所以会有此疑问,是由于以下几点:

  1. 驱动它的是前边提到的这些逻辑,而不是直接调用的JDK跟时间有关的方法(像是Thread.sleep这种的)。那就得看一下这些逻辑能不能使timer准确
  2. timer本身是有精度的,就是tickMs。

bucket的expiration time是由TimingWheel的add方法确定的

      val virtualId = expiration / tickMs
      val bucket = buckets((virtualId % wheelSize.toLong).toInt)
      bucket.add(timerTaskEntry)
      // Set the bucket expiration time
      if (bucket.setExpiration(virtualId * tickMs)) {
        queue.offer(bucket)
      }

所以,一个TimerTask被放进的那个桶的的expiration time实际上是根据这个TimerTask的expiration time确定的,这是一个物理时间。由此可知,一个TimerTask应该会在自己所属的那个相于Unix epoch 0时刻的全局bucket(也就是id为virtualId那个bucket)的起始时刻被poll出来。但是被poll出来并不代表着这个TimerTask的回调会被执行,实际的执行时刻取决于接下怎么办。

被poll出来的bucket里的元素会经过Timer#reinsert -> Timer#addTimerTaskEntry ->  TimingWheel#add 方法,来决定怎么办。这个处理过程前边提到过,重点是,只有被add方法认为是expire和TimerTask,才会被提交到线程池执行。而add方法是根据current time来决定TimerTask是否过期的。

    val expiration = timerTaskEntry.timerTask.expirationMs

    if (timerTaskEntry.cancelled) {
      // Cancelled
      false
    } else if (expiration < currentTime + tickMs) {
      // Already expired
      false
    } else if (expiration < currentTime + interval) {
      // Put in its own bucket

这里就遇到了物理时间和Timer的本地时间不一致的问题,即expiration和currentTime的区别。

currentTime时在poll出来一个bucket以后确定的,逻辑是在Timer的advanceClock方法里,前边也提到过。

  def advanceClock(timeoutMs: Long): Boolean = {
    var bucket = delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)
    if (bucket != null) {
      writeLock.lock()
      try {
        while (bucket != null) {
          timingWheel.advanceClock(bucket.getExpiration())
          bucket.flush(reinsert)
          bucket = delayQueue.poll()
        }
      } finally {
        writeLock.unlock()
      }
      true
    } else {
      false
    }
  }

所以bucket的本地时间,总是落后于物理时间的,落后多少取决于poll返回的时刻与bucket的expiration time的差别,以及获取写锁的时间。这里需要注意的是,这个timingWheel就是最低层的那个timingWheel,而且advanceClock方法会更新它的所有上层TimingWheel的currentTime。

这里需要注意的是writeLock的释放时机。Reaper线程会执行writeLock直至不能从delayQueue中取出元素,并且在while循环中的delayQueue.poll方法是没有timeout时间的,这是为什么呢?为什么不在poll出来一个bucket并且处理完它以后就释放锁,然后再去poll呢?原因就是TimingWheel的advanceClock方法也会更新它的上级TimingWheel的currentTime。但是上级的TimingWheel里的expire的那个bucket还并没有处理(如果存在这样的bucket的话)。此时,如果释放锁,这个上级的TimingWheel就可能处于不一致的状态,这样会造成过期的bucket里的TimerTask没有被清空时这个bucket的expiration time就被更新了,从而带来错误。所以Reaper线程需要把所有TimingWheel的已过期的bucket全部取出来,处理完毕之后,才能允许往Timer里加入新的TimerTask。

注意到,reinsert方法执行前已经更新了timingWheel的currentTime,而TimerTaskList的flush方法会对这个list中的的TimerTask执行resinsert。

现在的问题在于,如果poll出来的是一个高层级的Timing那Wheel里的bucket,那么接下来的处理会不会带来误差。

假设最低级别的TimingWheel的tickMs是1ms, wheelSize是4,那么第二级的bucket的tickMs就是4,设它的wheelSize也是3。那么,第三级的TimingWheel的tickMs就是12.

1. 设最低级别的TimingWheel比物理时间落后2ms。

假设poll出来的这个bucket里元素为a b c, 它们的过期时间分别为4 5 6 (物理时间)。reinsert执行时的物理时间为6, 而TimingWheel的currentTime为4.

那么在reinsert执行时,所有这个bucket里的元素都被认为已经过期,这是正确的。

2. 假设最低级别的TimingWheel比物理时间落后超过了tickMs, 设落后8ms。

poll出来的是第三级别的bucket,reinsert执行时的物理时间为 21, 这个bucket是在物理时间13被poll出来的,因此前三级wheel的currenTime被设成了12。

poll出来的这个bucket的元素a b c的过期时间分别是 14 20 22。 那么最低层TimingWheel的add方法会认为expiration time在[12, 15]的元素已经过期,所以b和c不会被立即提交给taskExecutor线程池执行,而是被重新插入到最低级的TimingWheel。而由于最低级的TimingWheel的溢出阀值为15,所以b和c会被提交给它的上一级TimingWheel,而第二级TimingWheel可以管理的TimerTask的范围是[12, 23], 因此b和c会被交给第二级的TimingWheel。

但无论如何,它总是会被加入到正确的桶里(这个桶的过期时间符合这个TimerTask的过期时间),这个bucket会在下一次(或者再隔几次……)的poll中被取出来。

只要最低级的TimingWheel的currentTime不会一直固定在一个值,已过期的TimerTask就一定会被提交执行。而且它被执行时间取决于它之前有多少个已经过期的元素,即它不会被无限期地延后(starve), 尽管它可能会在过期后仍然在TimingWheel的层次结构间倒腾。

那么,就可以认为一个TimerTask实际被执行的时间取决于

  1. tickMs带来的误差。由于tickMs的原因,一个TimerTask可能会被分配到一个expiration time比它小的bucket里。但是DelayedOprationPurgatory把tickMs设成1ms, 所以这个误差可以忽略。
  2. Reaper线程从DelayQueue里poll元素的延迟和处理poll出来的元素带来的延迟。这些基本是不可控的。这会使得一个TimerTask肯定会在expiration time时刻之后的某时刻被提交执行。GC和CPU时间的分配都会影响这个延迟。别一个重要因素是获取writeLock的时间,由于只有reapder线程会尝试获取writeLock,所以它只需要面对获取readLock线程的竞争。而每次往Timer add一个TimerTask都会获取readLock。因此只要这个ReentranceReadWriteLock是一个公平锁,这么做就没问题。但是Kafka在实现Timer时并没有把这个ReentrantReadWriteLock设成fair mode。
      // Locks used to protect data structures while ticking
      private[this] val readWriteLock = new ReentrantReadWriteLock()
    因此,如果对锁的竞争很厉害,理论上来说,Reaper可能获取不了写锁,也就是说可能会使JVM产生OOM。

总结

总之,可以认为在压力低时(reaper可以迅速地从DelayQueue中获取元素,并且对读写锁的竞争不激烈,reaper线程可以获取充足的CPU时间), Timer理论上还是挺准确的。如果吞吐量非常大,那就难说了。而且,非公平的读写锁可能会使得插入TimerTask的速度大于取出TimerTask的速度,或者taskExecutor这个线程池处理请求的速度跟不上TimerTask生成的速度,造成OOM。


总结

DelayedOperationPurgatory的新设计避免了之前版本简单地把所有DelayedOperatoin作为DelayedQueue中的元素时,DelayedQueue里元素数目过多造成的CPU开销。它把一同过期的所有DelayedOperatoin放在一个bucket里,而只从DelayQueue里poll bucket,这样就大大减少了DelayQueue中的元素数目。为此,引入了hierachical timing wheel来组织DelayedOperation到合适的bucket里。

ExpiredOperationReaper线程负责从DelayedQueue中取出expired bucket,然后根据bucket里的TimerTask的过期时间放入适当的TimingWheel或者提交给一个线程池执行已经过期的TimerTask的回调函数。由Reaper线程和Timer一起维护TimingWheel里面元素的一致性(使TimingWheel里的各个属性和元素符合它们的定义)。

源码里所有这些类: DelayedOperationPurgatory, Timer, TimingWheel, TimerTaskList 的实现都是很简洁的,但是实现这套系统的难度还是挺高的,特别是一些并发方面的考虑(参与的主要线程有一个Reaper线程和多个处理请求的线程)。

有些地方分析的可能不正确,请指出。

原文地址:https://www.cnblogs.com/devos/p/5059362.html