测开之路一百三十三:实现sql函数封装

连接数据库的频率很高,所以把数据库操作封装起来

函数封装:

def make_dicts(cursor, row):
""" 将游标获取的Tuple根据数据库列表转换为dict """
return dict((cursor.description[idx][0], value) for idx, value in enumerate(row))


def get_db():
""" 获取(简历数据库链接)
g: flask内置的变量:g = LocalProxy(partial(_lookup_app_object, "g"))
"""
db = getattr(g, '_database', None)
if not db:
db = g._database = sqlite3.connect(DATABASE)
db.row_factory = make_dicts
return db


def execute_sql(sql, params=()):
""" 执行sql语句不返回数据结果 insert、update、delete """
c = get_db().cursor()
c.execute(sql, params)
c.connection.commit()


def query_sql(sql, params=(), one=False):
""" 查询数据 one=False的时候返回多条"""
c = get_db().cursor()
result = c.execute(sql, params).fetchall()
c.close()
return (result[0] if result else None) if one else result


@app.teardown_appcontext # 在当前app上下文销毁时执行
def close_connection(exeption):
""" 关闭数据库 """
db = getattr(g, '_database', None)
if db is not None:
db.close()

修改之前的数据库操作为用函数

由于返回参数从元祖变为了字典,所以要修改前端渲染取值方式,从元祖取值改为字典取值

列表展示页:feedback-list.html

{% extends 'base.html'%}

{% block main_content %}
<div class="row">
<table class="table table-hover">
<tr>
<th>ID</th>
<th>主题</th>
<th>分类</th>
<th>用户</th>
<th>邮箱</th>
<th>处理状态</th>
<th>提交时间</th>
<th>操作</th>
</tr>
{% for item in items %}
<tr>
<td>{{ loop.index }}</td><!--jinja模板提供的遍历序号功能-->
<td>{{ item.Subjeck }}</td><!--jinja模板提供的字典取值功能-->
<td>{{ item.get('CategoryID', 0) }}</td><!--python自带的字典取值功能-->
<td>{{ item['UserName'] }}</td><!--python自带的字典取值功能-->
<td>{{ item['Email'] }}</td>
<td><span class="label label-{{ 'danger' if item['State'] ==0 else 'success' }}">{{ "未处理" if item['State'] ==0 else "已处理" }}</span></td>
<td>{{ item['ReleaseTime'] }}</td>
<td>
<a href="#" class="btn btn-success">查看</a>
<a href="{{ url_for('edit', id=item['rowid']) }}" class="btn btn-default">编辑</a>
<a href="{{ url_for('delete_feedback', id=item['rowid']) }}" class="btn btn-danger">删除</a>
</td>
</tr>
{% endfor %}
</table>
</div>

{% endblock %}

编辑页:edit.html

<!--继承base.html-->
{% extends 'base.html' %}

<!--引用base.html预留的正文部分-->
{% block main_content %}
<div class="row">
<div class="panel panel-default">
<div class="panel-heading">
<h4>问题反馈信息编辑</h4>
</div>
<div class="panel-body">
<!--保存的url-->
<form action="{{ url_for('save_edit') }}" method="POST" class="form-horizontal">
<div class="form-group">
<label for="subject" class="control-label col-md-2">主题</label>
<div class="col-md-6">
<!--渲染主题-->
<input type="text" value="{{ item['Subjeck'] }}" class="form-control" id="subject" name="subject">
<!--拿到id,不渲染,用于保存的时候做sql条件-->
<input type="hidden" value="{{ item['rowid'] }}" class="form-control" id="rowid" name="rowid">
</div>
</div>
<div class="form-group">
<label for="category" class="control-label col-md-2">问题分类</label>
<div class="col-md-2">
<select name="category" id="category" class="form-control">
<!--显示当前问题的问题分类-->
{% for category in categories %}
<option {{ 'selected=selected' if item['CategoryID']==category['rowid'] else '' }} value="{{ category['rowid'] }}">{{ category['CategoryName'] }}</option>
{% endfor %}
</select>
</div>
</div>
<div class="form-group">
<label for="username" class="control-label col-md-2">姓名</label>
<div class="col-md-2">
<!--渲染姓名-->
<input type="text" class="form-control" value="{{ item['UserName'] }}" id="username" name="username">
</div>
</div>
<div class="form-group">
<label for="email" class="control-label col-md-2">邮箱</label>
<div class="col-md-6">
<!--渲染邮箱-->
<input type="text" class="form-control" value="{{ item['Email'] }}" id="email" name="email">
</div>
</div>
<div class="form-group">
<label for="image" class="control-label col-md-2">图片</label>
<div class="col-md-6">
<input type="file" id="image" name="image">
</div>
</div>

