Kafka获取指定时间内的消息并写入指定文件

from confluent_kafka import Consumer, TopicPartition
import time
import datetime


def str_to_timestamp(str_time, format_type='%Y-%m-%d %H:%M:%S'):
    """Convert a formatted date/time string to epoch milliseconds.

    Args:
        str_time: Date/time text laid out according to ``format_type``.
        format_type: ``time.strptime`` format string; defaults to
            ``'%Y-%m-%d %H:%M:%S'``.

    Returns:
        The whole number of seconds produced by ``time.mktime`` (which
        interprets the parsed value as local time) multiplied by 1000.
    """
    parsed = time.strptime(str_time, format_type)
    epoch_seconds = int(time.mktime(parsed))
    return epoch_seconds * 1000

def timestamp_to_str(timestamp):
    """Return a human-readable local date/time for an epoch timestamp.

    The timestamp is first normalised to a 10-digit (seconds) value based on
    its number of digits, so both second- and millisecond-precision integers
    are accepted; any sub-second digits are discarded.

    The result is returned rather than printed, so ``print(timestamp_to_str(x))``
    shows the date instead of a trailing ``None``.

    Args:
        timestamp: Integer epoch timestamp (e.g. 10-digit seconds or
            13-digit milliseconds).

    Returns:
        ``str(datetime.datetime.fromtimestamp(seconds))``, i.e. the local
        time formatted as ``YYYY-MM-DD HH:MM:SS``.
    """
    value = int(timestamp)
    digits = len(str(value))
    # Use exact integer arithmetic: multiplying by a float such as 10 ** -3
    # and truncating can round a whole second down by one.
    if digits > 10:
        seconds = value // 10 ** (digits - 10)
    else:
        seconds = value * 10 ** (10 - digits)
    return str(datetime.datetime.fromtimestamp(seconds))

def listdata_to_file(list_data, file='abnormal.logs'):
    """Write every string in ``list_data`` to ``file``, one per line.

    The file is opened in write mode with UTF-8 encoding, so any existing
    content is replaced. An empty ``list_data`` produces an empty file.

    Args:
        list_data: Iterable of strings; a newline is appended to each.
        file: Destination path; defaults to ``'abnormal.logs'``.
    """
    with open(file, "w", encoding="utf-8") as f:
        # The line terminator must be the escape sequence "\n"; a literal
        # line break inside the quotes is a syntax error.
        f.writelines(line + "\n" for line in list_data)

def consumer_data_according_timestamp(topic, time_begin, time_end):
    """Dump the messages of ``topic`` from a time window into a file.

    For every partition of the topic, the consumer is positioned at the
    offset returned by ``offsets_for_times`` for ``time_begin`` and messages
    are read until one carries a timestamp at or after ``time_end`` or a
    poll times out. Message values are decoded as UTF-8 and written, one per
    line, to a file named ``topic`` followed by ``time_begin`` with spaces
    and colons replaced by ``-``.

    Args:
        topic: Name of the Kafka topic to read.
        time_begin: Start of the window, formatted ``'%Y-%m-%d %H:%M:%S'``.
        time_end: End of the window (exclusive), same format.

    Raises:
        ValueError: If either time string does not match the expected format.
    """
    KAFKASERVERS = 'xxxxxxxxxx'
    GROUPNAME = 'xxxxxxxxx'
    # Number of collected messages between intermediate writes of the file.
    flush_every = 5000

    # Parse the window first so malformed input fails before any connection
    # is made. Note the timestamps are in milliseconds.
    timestamp_begin = str_to_timestamp(time_begin)
    timestamp_end = str_to_timestamp(time_end)
    file_name = topic + time_begin.replace(" ", "-").replace(":", "-")

    c = Consumer({
        'bootstrap.servers': KAFKASERVERS,
        'group.id': GROUPNAME,
        'auto.offset.reset': 'earliest',
        'session.timeout.ms': 6000,
        'security.protocol': 'SASL_PLAINTEXT',
        'sasl.mechanism': 'PLAIN',
        'sasl.username': 'xxxxxxx',
        'sasl.password': 'xxxxxxx',
    })
    try:
        # Discover which partitions the topic currently has.
        cluster_data = c.list_topics(topic=topic)
        available_partitions = cluster_data.topics[topic].partitions

        list_data = []
        # Iterate the real partition ids from the metadata map instead of
        # assuming they are exactly 0..n-1.
        for partition_id in sorted(available_partitions):
            print("partition_id:%d" %partition_id)
            # Ask the broker for the offset matching the start timestamp and
            # read this partition from there.
            tps = [TopicPartition(topic, partition_id, timestamp_begin)]
            print(tps)
            start_offsets = c.offsets_for_times(tps)
            print(start_offsets)
            c.assign(start_offsets)
            while True:
                # Maximum time to block waiting for a message; a timeout is
                # treated as "nothing more to read in this partition".
                msg = c.poll(1.0)
                if msg is None:
                    break
                if msg.error():
                    print("Consumer error: {}".format(msg.error()))
                    continue
                # Timestamp attached to the message by Kafka.
                kafka_timestamp = msg.timestamp()[1]
                if kafka_timestamp >= timestamp_end:
                    print(timestamp_to_str(kafka_timestamp))
                    break
                payload = msg.value()
                if payload is None:
                    # A message without a value has nothing to decode or write.
                    continue
                list_data.append(payload.decode('utf-8'))
                # Periodically persist everything collected so far; each
                # write contains the full list, not only the newest batch.
                if len(list_data) % flush_every == 0:
                    listdata_to_file(list_data, file=file_name)

        print(len(list_data))
        listdata_to_file(list_data, file=file_name)
        c.unassign()
    finally:
        # Always release the consumer, even when reading or writing fails.
        c.close()

if __name__ == '__main__':
    # Script entry point: export one fixed ten-minute window of a topic.
    consumer_data_according_timestamp(
        topic='xxxxxx',
        time_begin='2021-07-12 19:25:36',
        time_end='2021-07-12 19:35:36',
    )
    # Alternative invocation, kept disabled:
    # consumer_data_according_timestamp(topic='xxxx', time_begin='2021-07-05 18:55:29', time_end='2021-07-05 19:05:29')
原文地址:https://www.cnblogs.com/to-here/p/15002006.html