kafka_consumer

使用pykafka进行消费

#coding:utf8
from pykafka import KafkaClient
import time
from pykafka.common import OffsetType
import json

client = KafkaClient(hosts='192.168.1.1:9092')

# print client.topics

topic = client.topics['perfin']

consumer = topic.get_balanced_consumer(consumer_group='perftest',
                                       auto_commit_enable=True,
                                       zookeeper_connect='192.168.1.1:2181',
                                       auto_commit_interval_ms = 1000,
                                       auto_offset_reset = OffsetType.LATEST,
                                       # auto_commit_enable=True, auto_co
                                       )

start = time.time()
c=0
tmp = []
for msg in consumer:
    c+=1
    tmp.append(msg.value)
    if c%1000==0:
        now =time.time()
        print "[%s] avg speed: %s /second"%(time.strftime('%Y-%m-%d %H:%M:%S'),int(c/(now-start)))
        start =now
    if c > 10000:
        open('./kafkamsg.txt','w').write("
".join(tmp))
        import sys
        sys.exit(0)
原文地址:https://www.cnblogs.com/yeyong/p/11855035.html