flink on yarn

为什么要用yarn?

如果不用yarn.假设有10个job运行在flink集群上,如果有一个出问题.发生了OOM,最后导致taskmanager挂掉.那么jobmanager会调度任务到其他的taskmanager上面.最后是连锁反应,会造成所有的taskmanager都挂掉.集群挂掉.

所以要用yarn.

flink on yarn 有两种形式

  • yarn per job
  • yarn session

yarn per job.

意思就是,一个任务一个集群.如果你这个任务出问题了,那就你自己挂掉,其他集群会活的好好的.

但是缺点是,每个任务都要一个jobmanager.都要单独的内存.

如果一个jobmanager占用1g内存.那么要是有50个任务就会占用50g内存.浪费.

适合执行时间长的作业

yarn session

因为上一个的缺点,jobmanager浪费内存.因此有了这个模式.

意思就是我在yarn中启动一个集群,然后不重要的任务,都放在这里面运行,或者在这里面测试.

相当于一个沙箱.

  • 适合规模小,执行时间短的作业
  • 离线处理
  • 共享资源
  • 可用增加taskmanager 和终止空闲的taskmanager

yarn-per-job启动参数

./flink run 
-m yarn-cluster 
-yn 1 
-yjm 1024 
-ytm 4096 
-ynm FlinkOnYarnSession-MemberLogInfoProducer
-c com.igg.flink.tool.member.rabbitmq.producer.MqMemberProducer 
/home/test_gjm/igg-flink-tool/igg-flink-tool-1.0.0-SNAPSHOT.jar
参数名	含义
-m	固定为yarn-cluster
-yjm	指定JobManager所在的Container内存。单位:MB
-ytm	每一个TaskManager Container的内存,单位MB。
-ys	每一个TaskManager中slots的数量。
-ynm	YARN中application的名称。
-c	指定Job对应的jar包中主函数所在类名。

说明

这里只有一个taskmanager.

假设你的任务并行度是5.那么 -ys 就是5 .

假设你的任务需要8G内存.那么 -ytm 就是 8192

-yjm 固定为 1024

例子

/data/flink-1.10.1/bin/flink run 
-m yarn-cluster 
-yjm 1024 
-ytm 18432 
-ys 3 
-p 6 
-yD env.java.opts="-XX:+UseG1GC" 
-ynm huadan_active_job 
-c com.xxxxxx.FlinkHuadanActiveJob 
/data/hadoop/data/xxxxx-analysis-1.0-SNAPSHOT.jar 

使用 G1 收集器
启动 2 个 tm ,每个3slot,18G 内存.

Options for yarn-cluster mode:
     -d,--detached                        If present, runs the job in detached
                                          mode
     -m,--jobmanager <arg>                Address of the JobManager (master) to
                                          which to connect. Use this flag to
                                          connect to a different JobManager than
                                          the one specified in the
                                          configuration.
     -yat,--yarnapplicationType <arg>     Set a custom application type for the
                                          application on YARN
     -yD <property=value>                 use value for given property
     -yd,--yarndetached                   If present, runs the job in detached
                                          mode (deprecated; use non-YARN
                                          specific option instead)
     -yh,--yarnhelp                       Help for the Yarn session CLI.
     -yid,--yarnapplicationId <arg>       Attach to running YARN session
     -yj,--yarnjar <arg>                  Path to Flink jar file
     -yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                          optional unit (default: MB)
     -ynl,--yarnnodeLabel <arg>           Specify YARN node label for the YARN
                                          application
     -ynm,--yarnname <arg>                Set a custom name for the application
                                          on YARN
     -yq,--yarnquery                      Display available YARN resources
                                          (memory, cores)
     -yqu,--yarnqueue <arg>               Specify YARN queue.
     -ys,--yarnslots <arg>                Number of slots per TaskManager
     -yt,--yarnship <arg>                 Ship files in the specified directory
                                          (t for transfer)
     -ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                          optional unit (default: MB)
     -yz,--yarnzookeeperNamespace <arg>   Namespace to create the Zookeeper
                                          sub-paths for high availability mode
     -z,--zookeeperNamespace <arg>        Namespace to create the Zookeeper
                                          sub-paths for high availability mode

yarn session 启动参数

先启动 yarn-session

例子:

#!/bin/bash

/data/flink-1.10.1/bin/yarn-session.sh 
-s 4 
-jm 1g 
-tm 8g 
-d 
-ynm yarn-flink

yarn-session.sh -n 5 -jm 1024 -tm 2048 -s 2 -d
参数解释:
// -jm 1024 表示jobmanager 1024M内存
// -tm 1024表示taskmanager 1024M内存
// -s  每一个TaskManager上的slots数量。
//-d 任务后台运行
//-nm,--name YARN上为一个自定义的应用设置一个名字


意思:
启动一个集群.有5个taskmanager,每个taskmanager 内存2G,2个slot.
但是在开始的时候,是没有的,在你提交任务的时候,就会创建taskmanager.
比如你提交一个 3并行度的任务,就会创建出来2个taskmanager,一共有4个slot.剩余1个.
你再启动一个2并行度的任务,那么就会再启动1个taskmanager,一共 4个slot,还剩1个slot.
 
因此在yarn-session中,taskmanager的slot不要设置的过多.
n可以大一点,后面可以扩充.

提交到yarn-session

/data/flink-1.10.1/bin/flink run 
-yid {application id} 
-p 3 
-yD env.java.opts="-XX:+UseG1GC" 
-c com.xxxxxxx.FlinkBatchTimeLengthJob 
/data/hadoop/data/xxxxxxx.jar 

Usage:
   Optional
     -D <arg>                        Dynamic properties
     -d,--detached                   Start detached
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -nm,--name                      Set a custom name for the application on YARN
     -at,--applicationType           Set a custom application type on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for HA mode
原文地址:https://www.cnblogs.com/weijiqian/p/14034567.html