python flask框架搭建以及大佬代码参考

1.基本搭建

python3.5 flask框架使用mysql配置(flask学习)
Flask使用MySQL的方法试了好几种,终于找到一个没有问题的,以下是代码:
#coding=utf-8

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
# Create the Flask application and point Flask-SQLAlchemy at the MySQL database.
app = Flask(__name__)
app.config.update(
    SQLALCHEMY_DATABASE_URI='mysql+pymysql://root:root@127.0.0.1/blackwhite',
    SQLALCHEMY_COMMIT_ON_TEARDOWN=True,
    SQLALCHEMY_TRACK_MODIFICATIONS=True,
)
# The extension reads the configuration above when it is bound to the app.
db = SQLAlchemy(app)

class User(db.Model):
    """ORM model mapped to the ``bw_user`` table."""

    __tablename__ = 'bw_user'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(55))
    password = db.Column(db.String(255))
    createTime = db.Column(db.Integer)
    email = db.Column(db.String(55))  # , unique=True

    def __init__(self, username, password):
        """Build an instance carrying only the login credentials."""
        self.username = username
        self.password = password

    def __repr__(self):
        """Return a short debugging representation based on the username."""
        return '<User %r>' % self.username

    def checkUser(self):
        """Return True when a row with this username and password exists.

        The supplied password is compared directly against the stored
        ``password`` column.
        """
        match = User.query.filter_by(
            username=self.username,
            password=self.password,
        ).first()
        return match is not None

    def save(self):
        """Add this instance to the session and commit immediately."""
        db.session.add(self)
        db.session.commit()

引入model:

from models import User
视图方法:
@app.route('/login', methods=['GET', 'POST'])
def login():
    """Render the login form and, on a valid submit, check the credentials.

    A matching user is stored in the session under ``'user'`` and redirected
    to the ``index`` endpoint; otherwise an error is flashed and the form is
    rendered again.
    """
    form = loginForm()
    if form.validate_on_submit():
        username = form.username.data
        candidate = User(username, form.password.data)
        if candidate.checkUser():
            session['user'] = username
            return redirect(url_for('index'))
        flash('用户名或者密码错误')
    return render_template('login.html', form=form, title='登录')
这就是一个通过MySQL数据库实现用户登录的简单实例

2.代码参考文件 api_moniter.py

# -*- coding: utf-8 -*-

import gevent
from gevent import monkey; monkey.patch_all()
from gevent.pywsgi import WSGIServer
from gevent.pool import Pool
from flask import Flask, make_response, request, jsonify
import logging, logging.handlers, sys, json, platform, hashlib, requests, datetime, base64, hmac, time, os
from pymongo import MongoClient
from bson import ObjectId
from functools import wraps
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email.utils import formatdate
from email import encoders
import smtplib

# Choose the MongoDB endpoint and the log sink according to the host platform.
if platform.system() == 'Windows':
    host_mongo, port_mongo = '127.0.0.1', 27017
    handler = logging.StreamHandler()
else:
    host_mongo = '192.168.1.25'
    port_mongo = 10000
    handler = logging.handlers.RotatingFileHandler(
        "/home/guozx/log/api_moniter.log",
        maxBytes=1024 * 1024 * 5,
        backupCount=5,
    )


app = Flask(__name__)
app.debug = False
# Shorthand so the views below can be decorated with a bare @route(...).
route = app.route

logging.basicConfig(
    level=logging.DEBUG,
    format=u"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s",
)
handler.setFormatter(
    logging.Formatter(u"%(asctime)s - %(name)s - %(levelname)s - %(funcName)s - %(message)s")
)
# Dedicated logger: records go only to `handler`, not up to the root logger.
mylog = logging.getLogger('x')
mylog.propagate = 0
mylog.addHandler(handler)

conn = MongoClient(host=host_mongo, port=port_mongo)
db = conn['api_moniter']

# Collection handles used throughout the module.
db_p = db['project']
db_pc = db['project_config']
db_api = db['api']
db_arl = db['api_run_log']

# Uniqueness constraints: project name, one config per project, url+param per API.
db_p.create_index([('name', 1)], unique=True)
db_pc.create_index([('project_id', 1)], unique=True)
db_api.create_index([('url', 1), ('param', 1)], unique=True)

