ReplicaManager之DelayedOperation

DelayedOperation包括两种:DelayedFetch和DelayedProduce,它们的存在是由Kafka Protocol决定的,而Kafka Protocol是由实际需求决定的……

存在DelayedFetch是为了更有效率的fetch,也就是batch fetch;存在DelayedProduce是为了等待更多副本的写入,以达到用户指定的持久性保证(也就是消息更不容易丢)。

对于这些DelayedOperation而言,什么时候不再需要delay是必须指明的,跟据操作的不同,delay被满足的条件有所不同,但也有共通的,比如max wait time。

FetchRequest

case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
                        correlationId: Int = FetchRequest.DefaultCorrelationId,
                        clientId: String = ConsumerConfig.DefaultClientId,
                        replicaId: Int = Request.OrdinaryConsumerId,
                        maxWait: Int = FetchRequest.DefaultMaxWait,
                        minBytes: Int = FetchRequest.DefaultMinBytes,
                        requestInfo: Map[TopicAndPartition, PartitionFetchInfo])
        extends RequestOrResponse(Some(RequestKeys.FetchKey)) { ...  }

注意其中的两个参数, 按照Kafka Protocol的说明,它们的含义是这样的:

  • maxWait  The max wait time is the maximum amount of time in milliseconds to block waiting if insufficient data is available at the time the request is issued.即,如果没有足够的数据,这个FetchRequest可以最多等待这么长时间,以期待更多的数据到来。
  • minBytes This is the minimum number of bytes of messages that must be available to give a response. If the client sets this to 0 the server will always respond immediately, however if there is no new data since their last request they will just get back empty message sets. If this is set to 1, the server will respond as soon as at least one partition has at least 1 byte of data or the specified timeout occurs. By setting higher values in combination with the timeout the consumer can tune for throughput and trade a little additional latency for reading only large chunks of data (e.g. setting MaxWaitTime to 100 ms and setting MinBytes to 64k would allow the server to wait up to 100ms to try to accumulate 64k of data before responding). 在返回Response前最少获取这么多bytes的数据。如果client把这个值设成0,那么server将会总是立即返回响应,尽管可能在上一次请求后没有新数据,此时client就会收到一个空的message set。如果把这个值设成1,那么只有一个partition有至少1byte的数据,或者把定的timeout到了,那么server也会返回响应。通过把这个值设大一些,并且配合timeout的设置,consumer可以只读取一批数据,从而在吞吐量和延迟之间进行调节(比如把MaxWaitTime设成100ms, 并且把MiniBytes设成64k,将使得server在返回响应给client之前,最多等待100ms来收集够64k的数据)

当一个FetchRequest由于这两个参数而需要等待时,Kafka就生成一个DelayedFetch对象,来表示'delay'的语义。

DelayedFetch

case class PartitionFetchInfo(offset: Long, fetchSize: Int)
case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionFetchInfo) { ...  }

case class FetchMetadata(fetchMinBytes: Int,
                         fetchOnlyLeader: Boolean,
                         fetchOnlyCommitted: Boolean,
                         isFromFollower: Boolean,
                         fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {... }    
class DelayedFetch(delayMs: Long,
                   fetchMetadata: FetchMetadata,
                   replicaManager: ReplicaManager,
                   responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit)
  extends DelayedOperation(delayMs) { ...  }

其中

  • PatitionFetchInfo  它是针对一个partition的,说明了从哪个offset开始读,以及最多读取多少byte的数据。
  • FetchPartitionStatus 它也是针对一个partition的,含义和PartitionFetchInfo基本一样,只不过其中的startOffsetMetadata包含了与LogSegment有关的信息,即它可以定位于具体的物理的segment。
  • FetchMetadata 包括了对每个partition的FetchPartitionStatus,以及前边提到的minBytes等信息。
  • 而DelayedFetch包括了FetchMetadata等信息,其中的delayMs就是前边提到的maxWait.

总之,FetchRequest提供了关于原始的FetchRequest的信息,以及在FetchRequest处理的步骤中补充的一些信息。

 ProduceRequest

case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
                           correlationId: Int,
                           clientId: String,
                           requiredAcks: Short,
                           ackTimeoutMs: Int,
                           data: collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet])
    extends RequestOrResponse(Some(RequestKeys.ProduceKey)) { ...  }

注意这里边的两个参数

  • requiredAcks  这个值就是producer配置里的acks。它的具体含义可以在Kafka的文档里找到。重点强调的是,当ack=-1时,只是要求所有在ISR的replica都已经确认写入了这个produce request的内容,leader才会返回响应给producer。
  • ackTimeoutMs  This provides a maximum time in milliseconds the server can await the receipt of the number of acknowledgements in RequiredAcks. The timeout is not an exact limit on the request time for a few reasons: (1) it does not include network latency, (2) the timer begins at the beginning of the processing of this request so if many requests are queued due to server overload that wait time will not be included, (3) we will not terminate a local write so if the local write time exceeds this timeout it will not be respected. To get a hard timeout of this type the client should use the socket timeout. 这段主要是说这个timeout并不是从发出请求到得到响应的最大时间,它不包括网络时延,不包括请求在被实际处理前的等街时间,如果正在写的时候超时了,写操作也不会被停止。真是想要这么一个严格的超时时间,就得用socket timeout做限制了。

