elasticsearch 基础 —— Update By Query API

Update By Query API

最简单的用法是_update_by_query在不更改源的情况下对索引中的每个文档执行更新。这对于获取新属性或其他一些在线映射更改很有用 。这是API:

POST twitter/_update_by_query?conflicts=proceed

这将返回如下内容:

{
  "took" : 147,
  "timed_out": false,
  "updated": 120,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "total": 120,
  "failures" : [ ]
}

_update_by_query在索引启动时获取索引的快照,并使用internal版本控制索引它。这意味着如果文档在拍摄快照的时间和处理索引请求之间发生更改,则会出现版本冲突。当版本匹配时,文档会更新,版本号会递增。

由于internal版本控制不支持将值0作为有效版本号,因此无法使用版本等于零的文档进行更新, _update_by_query并且将使请求失败。

所有更新和查询失败都会导致_update_by_query中止并failures在响应中返回。已执行的更新仍然存在。换句话说,该过程不会回滚,只会中止。当第一个失败导致中止时,失败的批量请求返回的所有失败都将在failures元素中返回; 因此,可能存在相当多的失败实体。

如果您只想计算版本冲突,不要导致_update_by_query 中止,您可以conflicts=proceed在URL或"conflicts": "proceed" 请求正文中设置。第一个例子是这样做的,因为它只是试图获取在线映射更改,而版本冲突只是意味着冲突文档在_update_by_query 尝试更新文档的开始和更新之间进行了更新。这很好,因为该更新将获得在线映射更新。

回到API格式,这将更新twitter索引中的推文:

POST twitter/_doc/_update_by_query?conflicts=proceed

您还可以_update_by_query使用 Query DSL进行限制。这将更新twitter用户索引中的所有文档kimchy

POST twitter/_update_by_query?conflicts=proceed
{
  "query": { ①
    "term": {
      "user": "kimchy"
    }
  }
}

  必须query以与Search API相同的方式将查询作为值传递给键。您也可以使用q 与搜索API相同的方式使用参数。

到目前为止,我们只是在不更改文档来源的情况下更新文档。这对于拾取新房产等事情非常有用, 但这只是其中一半的乐趣。_update_by_query 支持脚本来更新文档。这将增加likes所有kimchy的推文上的字段:

POST twitter/_update_by_query
{
  "script": {
    "source": "ctx._source.likes++",
    "lang": "painless"
  },
  "query": {
    "term": {
      "user": "kimchy"
    }
  }
}

就像在Update API中一样,您可以设置ctx.op更改执行的操作:

noop

设置ctx.op = "noop"脚本是否确定不需要进行任何更改。这将导致_update_by_query从其更新中省略该文档。这种无操作将noop响应机构的计数器中 报告。

delete

设置ctx.op = "delete"如果你的脚本决定,该文件必须被删除。删除将deleted响应正文中的计数器中 报告。

设置ctx.op为其他任何内容都是错误的。设置任何其他字段ctx是错误的。

请注意,我们已停止指定conflicts=proceed。在这种情况下,我们希望版本冲突中止该过程,以便我们可以处理失败。

此API不允许您移动它接触的文档,只需修改它们的源。这是故意的!我们没有规定将文档从原始位置删除。

也可以同时在多个索引和多个类型上完成这一切,就像搜索API一样:

POST twitter,blog/_doc,post/_update_by_query

如果您提供,routing则路由将复制到滚动查询,将进程限制为与该路由值匹配的分片:

POST twitter/_update_by_query?routing=1

默认情况下,_update_by_query使用1000的滚动批次。您可以使用scroll_sizeURL参数更改批量大小:

POST twitter/_update_by_query?scroll_size=100

_update_by_query也可以通过指定如下内容来使用“ 摄取节点”功能pipeline

PUT _ingest/pipeline/set-foo
{
  "description" : "sets foo",
  "processors" : [ {
      "set" : {
        "field": "foo",
        "value": "bar"
      }
  } ]
}
POST twitter/_update_by_query?pipeline=set-foo

URL参数

除了标准的参数,如pretty,此更新通过查询API也支持refreshwait_for_completionwait_for_active_shardstimeout 和scroll

发送refresh将在请求完成时更新正在更新的索引中的所有分片。这与Index API的refresh 参数不同,后者仅导致接收新数据的分片被编入索引。

如果请求包含,wait_for_completion=false则Elasticsearch将执行一些预检检查,启动请求,然后返回task 可与Tasks API 一起使用以取消或获取任务状态的请求。Elasticsearch还将创建此任务的记录作为文档.tasks/task/${taskId}。这是你的保留或删除你认为合适。完成后,删除它,以便Elasticsearch可以回收它使用的空间。

