python peewee模块执行原生sql语句的方法

由于公司要求使用peewee模块,不能使用pymysql,而我又不是特别喜欢ORM的方式,很喜欢原生sql,所以,我决定重新封装peewee

代码如下:


from peewee import MySQLDatabase


class base_peewee(object):
def __init__(self, host=None, port=3306, user=None, password=None, db_name=None):
self.db_host = host
self.db_port = int(port)
self.user = user
self.password = str(password)
self.db = db_name
self.conn = None
self.cursor = None

def connect(self):
# self.conn = MySQLDatabase(
# host=self.db_host,
# port=self.db_port,
# user=self.user,
# passwd=self.password,
# database=self.db,
# charset="utf8"
# )
self.conn = MySQLDatabase(
host="127.0.0.1",
port=3306,
user="root",
passwd="123123qwe",
database='test',
charset="utf8"
)
self.cursor = self.conn.cursor()


class ReDefinedPeeWee(base_peewee):

def __init__(self):
super(ReDefinedPeeWee, self).__init__()
self.connect()

def commit(self):
self.conn.commit()

def rollback(self):
self.conn.rollback()

def insert_sql(self, sql, value=None, commit=None):
self.cursor.execute(sql, value)
if commit:
self.commit()

def update_sql(self, sql, value=None, commit=None):
self.cursor.execute(sql, value)
if commit:
self.commit()

def delete_sql(self, sql, value=None, commit=None):
self.cursor.execute(sql, value)
if commit:
self.commit()

def selectone_sql(self, sql, columns=None):
"""

:param sql:
:param columns: ['id', 'name'...] 要求sql的返回数据相同
:return:
"""
self.cursor.execute(sql)
self.conn.commit()
if not columns:
return self.cursor.fetchone()
else:
data = self.cursor.fetchone()
if data and len(data) == len(columns):
return dict(zip(columns, data))
else:
return data

def selectall_sql(self, sql, columns=None):
self.cursor.execute(sql)
self.conn.commit()
if not columns:
return self.cursor.fetchall()
else:
data = self.cursor.fetchall()

if len(data) > 0 and len(data[0]) == len(columns):
return [dict(zip(columns, i)) for i in data]
else:
return data

def select_sql(self, sql, value=None, columns=None):
self.cursor.execute(sql, value)
self.conn.commit()
return self.cursor.fetchall()

def close(self):
self.cursor.close()
self.conn.close()
self.conn = None
self.cursor = None


def main():
ret = ReDefinedPeeWee()
res = ret.selectone_sql("select * from test", )
print(res)
res1 = ret.selectone_sql("select * from test", ["id", 'name', "num"])
print(res1)
ret.close()


if __name__ == '__main__':
main()
 

如果用peewee写原生的方式就是这么玩

from peewee import MySQLDatabase, Model
from marshmallow import Schema, fields
from peewee import PrimaryKeyField, IntegerField, CharField, FloatField, DoubleField

# MYSQL 配置

# db = MySQLDatabase('test',
#                    user='root',
#                    password='123123qwe',
#                    host='127.0.0.1',
#                    port=3306)


db = MySQLDatabase('',
                   user='root',
                   password='',
                   host='',
                   port=37214
                   )


class BaseModel(Model):
    class Meta:
        database = db


class TestFactor(BaseModel):
    id = PrimaryKeyField()
    type = IntegerField(verbose_name="类型")
    name = CharField(verbose_name="姓名")
    num = FloatField(verbose_name="浮点")

    class Meta:
        database = db
        # order_by = ('id',)
        db_table = 'test1'


class TestFactor_(Schema):
    id = fields.Integer()
    type = fields.Integer()
    name = fields.String()
    num = fields.Float()
    name_level = fields.Method('get_name_level')

    def get_name_level(self, item):
        if item.type == 1:
            status = '正常'
        elif item.type == 2:
            status = "低危"
        elif item.type == 3:
            status = "高危"
        else:
            status = "正常"
        return status


# 健康管理监测值存储类
class HealthHouseKeeperMonitoringValue(BaseModel):
    id = PrimaryKeyField()
    user_id = IntegerField(verbose_name="用户ID")
    type_id = IntegerField(verbose_name="类型")
    monitoring_value = CharField(verbose_name="监测值")
    report_filepath = CharField(verbose_name="文档路径")
    create_time = IntegerField(verbose_name="创建时间")
    update_time = IntegerField(verbose_name="更新时间")
    status = IntegerField()

    class Meta:
        order_by = ('id',)
        db_table = 'wechat_health_housekeeper_monitoringvalue'


class HealthHouseKeeperMonitoringValueSerializer(Schema):
    id = fields.Integer()
    user_id = fields.Integer()
    type_id = fields.Integer()
    type_level = fields.Method("get_type_level")
    monitoring_value = fields.String()
    report_filepath = fields.String()
    create_time = fields.Integer()
    update_time = fields.Integer()
    status = fields.Integer()

    def get_type_level(self, item):
        if item.type_id == 1:
            status = '血压'
        elif item.type_id == 2:
            status = "心率"
        elif item.type_id == 3:
            status = "低密度脂蛋白胆固醇"
        elif item.type_id == 4:
            status = "空腹血糖"
        elif item.type_id == 5:
            status = "甘油三酯"
        elif item.type_id == 6:
            status = "糖化血红蛋白"
        elif item.type_id == 7:
            status = "总胆固醇"
        elif item.type_id == 8:
            status = "BMI"
        return status


def test():
    # get方法-单条数据
    # detail = TestFactor.get() # 只查一条,没有则报错
    # data = TestFactor_(many=False) # 展示数据
    # print(data.dump(detail))

    # get_or_none方法-单挑数据
    # detail = TestFactor.get_or_none() # 没有不报错,只查询一条
    # data = TestFactor_(many=False) # 展示数据
    # print(data.dump(detail))

    # 多条数据
    # detail = TestFactor.select()
    # many=False表示只有一条, exclude表示不展示某些列, only表示只展示某些列
    # data = TestFactor_(many=False, exclude=[], only=())
    # data = TestFactor_(many=False, exclude=[])
    # print([data.dump(i) for i in detail])

    detail = HealthHouseKeeperMonitoringValue.select().where(
        HealthHouseKeeperMonitoringValue.user_id == 180,
        HealthHouseKeeperMonitoringValue.status == 1,
        HealthHouseKeeperMonitoringValue.type_id == 1,
    ).order_by(HealthHouseKeeperMonitoringValue.create_time.desc())
    print(detail)
    data_serial = HealthHouseKeeperMonitoringValueSerializer()
    print(len(detail))
    print(data_serial.dump(detail[0]))
    # user_detail = RiskFactorSerializers(many=False, exclude=['create_time', 'id', 'chronic_id', 'user_id'])


if __name__ == '__main__':
    test()
原文地址:https://www.cnblogs.com/renfanzi/p/13469860.html