def allow_jsonp(fun):
    """Decorate a view so its response allows cross-origin use and JSONP.

    The wrapper logs the request, calls the view, wraps the body as
    ``callback(body)`` when the request carries a ``callback`` value, and adds
    the ``Access-Control-*`` and ``Content-Type`` headers to the response.

    Args:
        fun: the view function to wrap; it may return a string or a response
            object.

    Returns:
        The wrapped view function.
    """
    @wraps(fun)
    def wrapper_fun(*args, **kwargs):
        mylog.info(u"%s%s" % (request, request.values.items() if request.method != 'GET' else ''))
        res = fun(*args, **kwargs)
        callback = request.values.get('callback', None)
        if callback:
            # Views that use jsonify() return a response object; formatting it
            # with %s would embed its repr, so extract the body text first.
            body = res.get_data(as_text=True) if hasattr(res, 'get_data') else res
            # NOTE(review): the callback name is echoed back unvalidated.
            res = "%s(%s)" % (callback, body)
        rst = make_response(res)
        rst.headers['Access-Control-Allow-Origin'] = '*'
        rst.headers['Access-Control-Allow-Headers'] = "accept, origin, content-type, content-length"
        rst.headers['Access-Control-Allow-Methods'] = 'GET,PUT,POST,DELETE,OPTIONS'
        rst.headers['Content-Type'] = 'application/json; charset=UTF-8'
        return rst
    return wrapper_fun

def send_mail(server, consignee, title, text, files=None, timeout=180, subtype='plain'):
    """Send an e-mail, optionally with file attachments, through SMTP.

    Args:
        server: dict with ``'host'``, ``'user'`` and ``'passwd'``; an optional
            ``'port'`` falls back to 0 when missing or falsy.
        consignee: list of recipient addresses.
        title: subject line.
        text: message body, encoded as UTF-8.
        files: optional iterable of paths to attach.
        timeout: socket timeout in seconds passed to ``smtplib.SMTP``.
        subtype: MIME text subtype of the body (``'plain'`` or ``'html'``).

    Raises:
        Whatever ``open`` or ``smtplib`` raise; nothing is swallowed here.
    """
    msg = MIMEMultipart()
    msg['From'] = server['user']
    msg['Subject'] = title
    msg['To'] = ",".join(consignee)
    msg['Date'] = formatdate(localtime=True)
    msg.attach(MIMEText(text, _subtype=subtype, _charset='utf-8'))

    # File names are decoded with the platform's assumed file-system encoding.
    coding = 'utf-8' if "Linux" == platform.system() else 'gbk'
    for path in files or ():
        part = MIMEBase('application', 'octet-stream')
        # Close the attachment as soon as it is read instead of leaking the handle.
        with open(path, 'rb') as attachment:
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        part.add_header('Content-Disposition', 'attachment',filename="=?utf-8?b?%s?="%base64.b64encode(os.path.basename(path).decode(coding).encode('utf-8')))
        msg.attach(part)

    # NOTE(review): a plain smtplib.SMTP connection is used; no STARTTLS/SSL call is made here.
    smtp = smtplib.SMTP(server['host'],server['port'] if server.get('port') else 0, timeout=timeout)
    try:
        smtp.login(server['user'], server['passwd'])
        smtp.sendmail(server['user'],consignee, msg.as_string())
    finally:
        # Release the socket even when login or delivery fails.
        smtp.close()

# NOTE(review): leftover debugging code, disabled. It sent a test e-mail every
# time the module was imported and was followed by a bare `xxx` expression,
# which raises NameError and stops everything below from loading. Run the call
# by hand when the SMTP settings need checking, and move the hard-coded
# credentials out of the source file.
# send_mail({'host': "smtp.163.com", 'user': "url_giiso@163.com", 'passwd': "giiso123", "port":465}, ['guozx@giiso.com'], "API监控异常报告", 'text', subtype='html')
def emaillog(text, consignee=["76040100@qq.com"]):
    """E-mail ``text`` as an HTML report; failures are logged, not raised.

    Args:
        text: report body; it is placed inside a ``<textarea>`` whose row
            count is the number of lines in ``text`` plus five.
        consignee: list of recipient addresses (the default list is only read
            here, never modified).
    """
    try:
        text = u"""
        <textarea style="font-size: 12px !important; resize: both;  100%; height:100%;" spellcheck="false" rows="{0}">
        {1}
        </textarea>
        """.format(text.count('\n') + 5, text)
        send_mail({'host': "smtp.qq.com", 'user': "guozx@giiso.com", 'passwd': "RiUt4Hed6MNegkj3"}, consignee, "API监控异常报告", text, subtype='html')
    except Exception as e:
        # Best effort: a failed notification must not break the caller.
        mylog.exception(e)

