Structuring the Redis slow log

Collect the slow log from AWS ElastiCache Redis 5.x (which cannot export its slow log to CloudWatch) into a MySQL table, so it is easy to query:

Table DDL:

create table redis_log
(
    session_id  int           null,
    host_ip     varchar(100)  null,
    start_time  datetime      null,
    duration    decimal(10,2) null, -- command duration in milliseconds
    cmd         varchar(3000) null,
    insert_time timestamp(3)  null,
    remark      varchar(100)  null,
    constraint redis_slog_pk
        unique (session_id)
);
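
Once the collector has been running for a while, a query along these lines (a sketch; it assumes the table lives in the devops schema that the script below writes to) lists the slowest commands captured over the last day:

select host_ip, start_time, duration as duration_ms, cmd
from devops.redis_log
where insert_time >= now() - interval 1 day
order by duration desc
limit 20;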

lambda_function.py:  

## ==================================================================================
##    Make reading a way of life. Do it every day, like eating and sleeping,
## and one day your bearing, speech and temperament will be different.
##                                        -- async
##
## Created Date: Sunday, 2021-05-23, 9:48:04 am
## copyright (c):    WHHL Tech. LTD.
## Engineer:   async
## Module Name:
## Revision:   v0.01
## Description:
##
## Revision History :
## Revision     editor         date         Description
## v0.01        async          2021-05-23   File Created
## ==================================================================================
import redis
import datetime,time
import logging
import mysql.connector
import pytz

pystime = time.time()
tz = pytz.timezone('Asia/Shanghai')

redis_ips = ['redis1', 'redis2', 'redisn']  # Redis node endpoints to poll (placeholders)
def slow_redis():
    for ip in redis_ips:
        try:
            Redis = redis.StrictRedis(host=ip, port=6379, db=0, socket_timeout=1)
            results = Redis.slowlog_get(200)  # fetch up to 200 slow log entries per call
            if not results:  # nothing in the slow log on this node
                continue
            # only the most recent entry is processed on each run
            session_id = results[0]['id']
            i_start_time = datetime.datetime.fromtimestamp(int(results[0]['start_time'])).strftime('%Y-%m-%d %H:%M:%S')  # epoch -> datetime string
            duration = round(int(results[0]['duration'])/1000, 2)  # microseconds -> milliseconds
            command = results[0]['command']
            if duration >= 30:  # developers flag anything over ~10 ms; only entries >= 30 ms are stored here
                i_host = ip.split('.', 1)[0]  # keep only the short host name
                s_results = {"session_id": session_id, "host_ip": i_host, "start_time": i_start_time,
                             "duration": duration, "cmd": command, "insert_time": datetime.datetime.now(tz)}
                # print(s_results)

                conn = mysql.connector.connect(host='10.10.x.x', port=3306, user='xxx', passwd='xxxx',
                                               db='xxxx', charset='utf8mb4')
                cur = conn.cursor(buffered=True)
                try:
                    sql_d = "delete from devops.redis_log where insert_time <= DATE_SUB(CURDATE(), INTERVAL 7 DAY)"  # keep only 7 days of history
                    cur.execute(sql_d)
                    conn.commit()
                except Exception:
                    conn.rollback()
                    print('devops.redis_log delete failed. Transaction rolled back')
                    raise
                try:
                    # insert ignore + the unique(session_id) constraint dedupes entries seen on earlier runs
                    sql_i = (
                        f"insert ignore into devops.redis_log ({', '.join(s_results.keys())}) "
                        f"values (%({')s, %('.join(s_results.keys())})s)")
                    cur.execute(sql_i, s_results)
                    conn.commit()
                    # inserted = cur.rowcount
                except Exception:
                    conn.rollback()
                    print('devops.redis_log insert failed. Transaction rolled back')
                    raise
                cur.close()
                conn.close()

                # print('Inserted', inserted, 'row(s) into devops.redis_log table')
        except Exception as e:
            logging.error('%s: failed to collect slow log: %s', ip, e)
            raise

# if  __name__ == '__main__':
def lambda_handler(event, context):
    slow_redis()
pyetime = time.time()
print('Script run time:', round(float(pyetime - pystime), 3), 's')

Then schedule it to run every 5 minutes, either from crontab or, for a Lambda deployment, via a CloudWatch Events / EventBridge rate(5 minutes) rule.

Original post: https://www.cnblogs.com/5sdba-notes/p/14799597.html