celery 日志设置

celery 日志设置

3种自定义Celery日志记录处理程序的策略

python日志处理程序可以自定义日志消息,例如,我们想把日志消息写入屏幕,文件和日志管理服务等,在这种情况下,我们能将三个日志处理程序添加到应用程序的根记录器中。

import logging

logger = logging.getLogger()
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s [%(lineno)d]')

# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)

# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)

# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)

在celery 中添加自定义日志处理程序(通常在 celery 中配置日志记录)可能涉及很多麻烦。celery 文档有点少,而且在各种论坛(比如 stackoverflow中也是充满了各种矛盾的答案),各种各样的文章都提出了相当复杂的解决办法和补丁。

在本文,我将展示三种配置 celery 记录器的替代策略,并说明每种策略的工作方式和原因。提供的代码也是可以运行的独立脚本。要求是 python3.6+和 celery4.2.0+结合使用。

首先启动celery

celery worker --app=app.app --concurrency=1 --loglevel=INFO

异步方式启动任务

python app.py

celery logging

celery logging 比较复杂且不易设置。底层的 python 日志记录系统需要支持 celery 支持的所有并发的设置:eventlet,greenlet,threads 等。但现实的情况是现在的 python 日志记录系统并不支持所有这些不同的配置。

celery 在 celery.app.log 中提供了特殊的get_task_logger 功能。这将返回一个继承自记录器celery的特殊记录器 celery.task,该记录器自动获取任务名称以及唯一 ID 作为日志的一部分。

但是,我们也可以使用标准getlogger方式获取日志记录对象,原因是我们很可能在 celery 或者 web 应用程序中调用代码。如果我们使用 logging.getlogger(name),可以使我们的底层代码与执行代码的上下文保持干净整洁。

第一种策略:Augment Celery 记录器

celery 提供了after_setup_logger在Celery设置记录器之后触发的信号,信号传递记录器对象,我们可以方便地自定义处理程序然后添加到记录器中

import os
import logging
from celery import Celery
from celery.signals import after_setup_logger


for f in ['./broker/out', './broker/processed']:
    if not os.path.exists(f):
        os.makedirs(f)


logger = logging.getLogger(__name__)


app = Celery('app')
app.conf.update({
    'broker_url': 'filesystem://',
    'broker_transport_options': {
        'data_folder_in': './broker/out',
        'data_folder_out': './broker/out',
        'data_folder_processed': './broker/processed'
    }})


@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # FileHandler
    fh = logging.FileHandler('logs.log')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    # SysLogHandler
    slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
    slh.setFormatter(formatter)
    logger.addHandler(slh)


@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result


if __name__ == '__main__':
    task = add.s(x=2, y=3).delay()
    print(f'Started task: {task}')

第二种策略:覆盖 celery 根记录器

可以通过连接setup_logging 信号来阻止celery 配置任何记录器,这样,我们就可以完全自定义自己的日志记录配置

import os
import logging
from celery import Celery
from celery.signals import setup_logging

app = Celery('app')
app.conf.update({
    'broker_url': 'filesystem://',
    'broker_transport_options': {
        'data_folder_in': './broker/out',
        'data_folder_out': './broker/out',
        'data_folder_processed': './broker/processed'
    }})


for f in ['./broker/out', './broker/processed']:
    if not os.path.exists(f):
        os.makedirs(f)


logger = logging.getLogger(__name__)


@after_setup_logger.connect
def setup_loggers(*args, **kwargs):
    logger = logging.getLogger()
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

    # StreamHandler
    sh = logging.StreamHandler()
    sh.setFormatter(formatter)
    logger.addHandler(sh)
    
    
    # FileHandler
    fh = logging.FileHandler('logs.log')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

    # SysLogHandler
    slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
    slh.setFormatter(formatter)
    logger.addHandler(slh)



@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result


if __name__ == '__main__':
    task = add.s(x=2, y=3).delay()
    print(f'Started task: {task}')

第三种策略:停用 celery 记录器配置

另一种解决方案就是让 celery 设计其记录器但是不使用,并防止其劫持根记录器。默认情况下,celery 会在根记录器上先删除所有先前的配置的处理程序。如果要自定义自己的日志处理程序而不会妨碍celery,则可以通过设置禁用此行为 worker_hijack_root_logger=True。这将使我们能够收回对于根记录器的控制权,并退回到标准的 python 记录器设置。但是需要谨慎使用这种方案,因为我们需要确保python 日志记录与 celery 设置完全兼容(event,greenlet,threads 等)

import os
import logging
from celery import Celery
from celery.signals import setup_logging

