Flink Flow

1. Create the execution environment for stream computing

        final ParameterTool parameterTool = ParameterTool.fromArgs(args); // command-line args such as --input-topic / --output-topic, used below

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();
        env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); // retry up to 4 times, 10 s apart
        env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
        env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Internally, getExecutionEnvironment() picks an environment that matches the current execution context (submission through the CLI client, plan preview, or local execution):

public static StreamExecutionEnvironment getExecutionEnvironment() {
        if (contextEnvironmentFactory != null) {
            return contextEnvironmentFactory.createExecutionEnvironment();
        }

        // because the streaming project depends on "flink-clients" (and not the other way around)
        // we currently need to intercept the data set environment and create a dependent stream env.
        // this should be fixed once we rework the project dependencies

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        if (env instanceof ContextEnvironment) {
            return new StreamContextEnvironment((ContextEnvironment) env);
        } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
            return new StreamPlanEnvironment(env);
        } else {
            return createLocalEnvironment();
        }
    }
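
When the program runs from an IDE or a plain JVM, the last branch applies and a local environment is created. You can also request one explicitly, e.g. for tests; a minimal sketch (the parallelism value here is just an illustration):

        // runs the whole job in-process on a mini cluster; handy for testing
        StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment(2);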

2. Next, add the data source that feeds the computation. Because step 1 set the EventTime characteristic, the Kafka source also has to assign timestamps and watermarks:

DataStream<KafkaEvent> input = env
        .addSource(new FlinkKafkaConsumer010<>(
                parameterTool.getRequired("input-topic"),
                new KafkaEventSchema(),
                parameterTool.getProperties())
                .assignTimestampsAndWatermarks(new CustomWatermarkExtractor()))
        .keyBy("word")
        .map(new RollingAdditionMapper());
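
CustomWatermarkExtractor is not shown in the original post. A minimal sketch of what such a periodic watermark assigner might look like, assuming each KafkaEvent carries its own event-time timestamp:

private static class CustomWatermarkExtractor implements AssignerWithPeriodicWatermarks<KafkaEvent> {

        private long currentTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(KafkaEvent event, long previousElementTimestamp) {
            // use the timestamp carried inside the event as its event time
            this.currentTimestamp = event.getTimestamp();
            return event.getTimestamp();
        }

        @Override
        public Watermark getCurrentWatermark() {
            // emit a watermark slightly behind the last seen timestamp
            return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
        }
    }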
addSource() has several overloads; the one-argument version just supplies a default operator name, and eventually the variant below runs. It extracts the source's output TypeInformation and wraps the function in a StreamSource operator:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
        return addSource(function, "Custom Source");
    }
@SuppressWarnings("unchecked")
    public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {

        if (typeInfo == null) {
            if (function instanceof ResultTypeQueryable) {
                typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
            } else {
                try {
                    typeInfo = TypeExtractor.createTypeInfo(
                            SourceFunction.class,
                            function.getClass(), 0, null, null);
                } catch (final InvalidTypesException e) {
                    typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                }
            }
        }

        boolean isParallel = function instanceof ParallelSourceFunction;

        clean(function);
        StreamSource<OUT, ?> sourceOperator;
        if (function instanceof StoppableFunction) {
            sourceOperator = new StoppableStreamSource<>(cast2StoppableSourceFunction(function));
        } else {
            sourceOperator = new StreamSource<>(function);
        }

        return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
    }
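
As the type-extraction branch above suggests, a plain SourceFunction works without declaring its type explicitly. A tiny hypothetical counting source, purely as an illustration:

// emits an increasing counter until cancel() is called
DataStream<Long> counts = env.addSource(new SourceFunction<Long>() {
        private volatile boolean running = true;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            long i = 0;
            while (running) {
                ctx.collect(i++);
                Thread.sleep(100);
            }
        }

        @Override
        public void cancel() {
            running = false;
        }
    });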
map() itself is a thin wrapper: it derives the output type from the MapFunction and delegates to transform():

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper) {

        TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
                Utils.getCallLocationName(), true);

        return transform("Map", outType, new StreamMap<>(clean(mapper)));
    }
transform() wraps the operator into a OneInputTransformation and hands it to the environment:

public <R> SingleOutputStreamOperator<R> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operator,
                outTypeInfo,
                environment.getParallelism());

        @SuppressWarnings({ "unchecked", "rawtypes" })
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        getExecutionEnvironment().addOperator(resultTransform);

        return returnStream;
    }
addOperator() merely appends the transformation to a list kept by the environment; the actual dataflow graph is built from this list only when the job is executed:

@Internal
    public void addOperator(StreamTransformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }
protected final List<StreamTransformation<?>> transformations = new ArrayList<>();
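
So defining the pipeline executes nothing by itself; every chained call only appends a transformation. A tiny hypothetical illustration:

DataStream<String> upper = env.fromElements("a", "b")   // appends a source transformation
        .map(new MapFunction<String, String>() {        // appends a OneInputTransformation
            @Override
            public String map(String value) {
                return value.toUpperCase();
            }
        });
// nothing has run yet; the job graph is materialized only by env.execute()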
keyBy("word") partitions the stream by the given field expression, turning it into a KeyedStream:

public KeyedStream<T, Tuple> keyBy(String... fields) {
        return keyBy(new Keys.ExpressionKeys<>(fields, getType()));
    }

    private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
        return new KeyedStream<>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
                getType(), getExecutionConfig())));
    }
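
Field-expression keys rely on reflection; the same partitioning can be written with an explicit KeySelector, assuming KafkaEvent exposes a getWord() accessor (a sketch, not the original code):

KeyedStream<KafkaEvent, String> byWord = input.keyBy(new KeySelector<KafkaEvent, String>() {
        @Override
        public String getKey(KafkaEvent event) {
            return event.getWord();
        }
    });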

3. The data from the source is streamed through Flink's distributed runtime, and the computed result is written to a data sink.

input.addSink(new FlinkKafkaProducer010<>(
        parameterTool.getRequired("output-topic"),
        new KafkaEventSchema(),
        parameterTool.getProperties()));
addSink() mirrors addSource(): it wraps the function in a StreamSink operator and registers the resulting transformation with the environment, via the same addOperator() shown in step 2:

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        // configure the type if needed
        if (sinkFunction instanceof InputTypeConfigurable) {
            ((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
        }

        StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

        DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);

        getExecutionEnvironment().addOperator(sink.getTransformation());
        return sink;
    }
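
Any SinkFunction can be attached the same way; a minimal hypothetical sink that just prints each record:

input.addSink(new SinkFunction<KafkaEvent>() {
        @Override
        public void invoke(KafkaEvent value) {
            System.out.println(value);
        }
    });

(DataStream.print() is the built-in shortcut for exactly this.)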

4. The last step is to start the execution. Only now does Flink build the job graph from the transformations collected above and submit it to the runtime.

env.execute("Kafka 0.10 Example");

The RollingAdditionMapper used in step 2 keeps a running total per key (per word) in Flink's keyed ValueState; it is defined as follows.

private static class RollingAdditionMapper extends RichMapFunction<KafkaEvent, KafkaEvent> {

        private static final long serialVersionUID = 1180234853172462378L;

        private transient ValueState<Integer> currentTotalCount;

        @Override
        public KafkaEvent map(KafkaEvent event) throws Exception {
            Integer totalCount = currentTotalCount.value();

            if (totalCount == null) {
                totalCount = 0;
            }
            totalCount += event.getFrequency();

            currentTotalCount.update(totalCount);

            return new KafkaEvent(event.getWord(), totalCount, event.getTimestamp());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            currentTotalCount = getRuntimeContext().getState(new ValueStateDescriptor<>("currentTotalCount", Integer.class));
        }
    }
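
KafkaEvent and KafkaEventSchema are not shown in the original post. For reference, a minimal sketch of what the event POJO might look like (keyBy("word") needs the field to be reachable through a public getter/setter pair, and Flink POJOs need a public no-argument constructor):

public class KafkaEvent {
        private String word;
        private int frequency;
        private long timestamp;

        public KafkaEvent() {} // required for Flink's POJO type extraction

        public KafkaEvent(String word, int frequency, long timestamp) {
            this.word = word;
            this.frequency = frequency;
            this.timestamp = timestamp;
        }

        public String getWord() { return word; }
        public void setWord(String word) { this.word = word; }
        public int getFrequency() { return frequency; }
        public void setFrequency(int frequency) { this.frequency = frequency; }
        public long getTimestamp() { return timestamp; }
        public void setTimestamp(long timestamp) { this.timestamp = timestamp; }
    }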


http://www.debugrun.com/a/LjK8Nni.html

Original post: https://www.cnblogs.com/iiiDragon/p/9810367.html