Python 连接 MySQL

读取配置文件连接 MySQL 并查询,将耗时超过阈值的 SQL 通过钉钉机器人发送告警

示例代码:

SqlMonitoring.py

import json
import requests
import mysql.connector
import datetime
from configparser import ConfigParser
import traceback

def sendmessage(url, msg, at_list=None):
    """Send a plain-text message to a DingTalk robot webhook.

    Args:
        url: Webhook URL the JSON payload is POSTed to.
        msg: Text placed in the ``text.content`` field of the payload.
        at_list: Values for the ``at.atMobiles`` field (e.g. ``["10086"]``).
            Defaults to an empty list.

    Returns:
        None. The response body is printed together with the current time.

    Raises:
        requests.exceptions.RequestException: Propagated from
            ``requests.post`` (including a timeout after 10 seconds).
    """
    # A fresh list per call avoids sharing one mutable default between calls.
    if at_list is None:
        at_list = []

    headers = {
        "Content-Type": "application/json ;charset=utf-8 "
    }
    payload = {
        "msgtype": "text",
        "text": {"content": msg},
        "at": {
            "atMobiles": at_list,
            "isAtAll": 0  # set to 1 to @ everyone
        }
    }
    # Bound the request so a stalled webhook cannot hang the monitor forever.
    res = requests.post(url, data=json.dumps(payload), headers=headers, timeout=10)
    print(str(datetime.datetime.now()) + " 发送钉钉消息:" + str(res.text))

def query_sql(dingding_url, mysql_conn, table,  duration_threshold=70000,  at_list=None):
    """Query recent rows from ``table`` and alert on slow ones via DingTalk.

    Rows with ``timestamp`` later than the last processed timestamp are
    fetched. The last processed timestamp is kept in
    ``/tmp/pandora-sql-monitoring.time``; when that file is missing or empty
    the window starts one hour before now. Every row whose ``duration`` is
    greater than or equal to ``duration_threshold`` produces one message,
    sent to the webhook URLs in round-robin order.

    Args:
        dingding_url: List of DingTalk webhook URLs to rotate through.
        mysql_conn: Open connection object providing ``cursor(dictionary=True)``
            and ``commit()``.
        table: Name of the table to query. It is interpolated into the SQL
            text, so it must come from a trusted source.
        duration_threshold: Minimum ``duration`` (int or numeric string) that
            triggers an alert.
        at_list: Passed through to ``sendmessage``; defaults to an empty list.

    Returns:
        None.
    """
    if at_list is None:
        at_list = []

    state_file = "/tmp/pandora-sql-monitoring.time"

    # Only the table name is formatted into the statement; the timestamp is
    # bound as a parameter so the state file's content is never spliced
    # into the SQL text.
    sql = (
        "select duration,query_sql,type,timestamp from {} "
        "where timestamp > %s order by timestamp"
    ).format(table)

    # Default window: the last hour.
    pre_time = datetime.datetime.now() - datetime.timedelta(hours=1)
    pre_time_str = pre_time.strftime("%Y-%m-%d %H:%M:%S")

    # Resume from the last processed timestamp; the file does not exist on
    # the first run, which is not an error.
    try:
        with open(state_file, "r") as file:
            saved_time = file.readline().strip()
    except FileNotFoundError:
        saved_time = ""
    if saved_time:
        pre_time_str = saved_time

    print(sql, pre_time_str)

    alarm_list = []
    last_query_time = None
    try:
        cursor = mysql_conn.cursor(dictionary=True)
        try:
            cursor.execute(sql, (pre_time_str,))
            result = cursor.fetchall()
            mysql_conn.commit()
        finally:
            # Release the cursor even when the query fails.
            cursor.close()
        for row in result:
            last_query_time = str(row["timestamp"])
            if int(row["duration"]) >= int(duration_threshold):
                alarm_list.append(row)
    except Exception:
        # Best effort: report the failure and continue with whatever rows
        # were processed before it.
        traceback.print_exc()

    # Remember the newest timestamp seen so the next run starts after it.
    if last_query_time:
        with open(state_file, "w") as file:
            file.write(last_query_time)

    # One message per slow row, rotating through the webhook URLs.
    for index, item in enumerate(alarm_list):
        msg = (
            "耗时:" + str(float(item["duration"]) / 1000) + "\n"
            + "时间:" + str(item["timestamp"]) + "\n"
            + "sql:" + str(item["query_sql"]) + "\n"
        )
        sendmessage(dingding_url[index % len(dingding_url)], msg, at_list)

if __name__ == '__main__':
    # Load all settings from properties.cfg (resolved against the current
    # working directory) before opening any connection, so a missing option
    # fails without leaving a connection open.
    cp = ConfigParser()
    cp.read("properties.cfg")

    # MySQL settings
    mysql_host = cp.get("mysql", "host")
    mysql_port = cp.getint("mysql", "port")
    mysql_user = cp.get("mysql", "user")
    mysql_password = cp.get("mysql", "password")
    mysql_db = cp.get("mysql", "database")
    mysql_table = cp.get("mysql", "table")

    # Webhook URLs; query_sql rotates through this list to stay under
    # DingTalk's per-robot message limit.
    dingding_url = [cp.get("dingding", "url1")]
    at_list = ["10086"]

    # Alert settings
    duration_threshold = cp.getint("alarm", "duration_threshold")

    mysql_conn = mysql.connector.connect(
        host=mysql_host,
        port=mysql_port,
        user=mysql_user,
        password=mysql_password,
        database=mysql_db,
    )
    try:
        query_sql(dingding_url, mysql_conn, mysql_table, duration_threshold, at_list)
    finally:
        # Always release the connection, even when the query run fails.
        mysql_conn.close()

同目录下的 properties.cfg

# DingTalk robot webhook; the script reads the option "url1" from this section.
[dingding]
url1 = https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxxxxxxxxxxxxxx

# Connection settings passed to mysql.connector.connect, plus the table to query.
# NOTE(review): sample credentials only - replace before real use.
[mysql]
host = 127.0.0.1
port = 3306
user = root
password = 123456
database = test
table = test_table

# Rows whose "duration" is >= duration_threshold trigger an alert.
# NOTE(review): the script divides duration by 1000 for display, so the unit
# is presumably milliseconds - confirm against the table definition.
[alarm]
duration_threshold = 3000
人生如修仙,岂是一日间。何时登临顶,上善若水前。
原文地址:https://www.cnblogs.com/f-society/p/12769288.html