Topic partition assignment in the flink-connector-kafka consumer: a source code walkthrough

When reposting, please cite the original address: http://www.cnblogs.com/dongxiao-yang/p/7200599.html

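Flink officially provides a connector implementation for Kafka. While debugging, some of its consumption behavior turned out to differ from what was expected, so it was worth reading the source code.

flink-connector-kafka currently ships implementations for three Kafka versions: 0.8, 0.9 and 0.10. This article uses the FlinkKafkaConsumer010 code as its example.

The parent-class chain of FlinkKafkaConsumer010 is shown below; FlinkKafkaConsumerBase contains most of the implementation.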

FlinkKafkaConsumer010<T> extends FlinkKafkaConsumer09<T> extends FlinkKafkaConsumerBase<T> 

Each version of the consumer has a corresponding AbstractFetcher implementation that pulls data from Kafka. Its inheritance chain is as follows.

Kafka010Fetcher<T> extends Kafka09Fetcher<T> extends AbstractFetcher<T, TopicPartition> 

FlinkKafkaConsumerBase is declared as follows. It extends RichParallelSourceFunction and implements CheckpointedFunction, among other interfaces.

public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements 
        CheckpointListener,
        ResultTypeQueryable<T>,
        CheckpointedFunction,
        CheckpointedRestoring<HashMap<KafkaTopicPartition, Long>> {

Execution details of the methods inside FlinkKafkaConsumer

initializeState

    public void initializeState(FunctionInitializationContext context) throws Exception {

        OperatorStateStore stateStore = context.getOperatorStateStore();
        offsetsStateForCheckpoint = stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);

        if (context.isRestored()) {
            if (restoredState == null) {
                restoredState = new HashMap<>();
                for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : offsetsStateForCheckpoint.get()) {
                    restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
                }

                LOG.info("Setting restore state in the FlinkKafkaConsumer.");
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Using the following offsets: {}", restoredState);
                }
            }
            if (restoredState != null && restoredState.isEmpty()) {
                restoredState = null;
            }
        } else {
            LOG.info("No restore state for FlinkKafkaConsumer.");
        }
    }

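According to the runtime logs, initializeState is the first method called when the FlinkKafkaConsumer is initialized. Through the runtime context FunctionInitializationContext it calls getOperatorStateStore and then getSerializableListState to obtain the state object stored in the checkpoint. If the task is being recovered, for example after a failure, context.isRestored() returns true, and the consumer tries to read back from the Flink checkpoint the Kafka partitions it was previously assigned together with the offsets recorded for them at the last completed checkpoint.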
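The restore logic can be sketched in isolation. The following is a minimal sketch, not Flink code: the class name RestoreStateSketch, the restore method and the use of a plain String key such as "orders-0" in place of KafkaTopicPartition are all assumptions made for illustration, and the list argument stands in for the operator list state.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RestoreStateSketch {

    // Hypothetical stand-in for the (KafkaTopicPartition, offset) tuples kept in
    // operator list state; the String key is for illustration only.
    static Map<String, Long> restore(boolean isRestored, List<Map.Entry<String, Long>> listState) {
        if (!isRestored) {
            // Fresh start: there is nothing to restore.
            return null;
        }
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> entry : listState) {
            restored.put(entry.getKey(), entry.getValue());
        }
        // As in initializeState, an empty restored map is treated as "no restored state".
        return restored.isEmpty() ? null : restored;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> state = new ArrayList<>();
        state.add(new SimpleEntry<>("orders-0", 41L));
        state.add(new SimpleEntry<>("orders-2", 17L));

        System.out.println(restore(true, state));              // both partitions restored
        System.out.println(restore(true, new ArrayList<>()));  // null: empty state
        System.out.println(restore(false, state));             // null: not a restore
    }
}
```

A null result here corresponds to restoredState == null in the consumer, which is what sends open down the fresh-assignment path.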

open

    public void open(Configuration configuration) {
        // determine the offset commit mode
        offsetCommitMode = OffsetCommitModes.fromConfiguration(
                getIsAutoCommitEnabled(),
                enableCommitOnCheckpoints,
                ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

        switch (offsetCommitMode) {
            case ON_CHECKPOINTS:
                LOG.info("Consumer subtask {} will commit offsets back to Kafka on completed checkpoints.",
                        getRuntimeContext().getIndexOfThisSubtask());
                break;
            case KAFKA_PERIODIC:
                LOG.info("Consumer subtask {} will commit offsets back to Kafka periodically using the Kafka client's auto commit.",
                        getRuntimeContext().getIndexOfThisSubtask());
                break;
            default:
            case DISABLED:
                LOG.info("Consumer subtask {} has disabled offset committing back to Kafka." +
                        " This does not compromise Flink's checkpoint integrity.",
                        getRuntimeContext().getIndexOfThisSubtask());
        }

        // initialize subscribed partitions
        List<KafkaTopicPartition> kafkaTopicPartitions = getKafkaPartitions(topics);
        Preconditions.checkNotNull(kafkaTopicPartitions, "TopicPartitions must not be null.");

        subscribedPartitionsToStartOffsets = new HashMap<>(kafkaTopicPartitions.size());

        if (restoredState != null) {
            for (KafkaTopicPartition kafkaTopicPartition : kafkaTopicPartitions) {
                if (restoredState.containsKey(kafkaTopicPartition)) {
                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartition, restoredState.get(kafkaTopicPartition));
                }
            }

            LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
                getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
        } else {
            initializeSubscribedPartitionsToStartOffsets(
                subscribedPartitionsToStartOffsets,
                kafkaTopicPartitions,
                getRuntimeContext().getIndexOfThisSubtask(),
                getRuntimeContext().getNumberOfParallelSubtasks(),
                startupMode,
                specificStartupOffsets);

            if (subscribedPartitionsToStartOffsets.size() != 0) {
                switch (startupMode) {
                    case EARLIEST:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case LATEST:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                        break;
                    case SPECIFIC_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            specificStartupOffsets,
                            subscribedPartitionsToStartOffsets.keySet());

                        List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
                        for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
                            if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
                                partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
                            }
                        }

                        if (partitionsDefaultedToGroupOffsets.size() > 0) {
                            LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
                                    "; their startup offsets will be defaulted to their committed group offsets in Kafka.",
                                getRuntimeContext().getIndexOfThisSubtask(),
                                partitionsDefaultedToGroupOffsets.size(),
                                partitionsDefaultedToGroupOffsets);
                        }
                        break;
                    default:
                    case GROUP_OFFSETS:
                        LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
                            getRuntimeContext().getIndexOfThisSubtask(),
                            subscribedPartitionsToStartOffsets.size(),
                            subscribedPartitionsToStartOffsets.keySet());
                }
            }
        }
    }

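open is called after initializeState finishes. Its main logic consists of the following steps.

1 Determine the offsetCommitMode. It is derived from the combination of three settings: Kafka's auto commit, the value of setCommitOffsetsOnCheckpoints() (true by default), and whether checkpointing is enabled in the Flink runtime. There are three modes: ON_CHECKPOINTS commits offsets after a checkpoint completes; KAFKA_PERIODIC relies on the Kafka consumer's built-in periodic commit; DISABLED commits nothing.

2 Assign Kafka partitions. If initializeState already recovered partitions from the stored state, the subtask simply continues reading those partitions. On a first start, it calls initializeSubscribedPartitionsToStartOffsets to compute the list of partitions for the current task.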

  
    protected static void initializeSubscribedPartitionsToStartOffsets(
            Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
            List<KafkaTopicPartition> kafkaTopicPartitions,
            int indexOfThisSubtask,
            int numParallelSubtasks,
            StartupMode startupMode,
            Map<KafkaTopicPartition, Long> specificStartupOffsets) {

        for (int i = 0; i < kafkaTopicPartitions.size(); i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
                    subscribedPartitionsToStartOffsets.put(kafkaTopicPartitions.get(i), startupMode.getStateSentinel());
                } else {
                    if (specificStartupOffsets == null) {
                        throw new IllegalArgumentException(
                            "Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
                                ", but no specific offsets were specified");
                    }

                    KafkaTopicPartition partition = kafkaTopicPartitions.get(i);

                    Long specificOffset = specificStartupOffsets.get(partition);
                    if (specificOffset != null) {
                        // since the specified offsets represent the next record to read, we subtract
                        // it by one so that the initial state of the consumer will be correct
                        subscribedPartitionsToStartOffsets.put(partition, specificOffset - 1);
                    } else {
                        subscribedPartitionsToStartOffsets.put(partition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
                    }
                }
            }
        }
    }

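As the code shows, Flink assigns partitions by taking each partition's index in the kafkaTopicPartitions list modulo the number of parallel Flink subtasks: if i % numParallelSubtasks == indexOfThisSubtask, the partition at index i belongs to the current subtask. Note that i is the position in the list, which is not necessarily the Kafka partition id.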
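The modulo rule is easy to try out on its own. The sketch below is an assumption-laden illustration rather than Flink code: PartitionAssignmentSketch and assignedIndices are hypothetical names, and partitions are represented only by their list indices.

```java
import java.util.ArrayList;
import java.util.List;

class PartitionAssignmentSketch {

    // Returns the list indices (positions in the fetched partition list)
    // that one subtask would own under the i % numParallelSubtasks rule.
    static List<Integer> assignedIndices(int numPartitions, int numParallelSubtasks, int indexOfThisSubtask) {
        List<Integer> owned = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            if (i % numParallelSubtasks == indexOfThisSubtask) {
                owned.add(i);
            }
        }
        return owned;
    }

    public static void main(String[] args) {
        // 5 partitions spread over 3 subtasks.
        for (int subtask = 0; subtask < 3; subtask++) {
            System.out.println("subtask " + subtask + " -> " + assignedIndices(5, 3, subtask));
        }
        // More subtasks than partitions: the last subtask owns nothing.
        System.out.println("subtask 3 of 4, 2 partitions -> " + assignedIndices(2, 4, 3));
    }
}
```

With 5 partitions and 3 subtasks, subtask 0 owns indices [0, 3], subtask 1 owns [1, 4] and subtask 2 owns [2]. When parallelism exceeds the partition count, some subtasks end up with an empty assignment; in the consumer those subtasks take the waiting branch of run instead of creating a fetcher.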

The result of the partition assignment is stored in the private field Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets, which is later used to initialize the consumer.
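Going back to step 1 of open, the way the three settings combine into an offset commit mode can be sketched as a small decision function. This is a sketch of the decision as understood from the log messages and option names above, not a copy of OffsetCommitModes.fromConfiguration; the class OffsetCommitModeSketch and its nested Mode enum are hypothetical, and the real implementation may differ in detail.

```java
class OffsetCommitModeSketch {

    enum Mode { ON_CHECKPOINTS, KAFKA_PERIODIC, DISABLED }

    // Assumed rule: when checkpointing is on, only the commit-on-checkpoint flag matters;
    // when it is off, only Kafka's own auto commit setting matters.
    static Mode fromConfiguration(boolean enableAutoCommit,
                                  boolean enableCommitOnCheckpoint,
                                  boolean enableCheckpointing) {
        if (enableCheckpointing) {
            return enableCommitOnCheckpoint ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        System.out.println(fromConfiguration(true, true, true));    // checkpointing on, commit on checkpoints
        System.out.println(fromConfiguration(true, false, true));   // checkpointing on, commit flag off
        System.out.println(fromConfiguration(true, true, false));   // checkpointing off, auto commit on
        System.out.println(fromConfiguration(false, true, false));  // checkpointing off, auto commit off
    }
}
```

Under this assumed rule, Kafka's auto commit setting has no effect once checkpointing is enabled, which is one place where observed commit behavior can differ from what the Kafka properties alone suggest.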

run

    @Override
    public void run(SourceContext<T> sourceContext) throws Exception {
        if (subscribedPartitionsToStartOffsets == null) {
            throw new Exception("The partitions were not set for the consumer");
        }

        // we need only do work, if we actually have partitions assigned
        if (!subscribedPartitionsToStartOffsets.isEmpty()) {

            // create the fetcher that will communicate with the Kafka brokers
            final AbstractFetcher<T, ?> fetcher = createFetcher(
                    sourceContext,
                    subscribedPartitionsToStartOffsets,
                    periodicWatermarkAssigner,
                    punctuatedWatermarkAssigner,
                    (StreamingRuntimeContext) getRuntimeContext(),
                    offsetCommitMode);

            // publish the reference, for snapshot-, commit-, and cancel calls
            // IMPORTANT: We can only do that now, because only now will calls to
            //            the fetchers 'snapshotCurrentState()' method return at least
            //            the restored offsets
            this.kafkaFetcher = fetcher;
            if (!running) {
                return;
            }
            
            // run the fetcher's main work method
            fetcher.runFetchLoop();
        }
        else {
            // this source never completes, so emit a Long.MAX_VALUE watermark
            // to not block watermark forwarding
            sourceContext.emitWatermark(new Watermark(Long.MAX_VALUE));

            // wait until this is canceled
            final Object waitLock = new Object();
            while (running) {
                try {
                    //noinspection SynchronizationOnLocalVariableOrMethodParameter
                    synchronized (waitLock) {
                        waitLock.wait();
                    }
                }
                catch (InterruptedException e) {
                    if (!running) {
                        // restore the interrupted state, and fall through the loop
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
    }

As the code shows, the computed subscribedPartitionsToStartOffsets is passed into the AbstractFetcher instance, which owns the consumerThread. KafkaConsumerThread calls consumerCallBridge.assignPartitions(consumer, convertKafkaPartitions(subscribedPartitionStates)); which ends up in consumer.assign(topicPartitions); so the topic partitions are assigned to the consumer instance manually.

References:

Working with State
