flink on yarn部分源码解析

转发请注明原创地址:https://www.cnblogs.com/dongxiao-yang/p/9403427.html

flink任务的deploy形式有很多种选择,常见的有standalone,on yarn , Meos , Kubernetes等方式,目前公司内部统一采用flink on yarn的 single job模式(每个flink job 单独在yarn上声明一个flink集群),本文分析的是flink1.5.1版本源码使用legacy 模式提交yarn single job到yarn集群的部分源码。

典型的flink提交single job命令格式如下: ./flink run  -m yarn-cluster -d  -yst -yqu flinkqu -yst  -yn 4 -ys 2 -c flinkdemoclass  flinkdemo.jar  args1 args2 ... 

flink脚本的入口类为org.apache.flink.client.cli.CliFrontend

在CliFrontend的main函数中首先通过loadCustomCommandLines方法加载了提交yarn任务初始化一个重要工具类

org.apache.flink.yarn.cli.FlinkYarnSessionCli
    public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
        List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);

        //    Command line interface of the YARN session, with a special initialization here
        //    to prefix all options with y/yarn.
        //    Tips: DefaultCLI must be added at last, because getActiveCustomCommandLine(..) will get the
        //          active CustomCommandLine in order and DefaultCLI isActive always return true.
        final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
        try {
            customCommandLines.add(
                loadCustomCommandLine(flinkYarnSessionCLI,
                    configuration,
                    configurationDirectory,
                    "y",
                    "yarn"));
        } catch (NoClassDefFoundError | Exception e) {
            LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
        }

        if (configuration.getString(CoreOptions.MODE).equalsIgnoreCase(CoreOptions.NEW_MODE)) {
            customCommandLines.add(new DefaultCLI(configuration));
        } else {
            customCommandLines.add(new LegacyCLI(configuration));
        }

        return customCommandLines;
    }

根据启动参数,CliFrontend开始运行方法run()->runProgram(),runProgram内部与yarn相关的一个重点方法为

client = clusterDescriptor.deploySessionCluster(clusterSpecification);

上文中的clusterDescriptor就是前面的FlinkYarnSessionCli执行createClusterDescriptor()方法后产生的集群属性描述对象,在本模式中对应的具体类是org.apache.flink.yarn.LegacyYarnClusterDescriptor,父类为AbstractYarnClusterDescriptor

deploySessionCluster内部进一步调用deployInternal来向yarn集群提交一个flink集群。

