python-- 使用 psycopg2 操作 PostgreSQL 或使用连接池 DBUtils.PooledDB 操作 PostgreSQL

需要安装的软件和第三方包
python need install pip and petl DButils psycopg2 libs

command is:

yum install python-pip
pip install psycopg2
pip install petl
pip install DButils

yum install postgresql-devel*

1、使用 psycopg2 操作 PostgreSQL

Python:使用psycopg2模块操作PostgreSQL
python无网安装psycopg2(操作postgresql,附操作代码)


2、使用连接池 DBUtils.PooledDB 操作 PostgreSQL

参考代码:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import datetime
import sys
import os
import configparser
import logging
import psycopg2

from  DBUtils.PooledDB import PooledDB

class  DatabaseOperator(object):
    '''class for database operator
    '''
    def __init__(self, database_config_path, database_config=None):
        '''Constructor
        '''
        self._database_config_path = database_config_path
        # load database configuration
        if not database_config :
            self._database_config = self.parse_postgresql_config(database_config_path)
        else:
            self._database_config = database_config
        self._pool = None

    def database_config_empty(self):
        if self._database_config:
            return False
        else:
            return True
        
    def parse_postgresql_config(self, database_config_path=None):
        '''解析数据库配置文件参数
        database_config_path: 数据库配置文件路径
        --------
        返回值: 解析配置属性, 以 dict 类型返回数据库配置参数 config
        '''
        if database_config_path == None and self._database_config_path != None:
            database_config_path = self._database_config_path
        if not os.path.isfile(database_config_path):
            sys.exit("ERROR: Could not find configuration file: {0}".format(database_config_path))
        parser = configparser.SafeConfigParser()
        parser.read(database_config_path)
        config = {}
        config['database'] = parser.get('testdb', 'Database')
        config['db_user'] = parser.get('testdb', 'UserName')
        config['db_passwd'] = parser.get('testdb', 'Password')
        config['db_port'] = parser.getint('testdb', 'Port')
        config['db_host'] = parser.get('testdb', 'Servername')
        self._database_config = config
        return config  
    
    def get_pool_conn(self):
        if not self._pool:
            self.init_pgsql_pool()
        return self._pool.connection()
        
    def init_pgsql_pool(self):
        '''初始化连接池
        '''
        # 字典 config 是否为空
        config = self.parse_postgresql_config()
        POSTGREIP = config['db_host']
        POSTGREPORT = config['db_port']
        POSTGREDB = config['database']
        POSTGREUSER = config['db_user']
        POSTGREPASSWD = config['db_passwd']
        try:
            logging.info('Begin to create {0} postgresql pool on:{1}.
'.format(POSTGREIP, datetime.datetime.now()))
            pool = PooledDB(
                creator = psycopg2,  # 使用连接数据库的模块 psycopg2
                maxconnections = 6,  # 连接池允许的最大连接数,0 和 None 表示不限制连接数
                mincached = 1,       # 初始化时,链接池中至少创建的空闲的链接,0 表示不创建
                maxcached = 4,       # 链接池中最多闲置的链接,0 和 None 不限制
                blocking = True,     # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
                maxusage = None,     # 一个链接最多被重复使用的次数,None 表示无限制
                setsession = [],     # 开始会话前执行的命令列表
                host = POSTGREIP,
                port = POSTGREPORT,
                user = POSTGREUSER,
                password = POSTGREPASSWD,
                database = POSTGREDB)
            self._pool = pool
            logging.info('SUCCESS: create {0} postgresql pool success on {1}.
'.format(POSTGREIP, datetime.datetime.now()))
                    
        except Exception as e:
            logging.error('ERROR: create {0} postgresql pool failed on {1}.
'.format(POSTGREIP, datetime.datetime.now()))
            self.close_db_cursor()
            sys.exit('ERROR: create postgresql pool error caused by {0}'.format(str(e)))
            
    def pg_select_operator(self, sql):
        '''进行查询操作,函数返回前关闭 cursor, conn
        '''
        # 执行查询
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()      
            cursor.execute(sql)
            result = cursor.fetchall()
        except Exception as e:
            logging.error('ERROR: execute {0} causes error'.format(sql))
            sys.exit('ERROR: load data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.close()       
        return result

    def exec_sql(self, sql):
        result = False
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()      
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()    
        return result
    
    def pg_update_operator(self, sql): 
        result = False
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()      
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: update data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()    
        return result

    def pg_delete_operator(self, sql):
        result = False
        # 执行查询
        try:
            conn = self.get_pool_conn()
            cursor = conn.cursor()   
            cursor.execute(sql)
            result =  True
        except Exception as e:
            logging.error('ERROR: execute  {0} causes error'.format(sql))
            sys.exit('ERROR: delete data from database error caused {0}'.format(str(e)))
        finally:
            cursor.close()
            conn.commit()
            conn.close()       
        return result
    
    def close_pool(self):
        '''关闭 pool
        '''
        if self._pool != None:
            self._pool.close()

    def test_pool_select(self):
        sql = 'select * from student'
        result = self.pg_select_operator(sql)
        print(result)

    def test_pool_exec(self, sqlscript):
        result = self.exec_sql(sqlscript)
        print(result)
        return result
            
if __name__ == '__main__':
    path = "sql_conf_test.conf"
    # 测试 select
    db = DatabaseOperator(database_config_path=path)
    db.test_pool_select()

    # 测试 insert
    db.test_pool_exec("insert into student(name, age) values('test', 20);")
    # 关闭 pool
    db.close_pool()

  sql_conf_test.conf

[testdb]
Database = test
UserName = xxx
Password = xxx
Port = 5432
Servername = 127.0.0.1

  执行 python .DatabaseOperator.py,结果:

  查看数据库

===========================================================

def pg_insert_operator(self, sql):
    '''进行插入操作,函数返回前关闭 cursor, conn
        返回:影响的记录条数
    '''
    result = 0
    try:
        conn = self.get_pool_conn()
        cursor = conn.cursor()      
        cursor.execute(sql)
    except Exception as e:
        logging.error('ERROR: execute  {0} causes error'.format(sql))
        sys.exit('ERROR: insert data from database error caused {0}'.format(str(e)))
    finally:
        conn.commit()
        result = cursor.rowcount
        cursor.close()
        conn.close()    
    
    return result

---

原文地址:https://www.cnblogs.com/xy-ouyang/p/12987676.html