Mongo批量同步数据到SqlServer

出于业务需求:老版本的数据一直存储在 SQL Server 中,而改版后 MongoDB 中的字段与之差异很大,因此需要把 MongoDB 中的数据同步到 SQL Server,以便进行数据联合计算。
1. 先根据所需数据编写查询(Query),再进行聚合和字段映射处理;为此需要事先弄清 SQL Server 表字段与 MongoDB 字段之间的对应关系。

2. 将数据批量写入 SQL Server。

Code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Time : 2021-09-16 11:58
# @Author : BruceLong
# @FileName: yiche_mongo_to_sqlserver.py
# @Email   : 18656170559@163.com
# @Software: PyCharm
# @Blog :http://www.cnblogs.com/yunlongaimeng/
import pymssql
import time

import pymongo


class MongoToSqlServer:
    """Copy matching MongoDB documents into a SQL Server table in batches.

    Documents with ``realChannel == "yiche"`` and ``CreateTime >= ctime`` are
    reshaped by an aggregation ``$project`` into the target table's column
    layout. They are then inserted page by page with ``executemany``, with a
    commit after each page.
    """

    def __init__(self, mongo_uri="mongodb://localhost:27017/", mongo_db='ClientIds',
                 mongo_coll='C102101', mssql_server='127.0.0.1', mssql_user='admin',
                 mssql_password='admin', mssql_database='DC_BitAuto_BBS',
                 table='bitautoBBS_newByMonth', limit=1000):
        """Open the MongoDB and SQL Server connections.

        Every argument defaults to the value the script was originally written
        against, so ``MongoToSqlServer()`` connects exactly as before.

        :param table: target table name. It is formatted into the SQL text, so
            it must come from trusted configuration.
        :param limit: number of documents fetched and inserted per batch.
        :raises ValueError: if ``limit`` is not positive (``$limit`` rejects 0).
        """
        if limit <= 0:
            raise ValueError('limit must be positive, got %r' % (limit,))
        self.cd_mongo = pymongo.MongoClient(mongo_uri)
        self.coll = self.cd_mongo[mongo_db][mongo_coll]
        self.conn = pymssql.connect(mssql_server, mssql_user, mssql_password, mssql_database)
        self.cur = self.conn.cursor()
        # Documents copied so far; also used as the $skip offset of the next page.
        self.count = 0
        self.limit = limit
        self.table = table

    @staticmethod
    def _projection(ctime_sql):
        """Return the ``$project`` spec that maps target columns to Mongo fields.

        The key order fixes the order of the inserted values. The INSERT names
        no columns, so this order must match the target table's column order.
        """
        return {
            "ID": "$pid",
            "NoOfRead": "$NumberofRead",
            "NoOfReply": "$NumberofReplys",
            "UserName": "$Author",
            "UserInfo": None,
            "ArticleTitle": "$articleTitle",
            "ArticleContent": "$reply",
            "ArticleTime": "$replyDate",
            "Thread_ID": "$ThreadId",
            "Is_Top": None,
            "Forum": None,
            "fname": "$ForumName",
            "url": "$URL",
            # Every row of one run carries the same sync timestamp.
            "CreateTime": ctime_sql,
        }

    def _page_pipeline(self, ctime, projection):
        """Build the aggregation pipeline for the page starting at ``self.count``."""
        return [
            {"$match": {"realChannel": "yiche", "CreateTime": {"$gte": ctime}}},
            # $skip/$limit page reliably only over a stable order. _id is
            # unique, so sorting on it guarantees that pages neither overlap
            # nor miss documents. The sort must precede $project, which drops _id.
            {"$sort": {"_id": 1}},
            {"$skip": self.count},
            {"$limit": self.limit},
            {"$project": {"_id": 0, **projection}},
        ]

    def selece_mongo(self, ctime='2021-09-01'):
        """Copy all matching documents to SQL Server, one page per round trip.

        :param ctime: inclusive lower bound compared against the Mongo
            ``CreateTime`` field. For a daily run, pass e.g.
            ``time.strftime('%Y-%m-%d', time.localtime(time.time() - 86400))``.
            NOTE(review): the value is compared as-is, which assumes
            ``CreateTime`` is stored as a 'YYYY-MM-DD...' string. Confirm this.
        """
        ctime_sql = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
        projection = self._projection(ctime_sql)
        columns = list(projection)
        sql = "INSERT INTO [dbo].[{table}] VALUES ({values})".format(
            table=self.table, values=','.join([" %s"] * len(columns)))
        while True:
            docs = self.coll.aggregate(pipeline=self._page_pipeline(ctime, projection))
            # $project omits an output field when its source field is missing,
            # so positional doc.values() can shift columns. Look each column up
            # by name instead, and write NULL for anything absent.
            rows = [tuple(doc.get(col) for col in columns) for doc in docs]
            if not rows:
                return
            self.cur.executemany(sql, rows)
            self.conn.commit()
            # Advance only after the page has been committed.
            self.count += len(rows)
            print(self.count, self.limit, len(rows))
            if len(rows) < self.limit:
                # A short page is the last one; skip the extra empty query.
                return

    def close(self):
        """Close the SQL Server cursor and connection, then the MongoDB client."""
        try:
            self.cur.close()
            self.conn.close()
        finally:
            self.cd_mongo.close()

    def run(self, ctime='2021-09-01'):
        """Sync every matching document from ``ctime`` onward, then release connections."""
        try:
            self.selece_mongo(ctime)
        finally:
            self.close()


if __name__ == '__main__':
    # Script entry point: the constructor opens the MongoDB and SQL Server
    # connections, and run() performs the paged copy.
    MongoToSqlServer().run()
原文地址:https://www.cnblogs.com/yunlongaimeng/p/15343859.html