Flask之model以及缓存

Flask默认并没有提供任何数据库操作的API。

Flask中可以自己的选择数据,用原生语句实现功能,也可以选择ORM(SQLAlchemy,MongoEngine)
原生SQL缺点
代码利用率低,条件复杂代码语句越长,有很多相似语句
一些SQL是在业务逻辑中拼出来的,修改需要了解业务逻辑
直接写SQL容易忽视SQL问题。

一、orm

将对对象的操作转换为原生SQL

1、优点

易用性,可以有效减少重复SQL,性能损耗少设计灵活,可以轻松实现复杂查询,移植性好

 Python中的orm是SQLAlchemy

针对于Flask的支持

pip install flask-sqlalchemy

2、连接数据库

dialect+driver://username:password@host:port/database

dialect数据库实现

driver数据库的驱动

username

password

host

port

database

连接数据库需要指定配置

app.config[‘SQLALCHEMY_DATABASE_URI’] = DB_URI
app.config[‘SQLALCHEMY_TRAKE_MODIFICATIONS’]=False

3、创建模型

class User(db.Model):
    __tablename__ = "UserModel"   # 指定表名,默认是类名

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)

    u_name = db.Column(db.String(16), unique=True)

    u_des = db.Column(db.String(128), nullable=True)

(1)、字段类型

Integer            
SmallInteger        
BigInteger        
Float
Numeric
String
Text
Unicode
Unicode Text
Boolean
Date
Time
DateTime
Interval
LargeBinary

(2)、常见约束

primary_key
autoincrement
unique
index
nullable
default
ForeignKey()

(3)、数据操作

db.create_all()   创建数据库

db.drop_all()    删除数据库

①、数据插入

数据插入是在事务中处理

db.session.add(object)
db.session.add_all(list[object])
db.session.commit()
@api.route('/adduser/')
def adduser():
    users = []
    for i in range(5):
        user = User()
        user.u_name = "小花%d" % random.randrange(10000)
        users.append(user)
    db.session.add_all(users)
    db.session.commit()
    return 'Add success'
②、数据删除
db.session.delete(object)
db.session.commit()

修改和删除基于查询。

(4)、模型继承

默认继承并不会报错,它会将多个模型的数据映射到一张表中,导致数据混乱,不能满足基本使用
抽象的模型是不会在数据库中产生映射的

class Animal(db.Model):
    __abstract__ = True
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    a_name = db.Column(db.String(16))


class Dog(Animal):
    d_legs = db.Column(db.Integer, default=4)


class Cat(Animal):

    c_eat = db.Column(db.String(32), default='fish')

(5)、模型迁移

python manager.py db init   #初次迁移,生成migration包
python manager.py db migrate   # 创建迁移
python manager.py db upgrade   # 更新

(6)、数据查询

①、查询单个对象

first

get

get_or_404

@api.route('/getuser/<int:id>/')
def get_user(id):
    user = User.query.get(id)
    print(user)
    return 'GET success'
②、查询结果集
all:比较特殊,返回列表
@api.route('/getusers/')
def get_users():
    users = User.query.all()
    for user in users:
        print(user.u_name)
    return 'get success'
filter:BaseQuery对象

运算符:

  contains
  startswith
  endswith
  in_
  like
  __gt__
  __ge__
  __lt__
  __le__

条件:

- 类名.属性名.魔术方法(临界值)

@api.route('/getdog/')
def getdog():
    dogs = Dog.query.filter(Dog.id.__le__(5))
    for dog in dogs:
        print(dog.id, dog.a_name)
    return 'GET SUCCESS'

- 类名.属性名 操作符运算符 临界值

@api.route('/getdog/')
def getdog():
    dogs = Dog.query.filter(Dog.id > 5)
    for dog in dogs:
        print(dog.id, dog.a_name)
    return 'GET SUCCESS'
@api.route('/getdog/')
def getdog():
    dogs = Dog.query.filter(Dog.a_name.contains("2"))
    for dog in dogs:
        print(dog.id, dog.a_name)
    return 'GET SUCCESS'
offset和limit不区分顺序,都是先执行offset
@api.route('/getdog/')
def getdog():
    dogs = Dog.query.offset(5).limit(4)
    for dog in dogs:
        print(dog.id, dog.a_name)
    return 'GET SUCCESS'

- order_by 调用必须在 offset和limit 之前

使用offset以及limit实现分页
@api.route('/getdogs/')
def get_dogs():
    page = request.args.get("page", 1, type=int)
    per_page = request.args.get('per_page', 4, type=int)
    dogs = Dog.query.offset(per_page * (page - 1)).limit(per_page)
    return render_template('Dogs.html', dogs=dogs)

