es知识这一篇就够了

Elasticsearch 基本概念

  • Index:Elasticsearch用来存储数据的逻辑区域,它类似于关系型数据库中的database 概念。一个index可以在一个或者多个shard上面,同时一个shard也可能会有多个replicas。
  • Document:Elasticsearch里面存储的实体数据,类似于关系数据中一个table里面的一行数据。
    document由多个field组成,不同的document里面同名的field一定具有相同的类型。document里面field可以重复出现,也就是一个field会有多个值,即multivalued。
  • Document type:为了查询需要,一个index可能会有多种document,也就是document type. 它类似于关系型数据库中的 table 概念。但需要注意,不同document里面同名的field一定要是相同类型的。
  • Mapping:它类似于关系型数据库中的 schema 定义概念。存储field的相关映射信息,不同document type会有不同的mapping。

下图是ElasticSearch和关系型数据库的一些术语比较:

Relationnal database Elasticsearch
Database Index
Table Type
Row Document
Column Field
Schema Mapping
Index Everything is indexed
SQL Query DSL
SELECT * FROM table… GET http://…
UPDATE table SET PUT http://…

查看分词结果

http://localhost:9200/your_index/your_type/your_id/_termvectors?fields=your_fieldsName

根据字断长度进行查询

  • 字断类型必须是keyword
body = {"query": {"script": {"script": {'source': "doc['name'][0].length()<2", 'lang': 'painless'}}}}

body = {"query": {"bool": {"filter": {"script": {"script": {'source': "doc['name'][0].length()<2", 'lang':'painless'}}}}}}

