python 跨模块实现按照文件大小,日期实现日志分割,反转

    笔者的一个自动化测试平台项目,采用了python作为后端服务器语言。项目基于快速成型目的,写了一个极其简陋的日志记录功能,支持日志记录到文件和支持根据日志级别在终端打印不同颜色的log。但随着测试平台上线运行,发现日志文件大小急剧膨胀,运行一段时间,往往一个log能有几个G大小,而且也不能根据日期查看日志内容。基于根据文件大小和日志实现日志分割,在下查阅了不少前辈的资料,不断尝试,终于得出一个可以用的demo,在此分享也做个记录,不足之处,还望指正。

这是本人工作前辈的初始版本:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import os
import time
import logging
import platform
if platform.system() == 'Windows':
    from ctypes import windll, c_ulong

    def color_text_decorator(function):
        def real_func(self, string):
            windll.Kernel32.GetStdHandle.restype = c_ulong
            h = windll.Kernel32.GetStdHandle(c_ulong(0xfffffff5))
            if function.__name__.upper() == 'ERROR':
                windll.Kernel32.SetConsoleTextAttribute(h, 12)
            elif function.__name__.upper() == 'WARN':
                windll.Kernel32.SetConsoleTextAttribute(h, 13)
            elif function.__name__.upper() == 'INFO':
                windll.Kernel32.SetConsoleTextAttribute(h, 14)
            elif function.__name__.upper() == 'DEBUG':
                windll.Kernel32.SetConsoleTextAttribute(h, 15)
            else:
                windll.Kernel32.SetConsoleTextAttribute(h, 15)
            function(self, string)
            windll.Kernel32.SetConsoleTextAttribute(h, 15)
        return real_func
else:
    def color_text_decorator(function):
        def real_func(self, string):
            if function.__name__.upper() == 'ERROR':
                self.stream.write('\033[0;31;40m')
            elif function.__name__.upper() == 'WARN':
                self.stream.write('\033[0;35;40m')
            elif function.__name__.upper() == 'INFO':
                self.stream.write('\033[0;33;40m')
            elif function.__name__.upper() == 'DEBUG':
                self.stream.write('\033[0;37;40m')
            else:
                self.stream.write('\033[0;37;40m')
            function(self, string)
            self.stream.write('\033[0m')
        return real_func

FORMAT = '[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s'


