Python 使用 bulk 批量保存数据到 Elasticsearch

之前使用 kclpy 读取 Kinesis 流数据,处理后保存到 Elasticsearch 中。现在源数据增加了三倍,发现 ELK 中展示的数据与当前时间的延迟越来越大。按照 Kinesis 文档对分片数和实例数做了相应扩展,但均没有明显效果。

重新优化了下代码,使用了bulk批量保存数据到elasticsearch,存放速率明显提高。

相关示例代码:

from datetime import datetime
import pytz 
import time
from elasticsearch import Elasticsearch 
from elasticsearch.helpers import bulk
import json

# Module-level client referenced by index_bulk(). The host and credential
# literals below look like placeholders -- replace them before running.
es = Elasticsearch(
    hosts=[{'host': "ip", 'port': "9200"}],
    http_auth=("username", "password"),
)
def index_bulk():
    """Build 500 sample documents and send them to Elasticsearch in one bulk call.

    Each document starts from a fixed template stamped with the current
    epoch time, then is reshaped before indexing:

    * ``timestamp`` is removed and re-added as a timezone-aware
      ``@timestamp`` (Asia/Shanghai).
    * ``starttime`` / ``endtime`` become timezone-aware datetimes; a value
      of 0 falls back to ``timestamp``.
    * ``value``, ``threshold`` and ``tags`` are stringified.

    The actions are printed, submitted through ``bulk`` using the
    module-level ``es`` client, and the number of submitted documents is
    printed afterwards.

    Raises:
        Whatever ``bulk`` raises for failed actions, since
        ``raise_on_error=True`` is passed.
    """
    # Loop-invariant: resolve the timezone once instead of per conversion.
    shanghai = pytz.timezone('Asia/Shanghai')

    def to_local(epoch):
        # Sub-second precision is dropped on purpose (int()), as before.
        return datetime.fromtimestamp(int(epoch), shanghai)

    actions = []
    for i in range(500):
        t = time.time()
        record = {
            "priority": 0,
            "tags": {i},
            "threshold": 0,
            "kinesis": True,
            "env": "test",
            "region": "cn",
            "metric": "/var/log/sengled/bulk.log",
            "dataSource": "bulk",
            "service": "bulk",
            "status": "",
            "endpoint": "test-cn-inception-10.12.112.165",
            "starttime": t,
            "product": "bulk",
            "step": 0,
            "value": "bulk",
            "ip": "10.12.112.165",
            "objectType": "dev",
            "endtime": t,
            "timestamp": t,
            "counterType": "",
        }

        doc = record.copy()
        doc['@timestamp'] = to_local(record['timestamp'])
        # A start/end time of 0 means "not set": fall back to the timestamp.
        for field in ('starttime', 'endtime'):
            epoch = record['timestamp'] if record[field] == 0 else record[field]
            doc[field] = to_local(epoch)

        # These fields are indexed as strings regardless of their source type.
        for field in ('value', 'threshold', 'tags'):
            doc[field] = str(record[field])
        del doc['timestamp']

        actions.append({
            "_index": "kinesis-2018.07.19",
            "_type": "kinesisdata",
            "_source": doc,
        })

    print(actions)
    # NOTE(review): every action carries its own "_index"
    # ("kinesis-2018.07.19"), which presumably takes precedence over the
    # request-level default passed here ("kinesis-2018.11.28") -- confirm
    # which index is actually intended and make the two agree.
    bulk(es, actions, index="kinesis-2018.11.28", raise_on_error=True)

    # Report the real number of documents; the last loop index would be
    # one short (499 for 500 documents).
    print("insert %s lines" % len(actions))


# Run the demo only when executed as a script, so importing this module
# does not trigger a bulk write as a side effect.
if __name__ == "__main__":
    index_bulk()
原文地址:https://www.cnblogs.com/husbandmen/p/10033775.html