paginate实现分页
@api.route('/getdogs/')
def get_dogs_with_page():
    # dogs = Dog.query.paginate().items
    pagination = Dog.query.paginate()
    per_page = request.args.get('per_page', 4, type=int)
    return render_template('Dogs.html', pagination=pagination, per_page=per_page)
<div class=pagination>
    {% for page in pagination.iter_pages(left_edge=5,left_current=5,right_current=5,right_edge=5) %}
        {% if page %}
            {% if page != pagination.page %}
                <a href="{{ url_for('api.get_dogs_with_page') }}?page={{ page }}&per_page={{ per_page }}">{{ page }}</a>
            {% else %}
                <strong>{{ page }}</strong>
            {% endif %}
        {% else %}
            <span class=ellipsis>…</span>
        {% endif %}
    {% endfor %}
</div>

filter_by

用在级联数据上,条件语法精准,字段  =  值

@blue.route('/getcatsfilterby/')
def get_cats_filter_by():

    cats = Cat.query.filter_by(id = 5)

    return render_template('Cats.html', cats=cats)

(7)级联数据

class Customer(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    c_name = db.Column(db.String(16))
    addresses = db.relationship('Address', backref='customer', lazy=True)


class Address(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    a_position = db.Column(db.String(128))
    a_customer_id = db.Column(db.Integer, db.ForeignKey(Customer.id))

 ①添加数据
@api.route('/getcustomer/')
def get():
    customer = Customer.query.order_by(desc('id')).first()

    return str(customer.id)


@api.route('/addaddress/')
def add_address():
    address = Address()
    address.a_position = '秀水街 %s' % random.randrange(10000)
    address.a_customer_id = Customer.query.order_by(desc('id')).first().id    # 注意此处使用id的倒叙,不能直接用‘-id’

    db.session.add(address)
    db.session.commit()
    return 'Address Add Success %s' % address.a_position

 ②查询数据
根据地址找到对应的人
@api.route('/getcustomer/')
def get():
    a_id = request.args.get('a_id', type=int)
    address = Address.query.get(a_id)
    customer = Customer.query.get(address.a_customer_id)   #维护关系表中的外键存的是不维护关系表中的主键,
    return customer.c_name

 根据人找到对应的地址
@api.route('/getaddress/')
def get_address():
    c_id = request.args.get('c_id')
    customer = Customer.query.get(c_id)
   # addresses = Address.query.filter_by(a_customer_id=customer.id)
   addresses = customer.addresses
    return render_template('address.html', addresses=addresses)
<ul>
    {% for address in addresses %}
        <li>{{ address.a_position }}</li>
    {% endfor %}

</ul>

 ③逻辑运算

filter多个条件

@api.route('/getaddress/')
def get_address():
    addresses = Address.query.filter(Address.a_customer_id.__eq__(1)).filter(Address.a_position.endswith('4'))

    return render_template('address.html', addresses=addresses)

与  and   

filter(and_(条件),条件…)

@api.route('/getaddress/')
def get_address():
    addresses = Address.query.filter(and_(Address.a_customer_id.__eq__(1),Address.a_position.endswith('4')))

    return render_template('address.html', addresses=addresses)

Django中可以将字段直接写在filter中,无需使用and_


or_
filter(or_(条件),条件…)


not_
filter(not_(条件),条件…)

@api.route('/getaddress/')
def get_address():
    addresses = Address.query.filter(not_(or_(Address.a_customer_id.__eq__(1), Address.a_position.endswith('4'))))

    return render_template('address.html', addresses=addresses)

二、缓存

pip install Flask-Caching

在ext.py中进行配置

from flask_caching import Cache
from flask_migrate import Migrate
from flask_session import Session
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()
migrate = Migrate()
cache = Cache(config={
    "CACHE_TYPE": "redis"    # 默认是连接本地,可以设置远程。
})
# cache=Cache() # 配置可以写在settings中的Config类中 def init_ext(app): db.init_app(app) migrate.init_app(app, db) Session(app) cache.init_app(app)

在视图中使用

@api.route('/getaddress/')
@cache.cached(timeout=60)
def get_address():
    addresses = Address.query.filter(not_(or_(Address.a_customer_id.__eq__(1), Address.a_position.endswith('4'))))
    print('数据库中获取')
    return render_template('address.html', addresses=addresses)
原文地址:https://www.cnblogs.com/huiyichanmian/p/12285612.html