class Logger(object):
    DEBUG_MODE = True
    LOG_LEVEL = 5
    GLOBAL_FILENAME = 'static/testlog/syslog/atc.log'

    def __init__(self, name, filename=None):
        current_path = os.path.join(os.path.dirname(
            os.path.abspath(__file__)), 'static', 'testlog/syslog')
        if not os.path.exists(current_path):
            os.makedirs(current_path)

        # baseconfig
        logging.basicConfig()
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(FORMAT)

        # output to terminal
        sh = logging.StreamHandler()
        sh.setFormatter(formatter)
        sh.setLevel(logging.DEBUG if self.DEBUG_MODE else logging.INFO)
        self.logger.addHandler(sh)
        self.stream = sh.stream

        # output to global file
        if self.GLOBAL_FILENAME:
            fh_all = logging.FileHandler(self.GLOBAL_FILENAME, 'a')
            #fh_all = logging.handlers.TimedRotatingFileHandler(self.GLOBAL_FILENAME,'M',1,0)
            #fh_all.suffix ="_%Y_%m_%d-%H_%M.log"
            fh_all.setFormatter(formatter)
            fh_all.setLevel(logging.DEBUG)
            self.logger.addHandler(fh_all)
            self.logger.propagate = 0

        # output to user define file
        if filename is not None:
            fh = logging.FileHandler(filename, 'a')
            fh.setFormatter(formatter)
            fh.setLevel(logging.DEBUG)
            self.logger.addHandler(fh)
            self.logger.propagate = 0

    @color_text_decorator
    def hint(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 5:
            return self.logger.debug(strTmp)
        else:
            pass

    @color_text_decorator
    def debug(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 4:
            return self.logger.debug(strTmp)
        else:
            pass

    @color_text_decorator
    def info(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 3:
            return self.logger.info(strTmp)
        else:
            pass

    @color_text_decorator
    def warn(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 2:
            return self.logger.warn(strTmp)
        else:
            pass

    @color_text_decorator
    def error(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 1:
            return self.logger.error(strTmp)
        else:
            pass


class TestLogModule(object):

    def __init__(self):
        pass

    def runtest(self):
        logger = Logger('TEST')

        iCount = 0
        while True:
            iCount = iCount + 1
            logger.error(str(iCount))
            logger.debug('1   22   333   4444     55555      666666')
            logger.info('1   22   333   4444     55555      666666')
            logger.warn('1   22   333   4444     55555      666666')
            logger.error('1   22   333   4444     55555      666666')
            time.sleep(1)
            if iCount >= 120:
                break
        # for a in xrange(10):
        #     logger.debug('1   22   333   4444     55555      666666')
        #     logger.info('1   22   333   4444     55555      666666')
        #     logger.warn('1   22   333   4444     55555      666666')
        #     logger.error('1   22   333   4444     55555      666666')
        #     time.sleep(1)


if __name__ == '__main__':
    TestLogModule().runtest()

我们的需求可以用logging.handlers实现,具体方法为logging.handlers.TimedRotatingFileHandler和logging.handlers.RotatingFileHandler。

class logging.handlers.RotatingFileHandler(filename, mode='a', maxBytes=0, backupCount=0, encoding=None, delay=0)
返回RotatingFileHandler类的实例。指明的文件被打开,并被用作日志流。如果没有指明mode,使用'a'。如果encoding不为None,使用指定的编码来打开文件。如果delay为真,只到第一次调用emit()的时候才打开文件。默认情况下,文件会一直增长。

可以使用maxBytes 和 backupCount 来让文件在预定义的尺寸发生翻转。当文件大小大概要超出时,文件被关闭,新文件被打开用来输出。当文件大小接近于maxBytes长度时,翻转会发生;如果maxBytes为0,翻转永不发生。如果backupCount不为0,系统将保存老的日志文件,在文件名后加上‘.1’, ‘.2’这样的扩展名。例如如果backupCount是5,基本的文件名是app.log,将会得到app.log, app.log.1, app.log.2到 app.log.5。总是写到文件app.log中。当文件被填满,文件被关闭并重命名为app.log.1,而已存的app.log.1, app.log.2等文件被重命名为app.log.2, app.log.3等。

改变于版本2.6:新增了delay。

doRollover()
如上所述做文件的翻转。

emit(record)
输出记录到文件,负责文件的翻转。
class logging.handlers.TimedRotatingFileHandler(filename, when='h', interval=1, backupCount=0, encoding=None, delay=False, utc=False)
返回TimedRotatingFileHandler类的实例。 指明的文件被打开,并用作日志流。在循环时它也会设置文件后缀。循环发生基于when 和 interval的乘积。

使用when来指明interval的类型。可能的值列在下面。注意大小写不敏感。

Value    Type of interval
'S'    Seconds
'M'    Minutes
'H'    Hours
'D'    Days
'W0'-'W6'    Weekday (0=Monday)
'midnight'    Roll over at midnight
注意在使用基于工作日的循环时,‘W0’表示星期一,‘W1’表示星期二,依此类推,‘W6’表示星期日。这种情况下不使用interval。

系统会保存老的日志文件,在文件名后添加扩展名。扩展名基于日期和时间,根据翻转间隔,使用strftime格式%Y-%m-%d_%H-%M-%S,或者其前面一部分。

第一次计算下一次翻转时间的时候(创建handler时),要么使用已存文件的上一次修改时间,要么使用当前时间。

如果utc为真,使用UTC时间;否则使用本地时间。

如果backupCount不为0,最多保留backupCount个文件,如果产生更多的文件,最老的文件会被删除。删除逻辑使用间隔来决定删除哪些文件,所以改变间隔可能会导致老的文件被保留。

如果delay为真,只到第一次调用emit()时文件才被打开。

改变于版本2.6:新增了delay和utc。

doRollover()
如上所述做文件的翻转。

emit(record)
输出记录到文件,负责文件的翻转。

改良第一步:将logging.handlers.TimedRotatingFileHandler和logging.handlers.RotatingFileHandler添加到初始版本中去

Logger类新增logginghandlers.TimeRotatingFileHandler和logging.handlers.RotatingFileHandler 的handler

class Logger(object):
    DEBUG_MODE = True
    LOG_LEVEL = 5
    GLOBAL_FILENAME = 'static/testlog/syslog/atc.log'

    def __init__(self, name, filename=None):
        current_path = os.path.join(os.path.dirname(
            os.path.abspath(__file__)), 'static', 'testlog/syslog')
        if not os.path.exists(current_path):
            os.makedirs(current_path)

        # baseconfig
        logging.basicConfig()
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(FORMAT)

        # output to terminal
        sh = logging.StreamHandler()
        sh.setFormatter(formatter)
        sh.setLevel(logging.DEBUG if self.DEBUG_MODE else logging.INFO)
        self.logger.addHandler(sh)
        self.stream = sh.stream

        # output to global file
        if self.GLOBAL_FILENAME:         
            th_all = logging.handlers.TimedRotatingFileHandler(self.GLOBAL_FILENAME, when='midnight',interval=1, backupCount=7)
            th_all.setFormatter(formatter)
            th_all.setLevel(logging.DEBUG)
            self.logger.addHandler(th_all)
            # self.logger.propagate = 0

    
        rh_all = logging.handlers.RotatingFileHandler('static/testlog/syslog/rf.log', mode='a',maxBytes=2000*2000, backupCount=3)
        rh_all.setFormatter(formatter)
        rh_all.setLevel(logging.DEBUG)
        self.logger.addHandler(rh_all)
        # self.logger.propagate = 0

        # output to user define file
        if filename is not None:
            fh = logging.FileHandler(filename, 'a')
            fh.setFormatter(formatter)
            fh.setLevel(logging.DEBUG)
            self.logger.addHandler(fh)
            self.logger.propagate = 0

实际结果并不如人意,新添加的两个handler在日志发生反转的时候,新建立的日志文件并不能把各模块的日志输出记录下来,会出现只记录了一部分模块日志的情况。

再次修改,这次我们为了移除模块共写一个文件的影响,另外新建一个mainlogger,由它记录各模块的日志输出,并且在日志反转时,新建日志文件。

#!/usr/bin/python
# -*- coding: utf-8 -*-

import os
import time
import logging
import logging.handlers
import platform

if platform.system() == 'Windows':
    from ctypes import windll, c_ulong

    def color_text_decorator(function):
        def real_func(self, string):
            windll.Kernel32.GetStdHandle.restype = c_ulong
            h = windll.Kernel32.GetStdHandle(c_ulong(0xfffffff5))
            if function.__name__.upper() == 'ERROR':
                windll.Kernel32.SetConsoleTextAttribute(h, 12)
            elif function.__name__.upper() == 'WARN':
                windll.Kernel32.SetConsoleTextAttribute(h, 13)
            elif function.__name__.upper() == 'INFO':
                windll.Kernel32.SetConsoleTextAttribute(h, 14)
            elif function.__name__.upper() == 'DEBUG':
                windll.Kernel32.SetConsoleTextAttribute(h, 15)
            else:
                windll.Kernel32.SetConsoleTextAttribute(h, 15)
            function(self, string)
            windll.Kernel32.SetConsoleTextAttribute(h, 15)
        return real_func
else:
    def color_text_decorator(function):
        def real_func(self, string):
            if function.__name__.upper() == 'ERROR':
                self.stream.write('\033[0;31;40m')
            elif function.__name__.upper() == 'WARN':
                self.stream.write('\033[0;35;40m')
            elif function.__name__.upper() == 'INFO':
                self.stream.write('\033[0;33;40m')
            elif function.__name__.upper() == 'DEBUG':
                self.stream.write('\033[0;37;40m')
            else:
                self.stream.write('\033[0;37;40m')
            function(self, string)
            self.stream.write('\033[0m')
        return real_func

FORMAT = '[%(asctime)s] [%(name)s] [%(levelname)s] %(message)s'


class MainLogger(object):
    DEBUG_MODE = True
    LOG_LEVEL = 5

    def __init__(self, name):
        current_path = os.path.join(os.path.dirname(
            os.path.abspath(__file__)), 'static', 'testlog', 'syslog')
        if not os.path.exists(current_path):
            os.makedirs(current_path)

        # baseconfig
        logging.basicConfig()
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(FORMAT)

        th_all = logging.handlers.TimedRotatingFileHandler(
            os.path.join(current_path, 'master_main_atc.log'), when='midnight', interval=1, backupCount=7)
        th_all.setFormatter(formatter)
        th_all.setLevel(logging.DEBUG)
        self.logger.addHandler(th_all)

        rh_all = logging.handlers.RotatingFileHandler(
            os.path.join(current_path, 'master_main_logger_rf.log'), mode='a', maxBytes=2000 * 2000, backupCount=3)
        rh_all.setFormatter(formatter)
        rh_all.setLevel(logging.DEBUG)
        self.logger.addHandler(rh_all)
        # 防止在终端重复打印
        self.logger.propagate = 0

    def hint(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 5:
            return self.logger.debug(strTmp)
        else:
            pass

    def debug(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 4:
            return self.logger.debug(strTmp)
        else:
            pass

    def info(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 3:
            return self.logger.info(strTmp)
        else:
            pass

    def warn(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 2:
            return self.logger.warn(strTmp)
        else:
            pass

    def error(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        if self.LOG_LEVEL >= 1:
            return self.logger.error(strTmp)
        else:
            pass

main_logger = MainLogger('MasterMainLogger')


class Logger(object):
    DEBUG_MODE = True
    LOG_LEVEL = 5

    def __init__(self, name, filename=None):

        self.name = name
        # baseconfig
        logging.basicConfig()
        self.logger = logging.getLogger(name)
        self.logger.setLevel(logging.DEBUG)
        formatter = logging.Formatter(FORMAT)

        # output to terminal
        sh = logging.StreamHandler()
        sh.setFormatter(formatter)
        sh.setLevel(logging.DEBUG if self.DEBUG_MODE else logging.INFO)
        self.logger.addHandler(sh)
        self.stream = sh.stream
      
        # output to user define file
        if filename is not None:
            fh = logging.FileHandler(filename, 'a')
            fh.setFormatter(formatter)
            fh.setLevel(logging.DEBUG)
            self.logger.addHandler(fh)
            self.logger.propagate = 0

        # 防止在终端重复打印
        self.logger.propagate = 0

    @color_text_decorator
    def hint(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        main_logger.hint("[" + self.name + "] "  + strTmp)
        if self.LOG_LEVEL >= 5:
            return self.logger.debug(strTmp)
        else:
            pass

    @color_text_decorator
    def debug(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        main_logger.debug("[" + self.name + "] "  + strTmp)
        if self.LOG_LEVEL >= 4:
            return self.logger.debug(strTmp)
        else:
            pass

    @color_text_decorator
    def info(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        main_logger.info("[" + self.name + "] "  + strTmp)
        if self.LOG_LEVEL >= 3:
            return self.logger.info(strTmp)
        else:
            pass

    @color_text_decorator
    def warn(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        main_logger.warn("[" + self.name + "] "  + strTmp)
        if self.LOG_LEVEL >= 2:
            return self.logger.warn(strTmp)
        else:
            pass

    @color_text_decorator
    def error(self, string):
        # 去除多余连续空格
        strTmp = str(string)
        strTmp = ' '.join(strTmp.split())
        main_logger.error("[" + self.name + "] "  + strTmp)
        if self.LOG_LEVEL >= 1:
            return self.logger.error(strTmp)
        else:
            pass


class TestLogModule(object):

    def __init__(self):
        pass

    def runtest(self):
        logger = Logger('TEST')

        iCount = 0
        while True:
            iCount = iCount + 1
            logger.error(str(iCount))
            logger.debug('1   22   333   4444     55555      666666')
            logger.info('1   22   333   4444     55555      666666')
            logger.warn('1   22   333   4444     55555      666666')
            logger.error('1   22   333   4444     55555      666666')
            time.sleep(1)
            if iCount >= 120:
                break
        # for a in xrange(10):
        #     logger.debug('1   22   333   4444     55555      666666')
        #     logger.info('1   22   333   4444     55555      666666')
        #     logger.warn('1   22   333   4444     55555      666666')
        #     logger.error('1   22   333   4444     55555      666666')
        #     time.sleep(1)


if __name__ == '__main__':
    TestLogModule().runtest()

 这次,终于达到笔者现阶段的需求,但是又发现了新的问题,如果以上代码运行在多进程环境中,日志反转时,不再建立新日志文件,python的logging模块报错,提示文件已打开,导致日志记录失败。

原文地址:https://www.cnblogs.com/linyihai/p/6165521.html