Python 之便携式 MySQL 类和 Tornado MySQL 长连接

mymysql.py

class MyMysql2(object):
    """Small convenience wrapper around one PyMySQL connection.

    The constructor stores the connection settings and immediately tries to
    connect.  ``query`` runs a batch of SQL strings, commits, and then closes
    the connection; a later ``query`` call goes through ``_reConn`` first,
    which pings the connection and reconnects if the ping fails.
    """

    def __init__(self,
                 host='',
                 user='',
                 passwd='',
                 db='',
                 port=3306,
                 charset='utf8'):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db
        self.port = port
        self.charset = charset
        self.conn = None
        # State left behind by the most recent successful query() call.
        self.cur = None
        self.result = []
        self.count_nb = 0
        self.connet()

    def connet(self):
        """Open a new connection and store it in ``self.conn``.

        Returns:
            True on success.  On any failure the error is printed and False
            is returned; this method does not raise.
        """
        try:
            # Imported locally so the class is usable as a standalone
            # mymysql.py, which has no module-level imports of its own.
            import pymysql
            self.conn = pymysql.connect(host=self.host,
                                        user=self.user,
                                        passwd=self.passwd,
                                        port=int(self.port),
                                        database=self.db,
                                        charset=self.charset,
                                        cursorclass=pymysql.cursors.DictCursor)
            return True
        except Exception as e:
            print(e)
            return False

    def _reConn(self, num=2, stime=1):
        """Make sure ``self.conn`` is usable, reconnecting when needed.

        Each attempt pings the current connection; if the ping fails (or
        there is no connection object at all) ``connet`` is called instead.

        Args:
            num: number of retries after the first attempt, so at most
                ``num + 1`` attempts are made.
            stime: seconds to sleep between two failed attempts.

        Returns:
            ``(True, '数据库连接成功')`` when the connection is usable,
            ``(False, '数据库连接失败')`` once all attempts have failed.
        """
        import time  # local import, see connet()

        for attempt in range(num + 1):
            try:
                self.conn.ping()
                return (True, '数据库连接成功')
            except Exception:
                # Also reached when self.conn is None (initial connect failed).
                if self.connet():
                    return (True, '数据库连接成功')
            # No point in sleeping once the last attempt has failed.
            if attempt < num:
                time.sleep(stime)
        return (False, '数据库连接失败')

    def _rollback_quietly(self):
        """Best-effort rollback after a failed batch; prints, never raises."""
        try:
            self.conn.rollback()
        except Exception as e:
            print(e)

    def query(self, sql_list):
        """Execute every SQL string in ``sql_list`` and commit them together.

        Args:
            sql_list: iterable of SQL strings, executed in order on a single
                cursor.

        Returns:
            On success ``(True, result, count)`` where ``result`` is the
            ``fetchall()`` output and ``count`` the ``execute()`` return value
            of the LAST statement (``[]`` and ``0`` for an empty list).  The
            connection is closed after a successful batch.
            On connection failure, the ``(False, message)`` tuple produced by
            ``_reConn``.
            On an execution error ``(False, [message])``; the batch is rolled
            back so that a later commit cannot persist part of it.
        """
        status = self._reConn()
        if not status[0]:
            return status
        try:
            result, count = [], 0
            self.cur = self.conn.cursor()
            try:
                for sql_str in sql_list:
                    count = self.cur.execute(sql_str)
                    result = self.cur.fetchall()
                self.conn.commit()
            finally:
                # Release the cursor on the error path as well.
                self.cur.close()
            self.conn.close()
            self.result, self.count_nb = result, count
            return (True, result, count)
        except Exception as e:
            self._rollback_quietly()
            # Two-element args are expected to be (errno, message) from the
            # driver; other exceptions may carry fewer, so fall back to str().
            message = e.args[1] if len(e.args) > 1 else str(e)
            return (False, [message])

    def close(self):
        """Close the connection if there is one and it is still open."""
        conn = self.conn
        # query() already closes the connection after a successful batch, and
        # conn is None when the initial connect failed.
        if conn is not None and getattr(conn, 'open', True):
            conn.close()

