confluent-kafka(Python):Producer 与 Consumer 的实现

  • 基础通讯
    •   Producer.py
import confluent_kafka
import time

# Topic the producer example writes to.
topic = 'confluent-kafka-topic'


def confluent_kafka_producer_performance():
    """Produce ten copies of a fixed message and time the whole run.

    Each message is enqueued with ``produce()``. If the client's local
    queue is full (``BufferError``), the function blocks in ``poll()`` to
    let the queue drain and then retries the same message, so no message
    is dropped and no ``BufferError`` escapes to the caller.

    Returns:
        float: Wall-clock seconds from just before the first ``produce()``
        call until ``flush()`` has returned.
    """
    conf = {'bootstrap.servers': '192.168.65.130:9092'}
    producer = confluent_kafka.Producer(**conf)
    msg_payload = 'This is message'

    producer_start = time.time()
    for _ in range(10):
        while True:
            try:
                producer.produce(topic, value=msg_payload)
            except BufferError:
                # Local queue is full: wait up to one second for delivery
                # reports to be served, then retry this message.
                producer.poll(1)
            else:
                break
        print(msg_payload)
        # Serve pending delivery reports without blocking.
        producer.poll(0)

    # Wait for every enqueued message to be delivered before stopping the clock.
    producer.flush()

    return time.time() - producer_start


if __name__ == "__main__":
    # Run the producer benchmark and report the elapsed seconds.
    print(confluent_kafka_producer_performance())
    •   Consumer.py
import confluent_kafka
import uuid
import time


def confluent_kafka_consumer_performance():
    """Consume ten messages from the example topic and time the run.

    Polls with a one-second timeout until ten real messages have been
    received. Poll timeouts (``None``) and end-of-partition events are
    skipped; any other error event is raised. The consumer is always
    closed, even when an exception interrupts the loop.

    Returns:
        float: Wall-clock seconds from just before ``subscribe()`` until
        the tenth message has been received.

    Raises:
        confluent_kafka.KafkaException: If ``poll()`` yields an error event
            other than end-of-partition.
    """
    topic = 'confluent-kafka-topic'
    conf = {
        'bootstrap.servers': '192.168.65.130:9092',
        # A freshly generated group id, passed explicitly as a string.
        'group.id': str(uuid.uuid1()),
        'session.timeout.ms': 6000,
        # NOTE(review): newer client versions may prefer 'auto.offset.reset'
        # at the top level of the config -- confirm against the installed version.
        'default.topic.config': {
            'auto.offset.reset': 'earliest',
        },
    }

    consumer = confluent_kafka.Consumer(**conf)
    msg_consumed_count = 0

    consumer_start = time.time()
    try:
        consumer.subscribe([topic])

        while msg_consumed_count < 10:
            msg = consumer.poll(1)
            # Compare against None explicitly: a Message's truthiness follows
            # its payload length, so an empty payload would look "falsy".
            if msg is None:
                continue

            error = msg.error()
            if error is not None:
                if error.code() == confluent_kafka.KafkaError._PARTITION_EOF:
                    # Informational event, not a consumed message.
                    continue
                raise confluent_kafka.KafkaException(error)

            msg_consumed_count += 1
            print(msg)

        consumer_timing = time.time() - consumer_start
    finally:
        consumer.close()
    return consumer_timing


if __name__ == "__main__":
    # Run the consumer benchmark and report the elapsed seconds.
    print(confluent_kafka_consumer_performance())
  • 分区实现
    •   pro_partition.py
import confluent_kafka

# Send a single keyed message to an explicitly chosen partition (0) of 'con_1'.
conf = {'bootstrap.servers': '192.168.65.130:9092'}

producer = confluent_kafka.Producer(**conf)

producer.produce('con_1', key='key', value='part_0', partition=0)

# produce() only enqueues the message; flush() waits (here up to 10 seconds)
# for it to be delivered so the script does not exit with it still queued.
remaining = producer.flush(10)
if remaining:
    print('%d message(s) were not delivered before the flush timeout' % remaining)
    •   con_partition.py
import confluent_kafka
import uuid

# Read one message from partition 0 of 'con_1' using a manual assignment.
conf = {'bootstrap.servers': '192.168.65.130:9092', 'group.id': str(uuid.uuid1())}
consumer = confluent_kafka.Consumer(**conf)
try:
    tp1 = confluent_kafka.TopicPartition('con_1', 0)
    consumer.assign([tp1])

    # poll() returns None when nothing arrives within the timeout, so the
    # result must be checked before key()/value() are called on it.
    msg = consumer.poll(1)
    if msg is None:
        print('no message received from con_1 partition 0 within 1 second')
    elif msg.error() is not None:
        print(msg.error())
    else:
        print(msg.key())
        print(msg.value())
finally:
    consumer.close()
原文地址:https://www.cnblogs.com/ryu-manager/p/9443722.html