protected ClusterClient<ApplicationId> deployInternal(
            ClusterSpecification clusterSpecification,
            String applicationName,
            String yarnClusterEntrypoint,
            @Nullable JobGraph jobGraph,
            boolean detached) throws Exception {

        // ------------------ Check if configuration is valid --------------------
        validateClusterSpecification(clusterSpecification);

        if (UserGroupInformation.isSecurityEnabled()) {
            // note: UGI::hasKerberosCredentials inaccurately reports false
            // for logins based on a keytab (fixed in Hadoop 2.6.1, see HADOOP-10786),
            // so we check only in ticket cache scenario.
            boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

            UserGroupInformation loginUser = UserGroupInformation.getCurrentUser();
            if (loginUser.getAuthenticationMethod() == UserGroupInformation.AuthenticationMethod.KERBEROS
                && useTicketCache && !loginUser.hasKerberosCredentials()) {
                LOG.error("Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials");
                throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
                    "does not have Kerberos credentials");
            }
        }

        isReadyForDeployment(clusterSpecification);

        // ------------------ Check if the specified queue exists --------------------

        checkYarnQueues(yarnClient);

        // ------------------ Add dynamic properties to local flinkConfiguraton ------
        Map<String, String> dynProperties = getDynamicProperties(dynamicPropertiesEncoded);
        for (Map.Entry<String, String> dynProperty : dynProperties.entrySet()) {
            flinkConfiguration.setString(dynProperty.getKey(), dynProperty.getValue());
        }

        // ------------------ Check if the YARN ClusterClient has the requested resources --------------

        // Create application via yarnClient
        final YarnClientApplication yarnApplication = yarnClient.createApplication();
        final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

        Resource maxRes = appResponse.getMaximumResourceCapability();

        final ClusterResourceDescription freeClusterMem;
        try {
            freeClusterMem = getCurrentFreeClusterResources(yarnClient);
        } catch (YarnException | IOException e) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
        }

        final int yarnMinAllocationMB = yarnConfiguration.getInt(yarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);

        final ClusterSpecification validClusterSpecification;
        try {
            validClusterSpecification = validateClusterResources(
                clusterSpecification,
                yarnMinAllocationMB,
                maxRes,
                freeClusterMem);
        } catch (YarnDeploymentException yde) {
            failSessionDuringDeployment(yarnClient, yarnApplication);
            throw yde;
        }

        LOG.info("Cluster specification: {}", validClusterSpecification);

        final ClusterEntrypoint.ExecutionMode executionMode = detached ?
            ClusterEntrypoint.ExecutionMode.DETACHED
            : ClusterEntrypoint.ExecutionMode.NORMAL;

        flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

        ApplicationReport report = startAppMaster(
            flinkConfiguration,
            applicationName,
            yarnClusterEntrypoint,
            jobGraph,
            yarnClient,
            yarnApplication,
            clusterSpecification);

        String host = report.getHost();
        int port = report.getRpcPort();

        // Correctly initialize the Flink config
        flinkConfiguration.setString(JobManagerOptions.ADDRESS, host);
        flinkConfiguration.setInteger(JobManagerOptions.PORT, port);

        flinkConfiguration.setString(RestOptions.ADDRESS, host);
        flinkConfiguration.setInteger(RestOptions.PORT, port);

        // the Flink cluster is deployed in YARN. Represent cluster
        return createYarnClusterClient(
            this,
            clusterSpecification.getNumberTaskManagers(),
            clusterSpecification.getSlotsPerTaskManager(),
            report,
            flinkConfiguration,
            true);
    }

 deployInternal方法开头对yarn集群的可用内存,queue等进行检查后申请了一个application,并调用startAppMaster声明了AM的启动类:YarnApplicationMasterRunner

public ApplicationReport startAppMaster(
            Configuration configuration,
            String applicationName,
            String yarnClusterEntrypoint,
            JobGraph jobGraph,
            YarnClient yarnClient,
            YarnClientApplication yarnApplication,
            ClusterSpecification clusterSpecification) throws Exception {
.....

        setApplicationTags(appContext);

        // add a hook to clean up in case deployment fails
        Thread deploymentFailureHook = new DeploymentFailureHook(yarnClient, yarnApplication, yarnFilesDir);
        Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
        LOG.info("Submitting application master " + appId);
        yarnClient.submitApplication(appContext);

        LOG.info("Waiting for the cluster to be allocated");
        final long startTime = System.currentTimeMillis();
        ApplicationReport report;


}

 YarnApplicationMasterRunner会在yarn集群上作为appmaster与resourcemanager通信申请对应的Taskmanagercontainer服务,启动jobmanager服务和webui服务等

    protected int runApplicationMaster(Configuration config) {
......
......
    webMonitor = BootstrapTools.startWebMonitorIfConfigured(
                config,
                highAvailabilityServices,
                new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
                new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
                webMonitorTimeout,
                new ScheduledExecutorServiceAdapter(futureExecutor),
                LOG);

            metricRegistry = new MetricRegistryImpl(
                MetricRegistryConfiguration.fromConfiguration(config));

            metricRegistry.startQueryService(actorSystem, null);

            // 2: the JobManager
            LOG.debug("Starting JobManager actor");

            // we start the JobManager with its standard name
            ActorRef jobManager = JobManager.startJobManagerActors(
                config,
                actorSystem,
                futureExecutor,
                ioExecutor,
                highAvailabilityServices,
                metricRegistry,
                webMonitor == null ? Option.empty() : Option.apply(webMonitor.getRestAddress()),
                new Some<>(JobMaster.JOB_MANAGER_NAME),
                Option.<String>empty(),
                getJobManagerClass(),
                getArchivistClass())._1();

            final String webMonitorURL = webMonitor == null ? null : webMonitor.getRestAddress();

            // 3: Flink's Yarn ResourceManager
            LOG.debug("Starting YARN Flink Resource Manager");

            Props resourceMasterProps = YarnFlinkResourceManager.createActorProps(
                getResourceManagerClass(),
                config,
                yarnConfig,
                highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                appMasterHostname,
                webMonitorURL,
                taskManagerParameters,
                taskManagerContext,
                numInitialTaskManagers,
                LOG);

            ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps);

