python实现elasticsearch操作CRUD API Marathon

python操作elasticsearch常用API

*官方API: *
https://elasticsearch-py.readthedocs.io/en/master/api.html#global-options

1.基础

通过elasticsearch 模块实现python与elasticsearch交互。

pip install elasticsearch

pip install elasticsearch-dsl

创建连接

from elasticsearch import Elasticsearch

es = Elasticsearch([{'host':'127.0.0.1','port':9200}])

默认本地主机和9200端口

    # 常用参数
    '''
    index - 索引名
    q - 查询指定匹配 使用Lucene查询语法
    from_ - 查询起始点  默认0
    doc_type - 文档类型
    size - 指定查询条数 默认10
    field - 指定字段 逗号分隔
    sort - 排序  字段:asc/desc
    body - 使用 Query DSL
    scroll - 滚动查询
    '''

2.常见增删改操作

创建

创建index

    res = es.indices.create(index="test004",ignore=400)
    ## ignore 400, 如果存在index 抛异常,但程序不会中断

创建index 并且mapping

    mappings = {
                "mappings": {
                    "type_doc_test": {                           #type_doc_test为doc_type
                        "properties": {
                            "id": {
                                "type": "long",
                                "index": "false"
                            },
                            "serial": {
                                "type": "keyword",  # keyword不会进行分词,text会分词
                                "index": "false"  # 不建索引
                            },
                            #tags可以存json格式,访问tags.content
                            "tags": {
                                "type": "object",
                                "properties": {
                                    "content": {"type": "keyword", "index": True},
                                    "dominant_color_name": {"type": "keyword", "index": True},
                                    "skill": {"type": "keyword", "index": True},
                                }
                            },
                            "hasTag": {
                                "type": "long",
                                "index": True
                            },
                            "status": {
                                "type": "long",
                                "index": True
                            },
                            "createTime": {
                                "type": "date",
                                "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                            },
                            "updateTime": {
                                "type": "date",
                                "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                            }
                        }
                    }
                }
            }
    
    res = es.indices.create(index = 'index_test',body =mappings)

删除index

    res = es.indices.delete(index="test004",ignore=400)

创建数据

    res = es.indices.create(index="test004",ignore=400) # 创建索引

创建单条数据

    data = {'title': '美国留给伊拉克的是个烂摊子吗', 'url': 'http://view.news.qq.com/zt2011/usa_iraq/index.htm'}
    
    res = es.create(index="test004", doc_type="politics", id=1, body=data)
    
    # 另外一种插入数据的方法
    res = es.index(index="test004",doc_type="politics",body=data)
    # 与es.create()的区别在于:不用指定id,但是如果已存在相应数据,报异常,无法自增插入数据

插入多条数据 数据装在列表中,for循环迭代 es.index 插入数据

    datas = [
        {
            'title': '美国留给伊拉克的是个烂摊子吗',
            'url': 'http://view.news.qq.com/zt2011/usa_iraq/index.htm',
            'date': '2011-12-16'
        },
        {
            'title': '公安部:各地校车将享最高路权',
            'url': 'http://www.chinanews.com/gn/2011/12-16/3536077.shtml',
            'date': '2011-12-16'
        },
        {
            'title': '中韩渔警冲突调查:韩警平均每天扣1艘中国渔船',
            'url': 'https://news.qq.com/a/20111216/001044.htm',
            'date': '2011-12-17'
        },
        {
            'title': '中国驻洛杉矶领事馆遭亚裔男子枪击 嫌犯已自首',
            'url': 'http://news.ifeng.com/world/detail_2011_12/16/11372558_0.shtml',
            'date': '2011-12-18'
        }
    ]
    
    for data in datas:
        es.index(index='test004',doc_type='politics',body=data)

bulk批量写入数据

    doc = [
        # 添加数据
        {'index': {'_index': 'indexName', '_type': 'typeName', '_id': 'idValue'}}
        {'name': 'jack', 'sex': 'male', 'age': 10 }
        # 删除数据
        {'delete': {'_index': 'indexName', '_type': 'typeName', '_id': 'idValue'}}
        # 创建数据
        {"create": {'_index' : 'indexName', "_type" : 'typeName', '_id': 'idValue'}}
        {'name': 'lucy', 'sex': 'female', 'age': 20 }
        # 更新数据
        {'update': {'_index': 'indexName', '_type': 'typeName', '_id': 'idValue'}}
        {'doc': {'age': '100'}}
     ]
    
    es.bulk(index='indexName',  doc_type='typeName', body=doc)

更新

    data = {
        'title': '美国留给伊拉克的是个烂摊子',
        'url': 'http://view.news.qq.com/zt2011/usa_iraq/index.htm',
        'date': '2015-02-13'
    }
    # res = es.update(index="test004", doc_type="politics", id=1, body=data)
    res = es.index(index='news', doc_type='politics', body=data, id=1)
    
    # 数据存在则更新,不存在则创建
    result = es.index(index='employees',doc_type='employee_info',id=15, body={'name':"noName",'age':99})

条件更新

    # update_by_query:更新满足条件的所有数据,写法同上删除和查询
    
    # tips: index() 方法可以代替我们完成两个操作,
    # 如果数据不存在,那就执行插入操作,
    # 如果已经存在,那就执行更新操作

删除

    res = es.delete(index="test004",doc_type="politics",id=1)

条件删除

    delete_by_query:删除满足条件的所有数据,查询条件必须符合DLS格式
    
    query = {'query': {'match': {'sex': 'famale'}}}# 删除性别为女性的所有文档
    
    query = {'query': {'range': {'age': {'lt': 11}}}}# 删除年龄小于11的所有文档
    
    es.delete_by_query(index='indexName', body=query, doc_type='typeName')

    # 获取数据量
    es.count(index="my_index",doc_type="test_type")

3.查询操作

查询

与索引有关

    # 获取索引信息
    result = es.indices.get(index='test004')
    print(json.dumps(result,indent=2,ensure_ascii=False))
    # 获取所有索引信息
    result = es.indices.get_alias()
    print(json.dumps(result,indent=2,ensure_ascii=False))
    # 获取索引映射信息,默认无参数获取所有索引的mapping,加指定的index,获取指定index的mapping信息
    result = es.indices.get_mapping(index='car')
    print(json.dumps(result,indent=2,ensure_ascii=False))

文档信息

    # 查询文档数量及当时时间
    result = es.cat.count()
    print(json.dumps(result,indent=2,ensure_ascii=False))
    # 查询总的文档数 可以指定index,算总数量
    result = es.count()
    print(json.dumps(result,indent=2,ensure_ascii=False))
    
    # 查询每个索引信息及存储及文档数量
    result = es.cat.indices()
    print(result)

es.get( ) 相关

    # 如果是 get ,需要确定index, doc_type, id,return a document, 
    GET test004/politics/1/_search
    
    # Returns the source of a document
    result = es.get_source(index='employees',doc_type='employee_info',id=1)
    
    # 多文档查询 mget

es.search( ) 相关

    # 如果是 search,确定index, doc_type, 查询体 body
    # 同kibana中, GET test004/politics/_search
    res = es.search(index='test004', doc_type='politics')
    print(res)

示例

    # 添加body的查询
    dsl = {
        'query': {
            'match': {
                'title': '中国 领事馆'
            }
        }
    }
    res = es.search(index='test004', doc_type='politics',body=dsl)
    print(json.dumps(res, indent=2, ensure_ascii=False))
    # json序列化后,输出json格式,层次清晰

    # 查询后返回特定的字段
    es.search(index='test-index', filter_path=['hits.hits._id', 'hits.hits._type'])
    # 返回id type
    # 也可以用 * ,表示返回某种符合条件的模糊查找

拓展

搜索所有数据

    es.search(index="my_index",doc_type="test_type")
    
    
    # 或者
    body = {
        "query":{
            "match_all":{}
        }
    }
    es.search(index="my_index",doc_type="test_type",body=body)

term与terms

term

    body = {
        "query":{
            "term":{
                "name":"python"
            }
        }
    }
    # 查询name="python"的所有数据
    es.search(index="my_index",doc_type="test_type",body=body)

terms

    body = {
        "query":{
            "terms":{
                "name":[
                    "python","android"
                ]
            }
        }
    }
    # 搜索出name="python"或name="android"的所有数据
    es.search(index="my_index",doc_type="test_type",body=body)

match与multi_match

    # match:匹配name包含python关键字的数据
    body = {
        "query":{
            "match":{
                "name":"python"
            }
        }
    }
    # 查询name包含python关键字的数据
    es.search(index="my_index",doc_type="test_type",body=body)
    
    # multi_match:在name和addr里匹配包含深圳关键字的数据
    body = {
        "query":{
            "multi_match":{
                "query":"深圳",
                "fields":["name","addr"]
            }
        }
    }
    # 查询name和addr包含"深圳"关键字的数据
    es.search(index="my_index",doc_type="test_type",body=body)

复合查询bool

bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)

    body = {
        "query":{
            "bool":{
                "must":[
                    {
                        "term":{
                            "name":"python"
                        }
                    },
                    {
                        "term":{
                            "age":18
                        }
                    }
                ]
            }
        }
    }
    # 获取name="python"并且age=18的所有数据
    es.search(index="my_index",doc_type="test_type",body=body)

切片式查询

    body = {
        "query":{
            "match_all":{}
        }
        "from":2    # 从第二条数据开始
        "size":4    # 获取4条数据
    }
    # 从第2条数据开始,获取4条数据
    es.search(index="my_index",doc_type="test_type",body=body)

范围查询

    body = {
        "query":{
            "range":{
                "age":{
                    "gte":18,       # >=18
                    "lte":30        # <=30
                }
            }
        }
    }
    # 查询18<=age<=30的所有数据
    es.search(index="my_index",doc_type="test_type",body=body)

前缀查询

    body = {
        "query":{
            "prefix":{
                "name":"p"
            }
        }
    }
    # 查询前缀为"赵"的所有数据
    es.search(index="my_index",doc_type="test_type",body=body)

通配符查询

    body = {
        "query":{
            "wildcard":{
                "name":"*id"
            }
        }
    }
    # 查询name以id为后缀的所有数据
    es.search(index="my_index",doc_type="test_type",body=body)

排序

    body = {
        "query":{
            "match_all":{}
        }
        "sort":{
            "age":{                 # 根据age字段升序排序
                "order":"asc"       # asc升序,desc降序
            }
        }
    }

filter_path

响应过滤

    # 只需要获取_id数据,多个条件用逗号隔开
    es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._id"])
    
    # 获取所有数据
    es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._*"])

count

执行查询并获取该查询的匹配数

    # 获取数据量
    es.count(index="my_index",doc_type="test_type")

度量类聚合

获取最小值

    body = {
        "query":{
            "match_all":{}
        },
        "aggs":{                        # 聚合查询
            "min_age":{                 # 最小值的key
                "min":{                 # 最小
                    "field":"age"       # 查询"age"的最小值
                }
            }
        }
    }
    # 搜索所有数据,并获取age最小的值
    es.search(index="my_index",doc_type="test_type",body=body)

获取最大值

    body = {
        "query":{
            "match_all":{}
        },
        "aggs":{                        # 聚合查询
            "max_age":{                 # 最大值的key
                "max":{                 # 最大
                    "field":"age"       # 查询"age"的最大值
                }
            }
        }
    }
    # 搜索所有数据,并获取age最大的值
    es.search(index="my_index",doc_type="test_type",body=body)

获取和

    body = {
        "query":{
            "match_all":{}
        },
        "aggs":{                        # 聚合查询
            "sum_age":{                 # 和的key
                "sum":{                 # 和
                    "field":"age"       # 获取所有age的和
                }
            }
        }
    }
    # 搜索所有数据,并获取所有age的和
    es.search(index="my_index",doc_type="test_type",body=body)

获取平均值

    body = {
        "query":{
            "match_all":{}
        },
        "aggs":{                        # 聚合查询
            "avg_age":{                 # 平均值的key
                "sum":{                 # 平均值
                    "field":"age"       # 获取所有age的平均值
                }
            }
        }
    }
    # 搜索所有数据,获取所有age的平均值
    es.search(index="my_index",doc_type="test_type",body=body)

类实现es的CRUD操作

    ## https://www.cnblogs.com/shaosks/p/7592229.html
    import os
    import time
    from os import walk
    import CSVOP
    from datetime import datetime
    from elasticsearch import Elasticsearch
    from elasticsearch.helpers import bulk
    
    class ElasticObj:
        def __init__(self, index_name,index_type,ip ="127.0.0.1"):
            '''
    
            :param index_name: 索引名称
            :param index_type: 索引类型
            '''
            self.index_name =index_name
            self.index_type = index_type
            # 无用户名密码状态
            #self.es = Elasticsearch([ip])
            #用户名密码状态
            self.es = Elasticsearch([ip],http_auth=('elastic', 'password'),port=9200)
    
        def create_index(self,index_name="ott",index_type="ott_type"):
            '''
            创建索引,创建索引名称为ott,类型为ott_type的索引
            :param ex: Elasticsearch对象
            :return:
            '''
            #创建映射
            _index_mappings = {
                "mappings": {
                    self.index_type: {
                        "properties": {
                            "title": {
                                "type": "text",
                                "index": True,
                                "analyzer": "ik_max_word",
                                "search_analyzer": "ik_max_word"
                            },
                            "date": {
                                "type": "text",
                                "index": True
                            },
                            "keyword": {
                                "type": "string",
                                "index": "not_analyzed"
                            },
                            "source": {
                                "type": "string",
                                "index": "not_analyzed"
                            },
                            "link": {
                                "type": "string",
                                "index": "not_analyzed"
                            }
                        }
                    }
    
                }
            }
            if self.es.indices.exists(index=self.index_name) is not True:
                res = self.es.indices.create(index=self.index_name, body=_index_mappings)
                print res
    
    
        def IndexData(self):
            es = Elasticsearch()
            csvdir = 'D:/work/ElasticSearch/exportExcels'
            filenamelist = []
            for (dirpath, dirnames, filenames) in walk(csvdir):
                filenamelist.extend(filenames)
                break
            total = 0
            for file in filenamelist:
                csvfile = csvdir + '/' + file
                self.Index_Data_FromCSV(csvfile,es)
                total += 1
                print total
                time.sleep(10)
    
        def Index_Data_FromCSV(self,csvfile):
            '''
            从CSV文件中读取数据,并存储到es中
            :param csvfile: csv文件,包括完整路径
            :return:
            '''
            list = CSVOP.ReadCSV(csvfile)
            index = 0
            doc = {}
            for item in list:
                if index > 1:#第一行是标题
                    doc['title'] = item[0]
                    doc['link'] = item[1]
                    doc['date'] = item[2]
                    doc['source'] = item[3]
                    doc['keyword'] = item[4]
                    res = self.es.index(index=self.index_name, doc_type=self.index_type, body=doc)
                    print(res['created'])
                index += 1
                print index
    
        def Index_Data(self):
            '''
            数据存储到es
            :return:
            '''
            list = [
                {   "date": "2017-09-13",
                    "source": "慧聪网",
                    "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
                    "keyword": "电视",
                    "title": "付费 电视 行业面临的转型和挑战"
                 },
                {   "date": "2017-09-13",
                    "source": "中国文明网",
                    "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
                    "keyword": "电视",
                    "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
                 }
                  ]
            for item in list:
                res = self.es.index(index=self.index_name, doc_type=self.index_type, body=item)
                print(res['created'])
    
        def bulk_Index_Data(self):
            '''
            用bulk将批量数据存储到es
            :return:
            '''
            list = [
                {"date": "2017-09-13",
                 "source": "慧聪网",
                 "link": "http://info.broadcast.hc360.com/2017/09/130859749974.shtml",
                 "keyword": "电视",
                 "title": "付费 电视 行业面临的转型和挑战"
                 },
                {"date": "2017-09-13",
                 "source": "中国文明网",
                 "link": "http://www.wenming.cn/xj_pd/yw/201709/t20170913_4421323.shtml",
                 "keyword": "电视",
                 "title": "电视 专题片《巡视利剑》广获好评:铁腕反腐凝聚党心民心"
                 },
                {"date": "2017-09-13",
                 "source": "人民电视",
                 "link": "http://tv.people.com.cn/BIG5/n1/2017/0913/c67816-29533981.html",
                 "keyword": "电视",
                 "title": "中国第21批赴刚果(金)维和部隊启程--人民 电视 --人民网"
                 },
                {"date": "2017-09-13",
                 "source": "站长之家",
                 "link": "http://www.chinaz.com/news/2017/0913/804263.shtml",
                 "keyword": "电视",
                 "title": "电视 盒子 哪个牌子好? 吐血奉献三大选购秘笈"
                 }
            ]
            ACTIONS = []
            i = 1
            for line in list:
                action = {
                    "_index": self.index_name,
                    "_type": self.index_type,
                    "_id": i, #_id 也可以默认生成,不赋值
                    "_source": {
                        "date": line['date'],
                        "source": line['source'].decode('utf8'),
                        "link": line['link'],
                        "keyword": line['keyword'].decode('utf8'),
                        "title": line['title'].decode('utf8')}
                }
                i += 1
                ACTIONS.append(action)
                # 批量处理
            success, _ = bulk(self.es, ACTIONS, index=self.index_name, raise_on_error=True)
            print('Performed %d actions' % success)
    
        def Delete_Index_Data(self,id):
            '''
            删除索引中的一条
            :param id:
            :return:
            '''
            res = self.es.delete(index=self.index_name, doc_type=self.index_type, id=id)
            print res
    
        def Get_Data_Id(self,id):
    
            res = self.es.get(index=self.index_name, doc_type=self.index_type,id=id)
            print(res['_source'])
    
            print '------------------------------------------------------------------'
            #
            # # 输出查询到的结果
            for hit in res['hits']['hits']:
                # print hit['_source']
                print hit['_source']['date'],hit['_source']['source'],hit['_source']['link'],hit['_source']['keyword'],hit['_source']['title']
    
        def Get_Data_By_Body(self):
            # doc = {'query': {'match_all': {}}}
            doc = {
                "query": {
                    "match": {
                        "keyword": "电视"
                    }
                }
            }
            _searched = self.es.search(index=self.index_name, doc_type=self.index_type, body=doc)
    
            for hit in _searched['hits']['hits']:
                # print hit['_source']
                print hit['_source']['date'], hit['_source']['source'], hit['_source']['link'], hit['_source']['keyword'], \
                hit['_source']['title']
    

常见参数-search
``
# Parameters:
body – The search definition using the Query DSL

index – A comma-separated list of index names to search; use _all or empty string to perform the operation on all indices

_source – True or false to return the _source field or not, or a list of fields to return

_source_excludes – A list of fields to exclude from the returned _source field

_source_includes – A list of fields to extract and return from the _source field

allow_no_indices – Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes _all string or when no indices have been specified)