es查询

  • 查询所有数据

      # 搜索所有数据
      es.search(index="my_index", doc_type="test_type")
      # 或者
      body = {
        "query":{
          "match_all":{}
        }
      }
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • term与terms

      # term
      body = {
        "query":{
          "term":{
            "name":"python"
          }
        }
      }
      # 查询name="python"的所有数据
      es.search(index="my_index",doc_type="test_type",body=body)
      # terms
      body = {
        "query":{
          "terms":{
            "name":[
              "python","android"
            ]
          }
        }
      }
      # 搜索出name="python"或name="android"的所有数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • match与multi_match

      # match:匹配name包含python关键字的数据
      body = {
        "query":{
          "match":{
            "name":"python"
          }
        }
      }
      # 查询name包含python关键字的数据
      es.search(index="my_index",doc_type="test_type",body=body)
      
      # multi_match:在name和addr里匹配包含深圳关键字的数据
      body = {
        "query":{
          "multi_match":{
            "query":"深圳",
            "fields":["name","addr"]
          }
        }
      }
      # 查询name和addr包含"深圳"关键字的数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • ids 查询

      body = {
        "query":{
          "ids":{
            "type":"test_type",
            "values":[
              "1","2"
            ]
          }
        }
      }
      # 搜索出id为1或2的所有数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • 复合查询bool

      # bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)
      body = {
        "query":{
          "bool":{
            "must":[
              {
                "term":{
                  "name":"python"
                }
              },
              {
                "term":{
                  "age":18
                }
              }
            ]
          }
        }
      }
      # 获取name="python"并且age=18的所有数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • 切片式查询

      body = {
        "query":{
          "match_all":{}
        }
        "from":2  # 从第二条数据开始
        "size":4  # 获取4条数据
      }
      # 从第2条数据开始,获取4条数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • 范围查询

      body = {
        "query":{
          "range":{
            "age":{
              "gte":18,    # >=18
              "lte":30    # <=30
            }
          }
        }
      }
      # 查询18<=age<=30的所有数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • 通配符查询

      body = {
        "query":{
          "wildcard":{
            "name":"*id"
          }
        }
      }
      # 查询name以id为后缀的所有数据
      es.search(index="my_index",doc_type="test_type",body=body)
    
  • nested复杂查询

          {
              "_source": {
                  "include": ["_id"]  # 查询结果将只返回 _id 字段
              },
              "from": 0,
              "size": 500,
              "query": {
                  "bool": {
                      "must": [{
                          "nested": {
                              "path": "author_test",   // path:嵌套字段的字段名
                              "query": {
                                  "bool": {
                                      "must": [{
                                              "term": {
                                                  "author_test.person_id": "xxxxxx"  
                                              }
                                          },
                                          {
                                              "terms": {
                                                  "author_test.irtag": [0,1,2,3,4] 
                                              }
                                          }
                                      ]
                                  }
                              }
                          }
                      }]
                  }
              }
          }
    
  • 排序

      body = {
        "query":{
          "match_all":{}
        }
        "sort":{
          "age":{         # 根据age字段升序排序
            "order":"asc"    # asc升序,desc降序
          }
        }
      }
    
  • filter_path

      # 响应过滤
      # 只需要获取_id, _type数据,多个条件用逗号隔开
      es.search(index="my_index",doc_type="test_type",filter_path=['hits.hits._id', 'hits.hits._type'])
      # 获取所有数据
      es.search(index="my_index",doc_type="test_type",filter_path=["hits.hits._*"])
    
  • count

      # 获取数据量
      es.count(index="my_index",doc_type="test_type")
    
  • 度量类聚合

    获取最小值

      body = {
        "query":{
          "match_all":{}
        },
        "aggs":{            # 聚合查询
          "min_age":{         # 最小值的key
            "min":{         # 最小
              "field":"age"    # 查询"age"的最小值
            }
          }
        }
      }
      # 搜索所有数据,并获取age最小的值
      es.search(index="my_index",doc_type="test_type",body=body)
    

    获取最大值

      body = {
        "query":{
          "match_all":{}
        },
        "aggs":{            # 聚合查询
          "max_age":{         # 最大值的key
            "max":{         # 最大
              "field":"age"    # 查询"age"的最大值
            }
          }
        }
      }
      # 搜索所有数据,并获取age最大的值
      es.search(index="my_index",doc_type="test_type",body=body)
    

    获取和

      body = {
        "query":{
          "match_all":{}
        },
        "aggs":{            # 聚合查询
          "sum_age":{         # 和的key
            "sum":{         # 和
              "field":"age"    # 获取所有age的和
            }
          }
        }
      }
      # 搜索所有数据,并获取所有age的和
      es.search(index="my_index",doc_type="test_type",body=body)
    

    获取平均值

      body = {
        "query":{
          "match_all":{}
        },
        "aggs":{            # 聚合查询
          "avg_age":{         # 平均值的key
            "sum":{         # 平均值
              "field":"age"    # 获取所有age的平均值
            }
          }
        }
      }
      # 搜索所有数据,获取所有age的平均值
      es.search(index="my_index",doc_type="test_type",body=body)
    

更多的搜索用法

Python Elasticsearch DSL 使用简介

  • 安装

      $ pip install elasticsearch-dsl
    
  • 创建索引和文档

      from datetime import datetime
      from elasticsearch_dsl import DocType, Date, Integer, Keyword, Text
      from elasticsearch_dsl.connections import connections
      # Define a default Elasticsearch client
      connections.create_connection(hosts=['localhost'])
      class Article(DocType):
          title = Text(analyzer='snowball', fields={'raw': Keyword()})
          body = Text(analyzer='snowball')
          tags = Keyword()
          published_from = Date()
          lines = Integer()
          class Meta:
              index = 'blog'
          def save(self, ** kwargs):
              self.lines = len(self.body.split())
              return super(Article, self).save(** kwargs)
          def is_published(self):
              return datetime.now() >= self.published_from
      # create the mappings in elasticsearch
      Article.init()
    

    创建了一个索引为blog,文档为article的Elasticsearch数据库和表。
    必须执行Article.init()方法。 这样Elasticsearch才会根据你的DocType产生对应的Mapping。否则Elasticsearch就会在你第一次创建Index和Type的时候根据你的内容建立对应的Mapping。

  • 通过Elasticsearch Restful API来检查

      http GET http://127.0.0.1:9200/blog/_mapping/
      
      # 输出
      {"blog":
      	{"mappings":
      		{"article":
      			{"properties":{
      				"body":{"type":"text","analyzer":"snowball"},
      				"lines":{"type":"integer"},
      				"published_from":{"type":"date"},
      				"tags":{"type":"keyword"},
      				"title":{"type":"text","fields":{"raw":{"type":"keyword"}},"analyzer":"snowball"}
      			}
      		}}
      	}
      }
    

