python解析ES数据到表

通过 Elasticsearch,您能够执行及合并多种类型的搜索(结构化数据、非结构化数据、地理位置、指标),搜索方式随心而变。
参考:
https://github.com/elastic/elasticsearch-py/blob/7.10/docs/sphinx/index.rst
使用的版本:
      "version" : { "number" : "7.10.2"}

es_analy.py               进行单个索引解析数据
es_exec_circle.py      把给定时间范围按半小时切分成时间窗口,循环调用 es_analy.py

es_analy.py 脚本

#!/usr/local/python/bin/python
# coding=utf-8
import sys
import os
import json
import ast
from elasticsearch import Elasticsearch
from operator import itemgetter


# Read and validate the command-line arguments.
# Usage: python es_analy.py index_name YYYYMMDD "start time" "end time"
if len(sys.argv) == 5:
    index_name = sys.argv[1]
    bizDate = sys.argv[2]
    create_time_start = sys.argv[3]
    create_time_end = sys.argv[4]
    # The business date is appended to every exported row and becomes part
    # of the export file name, so insist on exactly eight digits.
    if len(bizDate) != 8 or not bizDate.isdigit():
        print("业务日期传参非法*********")
        sys.exit(1)
else:
    # Single-quoted literals so the double quotes shown in the usage text
    # do not terminate the string.
    print('脚本传参错误,python es_analy.py index_name YYYYMMDD "开始时间" "结束时间"  请检查!')
    print('脚本传参错误,python es_analy.py index_data_info 20210426 "2021-04-26 09:00:00" "2021-04-26 09:59:59"  请检查!')
    sys.exit(1)

# helpers.scan is needed below to page through every matching document.
from elasticsearch import helpers

# Elasticsearch connection.
# NOTE(review): host and credentials are hard-coded placeholders here;
# consider loading them from configuration or the environment.
es = Elasticsearch(
    ['xx.xx.xx.xx'],
    http_auth=('username', 'password'),
    scheme="http",
    port=9200,
)
print(es)

# List of table field names.
tab_fld = []
# Column definition string ("<field> string,...") used in the CREATE TABLE
# statements, and the list of field names taken from the index mapping.
tab_fld_str = ''
tab_fld_cloumn = ''
# Export file that is later loaded into the staging table.
file_path = '/home/es/tmp/' + index_name + bizDate + ".txt"
print("文件路径:" + file_path)

# Select the documents whose CREATE_TIME lies inside the requested window.
# (Use {"query": {"match_all": {}}} instead to export the whole index.)
query_body = {"query": {"bool": {"must": [{"range": {"CREATE_TIME": {"from": create_time_start, "to": create_time_end}}}]}}}
res = es.search(index=index_name, body=query_body)
print("Got %d Hits:" % res['hits']['total']['value'])
# es.search() returns only the first page of results (10 hits by default),
# so reading res['hits']['hits'] would silently drop most of the window.
# helpers.scan walks the scroll API and yields every matching hit.
hists = list(helpers.scan(es, index=index_name, query=query_body))

# Collect the field names of the index from its mapping.
def table_fld_info():
    """Return the index's field names and rebuild the column definition string.

    Reads the mapping of ``index_name`` through ``es.indices.get_mapping``,
    stores the field names in the global ``tab_fld_cloumn`` and sets the
    global ``tab_fld_str`` to ``"<field> string,<field> string,..."``.

    Returns:
        list: the field names, in the iteration order of the mapping's
        ``properties`` dictionary.
    """
    global tab_fld_str
    global tab_fld_cloumn

    mapping_values = es.indices.get_mapping(index=index_name)
    # Use the response dictionary directly. Serialising it to JSON and
    # re-parsing it with ast.literal_eval fails as soon as the mapping
    # contains true/false/null, which are not Python literals.
    tab_fld_cloumn = list(mapping_values[index_name]['mappings']['properties'])
    print(tab_fld_cloumn)

    # Rebuild the string from scratch so that a second call does not append
    # a duplicate column list to the previous value.
    tab_fld_str = ','.join(fld + ' string' for fld in tab_fld_cloumn)
    return tab_fld_cloumn
    
