offset range 查询

offset range 查询

在实际使用过程中,我们经常需要查询某个 topic 某个分区的 offset 范围(即最早 offset 和最新 offset)。
命令行:

kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 -topic xxxtopic --time -2      
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list xxxx:9092 -topic xxxtopic --time -1

--time 参数中 -1、-2 的特殊含义(-2 表示最早的 offset,-1 表示最新的 offset):

/**
 * Excerpt of ListOffsetRequest showing only the two sentinel timestamp constants.
 */
public class ListOffsetRequest extends AbstractRequest {
    // Sentinel timestamp (-2) naming the earliest position; the same value as `--time -2` on the command line.
    public static final long EARLIEST_TIMESTAMP = -2L;
    // Sentinel timestamp (-1) naming the latest position; the same value as `--time -1` on the command line.
    public static final long LATEST_TIMESTAMP = -1L;
}

客户端

KafkaConsumer.endOffsets(Collection)
KafkaConsumer.beginningOffsets(Collection)
Fetcher.beginningOrEndOffset(Collection, long, long)
Fetcher.retrieveOffsetsByTimes(Map<TopicPartition, Long>, long, boolean)
Fetcher.sendListOffsetRequests(boolean, Map<TopicPartition, Long>)

// Group the partitions by node.
// Each requested partition is filed under its current leader so that one request per node can be sent.
        final Map<Node, Map<TopicPartition, Long>> timestampsToSearchByNode = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry: timestampsToSearch.entrySet()) {
            TopicPartition tp  = entry.getKey();
            PartitionInfo info = metadata.fetch().partition(tp);
            if (info == null) {
                // No metadata for this partition: register its topic with the metadata and
                // give up early with a stale-metadata future.
                metadata.add(tp.topic());
                log.debug("Partition {} is unknown for fetching offset, wait for metadata refresh", tp);
                return RequestFuture.staleMetadata();
            } else if (info.leader() == null) {
                // The partition is known but has no leader: give up early with a leader-not-available future.
                log.debug("Leader for partition {} unavailable for fetching offset, wait for metadata refresh", tp);
                return RequestFuture.leaderNotAvailable();
            } else {
                // Leader is known: add this partition's target timestamp to the leader's bucket,
                // creating the bucket the first time the node is seen.
                Node node = info.leader();
                Map<TopicPartition, Long> topicData = timestampsToSearchByNode.get(node);
                if (topicData == null) {
                    topicData = new HashMap<>();
                    timestampsToSearchByNode.put(node, topicData);
                }
                topicData.put(entry.getKey(), entry.getValue());
            }
        }

        // Aggregate future returned to the caller: completed once every per-node request has
        // succeeded, or raised by the first per-node failure.
        final RequestFuture<Map<TopicPartition, OffsetData>> listOffsetRequestsFuture = new RequestFuture<>();
        // Merged results of all per-node responses received so far.
        final Map<TopicPartition, OffsetData> fetchedTimestampOffsets = new HashMap<>();
        // Number of per-node responses still outstanding (one request per leader node).
        final AtomicInteger remainingResponses = new AtomicInteger(timestampsToSearchByNode.size());
        for (Map.Entry<Node, Map<TopicPartition, Long>> entry : timestampsToSearchByNode.entrySet()) {
            sendListOffsetRequest(entry.getKey(), entry.getValue(), requireTimestamps)
                    .addListener(new RequestFutureListener<Map<TopicPartition, OffsetData>>() {
                        @Override
                        public void onSuccess(Map<TopicPartition, OffsetData> value) {
                            // Lock on the aggregate future so that merging the results and
                            // completing the future happen as one step across listeners.
                            synchronized (listOffsetRequestsFuture) {
                                fetchedTimestampOffsets.putAll(value);
                                // Complete only when the last outstanding response arrives and
                                // the aggregate future has not already been finished.
                                if (remainingResponses.decrementAndGet() == 0 && !listOffsetRequestsFuture.isDone())
                                    listOffsetRequestsFuture.complete(fetchedTimestampOffsets);
                            }
                        }

                        @Override
                        public void onFailure(RuntimeException e) {
                            synchronized (listOffsetRequestsFuture) {
                                // This may cause all the requests to be retried, but should be rare.
                                // The first failure fails the aggregate future; later failures are ignored.
                                if (!listOffsetRequestsFuture.isDone())
                                    listOffsetRequestsFuture.raise(e);
                            }
                        }
                    });
        }
        return listOffsetRequestsFuture;