使用Elasticsearch进行CRUD操作

crud是指在做计算处理时的增加(Create)、检索(Retrieve)、更新(Update)和删除(Delete)几个单词的首字母简写。

  • Create an article

      # create and save and article
      article = Article(meta={'id': 1}, title='Hello elasticsearch!', tags=['elasticsearch'])
      article.body = ''' looong text '''
      article.published_from = datetime.now()
      article.save()
    

=>Restful API

	http POST http://127.0.0.1:9200/blog/article/1 title="hello elasticsearch" tags:='["elasticsearch"]'
	HTTP/1.1 201 Created
	Content-Length: 73
	Content-Type: application/json; charset=UTF-8
	{
	    "_id": "1", 
	    "_index": "blog", 
	    "_type": "article", 
	    "_version": 1, 
	    "created": true
	}
  • Get a article

      article = Article.get(id=1)
      # 如果获取一个不存在的文章则返回None
      a = Article.get(id='no-in-es')
      a is None
      # 还可以获取多个文章
      articles = Article.mget([1, 2, 3])
    

=>Restful API

	http GET http://127.0.0.1:9200/blog/article/1
	HTTP/1.1 200 OK
	Content-Length: 141
	Content-Type: application/json; charset=UTF-8
	{
	    "_id": "1", 
	    "_index": "blog", 
	    "_source": {
	        "tags": [
	            "elasticsearch"
	        ], 
	        "title": "hello elasticsearch"
	    }, 
	    "_type": "article", 
	    "_version": 1, 
	    "found": true
	}
  • Update a article

      article = Article.get(id=1)
      article.tags = ['elasticsearch', 'hello']
      article.save()
      # 或者
      article.update(body='Today is good day!', published_by='me')
    

=>Restful API

	http PUT http://127.0.0.1:9200/blog/article/1 title="hello elasticsearch" tags:='["elasticsearch", "hello"]'
	HTTP/1.1 200 OK
	Content-Length: 74
	Content-Type: application/json; charset=UTF-8
	{
	    "_id": "1", 
	    "_index": "blog", 
	    "_type": "article", 
	    "_version": 2, 
	    "created": false
	}
  • Delete a article

      article = Article.get(id=1)
      article.delete()
    

=> Restful API

	http DELETE http://127.0.0.1:9200/blog/article/1
	HTTP/1.1 200 OK
	Content-Length: 71
	Content-Type: application/json; charset=UTF-8
	{
	    "_id": "1", 
	    "_index": "blog", 
	    "_type": "article", 
	    "_version": 4, 
	    "found": true
	}
	http HEAD  http://127.0.0.1:9200/blog/article/1
	HTTP/1.1 404 Not Found
	Content-Length: 0
	Content-Type: text/plain; charset=UTF-8

