Airflow deployment

Official site:

http://airflow.apache.org/installation.html

https://github.com/apache/airflow

How it works:

https://www.cnblogs.com/cord/p/9450910.html

Installation:

https://www.cnblogs.com/cord/p/9226608.html

High-availability deployment, etc.:

 https://www.jianshu.com/p/2ecef979c606

Usage, etc.:

https://www.jianshu.com/p/cbff05e3f125

Logs are in:

/tmp/scheduler.log 

#Deploy with a non-root account:

su airflow

Startup command:

. /home/airflow/venv/bin/activate

#Single-node deployment

https://www.jianshu.com/p/9bed1e3ab93b

1/ Python 3.6.5 environment

https://www.cnblogs.com/hongfeng2019/p/11250989.html

The dependencies are the same as for Superset:

yum install mysql-devel
pip3 install mysqlclient

2/ Install MySQL

https://www.cnblogs.com/hongfeng2019/p/11279488.html

create database airflowdb;
use airflowdb;
GRANT all privileges ON airflowdb.* TO 'airflow'@'localhost' IDENTIFIED BY 'Fengfeng99&';
GRANT all privileges ON airflowdb.* TO 'airflow'@'10.52.%.%' IDENTIFIED BY 'Fengfeng&';

set @@global.explicit_defaults_for_timestamp=on;

This prevents the following error later on:
"Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql"

3/ Install Airflow

mkdir -p /root/airflow

vim /etc/profile

export AIRFLOW_HOME=/root/airflow
export SLUGIFY_USES_TEXT_UNIDECODE=yes

source /etc/profile

pip install apache-airflow[all]

4/ Edit the configuration

vim /root/airflow/airflow.cfg 

sql_alchemy_conn = mysql://airflow:Fengfeng&@10.52.56.119:3306/airflowdb
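
Before running airflow initdb it can help to confirm that the connection string points where you expect. The sketch below only parses the URL with the Python standard library and does not open a database connection. describe_conn is a hypothetical helper name (not part of Airflow), and the password shown is a placeholder.

```python
from urllib.parse import urlsplit


def describe_conn(url):
    """Split a SQLAlchemy-style URL into its parts without connecting."""
    parts = urlsplit(url)
    return {
        "driver": parts.scheme,
        "user": parts.username,
        "host": parts.hostname,
        "port": parts.port,
        "database": parts.path.lstrip("/"),
    }


# Placeholder password; the host, port and database follow the notes above.
info = describe_conn("mysql://airflow:secret&@10.52.56.119:3306/airflowdb")
print(info)
```

If the host, port or database printed here is not what you intended, fix airflow.cfg before initializing the database.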

5/ Initialize the database

airflow initdb

#Cluster-mode deployment:

https://www.jianshu.com/p/bddacfd66c1f

1/ Install Celery

pip install celery

2/ Install Redis (Redis and MySQL have since both been moved to UCloud-hosted instances)

To run it under a non-root account, see: https://www.cnblogs.com/hongfeng2019/p/11524173.html

wget http://download.redis.io/releases/redis-4.0.11.tar.gz

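tar xzf redis-4.0.11.tar.gz
mv redis-4.0.11 /opt/
cd /opt/redis-4.0.11   #at the company it lives under /hongfeng/software
make
After make finishes, six executables, including redis-server and redis-cli, are generated in the src directory.
redis-server: the Redis server daemon
redis-cli: the Redis command-line client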

#Edit redis.conf in the redis-4.0.11 directory
sed -i 's/daemonize no/daemonize yes/g' /opt/redis-4.0.11/redis.conf
sed -i 's/bind 127.0.0.1/#bind 127.0.0.1/g' /opt/redis-4.0.11/redis.conf #allow remote connections
sed -i 's/protected-mode yes/protected-mode no/g' /opt/redis-4.0.11/redis.conf #allow remote connections

Start the Redis service:

cd src/

./redis-server ../redis.conf
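
Since the worker nodes reach Redis over the network, a quick reachability check from another machine can confirm that the bind and protected-mode changes took effect. This is a minimal sketch using only a plain TCP connect. port_open is a hypothetical helper, and it does not speak the Redis protocol or check authentication.

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

For example, calling port_open("10.52.56.119", 6379) from a worker node should return True once Redis accepts remote connections.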

3/ Configure airflow.cfg

#Change from LocalExecutor

executor = CeleryExecutor

broker_url = redis://10.52.56.119:6379/0

result_backend = db+mysql://airflow:Fengfeng&@10.52.56.119:3306/airflowdb
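
A typo in any of these settings usually only surfaces when a worker starts, so a small pre-flight check can save a restart cycle. The sketch below assumes the Airflow 1.10 layout, where executor and sql_alchemy_conn sit in [core] and broker_url and result_backend sit in [celery]. Check the section names against your own airflow.cfg. missing_settings is a hypothetical helper.

```python
import configparser

# Assumed section layout (Airflow 1.10 style); adjust if your version differs.
REQUIRED = {
    "core": ["executor", "sql_alchemy_conn"],
    "celery": ["broker_url", "result_backend"],
}


def missing_settings(cfg_text):
    """Return 'section.key' names that are absent or empty in the config text."""
    # interpolation=None so '%' characters in values are read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    missing = []
    for section, keys in REQUIRED.items():
        for key in keys:
            if not parser.has_option(section, key) or not parser.get(section, key).strip():
                missing.append(f"{section}.{key}")
    return missing


sample = """
[core]
executor = CeleryExecutor
sql_alchemy_conn = mysql://airflow:secret@10.52.56.119:3306/airflowdb

[celery]
broker_url = redis://10.52.56.119:6379/0
"""
print(missing_settings(sample))
```

In real use, pass the contents of /root/airflow/airflow.cfg instead of the sample string.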

4/ Install the Python redis package

pip install redis

5/ Run Airflow

#Start the airflow webserver as a test

airflow webserver -p 8080

Background startup:

#Start the webserver
#In the background: airflow webserver -p 8080 -D
airflow webserver -p 8080
#Start the scheduler
#In the background: airflow scheduler -D
airflow scheduler
#Start a worker
#In the background: airflow worker -D
#If you see "address already in use", check whether worker_log_server_port = 8793 is taken; if so, change it to
#an unused port such as 8974
airflow worker
#Start flower (optional)
#In the background: airflow flower -D
airflow flower

If airflow flower fails with an error, run pip install flower to install Flower.
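
For the "address already in use" case in the worker note above, the following sketch tests whether a port can still be bound on the local machine and picks the first free one from a list of candidates. is_port_free and first_free_port are hypothetical helper names, and the result is only a snapshot: another process could grab the port right after the check.

```python
import socket


def is_port_free(port, host="0.0.0.0"):
    """Return True if we can bind host:port ourselves, i.e. nothing else holds it."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind((host, port))
            return True
        except OSError:
            return False


def first_free_port(candidates, host="0.0.0.0"):
    """Return the first bindable port from candidates, or None if all are taken."""
    for port in candidates:
        if is_port_free(port, host):
            return port
    return None
```

For example, first_free_port([8793, 8974]) returns whichever of the two ports is currently unused, and that value can then be set as worker_log_server_port.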

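III. Multi-node high availability

The third-party component airflow-scheduler-failover-controller can be used to make the scheduler highly available.

The steps are as follows:

Download the failover controller
1/ git clone https://github.com/teamclairvoyant/airflow-scheduler-failover-controller
Install it with pip
cd {AIRFLOW_FAILOVER_CONTROLLER_HOME}
pip install -e .
Initialize the failover controller
scheduler_failover_controller init
Note: initialization appends content to airflow.cfg, so Airflow must already be installed and initialized.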

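2/ Change the failover configuration
vim /root/airflow/airflow.cfg
scheduler_nodes_in_cluster= host1,host2 #usable hosts or IPs
Note: the host name can be obtained with the scheduler_failover_controller get_current_host command

# Number of times to retry starting up the scheduler before it sends an alert
retry_count_before_alerting = 5 #how many times to try starting the scheduler on that node before failing over; can be set to 0

Test:

Shut down airflow-1 and check whether the scheduler switches over; run tail -f /root/airflow/failover.log to watch the switchover.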

3/ Set up passwordless SSH login between the machines running the failover controller. Once that is done, verify it with:
scheduler_failover_controller test_connection
Start the failover controller
scheduler_failover_controller start

/data/venv/bin/scheduler_failover_controller start

Test:

Stop the primary node, 119

. /data/venv/bin/activate

supervisorctl status

vim /data/venv/etc/supervisord.conf

The failover took effect, but the scheduler process was not brought up. Investigation showed:

True high availability also requires:

1/ MHA for MySQL

2/ A Redis cluster

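#Adding a node to the cluster:

Clone a machine with the same configuration, but have supervisor start only the failover and worker services on it

On 119 there are no worker or scheduler processes; check with ps -ef and supervisor

Configuration after rebuilding a server:

Change the hostname

Update /etc/hosts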


cd /root/airflow

vim airflow.cfg
Change the IPs below to the new IPs:
broker_url = redis://10.52.56.119:6379/0
result_backend = db+mysql://airflow:Airflow99&@10.52.56.119:3306/airflowdb
sql_alchemy_conn = mysql://airflow:Airflow99&@10.52.56.119:3306/airflowdb
scheduler_nodes_in_cluster = 10.52.96.246,10.52.143.191

cd /data/venv/etc
vim supervisord.conf
supervisord -c /data/venv/etc/supervisord.conf     #start supervisor

cd /root/airflow/dags
git pull

. /data/venv/bin/activate
Service commands:
supervisorctl start [service_name] #start a service
supervisorctl stop [service_name] #stop a service

Stopping supervisor:

(venv) [root@airflow-1 ~]# ps -ef |grep supervisord
root 31684 1 0 Oct29 ? 00:07:07 /data/venv/bin/python3 /data/venv/bin/supervisord -c /data/venv/etc/supervisord.conf

pkill supervisord

or kill -9 31684

#supervisor's own log is at /tmp/supervisord.log

#Update the environment

scp -r /opt/cloudera  10.52.41.187:/opt 

Changes in the Airflow web UI:

1/ Admin -> Connections: change it to node1.datalake.opay.com

2/ The echo $JAVA_HOME problem:

echo $HADOOP_HOME
/opt/cloudera/parcels/CDH/lib/hadoop

cd /opt/cloudera/parcels/CDH/lib/hadoop/etc/hadoop

In hadoop-env.sh, explicitly declare JAVA_HOME once more:

export HADOOP_MAPRED_HOME=$( ([[ ! '/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce' =~ CDH_MR2_HOME ]] && echo /opt/cloudera/parcels/CDH/lib/hadoop-mapreduce ) || echo ${CDH_MR2_HOME:-/usr/lib/hadoop-mapreduce/} )
export YARN_OPTS="-Xmx838860800 -Djava.net.preferIPv4Stack=true $YARN_OPTS"
export HADOOP_CLIENT_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS"
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.222.b10-0.el7_6.x86_64

3/ Log sync: copy /root/airflow/logs from node 2 to node 1, configured on the master; otherwise you get the error [ERROR - Failed to open transport (tries_left=3)] because the logs cannot be found

vim  /hongfeng/script/rsynce_airflow_log.sh

rsync -azuq -e ssh root@10.52.51.116:/root/airflow/logs/ /root/airflow/logs/ --exclude 'scheduler'  --exclude 'scheduler_failover' --exclude 'dag_processor_manager'

*/2 * * * * /bin/sh /hongfeng/script/rsynce_airflow_log.sh >/dev/null 2>&1

4/ The "ERROR - cannot import name 'constants'" problem

vi /data/venv/lib/python3.6/site-packages/airflow/hooks/hive_hooks.py
line:802

#from pyhive.hive import connect
from impala.dbapi import connect
#return connect(
#    host=db.host,
#    port=db.port,
#    auth_mechanism='PLAIN',
#    #auth=auth_mechanism,
#    kerberos_service_name=kerberos_service_name,
#    username=db.login or username,
#    database=schema or db.schema or 'default')

return connect(
    host=db.host,
    port=db.port,
    auth_mechanism='PLAIN',
    user=db.login,
    password=db.password)

Check the thrift-sasl version and downgrade it to 0.2.1:
pip list | grep thrift-sasl
thrift-sasl 0.3.0
pip install thrift-sasl==0.2.1

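5/ The airflow-web service won't start

Check the /root/airflow/webserver.log log to see whether login messages are scrolling by

A scheduler that appears hung can be traced with strace

strace -p [pid]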

nohup strace -o output.txt -T -tt -e trace=all -p 8761 &
nohup strace -o /hongfeng/output.txt -p 32536 &

6/ When too many DAG files are defined, the Airflow scheduler node runs slowly

[scheduler]
# The scheduler can run multiple threads in parallel to schedule dags.
# This defines how many threads will run.
#the default is 2; raised to 20 here
max_threads = 20

Check:

 ps -ef |grep schedu

7/ Send a WeChat alert at the same time as the email when a task fails

 vi /data/venv/lib/python3.6/site-packages/airflow/models/taskinstance.py

In the email_alert method:

        # wx
        import sys
        sys.path.append("/root/airflow/dags")
        from plugins.comwx import ComwxApi
        comwx = ComwxApi('wwd26d45f97ea74ad2', 'BLE_v25zCmnZaFUgum93j3zVBDK-DjtRkLisI_Wns4g', '1000011')
        comwx.postAppMessage(subject, '271')
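
Patching taskinstance.py inside site-packages is lost whenever Airflow is reinstalled or upgraded. As an alternative technique (not what these notes do), Airflow operators accept an on_failure_callback that receives the task context, so the alert can live in the DAG repository instead. The sketch below shows only the message-building part with a pluggable send function. build_alert_text and make_failure_callback are hypothetical names, the presence of the "exception" key in the context is an assumption about your Airflow version, and the real sender would be something like the ComwxApi.postAppMessage call above.

```python
from types import SimpleNamespace


def build_alert_text(context):
    """Format a one-line alert from an Airflow-style task context dict."""
    ti = context["task_instance"]
    # "exception" may be absent depending on the Airflow version (assumption).
    exc = context.get("exception")
    return "Airflow task failed: {}.{} ({})".format(
        ti.dag_id, ti.task_id, exc or "no exception recorded"
    )


def make_failure_callback(send):
    """Wrap any send(text) function into an on_failure_callback-style callable."""
    def on_failure(context):
        send(build_alert_text(context))
    return on_failure


# Demo with a fake context and a list standing in for the WeChat sender.
sent = []
callback = make_failure_callback(sent.append)
callback({
    "task_instance": SimpleNamespace(dag_id="demo_dag", task_id="load"),
    "exception": ValueError("boom"),
})
print(sent[0])
```

In a DAG this would be wired up by passing the callable as on_failure_callback in default_args, with send replaced by a function that calls the WeChat plugin.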

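8/ As the number of tasks grows, the load that Airflow scheduling puts on Hive increases

Solution: configure a ULB to forward to multiple Hive nodes in the cluster to spread the load. For the ULB configuration, see: https://www.cnblogs.com/hongfeng2019/p/11934689.html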

10.52.151.166     10000

 

On airflow2:

scp airflow.cfg  10.52.51.8:/root/airflow


cd /data/venv/etc
vim supervisord.conf
Delete these sections:
[program:airflow_flower]
[program:airflow_web]
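
When several nodes need the same trimming, the deletion can be scripted rather than done by hand in vim. This is a minimal sketch using the standard configparser module. drop_programs is a hypothetical helper, and note that configparser discards comments when it writes the file back, so keep a backup of the original.

```python
import configparser
import io


def drop_programs(conf_text, names):
    """Return conf_text with the given [program:<name>] sections removed."""
    parser = configparser.RawConfigParser()
    parser.optionxform = str  # keep option names exactly as written
    parser.read_string(conf_text)
    for name in names:
        # remove_section returns False (no error) if the section is absent.
        parser.remove_section("program:" + name)
    out = io.StringIO()
    parser.write(out, space_around_delimiters=False)
    return out.getvalue()


sample = """\
[program:airflow_web]
command=airflow webserver -p 8080

[program:airflow_flower]
command=airflow flower

[program:airflow_worker]
command=airflow worker
environment=C_FORCE_ROOT=true
"""
trimmed = drop_programs(sample, ["airflow_web", "airflow_flower"])
print(trimmed)
```

The trimmed text can then be written to /data/venv/etc/supervisord.conf on the worker-only node before starting supervisord.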

supervisord -c /data/venv/etc/supervisord.conf 

systemctl status mysqld

Airflow routine maintenance:

Contributed by my colleague Zhenqian

The big-data Airflow cluster currently runs on two servers:

10.52.56.119

10.52.12.116

Service layout:

10.52.56.119

  • mysql

  • redis

  • airflow_failover

  • airflow_flower

  • airflow_scheduler

  • airflow_web

  • airflow_worker

10.52.12.116

  • airflow_failover

  • airflow_worker

 

Accessing the services:

ip:8080 airflow_web service

ip:5555 airflow_flower service

 

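The Airflow services are managed by supervisor. The routine maintenance commands are as follows

First enter the Python virtual environment

. /data/venv/bin/activate

To exit: deactivate

Service commands:

supervisorctl start [service_name] #start a service

supervisorctl stop [service_name] #stop a service

supervisorctl stop all

supervisorctl restart [service_name] #restart a service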
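
For routine checks across both servers it can be handy to turn supervisorctl status output into something a script can act on. The sketch below parses the usual "name STATE details" lines. parse_status and not_running are hypothetical helpers, and the sample text is illustrative rather than captured from this cluster.

```python
def parse_status(output):
    """Map each program name to its state from `supervisorctl status` text."""
    states = {}
    for line in output.splitlines():
        fields = line.split()
        if len(fields) >= 2:
            states[fields[0]] = fields[1]
    return states


def not_running(output):
    """Return the sorted names of programs whose state is not RUNNING."""
    return sorted(name for name, state in parse_status(output).items() if state != "RUNNING")


# Illustrative sample, not real output from the servers above.
sample = """\
airflow_failover                 RUNNING   pid 2101, uptime 1:02:03
airflow_scheduler                FATAL     Exited too quickly (process log may have details)
airflow_worker                   RUNNING   pid 2103, uptime 1:02:01
"""
print(not_running(sample))
```

In practice the text would come from running supervisorctl status inside the virtual environment, for example through subprocess.run with its output captured.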

 

supervisor configuration file: /data/venv/etc/supervisord.conf

[program:airflow_web]
command=airflow webserver -p 8080
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/webserver.log

[program:airflow_scheduler]
command=airflow scheduler
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/scheduler.log

[program:airflow_worker]
command=airflow worker
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
environment=C_FORCE_ROOT=true
stdout_logfile=/root/airflow/worker.log

[program:airflow_flower]
command=airflow flower --basic_auth=admin:opay321
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/flower.log

[program:airflow_failover]
command=scheduler_failover_controller start
autostart=true
autorestart=true
stopasgroup=true
killasgroup=true
stdout_logfile=/root/airflow/failover.log

 

Git sync script:

vim /root/deploy_git_airflow.sh
ansible airflow -m shell -a 'cd /root/airflow/dags && git pull'

Airflow connection configuration:

 

Original article: https://www.cnblogs.com/hongfeng2019/p/11390473.html