wait_for_active_shards控制在继续请求之前必须激活碎片的副本数量。详情请见此处 。timeout控制每个写入请求等待不可用分片变为可用的时间。两者都完全适用于 Bulk API中的工作方式。由于_update_by_query采用滚动搜索,你还可以指定scroll参数来控制多长时间保持“搜索上下文”活着,例如?scroll=10m,默认情况下它是5分钟。

requests_per_second可以被设置为任何正十进制数(1.46, 1000等)和节流速率_update_by_query通过填充每个批次由一等待时间发出索引操作的批次。可以通过设置requests_per_second为禁用限制-1

通过在批处理之间等待来完成限制,以便在_update_by_query内部使用的滚动 可以被赋予考虑填充的超时。填充时间是批量大小除以requests_per_second写入所花费的时间之间的差异。默认情况下,批处理大小为1000,因此如果requests_per_second设置为500

target_time = 1000 / 500 per second = 2 seconds
wait_time = target_time - delete_time = 2 seconds - .5 seconds = 1.5 seconds

由于批处理是作为单个_bulk请求发出的,因此大批量大小将导致Elasticsearch创建许多请求,然后等待一段时间再开始下一组。这是“突发”而不是“平滑”。默认是-1

响应正文

JSON响应如下所示:

{
  "took" : 147,
  "timed_out": false,
  "total": 5,
  "updated": 5,
  "deleted": 0,
  "batches": 1,
  "version_conflicts": 0,
  "noops": 0,
  "retries": {
    "bulk": 0,
    "search": 0
  },
  "throttled_millis": 0,
  "requests_per_second": -1.0,
  "throttled_until_millis": 0,
  "failures" : [ ]
}

took

整个操作从开始到结束的毫秒数。

timed_out

true如果在查询执行更新期间执行的任何请求超时 ,则将此标志设置为。

total

已成功处理的文档数。

updated

已成功更新的文档数。

deleted

已成功删除的文档数。

batches

由查询更新拉回的滚动响应数。

version_conflicts

按查询更新的版本冲突数。

noops

由于用于按查询更新的脚本返回的noop值,因此忽略的文档数ctx.op

retries

逐个更新尝试的重试次数。bulk是重试的批量操作search的数量,是重试的搜索操作的数量。

throttled_millis

请求睡眠符合的毫秒数requests_per_second

requests_per_second

在查询更新期间有效执行的每秒请求数。

throttled_until_millis

在按查询响应删除时,此字段应始终等于零。它只在使用Task API时有意义,它指示下一次(自纪元以来的毫秒数),为了符合,将再次执行受限制的请求requests_per_second

failures

如果在此过程中存在任何不可恢复的错误,则会出现故障数组。如果这是非空的,那么请求因为那些失败而中止。逐个查询是使用批处理实现的,任何故障都会导致整个进程中止,但当前批处理中的所有故障都会被收集到数组中。您可以使用该conflicts选项来防止reindex在版本冲突中中止。

使用Task API

您可以使用Task API获取所有正在运行的逐个查询请求的状态 :

GET _tasks?detailed=true&actions=*byquery

回复如下:

{
  "nodes" : {
    "r1A2WoRbTwKZ516z6NEs5A" : {
      "name" : "r1A2WoR",
      "transport_address" : "127.0.0.1:9300",
      "host" : "127.0.0.1",
      "ip" : "127.0.0.1:9300",
      "attributes" : {
        "testattr" : "test",
        "portsfile" : "true"
      },
      "tasks" : {
        "r1A2WoRbTwKZ516z6NEs5A:36619" : {
          "node" : "r1A2WoRbTwKZ516z6NEs5A",
          "id" : 36619,
          "type" : "transport",
          "action" : "indices:data/write/update/byquery",
          "status" : {    ①
            "total" : 6154,
            "updated" : 3500,
            "created" : 0,
            "deleted" : 0,
            "batches" : 4,
            "version_conflicts" : 0,
            "noops" : 0,
            "retries": {
              "bulk": 0,
              "search": 0
            }
            "throttled_millis": 0
          },
          "description" : ""
        }
      }
    }
  }
}

 该对象包含实际状态。它就像响应json一样,重要的是增加了这个total领域。total是reindex期望执行的操作总数。您可以通过添加估计的进展updatedcreated以及deleted多个领域。请求将在其总和等于total字段时结束。

使用任务ID,您可以直接查找任务:

GET /_tasks/task_id

此API的优势在于它可以集成wait_for_completion=false 以透明地返回已完成任务的状态。如果任务完成并wait_for_completion=false设置在它上面,它将返回一个 results或一个error字段。此功能的成本是wait_for_completion=false创建的文档 .tasks/task/${taskId}。您可以删除该文档。

使用Cancel Task API

可以使用任务取消API取消任何按查询更新:

