kafka消费者

from kafka import KafkaConsumer,TopicPartition
import json
# Demo script: consume JSON messages from manually assigned partitions of one
# topic. Replace the placeholder "ip:port" entries with real broker addresses.
scrapy_kafka_hosts = ["ip:端口", "ip:端口"]
topic = 'wangliang_test'
consumer = KafkaConsumer(
    bootstrap_servers=scrapy_kafka_hosts,
    group_id='12345679',  # consumer group
    # Messages are JSON documents. UTF-8 is a superset of ASCII, so payloads
    # that decoded as ASCII still decode, and non-ASCII text no longer fails.
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    # 'latest' (the default) starts from the newest offset; 'earliest' starts
    # from the oldest one and only applies when the group has no committed
    # offset yet (e.g. after switching to a new group id).
    auto_offset_reset='latest',
    enable_auto_commit=True,  # auto-commit offsets (enabled by default)
    auto_commit_interval_ms=6000,  # commit interval; the default is 5000 ms
)

# subscribe() and assign() are mutually exclusive: calling both on the same
# consumer raises IllegalStateError. This script assigns partitions manually,
# so the topic subscription is left disabled.
# consumer.subscribe([topic])  # subscribe to one or more topics instead

print(consumer.partitions_for_topic(topic))
# Other introspection helpers:
# print(consumer.topics())        # list of topics
# print(consumer.subscription())  # topics this consumer is subscribed to
# print(consumer.assignment())    # topic/partition pairs assigned to this consumer
# print(consumer.beginning_offsets(consumer.assignment()))  # first consumable offsets

# Manually pick the topic partitions to read from.
consumer.assign([
    TopicPartition(topic=topic, partition=0),
    TopicPartition(topic=topic, partition=1),
    TopicPartition(topic=topic, partition=3),
])
# print(consumer.assignment())

# Choose the starting offset per partition; several partitions may be positioned.
consumer.seek(TopicPartition(topic=topic, partition=0), 1)  # partition 0 starts at offset 1
# consumer.seek(TopicPartition(topic=topic, partition=1), 0)  # would start partition 1 at its first message
consumer.seek(TopicPartition(topic=topic, partition=3), 0)
# Messages on partitions that were not assigned above are never consumed.

for msg in consumer:
    print(msg)
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
    print(msg.value["name"], msg.value["age"])

下面的示例可以指定分区和消费者组,并按消息时间戳(时间窗口)筛选要消费的数据。

from kafka import KafkaProducer, KafkaConsumer, TopicPartition
import time


class ConsumerForKFK(object):
    """Helper that reads one Kafka topic from manually assigned partitions.

    Attributes:
        kafkaHost: Bootstrap server address(es) handed to ``KafkaConsumer``.
        group_id: Consumer group id (taken from the ``client_id`` argument).
    """

    # Topic that every partition assignment refers to.
    _MESSAGE_NAME = 'wangliang_test'

    def __init__(self, kafkahost, client_id):
        self.kafkaHost = kafkahost
        self.group_id = client_id

    @property
    def consumer_client(self, group_id=None):
        """Create and return a new ``KafkaConsumer`` on every access.

        ``group_id`` is kept only for signature compatibility; property access
        cannot supply it and it is ignored. The group comes from
        ``self.group_id``.
        """
        return KafkaConsumer(
            bootstrap_servers=self.kafkaHost,
            group_id=self.group_id,
            auto_offset_reset="latest",
            # Without consumer_timeout_ms, iteration waits for messages
            # forever; with it (milliseconds, e.g. consumer_timeout_ms=5000)
            # iteration stops once the timeout elapses.
        )

    def consumer_seek(self, partition=None, partition_all=None, offset_time=None):
        """Assign partitions of the topic and print the messages read from them.

        Args:
            partition: Iterable of partition numbers to read. Used when
                ``partition_all`` is falsy.
            partition_all: Partition count; when truthy, partitions
                ``0 .. partition_all - 1`` are read and ``partition`` is
                ignored.
            offset_time: Window in seconds. When truthy, every assigned
                partition is read from its oldest available message and only
                messages whose timestamp is at most this many seconds old are
                printed (the age of each message is printed as well). When
                falsy, every message received from the consumer's default
                position is printed.

        Raises:
            TypeError: If neither ``partition`` nor ``partition_all`` is given.
        """
        if partition_all:
            partition_ids = range(partition_all)
        elif partition is not None:
            partition_ids = partition
        else:
            # Fail early with a clear message instead of an opaque
            # "'NoneType' object is not iterable" after connecting.
            raise TypeError(
                "consumer_seek() requires either 'partition' or 'partition_all'"
            )

        partitions = [
            TopicPartition(topic=self._MESSAGE_NAME, partition=number)
            for number in partition_ids
        ]
        consumer = self.consumer_client
        try:
            consumer.assign(partitions)
            print(consumer.assignment())  # current topic/partition assignment
            if offset_time:
                # Start from the oldest *available* offset. A literal offset 0
                # is out of range once retention has removed the head of the
                # log, and auto_offset_reset="latest" would then skip all
                # existing messages.
                for topic_partition in partitions:
                    consumer.seek_to_beginning(topic_partition)
                for msg in consumer:  # blocks waiting for the next message
                    # msg.timestamp is divided by 1000 to compare with seconds.
                    age = int(time.time()) - int(msg.timestamp / 1000)
                    print(age)
                    if age <= offset_time:
                        print(msg)
            else:
                for msg in consumer:  # blocks waiting for the next message
                    print(msg)
        finally:
            # Release sockets and leave the group even when the loop is
            # interrupted (e.g. by KeyboardInterrupt) or raises.
            consumer.close()


# Example run: read partitions 1 and 2 of the topic and print only messages
# that are at most 3000 seconds old. Replace the placeholder broker addresses.
scrapy_kafka_hosts = ["ip:端口", "ip:端口"]
topic = 'wangliang_test'
cl = ConsumerForKFK(kafkahost=scrapy_kafka_hosts, client_id="12345679")
# Pass partition_all=3 instead of partition=[...] to read partitions 0, 1 and 2.
cl.consumer_seek(partition=[1, 2], offset_time=3000)

消费者学习   https://www.jianshu.com/p/c89997867d48

Python往kafka生产消费数据  https://www.cnblogs.com/longsongpong/p/11010195.html

python操作kafka实践 https://www.cnblogs.com/small-office/p/9399907.html

 

原文地址:https://www.cnblogs.com/wang102030/p/11905251.html