Flask之CBV,flash,Flask-Session,WTForms,DBUtils

CBV

Flask的路由系统有两种方式 :

  @app.route()

  app.add_url_rule()


from flask import Flask, views, render_template, redirect, request, session

class
LoginView(views.MethodView): def get(self): return render_template("login.html") def post(self): if request.form["name"] == "qwe" and request.form["pwd"] == "qwe": session["user"] = "hd" return redirect("/reg")print(get_flashed_messages("错误")) return render_template("login.html") app.add_url_rule("/login", view_func=LoginView.as_view("login"))

 flash闪现

请求完成后,有时需要让用户知道状态发生了变化。这里可以使用确认消息、警告或者错
误提醒。一个典型例子是,用户提交了有一项错误的登录表单后,服务器发回的响应重新
渲染了登录表单,并在表单上面显示一个消息,提示用户用户名或密码错误。
这种功能是 Flask 的核心特性

后端设置flash值 : 

from flask import Flask, views, render_template, redirect, request, session, url_for, flash, get_flashed_messages
from flask_session import Session

app.secret_key = "xd"
class LoginView(views.MethodView):
    def get(self):
        return render_template("login.html")

    def post(self):
        if request.form["name"] == "qwe" and request.form["pwd"] == "qwe":
            session["user"] = "hd"
            return redirect("/reg")
        flash("错误", "12")
        print(get_flashed_messages("错误"))
        return render_template("login.html")

-------------------------------------------------------------------------------
有时候使用flash时, 代码没有问题但还是会抛出keyerror的错误
像使用session一样加上 : secret_key , 就解决了

前端使用flash值 :

<!DOCTYPE html>
<html lang="zh-CN">
<head>
    <meta charset="UTF-8">
    <title>登录页面</title>
    <meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>


