kafka消息队列的使用

概念性内容参考 Kafka的简介

软件的安装参考 mac 安装Kafka

python代码使用 ,先 pip3 install kafka-python

生产者

from kafka import KafkaProducer

# 生产端
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
for i in range(3):
    future = producer.send(
              'test',   # 设置topic,消费者根据topic拿数据
              key=b'qwedef',    # 必须是bytes数据类型,同一个key值,会被送至同一个分区
              value=b'dewcdef',     # 必须是bytes数据类型, 发送的数据
              partition=0      # partition设置分区
    )  
    result = future.get(timeout=10)      # 监控是否发送成功         
    # print(result)

消费者

from kafka import KafkaConsumer

# 消费端

consumer = KafkaConsumer(
                'test',        # 设置topic,从topic里面拿数据
                group_id='group1',    # 如果不设置这个参数,则消费者A消费后,消费者B会继续消费;如果和其他消费者设置同样的group_id, 则消费者A消费这条数据后,不会被消费者B消费
                bootstrap_servers=['localhost:9092']            
)
for msg in consumer:
    print(msg.key.decode())
    print(msg.value.decode())
原文地址:https://www.cnblogs.com/pyweb/p/14648159.html