Analysis of Operator Chaining Rules in Flink (latest code; source version later than 1.11.2)

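In Flink, a program passes through the following stages between the code you write and its actual execution:

  Program -> StreamGraph -> JobGraph -> ExecutionGraph. During the StreamGraph -> JobGraph stage, Flink chains Operators together according to a fixed set of rules.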

First, the chaining strategies are defined as follows:

  https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ChainingStrategy.java

package org.apache.flink.streaming.api.operators;
import org.apache.flink.annotation.PublicEvolving;
/**
 * Defines the chaining scheme for the operator. When an operator is chained to the
 * predecessor, it means that they run in the same thread. They become one operator
 * consisting of multiple steps.
 *
 * <p>The default value used by the StreamOperator is {@link #HEAD}, which means that
 * the operator is not chained to its predecessor. Most operators override this with
 * {@link #ALWAYS}, meaning they will be chained to predecessors whenever possible.
 */
@PublicEvolving
public enum ChainingStrategy {

    /**
     * Operators will be eagerly chained whenever possible.
     *
     * <p>To optimize performance, it is generally a good practice to allow maximal chaining and
     * increase operator parallelism.
     */
    ALWAYS,

    /** The operator will not be chained to the preceding or succeeding operators. */
    NEVER,

    /**
     * The operator will not be chained to the predecessor, but successors may chain to this
     * operator.
     */
    HEAD,

    /**
     * This operator will run at the head of a chain (similar as in {@link #HEAD}, but it will
     * additionally try to chain source inputs if possible. This allows multi-input operators to be
     * chained with multiple sources into one task.
     */
    HEAD_WITH_SOURCES;

    public static final ChainingStrategy DEFAULT_CHAINING_STRATEGY = ALWAYS;
}
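
To make the four strategies concrete, here is a minimal, self-contained sketch of how the strategies of an upstream and a downstream operator could combine, read directly from the Javadoc above. It is not Flink's code: `Strategy`, `ChainingStrategySketch`, `strategiesAllowChaining` and the `upstreamIsSource` flag are hypothetical names made up for illustration. The real check lives in `areOperatorsChainable` in `StreamingJobGraphGenerator` and may apply further conditions that are not modeled here.

```java
// Simplified model of the ChainingStrategy semantics described in the Javadoc.
// Strategy mirrors the enum above; everything else is a hypothetical helper.
enum Strategy { ALWAYS, NEVER, HEAD, HEAD_WITH_SOURCES }

final class ChainingStrategySketch {

    /**
     * Returns true if the two strategies alone would allow the downstream
     * operator to be chained to the upstream operator.
     */
    static boolean strategiesAllowChaining(
            Strategy upstream, Strategy downstream, boolean upstreamIsSource) {
        // NEVER on the upstream side: it does not chain to succeeding operators.
        if (upstream == Strategy.NEVER) {
            return false;
        }
        switch (downstream) {
            case ALWAYS:
                // Chains to its predecessor whenever possible.
                return true;
            case HEAD_WITH_SOURCES:
                // Behaves like HEAD, except that source inputs may be chained in.
                return upstreamIsSource;
            case HEAD:
            case NEVER:
            default:
                // Neither HEAD nor NEVER chains to a predecessor.
                return false;
        }
    }

    public static void main(String[] args) {
        // HEAD upstream still lets successors chain to it.
        System.out.println(strategiesAllowChaining(Strategy.HEAD, Strategy.ALWAYS, false)); // true
        // HEAD downstream always starts a new chain.
        System.out.println(strategiesAllowChaining(Strategy.ALWAYS, Strategy.HEAD, false)); // false
        // HEAD_WITH_SOURCES downstream chains only when the upstream is a source.
        System.out.println(strategiesAllowChaining(Strategy.ALWAYS, Strategy.HEAD_WITH_SOURCES, true)); // true
    }
}
```

Note the asymmetry: the upstream strategy only matters when it is NEVER, while the downstream strategy decides whether a new chain starts at that operator.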

Second, the JobGraph is generated from the StreamGraph as follows:

  https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java

    /**
     * Gets the assembled {@link JobGraph} with a random {@link JobID}.
     */
    public JobGraph getJobGraph() {
        return getJobGraph(null);
    }

    /**
     * Gets the assembled {@link JobGraph} with a specified {@link JobID}.
     */
    public JobGraph getJobGraph(@Nullable JobID jobID) {
        return StreamingJobGraphGenerator.createJobGraph(this, jobID);
    }

Third, the chaining decision itself is made as follows:

  https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java

    public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        return downStreamVertex.getInEdges().size() == 1
                && isChainableInput(edge, streamGraph);
    }

    private static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) {
        StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
        StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

        if (!(upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && edge.getShuffleMode() != ShuffleMode.BATCH
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled())) {

            return false;
        }

        // check that we do not have a union operation, because unions currently only work
        // through the network/byte-channel stack.
        // we check that by testing that each "type" (which means input position) is used only once
        for (StreamEdge inEdge : downStreamVertex.getInEdges()) {
            if (inEdge != edge && inEdge.getTypeNumber() == edge.getTypeNumber()) {
                return false;
            }
        }
        return true;
    }
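
The conditions in `isChainable` and `isChainableInput` can be summarized as a checklist. The sketch below is a simplified, self-contained model of that checklist, not Flink's implementation: `ChainCheckSketch`, `Edge`, its fields and `explain` are hypothetical names, and an edge is flattened into plain booleans and integers so the example runs without Flink on the classpath. The union check is not modeled separately, because a downstream node with exactly one input edge cannot have a second edge sharing the same type number.

```java
// Simplified model of the chaining checklist shown above (all names hypothetical).
final class ChainCheckSketch {

    /** Flattened view of one edge and its two endpoints; defaults describe a chainable edge. */
    static final class Edge {
        int downstreamInEdgeCount = 1;          // downStreamVertex.getInEdges().size()
        boolean sameSlotSharingGroup = true;    // isSameSlotSharingGroup(...)
        boolean operatorsChainable = true;      // areOperatorsChainable(...)
        boolean forwardPartitioner = true;      // partitioner instanceof ForwardPartitioner
        boolean batchShuffle = false;           // getShuffleMode() == ShuffleMode.BATCH
        int upstreamParallelism = 1;
        int downstreamParallelism = 1;
        boolean chainingEnabled = true;         // streamGraph.isChainingEnabled()
    }

    /** Returns "chainable", or the name of the first condition that fails. */
    static String explain(Edge e) {
        if (e.downstreamInEdgeCount != 1) return "downstream has multiple inputs";
        if (!e.sameSlotSharingGroup) return "different slot sharing group";
        if (!e.operatorsChainable) return "operator strategies forbid chaining";
        if (!e.forwardPartitioner) return "partitioner is not forward";
        if (e.batchShuffle) return "shuffle mode is BATCH";
        if (e.upstreamParallelism != e.downstreamParallelism) return "parallelism differs";
        if (!e.chainingEnabled) return "chaining disabled globally";
        return "chainable";
    }

    static boolean isChainable(Edge e) {
        return "chainable".equals(explain(e));
    }

    public static void main(String[] args) {
        Edge e = new Edge();
        System.out.println(explain(e)); // chainable

        // Changing the downstream parallelism breaks the chain.
        e.downstreamParallelism = 4;
        System.out.println(explain(e)); // parallelism differs
    }
}
```

Returning the name of the first failing condition, rather than a bare boolean, is a design choice of this sketch only; it makes it easier to see why two operators ended up in separate tasks.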

Finally, the JobGraph is submitted for execution.

  

Original article: https://www.cnblogs.com/mengyao/p/14045389.html