Flask

初识flask

介绍

Flask诞生于2010年,是Armin ronacher(人名)用 Python 语言基于 Werkzeug(wo ke zou ge) 工具箱编写的轻量级Web开发框架。
Flask 本身相当于一个内核,其他几乎所有的功能都要用到扩展(邮件扩展Flask-Mail,用户认证Flask-Login,数据库Flask-SQLAlchemy),都需要用第三方的扩展来实现。比如可以用 Flask 扩展加入ORM、窗体验证工具,文件上传、身份验证等。Flask 没有默认使用的数据库,你可以选择 MySQL,也可以用 NoSQL。

其 WSGI 工具箱采用 Werkzeug(路由模块),模板引擎则使用 Jinja2。这两个也是 Flask 框架的核心。

官网: http://flask.pocoo.org/
官方文档: http://docs.jinkan.org/docs/flask/
        
Flask常用的扩展包:
    Flask-SQLalchemy:操作数据库,ORM;
    Flask-script:终端脚本工具,脚手架;
    Flask-migrate:管理迁移数据库;
    Flask-Session:Session存储方式指定;
    Flask-WTF:表单;
    Flask-Mail:邮件;
    Flask-Bable:提供国际化和本地化支持,翻译;
    Flask-Login:认证用户状态;
    Flask-OpenID:认证, OAuth;
    Flask-RESTful:开发REST API的工具;
    Flask JSON-RPC: 开发rpc远程服务[过程]调用
    Flask-Bootstrap:集成前端Twitter Bootstrap框架
    Flask-Moment:本地化日期和时间
    Flask-Admin:简单而可扩展的管理接口的框架

创建虚拟环境

#安装依赖
	pip3 install virtualenv
    pip3 install virtualenvwrapper
    
#建立软连接(相当于windows快捷方式)
	ln -s /usr/local/python3/bin/virtualenv /usr/bin/virtualenv
    
#配置虚拟环境,填入下面内容,填入内容后按esc,:wq保存并退出,
	vim ~/.bash_profile
    
    填入内容,看图1
    	VIRTUALENVWRAPPER_PYTHON=/usr/local/python3/bin/python3
		source /usr/local/python3/bin/virtualenvwrapper.sh
        
#查看下修改好的neir
	cat ~/.bash_profile
#更新配置文件内容
	source ~/.bash_profile
    
#虚拟环境默认的根目录 
	cd ~/.virtualenvs  
  
#创建虚拟环境
    1 创建虚拟环境到配置的WORKON_HOME路径下
        mkvirtualenv -p python3 自定义的虚拟环境名称(比如:luffy)	使用的是python3
        mkvirtualenv -p python2 自定义的虚拟环境名称(比如:luffy)	使用的是python2
    2 查看已有的虚拟环境
        workon
    3 使用虚拟环境
        workon 已有的虚拟环境名称(比如:luffy)
    4 进入、退出该虚拟环境的Python环境
        python、exit()
    5 为虚拟环境安装模块
        pip install 模块名称(比如:django==2.2)
    6 退出当前虚拟环境
        deactivate
    7 删除虚拟环境(删除当前虚拟环境要先退出)
        rmvirtualenv 自定义的虚拟环境名称(比如:luffy)
        

快速入门

#安装flask
    workon flask
    pip install flask

#pycharm创建flask项目,flask_first,看下图步骤

#app.py
    from flask import Flask, request
    app = Flask(__name__)

    @app.route('/')
    def hello_world():
        print(request.path)  # /
        return 'Hello World!'

    @app.route('/hello')
    def hello():
        print(request.path)  # /hello
        return 'hello'

    if __name__ == '__main__':
        app.run()
    # 请求来了,会执行 app(request),会触发谁?触发__call__方法

图1

image-20200826194305510

图2

image-20200826194406711

登录,显示用户信息

#登录成功之后可以访问index页面

#App2.py
    from flask import Flask, render_template, request, redirect, session, url_for
    app = Flask(__name__)
    app.debug = True
    app.secret_key = 'sdfsdfsdfsdf'

    USERS = {
        1: {'name': '张三', 'age': 18, 'gender': '男', 'text': "道路千万条"},
        2: {'name': '李四', 'age': 28, 'gender': '男', 'text': "安全第一条"},
        3: {'name': '王五', 'age': 18, 'gender': '女', 'text': "行车不规范"},
    }

    @app.route('/detail/<int:nid>', methods=['GET'])
    def detail(nid):
        user = session.get('user_info')
        if not user:
            return redirect('/login')

        info = USERS.get(nid)
        return render_template('detail.html', info=info)

    @app.route('/index', methods=['GET'])
    def index():
        user = session.get('user_info')
        if not user:
            # return redirect('/login')
            url = url_for('l1')
            return redirect(url)
        return render_template('index.html', user_dict=USERS)

    @app.route('/login', methods=['GET', 'POST'], endpoint='l1')
    def login():
        if request.method == "GET":
            return render_template('login.html')
        else:
            # request.query_string
            user = request.form.get('user')
            pwd = request.form.get('pwd')
            if user == 'joab' and pwd == '123':
                session['user_info'] = user
                return redirect('http://www.baidu.com')
            return render_template('login.html', error='用户名或密码错误')


    if __name__ == '__main__':
        app.run()

#login.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <h1>用户登录</h1>
    <form method="post">
        <input type="text" name="user">
        <input type="text" name="pwd">
        <input type="submit" value="登录">{{error}}
    </form>
</body>
</html>

#index.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <h1>用户列表</h1>
    <table>
        {% for k,v in user_dict.items() %}
        <tr>
            <td>{{k}}</td>
            <td>{{v.name}}</td>
            <td>{{v['name']}}</td>
            <td>{{v.get('name')}}</td>
            <td><a href="/detail/{{k}}">查看详细</a></td>
        </tr>
        {% endfor %}
    </table>
</body>
</html>

#templatesdetail.html

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
    <h1>详细信息 {{info.name}}</h1>
    <div>
        {{info.text}}
    </div>
</body>
</html>

总结

1 三板斧
	return 字符串		#直接返回字符串
	return render_template('index.html')	#返回html页面
	return redirect('/login')		#重定向

2 路由写法,(路径,支持的请求方式,别名)别名不写默认是函数名	#@app.route('/login',methods=['GET','POST'],endpoint='l1')

3 模板语言渲染
	-同django的dtl,但是比dtl强大,支持加括号执行,字典支持中括号取值和get取值
    如:
        {% for k,v in user_dict.items() %}
        <tr>
            <td>{{k}}</td>
            <td>{{v.name}}</td>
            <td>{{v['name']}}</td>
            <td>{{v.get('name')}}</td>
            <td><a href="/detail/{{k}}">查看详细</a></td>
        </tr>
        {% endfor %}
    
4 分组(就是django中的有名分组)
    @app.route('/detail/<int:nid>',methods=['GET'])
    def detail(nid):
        
