flask factory

https://gist.github.com/zaccrites/c5bcf96ed90907d92042
import jinja2
from flask import Flask, render_template, request, redirect, url_for
from flask.ext.sqlalchemy import SQLAlchemy

from . import formatting
from .config import get_config


db = SQLAlchemy()


def create_app(config_name):
    app = Flask(__name__)
    config = get_config(config_name)
    app.config.from_object(config)

    config.init_app(app)
    db.init_app(app)

    register_pre_request_handlers(app)
    register_post_request_handlers(app)

    set_error_handlers(app)
    setup_logging(app)
    register_blueprints(app)

    app.jinja_env.undefined = jinja2.StrictUndefined
    setup_jinja_filters(app)

    return app


def register_blueprints(app):
    from . import general
    from . import auth

    app.register_blueprint(general.mod)
    app.register_blueprint(auth.mod)


def set_error_handlers(app):
    @app.errorhandler(404)
    def not_found(error):
        return render_template('errors/not_found.html'), 404

    @app.errorhandler(403)
    def forbidden(error):
        return render_template('errors/forbidden.html'), 403


def register_pre_request_handlers(app):
    pass


def register_post_request_handlers(app):
    pass


def setup_jinja_filters(app):
    app.jinja_env.filters['format_datetime'] = formatting.format_datetime


def setup_logging(app):
    if not app.debug:
        import logging
        from logging.handlers import RotatingFileHandler
        handler = RotatingFileHandler('jmickey-blog.log', maxBytes=10000, backupCount=1)
        handler.setLevel(logging.INFO)
        app.logger.addHandler(handler)
import pytest
from jmickey_blog.app import create_app, db

# import relevant models here

@pytest.fixture
def test_app():
    app = create_app('test')
    with app.app_context():
        db.drop_all()
        db.create_all()
    return app
    

def test_some_thing():
    pass


def test_some_other_thing():
    pass
#!/usr/bin/env python3

import sys
from jmickey_blog.app import create_app

if len(sys.argv) > 1:
    config_name = sys.argv[1]
else:
    config_name = 'development'

app = create_app(config_name)


def main():
    app.run()

if __name__ == '__main__':
   main()
https://gist.github.com/Leo-G/99dd3e1147498daade61
#!/usr/bin/env python
from flask import Flask


#http://flask.pocoo.org/docs/0.10/patterns/appfactories/
def create_app(config_filename):
    app = Flask(__name__)
    app.config.from_pyfile(config_filename)

    from app.users.models import db
    db.init_app(app)

    #Blueprints
    from app.users.views import users
    app.register_blueprint(users)

    return app


app = create_app('config.py')
app.secret_key = 'some_secret'
app.debug = True
app.run
https://gist.github.com/alexanderjulo/913beb668a4f25471f2e
from flask import Flask
from celery import Celery


def create_app():
    app = Flask(__name__)

    TaskBase = celery.Task

    class Task(TaskBase):
        """
            The usual celery base `Task` does not offer any integration into
            flask, which is why we need this modified class. As the modified
            class is dependent on the app, it can not just be subclassed,
            but has to be dynamically created which is being taken care of
            here.
        """
        abstract = True

        def __call__(self, *args, **kwargs):
            # FIXME: actually we only need app context, but flask-babel
            # is broken and needs a request context
            with app.test_request_context("/"):
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = Task
    celery.conf.update(app.config)
    if app.config.get('CELERY_BROKER_URL'):
        celery.conf['BROKER_URL'] = app.config['CELERY_BROKER_URL']
https://gist.github.com/nejdetckenobi/6c6446b7337da9d71f4ea0580f1343c3
#!/usr/bin/env python

from __future__ import print_function

import os
import tempfile
import time

import flask
import psutil
import werkzeug

from saveserver import current_milli_time, intWithCommas, measure_spent_time

app = flask.Flask(__name__)

@app.route('/', defaults={'path': ''}, methods = ['POST'])
@app.route('/<path:path>', methods = ['POST'])
def hello(path):
    app.logger.info('new request')
    def custom_stream_factory(total_content_length, filename, content_type, content_length=None):
        tmpfile = tempfile.NamedTemporaryFile('wb+', prefix='flaskapp')
        app.logger.info("start receiving file ... filename => " + str(tmpfile.name))
        return tmpfile
    ms = measure_spent_time()
    
    stream,form,files = werkzeug.formparser.parse_form_data(flask.request.environ, stream_factory=custom_stream_factory)
    total_size = 0
    
    for fil in files.values():
        app.logger.info(" ".join(["saved form name", fil.name, "submitted as", fil.filename, "to temporary file", fil.stream.name]))
        total_size += os.path.getsize(fil.stream.name)
    mb_per_s = "%.1f" % ((total_size / (1024.0*1024.0)) / ((1.0+ms(raw=True))/1000.0))
    app.logger.info(" ".join([str(x) for x in ["handling POST request, spent", ms(), "ms.", mb_per_s, "MB/s.", "Number of files:", len(files.values())]]))
    process = psutil.Process(os.getpid())
    app.logger.info("memory usage: %.1f MiB" % (process.memory_info().rss / (1024.0*1024.0)))

    return "Hello World!"

if __name__ == "__main__":
    app.run(port=8090)
https://gist.github.com/Kevinstronger/9088662
from flask import Flask, jsonify, g, request
from sqlite3 import dbapi2 as sqlite3
DATABASE = './db/test.db'
app = Flask(__name__)
 
def get_db():
	db = getattr(g, '_database', None)
	if db is None:
		db = g._database = sqlite3.connect(DATABASE)
		db.row_factory = sqlite3.Row
	return db
 
@app.teardown_appcontext
def close_connection(exception):
	db = getattr(g, '_database', None)
	if db is not None: db.close()
 
def query_db(query, args=(), one=False):
	cur = get_db().execute(query, args)
	rv = cur.fetchall()
	cur.close()
	return (rv[0] if rv else None) if one else rv
 
def init_db():
	with app.app_context():
		db = get_db()
		with app.open_resource('schema.sql', mode='r') as f:
			db.cursor().executescript(f.read())
		db.commit()
 
def add_student(name='test', age=10, sex='male'):
	sql = "INSERT INTO students (name, sex, age) VALUES('%s', '%s', %d)" %(name, sex, int(age))
	print sql
	db = get_db()
	db.execute(sql)
	res = db.commit()
	return res
 
def find_student(name=''):
	sql = "select * from students where name = '%s' limit 1" %(name)
	print sql
	db = get_db()
	rv = db.execute(sql)
	res = rv.fetchall()
	rv.close()
	return res[0]
 
 
@app.route('/')
def users():
	return jsonify(hello='world')
 
@app.route('/add',methods=['POST'])
def add_user():
	print add_student(name=request.form['name'], age=request.form['age'], sex=request.form['sex'])
	return ''
 
@app.route('/find_user')
def find_user_by_name():
	name = request.args.get('name', '')
	student = find_student(name)
	return jsonify(name=student['name'], age=student['age'], sex=student['sex'])
 
if __name__ == '__main__' : app.run(debug=True)
原文地址:https://www.cnblogs.com/liujitao79/p/8622251.html