将数据导入MongoDB集群与MySQL


import sys
import json
import pymongo
import datetime
from pymongo import MongoClient

client = MongoClient('mongodb://192.168.1.31:20000,192.168.1.34:20000')
db = client.RHY
collection = db.ST_RIVER_R

f = open("D:/bigdata/st_river_r.CSV")
line = f.readline()
print(line)
fieldNames = line.split(',')
# STCD,TM,Z,Q,XSA,XSAVV,XSMXV,FLWCHRCD,WPTN,MSQMT,MSAMT,MSVMT
line = f.readline()
count = 0
records = []
insertCount = 0
while line:
     #
     count = count + 1
     fieldValues = line.split(',')
     if len(fieldValues) == 12 or fieldValues[0].strip() != '':
         insertObj = {}
         STCD = fieldValues[0]
         insertObj['STCD'] = STCD
         TM = fieldValues[1]
         if TM.strip() != '':
             TM = datetime.datetime.strptime(TM, '%Y-%m-%d %H:%M:%S')
             insertObj['TM'] = TM
         Z = fieldValues[2]
         if Z.strip() != '':
             Z = float(Z)
             insertObj['Z'] = Z
         Q = fieldValues[3]
         if Q.strip() != '':
             Q = float(Q)
             insertObj['Q'] = Q
         # XSA
         XSA = fieldValues[4]
         if XSA.strip() != '':
             XSA = float(XSA)
             insertObj['XSA'] = XSA
         # XSAVV
         XSAVV = fieldValues[5]
         if XSAVV.strip() != '':
             XSAVV = float(XSAVV)
             insertObj['XSAVV'] = XSAVV
         #
         XSMXV = fieldValues[6]
         if XSMXV.strip() != '':
             XSMXV = float(XSMXV)
             insertObj['XSMXV'] = XSMXV
         #
         FLWCHRCD = fieldValues[7]
         if FLWCHRCD.strip() != '':
             insertObj['FLWCHRCD'] = FLWCHRCD
         #
         WPTN = fieldValues[8]
         if WPTN.strip() != '':
             insertObj['WPTN'] = WPTN
         #
         MSQMT = fieldValues[9]
         if MSQMT.strip() != '':
             insertObj['MSQMT'] = MSQMT
         #
         MSAMT = fieldValues[10]
         if MSAMT.strip() != '':
             insertObj['MSAMT'] = MSAMT
         #
         MSVMT = fieldValues[11]
         if MSVMT.strip() != '':
             insertObj['MSVMT'] = MSVMT
         #
         # collection.insert_one(insertObj)
         # collection.insert_many(new_posts)
         records.append(insertObj)
         if len(records) == 1000:
             insertCount = insertCount + 1
             if count > 1451000:
                 collection.insert_many(records)
                 print(str(count) + '  ' + str(insertCount))
             print(count)
             records = []
     else:
         print(line)
     #
     line = f.readline()

f.close()
client.close()


------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------



import sys
import json
import math
import copy
import pymongo
import datetime
from pymongo import MongoClient
import shapefile
import pymysql

sf = shapefile.Reader(r'E:/Ambari/ubuntu/mapdata/aircraftPositionLine50.shp')
fields = sf.fields
shapes = sf.shapes()
count = len(shapes)
print('count: ' + str(count))
fieldName = []
for index in range(len(fields)):
     if index > 0:
         field = fields[index]
         # print(field)
         fieldName.append(field[0])
#print(fieldName)
#

db = pymysql.connect("127.0.0.1","root","gis","acms" )
cursor = db.cursor()
sql = "INSERT INTO airline_r(id, code, name, time_index, x, y, z, angle) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)"
  
