confluent_kafka生产者

import socket

from confluent_kafka import Producer

conf = {
    'bootstrap.servers': "localhost:9092",
    'client.id': socket.gethostname()
}

producer = Producer(conf)


def __publish_delivery_report(err, msg) -> None:
    if err is not None:
        print(f"send msg:{msg} fail, err is not None")
    else:
        print(f"send msg{msg} success")


def send_msg(topic: str, data):
    producer.produce(topic, data, callback=__publish_delivery_report)
    producer.flush()


if __name__ == '__main__':
    msg = "hello kafka"
    topic = "test"
    send_msg(topic, msg)
Please call me JiangYouDang!
原文地址:https://www.cnblogs.com/luckygxf/p/15115670.html