5 反向解析
	-url_for('别名'),设置别名,重定向到该url
    
    列子:
        @app.route('/index', methods=['GET'])
        def index():
            user = session.get('user_info')
            if not user:
                # return redirect('/login')
                url = url_for('l1')
                return redirect(url)
            return render_template('index.html', user_dict=USERS)
        
6 获取前端传递过来的数据
	# get 请求
		request.query_string
        
  	# post请求
      user = request.form.get('user')
      pwd = request.form.get('pwd')

配置文件

#方式1
    app.config['DEBUG'] = True
    PS: 由于Config对象本质上是字典,所以还可以使用		app.config.update(...)	
    
#方式2
    #通过py文件配置
    app.config.from_pyfile("python文件名称")
    如:
    settings.py
    DEBUG = True
    
#方式3
    app.config.from_object('settings.TestingConfig')
    
    class Config(object):
        DEBUG = False
        TESTING = False
        DATABASE_URI = 'sqlite://:memory:'

    class ProductionConfig(Config):
        DATABASE_URI = 'mysql://user@localhost/foo'

    class DevelopmentConfig(Config):
        DEBUG = True

    class TestingConfig(Config):
        TESTING = True

路由系统

#基本使用
	@app.route('/detail/<int:nid>',methods=['GET'],endpoint='detail')
    
#转换器
    DEFAULT_CONVERTERS = {
        'default':          UnicodeConverter,
        'string':           UnicodeConverter,
        'any':              AnyConverter,
        'path':             PathConverter,
        'int':              IntegerConverter,
        'float':            FloatConverter,
        'uuid':             UUIDConverter,
    }
    
#路由本质
    1 本质就是:app.add_url_rule()
    2 endpoint:如果不写默认是函数名,endpoint不能重名
    

#@app.route和app.add_url_rule参数:
    rule, URL规则
    
    view_func, 视图函数名称
    
    defaults = None, 默认值, 当URL中无参数,函数需要参数时,使用defaults = {'k': 'v'},为函数提供参数
    
    endpoint = None, 名称,用于反向生成URL,即: url_for('名称')
    
    methods = None, 允许的请求方式,如:["GET", "POST"]
    
    #对URL最后的 / 符号是否严格要求,默认严格,False,就是不严格
    
        @app.route('/index', strict_slashes=False)
        #访问http://www.xx.com/index/ 或http://www.xx.com/index均可
        @app.route('/index', strict_slashes=True)
        #仅访问http://www.xx.com/index
        
    #重定向到指定地址
    	@app.route('/index/<int:nid>', redirect_to='/home/<nid>')

CBV(前后端分离开发)

from flask import Flask,request,render_template,redirect
from flask import views
app=Flask(__name__)

class IndexView(views.View):
    methods = ['GET']
    decorators = [auth, ]
    def dispatch_request(self):
		print('Index')
        return 'Index!'


def auth(func):
    def inner(*args, **kwargs):
        print('before')
        result = func(*args, **kwargs)
        print('after')
        return result
    return inner

class IndexView(views.MethodView):
    methods = ['GET']  # 指定请求方式
    # 登录认证装饰器加在哪?
    decorators = [auth, ]  #加多个就是从上往下的效果
    def get(self):
        return "我是get请求"
    def post(self):
       return '我是post请求'

# 注册路由
app.add_url_rule('/index',view_func=IndexView.as_view('index'))	# IndexView.as_view('index'),必须传name

if __name__ == '__main__':
    app.run()
    
    
# views.View用的比较少

# 继承views.MethodView,只需要写get,post,delete方法

# 如果加装饰器decorators = [auth, ],加多个就是从上往下的效果

# 允许的请求方法methods = ['GET'] 

HTTP请求

配置文件

# SECRET_KEY:如果使用session,必须配置

# SESSION_COOKIE_NAME:cookie名字

# 数据库地址,端口号,也要放到配置文件中,但是不是内置的参数

# flask内置session如何实现?
	-通过SECRET_KEY加密以后,当做cookie返回给浏览器
    -下次发送请求,携带cookie过来,反解,再放到session中
    

flask-resful跟drf挺像的,可以看看

路由支持正则

#写类,继承BaseConverter

#注册:app.url_map.converters['regex'] = RegexConverter

#使用:@app.route('/index/<regex("d+"):nid>')  正则表达式会当作第二个参数传递到类中

from flask import Flask, views, url_for
from werkzeug.routing import BaseConverter
app = Flask(import_name=__name__)

class RegexConverter(BaseConverter):
    """
    自定义URL匹配正则表达式
    """
    def __init__(self, map, regex):
        super(RegexConverter, self).__init__(map)
        self.regex = regex

    def to_python(self, value):
        """
        路由匹配时,匹配成功后传递给视图函数中参数的值
        """
        return int(value)

    def to_url(self, value):
        """
        使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
        """
        val = super(RegexConverter, self).to_url(value)
        return val
    
# 添加到flask中

    app.url_map.converters['regex'] = RegexConverter
    @app.route('/index/<regex("d+"):nid>')
    def index(nid):
        print(url_for('index', nid='888'))
        return 'Index'

    if __name__ == '__main__':
        app.run()

模板

# 同django的dtl,但是比dtl强大,支持加括号执行,字典支持中括号取值和get取值
    如:
        {% for k,v in user_dict.items() %}
        <tr>
            <td>{{k}}</td>
            <td>{{v.name}}</td>
            <td>{{v['name']}}</td>
            <td>{{v.get('name')}}</td>
            <td><a href="/detail/{{k}}">查看详细</a></td>
        </tr>
        {% endfor %}

# 模板有没有处理xss攻击,在页面显示标签,内部怎么实现的?
	-1 模板层   要渲染的字符串|safe	# 前端使用 要渲染的字符串|safe 渲染出input框
    -2 后端:Markup('<input type="text">')		#使用Markup()前端可以渲染出input框
    
# Markup等价django的mark_safe ,

# extends,include一模一样

视图函数和视图类

request对象的属性和方法

request.method  #提交的方法

request.args  #get请求提及的数据

request.form   #post请求提交的数据

request.values  #post和get提交的数据总和

request.cookies  #客户端所带的cookie

request.headers  #请求头

request.path     #不带域名,请求路径

request.full_path  #不带域名,带参数的请求路径

request.url           #带域名带参数的请求路径

request.base_url		#带域名请求路径

request.url_root      #域名

request.host_url		#域名

request.host			#127.0.0.1:5000
    
request.files		#文件

响应对象的方法

return "字符串"

return render_template('html模板路径',**{})

return redirect('/index.html')

return jsonify({'k1':'v1'})	#对着django,JsonResponse

#添加/更新cookie
    aa='hello world'
    res=make_response(aa)
    res.set_cookie('xxx','lqz')
    
# 往响应头中放东西
    res.headers['X-Something'] = 'A value'
    print(type(res))
    from  flask.wrappers import Response
    return res

