Flink Pitfalls, Part 1

Problem 1

Submitting a Flink job with this command:

flink run -c com.lezhi.business.dxxbs.transmission.ExecuteDML /data/jar/gkt-bigData-flink-1.0-SNAPSHOT-jar-with-dependencies.jar --id 50

produces the following message (a deprecation warning from the MySQL driver):

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.

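Solution:

The cause is that the com.mysql.jdbc.Driver driver class is deprecated, and the message asks you to use com.mysql.cj.jdbc.Driver instead.

First check the MySQL connector version used in the project and make sure its major version matches that of the MySQL server. Then change the driver class to com.mysql.cj.jdbc.Driver and rebuild the jar. Finally, upload the matching MySQL connector jar to Flink's lib directory (see the screenshot in the original post). That resolves the problem.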
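Before rebuilding, it can help to confirm that no code or config file still names the old driver class. The following is a minimal sketch, not part of the original post; the helper name find_old_driver and the default directory src are assumptions, so point SRC_DIR at wherever your project keeps its sources.

```shell
# find_old_driver DIR: print every line under DIR that still references
# the deprecated driver class (hypothetical helper, for illustration only).
find_old_driver() {
  grep -rnF "com.mysql.jdbc.Driver" "$1" 2>/dev/null
}

# Assumed source directory; override with SRC_DIR=/path/to/project/src
SRC_DIR="${SRC_DIR:-src}"

if [ ! -d "$SRC_DIR" ]; then
  echo "source directory not found: $SRC_DIR"
elif find_old_driver "$SRC_DIR"; then
  echo "update the lines above to com.mysql.cj.jdbc.Driver"
else
  echo "no references to the old driver class found"
fi
```

The fixed-string match (-F) does not hit com.mysql.cj.jdbc.Driver, so lines that already use the new class are not reported.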

Problem 2

Starting yarn-session.sh fails with the following error:

[root@bigdata01 bin]# yarn-session.sh 
Error: A JNI error has occurred, please check your installation and try again
Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException
at java.lang.Class.getDeclaredMethods0(Native Method)
at java.lang.Class.privateGetDeclaredMethods(Class.java:2701)
at java.lang.Class.privateGetMethodRecursive(Class.java:3048)
at java.lang.Class.getMethod0(Class.java:3018)
at java.lang.Class.getMethod(Class.java:1784)
at sun.launcher.LauncherHelper.validateMainClass(LauncherHelper.java:544)
at sun.launcher.LauncherHelper.checkAndLoadMain(LauncherHelper.java:526)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

In addition, flink run --help lists only the following options for yarn-cluster mode:

  Options for yarn-cluster mode:
     -m,--jobmanager <arg>            Set to yarn-cluster to use YARN execution
                                      mode.
     -yid,--yarnapplicationId <arg>   Attach to running YARN session
     -z,--zookeeperNamespace <arg>    Namespace to create the Zookeeper
                                      sub-paths for high availability mode

  Options for default mode:

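Cause: Flink's lib directory is missing the Hadoop jar. Download flink-shaded-hadoop-2-2.6.5-7.0.jar and upload it to flink/lib (see the screenshot in the original post).

Note: download the jar that matches your Hadoop version.

Problem 3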
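As an alternative to copying a shaded jar, the Flink documentation describes exporting HADOOP_CLASSPATH from the local Hadoop installation. A minimal sketch, assuming the hadoop command is on PATH on the machine that runs yarn-session.sh (the variable hadoop_cp_status is only there for illustration):

```shell
# Expose the local Hadoop jars to Flink instead of copying a shaded jar.
# Assumes the `hadoop` CLI is installed on this machine.
if command -v hadoop >/dev/null 2>&1; then
  HADOOP_CLASSPATH="$(hadoop classpath)"
  export HADOOP_CLASSPATH
  hadoop_cp_status="set"
else
  # Without the hadoop CLI, fall back to the shaded jar in flink/lib.
  hadoop_cp_status="unset"
fi
echo "HADOOP_CLASSPATH is $hadoop_cp_status"
```

Run this in the same shell session before yarn-session.sh or flink run, or place the export in the shell profile of the submitting user.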

When the job is submitted, the jdbc connector is not recognized:

Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='jdbc'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(FactoryUtil.java:385)
        at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:372)
        at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:159)
        ... 32 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'jdbc' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

blackhole
datagen
filesystem
kafka
mysql-cdc
print
upsert-kafka

Solution: the flink-connector-jdbc_2.11-1.12.0.jar connector had not been added. Put it in flink/lib and submit the job again.
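Since Problems 1 and 3 both come down to a jar missing from flink/lib, a quick check before submitting can save a round trip. This is a rough sketch: the helper has_jar is hypothetical, the default FLINK_HOME is the path that appears in the log under Problem 4, and the name pattern mysql-connector is an assumption about how the MySQL driver jar is named.

```shell
# has_jar DIR PATTERN: succeed if DIR contains a file whose name matches PATTERN
# (hypothetical helper, for illustration only).
has_jar() {
  ls "$1" 2>/dev/null | grep -q -- "$2"
}

# Default taken from the log in Problem 4; adjust to your installation.
FLINK_LIB="${FLINK_HOME:-/data/software/flink-1.12.0}/lib"

for name in flink-connector-jdbc mysql-connector; do
  if has_jar "$FLINK_LIB" "$name"; then
    echo "found in $FLINK_LIB: $name"
  else
    echo "missing from $FLINK_LIB: $name"
  fi
done
```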

Problem 4

Submitting a Flink job to YARN with this command:

flink run -m yarn-cluster -ynm flink-tbrw-epidemic_report -c com.lezhi.business.dxxbs.transmission.ExecuteDML  /data/jar/gkt-bigData-flink-1.0-SNAPSHOT-jar-with-dependencies.jar --id 50

The submission fails with the following error:

2021-07-07 09:40:44,551 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/data/software/flink-1.12.0/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2021-07-07 09:40:44,768 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2021-07-07 09:40:44,857 INFO  org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider [] - Failing over to rm2
2021-07-07 09:40:44,933 INFO  org.apache.hadoop.conf.Configuration                         [] - resource-types.xml not found
2021-07-07 09:40:44,934 INFO  org.apache.hadoop.yarn.util.resource.ResourceUtils           [] - Unable to find 'resource-types.xml'.

------------------------------------------------------------
 The program finished with the following exception:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1876)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.table.api.TableException: Failed to execute sql
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:696)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:759)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:665)
    at com.lezhi.business.dxxbs.transmission.ExecuteDML$.main(ExecuteDML.scala:79)
    at com.lezhi.business.dxxbs.transmission.ExecuteDML.main(ExecuteDML.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
    ... 11 more
Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster.
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:460)
    at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:70)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1940)
    at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:128)
    at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:57)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:680)
    ... 20 more
Caused by: org.apache.flink.configuration.IllegalConfigurationException: The number of requested virtual cores per node 320 exceeds the maximum number of virtual cores 8 available in the Yarn Cluster. Please note that the number of virtual cores is set to the number of task slots by default unless configured in the Flink config with 'yarn.containers.vcores.'
    at org.apache.flink.yarn.YarnClusterDescriptor.isReadyForDeployment(YarnClusterDescriptor.java:323)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployInternal(YarnClusterDescriptor.java:507)
    at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:453)
    ... 25 more

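Cause: the YARN cluster offers at most 8 virtual cores per node, but the job requested 320, so the resources could not be allocated. As the error message notes, the number of virtual cores defaults to the number of task slots unless yarn.containers.vcores is set in the Flink config.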

Solution: add parameters that limit the resources the job requests.
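The post does not say which parameters were used, so the following is only a sketch of one plausible combination. It uses the Flink 1.12 YARN options -ys (slots per TaskManager), -p (parallelism), -yjm and -ytm (JobManager and TaskManager memory). The values 2, 1024m and 2048m are placeholders, not the author's settings; MAX_VCORES is the limit reported in the error above.

```shell
# Placeholder resource settings; tune them for your cluster.
SLOTS=2          # -ys: task slots per TaskManager (vcores default to this)
PARALLELISM=2    # -p: job parallelism
MAX_VCORES=8     # limit reported by the error message above

# Refuse to build a command that would hit the same vcore limit again.
if [ "$SLOTS" -gt "$MAX_VCORES" ]; then
  echo "SLOTS ($SLOTS) exceeds the YARN vcore limit ($MAX_VCORES)" >&2
  exit 1
fi

SUBMIT_CMD="flink run -m yarn-cluster -ynm flink-tbrw-epidemic_report \
-ys $SLOTS -p $PARALLELISM -yjm 1024m -ytm 2048m \
-c com.lezhi.business.dxxbs.transmission.ExecuteDML \
/data/jar/gkt-bigData-flink-1.0-SNAPSHOT-jar-with-dependencies.jar --id 50"

# Print the command for review; run it by hand once it looks right.
echo "$SUBMIT_CMD"
```

If the slot count has to stay high for other reasons, setting yarn.containers.vcores explicitly (the option named in the error message) is the other way to decouple vcores from task slots.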

See also: https://www.cnblogs.com/gzgBlog/p/14981617.html

author@nohert
Original post: https://www.cnblogs.com/gzgBlog/p/14981295.html