def timed_task_thread(interval=600):
    """Run every monitored API in an endless loop and mail the failures.

    Each round loads all documents from ``db_api``, runs those that either
    have no ``timed_task`` setting or whose ``timed_task['auto_run']`` is
    truthy (default 1), groups the failing results by recipient list and
    sends one report per group through ``emaillog``.

    Args:
        interval: seconds to sleep between two rounds.
    """
    while True:
        try:
            data = list(db_api.find())
        except Exception as e:
            mylog.exception(e)
            # The placeholder was missing, which made this line raise TypeError.
            emaillog(u"get mongodb data error: %s" % e)
        else:
            idDict = {}
            for d in data:
                api_id = str(d['_id'])
                x = {'name': d['name'], 'url': d['url'], 'param': d['param'], 'id': api_id}
                timed_task = d.get('timed_task')
                if not timed_task or timed_task.get('auto_run', 1):
                    idDict[api_id] = x
            mylog.info("start run task len: %s" % len(idDict))
            errorDict = {}
            for api_id, v in idDict.items():
                rData, consignee = _api_run(api_id, run_mode=1)
                # Normalise the recipient list so equal sets share one report.
                consignee = ','.join(sorted(consignee.split(','))) if consignee else consignee
                if rData['code'] != 0:
                    rData.update(v)
                    group = errorDict.setdefault(consignee, {'error': [], 'name': []})
                    group['error'].append(rData)
                    group['name'].append(v['name'])
            for k, v in errorDict.items():
                # Failures without any recipient are only logged below.
                if k:
                    emaillog(json.dumps(v, indent=4, ensure_ascii=False, sort_keys=True), k.split(','))
            mylog.info(u"end run task ids: %s" % json.dumps({k: v['name'] for k, v in errorDict.items()}, ensure_ascii=False))
        time.sleep(interval)
# Start the periodic API check in a background greenlet as soon as the module loads.
gevent.spawn(timed_task_thread)


def _data_page(tb, page, limit, find, sort):
    """Query a collection with optional filter, paging and sorting.

    Args:
        tb: collection object to query.
        page: 1-based page number, used only when ``limit`` is truthy.
        limit: page size; a falsy value disables paging.
        find: JSON text of the filter, or a falsy value for no filter.
        sort: sort specification text understood by ``_mongo_sort``.

    Returns:
        dict with ``code`` 0 plus ``data``, ``page``, ``limit`` and ``total``
        on success, or ``code`` 1 plus ``error`` when anything raised.
    """
    result = {'code': 1}
    try:
        query = _mongo_find(find)
        order = _mongo_sort(sort)
        total = tb.find(query).count()
        cursor = tb.find(query)
        if limit:
            cursor = cursor.skip((page - 1) * limit).limit(limit)
        if order:
            cursor = cursor.sort(order)
        result.update({
            'data': _id_to_str(cursor),
            'page': page,
            'limit': limit,
            'total': total,
            'code': 0,
        })
    except Exception as e:
        mylog.exception(e)
        result['error'] = '%s' % e
    return result
def _id_to_str(d):
    def _x(x):
        x['_id'] = str(x['_id'])
        return x
    return map(_x, d)
def _mongo_find(find):
    findDict = {}
    if find:
        findDict = json.loads(find)
    return findDict
def _mongo_sort(sort):
    sl = []
    if sort:
        for s in sort.split(","):
            x = s.split(':')
            sl.append((x[0], int(x[1]) if len(x) == 2 and x[1] == '-1' else 1))
    return sl

def api_run_log_add(logDict):
    """Insert one run-log record, filling missing fields with defaults.

    ``run_mode`` is 1 for a run started by the system and 2 for a run
    requested through the HTTP interface.
    """
    created = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    record = {
        'api_id': None, 'api_name': None, 'run_mode': 1, 'isread': 0, 'code': 1, 'request_time': None,
        'data': None, 'error': None, 'status_code': 0, 'create_time': created,
    }
    # Values supplied by the caller win over the defaults above.
    record.update(logDict)
    db_arl.insert(record)