简单点说:就是先把分区按 leader 节点分组,然后向每个 leader 节点发送 ListOffsetRequest 请求。这个请求是按时间戳进行 offset 定位的。

broker端

KafkaApis.handleListOffsetRequestV1AndAbove(request: RequestChannel.Request)

查询最新offset

这个值应该是在生产的时候维护好的

// Choose the last fetchable offset according to the request's isolation level:
// READ_COMMITTED uses the local replica's last stable offset,
// READ_UNCOMMITTED uses the local replica's high watermark.
val lastFetchableOffset = offsetRequest.isolationLevel match {
  case IsolationLevel.READ_COMMITTED => localReplica.lastStableOffset.messageOffset
  case IsolationLevel.READ_UNCOMMITTED => localReplica.highWatermark.messageOffset
}

这个地方也能反映出 LEO、LSO、high watermark(HW)的区别:READ_COMMITTED 返回的是 LSO,READ_UNCOMMITTED 返回的是 HW,两者用的都不是 LEO!!

查询最早offset

kafka.log.Log.fetchOffsetsByTimestamp(targetTimestamp: Long)
这个值应该是在生产的时候维护好的

@threadsafe
class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
          val producerStateManager: ProducerStateManager,
          logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
// ...... (rest of the class elided)
// A request for the earliest offset is answered directly from logStartOffset,
// paired with NO_TIMESTAMP; this path returns before any timestamp lookup.
if (targetTimestamp == ListOffsetRequest.EARLIEST_TIMESTAMP)
        return Some(TimestampOffset(RecordBatch.NO_TIMESTAMP, logStartOffset))

按时间戳查询offset

先确定target segment

      // Skip every leading segment whose largest timestamp is smaller than the target timestamp
      // and keep the first remaining segment, if any. None means all segments were skipped
      // (or there are no segments at all).
      val targetSeg = segmentsCopy.dropWhile(_.largestTimestamp < targetTimestamp).headOption

再到seg的index根据时间查找
LogSegment.findOffsetByTimestamp(timestamp: Long, startingOffset: Long)
先通过 timeIndex 按时间戳定位到 offset,再通过 index 查到对应的 position;index 内部的查找是二分查找


// LogSegment.scala
// Step 1: ask the time index for the entry that corresponds to the target timestamp.
val timestampOffset = timeIndex.lookup(timestamp)
// Step 2: take the larger of that entry's offset and startingOffset, and ask `index`
// for the position that corresponds to it.
val position = index.lookup(math.max(timestampOffset.offset, startingOffset)).position

// AbstractIndex.scala

  /**
   * Find the pair of index slots that bracket the given target.
   *
   * @param idx          buffer holding the index entries
   * @param target       value being searched for
   * @param searchEntity which field of an index entry is compared against the target
   * @return (lower, upper) slot numbers:
   *         - (-1, -1) when the index has no entries;
   *         - (-1, 0) when the first entry is already greater than the target;
   *         - (slot, slot) when an entry compares equal to the target;
   *         - otherwise (lo, lo + 1), where lo is the slot the search converged on and the
   *           upper bound is -1 if lo is the last entry.
   */
  private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
    // Binary search over the closed slot range [low, high]. The midpoint is rounded up so
    // that the range still shrinks when the lower bound is moved to the midpoint itself.
    def search(low: Int, high: Int): (Int, Int) =
      if (low >= high)
        (low, if (low == _entries - 1) -1 else low + 1)
      else {
        val middle = ceil(high / 2.0 + low / 2.0).toInt
        val ordering = compareIndexEntry(parseEntry(idx, middle), target, searchEntity)
        if (ordering > 0)
          search(low, middle - 1)
        else if (ordering < 0)
          search(middle, high)
        else
          (middle, middle)
      }

    if (_entries == 0)
      (-1, -1) // the index is empty
    else if (compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
      (-1, 0) // the target is smaller than the very first entry
    else
      search(0, _entries - 1)
  }

原文地址:https://www.cnblogs.com/simoncook/p/11809433.html