response = make_response(render_template('index.html'))		#response是flask.wrappers.Response类型

response.delete_cookie('key')	#删除cookie

response.set_cookie('key', 'value')		#添加/更新cookie

response.headers['X-Something'] = 'A value'		# 往响应头中放东西	

return response		#记得返回响应对象

return 'hello'	#返回字符串

补充知识

varchr :65535个字节的数据
    utf8:中文占2个字节,varchar(300)
    utf8mb4:中文占3个字节,varchar(300)
    
# 快速导出requestment.txt
	pip3 install pipreqs
    
# pipreqs ./ --encoding=utf-8

闪现

#应用场景,把前端上个页面报错信息,下个页面get_flashed_message()展示出来
    -设置:flash('aaa')
        
    -取值:get_flashed_message()
    
    -设置:flash('lqz',category='error1')
    
    -取值:res=get_flashed_messages(category_filter=['error1'])
    
    -假设在a页面操作出错,跳转到b页面,在b页面显示a页面的错误信息

session的使用

# 全局导入

# 视图函数中 session['key']=value

# 删除:session.pop('key')

# 取:session['key']

# open_session

# save_session

请求扩展(像django中间件)

#类似于django的中间件,请求来了,请求走了,什么操作

#请求来了就会触发,类似于django的process_request,如果有多个,顺序是从上往下
    @app.before_request
    def before(*args,**kwargs):
        if request.path=='/login':
            return None
        else:
            name=session.get('user')
            if not name:
                return redirect('/login')
            else:
                return None
            
#请求走了就会触发,类似于django的process_response,如果有多个,顺序是从下往上执行
    @app.after_request
    def after(response):
        print('我走了')
        return response

#3 before_first_request 项目启动起来第一次会走,以后都不会走了,也可以配多个(项目启动初始化的一些操作)
    @app.before_first_request
    def first():
        print('我的第一次')
    
# 4 每次视图函数执行完了都会走它,# 用来记录出错日志
    @app.teardown_request  # 用来记录出错日志
    def ter(e):
        print(e)
        print('我是teardown_request ')

# 5 errorhandler绑定错误的状态码,只要码匹配,就走它
    @app.errorhandler(404)
    def error_404(arg):
        return render_template('error.html',message='404错误')

# 6 全局标签
    @app.template_global()
    def sb(a1, a2):
        return a1 + a2
    在模板中使用:{{ sb(3,4) }}

# 7 全局过滤器
    @app.template_filter()
    def db(a1, a2, a3):
        return a1 + a2 + a3
    在模板中使用:{{ 1|db(2,3)}}


1 重点掌握before_request和after_request

2 注意有多个的情况,执行顺序,请求来的时候,顺序是从上往下,走的时候是顺序是从下往上

3 before_request请求拦截后(也就是有return值),response所有都执行

中间件(跟django中间件完全一样)

from flask import Flask
app = Flask(__name__)
@app.route('/')
def index():
    return 'Hello World!'

# 模拟中间件
class Md(object):
    def __init__(self,old_wsgi_app):
        self.old_wsgi_app = old_wsgi_app

    def __call__(self,  environ, start_response):
        print('开始之前')
        ret = self.old_wsgi_app(environ, start_response)
        print('结束之后')
        return ret

if __name__ == '__main__':
    #1我们发现当执行app.run方法的时候,最终执行run_simple,最后执行app(),也就是在执行app.__call__方法	
    #2 在__call__里面,执行的是self.wsgi_app().那我们希望在执行他本身的wsgi之前做点事情。
    #3 所以我们先用Md类中__init__,保存之前的wsgi,然后我们用将app.wsgi转化成Md的对象。
    #4 那执行新的的app.wsgi_app,就是执行Md的__call__方法。
    #把原来的wsgi_app替换为自定义的,
    
    app.wsgi_app = Md(app.wsgi_app)
    app.run()

猴子补丁

'''什么是猴子补丁?
只是一个概念,不属于任何包和模块
利用了python一切皆对象的理念,在程序运行过程中,动态修改方法
概念'''
class Monkey():
    def play(self):
        print('猴子在玩')


class Dog():
    def play(self):
        print('狗子在玩')

m=Monkey()
# m.play()
m.play=Dog().play

m.play()

'''有什么用?
这里有一个比较实用的例子,
很多用到import json,
后来发现ujson性能更高,
如果觉得把每个文件的import json改成import ujson as json成本较高,
或者说想测试一下ujson替换是否符合预期, 只需要在入口加上:

只需要在程序入口'''

import json
import ujson

def monkey_patch_json():
    json.__name__ = 'ujson'
    json.dumps = ujson.dumps
    json.loads = ujson.loads
monkey_patch_json()

aa=json.dumps({'name':'lqz','age':19})
print(aa)



#协程:单线程下实现并发
from gevent import monkey;monkey.patch_all()
import gevent
import time
def eat():
    print('eat food 1')
    time.sleep(2)
    print('eat food 2')

def play():
    print('play 1')
    time.sleep(1)
    print('play 2')

g1=gevent.spawn(eat)
g2=gevent.spawn(play)
gevent.joinall([g1,g2])
print('主')

蓝图

1 没有蓝图之前前,都是单文件

2 有了蓝图可以分文件,分app,之前的请求扩展还是一样用,只是在当前蓝图对象管理下的有效

3 蓝图使用
	#第一步在app中注册蓝图,括号里是一个蓝图对象
    app.register_blueprint(user.us)
	# 第二步,在不同文件中注册路由时,直接使用蓝图对象注册,不用使用app了,避免了循环导入的问题
    @account.route('/login.html', methods=['GET', "POST"])
    
4 中小型项目目录划分
	项目名字
    	-pro_flask文件夹
        -__init__.py
    	-templates
        	-login.html
        -statics
        	-code.png
        -views
        	-blog.py
        	-account.py
        	-user.py
        -run.py
        
5 大型项目
    项目名
    	-pro_flask文件夹
        	-__init__.py
            -web
            	-__init__.py
            	-static
                -views.py
                -templates
    		-admin
                -templates
                -static
                -views.py
                -__init__.py
        -run.py
 

threading.local

###############1 不用local,多线程写同一个数据,会导致错乱
    from threading import Thread
    import time
    lqz = -1
    def task(arg):
        global lqz
        lqz = arg
        time.sleep(2)
        print(lqz)

    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()


################2 使用local对象,多线程写同一数据不会错乱,因为每个线程操作自己的数据
    from threading import Thread
    from threading import local
    import time
    from threading import get_ident
    # 特殊的对象
    lqz = local()
    # {'线程id':{value:1},'线程id':{value:2}....}
    def task(arg):
        lqz.value = arg
        time.sleep(2)
        print(lqz.value)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()


