# MySQL and Redis connections

# log 数据库连接
class LogMysql(object):
    """Small pymysql wrapper around a connection to the ``log`` database.

    A connection and a cursor are opened in ``__init__`` and kept on the
    instance; the query helpers below all operate on that single cursor.

    NOTE(review): this file defines a second class that is also named
    ``LogMysql`` further down. If both stay in the same module, the later
    definition replaces this one at import time.
    """

    # Class-level defaults; every instance overwrites them in __init__.
    conn = None
    cursor = None

    def __init__(self):
        """Connect to the ``log`` database and create the shared cursor."""
        # host / user / password are blank in this file and have to be
        # filled in before the connection can succeed.
        self.conn = pymysql.connect(host='', user='',
                                    password='',
                                    database='log', charset='utf8')
        self.cursor = self.conn.cursor()

    def dict_fetchall(self):
        """Return the remaining rows of the last query as a list of dicts.

        Each dict maps the column names taken from ``cursor.description``
        to the values of one row.

        Returns:
            list[dict]: one dict per fetched row; an empty list when the
            cursor has no result set (``description`` is ``None``).
        """
        description = self.cursor.description
        if description is None:
            # Nothing executed yet, or the statement produced no result set.
            return []
        # The first item of every description entry is the column name.
        columns = [col[0] for col in description]
        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    def get_table_list(self):
        """Return the names of all tables reported by ``SHOW TABLES``.

        Returns:
            list: the first column of every row returned by the statement.
        """
        self.cursor.execute("SHOW TABLES")
        return [row[0] for row in self.cursor.fetchall()]


# redis主库
class Redis(object):
    """Holds a ``redis.Redis`` client backed by its own connection pool.

    The client is exposed as ``self.conn``.

    NOTE(review): the default password is committed in source; consider
    loading the connection settings from configuration instead.
    """

    # Class-level default; every instance overwrites it in __init__.
    conn = None

    def __init__(self, host='192.168.5.219', port=6379, db=14,
                 password='root1234@A'):
        """Create the connection pool and the client that uses it.

        The defaults reproduce the previously hard-coded settings, so
        ``Redis()`` behaves as before. Other servers can be selected
        without editing this class, e.g. for local testing
        ``Redis(host='192.168.10.10', port=7000, password=None)``.

        Args:
            host: server host name or address.
            port: server TCP port.
            db: index of the redis database to select.
            password: password sent to the server, or ``None`` for none.
        """
        pool = redis.ConnectionPool(host=host, port=port, db=db,
                                    password=password)
        self.conn = redis.Redis(connection_pool=pool)


class LogMysql(object):
    """pymysql wrapper bound to one database and one target table.

    ``insert_db`` and ``updata_db`` are single-use: each closes the cursor
    and the connection before returning, whether or not the statement
    succeeded. ``update_db_sql`` closes them only when the statement fails.

    The table name and the column names (keys of ``data``) are interpolated
    into the SQL text as-is, so they must come from trusted code; only the
    values are bound as query parameters.
    """

    # Class-level defaults; every instance overwrites them in __init__.
    conn = None
    cursor = None
    table = None
    database = None

    def __init__(self, database, table,
                 host='rm-2zezqp8sll2swzwby.mysql.rds.aliyuncs.com',
                 user='datacenter', password='kbs11zx@'):
        """Connect to ``database`` and remember ``table`` for later writes.

        The keyword defaults reproduce the previously hard-coded settings,
        so ``LogMysql(database, table)`` behaves as before. Another server
        can be used without editing this class, e.g. for local testing
        ``LogMysql('unionlog', table, host='192.168.10.5', user='root',
        password='root')``.

        NOTE(review): the default credentials are committed in source;
        consider loading them from configuration instead.

        Args:
            database: name of the database to connect to.
            table: name of the table used by insert_db / updata_db.
            host: MySQL server host name or address.
            user: MySQL user name.
            password: password for ``user``.
        """
        self.table = table
        self.database = database
        self.conn = pymysql.connect(host=host, user=user,
                                    password=password,
                                    database=self.database, charset='utf8')
        self.cursor = self.conn.cursor()

    def _close(self):
        """Close the cursor and then the connection."""
        try:
            self.cursor.close()
        finally:
            # Close the connection even if closing the cursor raised.
            self.conn.close()

    def _execute_and_close(self, sql, params, action):
        """Run one parameterised statement, commit it, then close everything.

        On any error the failure and its cause are printed and the
        transaction is rolled back. ``action`` is the word used as the
        prefix of the printed status message.
        """
        try:
            self.cursor.execute(sql, params)
            self.conn.commit()
        except Exception as e:
            print(action + ' Failed', e)
            self.conn.rollback()
        else:
            # Report success only once the commit has gone through.
            print(action + ' Successful')
        finally:
            self._close()

    def dict_fetchall(self):
        """Return the remaining rows of the last query as a list of dicts.

        Each dict maps the column names taken from ``cursor.description``
        to the values of one row.
        """
        # The first item of every description entry is the column name.
        columns = [col[0] for col in self.cursor.description]
        return [dict(zip(columns, row)) for row in self.cursor.fetchall()]

    def insert_db(self, data):
        """Insert one row into ``self.table``.

        The cursor and the connection are closed afterwards, so the
        instance cannot be reused after this call.

        Args:
            data: mapping of column name to value for the new row.
        """
        keys = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        sql = 'INSERT INTO {table}({keys}) VALUES ({values})'.format(
            table=self.table, keys=keys, values=values)
        self._execute_and_close(sql, tuple(data.values()), 'insert')

    def updata_db(self, data):
        """Insert one row, or update it when its key already exists.

        This is an ``INSERT ... ON DUPLICATE KEY UPDATE`` statement: when
        the row collides with an existing key, every column in ``data`` is
        overwritten with the supplied value. The cursor and the connection
        are closed afterwards, so the instance cannot be reused after this
        call.

        Args:
            data: mapping of column name to value for the row.
        """
        keys = ', '.join(data.keys())
        values = ', '.join(['%s'] * len(data))
        sql = 'INSERT INTO {table}({keys}) VALUES ({values}) ON DUPLICATE KEY UPDATE'.format(
            table=self.table, keys=keys, values=values)
        sql += ','.join([" {key} = %s".format(key=key) for key in data])
        # The values are bound twice: once for VALUES (...) and once for
        # the assignments after ON DUPLICATE KEY UPDATE.
        self._execute_and_close(sql, tuple(data.values()) * 2, 'update')

    def update_db_sql(self, sql):
        """Execute a complete SQL statement and commit it.

        On failure the error is printed, the transaction is rolled back and
        the cursor and connection are closed.

        NOTE(review): unlike insert_db / updata_db, the connection is left
        open after a successful statement - confirm this is intended.

        Args:
            sql: the full statement text; it is executed as-is, without
                bound parameters.
        """
        try:
            self.cursor.execute(sql)
            self.conn.commit()
            print('更新成功')
        except Exception as e:
            print("error to update:", e)
            try:
                self.conn.rollback()
            finally:
                self._close()
# Original article: https://www.cnblogs.com/xiao-xue-di/p/11865040.html