1.14 Flask websocket,加密 解密 Flask_session

Flask

DBpool

import pymysql
from DBUtils.PooledDB import PooledDB
POOL = PooledDB(
    creator=pymysql,  # 使用链接数据库的模块
    maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    maxshared=3,  # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,
    # 因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,
    # _maxcached永远为0,所以永远是所有链接都共享。
    blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    ping=0,
    # ping MySQL服务端,检查是否服务可用。
    #  如:0 = None = never,
    # 1 = default = whenever it is requested,
    # 2 = when a cursor is created,
    # 4 = when a query is executed,
    # 7 = always
    host="127.0.0.1",
    port=3306,
    user="root",
    password="",
    charset="utf8",
    db="day115"
)

#
# conn = POOL.connection() # pymysql - conn
# cur = conn.cursor(cursor=pymysql.cursors.DictCursor)
#
# sql = "select * from users WHERE name='jwb' and age=73 "
#
# res = cur.execute(sql)
#
#
# print(cur.fetchall())
#
# conn.close()

mysql回顾

import pymysql

mysql_conn = pymysql.connect(host="127.0.0.1",
                             port=3306,
                             user="root",
                             password="",
                             charset="utf8",
                             db="day115")
c = mysql_conn.cursor(cursor=pymysql.cursors.DictCursor)

sql = "select * from users WHERE name='jwb' and age=73 "

res = c.execute(sql)


print(c.fetchall())

c.close()
mysql_conn.close()

flask_session

from flask import Flask, session
from flask_session import Session
from redis import Redis

app = Flask(__name__)
# app.secret_key = "asdf%^&*"
app.config["SESSION_TYPE"] = "redis"
app.config["SESSION_REDIS"] = Redis(host="127.0.0.1", port=6379, db=6)

Session(app)


@app.route("/")
def index():
    session["user"] = "value"
    return "hello"


if __name__ == '__main__':
    app.run(debug=True)

    
# 相关参数介绍
from flask import Flask, render_template,request
from wtforms.fields import simple, core
from wtforms import validators
from wtforms import Form

app = Flask(__name__)


class LoginForm(Form):
    username = simple.StringField(
        label="用户名",  # 标签标记
        validators=[validators.DataRequired(message="用户名不能为空"),
                    validators.Length(min=3,max=8,message="不是长了就是短了")],  # 校验条件 可迭代条件
        # description='11111111111',  # 描述标记
        id="user_id",  # 标签ID
        default=None,  # 默认值
        widget=None,  # 默认组件(input type="text") 在StringField中已经被实例化了
        render_kw={"class":"my_login"},  # {"class":"my_login"}
    )
    password = simple.PasswordField(
        label="密码",  # 标签标记
        validators=[validators.DataRequired(message="密码不能为空"),
                    validators.Length(min=3, max=16, message="不是长了就是短了"),
                    validators.Email(message="密码必须符合邮箱规则")],  # 不知道
        # description='11111111111',  # 描述标记
        id="user_id",  # 标签ID
        default=None,  # 默认值
        widget=None,  # 默认组件(input type="text") 在StringField中已经被实例化了
        render_kw={"class": "my_login"},  # {"class":"my_login"}
    )


class RegForm(Form):
    username = simple.StringField(
        label="用户名",  # 标签标记
        validators=[validators.DataRequired(message="用户名不能为空"),
                    validators.Length(min=3,max=8,message="用户名不是长了就是短了")],  # 校验条件 可迭代条件
    )

    password = simple.PasswordField(
        label="密码",  # 标签标记
        validators=[validators.DataRequired(message="密码不能为空"),
                    validators.Length(min=3, max=16, message="密码不是长了就是短了"),
                    validators.Email(message="密码必须符合邮箱规则")],
    )

    repassword = simple.PasswordField(
        label="确认密码",  # 标签标记
        validators=[validators.EqualTo(fieldname="password",message="眼神未确认")]
    )

    gender = core.RadioField(
        label="性别",
        coerce=str,
        choices=(
            ("1","女"),
            ("2","男")
        ),
        default="1"
    )

    hobby = core.SelectMultipleField(
        label="爱好",
        validators=[validators.Length(min=2,max=4,message="癖好有问题")],
        coerce=int,
        choices=(
            (1, "fengjie"),
            (2, "luoyufeng"),
            (3, "lixueqin"),
            (4, "wuyifan"),
            (5, "panta")
        ),
        default=(1,3,5)
    )



@app.route("/",methods=["GET","POST"])
def index():
    if request.method == "GET":
        fm = LoginForm()
        return render_template("index.html",wtf = fm)
    else:
        new_fm = LoginForm(request.form)
        if new_fm.validate():
            return new_fm.data.get("password")
        else:
            return render_template("index.html", wtf=new_fm)

@app.route("/reg",methods=["GET","POST"])
def reg():
    if request.method == "GET":
        rf = RegForm()
        return render_template("reg.html",rf = rf)
    else:
        rf = RegForm(request.form)
        if rf.validate():
            print(type(rf.data.get("gender")),rf.data.get("gender"))
            return rf.data.get("password")
        else:
            return render_template("reg.html", rf=rf)