#######3自己写一个类似local的东西,函数版本
    from threading import get_ident,Thread
    import time
    storage = {}
    #{'线程id':{value:1},'线程id':{value:2}....}
    def set(k,v):
        ident = get_ident()
        if ident in storage:
            storage[ident][k] = v
        else:
            storage[ident] = {k:v}
    def get(k):
        ident = get_ident()
        return storage[ident][k]
    def task(arg):
        set('val',arg)
        v = get('val')
        print(v)

    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()



#######4自己写一个类似local的东西,面向对象版本
    from threading import get_ident,Thread
    import time
    class Local(object):
        storage = {}
        def set(self, k, v):
            ident = get_ident()
            if ident in Local.storage:
                Local.storage[ident][k] = v
            else:
                Local.storage[ident] = {k: v}
        def get(self, k):
            ident = get_ident()
            return Local.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.set('val',arg)
        time.sleep(1)
        v = obj.get('val')

        print(v)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

# #######5自己写一个类似local的东西,面向对象支持 . 取值赋值
    from threading import get_ident,Thread
    import time
    class Local(object):
        storage = {}
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in Local.storage:
                Local.storage[ident][k] = v
            else:
                Local.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return Local.storage[ident][k]
    obj = Local()

    def task(arg):
        obj.val = arg
        time.sleep(1)
        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()


#######6  每次实例化得到一个local对象,用自己的字典存储
    from threading import get_ident,Thread
    import time
    class Local(object):
        def __init__(self):
           object.__setattr__(self,'storage',{})
           #  self.storage={}  # 这种方式不能用
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in self.storage:
                self.storage[ident][k] = v
            else:
                self.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return self.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.val = arg
        time.sleep(1)
        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

######7支持线程和协程
    try:
        from greenlet import getcurrent as get_ident
    except Exception as e:
        from threading import get_ident
    from threading import Thread
    import time
    class Local(object):
        def __init__(self):
            object.__setattr__(self,'storage',{})
        def __setattr__(self, k, v):
            ident = get_ident()
            if ident in self.storage:
    #             self.storage[ident][k] = v
    #         else:
    #             self.storage[ident] = {k: v}
        def __getattr__(self, k):
            ident = get_ident()
            return self.storage[ident][k]
    obj = Local()
    def task(arg):
        obj.val = arg
        # obj.xxx = arg

        print(obj.val)
    for i in range(10):
        t = Thread(target=task,args=(i,))
        t.start()

请求上下文执行流程

请求上下文执行流程(ctx):
		-0 flask项目一启动,有6个全局变量
			-_request_ctx_stack:LocalStack对象
			-_app_ctx_stack :LocalStack对象
			-request : LocalProxy对象
			-session : LocalProxy对象
		-1 请求来了 app.__call__()---->内部执行:self.wsgi_app(environ, start_response)
		-2 wsgi_app()
			-2.1 执行:ctx = self.request_context(environ):返回一个RequestContext对象,并且封装了request(当次请求的request对象),session
			-2.2 执行: ctx.push():RequestContext对象的push方法
				-2.2.1 push方法中中间位置有:_request_ctx_stack.push(self),self是ctx对象
				-2.2.2 去_request_ctx_stack对象的类中找push方法(LocalStack中找push方法)
				-2.2.3 push方法源码:
				    def push(self, obj):
						#通过反射找self._local,在init实例化的时候生成的:self._local = Local()
						#Local()flask封装的支持线程和协程的local对象
						# 一开始取不到stack,返回None
						rv = getattr(self._local, "stack", None)
						if rv is None:
							#走到这,self._local.stack=[],rv=self._local.stack
							self._local.stack = rv = []
						# 把ctx放到了列表中
						#self._local={'线程id1':{'stack':[ctx,]},'线程id2':{'stack':[ctx,]},'线程id3':{'stack':[ctx,]}}
						rv.append(obj)
						return rv
		-3 如果在视图函数中使用request对象,比如:print(request)
			-3.1 会调用request对象的__str__方法,request类是:LocalProxy
			-3.2 LocalProxy中的__str__方法:lambda x: str(x._get_current_object())
				-3.2.1 内部执行self._get_current_object()
				-3.2.2 _get_current_object()方法的源码如下:
				    def _get_current_object(self):
						if not hasattr(self.__local, "__release_local__"):
							#self.__local()  在init的时候,实例化的,在init中:object.__setattr__(self, "_LocalProxy__local", local)
							# 用了隐藏属性
							#self.__local 实例化该类的时候传入的local(偏函数的内存地址:partial(_lookup_req_object, "request"))
							#加括号返回,就会执行偏函数,也就是执行_lookup_req_object,不需要传参数了
							#这个地方的返回值就是request对象(当此请求的request,没有乱)
							return self.__local()
						try:
							return getattr(self.__local, self.__name__)
						except AttributeError:
							raise RuntimeError("no object bound to %s" % self.__name__)
				-3.2.3 _lookup_req_object函数源码如下:
					def _lookup_req_object(name):
						#name是'request'字符串
						#top方法是把第二步中放入的ctx取出来,因为都在一个线程内,当前取到的就是当次请求的ctx对象
						top = _request_ctx_stack.top
						if top is None:
							raise RuntimeError(_request_ctx_err_msg)
						#通过反射,去ctx中把request对象返回
						return getattr(top, name)
				-3.2.4 所以:print(request) 实质上是在打印当此请求的request对象的__str__
		-4 如果在视图函数中使用request对象,比如:print(request.method):实质上是取到当次请求的reuquest对象的method属性
		
		-5 最终,请求结束执行: ctx.auto_pop(error),把ctx移除掉
		
	其他的东西:
		-session:
			-请求来了opensession
				-ctx.push()---->也就是RequestContext类的push方法的最后的地方:
					if self.session is None:
						#self是ctx,ctx中有个app就是flask对象,   self.app.session_interface也就是它:SecureCookieSessionInterface()
						session_interface = self.app.session_interface
						self.session = session_interface.open_session(self.app, self.request)
						if self.session is None:
							#经过上面还是None的话,生成了个空session
							self.session = session_interface.make_null_session(self.app)
			-请求走了savesession
				-response = self.full_dispatch_request() 方法内部:执行了before_first_request,before_request,视图函数,after_request,savesession
				-self.full_dispatch_request()---->执行:self.finalize_request(rv)-----》self.process_response(response)----》最后:self.session_interface.save_session(self, ctx.session, response)
		-请求扩展相关
			before_first_request,before_request,after_request依次执行
		-flask有一个请求上下文,一个应用上下文
			-ctx:
				-是:RequestContext对象:封装了request和session
				-调用了:_request_ctx_stack.push(self)就是把:ctx放到了那个位置
			-app_ctx:
				-是:AppContext(self) 对象:封装了当前的app和g
				-调用 _app_ctx_stack.push(self) 就是把:app_ctx放到了那个位置
	-g是个什么鬼?
		专门用来存储用户信息的g对象,g的全称的为global 
		g对象在一次请求中的所有的代码的地方,都是可以使用的 
		
		
	-代理模式
		-request和session就是代理对象,用的就是代理模式