app = Celery('app')
app.conf.update({
    'broker_url': 'filesystem://',
    'broker_transport_options': {
        'data_folder_in': './broker/out',
        'data_folder_out': './broker/out',
        'data_folder_processed': './broker/processed'
    },
    'worker_hijack_root_logger': False})


# setup folder for message broking
for f in ['./broker/out', './broker/processed']:
    if not os.path.exists(f):
        os.makedirs(f)


formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)


# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)

# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)


logger = logging.getLogger(__name__)


@app.task()
def add(x, y):
    result = x + y
    logger.info(f'Add: {x} + {y} = {result}')
    return result


if __name__ == '__main__':
    task = add.s(x=2, y=3).delay()
    print(f'Started task: {task}')

我们采用第二种方法来定制celery 的日志格式

  • 添加 logging_config
import logging.config

LOG_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'simple': {
            'datefmt': '%Y-%m-%d %H:%M:%S',
            'format': '{"timestamp": "%(asctime)s", "app": "bs-whatweb", '
                      '"logger": "%(name)s", "level": "%(levelname)s", '
                      '"pathname": "%(pathname)s", "module": "%(module)s", '
                      '"funcName": "%(funcName)s", "lineno": "%(lineno)d", '
                      '"message": "%(message)s"}'

        },
        'json': {
            'class': 'project.api.tasks.logger.JSONFormatter'
        }
    },
    'handlers': {
        'celery': {
            'level': 'INFO',
            'formatter': 'simple',
            'class': 'logging.StreamHandler'
        },
        'celery_json': {
            'level': 'INFO',
            'formatter': 'json',
            'class': 'logging.StreamHandler'
        },
        'sentry': {
            'level': "CRITICAL",
            'formatter': 'simple',
            'class': 'raven.handlers.logging.SentryHandler',
            'args': ('https://facc2ededdfa45ba955dca1eb485915a@sentry.socmap.org/7',)
        },
    },
    'loggers': {
        'celery_logger': {
            'handlers': ['celery_json'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery.task': {
            'handlers': ['celery_json'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery.worker': {
            'handlers': ['celery_json'],
            'level': 'INFO',
            'propagate': False,
        },
        'celery': {
            'handlers': ['celery_json'],
            'level': 'INFO',
            'propagate': False,
        },
        'project': {
            'handlers': ['celery_json'],
            'level': 'INFO',
            'propagate': False,
        },
    }
}

logging.config.dictConfig(LOG_CONFIG)
  • 设置 JSON 格式化,并添加 task_id 及 task_name 两个参数
    def __init__(self, tags=None, hostname=None, fqdn=False, message_type='JSON',
                 indent=None):
        super().__init__()
        *********************新增******************
        try:
            from celery._state import get_current_task
            self.get_current_task = get_current_task
        except ImportError:
            self.get_current_task = lambda: None
        ******************************************
        """
        :param tags: a list of tags to add to every messages
        :hostname: force a specific hostname
        :fqdn: a boolean to use the FQDN instead of the machine's hostname
        :message_type: the message type for Logstash formatters
        :indent: indent level of the JSON output
        """
        self.message_type = message_type
        self.tags = tags if tags is not None else []
        self.extra_tags = []
        self.indent = indent

        if hostname:
            self.host = hostname
        elif fqdn:
            self.host = socket.getfqdn()
        else:
            self.host = socket.gethostname()
     
      def format(self, record, serialize=True):
      ****************************新增***********************
        task = self.get_current_task()
        if task and task.request:
            record.__dict__.update(task_id=task.request.id,
                                   task_name=task.name)

        else:
            record.__dict__.setdefault('task_name', '')
            record.__dict__.setdefault('task_id', '')
        if record.__dict__.get("data"):
            record.__dict__.pop("data")
       	*****************************************************
        new_message = record.getMessage()
        # Create message dict
        message = {
            'timestamp': self.format_timestamp(record.created),
            'app': os.environ.get('APP_NAME'),
            'host': self.host,
            'environment': os.environ.get('FLASK_ENV'),
            'logger': record.name,
            'level': record.levelname,
            'message': new_message,
            'path': record.pathname,
            'tags': self.tags[:]
        }

        # Add extra fields
        message.update(self.get_extra_fields(record))

        # Add extra tags
        if self.extra_tags:
            message['tags'].extend(self.extra_tags)

        # If exception, add debug info
        if record.exc_info or record.exc_text:
            message.update(self.get_debug_fields(record))

        if serialize:
            return self.serialize(message, indent=self.indent)
        return message

  • 拦截 celery 信号
from celery.signals import setup_logging
@setup_logging.connect
def setup_logger(*args, **kwargs):
    from project.api.tasks import logging_config
原文地址:https://www.cnblogs.com/zhangweijie01/p/11813215.html