Airflow v2.0 distributed deployment: Elasticsearch logging solution

Installation environment:

  • docker
  • airflow v2.0
  • elasticsearch 7+
  • filebeat 7+

Development dependency:

pip install 'apache-airflow-providers-elasticsearch'

Logging solution

graph LR
  AF[(Airflow)] -. write .-> LOG[[JSON log files]]
  LOG -. read .-> FB[/Filebeat: parse and normalize the log structure/]
  FB -. store .-> ES[(ElasticSearch)]

In the diagram above, Logstash can be inserted between Filebeat and Elasticsearch for additional processing, depending on your own design.

Airflow configuration

Enable remote logging and configure the Elasticsearch settings so that the webserver can reach Elasticsearch. Remote logs are retrieved by searching on log_id, so make sure the log output contains a log_id field that matches the format configured in log_id_template.

[logging]
# Enable remote logging
remote_logging = True

# Elasticsearch connection info for the webserver
[elasticsearch]
host = your_host:your_port
# log_id template, the id used when searching for logs
log_id_template = {dag_id}-{task_id}-{execution_date}-{try_number}
# Whether to write logs to stdout; set this according to your own logging design.
# Here Filebeat reads the log files and ships them to Elasticsearch, so it is set to False.
write_stdout = False
# Whether to write logs as JSON
json_format = True

# Elasticsearch encryption (SSL) settings, configure according to your own needs
[elasticsearch_configs]
use_ssl = False
verify_certs = False
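
As a quick sanity check of the log_id_template above: the log_id used for the search is essentially the template with the task instance's fields filled in, so the documents stored in Elasticsearch need a log_id field of exactly this shape for the webserver to find them. A minimal Python sketch, using made-up dag/task values for illustration:

# Hypothetical task instance values, for illustration only
log_id_template = "{dag_id}-{task_id}-{execution_date}-{try_number}"

log_id = log_id_template.format(
    dag_id="example_dag",
    task_id="print_date",
    execution_date="2021-01-27T00:00:00+00:00",
    try_number=1,
)
print(log_id)
# example_dag-print_date-2021-01-27T00:00:00+00:00-1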

Filebeat configuration

Filebeat reads the logs produced by task execution on the worker nodes, normalizes their format, and sends them to Elasticsearch for storage.

Notes:

  • The host field must be of a hashable type, or absent (see the error analysis in the reference below)

The configuration below only shows the settings that need to be changed.

# Read the log files
filebeat.inputs:

# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.

- type: log
  enabled: true
  paths: 
    - /app/logs/**/*.log
  exclude_files: 
    - '.py.log$'

setup.template:
  # name and pattern are required for the Elasticsearch index setup
  name: "airflow_log_template"
  pattern: "airflow-log*"

# Disable index lifecycle management, otherwise the custom index below is ignored
setup.ilm.enabled: false

output.elasticsearch:
  # Array of hosts to connect to.
  hosts: '${FILEBEAT_OUTPUT_ELASTICSEARCH_HOSTS:"127.0.0.1:9200"}'

  # Protocol - either `http` (default) or `https`.
  protocol: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PROTOCOL:"http"}'

  # Authentication credentials - either API key or username/password.
  #api_key: "id:api_key"
  username: '${FILEBEAT_OUTPUT_ELASTICSEARCH_USERNAME:""}'
  password: '${FILEBEAT_OUTPUT_ELASTICSEARCH_PASSWORD:""}'
  index: "airflow-log-%{+yyyy.MM}"

# Log processing settings
processors:
  # Disable host metadata output
  # - add_host_metadata:
        # when.not.contains.tags: forwarded

  # Decode the JSON log lines
  - decode_json_fields:
      fields: ["message"]
      process_array: false
      max_depth: 1
      target: ""
      overwrite_keys: true
      add_error_key: true

  # Remove the host field
  - drop_fields:
      fields: ["host"]
      ignore_missing: true
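
To make the effect of the two processors above concrete, here is a rough Python approximation of what they do to a single event: decode_json_fields parses the JSON in message and merges it into the document root (target: ""), and drop_fields removes host. The sample field values are made up for illustration and this is not an exact Filebeat event:

import json

# A simplified event as Filebeat might see it before the processors run
# (field values are made up for illustration)
event = {
    "message": '{"asctime": "2021-01-27 15:12:57", "levelname": "INFO", "message": "task started"}',
    "host": {"name": "worker-1"},  # a dict here is what later breaks the webserver
}

# decode_json_fields with target: "" and overwrite_keys: true (approximation)
decoded = json.loads(event.pop("message"))
event.update(decoded)

# drop_fields: ["host"], ignore_missing: true (approximation)
event.pop("host", None)

print(event)
# host is gone and the JSON fields now live at the document root:
# {'asctime': '2021-01-27 15:12:57', 'levelname': 'INFO', 'message': 'task started'}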

After the configuration changes are in place, run filebeat -e to test the configuration (Filebeat runs in the foreground and logs to stderr).

Reference

Error when the Airflow webserver fetches logs:

[2021-01-27 15:12:57,006] {app.py:1891} ERROR - Exception on /get_logs_with_metadata [GET]
Traceback (most recent call last):
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 2447, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1952, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1821, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/_compat.py", line 39, in reraise
    raise value
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1950, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/flask/app.py", line 1936, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/auth.py", line 34, in decorated
    return func(*args, **kwargs)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/decorators.py", line 60, in wrapper
    return f(*args, **kwargs)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/session.py", line 65, in wrapper
    return func(*args, session=session, **kwargs)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/www/views.py", line 1054, in get_logs_with_metadata
    logs, metadata = task_log_reader.read_log_chunks(ti, try_number, metadata)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/log_reader.py", line 58, in read_log_chunks
    logs, metadatas = self.log_handler.read(ti, try_number, metadata=metadata)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/utils/log/file_task_handler.py", line 217, in read
    log, metadata = self._read(task_instance, try_number_element, metadata)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 163, in _read
    logs_by_host = self._group_logs_by_host(logs)
  File "/home/lzy/Git/Work/airflow_func/.venv/lib/python3.8/site-packages/airflow/providers/elasticsearch/log/es_task_handler.py", line 132, in _group_logs_by_host
    grouped_logs[key].append(log)
TypeError: unhashable type: 'AttrDict'

The code corresponding to the error above:

    @staticmethod
    def _group_logs_by_host(logs):
        grouped_logs = defaultdict(list)
        for log in logs:
            key = getattr(log, 'host', 'default_host')  # the host value is used as the grouping key
            grouped_logs[key].append(log)

        # return items sorted by timestamp.
        result = sorted(grouped_logs.items(), key=lambda kv: getattr(kv[1][0], 'message', '_'))

        return result

As the code shows, the host field of each log record is used as a dictionary key, so its value must be hashable (usable as a dict key) and must not be a mutable type such as a list or dict. Removing the host field from the logs, or making it a hashable value such as a string, resolves the problem.
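
The failure mode is easy to reproduce outside Airflow: an unhashable value (such as the dict-like AttrDict that a structured host field becomes) cannot be used as a dictionary key, while a plain string can. A minimal sketch:

from collections import defaultdict

grouped_logs = defaultdict(list)

# A plain string host works fine as a dict key
grouped_logs["worker-1"].append("log line")

# A dict-like host (what a structured host field decodes to) does not
try:
    grouped_logs[{"name": "worker-1"}].append("log line")
except TypeError as exc:
    print(exc)  # unhashable type: 'dict'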

Original article: https://www.cnblogs.com/li1234yun/p/14336209.html