Flink on yarn

1、准备

准备集群

  • Zookeeper集群
  • Hadoop集群

准备flink jar包

官网地址:https://flink.apache.org/downloads.html

flink-1.8之后没有集成hadoop,需要下载对应的hadoop jar包

1.8之前:

1.8之后:

需要下载对应hadoop的组件(然后放入flink的lib目录下)

配置Hadoop的环境变量

配置flink配置文件

置jobmanager地址

jobmanager.rpc.address: bigdata-03

其他自己看情况配置

配置master和slaves

2、Flink yarn

Session Cluster模式

(1) 启动hadoop集群(略)

(2)需要自己自定义配置的话,可以使用来查看参数:

bin/yarn-session.sh –help
Usage:
   Required
     -n,--container <arg>   Number of YARN container to allocate (=Number of Task Managers)
   Optional
     -D <property=value>             use value for given property
     -d,--detached                   If present, runs the job in detached mode
     -h,--help                       Help for the Yarn session CLI.
     -id,--applicationId <arg>       Attach to running YARN session
     -j,--jar <arg>                  Path to Flink jar file
     -jm,--jobManagerMemory <arg>    Memory for JobManager Container with optional unit (default: MB)
     -m,--jobmanager <arg>           Address of the JobManager (master) to which to connect. Use this flag to connect to a different JobManager than the one specified in the configuration.
     -n,--container <arg>            Number of YARN container to allocate (=Number of Task Managers)
     -nl,--nodeLabel <arg>           Specify YARN node label for the YARN application
     -nm,--name <arg>                Set a custom name for the application on YARN
     -q,--query                      Display available YARN resources (memory, cores)
     -qu,--queue <arg>               Specify YARN queue.
     -s,--slots <arg>                Number of slots per TaskManager
     -sae,--shutdownOnAttachedExit   If the job is submitted in attached mode, perform a best-effort cluster shutdown when the CLI is terminated abruptly, e.g., in response to a user interrupt, such
                                     as typing Ctrl + C.
     -st,--streaming                 Start Flink in streaming mode
     -t,--ship <arg>                 Ship files in the specified directory (t for transfer)
     -tm,--taskManagerMemory <arg>   Memory per TaskManager Container with optional unit (default: MB)
     -yd,--yarndetached              If present, runs the job in detached mode (deprecated; use non-YARN specific option instead)
     -z,--zookeeperNamespace <arg>   Namespace to create the Zookeeper sub-paths for high availability mode
yarn-session的参数介绍
  -n : 指定TaskManager的数量;
  -d: 以分离模式运行;
  -id:指定yarn的任务ID;
  -j:Flink jar文件的路径;
  -jm:JobManager容器的内存(默认值:MB);
  -nl:为YARN应用程序指定YARN节点标签;
  -nm:在YARN上为应用程序设置自定义名称;
  -q:显示可用的YARN资源(内存,内核);
  -qu:指定YARN队列;
  -s:指定TaskManager中slot的数量;
  -st:以流模式启动Flink;
  -tm:每个TaskManager容器的内存(默认值:MB);
  -z:命名空间,用于为高可用性模式创建Zookeeper子路径;

(3)启动yarn-session

./yarn-session.sh -n 2 -s 2 -jm 1024 -tm 1024 -nm test -d
其中:
-n(--container):TaskManager的数量。
-s(--slots):	每个TaskManager的slot数量,默认一个slot一个core,默认每个taskmanager的slot的个数为1,有时可以多一些taskmanager,做冗余。
-jm:JobManager的内存(单位MB)。
-tm:每个taskmanager的内存(单位MB)。
-nm:yarn 的appName(现在yarn的ui上的名字)。 
-d:后台执行。

(4)执行程序:

./flink run -c com.duoduo.WordCount  wordcount-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

(5)yarn上查看

(6)取消yarn-session

yarn application --kill application_1577588252906_0001

Per Job Cluster模式

(1)启动hadoop集群(略)

(2)不启动yarn-session,直接执行job

./flink run –m yarn-cluster -c com.atguigu.WordCount  WordCount-1.0-SNAPSHOT-jar-with-dependencies.jar --host lcoalhost –port 7777

3、遇到的问题

3.1 java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties

2020-05-21 16:09:11,255 ERROR org.apache.flink.yarn.cli.FlinkYarnSessionCli   - Error while running the Flink session.
java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:71)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.FeaturesAndProperties
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more

------------------------------------------------------------
 The program finished with the following exception:

java.lang.NoClassDefFoundError: com/sun/jersey/core/util/FeaturesAndProperties
    at java.lang.ClassLoader.defineClass1(Native Method)
    at java.lang.ClassLoader.defineClass(ClassLoader.java:763)
    at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142)
    at java.net.URLClassLoader.defineClass(URLClassLoader.java:467)
    at java.net.URLClassLoader.access$100(URLClassLoader.java:73)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:368)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:362)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:361)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.hadoop.yarn.client.api.TimelineClient.createTimelineClient(TimelineClient.java:55)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.createTimelineClient(YarnClientImpl.java:181)
    at org.apache.hadoop.yarn.client.api.impl.YarnClientImpl.serviceInit(YarnClientImpl.java:168)
    at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
    at org.apache.flink.yarn.YarnClusterClientFactory.getClusterDescriptor(YarnClusterClientFactory.java:71)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:56)
    at org.apache.flink.yarn.YarnClusterClientFactory.createClusterDescriptor(YarnClusterClientFactory.java:42)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:529)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: java.lang.ClassNotFoundException: com.sun.jersey.core.util.FeaturesAndProperties
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more

解决:

我的hadoop是2.73,放入flink-shaded-hadoop-2-uber-2.7.7-10.0.jar依旧报上面错误,换成高一个版本就好了flink-shaded-hadoop-2-uber-2.8.3-10.0.jar

3.2 在启动日志中发现如下错误

org.apache.flink.yarn.cli.FlinkYarnSessionCli.
java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:264)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLine(CliFrontend.java:1076)
        at org.apache.flink.client.cli.CliFrontend.loadCustomCommandLines(CliFrontend.java:1030)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:957)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 5 more

可以参考以下Flink官网的提示https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/deployment/hadoop.html

解决办法就是在环境变量中增加

export HADOOP_CLASSPATH=`hadoop classpath`

3.3 Couldn't deploy Yarn session cluster

org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:380)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.run(FlinkYarnSessionCli.java:548)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.lambda$main$5(FlinkYarnSessionCli.java:785)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.yarn.cli.FlinkYarnSessionCli.main(FlinkYarnSessionCli.java:785)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 10 exceeds the maximum number of virtual cores 7 available in the YarnCluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
    at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:292)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:444)
    at org.apache.flink.yarn.YarnClusterDescriptor.deploySessionCluster(YarnClusterDescriptor.java:373)
    ... 7 more

解决:

yarn.containers.vcores设置了虚拟的cores=7,taskManager的slot我设置了10个,资源不够
方法一:要么调大vcores参数

方法二:减少slot个数taskmanager.numberOfTaskSlots: 5

参考地址:https://www.jianshu.com/p/1b05202c4fb6
参考地址:https://blog.csdn.net/joseph25/article/details/88878350

原文地址:https://www.cnblogs.com/hyunbar/p/12932377.html