【python】confluent_kafka将offset置为最大

注意:本文的方法存在问题,正确方案请参见 http://www.cnblogs.com/dplearning/p/7992994.html

将指定group对应的offset重置到最大值,跳过未消费数据

代码如下:

# coding:utf-8

import os
from confluent_kafka import Consumer, TopicPartition
import traceback


def reset_kafka_offset(group, topic, partition=0, broker_list=None):
    """Move a group's committed offset on one partition to the high watermark.

    Every message currently in the partition is thereby skipped for ``group``.
    The watermark pair and the offset committed afterwards are printed.

    Args:
        group: Consumer group id whose committed offset is rewritten.
        topic: Topic name.
        partition: Partition number to reset. Defaults to 0, which is the
            only partition the original snippet handled.
        broker_list: Comma-separated ``host:port`` bootstrap servers. When
            omitted, the placeholder address list from the original snippet
            is used and must be edited before the function can work.

    Returns:
        None.
    """
    if broker_list is None:
        broker_list = "xx.xx.xx.xx:9092,xx.xx.xx.x:9092"
    c = Consumer({'bootstrap.servers': broker_list,
                  'group.id': group,
                  'default.topic.config': {'auto.offset.reset': 'smallest'}})
    # Everything after construction runs under try/finally so the consumer is
    # closed even when the watermark lookup yields nothing or a call raises.
    try:
        c.subscribe([topic])

        tp = TopicPartition(topic, partition)
        init_offset = c.committed([tp])[0].offset
        if int(init_offset) == -1001:
            # -1001 means the group has no committed offset yet (a brand-new
            # group). Per the original author's observation, such a group
            # must consume one message and commit first; otherwise the
            # explicit commit below has no effect and the offset keeps
            # reading back as -1001.
            # The message itself is irrelevant, so it is neither inspected
            # nor decoded. NOTE: poll() without a timeout blocks until a
            # message or event arrives.
            c.poll()
            c.commit()

        # (low, high) watermark pair for the partition.
        watermark_offsets = c.get_watermark_offsets(tp)
        print(watermark_offsets)
        if watermark_offsets:
            logsize = watermark_offsets[1]  # high watermark = largest offset
            if logsize is not None:
                end_tp = TopicPartition(topic, partition, int(logsize))
                # Commit the high watermark synchronously, skipping all
                # unconsumed data. `async` is a reserved word from Python 3.7
                # on, so the keyword is passed through a dict to keep the
                # original call parseable on both Python 2 and 3.
                # NOTE(review): newer confluent_kafka releases spell this
                # argument `asynchronous` -- confirm which name the installed
                # version accepts.
                c.commit(offsets=[end_tp], **{'async': False})
            # Read the committed position back to verify the reset.
            tp_out = c.committed([tp])
            print(tp_out[0].offset)
    finally:
        c.close()


if __name__ == "__main__":
    # Demo run: reset the offset of group "test" on topic "test".
    reset_kafka_offset(group="test", topic="test")
原文地址:https://www.cnblogs.com/dplearning/p/7911895.html