使用ElasticSearch DSL 搜索一

  • 连接 Es:

      import elasticsearch
      
      es = elasticsearch.Elasticsearch([{'host': '127.0.0.1', 'port': 9200}])
    
  • 先看一下搜索,q 是指搜索内容,空格对 q 查询结果没有影响,size 指定个数,from_ 指定起始位置,filter_path 可以指定需要显示的数据,如本例中显示在最后的结果中的只有 _id 和 _type。

      res_3 = es.search(index="bank", q="Holmes", size=1, from_=1)
      res_4 = es.search(index="bank", q=" 39225    5686 ", size=1000, filter_path=['hits.hits._id', 'hits.hits._type'])
    
  • 查询指定索引的所有数据:

    其中,index 指定索引,字符串表示一个索引;列表表示多个索引,如 index=["bank", "banner", "country"];正则形式表示符合条件的多个索引,如 index=["apple*"],表示以 apple 开头的全部索引。
    search 中同样可以指定具体 doc-type。

      from elasticsearch_dsl import Search
      
      s = Search(using=es, index="index-test").execute()
      print s.to_dict()
    
  • 根据某个字段查询,可以多个查询条件叠加:

      s = Search(using=es, index="index-test").query("match", sip="192.168.1.1")
      s = s.query("match", dip="192.168.1.2")
      s = s.excute()
    
  • 多字段查询:

      from elasticsearch_dsl.query import MultiMatch, Match
      
      multi_match = MultiMatch(query='hello', fields=['title', 'content'])
      s = Search(using=es, index="index-test").query(multi_match)
      s = s.execute()
      
      print s.to_dict()
    
  • 还可以用 Q() 对象进行多字段查询,fields 是一个列表,query 为所要查询的值。

      from elasticsearch_dsl import Q
      
      q = Q("multi_match", query="hello", fields=['title', 'content'])
      s = s.query(q).execute()
      
      print s.to_dict()
    
  • Q() 第一个参数是查询方法,还可以是 bool。

      q = Q('bool', must=[Q('match', title='hello'), Q('match', content='world')])
      s = s.query(q).execute()
      
      print s.to_dict()
    
  • 通过 Q() 进行组合查询,相当于上面查询的另一种写法。

      q = Q("match", title='python') | Q("match", title='django')
      s = s.query(q).execute()
      print(s.to_dict())
      # {"bool": {"should": [...]}}
      
      q = Q("match", title='python') & Q("match", title='django')
      s = s.query(q).execute()
      print(s.to_dict())
      # {"bool": {"must": [...]}}
      
      q = ~Q("match", title="python")
      s = s.query(q).execute()
      print(s.to_dict())
      # {"bool": {"must_not": [...]}}
    
  • 过滤,在此为范围过滤,range 是方法,timestamp 是所要查询的 field 名字,gte 为大于等于,lt 为小于,根据需要设定即可。
    关于 term 和 match 的区别,term 是精确匹配,match 会模糊化,会进行分词,返回匹配度分数,(term 如果查询小写字母的字符串,有大写会返回空即没有命中,match 则是不区分大小写都可以进行查询,返回结果也一样)

      # 范围查询
      s = s.filter("range", timestamp={"gte": 0, "lt": time.time()}).query("match", country="in")
      # 普通过滤
      res_3 = s.filter("terms", balance_num=["39225", "5686"]).execute()
    
  • 其他写法:

      s = Search()
      s = s.filter('terms', tags=['search', 'python'])
      print(s.to_dict())
      # {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
      
      s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])
      print(s.to_dict())
      # {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
      s = s.exclude('terms', tags=['search', 'python'])
      # 或者
      s = s.query('bool', filter=[~Q('terms', tags=['search', 'python'])])
      print(s.to_dict())
      # {'query': {'bool': {'filter': [{'bool': {'must_not': [{'terms': {'tags': ['search', 'python']}}]}}]}}}
    
  • 聚合可以放在查询,过滤等操作的后面叠加,需要加 aggs。
    bucket 即为分组,其中第一个参数是分组的名字,自己指定即可,第二个参数是方法,第三个是指定的 field。
    metric 也是同样,metric 的方法有 sum、avg、max、min 等,但是需要指出的是,有两个方法可以一次性返回这些值,stats 和 extended_stats,后者还可以返回方差等值。

      # 实例1
      s.aggs.bucket("per_country", "terms", field="timestamp").metric("sum_click", "stats", field="click").metric("sum_request", "stats", field="request")
      
      # 实例2
      s.aggs.bucket("per_age", "terms", field="click.keyword").metric("sum_click", "stats", field="click")
      
      # 实例3
      s.aggs.metric("sum_age", "extended_stats", field="impression")
      
      # 实例4
      s.aggs.bucket("per_age", "terms", field="country.keyword")
      
      # 实例5,此聚合是根据区间进行聚合
      a = A("range", field="account_number", ranges=[{"to": 10}, {"from": 11, "to": 21}])
      
      res = s.execute()
    
  • 最后依然要执行 execute(),此处需要注意,s.aggs 操作不能用变量接收(如 res=s.aggs,这个操作是错误的),聚合的结果会保存到 res 中显示。

      #排序
      s = Search().sort(
          'category',
          '-title',
          {"lines" : {"order" : "asc", "mode" : "avg"}}
      )
      
      # 分页
      s = s[10:20]
      # {"from": 10, "size": 10}
    
  • 一些扩展方法,感兴趣的同学可以看看:

      s = Search()
      
      # 设置扩展属性使用`.extra()`方法
      s = s.extra(explain=True)
      
      # 设置参数使用`.params()`
      s = s.params(search_type="count")
      
      # 如要要限制返回字段,可以使用`source()`方法
      # only return the selected fields
      s = s.source(['title', 'body'])
      # don't return any fields, just the metadata
      s = s.source(False)
      # explicitly include/exclude fields
      s = s.source(include=["title"], exclude=["user.*"])
      # reset the field selection
      s = s.source(None)
      
      # 使用dict序列化一个查询
      s = Search.from_dict({"query": {"match": {"title": "python"}}})
      
      # 修改已经存在的查询
      s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})
    