G对象是什么

#1 g是一个全局变量,在当前请求中可以放值,取值

#2 session对象是可以跨request的,只要session还未失效,不同的request的请求会获取到同一个session,但是g对象不是,g对象不需要管过期时间,请求一次就g对象就改变了一次,或者重新赋值了一次

flask-session

# 1 替换flask内置的session,支持存到redis,存到数据库

#2 flask-session如何使用
	方式一:
conn=redis.Redis(host='127.0.0.1',port=6379)
app.session_interface=RedisSessionInterface(conn,'lqz',permanent=False)
    方式二:
# from redis import Redis
# from flask.ext.session import Session
# app.config['SESSION_TYPE'] = 'redis'
#
# app.config['SESSION_KEY_PREFIX'] = 'lqz'
#
# app.config['SESSION_REDIS'] = Redis(host='127.0.0.1',port='6379')
# # 本质跟上面一样
# # 类似的用法在flask中很常见 函数(app)
# Session(app)


# 问题1
	-关闭浏览器cookie失效
    		app.session_interface=RedisSessionInterface(conn,key_prefix='lqz',permanent=False)
# 问题2 
	-cookie默认超时时间是多少?如何设置超时时间
    'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),#这个配置文件控制

数据库连接池

#pip install DBUtils

import pymysql

from DBUtils.PooledDB import PooledDB
import time
from threading import Thread
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    host='127.0.0.1',
    port=3306,
    user='root',
    password='123',
    database='flask',
    charset='utf8'
)


def func():
    # 检测当前正在运行连接数的是否小于最大链接数,如果不小于则:等待或报raise TooManyConnections异常
    # 否则
    # 则优先去初始化时创建的链接中获取链接 SteadyDBConnection。
    # 然后将SteadyDBConnection对象封装到PooledDedicatedDBConnection中并返回。
    # 如果最开始创建的链接没有链接,则去创建一个SteadyDBConnection对象,再封装到PooledDedicatedDBConnection中并返回。
    # 一旦关闭链接后,连接就返回到连接池让后续线程继续使用。
    conn = POOL.connection()

    # print(th, '链接被拿走了', conn1._con)
    # print(th, '池子里目前有', pool._idle_cache, '
')

    cursor = conn.cursor()
    cursor.execute('select * from boy')
    result = cursor.fetchall()
    time.sleep(2)
    print(result)
    conn.close()

if __name__ == '__main__':
    for i in range(10):
        t=Thread(target=func)
        t.start()



import pymysql
from settings import Config
class SQLHelper(object):

    @staticmethod
    def open(cursor):
        POOL = Config.PYMYSQL_POOL
        conn = POOL.connection()
        cursor = conn.cursor(cursor=cursor)
        return conn,cursor

    @staticmethod
    def close(conn,cursor):
        conn.commit()
        cursor.close()
        conn.close()

    @classmethod
    def fetch_one(cls,sql,args,cursor =pymysql.cursors.DictCursor):
        conn,cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchone()
        cls.close(conn,cursor)
        return obj

    @classmethod
    def fetch_all(cls,sql, args,cursor =pymysql.cursors.DictCursor):
        conn, cursor = cls.open(cursor)
        cursor.execute(sql, args)
        obj = cursor.fetchall()
        cls.close(conn, cursor)
        return obj
    @classmethod
    def execute(cls,sql, args,cursor =pymysql.cursors.DictCursor):
        conn, cursor = cls.open(cursor)
        cursor.execute(sql, args)
        cls.close(conn, cursor)

wtforms(forms组件)

1 校验数据
2 渲染标签
详见代码

#pip3 install wtforms
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')

app.debug = True


class LoginForm(Form):
    # 字段(内部包含正则表达式)
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired(message='用户名不能为空.'),
            validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
        ],
        widget=widgets.TextInput(), # 页面上显示的插件
        render_kw={'class': 'form-control'}

    )
    # 字段(内部包含正则表达式)
    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.'),
            validators.Length(min=8, message='用户名长度必须大于%(min)d'),
            validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*d)(?=.*[$@$!%*?&])[A-Za-zd$@$!%*?&]{8,}",
                              message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')

        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )



@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'GET':
        form = LoginForm()
        return render_template('login.html', form=form)
    else:
        form = LoginForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('login.html', form=form)

if __name__ == '__main__':
    app.run()

#login.html
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>登录</h1>
    <form method="post">
        <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>

        <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
        <input type="submit" value="提交">
    </form>
    </body>
    </html>

    
#使用2
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets

app = Flask(__name__, template_folder='templates')
app.debug = True



class RegisterForm(Form):
    name = simple.StringField(
        label='用户名',
        validators=[
            validators.DataRequired()
        ],
        widget=widgets.TextInput(),
        render_kw={'class': 'form-control'},
        default='alex'
    )

    pwd = simple.PasswordField(
        label='密码',
        validators=[
            validators.DataRequired(message='密码不能为空.')
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    pwd_confirm = simple.PasswordField(
        label='重复密码',
        validators=[
            validators.DataRequired(message='重复密码不能为空.'),
            validators.EqualTo('pwd', message="两次密码输入不一致")
        ],
        widget=widgets.PasswordInput(),
        render_kw={'class': 'form-control'}
    )

    email = html5.EmailField(
        label='邮箱',
        validators=[
            validators.DataRequired(message='邮箱不能为空.'),
            validators.Email(message='邮箱格式错误')
        ],
        widget=widgets.TextInput(input_type='email'),
        render_kw={'class': 'form-control'}
    )

    gender = core.RadioField(
        label='性别',
        choices=(
            (1, '男'),
            (2, '女'),
        ),
        coerce=int # “1” “2”
     )
    city = core.SelectField(
        label='城市',
        choices=(
            ('bj', '北京'),
            ('sh', '上海'),
        )
    )

    hobby = core.SelectMultipleField(
        label='爱好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        coerce=int
    )

    favor = core.SelectMultipleField(
        label='喜好',
        choices=(
            (1, '篮球'),
            (2, '足球'),
        ),
        widget=widgets.ListWidget(prefix_label=False),
        option_widget=widgets.CheckboxInput(),
        coerce=int,
        default=[1, 2]
    )

    def __init__(self, *args, **kwargs):
        super(RegisterForm, self).__init__(*args, **kwargs)
        self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))

    def validate_pwd_confirm(self, field):
        """
        自定义pwd_confirm字段规则,例:与pwd字段是否一致
        :param field:
        :return:
        """
        # 最开始初始化时,self.data中已经有所有的值

        if field.data != self.data['pwd']:
            # raise validators.ValidationError("密码不一致") # 继续后续验证
            raise validators.StopValidation("密码不一致")  # 不再继续后续验证


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'GET':
        form = RegisterForm(data={'gender': 2,'hobby':[1,]}) # initial
        return render_template('register.html', form=form)
    else:
        form = RegisterForm(formdata=request.form)
        if form.validate():
            print('用户提交数据通过格式验证,提交的值为:', form.data)
        else:
            print(form.errors)
        return render_template('register.html', form=form)



