ORM框架

ykorm:

import re

class CountError(Exception):
    """Error raised when a model's primary-key count is wrong.

    Attributes:
        msg: Human-readable description of the problem.
    """

    def __init__(self, msg):
        """Store the message and forward it to ``Exception``.

        Args:
            msg: Description of the error.
        """
        # Forward the message to the base class so that str(exc) and
        # exc.args carry it even when ``msg`` is passed by keyword.
        super().__init__(msg)
        self.msg = msg

class Field:
    """Description of a single table column declared on a model class.

    Attributes:
        name: Column name.
        column_type: SQL type text for the column.
        primary_key: Truthy when the column is the primary key.
        default: Default value for the column, or None for no default.
    """

    def __init__(self, name, column_type, primary_key, default):
        """Record the four column properties on the instance unchanged."""
        self.name, self.column_type = name, column_type
        self.primary_key, self.default = primary_key, default

class StringField(Field):
    """Text column whose SQL type defaults to ``varchar(200)``."""

    def __init__(self, name, column_type='varchar(200)', primary_key=False, default=None):
        """Initialise the column, delegating all storage to ``Field``."""
        super().__init__(
            name=name,
            column_type=column_type,
            primary_key=primary_key,
            default=default,
        )

class IntegerField(Field):
    """Integer column whose SQL type defaults to ``int``."""

    def __init__(self, name, column_type='int', primary_key=False, default=None):
        """Initialise the column, delegating all storage to ``Field``."""
        super().__init__(
            name=name,
            column_type=column_type,
            primary_key=primary_key,
            default=default,
        )

class ModelMeta(type):
    """Metaclass that issues a ``create table`` statement for each model class.

    When a class using this metaclass is defined, every ``Field`` found in
    the class namespace becomes one column of a table named after the
    lower-cased class name. The class named ``Models`` is skipped.
    """

    def __init__(cls, cls_name, bases, namespace):
        """Build and execute the ``create table`` SQL for the new class.

        Args:
            cls_name: Name of the class being created.
            bases: Base classes of the class being created.
            namespace: Class body namespace; scanned for ``Field`` values.

        Raises:
            TypeError: A field default is neither ``str`` nor ``int``.
            CountError: The class does not declare exactly one primary key.
        """
        # The base class itself is not mapped to a table.
        if cls_name == 'Models':
            return type.__init__(cls, cls_name, bases, namespace)

        table_name = cls_name.lower()
        columns = []
        primary_keys = 0
        for field in namespace.values():
            if not isinstance(field, Field):
                continue
            column = field.name + " " + field.column_type
            if field.primary_key:
                primary_keys += 1
                column += " primary key auto_increment"
            default = field.default
            if default is not None:
                if isinstance(default, str):
                    column += " default '%s'" % default
                elif isinstance(default, int):
                    # Was " default &s", which made the % operator raise.
                    column += " default %s" % default
                else:
                    raise TypeError("该默认类型不支持")
            columns.append(column)

        sql = "create table %s(%s)" % (table_name, ','.join(columns))

        # Count the declared primary keys directly: searching the SQL text
        # for "primary" miscounts when a table name, column name or string
        # default happens to contain that word.
        if primary_keys != 1:
            raise CountError("主键个数错误")

        print(sql)

        # Imported here rather than at module level, as in the original.
        # NOTE(review): the sibling modules use absolute imports
        # (``from ykorm_pool import Connection``); confirm this relative
        # import matches how the package is actually laid out.
        from .ykorm_tool import Mysql
        Mysql.create(sql)
        return type.__init__(cls, cls_name, bases, namespace)

class Models(metaclass=ModelMeta):
    """Base class for models; its subclasses are processed by ``ModelMeta``."""

    def __str__(self):
        """Return the class name, a colon, and the instance attribute dict."""
        return "{}:{}".format(self.__class__.__name__, self.__dict__)

ykorm_pool:

import pymysql,time

