mk kafka

# coding:utf-8
import json
from pykafka import KafkaClient


def readkafka():
with open('oms.json', 'r') as f:
msg = f.read()
# msg_d=eval(msg)
d = json.loads(msg)
d["waybillNo"] = waybill
return d

def sendkafka():
client = KafkaClient(hosts="10.202.xx.x:xxx")

topic = client.topics[''] # xxx kafka
producer = topic.get_producer()
producer.start()

with open('./requestfile/IBSE_package.json', 'r') as f:
msg = f.read()
msg_d = eval(msg)
msg_f = json.dumps(msg_d, ensure_ascii=False, encoding='utf-8')
producer.produce(msg_f)
producer.stop()



client = KafkaClient(hosts="10.202.xx.x:xxx")
topic = client.topics[''] # xxx kafka
producer = topic.get_producer()
producer.start()
with open('fckafka.json', 'r') as f:
msg = f.read()
msg_d = eval(msg)
msg_f = json.dumps(msg_d, ensure_ascii=False, encoding='utf-8')
producer.produce(msg_f)
producer.stop()


client = KafkaClient(hosts="10.202.xx.x:9095")
topic = client.topics[''] # xxx kafka
producer = topic.get_producer()
producer.start()
with open('oms1.json', 'w') as f1:
f1.write(json.dumps(readkafka(), encoding="utf-8"))
f1.close()
with open('oms1.json', 'r') as f11:
msg_d = f11.read()
msg_f = json.dumps(msg_d, ensure_ascii=False, )
producer.produce(msg_f)
producer.stop()
原文地址:https://www.cnblogs.com/yaohu/p/12597010.html