使用ElasticSearch DSL 搜索二

Search主要包括:

查询(queries)
过滤器(filters)
聚合(aggreations)
排序(sort)
分页(pagination)
额外的参数(additional parameters)
相关性(associated)

创建一个查询对象

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search
client = Elasticsearch()
s = Search(using=client)

初始化测试数据

def add_article(id_, title, body, tags):
    article = Article(meta={'id': id_}, title=title, tags=tags)
    article.body = body
    article.published_from = datetime.now()
    article.save()
def init_test_data():
    add_article(2, 'Python is good!', 'Python is good!', ['python'])
    add_article(3, 'Elasticsearch', 'Distributed, open source search and analytics engine', ['elasticsearch'])
    add_article(4, 'Python very quickly', 'Python very quickly', ['python'])
    add_article(5, 'Django', 'Python Web framework', ['python', 'django'])

第一个查询语句

# 创建一个查询语句
s = Search().using(client).query("match", title="python")
# 查看查询语句对应的字典结构
print(s.to_dict())
# {'query': {'match': {'title': 'python'}}}
# 发送查询请求到Elasticsearch
response = s.execute()
# 打印查询结果
for hit in s:
    print(hit.title)
# Out:
Python is good!
Python very quickly
# 删除查询
s.delete()

1、Queries

# 创建一个多字段查询
multi_match = MultiMatch(query='python', fields=['title', 'body'])
s = Search().query(multi_match)
print(s.to_dict())
# {'query': {'multi_match': {'fields': ['title', 'body'], 'query': 'python'}}}
# 使用Q语句
q = Q("multi_match", query='python', fields=['title', 'body'])
# 或者
q = Q({"multi_match": {"query": "python", "fields": ["title", "body"]}})
s = Search().query(q)
print(s.to_dict())
# If you already have a query object, or a dict 
# representing one, you can just override the query used 
# in the Search object:
s.query = Q('bool', must=[Q('match', title='python'), Q('match', body='best')])
print(s.to_dict())
# 查询组合
q = Q("match", title='python') | Q("match", title='django')
s = Search().query(q)
print(s.to_dict())
# {"bool": {"should": [...]}}
q = Q("match", title='python') & Q("match", title='django')
s = Search().query(q)
print(s.to_dict())
# {"bool": {"must": [...]}}
q = ~Q("match", title="python")
s = Search().query(q)
print(s.to_dict())
# {"bool": {"must_not": [...]}}

