【Elasticsearch】Rebuilding an Index Online

Project background:

  1. Some of the project's old indices were created with unreasonable settings, so the indices need to be rebuilt.

  2. The production Elasticsearch cluster has been scaled out from 1 node to 3 nodes, and the existing indices need to be re-sharded.

  For example:

   The old index index_user was created with 1 primary shard and 0 replica shards, so its data has no high availability:

GET index_user/_search
{
  "took" : 121,
  "timed_out" : false,
  "_shards" : {
    "total" : 1,
    "successful" : 1,
    "skipped" : 0,
    "failed" : 0
  }
}
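
The replica setting itself can be confirmed with the standard settings API:

# Check the old index's shard and replica settings
GET index_user/_settings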

Implementation steps:

  1. Create a new index, index_user_v2, with the primary and replica shard counts we want (5 primaries and 1 replica here, i.e. 10 shard copies to spread across the 3 nodes):

PUT index_user_v2
{
  "settings": {
    "number_of_replicas": 1,
    "number_of_shards": 5
  }
}

  2. Set the index mapping. Since the new index's mapping is identical to the old index's, we can copy the old mapping over directly.
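
The old mapping can be dumped with the mapping API and its properties block pasted straight into the new index:

# Dump the old index's mapping
GET index_user/_mapping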

PUT index_user_v2/t_user/_mappings
{
  "properties": {
    "age": {
      "type": "integer"
    },
    "ageScope": {
      "type": "keyword"
    },
    "birthday": {
      "type": "long"
    },
    "cityId": {
      "type": "integer"
    },
    "cityName": {
      "type": "keyword"
    },
    "countryCode": {
      "type": "integer"
    },
    "countyId": {
      "type": "integer"
    },
    "create_time": {
      "type": "long"
    },
    "dbId": {
      "type": "long"
    },
    "email": {
      "type": "keyword"
    },
    "gameIds": {
      "type": "text",
      "analyzer": "ik_max_word"
    },
    "isCreateServer": {
      "type": "integer"
    },
    "isDelete": {
      "type": "boolean"
    },
    "nickName": {
      "type": "text",
      "analyzer": "ik_smart"
    },
    "nickNamePingYin": {
      "type": "text",
      "analyzer": "pinyin"
    },
    "nnNumber": {
      "type": "long"
    },
    "provinceId": {
      "type": "integer"
    },
    "provinceName": {
      "type": "keyword"
    },
    "sex": {
      "type": "keyword"
    },
    "signature": {
      "type": "keyword"
    },
    "status": {
      "type": "keyword"
    },
    "telNum": {
      "type": "keyword"
    },
    "updae_time": {
      "type": "long"
    },
    "userId": {
      "type": "long"
    },
    "userType": {
      "type": "keyword"
    },
    "userUrl": {
      "type": "keyword"
    },
    "userUrlNn": {
      "type": "keyword"
    },
    "user_id": {
      "type": "long"
    }
  }
}

  3. After steps 1 and 2, Kibana -> Monitoring -> Nodes shows that index_user_v2 has already been automatically allocated across the three nodes.

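Shard allocation can also be checked from the console with the cat shards API, for example:

# See which node each shard of the new index landed on
GET _cat/shards/index_user_v2?v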

    Before the actual rebuild starts, you can set index_user_v2's replica count to 0 to cut the overhead of writing replicas during the copy (it gets restored at cut-over in step 5):

PUT index_user_v2/_settings
{
  "index": {
    "number_of_replicas": 0
  }
}
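
A companion optimization (not part of the original steps, but standard practice for bulk loads) is to disable refresh on the new index for the duration of the copy, restoring it together with the replicas afterwards:

# Optional: stop refreshing index_user_v2 while the copy runs
PUT index_user_v2/_settings
{
  "index": {
    "refresh_interval": "-1"
  }
}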

   4. Run the index migration, copying the data from index_user to index_user_v2. Passing wait_for_completion=false makes the reindex request run in the background as a task:

# Index migration
POST /_reindex?wait_for_completion=false
{
  "source": {
    "index": "index_user"
  },
  "dest": {
    "index":"index_user_v2"
  }
}
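
The task output below shows 15,481 batches for roughly 15.48 million documents, i.e. the default scroll batch size of 1000. For a copy this large, the batch size can be raised via size in source, and the work parallelized with sliced scrolls (slices=auto on recent versions; older ones need an explicit number); a sketch:

# Optional: bigger batches plus sliced, parallel execution
POST /_reindex?wait_for_completion=false&slices=auto
{
  "source": {
    "index": "index_user",
    "size": 5000
  },
  "dest": {
    "index": "index_user_v2"
  }
}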

  Execution returns a taskId, e.g. Mroifc1NSJq2s7mf38XxmA:1679363718. We can later use this taskId to query the migration task's status, elapsed time, progress, and so on:

GET _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718
{
  "completed" : true,
  "task" : {
    "node" : "Mroifc1NSJq2s7mf38XxmA",
    "id" : 1679363718,
    "type" : "transport",
    "action" : "indices:data/write/reindex",
    "status" : {
      "total" : 15480531,
      "updated" : 0,
      "created" : 15480531,
      "deleted" : 0,
      "batches" : 15481,
      "version_conflicts" : 0,
      "noops" : 0,
      "retries" : {
        "bulk" : 0,
        "search" : 0
      },
      "throttled_millis" : 0,
      "requests_per_second" : -1.0,
      "throttled_until_millis" : 0
    },
    "description" : "reindex from [index_user] to [index_user_v2]",
    "start_time_in_millis" : 1623316057822,
    "running_time_in_nanos" : 594661905143,
    "cancellable" : true,
    "headers" : { }
  },
  "response" : {
    "took" : 594661,
    "timed_out" : false,
    "total" : 15480531,
    "updated" : 0,
    "created" : 15480531,
    "deleted" : 0,
    "batches" : 15481,
    "version_conflicts" : 0,
    "noops" : 0,
    "retries" : {
      "bulk" : 0,
      "search" : 0
    },
    "throttled" : "0s",
    "throttled_millis" : 0,
    "requests_per_second" : -1.0,
    "throttled_until" : "0s",
    "throttled_until_millis" : 0,
    "failures" : [ ]
  }
}
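
While a reindex task is still running, it can also be throttled or cancelled through the task APIs, for example:

# Throttle the running reindex to 500 requests per second
POST _reindex/Mroifc1NSJq2s7mf38XxmA:1679363718/_rethrottle?requests_per_second=500

# Or cancel it outright
POST _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718/_cancel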

    5. After the task completes:

    Remove the alias index_user_latest from the old index index_user.

    Add the alias index_user_latest to the new index index_user_v2.

    Both actions go in a single _aliases request, which is applied atomically, so searches against index_user_latest never hit a moment where the alias points nowhere. With that, the index rebuild is complete.

# Alias swap
POST _aliases
{
  "actions": [
    {
      "add": {
        "index": "index_user_v2",
        "alias": "index_user_latest"
      }
    },
    {
      "remove": {
        "index": "index_user",
        "alias": "index_user_latest"
      }
    }
  ]
}
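
One step worth adding at cut-over: restore the replica count that was dropped to 0 in step 3 (and re-enable refresh, if it was disabled), so the new index regains high availability:

# Restore replicas (and refresh) on the new index
PUT index_user_v2/_settings
{
  "index": {
    "number_of_replicas": 1,
    "refresh_interval": "1s"
  }
}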

Afterthoughts:

  1. When the _reindex migration runs, it reads the old index index_user's document count at that moment, 15,480,602 documents, and copies that batch of data into the new index index_user_v2.

  But production keeps writing to the old index index_user while the copy runs, and _reindex works from a point-in-time snapshot of the source, so the copied data ends up slightly smaller than the actual data set.

  How we handled it: this index's data is synced from MySQL into Elasticsearch in real time by StreamSets. We can stop the StreamSets pipeline, record the time the copy starts, and once the copy finishes run an incremental sync of the data written after that point, as sketched below.
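
A minimal sketch of that incremental catch-up using _reindex itself, assuming create_time holds an epoch-millis timestamp and that the missed writes are inserts rather than updates (catching updates would need an update-time field instead; 1623316057822 below is just the task's start_time_in_millis from above, standing in for the recorded copy start time):

# Hypothetical catch-up: copy only documents created after the full copy began
POST /_reindex?wait_for_completion=false
{
  "source": {
    "index": "index_user",
    "query": {
      "range": {
        "create_time": {
          "gte": 1623316057822
        }
      }
    }
  },
  "dest": {
    "index": "index_user_v2"
  }
}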

  2. One thing that could be improved: since wait_for_completion=false turns the _reindex into a background task, the task ID can be stored in a scheduled job that polls the task's status and sends a notification as soon as the task finishes.
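
The poll itself is just the task API; filter_path (a standard parameter on every REST response) keeps the polled payload small:

# Poll only the fields the scheduled job cares about
GET _tasks/Mroifc1NSJq2s7mf38XxmA:1679363718?filter_path=completed,task.status.created,task.status.total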


Original post: https://www.cnblogs.com/july-sunny/p/14872754.html