def _api_run(id, run_mode=2):
    """Call one monitored API with HMAC authentication and log the outcome.

    The API document is loaded from ``db_api`` and its project's credentials
    from ``db_pc``. The request carries a ``date`` header and an
    ``Authorization`` header whose signature is the base64 HMAC-SHA1 of
    ``"date: <date>"`` under the project's secret key. The result is written
    to the run log in a separate greenlet.

    Args:
        id: string form of the API document's ``_id``.
        run_mode: 1 when started by the system, 2 when requested over HTTP.

    Returns:
        ``(rDict, consignee)``: ``rDict`` has ``code`` 0 on an HTTP 200 reply
        and 1 otherwise, plus ``data``/``request_time``/``status_code`` or
        ``error`` as applicable; ``consignee`` is the project's recipient
        setting, or None when no project config was found.
    """
    rDict = {'code': 1}
    consignee = None
    # Created before the lookup so the log write at the end also works when
    # the lookup itself raises.
    logDict = {}
    try:
        api = db_api.find_one(ObjectId(id))
        pmc = db_pc.find_one({'project_id': api['project_id']}) if api else None
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    else:
        if api:
            logDict['api_id'] = id
            logDict['api_name'] = api['name']
            logDict['run_mode'] = run_mode
            if pmc:
                secretId = pmc['secret_id']
                secretKey = pmc['secret_key']
                consignee = pmc['consignee']
                # The header is labelled GMT, so take UTC directly instead of
                # assuming the host clock runs at UTC+8.
                fDate = datetime.datetime.utcnow().strftime('%a, %d %b %Y %H:%M:%S GMT')

                secretData = "date: " + fDate
                try:
                    sign = base64.b64encode(hmac.new(secretKey.encode('utf-8'), secretData, hashlib.sha1).digest())
                    authorization = 'hmac username="%s", algorithm="hmac-sha1", headers="date", signature="%s"' % (secretId, sign)
                    headers = {
                        'contentType': 'UTF-8',
                        'Accept-Charset': 'UTF-8',
                        'date': fDate,
                        'Authorization': authorization,
                    }
                    # An API may be stored without any parameter string.
                    param = api.get('param')
                    params = param.encode('utf-8') if param else None
                    req = requests.request(api['method'], api['url'], timeout=100, params=params, headers=headers)
                    data = req.content.strip()
                    # Bodies that look like a JSON object are decoded for the log.
                    if data.startswith("{") and data.endswith("}"):
                        data = json.loads(data)
                except Exception as e:
                    mylog.exception(e)
                    rDict['error'] = u'%s' % e
                else:
                    if req.status_code == 200:
                        rDict['code'] = 0
                    else:
                        rDict['code'] = 1
                        rDict['status_code'] = req.status_code
                    rDict['data'] = data
                    # total_seconds() keeps whole seconds; .microseconds alone drops them.
                    rDict['request_time'] = req.elapsed.total_seconds()
            else:
                rDict['error'] = 'not secret'
        else:
            rDict['error'] = 'not id'
    logDict.update(rDict)
    gevent.spawn(api_run_log_add, logDict)  # record the call in a separate greenlet
    return rDict, consignee


@route('/')
@allow_jsonp
def index():
    """Root endpoint; answers with a fixed greeting."""
    greeting = 'hello word'
    return greeting

@route('/project/add', methods=['POST', 'GET'])
@allow_jsonp
def project_add():
    """Insert a project built from the request values.

    ``name`` is required; ``parent_id`` and ``description`` are optional.
    Responds with ``code`` 0 and the stored document, or ``code`` 1 and an
    ``error`` text when the insert fails.
    """
    values = request.values
    doc = {
        'parent_id': values.get('parent_id'),
        'name': values['name'],
        'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'description': values.get('description'),
    }
    result = {'code': 1}
    try:
        doc['_id'] = str(db_p.insert(doc))
    except Exception as e:
        mylog.exception(e)
        result['error'] = u'%s' % e
    else:
        result['code'] = 0
        result['data'] = doc
    return jsonify(result)

