elasticsearch 乐观并发控制

一、参考

Elasticsearch: 权威指南 处理冲突

Index API

Sequence IDs: Coming Soon to an Elasticsearch Cluster Near You

二、两种并发更新策略

2.1 悲观并发控制

这种方法被关系型数据库广泛应用,它假定有变更冲突可能发生,因此阻塞访问资源以防止冲突

一个典型的例子是,读取一行数据之前先锁住,确保只有放置了锁的线程可以对这行数据进行操作

2.2 乐观并发控制

elasticsearch 使用这种方式的并发控制,假定冲突是不可发生的,并且不会阻塞当前正在尝试的操作

然而,如果源数据在读写过程中被修改了,更新将会失败,应用程序决定接下来如何解决冲突,例如:

重试更新、使用新的数据、将相关情况报告给用户

三、乐观并发控制

elasticsearch 是分布式的,

当文档创建、更新或者删除时候,新版本的文档必须复制到集群中的其他节点

elasticsearch 是异步和并发的,

这意味着这些复制到其他节点的请求,被并发发送,并且到达目的节点的顺序是乱序的

elasticsearch 需要一种方法确保文档的旧版本不会覆盖新的版本

即使用 _version保证

3.1 _version

每个文档都有一个版本号_version, 当文档被修改的时候,版本号递增,使用该版本号确保变更顺序正确,

如果旧版本的文档在新版本之后到达,它可以被简单的忽略

# (1) 创建文档
PUT yztest/_doc/1
{
  "f1": 1
}

# 创建返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1, # 初始的版本号为1
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

# (2) 更新文档
POST yztest/_doc/1
{
  "f1": 11
}

# 更新的返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2, # 更新后,版本号递增
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

# (3) 删除文档
DELETE yztest/_doc/1

# 删除的返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3, # 删除后,版本号递增
  "result" : "deleted",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}

# (4) 删除后重新创建相同id的文档
POST yztest/_doc/1
{
  "f1": 11
}

# 重新创建的返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 4, # 版本号递增,而不是为1
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 3,
  "_primary_term" : 1
}

3.2 如何修改版本号?

refresh 接口,将从最近一个 refresh 到目前所有的操作刷新到缓存中,即 refresh 后,新入的文档可以被查询到,删除、更新(即先删除后添加)的文档被更新

# 查看refresh interval
GET yztest/_settings?include_defaults

# 更新refresh interval
PUT yztest/_settings
{
  "index" : {
    "refresh_interval" : "300s"
  }
}

# 更新返回值
{
  "acknowledged" : true
}

设置 refresh interval 后,查看创建、更新是否可以被发现?

# (1) 创建新的文档(重建索引并设置refresh interval)
POST yztest/_doc/1
{
  "f1": 1
}

# 创建返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1, # 新建的索引,新的文档版本号为1
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

# (2) 更新文档
POST yztest/_doc/1
{
  "f1": 2
}

# 更新返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 2, # 新的版本号为2
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

# (3) 查看是否更新
GET yztest/_search?version=true

# 查询返回值
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "yztest",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 1, # 因为还没有refresh, 所以版本号没有更新到2
        "_score" : 1.0,
        "_source" : {
          "f1" : 1
        }
      }
    ]
  }
}

# (4) refresh
POST yztest/_refresh

# refresh返回值
{
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  }
}

# (5) refresh后查看文档
GET yztest/_search?version=true

# refresh后,查询的返回值
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "yztest",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 2, # 现在的版本号为最新的版本号
        "_score" : 1.0,
        "_source" : {
          "f1" : 2
        }
      }
    ]
  }
}

# (6) 更新文档

POST yztest/_doc/1
{
  "f1": 3
}

# 更新的返回结果
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3, # 版本号会递增,即使没有refresh(即查询到的版本号为1)
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 2,
  "_primary_term" : 1
}




结论:

文档的版本号,在相同的索引中,一直递增

refresh 的作用是将文档操作刷新到内存中,即能被检索到(最新的文档),对于版本号没有影响