# Write the fetched documents to the export file.
def table_fld_values():
    """Write one comma-separated line per Elasticsearch hit to ``file_path``.

    Each line holds the value of every field returned by ``table_fld_info()``
    (an empty string when the document lacks the field or the value is
    null), followed by ``bizDate`` as the last column. The file is written
    as UTF-8 and is overwritten if it already exists.

    NOTE(review): values are written unescaped, so a value containing a
    comma or a line break will shift or split columns in the loaded table.
    """
    import io

    text_type = type(u'')

    def to_text(value):
        """Render one field value as text for the export file."""
        if value is None:
            return u''
        if isinstance(value, bytes):
            return value.decode('utf-8')
        if isinstance(value, text_type):
            return value
        # Numbers, booleans and nested objects are written in JSON form
        # instead of raising TypeError on string concatenation.
        return text_type(json.dumps(value, ensure_ascii=False))

    fld_list = list(table_fld_info())
    # The with-block closes the file even when a row fails to serialise.
    with io.open(file_path, "w", encoding="utf-8") as fo:
        for hit in hists:
            # hit["_source"] is already a dictionary; no JSON round trip
            # through ast.literal_eval is needed.
            source = hit["_source"]
            cells = [to_text(source.get(fld)) for fld in fld_list]
            cells.append(to_text(bizDate))
            fo.write(u",".join(cells) + u"\n")
    print("%d rows written to %s" % (len(hists), file_path))

# Create the tables and load the export file into them.
def table_data():
    """Create the target table, rebuild the staging table and load the data.

    Steps, in order: create the partitioned target table if it is missing,
    drop and recreate the comma-delimited staging table, load ``file_path``
    into the staging table, refresh its metadata, then insert the staging
    rows into the target table with dynamic partitioning on ``dt``.

    Side effect: the global ``index_name`` is rewritten to
    ``"es." + index_name`` with every '-' replaced by '_'.

    ``exec_Sql`` and ``exec_hive_Sql`` are not defined in the visible part
    of this file; going by the original comments the former runs a
    statement on Impala and the latter on Hive.
    """
    global tab_fld_str
    global index_name

    # Table names cannot contain '-', so map it to '_' and prefix the schema.
    index_name = "es." + index_name.replace('-', '_')
    staging_table = index_name + "_temp"

    # Target table: created only when it does not exist yet (Impala).
    create_target = "create table if not exists " + index_name + "(" + tab_fld_str + ") partitioned by (  `dt`  varchar(8) ) row format delimited fields terminated by '\001' lines terminated by '\n' stored as textfile;"
    exec_Sql(create_target)

    # Remove any staging table left over from an earlier run (Impala).
    drop_staging = "drop table if exists " + staging_table
    exec_Sql(drop_staging)

    # Staging table that matches the comma-separated export file (Impala).
    create_staging = "create table if not exists " + staging_table + "(" + tab_fld_str + ",dt varchar(8) ) row format delimited fields terminated by ',' lines terminated by '\n' stored as textfile;"
    exec_Sql(create_staging)

    # Load the export file into the staging table (Hive).
    load_file = "load data local inpath '" + file_path + "' into table " + staging_table
    exec_hive_Sql(load_file)

    # Refresh the staging table's metadata (Impala).
    refresh_staging = "invalidate metadata " + staging_table + ";"
    exec_Sql(refresh_staging)

    # Copy the staging rows into the target table (Impala).
    copy_rows = "insert into " + index_name + " partition(dt)  select * from " + staging_table
    exec_Sql(copy_rows)
        
if __name__ == "__main__":
    # An empty window is not an error: report it and finish successfully.
    if hists:
        table_fld_values()
        table_data()
    else:
        print("该时间段未能解析出来数据,直接成功通过")

es_exec_circle.py 脚本