if __name__ == '__main__':
    app.run(debug=True)
    app.__call__

sqlhelper

from dbpool import POOL
import pymysql

# sql = ""


def create_conn():
    conn = POOL.connection()
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)

    return conn, cursor


def close_conn(conn, cursor):
    cursor.close()
    conn.close()


def insert(sql, args):
    conn, cursor = create_conn()
    res = cursor.execute(sql, args)
    conn.commit()
    close_conn(conn, cursor)
    return res


def update(sql, args):
    conn, cursor = create_conn()
    res = cursor.execute(sql, args)
    conn.commit()
    close_conn(conn, cursor)
    return res


def fetch_one(sql, args):
    conn, cursor = create_conn()
    cursor.execute(sql, args)
    res = cursor.fetchone()
    close_conn(conn, cursor)
    return res


def fetch_all(sql, args):
    conn, cursor = create_conn()
    cursor.execute(sql, args)
    res = cursor.fetchall()
    close_conn(conn, cursor)
    return res


# sql = "insert into users(name,age) VALUES (%s, %s)"

# insert(sql,("mjj",9))

search_sql = "Select * from users where name=%s and age=%s"
update_sql = "UPDATE users SET name=%s and age=%s"


print(fetch_one(search_sql, ("mjj", 9)))

websocket_hello

import socket
import base64
import hashlib

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('127.0.0.1', 9527))
sock.listen(5)
# 获取客户端socket对象
conn, address = sock.accept()
# 获取客户端的【握手】信息
data = conn.recv(1024)
print(data)
"""
GET /ws HTTP/1.1

Host: 127.0.0.1:9527

Connection: Upgrade

Pragma: no-cache

Cache-Control: no-cache

User-Agent: Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36

Upgrade: websocket

Origin: http://localhost:63342

Sec-WebSocket-Version: 13

Accept-Encoding: gzip, deflate, br

Accept-Language: zh-CN,zh;q=0.9

Cookie: session=a6f96c20-c59e-4f33-84d9-c664a2f29dfc

Sec-WebSocket-Key: MAZZU5DPIxWmhk/UWL2+BA==

Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits


"""
# 以下动作是有websockethandler完成的
# magic string为:258EAFA5-E914-47DA-95CA-C5AB0DC85B11