3.3 internal version

# (1) 创建文档,指定版本信息
POST yztest/_doc/1?version=2
{
  "f1": 1
}

# 返回结果,可以发现不能指定 internal类型的version
{
  "error" : {
    "root_cause" : [
      {
        "type" : "action_request_validation_exception",
        "reason" : "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
      }
    ],
    "type" : "action_request_validation_exception",
    "reason" : "Validation Failed: 1: internal versioning can not be used for optimistic concurrency control. Please use `if_seq_no` and `if_primary_term` instead;"
  },
  "status" : 400
}

# (2) 创建文档
POST yztest/_doc/1
{
  "f1": 1
}

# 创建返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 1, # 新的文档
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}

结论: internal(内部)类型的版本号,无法通过指定特殊值更新文档的版本信息,详见

Deprecated usage of internal versioning for optimistic concurrency controledit

3.4 external version

常用的场景为:

使用其他的数据库当作主数据库,使用 ES 当作数据检索,这意味着所有主数据库的所有修改发生时候,都需要被复制到 ES,如果有多个进程负责这个数据的同步,可能会遇到并发一致性问题

如果主数据库中已经有了版本号(一个能作为版本号的字段值或者 timestamp),那么可以通过在 ES 中添加 version_type=external 的方式使用主数据库的版本信息,

外部版本号是一个  JAVA 中的 long 正值

# (1) 创建一个新的文档,指定外部版本号为3
PUT yztest/_doc/1?version=3&version_type=external
{
  "f1": 1
}

# 创建的返回值
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 3, # 通过指定外部版本号,设置文档版本号为3
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 0,
  "_primary_term" : 1
}


外部版本号的处理方式:

检查当前的_version 是否小于指定的版本号,而不是检查当前的_version 和请求中指定的版本号是否相同,如果请求成功,指定的外部版本号作为文档的新的_version进行存储

外部版本号可以出现在下面的操作中:

(1) 创建新的文档

(2) 更新文档

(3) 删除文档

# (1) 获取文档当前的版本信息
GET yztest/_search?version=true

# 查询返回值
{
  "took" : 0,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "yztest",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 3, # 文档的版本号为3
        "_score" : 1.0,
        "_source" : {
          "f1" : 1
        }
      }
    ]
  }
}

# (2) 更新文档,指定新的版本号为5
POST yztest/_doc/1?version=5&version_type=external
{
  "f1": 5
}

# 更新的结果
{
  "_index" : "yztest",
  "_type" : "_doc",
  "_id" : "1",
  "_version" : 5, # 设置新的版本号为5
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 2,
    "failed" : 0
  },
  "_seq_no" : 1,
  "_primary_term" : 1
}

3.5 版本冲突

# (1) 查看文档的version
GET yztest/_search?version=true

# 查询的结果
{
  "took" : 1,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : {
      "value" : 1,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "yztest",
        "_type" : "_doc",
        "_id" : "1",
        "_version" : 3, # 当前的版本号为3
        "_score" : 1.0,
        "_source" : {
          "f1" : 1
        }
      }
    ]
  }
}

# (2) 更新为新的版本号
POST yztest/_doc/1?version=2&version_type=external
{
  "f1": 2
}

# 更新报错
{
  "error" : {
    "root_cause" : [
      {
        "type" : "version_conflict_engine_exception",
        "reason" : "[1]: version conflict, current version [3] is higher or equal to the one provided [2]",
        "index_uuid" : "oDT5p07YRUiVwglKdUyQww",
        "shard" : "0",
        "index" : "yztest"
      }
    ],
    "type" : "version_conflict_engine_exception",
    "reason" : "[1]: version conflict, current version [3] is higher or equal to the one provided [2]",
    "index_uuid" : "oDT5p07YRUiVwglKdUyQww",
    "shard" : "0",
    "index" : "yztest"
  },
  "status" : 409
}



结论:更新时候,不能指定低于当前文档版本号

原文地址:https://www.cnblogs.com/thewindyz/p/14464921.html