POST _tasks/task_id/_cancel

task_id可以使用上述任务的API被发现。

取消应该很快发生,但可能需要几秒钟。上面的任务状态API将继续列出任务,直到它被唤醒以取消自身。

Rethrottling

requests_per_second可以使用_rethrottleAPI 通过查询在运行的更新上更改值:

POST _update_by_query/task_id/_rethrottle?requests_per_second=-1

task_id可以使用上述任务的API被发现。

就像在_update_by_queryAPI 上设置它一样,requests_per_second 可以-1禁用限制或任何十进制数,如1.712限制到该级别。加速查询的Rethrottling会立即生效,但是在完成当前批处理后,重新启动会降低查询速度。这可以防止滚动超时。

切片

逐个查询支持切片滚动以并行化更新过程。这种并行化可以提高效率,并提供一种方便的方法将请求分解为更小的部分。

手动切片

通过为每个请求提供切片ID和切片总数,手动切片查询:

POST twitter/_update_by_query
{
  "slice": {
    "id": 0,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}
POST twitter/_update_by_query
{
  "slice": {
    "id": 1,
    "max": 2
  },
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您可以验证哪个适用于:

GET _refresh
POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

 这样的结果是明智的total

{
  "hits": {
    "total": 120
  }
}

自动切片

您还可以使用“ 切片滚动”切换为自动并行查询 _uid。使用slices指定片使用的数字:

POST twitter/_update_by_query?refresh&slices=5
{
  "script": {
    "source": "ctx._source['extra'] = 'test'"
  }
}

您还可以验证以下内容:

POST twitter/_search?size=0&q=extra:test&filter_path=hits.total

这样的结果是明智的total

{
  "hits": {
    "total": 120
  }
}

设置slicesauto将让Elasticsearch选择要使用的切片数。此设置将使用每个分片一个切片,达到一定限制。如果有多个源索引,它将根据具有最小分片数的索引选择切片数。

添加slices_update_by_query刚刚自动化在上面的部分中使用的手工工艺,创建子请求,这意味着它有一些怪癖:

  • 您可以在Tasks API中查看这些请求 。这些子请求是请求任务的“子”任务slices
  • 获取请求的任务状态slices仅包含已完成切片的状态。
  • 这些子请求可单独寻址,例如取消和重新限制。
  • 对请求进行重新处理slices将按比例重新调整未完成的子请求。
  • 取消请求slices将取消每个子请求。
  • 由于slices每个子请求的性质将无法获得完全均匀的文档部分。将解决所有文档,但某些切片可能比其他文件更大。期望更大的切片具有更均匀的分布。
  • 像请求requests_per_secondsize请求的参数slices 按比例分配给每个子请求。结合上面关于分布不均匀的点,你应该得出结论,使用 sizewith slices可能不会导致size文件确切地为`_update_by_query`。
  • 每个子请求获得的源索引的略有不同的快照,尽管这些都是在大约相同的时间进行的。

挑选切片数量

如果自动切片,设置slicesauto将为大多数索引选择合理的数字。如果您手动切片或以其他方式调整自动切片,请使用这些指南。

当数量slices等于索引中的分片数时,查询性能最有效。如果该数字很大(例如,500),请选择较小的数字,因为太多slices会损害性能。设置 slices高于分片数通常不会提高效率并增加开销。

更新性能在可用资源上以切片数量线性扩展。

查询或更新性能是否主导运行时取决于重新编制索引的文档和群集资源。

选择一个新的属性

假设您创建了一个没有动态映射的索引,用数据填充它,然后添加了一个映射值以从数据中获取更多字段:

PUT test
{
  "mappings": {
    "_doc": {
      "dynamic": false,   ①
      "properties": {
        "text": {"type": "text"}
      }
    }
  }
}

POST test/_doc?refresh
{
  "text": "words words",
  "flag": "bar"
}
POST test/_doc?refresh
{
  "text": "words words",
  "flag": "foo"
}
PUT test/_mapping/_doc   ②
{
  "properties": {
    "text": {"type": "text"},
    "flag": {"type": "text", "analyzer": "keyword"}
  }
}

这意味着不会将新字段编入索引,只存储在其中_source

这会更新映射以添加新flag字段。要获取新字段,您必须使用它重新索引所有文档。

搜索数据将找不到任何内容:

POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 0
  }
}

但您可以发出_update_by_query请求以获取新映射:

POST test/_update_by_query?refresh&conflicts=proceed
POST test/_search?filter_path=hits.total
{
  "query": {
    "match": {
      "flag": "foo"
    }
  }
}
{
  "hits" : {
    "total" : 1
  }
}

将字段添加到多字段时,您可以执行完全相同的操作。

原文地址:https://www.cnblogs.com/gmhappy/p/11864055.html