#!/usr/local/python/bin/python
# coding=utf-8
import time
import datetime
import sys
import os

# Read and validate the command-line arguments.
# Usage: python es_exec_circle.py index_name YYYYMMDD "start time" "end time"
if len(sys.argv) == 5:
    index_name = sys.argv[1]
    bizDate = sys.argv[2]
    create_time_start = sys.argv[3]
    create_time_end = sys.argv[4]
    # The business date must be exactly eight digits (YYYYMMDD).
    if len(bizDate) != 8 or not bizDate.isdigit():
        print("业务日期传参非法*********")
        sys.exit(1)
else:
    # Single-quoted literals so the double quotes shown in the usage text
    # do not terminate the string; the text names this script, not es_analy.py.
    print('脚本传参错误,python es_exec_circle.py index_name  YYYYMMDD "开始时间" "结束时间"  请检查!')
    print('脚本传参错误,python es_exec_circle.py index_data_info 20210426 "2021-04-26 09:00:00" "2021-04-26 09:59:59"  请检查!')
    sys.exit(1)

# Split [create_time_start, create_time_end] into half-hour windows and run
# es_analy.py once for each window, stopping at the first failure.
import subprocess

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

# Start of the requested range.
start_time = create_time_start
start_dt = datetime.datetime.strptime(start_time, TIME_FORMAT)
start_time_Stamp = int(time.mktime(start_dt.timetuple()))
print("开始时间:" + start_time + " 时间戳:" + str(start_time_Stamp))

# End of the requested range (the log line now shows end_time, not start_time).
end_time = create_time_end
end_dt = datetime.datetime.strptime(end_time, TIME_FORMAT)
end_time_Stamp = int(time.mktime(end_dt.timetuple()))
print("结束时间:" + end_time + " 时间戳:" + str(end_time_Stamp))

if end_dt < start_dt:
    print("开始时间晚于结束时间,请检查!")
    sys.exit(1)


def run_es_analy(window_start, window_end):
    """Run es_analy.py for one time window; exit with status 1 if it fails.

    Args:
        window_start: datetime of the first second of the window.
        window_end: datetime of the last second of the window.
    """
    # An argument list avoids shell quoting: each timestamp contains a
    # space and must reach es_analy.py as a single argument.
    cmd = [
        "python",
        "es_analy.py",
        index_name,
        bizDate,
        window_start.strftime(TIME_FORMAT),
        window_end.strftime(TIME_FORMAT),
    ]
    cmd_text = " ".join(cmd)
    print(cmd_text)
    if subprocess.call(cmd) != 0:
        print("cmd: " + cmd_text + " 命令执行出错!请检查...")
        sys.exit(1)


# The first window runs from the start time to the end of the half hour that
# contains it (minute 29 or minute 59, second 59).
if start_dt.minute > 29:
    print("开始时间在小时的后半段")
    first_boundary = start_dt.replace(minute=59, second=59)
else:
    print("开始时间在小时的前半段")
    first_boundary = start_dt.replace(minute=29, second=59)

window_start = start_dt
# Never run past the requested end time: an overshooting window would export
# rows that the next invocation exports again.
window_end = min(first_boundary, end_dt)
print("第一次时间段:" + window_start.strftime(TIME_FORMAT) + "        " + window_end.strftime(TIME_FORMAT))

# Process the first window, then advance in half-hour steps.
while True:
    run_es_analy(window_start, window_end)
    if window_end >= end_dt:
        break
    window_start = window_end + datetime.timedelta(seconds=1)
    window_end = min(window_end + datetime.timedelta(minutes=30), end_dt)

脚本执行示例:

python es_exec_circle.py index_data_info 20210623 "2021-04-26 09:00:00" "2021-04-26 09:59:59"

解析出来的数据会写入 /home/es/tmp/ 目录下的 <index_name><bizDate>.txt 文件,字段之间以','分隔。

天下难事,必作于易;天下大事,必作于细
原文地址:https://www.cnblogs.com/hello-wei/p/14932017.html