python/pandas 操作数据库

  1. (数据库)CSV操作
# 从CSV中读取数据
df = pd.read_csv('pandas.csv',encoding = "utf-8",delimiter=",",error_bad_lines=False)

# 读取本地CSV文件
df = pd.read_csv("C:/Users/fuqia/Desktop/example.csv", sep=',')
  1. (数据库)使用sqlite3
import pandas as pd
import altair as alt
import sqlite3

con = sqlite3.connect("pandas.db")

# 执行sql语句 ,创建表
# con.execute("CREATE TABLE t (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")

# 执行sql语句 ,创建表
# con.execute("CREATE TABLE person2(\
# id  INTEGER PRIMARY KEY AUTOINCREMENT NOT NULL,\
# time  TEXT NOT NULL,\
# name  varchar(100) NOT NULL\
# )")
 
# # 执行sql语句,向数据表中插入数据记。
# con.execute("insert into person2(time, name) values('1', 'bill')")
# 提交之前的操作
# con.commit()
 
# pandas 从数据库中读取数据
df = pd.read_sql("select id,time, name from person2", con)

# pandas 数据存入数据库
df = pd.DataFrame({'time': [11, 12, 13, 14], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
df.to_sql('person2', con, index=False)


# 从CSV中读取数据
df = pd.read_csv('pandas.csv',encoding = "utf-8",delimiter=",",error_bad_lines=False)
df
# 数据存入数据库
df.to_sql('hf', con, index=False)

  1. (数据库)mysql 读取插入pandas数据
import pandas as pd
import pymysql
import sys
from sqlalchemy import create_engine

def read_mysql_and_insert():
    # pymysql for df read_sql
    try:
        conn = pymysql.connect(host='127.0.0.1', user='root', password='pass', db='db1', charset='utf8')
    except pymysql.err.OperationalError as e:
        print('Error is ' + str(e))
        sys.exit()

    # sqlalchemy for df to_sql
    try:
        engine = create_engine('mysql+pymysql://root:pass@127.0.0.1:3306/db1')
    except sqlalchemy.exc.OperationalError as e:
        print('Error is ' + str(e))
        sys.exit()
    except sqlalchemy.exc.InternalError as e:
        print('Error is ' + str(e))
        sys.exit()

    try:
        # pymysql 读取数据库,返回df类型数据
        sql = 'select * from person2'
        df = pd.read_sql(sql, con=conn)
    except pymysql.err.ProgrammingError as e:
        print('Error is ' + str(e))
        sys.exit()

    print(df.head())
    # sqlalchemy 存取df类型数据到数据库
    df.to_sql(name='add_table',con=engine,if_exists='append',index=False)
    conn.close()
    print('ok')

if __name__ == '__main__':
    df = read_mysql_and_insert()

  1. (python模块)pymysql,sqlalchemy
    import pandas as pd
    import numpy as np
    import altair as alt

    # pymysql ,执行sql语句
    conn = pymysql.connect(host='127.0.0.1', user='root', password='xxx', db='db1', charset='utf8')
    cursor = conn.cursor()
    cursor.execute("CREATE TABLE t3 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")
    
    # df 相关
    #1 pymysql + sqlalchemy ,创建engine 
    from sqlalchemy import create_engine
    engine = create_engine('mysql+pymysql://root:xxx@127.0.0.1:3306/db1')
    #2 sqlalchemy ,执行sql语句
    engine.execute("CREATE TABLE t2 (ts TIMESTAMP DEFAULT CURRENT_TIMESTAMP)")
    #3 sqlalchemy ,读取df
    sql = ''' select * from employee; '''
    df = pd.read_sql_query(sql, engine)
    print(df)
  1. (python模块)使用pymysql
import pymysql

con = pymysql.connect('localhost',
                      'root',
                      'eve136712',
                      db='model_save',
                      charset='utf8',
                      use_unicode=True)
cursor = con.cursor()

# 通过游标的execute方法执行sql语句。
cursor.execute("create table person(id varchar(32) primary key, name varchar(100) )")

  1. (python模块)使用sqlalchemy

import pandas as pd
from sqlalchemy import create_engine

# engine 创建 方法1 (还是必须要用 pymysql + sqlalchemy ,创建engine )
engine = create_engine('mysql+pymysql://root:123456@localhost:3306/test')

# engine 创建 方法2 (还是必须要用pymysql)
db_info = {'user': 'root',
           'password': '123456',
           'host': 'localhost',
           'port': 3306,
           'database': 'test'
           }
engine = create_engine('mysql+pymysql://%(user)s:%(password)s@%(host)s:%(port)d/%(database)s?charset=utf8' % db_info,
                       encoding='utf-8')

# sqlalchemy 读取数据库,返回df类型数据
sql = ''' select * from employee; '''
df = pd.read_sql_query(sql, engine)
print(df)

df = pd.DataFrame({'id': [1, 2, 3, 4], 'name': ['zhangsan', 'lisi', 'wangwu', 'zhuliu']})
# 将df数据储存为MySQL中的数据表,储存index列
df.to_sql('mydf', engine, index=True)


# 将新建的DataFrame储存为MySQL中的数据表,不储存index列(index=False)
# if_exists:
# 1.fail:如果表存在,啥也不做
# 2.replace:如果表存在,删了表,再建立一个新表,把数据插入
# 3.append:如果表存在,把数据插入,如果表不存在创建一个表!!
# 方式1
pd.io.sql.to_sql(df, 'example', con=engine, index=False, if_exists='replace')
# 方式2
df.to_sql('example', con=engine,  if_exists='replace')


  1. (python模块)使用Django (应用)

def mysql_to_pd(case_name='XXXXXTraffic'):
    report_details_by_casename = ReportDetail.objects.filter(case_name=case_name)
    details_by_case_name_df = pd.DataFrame(
        list(report_details_by_casename.values(
            'report_create_time',
            'platform_name',
            'throughputd',
            'throughputu',
            'case_name', )))
    df = details_by_case_name_df.rename(columns={'platform_name': 'HW'})
    df['time'] = df['report_create_time'].dt.strftime('%Y-%m-%d')
    df.set_index(pd.to_datetime(df["time"]), inplace=True)
    df = df.sort_index()
    return df

data=mysql_to_pd()



  1. (python模块)使用df+mysql (应用)
import pandas as pd
import numpy as np
import altair as alt
import pymysql
from sqlalchemy import create_engine
    
engine = create_engine('mysql+pymysql://root:password@127.x.x.x:3306/db5')
sql = ''' select report_uuid  from report_reportinfo; '''
df = pd.read_sql_query(sql, engine)
df
  1. (python)动态创建mysql表【自定义模块】
# 此代码实现将dict写入mysql指定表中。如果指定表不存在,则根据dict中key的字段创建表,然后将dict写入表中

import pymysql
from scrapy.conf import settings


class DataToMysql:
    def __init__(self, host, user, passwd, db, port):
        try:
            self.conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db,
                                        port=3306, charset='utf8')  # 链接数据库
            self.cursor = self.conn.cursor()
        except pymysql.Error as e:
            print("数据库连接信息报错")
            raise e

    def write(self, table_name, info_dict):
        """
        根据table_name与info自动生成建表语句和insert插入语句
        :param table_name: 数据需要写入的表名
        :param info_dict: 需要写入的内容,类型为字典
        :return:
        """
        sql_key = ''  # 数据库行字段
        sql_value = ''  # 数据库值
        for key in info_dict.keys():  # 生成insert插入语句
            sql_value = (sql_value + '"' + pymysql.escape_string(info_dict[key]) + '"' + ',')
            sql_key = sql_key + ' ' + key + ','

        try:
            self.cursor.execute(
                "INSERT INTO %s (%s) VALUES (%s)" % (table_name, sql_key[:-1], sql_value[:-1]))
            self.conn.commit()  # 提交当前事务
        except pymysql.Error as e:
            if str(e).split(',')[0].split('(')[1] == "1146":  # 当表不存在时,生成建表语句并建表
                sql_key_str = ''  # 用于数据库创建语句
                columnStyle = ' text'  # 数据库字段类型
                for key in info_dict.keys():
                    sql_key_str = sql_key_str + ' ' + key + columnStyle + ','
                self.cursor.execute("CREATE TABLE %s (%s)" % (table_name, sql_key_str[:-1]))
                self.cursor.execute("INSERT INTO %s (%s) VALUES (%s)" %
                                    (table_name, sql_key[:-1], sql_value[:-1]))
                self.conn.commit()  # 提交当前事务
            else:
                raise


if __name__ == '__main__':
    mysql = DataToMysql('localhost','root','******','pythonDB')
    di = {"A": "A", "B": "B"}
    mysql.write('te', di)

  1. (python)动态创建mysql表中的字段【自定义模块】【good】

def check_column_exist(mysql_conn, column_name):
    mysql_sql = f'''SHOW COLUMNS FROM `report_parse_soap_result` LIKE '{column_name[:column_name.index(':')]}';'''
    return bool(mysql_conn.cursor().execute(mysql_sql))


def add_column(mysql_conn, column_name):
    try:
        mysql_sql = f"ALTER TABLE `report_parse_soap_result` " \
                    f"ADD COLUMN `{column_name[:column_name.index(':')]}` INT NULL ;"
        mysql_conn.cursor().execute(mysql_sql)
    except Exception as e:
        print('add_column:', e)


def save_one_soap_result(mysql_conn,report_uuid,soap_result):
    (column_name, value), = soap_result.items()
    if check_column_exist(mysql_conn, column_name):
        try:
            sql = f"UPDATE report_parse_soap_result SET `{column_name[:column_name.index(':')]}` = {value} " \
                  f'where fk_report_uuid_id= "{report_uuid}"'
            mysql_conn.cursor().execute(sql)
            mysql_conn.commit()
        except Exception as e:
            print('save_one_soap', e)

    else:
        add_column(mysql_conn, column_name)


def save_soap_result_list(mysql_conn, report_uuid, soap_result_list):
    try:
        sql = f'''INSERT INTO report_parse_soap_result (`fk_report_uuid_id`) VALUES ( '{report_uuid}');'''
        mysql_conn.cursor().execute(sql)
        mysql_conn.commit()
    except Exception as e:
        print(e)
    for result in soap_result_list:
        save_one_soap_result(mysql_conn, report_uuid, result)



# 使用
from test import save_soap_result_list

host = '10.101.35.249'
user = 'xxx'
password = 'xxx'
db = 'db'
conn = pymysql.connect(host=host, user=user, passwd=password, db=db, port=3306, charset='utf8')

ojb = RobotXML('output.xml')
report_uuid = ojb.report_reportinfo_dic['report_uuid']
soap_result_list = ojb.report_soap_parser_result
save_soap_result_list(conn, report_uuid, soap_result_list)

原文地址:https://www.cnblogs.com/amize/p/13942598.html