<form action="" method="post">
    {# flash设置的值存在一个列表中, 所有循环取出使用 #}
    {% for message in get_flashed_messages() %}
        <p>
            {{ message }}
        </p>
    {% endfor %}
</form>


</body>
</html>

Flash的消息只会显示一次, 然后就消失了.


Flask-Session

flask-session是flask框架的session组件, 由于原来flask内置session使用签名cookie保存, 该组件则将支持session保存到多个地方 :

  1. redis 

  2. memcached

  3. filesystem

  4. mongodb

  5. sqlalchmey

安装 :

pip3 install flask-session

存储方式 :

  1. redis

import redis
from flask import Flask, session
from flask_session import Session

app = Flask(__name__)
app.debug = True
app.secret_key = 'xxxx'

app.config['SESSION_TYPE'] = 'redis'  # session类型为redis
app.config['SESSION_PERMANENT'] = False  # 如果设置为True,则关闭浏览器session就失效。
app.config['SESSION_USE_SIGNER'] = False  # 是否对发送到浏览器上session的cookie值进行加密
app.config['SESSION_KEY_PREFIX'] = 'session:'  # 保存到session中的值的前缀
app.config['SESSION_REDIS'] = redis.Redis(host='127.0.0.1', port='6379', password='123123')  # 用于连接redis的配置

Session(app)


@app.route('/index')
def index():
    session['k1'] = 'v1'
    return 'xx'


if __name__ == '__main__':
    app.run()

  2. memcached

import redis
from flask import Flask, session
from flask_session import Session
import memcache

app = Flask(__name__)
app.debug = True
app.secret_key = 'xxxx'


app.config['SESSION_TYPE'] = 'memcached' # session类型为redis
app.config['SESSION_PERMANENT'] = True # 如果设置为True,则关闭浏览器session就失效。
app.config['SESSION_USE_SIGNER'] = False # 是否对发送到浏览器上session的cookie值进行加密
app.config['SESSION_KEY_PREFIX'] = 'session:' # 保存到session中的值的前缀
app.config['SESSION_MEMCACHED'] = memcache.Client(['10.211.55.4:12000'])


Session(app)


@app.route('/index')
def index():
    session['k1'] = 'v1'
    return 'xx'


if __name__ == '__main__':
    app.run()

  3. filesystem

import redis
from flask import Flask, session
from flask_session import Session

app = Flask(__name__)
app.debug = True
app.secret_key = 'xxxx'

app.config['SESSION_TYPE'] = 'filesystem'  # session类型为redis
app.config[
    'SESSION_FILE_DIR'] = '/Users/wupeiqi/PycharmProjects/grocery/96.Flask新课程/组件/2.flask-session'  # session类型为redis
app.config['SESSION_FILE_THRESHOLD'] = 500  # 存储session的个数如果大于这个值时,就要开始进行删除了
app.config['SESSION_FILE_MODE'] = 384  # 文件权限类型

app.config['SESSION_PERMANENT'] = True  # 如果设置为True,则关闭浏览器session就失效。
app.config['SESSION_USE_SIGNER'] = False  # 是否对发送到浏览器上session的cookie值进行加密
app.config['SESSION_KEY_PREFIX'] = 'session:'  # 保存到session中的值的前缀

Session(app)


@app.route('/index')
def index():
    session['k1'] = 'v1'
    session['k2'] = 'v1'
    return 'xx'


if __name__ == '__main__':
    app.run()

  4. mongodb

import redis
from flask import Flask, session
from flask_session import Session
import pymongo

app = Flask(__name__)
app.debug = True
app.secret_key = 'xxxx'

app.config['SESSION_TYPE'] = 'mongodb'  # session类型为redis

app.config['SESSION_MONGODB'] = pymongo.MongoClient()
app.config['SESSION_MONGODB_DB'] = 'mongo的db名称(数据库名称)'
app.config['SESSION_MONGODB_COLLECT'] = 'mongo的collect名称(表名称)'

app.config['SESSION_PERMANENT'] = True  # 如果设置为True,则关闭浏览器session就失效。
app.config['SESSION_USE_SIGNER'] = False  # 是否对发送到浏览器上session的cookie值进行加密
app.config['SESSION_KEY_PREFIX'] = 'session:'  # 保存到session中的值的前缀

Session(app)


@app.route('/index')
def index():
    session['k1'] = 'v1'
    session['k2'] = 'v1'
    return 'xx'


if __name__ == '__main__':
    app.run()

  mongodb操作简单示例 :

from pymongo import MongoClient

# 创建链接
conn = MongoClient('47.93.4.198', 27017)

# 选择数据库
db = conn['db1']

# 选择表
posts = db['posts']

post_data = {
    'name': 'alex',
    'age': 18
}

# 表中插入数据
# result = posts.insert_one(post_data)

# 获取一条数据
# row = posts.find_one()
# print(row)

# # 获取多条数据
# rows = posts.find()
# for row in rows:
#     print(row)

# 删除多条数据
# rows = posts.delete_many(filter={})
# print(rows)

# 更新多条数据
# posts.update({}, {'name': 'xxx'})

  5. sqlalchemy

import redis
from flask import Flask, session
from flask_session import Session as FSession
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)
app.debug = True
app.secret_key = 'xxxx'

# 设置数据库链接
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123@127.0.0.1:3306/fssa?charset=utf8'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = True

# 实例化SQLAlchemy
db = SQLAlchemy(app)



app.config['SESSION_TYPE'] = 'sqlalchemy'  # session类型为sqlalchemy
app.config['SESSION_SQLALCHEMY'] = db # SQLAlchemy对象
app.config['SESSION_SQLALCHEMY_TABLE'] = 'session' # session要保存的表名称
app.config['SESSION_PERMANENT'] = True  # 如果设置为True,则关闭浏览器session就失效。
app.config['SESSION_USE_SIGNER'] = False  # 是否对发送到浏览器上session的cookie值进行加密
app.config['SESSION_KEY_PREFIX'] = 'session:'  # 保存到session中的值的前缀
FSession(app)


@app.route('/index')
def index():

    session['k1'] = 'v1'
    session['k2'] = 'v1'

    return 'xx'


if __name__ == '__main__':
    app.run()

  在代码写好之后, 不用着急运行, 要确保数据库内有你需要使用的表.

应用场景 :

  应用程序较小时, 使用原生的加密cookie保存session(内置)

  应用程序较大时, 使用redis(flask-session)


 WTForm表单

使用Flask-WTF时, 每个Web表单都由一个继承自Form的类表示. 这个类定义表单中的一组字段, 每个字段都用对象表示. 字段对象可附属一个或者多个验证函数. 验证函数用来验证用户提交的输入值是否符合要求. 

定义表单类 :

from flask import Flask, views, render_template, redirect, request, session
from wtforms.fields import simple, core
from wtforms import Form
from wtforms import validators


