A few small issues with Kafka producing and consuming

  1. The producer code (kafka-python) is fairly simple:
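import json
from decimal import Decimal

from kafka import KafkaProducer


class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that turns Decimal into float (json cannot serialize Decimal natively)."""

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        # Any other unsupported type: let the base class raise TypeError
        return super().default(o)


def producer():
    host_port = 'localhost:9095'
    topic = '1111'
    # Serialize exactly once, inside value_serializer. Calling json.dumps() on the
    # dict first and then sending that string through this serializer would
    # encode it twice, and the consumer would receive a str instead of a dict.
    producer = KafkaProducer(
        bootstrap_servers=[host_port],
        value_serializer=lambda v: json.dumps(v, cls=DecimalEncoder).encode('utf-8'),
    )
    # Another sample payload (unused: it is replaced by the Decimal example below)
    msg_dict = {
        "sleep_time": 10,
        "db_config": {
            "database": "test_1",
            "host": "xxxx",
            "user": "root",
            "password": "root"
        },
        "table": "msg",
        "msg": "Hello World"
    }
    msg_dict = {'price_cost': Decimal('6729.0716'), 'location_id': 193, 'product_id': 842892}

    # send() is asynchronous; get() blocks until the broker acknowledges the write
    future = producer.send(topic, msg_dict, partition=0)
    future.get(timeout=10)
    print('send success')
    producer.close()


if __name__ == '__main__':
    producer()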
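The Decimal handling and the "serialize once" rule can be checked without a running broker. A minimal sketch using only the standard library: it repeats the DecimalEncoder idea from the producer code and compares a value serialized once with one serialized twice, which is what happens when a string already produced by json.dumps() is sent through a JSON value_serializer.

```python
import json
from decimal import Decimal


class DecimalEncoder(json.JSONEncoder):
    """Serialize Decimal as float; defer every other type to the base class."""

    def default(self, o):
        if isinstance(o, Decimal):
            return float(o)
        return super().default(o)  # raises TypeError for unsupported types


payload = {'price_cost': Decimal('6729.0716'), 'location_id': 193, 'product_id': 842892}

# Serialized once: this is what the Kafka message value should contain.
once = json.dumps(payload, cls=DecimalEncoder).encode('utf-8')

# Serialized twice: dumping an already-dumped string wraps it in quotes and
# escapes the inner quotes, so json.loads() on the consumer yields a str.
twice = json.dumps(json.dumps(payload, cls=DecimalEncoder)).encode('utf-8')

print(type(json.loads(once)).__name__)   # dict
print(type(json.loads(twice)).__name__)  # str
```

A design note: float() can lose precision for Decimal values with more significant digits than a double can hold. Returning str(o) instead keeps every digit, at the cost of the consumer receiving the price as a string that it has to convert back.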
  2. The consumer code (pykafka) involves more moving parts; personally I find the consumer-group feature quite handy:
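import json

from pykafka import KafkaClient


def customer2():
    # Several brokers can be listed, separated by commas
    hosts = 'localhost:9095,localhost:9092'
    client = KafkaClient(hosts=hosts)
    topic = client.topics['1111']
    # Consumers using the same consumer_group share its committed offsets.
    # auto_commit_interval_ms=1 commits after practically every message;
    # a larger value (e.g. 1000) puts less load on the broker.
    consumer = topic.get_simple_consumer(consumer_group=b'123456',
                                         auto_commit_enable=True,
                                         auto_commit_interval_ms=1)
    for msg in consumer:
        # msg.value is bytes, which json.loads() accepts directly;
        # its encoding= argument was removed in Python 3.9.
        a = json.loads(msg.value)
        print(a)


if __name__ == '__main__':
    customer2()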
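Since msg.value arrives as bytes, the decoding step in the loop can be pulled out into a helper. The sketch below is hypothetical (decode_value is not part of pykafka); it could be called as `a = decode_value(msg.value)` inside the for loop. It also tolerates messages that an older producer JSON-encoded twice. That fallback is an assumption of this sketch: a message that genuinely is a JSON string whose text happens to be valid JSON (for example "123") would be decoded one level too far.

```python
import json


def decode_value(raw: bytes):
    """Turn a Kafka message value (bytes) into a Python object."""
    obj = json.loads(raw)  # json.loads accepts bytes since Python 3.6
    # Hypothetical safety net for double-encoded payloads: the first loads()
    # returns a str containing JSON, so decode it one more time.
    if isinstance(obj, str):
        try:
            obj = json.loads(obj)
        except json.JSONDecodeError:
            pass  # it really was a plain string message
    return obj


print(decode_value(b'{"location_id": 193}'))        # {'location_id': 193}
print(decode_value(b'"{\\"location_id\\": 193}"'))  # {'location_id': 193}
print(decode_value(b'"Hello World"'))               # Hello World
```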

Q&A:

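  1. Kafka: if messages are produced while no consumer is running, a consumer started afterwards does not receive them, yet messages pushed after the consumer has started do arrive. Why?
    On the consumer side, use pykafka instead of kafka (kafka-python) and fetch data with get_simple_consumer; in practice, every time a new group is specified it receives all earlier, historical messages. The likely reason is the offset-reset default: kafka-python's KafkaConsumer starts a group that has no committed offset at the end of the log (auto_offset_reset='latest'), while pykafka's simple consumer defaults to the earliest offset. Passing auto_offset_reset='earliest' to kafka-python should have the same effect.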
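  2. How do I specify multiple hosts on the consumer side?
    Separate them with commas, e.g. hosts='localhost:9095,localhost:9092'.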
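For the first question, the difference between the two reset policies can be reproduced with a toy in-memory model. This is a simplified illustration under stated assumptions (one partition, offsets committed after every poll); ToyTopic is a hypothetical class, not how a Kafka broker is implemented. A group with no committed offset starts either at the end of the log ('latest') or at the beginning ('earliest'); after that, its committed offset decides where it resumes.

```python
class ToyTopic:
    """Single-partition log with per-group committed offsets (illustration only)."""

    def __init__(self):
        self.log = []        # messages in append order
        self.committed = {}  # group id -> offset of the next message to read

    def produce(self, value):
        self.log.append(value)

    def poll(self, group, auto_offset_reset='latest'):
        """Return the group's unread messages, then commit its new position."""
        if group in self.committed:
            start = self.committed[group]
        elif auto_offset_reset == 'earliest':
            start = 0               # new group: replay the whole log
        else:
            start = len(self.log)   # new group: skip everything already there
        batch = self.log[start:]
        self.committed[group] = len(self.log)
        return batch


topic = ToyTopic()
topic.produce('m1')                       # produced while no consumer runs
topic.produce('m2')
print(topic.poll('group-a'))              # [] : new group starts at the end
topic.produce('m3')
print(topic.poll('group-a'))              # ['m3']
print(topic.poll('group-b', 'earliest'))  # ['m1', 'm2', 'm3']
```

This mirrors the symptom in the question: 'group-a' behaves like a fresh kafka-python group with the default reset policy, and 'group-b' like a fresh pykafka group.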
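For the second question, pykafka's KafkaClient takes the comma-separated string as is, while kafka-python's bootstrap_servers also accepts a list of 'host:port' strings. A small hypothetical helper (parse_hosts is not part of either library) can turn the string into a clean list; the default port 9092 is Kafka's conventional broker port. It assumes hostnames or IPv4 addresses and does not handle IPv6 literals.

```python
from typing import List


def parse_hosts(hosts: str, default_port: int = 9092) -> List[str]:
    """Split a comma-separated broker string into a list of 'host:port' entries."""
    result = []
    for entry in hosts.split(','):
        entry = entry.strip()
        if not entry:
            continue  # tolerate stray spaces and trailing or doubled commas
        if ':' not in entry:
            entry = f'{entry}:{default_port}'  # no port given: use the default
        result.append(entry)
    return result


print(parse_hosts('localhost:9095, localhost:9092,'))  # ['localhost:9095', 'localhost:9092']
print(parse_hosts('broker1,broker2:9093'))             # ['broker1:9092', 'broker2:9093']
```

The list can be passed to kafka-python directly, and `','.join(parse_hosts(...))` gives back a normalized string for pykafka's `hosts=` argument.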
Original post: https://www.cnblogs.com/qianxunman/p/13750128.html