【原创】Airflow调用talend

核心原理

因为talend job build出来是一个可直接运行的程序,可以通过shell命名启动job进程,因此可以使用airflow的bashoperator调用生成好的talend job包里面的sh脚本,启动talend job。

设计talend job任务

本例子主要将一批数据同步到mysql表中

clip_image002

导出talend job

将设计好的talend job build到zip包。

clip_image004

定义airflow dag

新建个airflow dag python文件,例如如下脚本,其中job_commd就是我们平时写的调用shell脚本的一个命令。

# -*- coding: utf-8 -*-

import airflow

from datetime import datetime

from datetime import timedelta

from airflow.operators.bash_operator import BashOperator

from airflow.models import DAG

from airflow.utils.email import send_email

job_commd = "/softwares/shellscripts/airflow_exec_talend_job_demo/airflow_exec_talend_job_demo/airflow_exec_talend_job_demo_run.sh '/softwares/shellscripts/airflow_exec_talend_job_demo/airflow_exec_talend_job_demo'"

default_args = {

'owner': 'airflow',

'depends_on_past': False,

'start_date': datetime(2019, 11, 4),

'email': ['1183744742@qq.com'],

'email_on_failure': True,

'email_on_success': True,

'retries':2,

'retry_delay': timedelta(minutes=1),

}

dag = DAG(

        dag_id='airflow_exec_talend_job_demo',

        default_args=default_args,

        schedule_interval='0 12 * * *',

        dagrun_timeout=timedelta(minutes=60),

)

t1 = BashOperator(

        task_id='airflow_talend_demo',

        bash_command=job_commd,

        dag=dag

)

部署调度任务

clip_image006

运行查看是否可以正常入库

clip_image008


如果您觉得此文章对您有帮助,请点击右下方【推荐】让更多人看到,thanks!

原文地址:https://www.cnblogs.com/xiongnanbin/p/11978541.html