单例模式使用

使用之一:

#单例实现连接池的创建:
import threading,time,pymysql
from DBUtils.PooledDB import PooledDB

class Singleton():
    singleton_lock = threading.Lock()
    def __init__(self,*args,**kwargs):
        #负责连接池的创建,也即给单例对象进行属性的添加
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建

            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            maxshared=3,
            # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host='127.0.0.1',
            port=3306,
            user='root',
            password='123',
            database='pooldb',
            charset='utf8'
        )
    def __new__(cls, *args, **kwargs):
        #负责单例对象的创建(在new方法中进行对象的创建在init方法中进行对象字段的添加)
        if not hasattr(cls,'_instance'):
            with cls.singleton_lock:
                if not hasattr(cls,'_instance'):
                    cls._instance = super(Singleton,cls).__new__(cls)
        return cls._instance
    def connect(self):
        #创建连接
        return self.pool.connection()
    def close(self):
        #关闭连接池
        return self.pool.close()
def bar():
    obj = Singleton()
    conn = obj.connect()
for i in range(5):
    t = threading.Thread(target=bar,args=(''))

使用之二:

#低版本实现数据库的连接操作:
import pymysql
sql_msg = {'host':'', 'user':'', 'password':"",'database':'', 'port':0,'charset':''}#配置文件中
class Mysqldb():
    __isinstance = None
    def __init__(self,*args,**kwargs):
        self.conn = pymysql.connect(**sql_msg)
        self.course = self.conn.cursor()

    @classmethod
    def register(cls,*args,**kwargs):
        if not cls.__isinstance:
            cls.__isinstance = Mysqldb(*args,**kwargs)
        return cls.__isinstance

    def fetch(self,sql):
        self.course.execute(sql)
        result = self.course.fetchall()
        return result
        # result = self.course.fetchmany()
        # result = self.course.fetchone()
    def close_course(self):
        self.course.close()
    def close_conn(self):
        self.conn.close()
if __name__ == '__main__':
obj = Mysqldb.register()
obj.fetch(sql)
 

使用之三(方式二的升级版):

#基于方式二的改版:
import
threading import pymysql parm_dict = {'host':'', 'user':'', 'password':"",'database':'', 'port':0,'charset':''}#配置文件中获取 class MysqlDb(object): '''基于线程锁和__new__方法来实现的''' local_lock = threading.Lock() def __init__(self): #负责给对象进行字段的添加,将连接和游标对象绑定到对象上 self.conn = pymysql.connect(**parm_dict) self.cursor = self.conn.cursor() def __new__(cls, *args, **kwargs): #负责对象的创建工作 if not hasattr(cls,'_instance'): with cls.local_lock: if not hasattr(cls,'_instance'): cls._instance = object.__new__(cls) return cls._instance def fetch(self,sql): '''将fetch方法绑定和对象进行绑定,实现sql的增删改查操作''' self.cursor.execute(sql)#进行sql的操作 result = self.cursor.fetchall()#获取sql操作的结果 return result#将结果进行返回 def close_course(self): '''将close_course方法和对象进行绑定,实现游标的关闭操作''' self.cursor.close() def close_conn(self): '''将close_conn方法和对象进行绑定,实现连接的关闭''' self.conn.close() if __name__ == '__main__': obj = Mysqldb() obj.fetch(sql)

使用之四:(tornado通过单例创建ioloop对象)


import threading
class IOLoop:
    _instance_lock = Lock()
    @staticmethod
    def instance():
        # 通过单例模式创建了一个IOLoop对象并返回
        if not hasattr(IOLoop, "_instance"):
            with IOLoop._instance_lock:
                if not hasattr(IOLoop, "_instance"):
                    IOLoop._instance = IOLoop()
        return IOLoop._instance
原文地址:https://www.cnblogs.com/aadmina/p/8260186.html