class Connection:
    """A minimal pool of pymysql connections.

    Two connections are opened eagerly; more are opened on demand until
    ``max_count`` connections exist. When none is free and the limit has
    been reached, ``execute`` sleeps ``timeout`` seconds between retries.

    NOTE(review): no locking is visible around the pool list; confirm how
    this is used before sharing one instance across threads.
    """

    def create_conn(self):
        """Open and return a new autocommit connection to the database."""
        return pymysql.connect(
            host="127.0.0.1",
            port=3306,
            user="root",
            password="root",
            charset="utf8",
            db="yk3",
            autocommit=True,
        )

    def __init__(self, max_count=5, timeout=0.5):
        """Create the pool and open the initial connections.

        Args:
            max_count: Upper bound used when growing the pool on demand.
            timeout: Seconds to sleep between checks while waiting for a
                free connection.
        """
        self.max_count = max_count
        self.timeout = timeout
        self.pool = []
        self.current_count = 2
        for _ in range(self.current_count):
            self.pool.append(self.create_conn())

    def execute(self, sql, args=None, is_select=False):
        """Run one SQL statement on a pooled connection.

        Args:
            sql: SQL text, optionally containing ``%s`` placeholders.
            args: Values for the placeholders, or None.
            is_select: When true, return the fetched rows instead of the
                affected-row count.

        Returns:
            The rows from ``cursor.fetchall()`` (dict rows) when
            ``is_select`` is true, otherwise the value returned by
            ``cursor.execute`` (0 if an integrity error was reported).
        """
        # Wait for a free connection, growing the pool while allowed.
        while not self.pool:
            if self.current_count < self.max_count:
                self.pool.append(self.create_conn())
                self.current_count += 1
            else:
                print("在等待")
                time.sleep(self.timeout)

        conn = self.pool.pop()
        try:
            # The context manager closes the cursor when the block exits.
            with conn.cursor(pymysql.cursors.DictCursor) as cursor:
                affect_row = 0
                try:
                    affect_row = cursor.execute(sql, args)
                except pymysql.err.IntegrityError as e:
                    # Best effort: report the constraint violation and
                    # carry on with an affected-row count of zero.
                    print(e)
                if is_select:
                    return cursor.fetchall()
                return affect_row
        finally:
            # Always hand the connection back, even if the statement
            # raised, so the pool cannot run dry.
            self.pool.append(conn)

ykorm_tool:

from ykorm_pool import Connection

class Mysql:
    """Class-level helpers that map model objects to SQL statements.

    The table name is always the lower-cased class name of the model, and
    rows are identified by their ``id`` column. All statements run through
    one shared ``Connection`` pool created when the class is defined.
    """

    __conn = Connection(max_count=2)

    @classmethod
    def create(cls, sql):
        """Execute a ready-made statement (used for ``create table``)."""
        # ``__conn`` is name-mangled, so it must be reached as ``cls.__conn``
        # from inside the class; ``cls._conn`` does not exist.
        return cls.__conn.execute(sql)

    @classmethod
    def get_class_name(cls, obj):
        """Return the lower-cased class name of ``obj`` (its table name)."""
        return obj.__class__.__name__.lower()

    @staticmethod
    def _build_obj(class_name, row):
        """Create a ``class_name`` instance whose attributes are ``row``."""
        # __init__ is bypassed on purpose: the row already holds the state.
        obj = object.__new__(class_name)
        obj.__dict__ = row
        return obj

    @classmethod
    def save(cls, obj):
        """Insert ``obj`` as a new row, one column per instance attribute.

        Returns:
            Whatever the pool's ``execute`` returns for the insert.
        """
        table_name = cls.get_class_name(obj)
        fields = obj.__dict__
        col_str = ",".join(fields)
        val_str = ",".join(["%s"] * len(fields))
        args = list(fields.values())
        sql = "insert into %s(%s) values(%s)" % (table_name, col_str, val_str)
        return cls.__conn.execute(sql, args)

    @classmethod
    def delete(cls, obj):
        """Delete the row whose ``id`` equals ``obj.id``."""
        table_name = cls.get_class_name(obj)
        # The space after "from" was missing in the original statement.
        sql = "delete from " + table_name + " where id = %s"
        args = [obj.id]
        return cls.__conn.execute(sql, args)

    @classmethod
    def update(cls, obj):
        """Write every instance attribute of ``obj`` to the row with its id."""
        table_name = cls.get_class_name(obj)
        fields = obj.__dict__
        tag_str = ','.join(k + "=%s" for k in fields)
        args = list(fields.values())
        sql = "update %s set %s where id=" % (table_name, tag_str)
        # Appended separately so this placeholder survives the % formatting.
        sql += '%s'
        args.append(obj.id)
        return cls.__conn.execute(sql, args)

    @classmethod
    def select_by_id(cls, class_name, id):
        """Load one object of ``class_name`` by primary key.

        Returns:
            The populated object, or None when no row matches.
        """
        table_name = class_name.__name__.lower()
        sql = "select * from " + table_name + " where id=%s"
        args = [id]
        res = cls.__conn.execute(sql, args, is_select=True)
        if not res:
            return None
        return cls._build_obj(class_name, res[0])

    @classmethod
    def select_many(cls, class_name, conditions=None, args=None):
        """Load all objects of ``class_name``, optionally filtered.

        Args:
            class_name: Model class to instantiate for every row.
            conditions: Raw SQL appended after the table name, for example
                ``"where age > %s"``. It is inserted verbatim, so never
                build it from untrusted input; use placeholders and
                ``args`` for values instead.
            args: Values for the placeholders in ``conditions``, or None.

        Returns:
            A list of populated objects, or None when no row matches.
        """
        table_name = class_name.__name__.lower()
        sql = "select * from " + table_name
        if conditions:
            sql += " " + conditions
        res = cls.__conn.execute(sql, args, is_select=True)
        if not res:
            return None
        return [cls._build_obj(class_name, dic) for dic in res]

__init__:

from ykorm_tool import Mysql

from ykorm import StringField,IntegerField,Models

原文地址:https://www.cnblogs.com/suncunxu/p/10401621.html