@route('/project/update', methods=['POST', 'GET'])
@allow_jsonp
def project_update():
    """Update the supplied fields of the project identified by ``id``.

    Only fields present in the request are written; ``update_time`` is always
    refreshed. Responds with ``code`` 0 and the written fields, or ``code`` 1
    and an ``error`` text.
    """
    rDict = {'code': 1}
    id = request.values['id']
    d = {
        'parent_id': request.values.get('parent_id'),
        'name': request.values.get('name'),
        'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'description': request.values.get('description'),
    }
    # Drop fields the caller did not send. The previous code popped by value,
    # which raised KeyError(None) as soon as any field was missing.
    d = {k: v for k, v in d.items() if v is not None}
    try:
        rd = db_p.update({'_id': ObjectId(id)}, {"$set": d})
        if rd['n'] > 0:
            d['_id'] = id
            rDict['code'] = 0
            rDict['data'] = d
        else:
            rDict['error'] = u'id不存在'
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    return jsonify(rDict)

@route('/project/delete', methods=['POST', 'GET'])
@allow_jsonp
def project_delete():
    """Delete a project unless child projects, a config or APIs still refer to it."""
    result = {'code': 1}
    project_id = request.values['id']
    try:
        # Count everything that still points at this project.
        linked = (
            db_p.find({'parent_id': project_id}).count()
            + db_pc.find({'project_id': project_id}).count()
            + db_api.find({'project_id': project_id}).count()
        )
        if linked > 0:
            result['error'] = u'关联表未删除'
        elif db_p.remove({'_id': ObjectId(project_id)})['n'] > 0:
            result['code'] = 0
        else:
            result['error'] = u'id不存在'
    except Exception as e:
        mylog.exception(e)
        result['error'] = u'%s' % e
    return jsonify(result)

@route('/project/find', methods=['POST', 'GET'])
@allow_jsonp
def project_find():
    """Return matching projects as a tree of parents with nested ``items``.

    Request values:
        find: optional JSON filter applied to the project collection.
        isconfig: when truthy, attach each project's config as ``config``.
        isapi: when truthy, attach each project's APIs as ``apis``.

    Responds with a JSON string: ``code`` 0 and ``data`` on success, ``code``
    1 and ``error`` when an exception occurred.
    """
    # Start from the failure code so an exception is not reported as success.
    rDict = {'code': 1}
    isconfig = request.values.get('isconfig')
    isapi = request.values.get('isapi')
    find = request.values.get('find')
    parentDict = {}   # id -> project without a parent_id
    pmDict = {}       # id -> project that has a parent_id
    pmList = []
    try:
        findDict = _mongo_find(find) if find else {}
        for d in db_p.find(findDict):
            id = str(d['_id'])
            if not d.get('parent_id'):
                parentDict[id] = d
            else:
                pmDict[id] = d
        # NOTE(review): when no top-level project matches, matching children
        # are not returned at all; confirm that this is intended.
        if parentDict:
            ids = list(parentDict) + list(pmDict)
            if isconfig:
                for d in db_pc.find({'project_id': {'$in': ids}}):
                    pm = parentDict.get(d['project_id']) or pmDict[d['project_id']]
                    pm['config'] = d
            if isapi:
                for d in db_api.find({'project_id': {'$in': ids}}):
                    pm = parentDict.get(d['project_id']) or pmDict[d['project_id']]
                    apis = pm.get('apis')
                    if apis:
                        apis.append(d)
                    else:
                        pm['apis'] = [d]
            # Hang each child under its parent; children whose parent is not
            # part of the result are listed at the top level.
            for v in pmDict.values():
                p = parentDict.get(v['parent_id'])
                if p:
                    items = p.get('items')
                    if items:
                        items.append(v)
                    else:
                        p['items'] = [v]
                else:
                    pmList.append(v)
            pmList += parentDict.values()
        rDict['code'] = 0
        rDict['data'] = pmList
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    return json.dumps(rDict, default=lambda x: str(x) if type(x) == ObjectId else x)

@route('/project_config/add', methods=['POST', 'GET'])
@allow_jsonp
def project_config_add():
    """Insert the config (credentials and recipients) for a project.

    ``project_id`` is required; the remaining fields are optional. Responds
    with ``code`` 0 and the stored document, or ``code`` 1 and ``error``.
    """
    values = request.values
    doc = {
        'project_id': values['project_id'],
        'secret_id': values.get('secret_id'),
        'secret_key': values.get('secret_key'),
        'consignee': values.get('consignee'),
        'update_time':  datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'description': values.get('description'),
    }
    result = {'code': 1}
    try:
        doc['_id'] = str(db_pc.insert(doc))
    except Exception as e:
        mylog.exception(e)
        result['error'] = u'%s' % e
    else:
        result['code'] = 0
        result['data'] = doc
    return jsonify(result)

