python连接ES进行数据过滤删除,新增查询、创建索引功能

# -*- coding: utf-8 -*-
"""
@Time :2020/06/04
@UpTime :2020/06/23
@Author :Mr.Yang
@File :ElasticSearch_operations.py
@Software :PyCharm
@Description :对ES进行查询,创建索引、按时间戳删除操作(增加写入操作待定)
@plug-in package:目前我所使用的是7.7.1的插件包,pip install elasticsearch==7.7.1
"""

import time
import datetime
import json
import sys
from elasticsearch import Elasticsearch

"""当前时间及时间戳转换"""
Time = time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time()))
"""当前时间计算"""
now_time = datetime.date.today()
"""当前时间的前一个月"""
last_time = datetime.datetime.now() - datetime.timedelta(days = 1)
"""字符格式类型转换"""
now_time = last_time.strftime("%Y-%m-%d %H:%M:%S")
"""换算时间戳为毫秒级"""
mtime = (int(time.mktime(last_time.timetuple())) * 1000)

"""ES连接配置"""
es_host = ['', '', '']
es = Elasticsearch(hosts=es_host)

"""ES查询、删除修改下列参数"""
index_name = ""
id = ""

def query(index_name,id):
"""查询转换为json格式"""
res = es.get(index=index_name,id=id)
print json.dumps(res, sort_keys=True, indent=4, separators=(', ', ': '))#['hits']['hits']

def deles():
"""查询所有数据"""
noll = {
"query": {
"bool": {
"must_not": {
"exists": {
"field": "updateTime"
}
}
}
}
}
"""小于更新时间戳"""
updatetime = {
"query":{
"range":{
"updateTime": {
"lte": mtime
}
}
}
}
"""小于创建时间戳"""
createtime = {
"query":{
"range":{
"createTime":{
"lte":mtime
}
}
}
}

#updata = es.search(index=index_name,body=updatetime)
#create = es.search(index=index_name,body=createtime)
"""查询updateTime是否为空"""
updatanoll = es.search(index=index_name,body=noll)
"""删除更新时间小于设定时间戳的数据"""
es.delete_by_query(index=index_name, body=updatetime)
"""查询结果转换为json格式"""
#res = json.dumps(updata, sort_keys=True, indent=4, separators=(', ', ': ')
"""查询结果匹配到单个值"""
#res = json.dumps(updatenot["hits"]["hits"][0]["_source"]["updateTime"], sort_keys=True, indent=4, separators=(', ', ': '))
"""删除更新时间为空的数据,以插入时间做判断"""
if updatanoll != " ":
es.delete_by_query(index=index_name,body=createtime)
else:
print "无更新时间为空的数据"

def create():
"""创建索引"""
body = {
"settings":{
"index":{
"number_of_shards": "3",
"number_of_replicas": "2"
}
}
}
try:
result = es.indices.create(index='ceshi',body=body)
print result
except Exception as err:
print err


choose = input("请输出选项1/2/3(1查询,2过滤以更新时间为1个月以前的删除,3创建索引):")
if choose == 1:
query(index_name, id)
elif choose == 2:
deles()
elif choose == 3:
create()
else:
print "请正确输入参数(1/2/3(1查询,2过滤以更新时间为1个月以前的删除,3创建索引)"
原文地址:https://www.cnblogs.com/Huang-Niu/p/13049746.html