由于公司要求使用peewee模块,不能使用pymysql,而我又不是特别喜欢ORM的方式,很喜欢原生sql,所以,我决定重新封装peewee
代码如下:
from peewee import MySQLDatabase
class base_peewee(object):
def __init__(self, host=None, port=3306, user=None, password=None, db_name=None):
self.db_host = host
self.db_port = int(port)
self.user = user
self.password = str(password)
self.db = db_name
self.conn = None
self.cursor = None
def connect(self):
# self.conn = MySQLDatabase(
# host=self.db_host,
# port=self.db_port,
# user=self.user,
# passwd=self.password,
# database=self.db,
# charset="utf8"
# )
self.conn = MySQLDatabase(
host="127.0.0.1",
port=3306,
user="root",
passwd="123123qwe",
database='test',
charset="utf8"
)
self.cursor = self.conn.cursor()
class ReDefinedPeeWee(base_peewee):
def __init__(self):
super(ReDefinedPeeWee, self).__init__()
self.connect()
def commit(self):
self.conn.commit()
def rollback(self):
self.conn.rollback()
def insert_sql(self, sql, value=None, commit=None):
self.cursor.execute(sql, value)
if commit:
self.commit()
def update_sql(self, sql, value=None, commit=None):
self.cursor.execute(sql, value)
if commit:
self.commit()
def delete_sql(self, sql, value=None, commit=None):
self.cursor.execute(sql, value)
if commit:
self.commit()
def selectone_sql(self, sql, columns=None):
"""
:param sql:
:param columns: ['id', 'name'...] 要求sql的返回数据相同
:return:
"""
self.cursor.execute(sql)
self.conn.commit()
if not columns:
return self.cursor.fetchone()
else:
data = self.cursor.fetchone()
if data and len(data) == len(columns):
return dict(zip(columns, data))
else:
return data
def selectall_sql(self, sql, columns=None):
self.cursor.execute(sql)
self.conn.commit()
if not columns:
return self.cursor.fetchall()
else:
data = self.cursor.fetchall()
if len(data) > 0 and len(data[0]) == len(columns):
return [dict(zip(columns, i)) for i in data]
else:
return data
def select_sql(self, sql, value=None, columns=None):
self.cursor.execute(sql, value)
self.conn.commit()
return self.cursor.fetchall()
def close(self):
self.cursor.close()
self.conn.close()
self.conn = None
self.cursor = None
def main():
ret = ReDefinedPeeWee()
res = ret.selectone_sql("select * from test", )
print(res)
res1 = ret.selectone_sql("select * from test", ["id", 'name', "num"])
print(res1)
ret.close()
if __name__ == '__main__':
main()
如果用peewee写原生的方式就是这么玩
from peewee import MySQLDatabase, Model from marshmallow import Schema, fields from peewee import PrimaryKeyField, IntegerField, CharField, FloatField, DoubleField # MYSQL 配置 # db = MySQLDatabase('test', # user='root', # password='123123qwe', # host='127.0.0.1', # port=3306) db = MySQLDatabase('', user='root', password='', host='', port=37214 ) class BaseModel(Model): class Meta: database = db class TestFactor(BaseModel): id = PrimaryKeyField() type = IntegerField(verbose_name="类型") name = CharField(verbose_name="姓名") num = FloatField(verbose_name="浮点") class Meta: database = db # order_by = ('id',) db_table = 'test1' class TestFactor_(Schema): id = fields.Integer() type = fields.Integer() name = fields.String() num = fields.Float() name_level = fields.Method('get_name_level') def get_name_level(self, item): if item.type == 1: status = '正常' elif item.type == 2: status = "低危" elif item.type == 3: status = "高危" else: status = "正常" return status # 健康管理监测值存储类 class HealthHouseKeeperMonitoringValue(BaseModel): id = PrimaryKeyField() user_id = IntegerField(verbose_name="用户ID") type_id = IntegerField(verbose_name="类型") monitoring_value = CharField(verbose_name="监测值") report_filepath = CharField(verbose_name="文档路径") create_time = IntegerField(verbose_name="创建时间") update_time = IntegerField(verbose_name="更新时间") status = IntegerField() class Meta: order_by = ('id',) db_table = 'wechat_health_housekeeper_monitoringvalue' class HealthHouseKeeperMonitoringValueSerializer(Schema): id = fields.Integer() user_id = fields.Integer() type_id = fields.Integer() type_level = fields.Method("get_type_level") monitoring_value = fields.String() report_filepath = fields.String() create_time = fields.Integer() update_time = fields.Integer() status = fields.Integer() def get_type_level(self, item): if item.type_id == 1: status = '血压' elif item.type_id == 2: status = "心率" elif item.type_id == 3: status = "低密度脂蛋白胆固醇" elif item.type_id == 4: status = "空腹血糖" elif item.type_id == 5: status = "甘油三酯" elif item.type_id == 6: status = "糖化血红蛋白" elif item.type_id == 7: status = "总胆固醇" elif item.type_id == 8: status = "BMI" return status def test(): # get方法-单条数据 # detail = TestFactor.get() # 只查一条,没有则报错 # data = TestFactor_(many=False) # 展示数据 # print(data.dump(detail)) # get_or_none方法-单挑数据 # detail = TestFactor.get_or_none() # 没有不报错,只查询一条 # data = TestFactor_(many=False) # 展示数据 # print(data.dump(detail)) # 多条数据 # detail = TestFactor.select() # many=False表示只有一条, exclude表示不展示某些列, only表示只展示某些列 # data = TestFactor_(many=False, exclude=[], only=()) # data = TestFactor_(many=False, exclude=[]) # print([data.dump(i) for i in detail]) detail = HealthHouseKeeperMonitoringValue.select().where( HealthHouseKeeperMonitoringValue.user_id == 180, HealthHouseKeeperMonitoringValue.status == 1, HealthHouseKeeperMonitoringValue.type_id == 1, ).order_by(HealthHouseKeeperMonitoringValue.create_time.desc()) print(detail) data_serial = HealthHouseKeeperMonitoringValueSerializer() print(len(detail)) print(data_serial.dump(detail[0])) # user_detail = RiskFactorSerializers(many=False, exclude=['create_time', 'id', 'chronic_id', 'user_id']) if __name__ == '__main__': test()