@route('/project_config/update', methods=['POST', 'GET'])
@allow_jsonp
def project_config_update():
    """Update the supplied fields of the project config identified by ``id``.

    Only fields present in the request are written; ``update_time`` is always
    refreshed. Responds with ``code`` 0 and the written fields, or ``code`` 1
    and an ``error`` text.
    """
    rDict = {'code': 1}
    id = request.values['id']
    d = {
        'project_id': request.values.get('project_id'),
        'secret_id': request.values.get('secret_id'),
        'secret_key': request.values.get('secret_key'),
        'consignee': request.values.get('consignee'),
        'update_time': datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        'description': request.values.get('description'),
    }
    # Drop fields the caller did not send. The previous code popped by value,
    # which raised KeyError(None) as soon as any field was missing.
    d = {k: v for k, v in d.items() if v is not None}
    try:
        rd = db_pc.update({'_id': ObjectId(id)}, {"$set": d})
        if rd['n'] > 0:
            d['_id'] = id
            rDict['code'] = 0
            rDict['data'] = d
        else:
            rDict['error'] = u'id不存在'
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    return jsonify(rDict)

@route('/project_config/delete', methods=['POST', 'GET'])
@allow_jsonp
def project_config_delete():
    """Delete the project config identified by ``id``.

    Responds with ``code`` 0 when a document was removed, otherwise ``code``
    1 and an ``error`` text (unknown id, malformed id or database failure).
    """
    rDict = {'code': 1}
    id = request.values['id']
    try:
        # ObjectId() rejects malformed ids; report that as a JSON error like
        # the other handlers do instead of letting the request fail.
        rd = db_pc.remove({'_id': ObjectId(id)})
        if rd['n'] > 0:
            rDict['code'] = 0
        else:
            rDict['error'] = u'id不存在'
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    return jsonify(rDict)

@route('/project_config/find', methods=['POST', 'GET'])
@allow_jsonp
def get_project_menu_config():
    """List project configs; paging is off unless ``limit`` is given."""
    values = request.values
    page = int(values.get('page', 1))
    limit = int(values.get('limit', 0))
    listing = _data_page(db_pc, page, limit, values.get('find'), values.get('sort'))
    return jsonify(listing)


@route('/api/add', methods=['POST', 'GET'])
@allow_jsonp
def api_add():
    """Register an API to monitor.

    ``project_id``, ``name`` and ``url`` are required. ``timed_task``,
    ``error_check`` and ``other`` are JSON text and are stored decoded when
    supplied. Responds with ``code`` 0 and the stored document, or ``code`` 1
    and an ``error`` text.
    """
    values = request.values
    timed_task = values.get('timed_task')
    error_check = values.get('error_check')
    other = values.get('other')
    doc = {
        'project_id': values['project_id'],
        'name': values['name'],
        'url': values['url'],
        'param': values.get('param'),
        'method': values.get('method', 'get').lower(),
        'timed_task': timed_task,
        'other': other,
    }
    result = {'code': 1}
    try:
        # Decode the JSON-valued fields that were actually sent.
        json_fields = (
            ('timed_task', timed_task),
            ('error_check', error_check),
            ('other', other),
        )
        for key, raw in json_fields:
            if raw:
                doc[key] = json.loads(raw)
        doc['_id'] = str(db_api.insert(doc))
    except Exception as e:
        mylog.exception(e)
        result['error'] = u'%s' % e
    else:
        result['code'] = 0
        result['data'] = doc
    return jsonify(result)