DelayedProduce

class DelayedProduce(delayMs: Long,
                     produceMetadata: ProduceMetadata,
                     replicaManager: ReplicaManager,
                     responseCallback: Map[TopicAndPartition, ProducerResponseStatus] => Unit)
  extends DelayedOperation(delayMs) { ...  }
case class ProduceMetadata(produceRequiredAcks: Short, produceStatus: Map[TopicAndPartition, ProducePartitionStatus]) { ... }

DelayedProduce基本和DelayedFetch是一个思路,由ProduceMetadata记录了一些加工后的信息,便于以后判断delay条件时使用。对于DelayedProduce,主要是在把消息集append到本地后,就可以获取append之后这个消息集的最后一条消息的offset,由此推出来当follower来请求到至少哪一条消息时,就说明这个replica已经拉取完了这个message set, 这个offset就是requiredOffset。

case class ProducePartitionStatus(requiredOffset: Long, responseStatus: ProducerResponseStatus) { ... }

多Partition的Produce和Fetch

对于上面的定义里,需要注意的一点时,在单个FetchRequest和ProduceRequest里,都可以produce以及fetch到之个partition。而delay参数,比如ack以及minBytes都是针对整个Request的。所以,对于ProduceRequest而言,只有它包括的所有partition的ISR都已经确认,这个Request也会整体不再被delay。而对于FetchRequest,只要总的bytes超过了minBytes,不管是不是所有Partition的数据都有,它都会不再被delay。

ProduceRequest的这种行为是由于对于它的响应只会有一个,所以需要等所有partition都处理完了,再能给出这个响应。这个对于异步的Producer也是一样。如果需要允许确认一部分Partition就返回响应,那么server和client端的处理就会麻烦很多。

而FetchRequest的这种行为使得无法保证fetch到的数据中不同partition的比例,可能并不是特别好。但是大部分情况下,还都是只针对一个Partition发一个FetchRequest,所以不会受到影响。

总之,这部分的Kafka Protocol设计得并不太完美,但的确比较简洁,算是有得有失吧。

DelayedOperation接口

在看源码之前,可以先大致想一下DelayedOperation的通用处理过程,这样会更容易理解源码中的定义。

首先,这两种Request都有max wait时间,为此需要定期检查被delayed的这些request是否超时(expire)了,为了做到这一点,需要一个定时任务。

除了时间触发之外,对DelayedOperation的delay条件是否满足的检查还有一些是事件触发的,比如,对于DelayedProduce,当一个replica发来fetch请求时,leader就获取了replica拉取进度的新信息,因此就需要检查下DelayedProduce是否可以被满足(可以被满足就是说这个DelayedOperation应该摆脱delay状态)。同样的,对于DelayedFetch,当有新的producer request里的消息append到leader,就需要检查下处于delayed状态的DelayedFetch请求是否可以被满足。因此,DelayedOperation必须有接口可以被调用以检查是否delay条件被满足了。

此外,DelayedOperation应该指明在delay条件满足之后应该怎么做,比如此时应该发送response;以及应该在expire之后应该怎么做。

因此,它大概需要以下方法:

  • isCompleted  检查是否已经被完成
  • tryComplete 检查delay条件是否已经满足,满足的话就执行一些操作
  • onExpiration 在expire以后怎么做
  • onComplete 在complete之后怎么做

源码中的这个接口的定义有些混乱,因为有些方法的定义跟具体的处理过程紧密相关。但是也是有一些情况是上面没有考虑到的,比如同步,比如在源码的DelayedOperation接口的forceComplete方法中

  private val completed = new AtomicBoolean(false)

  /*
   * Force completing the delayed operation, if not already completed.
   * This function can be triggered when
   *
   * 1. The operation has been verified to be completable inside tryComplete()
   * 2. The operation has expired and hence needs to be completed right now
   *
   * Return true iff the operation is completed by the caller: note that
   * concurrent threads can try to complete the same operation, but only
   * the first thread will succeed in completing the operation and return
   * true, others will still return false
   */
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }

在Kafka的架构中,有多个线程共同处理请求,因此可能会有多个线程同时检查delay条件以及在发现满足条件后执尝试行操作。但是对于onComplete这样的方法,在DelayedOperation的整个生命周期中,只允许被调用一次(因为这个方法会返回给client响应),因此需要对访问它的方法进行同步。而DelayedOperation约定onComplete方法只允许被forceComplete方法调用,因此在forceComplete方法中用AtomicBoolean的CAS操作构造了一段只被执行一次的代码。即,只允许把completed从false设成true的那个线程执行cancel和onComplete。由于CAS是原子操作,而且没有其它地方把它从true改成false,所以if里操作对于这个DelayedOperation,在其生命周期中只会被执行一次。

总的来说,这些DelayedOperation在超时后,就直接调forceComplete,然后调onExpiration(这个顺序有点乱,不过在DelayedFetch和DelayedProduce中,onExpiration都只是做簿记工作)。在由事件触发时,就调用tryComplete,如果在tryComplete中发现delay条件被满足,就调用forceComplete。

原文地址:https://www.cnblogs.com/devos/p/5049820.html