python批量向kafka塞数据

python批量向kafka塞数据

from kafka import KafkaClient
from kafka.producer import SimpleProducer
from kafka import KafkaProducer


def send_data_2_kafka_(topic, datas):
    '''
        向kafka解析队列发送数据
    '''
    print('### 开始塞入 ###')
    PARTNUM = 10
    TOPICNAME = topic
    KAFKABROKER = kafkabroker
    client = KafkaClient(hosts=KAFKABROKER, timeout=30)
    producer = SimpleProducer(client, async_send=False)
    curcount = int(len(datas)//PARTNUM)
    l = len(datas)
    if l != PARTNUM:
        curcount = curcount + 1
    logger.info("datas: %d" % l)
    print("curcount:===", curcount) # 1
    # if curcount == 0:
    #     curdata = datas
    #     future = producer.send_messages(TOPICNAME, *curdata)
    #     print(future)
    if l:

        for i in range(curcount): # 2
            start = i*PARTNUM
            # print("i === ", i) # i = 0
            # print("start === ", start)
            if i != curcount - 1:  # 1
                if datas:
                    end = (i+1)*PARTNUM
                    curdata = datas[start:end]
                    future = producer.send_messages(TOPICNAME, *curdata)
                    print(future)
            else:
                if datas:
                    curdata = datas[start:]
                    future = producer.send_messages(TOPICNAME, *curdata)
                    print(future)
            
    producer.stop()
    client.close()
    print('### 结束塞入 ###')





def kafka_send_date(topic, data):
    logger.info('### 开始塞入 ###')
    # kafka生产者链接
    producer = KafkaProducer(bootstrap_servers='192.168.2.134:9092')
    # future = producer.send(topic, json.dumps(date).encode())
    # future = producer.send(topic, str(date).replace("'", '"').encode('utf-8'))
    data = json.dumps(data)
    r = bytes('{}'.format(data), 'utf-8')
    future = producer.send(topic, r)
    record_metadata = future.get(timeout=10)
    print(record_metadata, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    logger.info('### 结束塞入 ###')
原文地址:https://www.cnblogs.com/xiao-xue-di/p/11887650.html