if __name__ == '__main__':
    app.run()

#login.html
    <!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <title>Title</title>
    </head>
    <body>
    <h1>用户注册</h1>
    <form method="post" novalidate style="padding:0  50px">
        {% for field in form %}
        <p>{{field.label}}: {{field}} {{field.errors[0] }}</p>
        {% endfor %}
        <input type="submit" value="提交">
    </form>
    </body>
    </html>

信号

1 semaphore跟线程的它没有半毛钱关系

2 signal翻译过来的,flask的signal

3 信号是同步操作

4 如何使用(内置的)
	######### 内置信号的使用
    ##第一步写一个函数(触发某些动作)
    # 往信号中注册函数
    def func(*args,**kwargs):
        print(args[0])  # 当前app对象
        print('触发信号',args,kwargs)
    # 第二步:函数跟内置信号绑定
    signals.request_started.connect(func)
    
5 自定义信号的使用
	# 自定义信号
    # #第一步:定义一个信号
    # xxxxx = _signals.signal('xxxxx')
    # # 第二步:定义一个函数
    # def func3(*args,**kwargs):
    #     import time
    #     time.sleep(1)
    #     print('触发信号',args,kwargs)
    # #第三步:信号跟函数绑定
    # xxxxx.connect(func3)

    #第四步:触发信号
    xxxxx.send(1,k='2')

多app应用(了解)

### 多个app实例(启用)
from werkzeug.wsgi import DispatcherMiddleware
from werkzeug.serving import run_simple
from flask import Flask, current_app
app1 = Flask('app01')
app2 = Flask('app02')

@app1.route('/index')
def index():
    return "app01"

@app2.route('/index2')
def index2():
    return "app2"

# http://www.oldboyedu.com/index
# http://www.oldboyedu.com/sec/index2
dm = DispatcherMiddleware(app1, {
    '/sec': app2,
})

if __name__ == "__main__":
    run_simple('localhost', 5000, dm)
    # 请求来了,会执行dm()--->__call__

flask-script(制定命令)

1 模拟出类似django的启动方式:python manage.py runserver

2 pip install flask-script

3 把excel的数据导入数据库,定制个命令,去执行(openpyxl)
	python manage.py insertdb -f xxx.excl -t aa
    
4 使用
	-方式一:python manage.py runserver
    from flask import Flask
    from flask_script import Manager
    app = Flask(__name__)
    manager=Manager(app)
    if __name__ == '__main__':
        manager.run()
    -方式二:自定制命令
        @manager.command
        def custom(arg):
            print(arg)
        @manager.option('-n', '--name', dest='name')
        @manager.option('-u', '--url', dest='url')
        def cmd(name, url):
            print(name, url)
            
5 创建超级用户

6 现在有一万条excel用户,批量导入到数据库中
	-navicate直接支持
    -脚本
    -flask-script

sqlalchemy

概念

1 sqlachemy:第三方orm框架(对象关系映射)
	-go 中gorm,xorm
    -python中:django orm,sqlachemy,peewee
    -老刘带你手写的:https://www.cnblogs.com/liuqingzheng/articles/9006025.html
        
2 django orm,只能在django中用,不能单独用

3 使用 pip install sqlachemy

4 SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件

5 补充:django orm反向生成models
	-python manage.py inspectdb > app/models.py

基本使用(原生sql)

import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine

# 第一步生成一个engine对象
engine = create_engine(
    "mysql+pymysql://root:123@127.0.0.1:3306/flask?charset=utf8",
    max_overflow=0,  # 超过连接池大小外最多创建的连接
    pool_size=5,  # 连接池大小
    pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
)

# 第二步:创建连接(执行原生sql)
conn = engine.raw_connection()

# 第三步:获取游标对象
cursor = conn.cursor()

# 第四步:具体操作
cursor.execute('select * from boy')

res=cursor.fetchall()
print(res)

# 比pymysql优势在,有数据库连接池

orm使用

# 创建一个个类(继承谁?字段怎么写)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index

# 制造了一个类,作为所有模型类的基类
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'  # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
    id = Column(Integer, primary_key=True)  # id 主键
    # mysql中主键自动建索引:聚簇索引
    # 其他建建的索引叫:辅助索引
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    # email = Column(String(32), unique=True)  # 唯一
    # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    # ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
    # extra = Column(Text, nullable=True)

    #类似于djagno的 Meta
    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
    #     Index('ix_id_name', 'name', 'email'), #索引
    # )



# 创建表
def create_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象创建表
    Base.metadata.create_all(engine)

# 删除表
def drop_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象删除所有表
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    # create_table()
    drop_table()

# 创建库?手动创建库
# 问题,sqlachemy支持修改字段吗?不支持

线程安全

#基于scoped_session实现线程安全
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User  # pycharm报错,不会影响我们
from sqlalchemy.orm import scoped_session

# 1 制作engine
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)

# 2 制造一个 session 类(会话)
Session = sessionmaker(bind=engine)    # 得到一个类

# 3 得到一个session对象(线程安全的session)
#现在的session已经不是session对象了
#为什么线程安全,还是用的local
session = scoped_session(Session)

# session=Session()

# 4 创建一个对象
obj1 = User(name="2008")

# 5 把对象通过add放入
session.add(obj1)
# session.aaa()

# 6 提交
session.commit()
session.close()


# 类不继承Session类,但是有该类的所有方法(通过反射,一个个放进去)

# scoped_session.add------->instrument(name)--->do函数内存地址---》现在假设我要这么用:session.add()--->do()
# scoped_session.close----->instrument(name)--->do函数内存地址

基本的增删查改

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby
from sqlalchemy.orm import scoped_session
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)

Session = sessionmaker(bind=engine)
# session = scoped_session(Session)
session=Session()

####1 新增多个对象
# obj=User(name='xxx')
# obj2=User(name='yyyy')
# obj3=User(name='zzz')
#新增同样对象
# session.add_all([obj,obj2,obj3])
#新增不同对象
# session.add_all([Person(name='lqz'),Hobby()])

####2 简单删除(查到删除)
# res=session.query(User).filter_by(name='2008').delete()
# res=session.query(User).filter(User.id>=2).delete()
# # 影响1行
# print(res)

#### 3 修改
# res=session.query(User).filter_by(id=1).update({User.name:'ccc'})
# res=session.query(User).filter_by(id=1).update({'name':'ccc'})

# session.query(User).filter(User.id > 0).update({User.name: User.name + "099"}, synchronize_session=False) # 如果要把它转成字符串相加
# session.query(User).filter(User.id > 0).update({"age": User.age + 1}, synchronize_session="evaluate")  ## 如果要把它转成数字相加


