# Airflow DAG that schedules HTTP requests to resource endpoints (Airflow调度http请求资源代码)

import os
import pytz
from datetime import timedelta, datetime
from airflow.operators.http_operator import SimpleHttpOperator
from airflow.models import DAG


# Airflow looks up a connection id in the AIRFLOW_CONN_<CONN_ID> environment
# variable, so this supplies the base URL of the "http_test" connection used
# by the HTTP tasks in this file.
os.environ['AIRFLOW_CONN_HTTP_TEST'] = 'http://host.docker.internal:8089'

# Arguments applied to every task of the DAG unless a task overrides them.
default_args = {
    'owner': 'Airflow',
    # A task run waits for the same task of the previous DAG run ...
    'depends_on_past': True,
    # ... but not for that previous run's downstream tasks.
    'wait_for_downstream': False,
    # Upper bound on a single task attempt.
    'execution_timeout': timedelta(minutes=50),
    # Recipient list; both e-mail triggers below are switched off.
    'email': ['iss_isg_prf1@lenovo.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    # One retry, attempted 20 minutes after the failure.
    'retries': 1,
    'retry_delay': timedelta(minutes=20),
}


# Start anchor: 2021-10-26 12:20 Shanghai wall-clock time, expressed as a
# naive UTC datetime (used as the DAG's start_date).
tz = pytz.timezone('Asia/Shanghai')
# pytz zones must be attached with localize(); passing one through the
# datetime constructor's tzinfo= applies the zone's historical LMT offset
# (+08:06 for Asia/Shanghai) and would shift the UTC value by six minutes.
dt = tz.localize(datetime(2021, 10, 26, 12, 20))
# Convert to UTC, then drop tzinfo to obtain the naive UTC value.
utc_dt = dt.astimezone(pytz.utc).replace(tzinfo=None)



# Weekly DAG: the cron expression fires at 06:00 every Monday.
# NOTE(review): start_date is a naive datetime, so the schedule presumably
# runs in Airflow's default timezone (UTC unless configured) — confirm.
dag = DAG(
    dag_id='USAGE_LENOVO_Operator',
    schedule_interval='0 6 * * 1',
    start_date=utc_dt,
    default_args=default_args,
)



# Task 1: GET /api/USAGE/spider through the "http_test" connection.
t1 = SimpleHttpOperator(
    task_id='get_op_011',
    http_conn_id='http_test',
    method='GET',
    endpoint='/api/USAGE/spider',
    headers={"Content-Type": "application/json"},
    # Accept the response only when its body is non-empty (a str is truthy
    # exactly when it has at least one character).
    response_check=lambda response: bool(response.text),
    # Publish the HTTP response to XCom.
    # NOTE(review): newer Airflow releases spell this do_xcom_push — confirm
    # against the deployed Airflow version.
    xcom_push=True,
    dag=dag,
)



# Task 2: GET /api/USAGE/cleans through the "http_test" connection.
t2 = SimpleHttpOperator(
    task_id='get_op_012',
    http_conn_id='http_test',
    method='GET',
    endpoint='/api/USAGE/cleans',
    headers={"Content-Type": "application/json"},
    # Accept the response only when its body is non-empty (a str is truthy
    # exactly when it has at least one character).
    response_check=lambda response: bool(response.text),
    # Publish the HTTP response to XCom.
    # NOTE(review): newer Airflow releases spell this do_xcom_push — confirm
    # against the deployed Airflow version.
    xcom_push=True,
    dag=dag,
)


# Ordering: the "cleans" request (t2) runs only after the "spider" request (t1).
t1.set_downstream(t2)
# Original article (原文地址): https://www.cnblogs.com/zhulimin/p/15638194.html