基于业务需求：老版本的数据此前存储在 SQL Server 中，而改版后新旧字段差异很大，因此需要将 MongoDB 中的数据同步到 SQL Server 中，以便进行数据联合计算处理。
1. 根据所需数据先编写查询（Query），再进行聚合及字段映射处理；在此之前需要先明确 SQL Server 表字段与 MongoDB 字段之间的对应关系。
2. 将数据批量写入 SQL Server。
Code:
# #!/usr/bin/python3
# -*- coding: utf-8 -*-
# @Time    : 2021-09-16 11:58
# @Author  : BruceLong
# @FileName: yiche_mongo_to_sqlserver.py
# @Email   : 18656170559@163.com
# @Software: PyCharm
# @Blog    : http://www.cnblogs.com/yunlongaimeng/
import itertools
import time

import pymongo
import pymssql


class MongoToSqlServer:
    """Copy ``realChannel == "yiche"`` documents from MongoDB into SQL Server.

    Matching documents are reshaped by an aggregation ``$project`` into the
    target table's column layout and inserted with ``executemany`` in batches
    of ``limit`` rows, committing after each batch.
    """

    def __init__(self, mongo_uri="mongodb://localhost:27017/",
                 mongo_db="ClientIds", mongo_collection="C102101",
                 sql_host="127.0.0.1", sql_user="admin", sql_password="admin",
                 sql_database="DC_BitAuto_BBS", table="bitautoBBS_newByMonth",
                 limit=1000):
        """Open the MongoDB and SQL Server connections.

        All parameters default to the values this script originally hard-coded.

        :param mongo_uri: MongoDB connection URI.
        :param mongo_db: source MongoDB database name.
        :param mongo_collection: source MongoDB collection name.
        :param sql_host: SQL Server host.
        :param sql_user: SQL Server user.
        :param sql_password: SQL Server password.
        :param sql_database: SQL Server database name.
        :param table: target table name (inserted into ``[dbo].[table]``).
        :param limit: number of rows per insert/commit batch.
        """
        self.cd_mongo = pymongo.MongoClient(mongo_uri)
        self.coll = self.cd_mongo[mongo_db][mongo_collection]
        self.conn = pymssql.connect(sql_host, sql_user, sql_password, sql_database)
        self.cur = self.conn.cursor()
        self.count = 0  # rows inserted so far
        self.limit = limit  # rows per executemany/commit batch
        self.table = table

    def selece_mongo(self, ctime='2021-09-01'):
        """Copy every matching Mongo document into ``[dbo].[self.table]``.

        :param ctime: inclusive lower bound on the Mongo ``CreateTime`` field,
            compared as-is (a ``'%Y-%m-%d'`` string by default).
        """
        # Timestamp stamped into every inserted row's CreateTime column.
        ctime_sql = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        # Target column -> Mongo expression. The key order here is the value
        # order of the INSERT below, which does not name its columns, so it must
        # match the physical column order of the SQL Server table.
        # NOTE(review): confirm against the target table's schema.
        project = {
            "_id": 0,
            "ID": "$pid",
            "NoOfRead": "$NumberofRead",
            "NoOfReply": "$NumberofReplys",
            "UserName": "$Author",
            "UserInfo": {"$literal": None},
            "ArticleTitle": "$articleTitle",
            "ArticleContent": "$reply",
            "ArticleTime": "$replyDate",
            "Thread_ID": "$ThreadId",
            "Is_Top": {"$literal": None},
            "Forum": {"$literal": None},
            "fname": "$ForumName",
            "url": "$URL",
            "CreateTime": {"$literal": ctime_sql},
        }
        columns = [name for name in project if name != "_id"]
        pipeline = [
            {"$match": {"realChannel": "yiche", "CreateTime": {"$gte": ctime}}},
            {"$project": project},
        ]
        sql = "INSERT INTO [dbo].[{table}] VALUES ({values})".format(
            table=self.table, values=','.join([" %s"] * len(columns)))

        # Stream one cursor instead of re-running the pipeline with $skip/$limit:
        # without a $sort, separate aggregations return documents in no
        # guaranteed order (so pages could skip or repeat rows), and each $skip
        # rescans every earlier document.
        docs = self.coll.aggregate(pipeline, batchSize=self.limit)
        try:
            while True:
                # doc.get() keeps each tuple aligned with `columns` even when a
                # source field is missing ($project omits it) -> NULL.
                batch = [tuple(doc.get(col) for col in columns)
                         for doc in itertools.islice(docs, self.limit)]
                if not batch:
                    return
                try:
                    self.cur.executemany(sql, batch)
                    self.conn.commit()
                except Exception:
                    # Don't leave a half-applied batch in an open transaction.
                    self.conn.rollback()
                    raise
                self.count += len(batch)
                print(self.count, self.limit, len(batch))
        finally:
            docs.close()

    def close(self):
        """Close the SQL Server cursor/connection and the MongoDB client."""
        self.cur.close()
        self.conn.close()
        self.cd_mongo.close()

    def run(self):
        """Run the sync, always releasing both database connections afterwards."""
        try:
            self.selece_mongo()
        finally:
            self.close()


if __name__ == '__main__':
    mongo = MongoToSqlServer()
    mongo.run()