python之kafka消费

使用 Python 3 的第三方库 kafka-python(`from kafka import KafkaConsumer`)消费 Kafka 消息,将 protobuf 消息反序列化后以 JSON 格式写入文件。

 1 # -*- coding: utf-8 -*-
 2 
 3 import uuid
 4 import json
 5 from kafka import KafkaConsumer
 6 from xxxxxx import MessageToDict
 7 from xxx import ObjectInfo
 8 
 9 import sys
10 import codecs
11 
# Force UTF-8 on standard output so the non-ASCII progress messages printed
# below do not depend on the console's default encoding.
if hasattr(sys.stdout, "reconfigure"):
    # Python 3.7+: switch the encoding in place; the existing stream object
    # (also referenced by sys.__stdout__) stays usable.
    sys.stdout.reconfigure(encoding="utf-8")
else:
    # Older interpreters: wrap the underlying binary buffer in a UTF-8 writer.
    sys.stdout = codecs.getwriter("utf-8")(sys.stdout.detach())
13 
14 
class ReadKafkaContent(object):
    """Consume records from a Kafka topic and dump them to a file as JSON.

    Each record value is parsed into an ``ObjectInfo`` message, converted to a
    ``dict`` with ``MessageToDict`` and written out as indented JSON.
    """

    @staticmethod
    def deserialize(msg):
        """Deserialize one consumed record into a ``dict``.

        :param msg: consumed record; its ``value`` attribute is handed to
            ``ObjectInfo.ParseFromString``
        :return: the ``MessageToDict`` result for the parsed message, with
            default-valued fields included and the proto field names preserved
        """
        pb_obj = ObjectInfo()
        pb_obj.Clear()
        pb_obj.ParseFromString(msg.value)
        return MessageToDict(pb_obj, including_default_value_fields=True, preserving_proto_field_name=True)

    def consume_msg(self, consumer_obj):
        """Consume a single record and return its deserialized content.

        ``next(consumer_obj, None)`` is retried until it yields a record, so
        this call does not return while the iterator keeps producing nothing.

        :param consumer_obj: iterator over consumed records
        :return: the ``dict`` produced by :meth:`deserialize`, or ``None`` when
            fetching or deserializing raised an exception
        """
        try:
            while True:
                msg = next(consumer_obj, None)
                if msg is None:
                    # Nothing available on this attempt; ask again.
                    continue
                return self.deserialize(msg)
        except Exception as ex:
            print(u"消费kafka错误,退出测试")
            # Surface the cause instead of silently discarding it.
            print(repr(ex))
            return None

    def entry(self, topic, ip, count=10, log="log_read_kafka_content.json"):
        """Consume ``count`` records from ``topic`` and save them to ``log``.

        :param topic: Kafka topic to subscribe to
        :param ip: bootstrap server address handed to ``KafkaConsumer``
        :param count: number of records to read, 10 by default
        :param log: path of the output file; it is overwritten and written as
            UTF-8
        :return: ``True`` when all ``count`` records were written, ``False``
            when the consumer could not be created or a record could not be
            consumed
        """
        print(u"开始......")
        try:
            # Create the Kafka consumer. A random group id is generated on
            # every call and the offset reset policy is "latest".
            print(u"创建kafka消费对象...")
            consumer = KafkaConsumer(topic, group_id=uuid.uuid4().hex,
                                     bootstrap_servers=[ip],
                                     auto_offset_reset="latest", consumer_timeout_ms=3 * 1000)
        except Exception as ex:
            print(u"连接kafka失败!")
            print(repr(ex))
            return False
        print(u"kafka消费对象创建成功.")

        try:
            # Explicit UTF-8: ensure_ascii=False emits raw non-ASCII characters,
            # which the platform default encoding may be unable to encode.
            with open(log, "w", encoding="utf-8") as f:
                for index in range(1, count + 1):
                    print(u"开始消费第%s条数据..." % index)
                    content = self.consume_msg(consumer)
                    # Only None signals failure; an empty dict is valid content.
                    if content is None:
                        return False

                    # Convert the dict to JSON and store it, separating
                    # consecutive records with a blank line.
                    content_json = json.dumps(content, ensure_ascii=False, indent=4)
                    f.write(content_json)
                    f.write("\n\n")
                    print(u"第%s条数据写入完成." % index)
        finally:
            # Release the consumer on every exit path.
            consumer.close()

        print(u"完成.")
        return True
原文地址:https://www.cnblogs.com/sunshine-blog/p/11929690.html