####4 基本查询操作

# res=session.query(User).all()
# print(type(res))
# res=session.query(User).first()
# print(res)

#filter传的是表达式,filter_by传的是参数
# res=session.query(User).filter(User.id==1).all()
# res=session.query(User).filter(User.id>=1).all()
# res=session.query(User).filter(User.id<1).all()

# res=session.query(User).filter_by(name='ccc099').all()


#了解
# res = session.query(User).from_statement(text("SELECT * FROM users where name=:name")).params(name='ccc099').all()
# print(res)


session.commit()
# 并没有真正关闭连接,而是放回池中
session.close()

高级操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session=Session()


# 1 查询名字为lqz的所有user对象
# ret = session.query(User).filter_by(name='ccc099').all()

# 2 表达式,and条件连接
# ret = session.query(User).filter(User.id > 1, User.name == 'egon').all()
# 查找id在1和10之间,并且name=egon的对象
# ret = session.query(User).filter(User.id.between(1, 10), User.name == 'egon').all()

# in条件(class_,因为这是关键字,不能直接用)
# ret = session.query(User).filter(User.id.in_([1,3,4])).all()

# 取反 ~
ret = session.query(User).filter(~User.id.in_([1,3,4])).all()

#二次筛选
# select *
# ret = session.query(User).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()
# # select name,id 。。。。
# ret = session.query(User.id,User.name).filter(User.id.in_(session.query(User.id).filter_by(name='egon'))).all()

'''
SELECT users.id AS users_id, users.name AS users_name 
FROM users 
WHERE users.id IN (SELECT users.id AS users_id 
FROM users 
WHERE users.name = %(name_1)s)

'''


#
from sqlalchemy import and_, or_
#or_包裹的都是or条件,and_包裹的都是and条件
#查询id>3并且name=egon的人
# ret = session.query(User).filter(and_(User.id > 3, User.name == 'egon')).all()

# 查询id大于2或者name=ccc099的数据
# ret = session.query(User).filter(or_(User.id > 2, User.name == 'ccc099')).all()
# ret = session.query(User).filter(
#     or_(
#         User.id < 2,
#         and_(User.name == 'egon', User.id > 3),
#         User.extra != ""
#     )).all()
# print(ret)

'''
select *from user where id<2 or (name=egon and id >3) or extra !=''
'''


# 通配符,以e开头,不以e开头
# ret = session.query(User).filter(User.name.like('e%')).all()
# ret = session.query(User).filter(~User.name.like('e%')).all()

# 限制,用于分页,区间 limit
# 前闭后开区间,1能取到,3取不到
ret = session.query(User)[1:3]

'''
select * from users limit 1,2;
'''


# 排序,根据name降序排列(从大到小)
# ret = session.query(User).order_by(User.name.desc()).all()
# ret = session.query(User).order_by(User.name.asc()).all()
#第一个条件降序排序后,再按第二个条件升序排
# ret = session.query(User).order_by(User.id.asc(),User.name.desc()).all()
# ret = session.query(User).order_by(User.name.desc(),User.id.asc()).all()


# 分组
from sqlalchemy.sql import func

# ret = session.query(User).group_by(User.name).all()
#分组之后取最大id,id之和,最小id
# sql 分组之后,要查询的字段只能有分组字段和聚合函数
# ret = session.query(
#     func.max(User.id),
#     func.sum(User.id),
#     func.min(User.id),
#     User.name).group_by(User.name).all()
# '''
# select max(id),sum(id),min(id) from user group by name;
#
# '''
# for obj in ret:
#     print(obj[0],'----',obj[1],'-----',obj[2],'-----',obj[3])
# print(ret)

#haviing筛选
# ret = session.query(
#     func.max(User.id),
#     func.sum(User.id),
#     func.min(User.id)).group_by(User.name).having(func.min(User.id) >2).all()

'''
select max(id),sum(id),min(id) from user group by name having min(id)>2;

'''
print(ret)
session.commit()

session.close()

多表操作

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import User,Person,Hobby,Boy,Girl,Boy2Girl
from sqlalchemy.sql import text
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/aaa", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session=Session()



###  1 一对多插入数据
# obj=Hobby(caption='足球')
# session.add(obj)
# p=Person(name='张三',hobby_id=2)
# session.add(p)

### 2 方式二(默认情况传对象有问题)
###### Person表中要加 hobby = relationship('Hobby', backref='pers')
# p=Person(name='李四',hobby=Hobby(caption='美女'))
# 等同于
# p=Person(name='李四2')
# p.hobby=Hobby(caption='美女2')
# session.add(p)

## 3 方式三,通过反向操作
# hb = Hobby(caption='人妖')
# hb.pers = [Person(name='文飞'), Person(name='博雅')]
# session.add(hb)


#### 4 查询(查询:基于连表的查询,基于对象的跨表查询)
### 4.1 基于对象的跨表查询(子查询,两次查询)
# 正查
# p=session.query(Person).filter_by(name='张三').first()
# print(p)
# print(p.hobby.caption)
# 反查
# h=session.query(Hobby).filter_by(caption='人妖').first()
# print(h.pers)

### 4.2 基于连表的跨表查(查一次)
# 默认根据外键连表
# isouter=True 左外连,表示Person left join Hobby,没有右连接,反过来即可
# 不写 inner join
# person_list=session.query(Person,Hobby).join(Hobby,isouter=True).all()
# print(person_list)
# print(person_list)
# for row in person_list:
#     print(row[0].name,row[1].caption)

# '''
# select * from person left join hobby on person.hobby_id=hobby.id
# '''
#
# ret = session.query(Person, Hobby).filter(Person.hobby_id == Hobby.id)
# print(ret)
# '''
# select * from user,hobby where user.id=favor.nid;
#
# '''


#join表,默认是inner join
# ret = session.query(Person).join(Hobby)
# # ret = session.query(Hobby).join(Person,isouter=True)
# '''
# SELECT *
# FROM person INNER JOIN hobby ON hobby.id = person.hobby_id
# '''
# print(ret)


# 指定连表字段(从来没用过)
# ret = session.query(Person).join(Hobby,Person.nid==Hobby.id, isouter=True)
# # ret = session.query(Person).join(Hobby,Person.hobby_id==Hobby.id, isouter=True).all()
# print(ret)
'''
SELECT *
FROM person LEFT OUTER JOIN hobby ON person.nid = hobby.id

'''

# print(ret)




# 组合(了解)UNION 操作符用于合并两个或多个 SELECT 语句的结果集
# union和union all的区别?
# q1 = session.query(User.name).filter(User.id > 2)  # 6条数据
# q2 = session.query(User.name).filter(User.id < 8) # 2条数据