调用:

# Connect with explicit settings, then run a single statement; ``ret`` holds
# the tuple returned by query().
myconn = MyMysql2(host='1.1.1.1',
                  user='user',
                  passwd='password',
                  db='database',
                  port=3308)
ret = myconn.query(['select * from user'])

  

Tornado 之 MySQL 长连接

import tornado.ioloop
import tornado.web
import requests
import json
import os
import time
from tornado import httpserver
import tornado.options
from tornado.options import options , define
from datetime import datetime
import pymysql
from pymysql.cursors import DictCursor as DicCur


def myget(url, timeout=10):
    """Send a GET request to ``url`` and return the decoded JSON body.

    Args:
        url: address to request.
        timeout: seconds passed to ``requests.get``.  Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        Whatever ``Response.json()`` produces for the response body.
    """
    ret = requests.get(url=url, timeout=timeout)
    return ret.json()

def mypost(url, data, timeout=10):
    """POST ``data`` serialised as JSON to ``url`` and return the JSON reply.

    Args:
        url: address to post to.
        data: JSON-serialisable object; it is encoded with ``json.dumps`` and
            sent as the request body.
        timeout: seconds passed to ``requests.post``.  Without a timeout the
            call can block indefinitely on an unresponsive server.

    Returns:
        Whatever ``Response.json()`` produces for the response body.
    """
    payload = json.dumps(data)
    ret = requests.post(url, data=payload, timeout=timeout)
    return ret.json()


#基本handler
class BaseHandler(tornado.web.RequestHandler):
    """Common request handler: client IP lookup, access log, MySQL helpers.

    The MySQL helpers use a single connection stored on the application
    object (``self.application.mysql_conn``) and the connection settings in
    ``self.application.pymysql_config``.
    """

    def get_user_ip(self):
        """Return the client address.

        Prefers the ``X-Real-Ip`` header, then ``X-Forwarded-For``, and falls
        back to ``self.request.remote_ip`` when neither header is present.
        """
        headers = self.request.headers
        for name in ('X-Real-Ip', 'X-Forwarded-For'):
            if name in headers:
                return headers[name]
        return self.request.remote_ip

    def on_finish(self):
        """Append one line describing the finished request to ``access.log``.

        The log file lives next to this source file.  The line contains the
        timestamp, method, HTTP version, host and URL; for GET requests the
        query string is stripped from the URL.
        """
        request = self.request
        method = request.method
        uri = request.uri
        time_ = datetime.now().strftime("%Y-%m-%d_%H:%M:%S")

        # First value of every request argument, with any password masked.
        # NOTE: this is collected into ret_msg but is not part of the log line.
        msg = {}
        for k, v in request.arguments.items():
            # errors='replace' keeps a non-UTF-8 argument from raising here.
            msg[k] = v[0].decode(errors='replace').strip()
        if 'password' in msg:
            msg['password'] = "***"

        ret_msg = {
            'date_time': time_,
            'request_url': str(uri).split('?')[0] if method == "GET" else uri,
            'method': method,
            'host': request.host,
            'remote_ip': self.get_user_ip(),
            'version': request.version,
            'data_msg': msg,
        }
        log_str = '%s  %s  %s  %s  %s' % (ret_msg['date_time'],
                                          ret_msg['method'],
                                          ret_msg['version'],
                                          ret_msg['host'],
                                          ret_msg['request_url'])
        applog_file = os.path.join(os.path.dirname(__file__), 'access.log')
        with open(applog_file, 'a', encoding='utf-8') as f:
            f.write("%s\n" % log_str)

    def get_mysql_conn(self):
        """Return the application-wide MySQL connection, creating it if needed.

        Makes up to five attempts.  An attempt either opens a new connection
        (when the application has none yet) or pings the existing one; after
        a failed attempt it waits two seconds before trying again.

        Returns:
            ``(True, connection)`` on success, or ``(False, error_text)`` with
            the text of the last error once every attempt has failed.
        """
        max_attempts = 5
        conn_count = 1
        for _ in range(max_attempts):
            try:
                if not self.application.mysql_conn:
                    pymysql_config = self.application.pymysql_config
                    self.application.mysql_conn = pymysql.connect(**pymysql_config)
                else:
                    self.application.mysql_conn.ping()
                return (True, self.application.mysql_conn)
            except Exception as e:
                conn_count += 1
                print("mysql conn retyr: %s" % conn_count)
                if conn_count > max_attempts:
                    return (False, str(e))
                # Only wait when another attempt will actually follow.
                time.sleep(2)

    def do_mysql_query(self, query_list):
        """Run several statements on the shared connection as one transaction.

        Args:
            query_list: statements to execute in order, in the form
                ``[[sql1, args_list1], [sql2, args_list2], ...]``.

        Returns:
            ``(True, result_last, rows_last)`` when every statement succeeded;
            ``result_last`` and ``rows_last`` describe the LAST statement only
            (``result_last`` is ``[]`` when it produced no rows).
            ``(False, error_text, 0)`` when the connection could not be
            obtained or any statement failed; in the latter case the
            transaction is rolled back, so none of the statements take effect.
        """
        connected, conn = self.get_mysql_conn()
        if not connected:
            return (False, conn, 0)
        try:
            # The with-block closes the cursor on both success and error.
            with conn.cursor() as cur:
                cur.execute("SET NAMES utf8mb4")
                cur.execute("SET AUTOCOMMIT = 0")
                for sql_argslist in query_list:
                    cur.execute(sql_argslist[0], sql_argslist[1])
                conn.commit()
                rows_last = cur.rowcount
                result_last = cur.fetchall()
                return (True, result_last or [], rows_last)
        except Exception as e:
            try:
                conn.rollback()
            except Exception as rollback_error:
                # Report the original failure, not the rollback failure.
                print("mysql rollback failed: %s" % rollback_error)
            return (False, str(e), 0)