2、Filters

s = Search()
s = s.filter('terms', tags=['search', 'python'])
print(s.to_dict())
# {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
s = s.query('bool', filter=[Q('terms', tags=['search', 'python'])])
print(s.to_dict())
# {'query': {'bool': {'filter': [{'terms': {'tags': ['search', 'python']}}]}}}
s = s.exclude('terms', tags=['search', 'python'])
# 或者
s = s.query('bool', filter=[~Q('terms', tags=['search', 'python'])])
print(s.to_dict())
# {'query': {'bool': {'filter': [{'bool': {'must_not': [{'terms': {'tags': ['search', 'python']}}]}}]}}}

3、Aggregations

s = Search()
a = A('terms', filed='title')
s.aggs.bucket('title_terms', a)
print(s.to_dict())
# {
# 'query': {
#   'match_all': {}
#  },
#  'aggs': {
#       'title_terms': {
#            'terms': {'filed': 'title'}
#        }
#    }
# }
# 或者
s = Search()
s.aggs.bucket('articles_per_day', 'date_histogram', field='publish_date', interval='day') 
    .metric('clicks_per_day', 'sum', field='clicks') 
    .pipeline('moving_click_average', 'moving_avg', buckets_path='clicks_per_day') 
    .bucket('tags_per_day', 'terms', field='tags')
s.to_dict()
# {
#   "aggs": {
#     "articles_per_day": {
#       "date_histogram": { "interval": "day", "field": "publish_date" },
#       "aggs": {
#         "clicks_per_day": { "sum": { "field": "clicks" } },
#         "moving_click_average": { "moving_avg": { "buckets_path": "clicks_per_day" } },
#         "tags_per_day": { "terms": { "field": "tags" } }
#       }
#     }
#   }
# }

4、Sorting

s = Search().sort(
    'category',
    '-title',
    {"lines" : {"order" : "asc", "mode" : "avg"}}
)

5、Pagination

s = s[10:20]
# {"from": 10, "size": 10}

6、Extra Properties and parameters

s = Search()
# 设置扩展属性使用`.extra()`方法
s = s.extra(explain=True)
# 设置参数使用`.params()`
s = s.params(search_type="count")
# 如要要限制返回字段,可以使用`source()`方法
# only return the selected fields
s = s.source(['title', 'body'])
# don't return any fields, just the metadata
s = s.source(False)
# explicitly include/exclude fields
s = s.source(include=["title"], exclude=["user.*"])
# reset the field selection
s = s.source(None)
# 使用dict序列化一个查询
s = Search.from_dict({"query": {"match": {"title": "python"}}})
# 修改已经存在的查询
s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})

性能优化

1.过滤比查询要快,因为过滤不需要计算相关性分数,相关性分数的计算也会浪费很多时间;
2.不得不提一下range过滤,这个方法能不用就不要用,相当消耗时间,我去除这个方法之后时间快了一半不止;
3.对数据进行分类,分成几个大类,分别建立其索引,这样速度也会快很多;
4.精简数据,不需要的数据直接舍弃
s = Search()
# 设置扩展属性使用`.extra()`方法
s = s.extra(explain=True)
# 设置参数使用`.params()`
s = s.params(search_type="count")
# 如要要限制返回字段,可以使用`source()`方法
# only return the selected fields
s = s.source(['title', 'body'])
# don't return any fields, just the metadata
s = s.source(False)
# explicitly include/exclude fields
s = s.source(include=["title"], exclude=["user.*"])
# reset the field selection
s = s.source(None)
# 使用dict序列化一个查询
s = Search.from_dict({"query": {"match": {"title": "python"}}})
# 修改已经存在的查询
s.update_from_dict({"query": {"match": {"title": "python"}}, "size": 42})

原文地址:https://www.cnblogs.com/beihangxuwei/p/13491149.html