# q1 = session.query(User.id,User.name).filter(User.id > 2)  # 6条数据
# q2 = session.query(User.id,User.name).filter(User.id < 8) # 2条数据
# ret = q1.union_all(q2).all()
# ret1 = q1.union(q2).all()
# print(ret)
# print(ret1)
#
# q1 = session.query(User.name).filter(User.id > 2)
# q2 = session.query(Hobby.caption).filter(Hobby.nid < 2)
# ret = q1.union_all(q2).all()







#### 多对多

# session.add_all([
#     Boy(hostname='霍建华'),
#     Boy(hostname='胡歌'),
#     Girl(name='刘亦菲'),
#     Girl(name='林心如'),
# ])
# session.add_all([
#     Boy2Girl(girl_id=1, boy_id=1),
#     Boy2Girl(girl_id=2, boy_id=1)
# ])


##### 要有girls = relationship('Girl', secondary='boy2girl', backref='boys')
# girl = Girl(name='张娜拉')
# girl.boys = [Boy(hostname='张铁林'),Boy(hostname='费玉清')]
# session.add(girl)

# boy=Boy(hostname='蔡徐坤')
# boy.girls=[Girl(name='谢娜'),Girl(name='巧碧螺')]
# session.add(boy)
# session.commit()


# 基于对象的跨表查

# girl=session.query(Girl).filter_by(id=3).first()
# print(girl.boys)

#### 基于连表的跨表查询

# 查询蔡徐坤约过的所有妹子
'''
select girl.name from girl,boy,Boy2Girl where boy.id=Boy2Girl.boy_id and girl.id=Boy2Girl.girl_id where boy.name='蔡徐坤'

'''
# ret=session.query(Girl.name).filter(Boy.id==Boy2Girl.boy_id,Girl.id==Boy2Girl.girl_id,Boy.hostname=='蔡徐坤').all()

'''
select girl.name from girl inner join Boy2Girl on girl.id=Boy2Girl.girl_id inner join boy on boy.id=Boy2Girl.boy_id where boy.hostname='蔡徐坤'

'''
# ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter(Boy.hostname=='蔡徐坤').all()
ret=session.query(Girl.name).join(Boy2Girl).join(Boy).filter_by(hostname='蔡徐坤').all()
print(ret)


### 执行原生sql(用的最多的)
### django中orm如何执行原生sql
#
# cursor = session.execute('insert into users(name) values(:value)',params={"value":'xxx'})
# print(cursor.lastrowid)
# session.commit()

session.close()

models.py



# 创建一个个类(继承谁?字段怎么写)
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# 字段和字段属性
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
# 制造了一个类,作为所有模型类的基类
Base = declarative_base()

class User(Base):
    __tablename__ = 'users'  # 数据库表名称(固定写法),如果不写,默认以类名小写作为表的名字
    id = Column(Integer, primary_key=True)  # id 主键
    # mysql中主键自动建索引:聚簇索引
    # 其他建建的索引叫:辅助索引
    name = Column(String(32), index=True, nullable=False)  # name列,索引,不可为空
    # email = Column(String(32), unique=True)  # 唯一
    # #datetime.datetime.now不能加括号,加了括号,以后永远是当前时间
    # ctime = Column(DateTime, default=datetime.datetime.now) # default默认值
    # extra = Column(Text, nullable=True)

    #类似于djagno的 Meta
    # __table_args__ = (
    #     UniqueConstraint('id', 'name', name='uix_id_name'), #联合唯一
    #     Index('ix_id_name', 'name', 'email'), #索引
    # )
    def __str__(self):
        return self.name
    def __repr__(self):
        # python是强类型语言
        return self.name+str(self.id)




# 一对多关系

# 一个Hobby可以有很多人喜欢
# 一个人只能由一个Hobby
class Hobby(Base):
    __tablename__ = 'hobby'
    id = Column(Integer, primary_key=True)
    caption = Column(String(50), default='篮球')


class Person(Base):
    __tablename__ = 'person'
    nid = Column(Integer, primary_key=True)
    name = Column(String(32), index=True, nullable=True)
    # hobby指的是tablename而不是类名,uselist=False
    # 一对多的关系,关联字段写在多的一方
    hobby_id = Column(Integer, ForeignKey("hobby.id"))  # 默认可以为空

    # 跟数据库无关,不会新增字段,只用于快速链表操作
    # 类名,backref用于反向查询
    hobby = relationship('Hobby', backref='pers')


# 多对多关系
# 实实在在存在的表
class Boy2Girl(Base):
    __tablename__ = 'boy2girl'
    id = Column(Integer, primary_key=True, autoincrement=True) # autoincrement自增,默认是True
    girl_id = Column(Integer, ForeignKey('girl.id'))
    boy_id = Column(Integer, ForeignKey('boy.id'))



class Girl(Base):
    __tablename__ = 'girl'
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)


class Boy(Base):
    __tablename__ = 'boy'

    id = Column(Integer, primary_key=True, autoincrement=True)
    hostname = Column(String(64), unique=True, nullable=False)

    # 与生成表结构无关,仅用于查询方便,放在哪个单表中都可以
    # secondary 通过哪个表建关联,跟django中的through一模一样
    girls = relationship('Girl', secondary='boy2girl', backref='boys')

# 创建表
def create_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象创建表
    Base.metadata.create_all(engine)

# 删除表
def drop_table():
    # 创建engine对象
    engine = create_engine(
        "mysql+pymysql://root:123@127.0.0.1:3306/aaa?charset=utf8",
        max_overflow=0,  # 超过连接池大小外最多创建的连接
        pool_size=5,  # 连接池大小
        pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
        pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    )
    # 通过engine对象删除所有表
    Base.metadata.drop_all(engine)

if __name__ == '__main__':
    create_table()  # 原来已经存在user表,再执行一次不会有问题
    # drop_table()

# 创建库?手动创建库
# 问题,sqlachemy支持修改字段吗?不支持

Flask-SQLAlchemy

1 Flask-SQLAlchemy

2 flask-migrate
    -python3 manage.py db init 初始化:只执行一次
    -python3 manage.py db migrate 等同于 makemigartions
    -python3 manage.py db upgrade 等同于migrate
    
3 看代码

4 Flask-SQLAlchemy如何使用
	1 from flask_sqlalchemy import SQLAlchemy
	2 db = SQLAlchemy()
    3 db.init_app(app)
    4 以后在视图函数中使用
    	-db.session 就是咱们讲的session
        
5 flask-migrate的使用(表创建,字段修改)
	1 from flask_migrate import Migrate,MigrateCommand
    2 Migrate(app,db)
	3 manager.add_command('db', MigrateCommand)
    
6 直接使用
    -python3 manage.py db init 初始化:只执行一次,创建migrations文件夹
    -python3 manage.py db migrate 等同于 makemigartions
    -python3 manage.py db upgrade 等同于migrate
	
原文地址:https://www.cnblogs.com/linqiaobao/p/13822060.html