allow_partial_search_results – Indicate if an error should be returned if there is a partial 

search failure or timeout Default: True

analyze_wildcard – Specify whether wildcard and prefix queries should be analyzed (default: false)

analyzer – The analyzer to use for the query string

batched_reduce_size – The number of shard results that should be reduced at once on the 
coordinating node. This value should be used as a protection mechanism to reduce the memory overhead per search request if the potential number of shards in the request can be large. 
Default: 512

ccs_minimize_roundtrips – Indicates whether network round- trips should be minimized as part of cross-cluster search requests execution Default: true

default_operator – The default operator for query string query (AND or OR) Valid choices: AND, OR Default: OR
df – The field to use as default where no field prefix is given in the query string

docvalue_fields – A comma-separated list of fields to return as the docvalue representation of a field for each hit

expand_wildcards – Whether to expand wildcard expression to concrete indices that are open, closed or both. Valid choices: open, closed, hidden, none, all Default: open

explain – Specify whether to return detailed information about score computation as part of a hit

from_ – Starting offset (default: 0)

ignore_throttled – Whether specified concrete, expanded or aliased indices should be ignored when throttled

ignore_unavailable – Whether specified concrete indices should be ignored when unavailable (missing or closed)

lenient – Specify whether format-based query failures (such as providing text to a numeric field) should be ignored

