flask-sqlalchemy 用法总结

Flask-SQLAlchemy是一个Flask扩展,能够支持多种数据库后台,我们可以不需要关心SQL的处理细节,操作数据库,一个基本关系对应一个类,而一个实体对应类的实例对象。Flask是一个轻量级的web框架,而SQLAlchemy 是转为Flask定制的ORM框架。

  • 安装flask-sqlalchemy

pip install flask-sqlalchemy
  • 初始化sqlalchemy

from flask import Flask
from flask.ext.sqlalchemy import SQLAlchemy

app = Flask(__name__)
# 配置 sqlalchemy  数据库驱动://数据库用户名:密码@主机地址:端口/数据库?编码
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql://password:@127.0.0.1:3306/db?charset=utf8'
# 初始化
db = SQLAlchemy(app)

注意

SQLALCHEMY_DATABASE_URI 是关键字,默认数据源配置变量。而我们在实际操作中,难免要同时连接多个数据库,这个时候就要配置多个数据源,而SQLAlchemy提供了很方便的多数据源配置。
只需在配置文件中这样写就可以:
SQLALCHEMY_DATABASE_URI = 'postgres://localhost/main'
SQLALCHEMY_BINDS = {
    'users':        'mysqldb://localhost/users',
    'appmeta':      'sqlite:////path/to/appmeta.db'
}
SQLALCHEMY_DATABASE_URI 是默认数据源,users和appmeta是其他数据源,用的时候也非常方便,在model中加入 __bind_key__ = 'users' 即可。
  • 模型

数据库表映射到模型,继承db.model类即可

class RoadReportErrorCheck(db.Model):

    __bind_key__  = 'mdb_plat'
    __tablename__ = 'road_report_error_check'

    order_id                 = db.Column(db.Integer, nullable=False, default=0, primary_key=True)            #订单ID                     
    p_pid                     = db.Column(db.String(32), nullable=False, default='')        #包ID
    check_state             = db.Column(db.Integer, nullable=False, default=0)            #0未审核,1初审,2复审
    first_checker             = db.Column(db.String(32), nullable=False, default='')        #轨迹点初审人
    first_check_time         = db.Column(db.TIMESTAMP, nullable=False, default=0)        #轨迹点初审时间
    checker                 = db.Column(db.String(32), nullable=False, default='')        #轨迹点复审人
    check_time                 = db.Column(db.TIMESTAMP, nullable=False, default=0)        #轨迹点复审时间
    description             = db.Column(db.String(200), nullable=False, default='')        #轨迹点审核描述
    receive_flag             = db.Column(db.Integer, nullable=False, default=0)            #外包领取,0未领,1已领
    create_time             = db.Column(db.TIMESTAMP, nullable=False, default=0)
    update_time             = db.Column(db.TIMESTAMP, nullable=False, default=datetime.now)
  • 建表

只需初始化一个变量,添加到session中即可

group_info = Group_Check(id=id,package_id=package_id,user=user,upload_time=upload_time,orderid=orderid,group_track_url=group_track_url,xf_links=xf_links,create_time=datetime.datetime.now())
db.session.add(group_info)
db.session。commit()
  • 过滤

SQLAlchemy 提供了很多种方便的过滤方法。写法如下:

#查询单个
Group_Check.query.fliter(Group_Check.id == id).first()
#查询所有
Group_Check.query.fliter(Group_Check.id == id).all()
#条件查询
Group_Check.query.fliter(db.and_(Group_Check.id == id, Group_Check.state==1))
#求和
Group_Check.query.fliter(db.and_(Group_Check.id == id, Group_Check.state==1))
 





原文地址:https://www.cnblogs.com/cjingzm/p/7967803.html