python3-kafka生产者可入json数据(pykafka)

#!/usr/bin/python
import  json
from kafka import KafkaProducer
import random
import time

topic='topic'

#生产者写入数据格式为json格式,需要加value_serializer参数
producer = KafkaProducer(
                            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                            bootstrap_servers="这里是实例ip,端口号9092,多个ip用,隔开"
                         )


for i in range(2):
    a=random.randint(10000000000,99999999999)
    b = random.randint(7200,14400)
    key=str(a)+"_755WX_"+transit_depot_no
    actual_depart_tm = int(time.time()) - b
    actual_depart_tmArray = time.localtime(actual_depart_tm)
    newactual_depart_tm = time.strftime("%Y-%m-%d %H:%M:%S", actual_depart_tmArray)

    data={
        "key": "160972",
        "actual_depart_tm": "2020-12-21 15:35:00",
        "pre_send_batch_dt": "2020-12-21",
        "plan_arrive_batch_dt": "2020-12-21",

    data["key"] = key
    data["actual_depart_tm"] = newactual_depart_tm

    producer.send(topic, data)

producer.close()

  

原文地址:https://www.cnblogs.com/dmtz/p/14863485.html