max_concurrent_shard_requests – The number of concurrent shard requests per node this search executes concurrently. This value should be used to limit the impact of the search on the cluster in order to limit the number of concurrent shard requests Default: 5

pre_filter_shard_size – A threshold that enforces a pre- filter roundtrip to prefilter search shards based on query rewriting if the number of shards the search request expands to exceeds the threshold. This filter roundtrip can limit the number of shards significantly if for instance a shard can not match any documents based on its rewrite method ie. if date filters are mandatory to match but the shard bounds and the query are disjoint.

preference – Specify the node or shard the operation should be performed on (default: random)
q – Query in the Lucene query string syntax

request_cache – Specify if request cache should be used for this request or not, defaults to index level setting

rest_total_hits_as_int – Indicates whether hits.total should be rendered as an integer or an object in the rest search response

routing – A comma-separated list of specific routing values

scroll – Specify how long a consistent view of the index should be maintained for scrolled search

search_type – Search operation type Valid choices: query_then_fetch, dfs_query_then_fetch

seq_no_primary_term – Specify whether to return sequence number and primary term of the last modification of each hit

size – Number of hits to return (default: 10)

sort – A comma-separated list of <field>:<direction> pairs

stats – Specific ‘tag’ of the request for logging and statistical purposes

stored_fields – A comma-separated list of stored fields to return as part of a hit

suggest_field – Specify which field to use for suggestions

suggest_mode – Specify suggest mode Valid choices: missing, popular, always Default: missing

suggest_size – How many suggestions to return in response

suggest_text – The source text for which the suggestions should be returned

terminate_after – The maximum number of documents to collect for each shard, upon reaching which the query execution will terminate early.

timeout – Explicit operation timeout

track_scores – Whether to calculate and return scores even if they are not used for sorting

track_total_hits – Indicate if the number of documents that match the query should be tracked. A number can also be specified, to accurately track the total hit count up to the number.

typed_keys – Specify whether aggregation and suggester names should be prefixed by their respective types in the response

version – Specify whether to return document version as part of a hit
原文地址:https://www.cnblogs.com/davis12/p/13732171.html