另一方面,flink客户端在提交完集群后从runprogram()方法进入executeProgram();

    protected void executeProgram(PackagedProgram program, ClusterClient<?> client, int parallelism) throws ProgramMissingJobException, ProgramInvocationException {
        logAndSysout("Starting execution of program");

        final JobSubmissionResult result = client.run(program, parallelism);

        if (null == result) {
            throw new ProgramMissingJobException("No JobSubmissionResult returned, please make sure you called " +
                "ExecutionEnvironment.execute()");
        }

        if (result.isJobExecutionResult()) {
            logAndSysout("Program execution finished");
            JobExecutionResult execResult = result.getJobExecutionResult();
            System.out.println("Job with JobID " + execResult.getJobID() + " has finished.");
            System.out.println("Job Runtime: " + execResult.getNetRuntime() + " ms");
            Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
            if (accumulatorsResult.size() > 0) {
                System.out.println("Accumulator Results: ");
                System.out.println(AccumulatorHelper.getResultsFormatted(accumulatorsResult));
            }
        } else {
            logAndSysout("Job has been submitted with JobID " + result.getJobID());
        }
    }

代码从ClusterClient.run()->prog.invokeInteractiveModeForExecution()开始真正进入用户flink job的main方法。

main方法中,代码最后的env.execute() 会把生成job的执行plan并返回对应的DetachedEnvironment对象。

方法调用链路为DetachedEnvironment.finalizeExecute()->ClusterClient.run()->YarnClusterClient.submitJob->ClusterClient.runDetached();

    /**
     * Submits a JobGraph detached.
     * @param jobGraph The JobGraph
     * @param classLoader User code class loader to deserialize the results and errors (may contain custom classes).
     * @return JobSubmissionResult
     * @throws ProgramInvocationException
     */
    public JobSubmissionResult runDetached(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {

        waitForClusterToBeReady();

        final ActorGateway jobManagerGateway;
        try {
            jobManagerGateway = getJobManagerGateway();
        } catch (Exception e) {
            throw new ProgramInvocationException("Failed to retrieve the JobManager gateway.", e);
        }

        try {
            logAndSysout("Submitting Job with JobID: " + jobGraph.getJobID() + ". Returning after job submission.");
            JobClient.submitJobDetached(
                new AkkaJobManagerGateway(jobManagerGateway),
                flinkConfig,
                jobGraph,
                Time.milliseconds(timeout.toMillis()),
                classLoader);
            return new JobSubmissionResult(jobGraph.getJobID());
        } catch (JobExecutionException e) {
            throw new ProgramInvocationException("The program execution failed: " + e.getMessage(), e);
        }
    }
    @Override
    public JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
        if (isDetached()) {
            if (newlyCreatedCluster) {
                stopAfterJob(jobGraph.getJobID());
            }
            LOG.info("super.runDetached");
            return super.runDetached(jobGraph, classLoader);
        } else {
            LOG.info("super.run");
            return super.run(jobGraph, classLoader);
        }
    }

最后,客户端连接到前文对应的jobmanager服务并把flink job grafaph提交给yarn上已经申请好的flink集群。 

结论:flink on yarn的single job模式提交作业的逻辑为flink客户端首先申请一个yarn集群的application,等待集群成功部署后再联系jobmanager并把job提交到集群上面。这个模式的优点是每个

flink job有一个独立的集群便于资源规划和管理,缺点是经过验证在am挂掉后yarn只能把原来的集群重启回来但是无法恢复flink jobgraph的行为,所以需要额外配置ha信息

原文地址:https://www.cnblogs.com/dongxiao-yang/p/9403427.html