for index in range(count):
     preX = None
     preY = None
     preZ = None
     angle = None

    features = []
     record = sf.record(index)
     attribute = record[0:len(fields)]
     attribute[0] = index
     print(attribute)
     shap = shapes[index]
     points = shap.points
     pointCount = len(points)

    for i in range(pointCount):
         coordinate = shap.points[i]
         x = coordinate[0]
         y = coordinate[1]
         z = (0 if (len(coordinate) < 3) else coordinate[2])
         if preX != None:
             angle = math.atan2(y-preY, x - preX)
             feature = copy.deepcopy(attribute)
             feature.append(i-1)
             feature.append(preX)
             feature.append(preY)
             feature.append(preZ)
             feature.append(angle)
             print(feature)
             features.append(tuple(feature))
             #cursor.execute(sql % tuple(feature))
             #cursor.execute(sql, feature)
         if i == pointCount -1:
             feature = copy.deepcopy(attribute)
             feature.append(i)
             feature.append(x)
             feature.append(y)
             feature.append(z)
             feature.append(angle)
             print(feature)
             features.append(tuple(feature))
             #cursor.execute(sql % tuple(feature))
             #cursor.execute(sql, feature)
         preX = x
         preY = y
         preZ = z
     #print(features)
     cursor.executemany(sql, features)
     db.commit()
     '''  
     try:
         # 执行sql语句
         cursor.executemany(sql, features)
         # 提交到数据库执行
         db.commit()
     except:
         # 如果发生错误则回滚
         print()
         db.rollback()
     '''
# 关闭数据库连接
db.close()


'''
client = MongoClient('mongodb://192.168.1.31:20000,192.168.1.34:20000')
db = client.RHY
collection = db.ST_RIVER_R

f = open("D:/bigdata/st_river_r.CSV")
line = f.readline()
print(line)
fieldNames = line.split(',')
# STCD,TM,Z,Q,XSA,XSAVV,XSMXV,FLWCHRCD,WPTN,MSQMT,MSAMT,MSVMT
line = f.readline()
count = 0
records = []
insertCount = 0
while line:
     #
     count = count + 1
     fieldValues = line.split(',')
     if len(fieldValues) == 12 or fieldValues[0].strip() != '':
         insertObj = {}
         STCD = fieldValues[0]
         insertObj['STCD'] = STCD
         TM = fieldValues[1]
         if TM.strip() != '':
             TM = datetime.datetime.strptime(TM, '%Y-%m-%d %H:%M:%S')
             insertObj['TM'] = TM
         Z = fieldValues[2]
         if Z.strip() != '':
             Z = float(Z)
             insertObj['Z'] = Z
         Q = fieldValues[3]
         if Q.strip() != '':
             Q = float(Q)
             insertObj['Q'] = Q
         # XSA
         XSA = fieldValues[4]
         if XSA.strip() != '':
             XSA = float(XSA)
             insertObj['XSA'] = XSA
         # XSAVV
         XSAVV = fieldValues[5]
         if XSAVV.strip() != '':
             XSAVV = float(XSAVV)
             insertObj['XSAVV'] = XSAVV
         #
         XSMXV = fieldValues[6]
         if XSMXV.strip() != '':
             XSMXV = float(XSMXV)
             insertObj['XSMXV'] = XSMXV
         #
         FLWCHRCD = fieldValues[7]
         if FLWCHRCD.strip() != '':
             insertObj['FLWCHRCD'] = FLWCHRCD
         #
         WPTN = fieldValues[8]
         if WPTN.strip() != '':
             insertObj['WPTN'] = WPTN
         #
         MSQMT = fieldValues[9]
         if MSQMT.strip() != '':
             insertObj['MSQMT'] = MSQMT
         #
         MSAMT = fieldValues[10]
         if MSAMT.strip() != '':
             insertObj['MSAMT'] = MSAMT
         #
         MSVMT = fieldValues[11]
         if MSVMT.strip() != '':
             insertObj['MSVMT'] = MSVMT
         #
         # collection.insert_one(insertObj)
         # collection.insert_many(new_posts)
         records.append(insertObj)
         if len(records) == 1000:
             insertCount = insertCount + 1
             if count > 1451000:
                 collection.insert_many(records)
                 print(str(count) + '  ' + str(insertCount))
             print(count)
             records = []
     else:
         print(line)
     #
     line = f.readline()

f.close()
client.close()
'''

原文地址:https://www.cnblogs.com/gispathfinder/p/9286749.html