mongodb数据迁移到新库并同时写入ES

工作中遇到一个需求,要将旧系统的mongodb数据库全部迁移至新的系统中。新旧系统的数据结构不一致,旧系统设计的是两张表,新系统是一张。字段也发生了变化。

1、实现方案

连接mongodb数据库,逐条读取数据,并重新组装。最后写入新库和ES。

程序实现并不复杂,但有几个注意的地方,记录一下。本文没有详细讲述具体的模块使用方法,如果需要就自行百度下。这种工具程序的业务定制化程度很高,无法完成通用的任务,仅供参考。

完整的程序到github下载。下载地址

2、结构

conf 配置文件目录
logs 日志文件目录
util 工具目录
data_migrate.py  入库文件main
mongo_connect.py mongodb连接类
write_es.py   ES连接类

3、使用

修改配置文件后,就可以使用了。里边的数据结构转换的代码按照具体的业务逻辑转换。
全局配置文件

填写数据库配置信息

#读取数据库配置
[MongodbR]
host =
port =
user =
password =
db =  数据库1
db2 =
coll =  集合1
coll2 =
#写入数据库配置
[MongodbW]
host =
port =
user =
password =
db =
coll =

日志配置文件

需要修改日志文件路径,报警级别

[handler_fileHandler]
class=logging.handlers.RotatingFileHandler
level=DEBUG
formatter=fmt
args=('./logs/logdemo.log','a',20000,5,'utf-8')

4、代码解读

mongodb

mongodb连接超时问题
通常我们查询的使用res=coondocinfo.find() 一般来说没有问题,此时出来的只是游标,而不是数据结果集。所以在处理过程中需要长时间保持数据库的连接,
但默认的mongodb数据库连接只有十分钟,如果读取大量数据的时候十分钟显然不够用,这项目一年的数据量有上亿条。所以需要使用:
with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor 这种方式保持mongodb连接,否则会报错

with coondocinfo.find({"ch": 1,'it': {"$gte": stime, "$lt": etime}},no_cursor_timeout=True) as cursor:
  for i in cursor:
    logging.info("正在处理的_id是%s" % i['_id'])
    #读取doctext库,获取正文数据,已处理成字符串
    try:
      content = list(coondoctext.find({'_id' : i['_id']},{'content':1,'_id':0}))[0]['content']
    except Exception as e:
      logging.error(e)    

mongodb字段_ID问题

ES

写入ES
写入es的时候需要注意,

id = data['_id']如果是mongodb读出来的数据会有_id这个字段,而es中也会默认给一个id字段,所以需要将数据结构的id删除掉。否则会报错。
  del data['_id']
  add(index='allchannel-iksmart', body=data,id=str(id))

写入索引

body = {"name": "long", "age": 11,"height": 111}
add(index=index_name,body=body,id=1)

:param index: 索引名称
:param body:文档内容
:param id: 是否指定id,如不指定就会使用生成的字符串

MAIN

stime 查询起始时间
etime 查询截止时间
formatdata 主逻辑函数

if __name__ == '__main__':
  stime = 1577808000
  etime = 1580486400
  Data_migrate().formatdata(stime=stime,etime=etime)

原文地址:https://www.cnblogs.com/zhaobowen/p/13269383.html