class LoginForm(Form):
    username = simple.StringField(
        label="用户名",
        validators=[
            validators.DataRequired(message="不能为空!"),
            validators.Length(min=3, max=5, message="用户名长度为3-5位")
        ],
        render_kw={"class": "username"}
    )

    password = simple.PasswordField(
        label="密码",
        validators=[
            validators.DataRequired(message="不能为空"),  # 控制用户不能输入空
            validators.Length(min=3, max=5, message="密码长度为3-5位"),  # 控制输入长度
            validators.Regexp(regex="d+", message="密码必须为数字")  # 正则匹配
        ],
        render_kw={"class": "pwd"}  # 给标签加加属性
    )

  这个表单中的字段都定义为类变量, 类变量的值是相应字段类型的对象. LoginForm表单中有一个名为username的文本字段和一个名为password的文本字段. StringField类表示属性为type="text"的<input>标签. 字段构造函数的第一个参数是把表单渲染成HTML时使用的标号.

  StringField构造函数中可选参数validators指定一个由验证函数组成的列表, 在接收用户提交的数据之前验证数据. DataRequired确保提交的字段不能为空.

WTForm支持的HTML标准字段如下 :

字段类型 说明
StringField 文本字段
TextAreaField 多行文本字段
PasswordField 密码文本字段
HiddenField 隐藏文本字段
DateTimeField 文本字段, 值为datetime.datetime格式
DateField 文本字段, 值为datetime.date格式
IntegerField 文本字段, 值为整数
DecimalField 文本字段, 值为decimal.Decimal
FloatField 文本字段, 值为浮点数
BooleanField 复选框, 值为True和False
RadioField 一组单选框
SelectField 下拉列表
SelectMultipleField 下拉列表, 可多选
FileField 文件上传字段
SubmitField 表单提交按钮
FormField 把表单作为字段嵌入另一个表单
FieldList 一组指定类型的字段

WTForm内建的验证函数如下 :

验证函数 说明
Email 验证电子邮件地址
EqualTo 比较两个字段的值, 常用于要求输入两次密码进行确认的情况
IPAddress 验证IPv4网络地址
Length 验证输入字符串的长度
NumberRange 验证输入的值在数字范围内
Optional 无输入值时跳过其他验证函数
Required 确保字段中有数据
Regexp 使用正则表达式验证输入值
URL 验证URL
AnyOf 确保输入值在可选值列表内
NoneOf 确保输入值不在可选列表内

DBUtils - Python数据库连接池

创建数据库连接池 :

import pymysql
from DBUtils.PooledDB import PooledDB, SharedDBConnection
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='数据库名',
    charset='utf8'
)

使用数据库连接池 :

def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    cursor = conn.cursor()
    cursor.execute('select * from 表名')
    result = cursor.fetchall()
    conn.close()

使用数据库连接池中的链接

某某人写的sqlhelper :

class MySQLhelper(object):
    def __init__(self, host, port, dbuser, password, database):
        self.pool = PooledDB(
            creator=pymysql,  # 使用链接数据库的模块
            maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
            mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
            maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
            maxshared=3,
            # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
            blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
            maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
            setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
            ping=0,
            # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
            host=host,
            port=int(port),
            user=dbuser,
            password=password,
            database=database,
            charset='utf8'
        )

    def create_conn_cursor(self):
        conn = self.pool.connection()
        cursor = conn.cursor(pymysql.cursors.DictCursor)
        return conn,cursor

    def fetch_all(self, sql, args):
        conn,cursor = self.create_conn_cursor()
        cursor.execute(sql,args)
        result = cursor.fetchall()
        cursor.close()
        conn.close()
        return result


    def insert_one(self,sql,args):
        conn,cursor = self.create_conn_cursor()
        res = cursor.execute(sql,args)
        conn.commit()
        print(res)
        conn.close()
        return res

    def update(self,sql,args):
        conn,cursor = self.create_conn_cursor()
        res = cursor.execute(sql,args)
        conn.commit()
        print(res)
        conn.close()
        return res


sqlhelper = MySQLhelper("127.0.0.1", 3306, "root", "1233121234567", "d")

# sqlhelper.fetch_all("select * from user where id=%s",(1))

# sqlhelper.insert_one("insert into user VALUES (%s,%s)",("jinwang",4))

# sqlhelper.update("update user SET name=%s WHERE  id=%s",("yinwang",1))
纯属借鉴
原文地址:https://www.cnblogs.com/dong-/p/10140319.html