#定义一个类继承application
class MyApplication(tornado.web.Application):
    """Application subclass that carries the MySQL settings and connection.

    ``pymysql_config`` holds the keyword arguments for ``pymysql.connect``;
    ``mysql_conn`` starts out as ``False`` until a handler stores a connection
    there.  Routes come from the module-level ``handlerSettings`` list, which
    must exist before the application is instantiated.
    """

    def __init__(self):
        self.pymysql_config = {
            'port': 3306,
            'host': '1.1.1.1',
            'user': 'user',
            'password': 'pass',
            'db': 'user',
            'charset': 'utf8mb4',
            'cursorclass': DicCur,
        }
        self.mysql_conn = False
        # Further settings (debug=..., xsrf_cookies=..., **settings) could be
        # passed through here as well.
        super(MyApplication, self).__init__(handlers=handlerSettings)


class avatarUpdataHandler(BaseHandler):
    """Demo handler: runs one query, prints its result, answers with text."""

    def get(self, *args, **kwargs):
        statements = [['select * from mysql;', ()]]
        outcome = self.do_mysql_query(statements)
        print(outcome)
        self.write('hellow word')



if __name__ == "__main__":

    define("port", default=8899, help="port 8899", type=int)
    # Defined options only take effect once the command line is parsed;
    # without this call --port=NNNN would be silently ignored.
    tornado.options.parse_command_line()

    # Must be defined before MyApplication() is created: its __init__ reads
    # this module-level name.
    handlerSettings = [
        (r"/avatar", avatarUpdataHandler),
    ]
    app = MyApplication()

    http_server = tornado.httpserver.HTTPServer(app)
    # listen() is all a single-process server needs, so the former start(1)
    # call after it was redundant and is dropped.
    http_server.listen(options.port)
    tornado.ioloop.IOLoop.current().start()

  

原文地址:https://www.cnblogs.com/zhangkui/p/11102741.html