def get_headers(data):
    header_dict = {}
    header_str = data.decode("utf8")
    for i in header_str.split("
"):
        if str(i).startswith("Sec-WebSocket-Key"):
            header_dict["Sec-WebSocket-Key"] = i.split(":")[1].strip()

    return header_dict


headers = get_headers(data)  # 提取请求头信息
#
magic_string = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'
#Sec-WebSocket-Key: MAZZU5DPIxWmhk/UWL2+BA==
value = headers['Sec-WebSocket-Key'] + magic_string
print(value)
ac = base64.b64encode(hashlib.sha1(value.encode('utf-8')).digest())

# 对请求头中的sec-websocket-key进行加密
response_tpl = "HTTP/1.1 101 Switching Protocols
" 
               "Upgrade:websocket
" 
               "Connection: Upgrade
" 
               "Sec-WebSocket-Accept: %s
" 
               "WebSocket-Location: ws://127.0.0.1:9527

"
print(ac.decode('utf-8'))
response_str = response_tpl % (ac.decode('utf-8'))
# 响应【握手】信息
conn.send(response_str.encode("utf8"))
#
while True:
    msg = conn.recv(8096)
    print(msg)

websocket_加密

import struct
msg_bytes = "the emperor has not been half-baked in the early days of the collapse of the road, today down three points, yizhou weakness, this serious crisis autumn".encode("utf8")
token = b"x81"  # + 数据长度/运算位 + mask/数据长度 + mask/数据 + 数据
length = len(msg_bytes)

if length < 126:
    token += struct.pack("B", length)
elif length == 126:
    token += struct.pack("!BH", 126, length)
else:
    token += struct.pack("!BQ", 127, length)

msg = token + msg_bytes

print(msg)

websocket_解密

# b'x81x89xf3x99x81-x15x05x01xcbOx1bex97]'
# b'x81x85sx92ax10x1bxf7
|x1c'
# b'x81x83Hxc0xxa6yxf2K'

hashstr = b'x81x85sx92ax10x1bxf7
|x1c'
# b'x81 x85s x92ax10x1bxf7  
|x1c' <126
# x85s = 5
hashstr = b'x81xfex02xdcx8dxe8-xb2hmxa5W5uxc8:x16x0cx95(ktx87Wx00bxc52x01x0cx95x1fdixbeW9Axcbx1cx0fx07x91>iSxa7W)Axc9
x06x0cx95;h`xab]1dxca)x07
x9a,j~x9fW1bxc2x0ex01x0ex80x16eGxb7Wx00Yxcb2(
x80*iRx8cV4cxcax15x06x0cx94-nhxafU	^xc9x0cx00
xa0x19iQxa6Z
Kxc9
x00x0exaa:iRxa3Wx0bmxc2x0ex01
x92x12hWxbaV4cxc8x11&
x92*eRx86V7fxc8x16x1bx00xad7bTxa1Ux16~xc5
0
xa8:hPxb0V4cxcbx1cx07x01xac5bTxa1T!Zxcb8(x0cx949iRxa3[x14sxc9
x06x0cx94-nhxafZ"rxc8x1cx11
x912hTx8dWx11Kxc8"!x07x91>iSx88Wx08axc87x05
x95/dixbaW3_xc2x0ex01x0exacx10hTxb5W2x7fxc8x11&x0cx949kXxb9]1dxc9
x00
x83.hNxa9Z
Bxc5=?x00xbb6bTxa1W1}xc8$6
x89x03iQxa4]1dxc9	(
x8c,hWx8dZ=gxc9x0bx06x00x9ax1diQxb2Q
jxc8x1c&x0cx95x1fhRxb1V5Exc2x0ex01x0cx92x03iPx97V5hxc9x0fx1ex07x91>dqxb2U0rxc55*
xbdx14bTxa1V5exc8x1cx11
x910hxxa1Q
jxc59(x0exb1;iUxb1W(Pxca8"x0fx8a#hgxa7V5Rxc8
-
xbb6ehxa8]1dxc8x1cx11x0cx96*ktxa4Wx02Pxc5x1c7
xa8x04h`xbcZ8gxc2x0ex01x0cx96x17kpx80[x14sxc9
x06
x94x01kpxa3V4cxca"x0bx07x91>iPxa0W#txc83x02x0fx8a3bTxa1V0Wxc84x08
x89$hTxafT>}xc9x0bx12x0bxad0iVxa0V5Exce2x0cx0cx93?dkxa3[x0eExcb&5x0cx949nhxacZ9Qxcax17x03x0bxad3eyx8eWx08ixcax1fx04x07x91>kEx89Ux17nxc5;"
x83,bTxa1W2x7fxc5+x1c
x92x12jRx82]1dxcb*"x0cx96x17hmxa5W5uxcax1c
x0exa6&iSx88[x0cx7fxc4+x16x0cx959nhxafT	rxc9	(x0cx95x08hFx86V5Exc9x0bx06x0cx979bTxa1V7cxcb%-
x89x15hXxa2]1dxcb0x04x0cx96x17hzx85V4cxc2x0ex01x0fxa9x04hxxa3Tx1bUxc5x13x01x07x91>hWxa8Zx0eUxc5x11%x00x8cx17dpxb4T1gxc2x0ex01x0exb1;kaxadW4Wxca)x07x0bxad0'
# print(chushibiao[1],chushibiao[1]&127)
# print(chushibiao[2:4],chushibiao[4:8])
# 将第二个字节也就是 x83 第9-16位 进行与127进行位运算
payload = hashstr[1] & 127
print(payload)
if payload == 127:
    extend_payload_len = hashstr[2:10]
    mask = hashstr[10:14]
    decoded = hashstr[14:]
# 当位运算结果等于127时,则第3-10个字节为数据长度
# 第11-14字节为mask 解密所需字符串
# 则数据为第15字节至结尾

if payload == 126:
    extend_payload_len = hashstr[2:4]
    mask = hashstr[4:8]
    decoded = hashstr[8:]
# 当位运算结果等于126时,则第3-4个字节为数据长度
# 第5-8字节为mask 解密所需字符串
# 则数据为第9字节至结尾


if payload <= 125:
    extend_payload_len = None
    mask = hashstr[2:6]
    decoded = hashstr[6:]

# 当位运算结果小于等于125时,则这个数字就是数据的长度
# 第3-6字节为mask 解密所需字符串
# 则数据为第7字节至结尾

str_byte = bytearray()
# b'x81 x85s x92ax10x1b xf7
|x1c' <126
for i in range(len(decoded)):  # 0  xf7 ^ x92a 1 
 ^ x10 x1c ^ x1b
    byte = decoded[i] ^ mask[i % 4]
    str_byte.append(byte)

print(str_byte.decode("utf8"))


ws简单了解回顾

from flask import Flask, request
from geventwebsocket.handler import WebSocketHandler
from gevent.pywsgi import WSGIServer
from geventwebsocket.websocket import WebSocket

app = Flask(__name__)


@app.route("/")
def index():
    return "hello"


@app.route("/ws")
def ws():
    user_socket = request.environ.get("wsgi.websocket")  # type:WebSocket
    while 1:
        msg = user_socket.receive()
        print(msg)
        user_socket.send(msg)


if __name__ == '__main__':
    # app.run(debug=True)
    http_server = WSGIServer(("0.0.0.0", 9527), app, handler_class=WebSocketHandler)
    http_server.serve_forever()

原文地址:https://www.cnblogs.com/zzy7372/p/10269010.html