<div class="form-group">
<!--渲染内容-->
<label for="body" class="control-label col-md-2">内容</label>
<div class="col-md-6">
<textarea name="body" id="body" cols="30" rows="10" class="form-control">{{ item['Body'] }}</textarea>
</div>
</div>
<!--增加回复内容、发布时间和处理状态-->
<div class="form-group">
<label for="reply" class="control-label col-md-2">回复</label>
<div class="col-md-6">
<!--渲染回复-->
<textarea name="reply" id="reply" cols="30" rows="10" class="form-control">{{ item['Reply'] if item['Reply'] }}</textarea>
</div>
</div>
<div class="form-group">
<label for="releasetime" class="control-label col-md-2">发布时间</label>
<div class="col-md-6">
<input type="text" value="{{ item['ReleaseTime'] }}" name="releasetime" id="releasetime" class="form-control">
</div>
</div>
<div class="form-group">
<label for="state" class="control-label col-md-2">处理状态</label>
<div class="col-md-6">
<!--渲染处理状态-->
<input type="checkbox" name="state" id="state" {{ 'checked=checked' if item['State'] == 1 else ''}} >
</div>
</div>

<div class="col-md-offset-2">
<input type="submit" class="btn btn-primary" value="提交">
<input type="reset" class="btn btn-default" value="重置">
<!--如果不想修改了,点击回到列表页-->
<a href="{{ url_for('list') }} " class="btn btn-default">点击回到列表页</a>
</div>
</form>
</div>
<div class="panel-footer">

</div>
</div>
</div>

{% endblock %}

页面渲染

后台打印的数据

编辑页面

程序入口:main.py

# coding:utf-8
import sqlite3
from datetime import datetime
from flask import Flask, request, render_template, redirect, url_for, g

app = Flask(__name__)

DATABASE = r'.dbfeedbach.db'

'=======================封装sql助手函数============================='


def make_dicts(cursor, row):
""" 将游标获取的Tuple根据数据库列表转换为dict """
return dict((cursor.description[idx][0], value) for idx, value in enumerate(row))


def get_db():
""" 获取(简历数据库链接)
g: flask内置的变量:g = LocalProxy(partial(_lookup_app_object, "g"))
"""
db = getattr(g, '_database', None)
if not db:
db = g._database = sqlite3.connect(DATABASE)
db.row_factory = make_dicts
return db


def execute_sql(sql, params=()):
""" 执行sql语句不返回数据结果 insert、update、delete """
c = get_db().cursor()
c.execute(sql, params)
c.connection.commit()


def query_sql(sql, params=(), one=False):
""" 查询数据 one=False的时候返回多条"""
c = get_db().cursor()
result = c.execute(sql, params).fetchall()
c.close()
return (result[0] if result else None) if one else result


@app.teardown_appcontext # 在当前app上下文销毁时执行
def close_connection(exeption):
""" 关闭数据库 """
db = getattr(g, '_database', None)
if db is not None:
db.close()


'========================================================================'


@app.route("/")
def index():
return render_template('base.html')


# 模板继承
@app.route("/feedback/")
def feedback():
return render_template('post.html')


@app.route("/post_feedback/", methods=["POST"])
def post_feedback():
""" 提交视图 """
if request.method == 'POST': # 如果是post请求就获取表单值
subject = request.form.get('subject', None)
categoryid = request.form.get('category', 1)
username = request.form.get('username')
email = request.form.get('email')
body = request.form.get('body')
release_time = str(datetime.now())
state = 0
print(subject, categoryid, username, email, body, state, release_time)
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
# 防止sql注入,用?代替值
sql = "insert into feedback (Subjeck, CategoryID, UserName, Email, Body, State, ReleaseTime) values (?,?,?,?,?,?,?)"
c.execute(sql, (subject, categoryid, username, email, body, state, release_time))
conn.commit()
conn.close()
# 为防止因卡顿引起重复提交,提交过后跳转到填写页面
return redirect(url_for('feedback'))


@app.route("/list/")
def list():
""" 展示所有问题 """
sql = "select ROWID,* from feedback order by ROWID DESC"
feedbacks = query_sql(sql)
print(feedbacks)
return render_template('feedback-list.html', items=feedbacks)


@app.route('/del/<id>/')
def delete_feedback(id=0):
""" 删除问题 ,前端传id"""
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
sql = "delete from feedback where ROWID = ?"
c.execute(sql, (id,))
conn.commit()
conn.close()
return redirect(url_for('list'))


# 编辑功能
@app.route("/edit/<id>/")
def edit(id=None):
""" 根据前端传过来的id返回编辑的html """
# 获取绑定的下拉列表
sql = "select ROWID,CategoryName from category"
categories = query_sql(sql)
# 获取当前id的信息,并绑定至form表单,以备修改
sql = "select rowid,* from feedback where rowid = ?"
curren_feedback = query_sql(sql, (id,), True)
# return str(curren_feedback) # 查看查出来的数据顺序,方便html渲染排序
return render_template('edit.html', categories=categories, item=curren_feedback)


@app.route("/save_edit/", methods=['POST'])
def save_edit():
""" 保存编辑 """
if request.method == 'POST':
id = request.form.get('rowid', None)
reply = request.form.get('reply')
state = 1 if request.form.get('state', 0) == 'on' else 0
sql = 'update feedback set Reply=?, State=? where rowid=?'
conn = sqlite3.connect(DATABASE)
c = conn.cursor()
c.execute(sql, (reply, state, id))
conn.commit()
conn.close()
return redirect(url_for('list'))


if __name__ == '__main__':
app.run(
debug=True
)
原文地址:https://www.cnblogs.com/zhongyehai/p/11461595.html