代码走读 airflow 2

1.8.1

==================

 ================

 

 ============

 ===========

 ==========

1 max_threads :同时起这么多个进程处理pyfile
2 file_process_interval:在这么长时间(秒)中,一个文件最多执行一次
3 run_duration:scheduler的循环什么时候退出
4 DagStat中的信息是对DagRun中state的一个统计
5 每个dag都有最大并行执行个数dag.max_active_runs【ruing的个数】
6 dag_dir_list_interval:多长时间去找新增dag和删除的dag
7 DAG的dagrun_timeout属性可以控制dag最长能运行多久
8 DAG的dag_concurrency属性可以控制dag中可以并行执行的task个数
9 max_active_runs(max_active_runs_per_dag)属性控制正在running的dag个数,超过这个个数就不给这个dag起新的dagrun了
10 可以查看models中DAG类的属性来看控制DAG都有哪些方法
11 dagrun所有的taskinstance的状态决定了dagrun的状态,dagrun和taskinstance通过dag_id和execution_date相关联的
12 dag第一次开始执行的时间是:startdate + self._schedule_interval
13 通过表dag_run的run_id属性来判断dagrun是scheduler的还是backfill的
14 JOB_HEARTBEAT_SEC 表示scheduler每个多久心跳一次
15 default queue 有128个槽,即最多并行128个taskinstance
16 localworker将taskinstance状态改成success
17 executor的心跳是将queued_tasks中的任务放到queue中
18 processor_manager 里存了每个dagfile的执行情况

19 通过operator的execution_timeout属性来控制一个task的最长执行时间

20 duration 是总共执行时间:duration = (self.end_date - self.start_date).total_seconds()

21 taskinstance刚创建时状态是None->当运行条件满足时,状态改成scheduler ->当被放到executor的queue_command时,状态改成queue

->当刚进入taskinstance.run函数时,状态改成是running,最后根据执行结果状态改成success或fail

失败后,若想重试,state改成 UP_FOR_RETRY

22  scheduler一共起了PARALLELISM个woker和max_threads个Processing files,所以一共起了PARALLELISM+max_threads个进程

 23  解析py文件的DagFileProcessor的日志目录在/usr/local/airflow/logs/scheduler/下面,分日期,一天一个目录

24 worker通过subprocess.check_call起进程执行task,若task抛出异常或返回非0,如sys.exit(-1),则woker会把taskinstance标记为失败

25 当一个dag要4分钟才能执行完,但执行频率设为了1分钟,且'max_active_runs':1,应该在10分执行的dag可能被延迟到了13分,dagrun收到的execution_date还是10;

但若catchup=False,则execution_date不再是10

26 每隔 dag_dir_list_interval 秒刷新一次dag文件,若对应的dag文件没有了,则对应的processor要停掉

27 将dag变为simple_dags(dagrun)的日志级别是info,所在目录:/usr/local/airflow/logs/scheduler/2020-04-06  ,返回simple_dags后,子进程退出

28、被停止的dag重新on后,会继续上次运行的dag

29、

原文地址:https://www.cnblogs.com/testzcy/p/12593664.html