@route('/api/update', methods=['POST', 'GET'])
@allow_jsonp
def api_update():
    """Update the supplied fields of the API identified by ``id``.

    Only fields present in the request are written. ``timed_task``,
    ``error_check`` and ``other`` are JSON text and are stored decoded.
    Responds with ``code`` 0 and the written fields, or ``code`` 1 and an
    ``error`` text.
    """
    rDict = {'code': 1}
    id = request.values['id']
    timed_task = request.values.get('timed_task')
    error_check = request.values.get('error_check')
    other = request.values.get('other')
    d = {
        'project_id': request.values.get('project_id'),
        'name': request.values.get('name'),
        'url': request.values.get('url'),
        'param': request.values.get('param'),
        'method': request.values.get('method'),
        'timed_task': timed_task,
        'other': other,
    }
    # Drop fields the caller did not send. The previous code popped by value,
    # which raised KeyError(None) as soon as any field was missing.
    d = {k: v for k, v in d.items() if v is not None}
    try:
        if timed_task:
            d['timed_task'] = json.loads(timed_task)
        if error_check:
            d['error_check'] = json.loads(error_check)
        if other:
            d['other'] = json.loads(other)
        rd = db_api.update({'_id': ObjectId(id)}, {"$set": d})
        if rd['n'] > 0:
            d['_id'] = id
            rDict['code'] = 0
            rDict['data'] = d
        else:
            rDict['error'] = u'id不存在'
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    return jsonify(rDict)

@route('/api/delete', methods=['POST', 'GET'])
@allow_jsonp
def api_delete():
    """Delete the API identified by ``id``.

    Responds with ``code`` 0 when a document was removed, otherwise ``code``
    1 and an ``error`` text (unknown id, malformed id or database failure).
    """
    rDict = {'code': 1}
    id = request.values['id']
    try:
        # ObjectId() rejects malformed ids; report that as a JSON error like
        # the other handlers do instead of letting the request fail.
        rd = db_api.remove({'_id': ObjectId(id)})
        if rd['n'] > 0:
            rDict['code'] = 0
        else:
            rDict['error'] = u'id不存在'
    except Exception as e:
        mylog.exception(e)
        rDict['error'] = u'%s' % e
    return jsonify(rDict)

@route('/api/find', methods=['POST', 'GET'])
@allow_jsonp
def get_api():
    """List monitored APIs, 20 per page unless ``limit`` says otherwise."""
    values = request.values
    page = int(values.get('page', 1))
    limit = int(values.get('limit', 20))
    listing = _data_page(db_api, page, limit, values.get('find'), values.get('sort'))
    return jsonify(listing)

@route('/api/run', methods=['POST', 'GET'])
@allow_jsonp
def api_run():
    """Run the API identified by ``id`` once (run mode 2) and return the result."""
    outcome, _consignee = _api_run(request.values['id'], 2)
    return jsonify(outcome)

@route('/api/run_log_find', methods=['POST', 'GET'])
@allow_jsonp
def api_run_log_find():
    """List run-log records, 20 per page unless ``limit`` says otherwise.

    Request values: ``page``, ``limit``, ``find`` (JSON filter) and ``sort``.
    """
    page = int(request.values.get('page', 1))
    limit = int(request.values.get('limit', 20))
    find = request.values.get('find')
    sort = request.values.get('sort')
    # Query db_arl ('api_run_log'), the collection the log writer inserts
    # into; 'run_api_log' was a different, never-written collection.
    rDict = _data_page(db_arl, page, limit, find, sort)
    return jsonify(rDict)

@route('/api/run_log_update', methods=['POST', 'GET'])
@allow_jsonp
def api_run_log_update():
    """Mark the run-log records listed in ``ids`` (comma separated) as read.

    Responds with ``code`` 0 and the number of matched records in ``data``,
    or ``code`` 1 and an ``error`` text.
    """
    rDict = {'code': 1}
    ids = request.values['ids']
    try:
        # multi=True so every listed record is updated, not just the first match.
        rd = db_arl.update(
            {'_id': {'$in': [ObjectId(id) for id in ids.split(',')]}},
            {"$set": {'isread': 1}},
            multi=True,
        )
    except Exception as e:
        mylog.exception(e)
        # Tell the caller why the request failed, as the other handlers do.
        rDict['error'] = u'%s' % e
    else:
        if rd['n'] > 0:
            rDict['code'] = 0
            rDict['data'] = rd['n']
        else:
            rDict['error'] = u'id不存在'
    return jsonify(rDict)


if __name__ == "__main__":
    # Listen on all interfaces; the port is the single command-line argument
    # when exactly one is given, otherwise 8000.
    host = '0.0.0.0'
    port = int(sys.argv[1]) if len(sys.argv) == 2 else 8000
    mylog.info("%s:%s service start..." % (host, port))
    server = WSGIServer((host, port), app, spawn=Pool(1000))
    server.serve_forever()
原文地址:https://www.cnblogs.com/xmyfsj/p/15231956.html