python实现elasticsearch的update操作,不改变原数据,增加字段,或者查询更新

直接上代码:

from elasticsearch import Elasticsearch

ES_URL = 'http://172.30.3.57:9200/'

es_client = Elasticsearch(ES_URL)

def get_data_update_data():
    query = {"size": 1000000}
    res = es_client.search(index='nnnlog-2021-04-28', body=query)['hits']['hits']
    # res = es_client.search(index='nnn', body=query)['hits']['hits']
    print(len(res))
    i = 0
    for log in res:
        operator_id = log['_source']['operator_id']
        # operator_id = log['_source']['name']
        operator_name = 'name_' + operator_id.split('-')[1]
        log['_source'].update({'operator_name': operator_name})
        es_client.update( # 此处可能发生es超时,建议异常处理
            index=log['_index'],
            doc_type='_doc',
            id=log['_id'],
            body={'doc':log['_source']} # 用map包裹数据
        )
        i += 1
        print('update: ', i)

if __name__ == "__main__":
    get_data_update_data()

如果是通过查询来更新的操作,update_by_query,参照一下代码实现:

# kibana
POST businesslog-2021-05-31/_update_by_query
{
  "query": {
    "term": {
      "operator_id.keyword": "operator-0" 
    }
  },
  "script": {
    "lang": "painless",
    "source": "ctx._source.operator_name=params.operator_name",
    "params": {
      "operator_name": "name_0"
    }
  }
}

# python
query = {
  "query": {
    "term": {
      "operator_id.keyword": "operator-0" 
    }
  },
  "script": {
    "lang": "painless",
    "source": "ctx._source.operator_name=params.operator_name",
    "params": {
      "operator_name": "name_0"
    }
  }
}
res = es_client.update_by_query(index='xxx', body=query)

此类更新可以通过查询,批量更新,只要符合查询条件的都可以update,速度很快。

原文地址:https://www.cnblogs.com/davis12/p/14813532.html