python连接kafka-2.0

import sys
import time
import os
import json
import vertica_python
import logging
import pykafka
from pykafka import KafkaClient

#显示时间和编码方式
print('start time', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
print(sys.getdefaultencoding())

#kafka的zookepper地址和broker地址,注意要在/etc/hosts内加上解析,否则会有一大堆有规律的报错
client = KafkaClient(hosts="地址:9092",zookeeper_hosts="地址:2181")
topic = client.topics[b'订阅名']
consumer = topic.get_simple_consumer(consumer_timeout_ms=2000,consumer_group=b'自定义消费者名称')


#输出文件位置/计数初始化
file_output = open('输出文件位置', "w+", encoding='utf8')
file_output.truncate()
a_error_count = 0
a_line_count = 0
print('数据抽取准备完成')

#准备抽取
for message in consumer:
    #用于停止抽取来生成文件
    if message is not None and a_line_count<=20000:
        try:
            str_offset_join = message.value.decode()
            #json头部加上offset,用于唯一标识
            a = '{"offsets":"' + str(message.offset) + '",' + str_offset_join.lstrip('{')
            #使用b来验证数据是json能解析的
            b = json.loads(a)
            file_output.write(a)
            file_output.write(' ')
            a_line_count += 1
        except:
            print('error_message')
            a_error_count += 1
            continue
    else:
        break

#本次消费完成,提交消费进度
consumer.consume()
consumer.commit_offsets()

#导入数据库
file_output.close()
vsql_copy1="copy 表名 from local 数据位置"
vsql_copy1+=" parser fjsonparser() exceptions 数据位置 direct;"
vsql_line1="/opt/vertica/bin/vsql -h 地址 -U 用户名 -w 密码 -At -c ""+vsql_copy1+"""
print(vsql_line1)
os.popen(vsql_line1)

print('===数据抽取完成==')
print('fetch comlete')
print('a_error_count=', str(a_error_count))
print('a_line_count=', str(a_line_count))
print("load complete")
print('end_time', time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))

原文地址:https://www.cnblogs.com/castlevania/p/7683306.html