Flink

Let's start with an example:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Long>> stream = env.addSource(...);
stream
    .keyBy(0)
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .reduce(new SummingReducer())
    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

env.execute();
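The SummingReducer used above is not shown in the post; as a hedged sketch, one plausible implementation (keeping the key and summing the second field) could look like this:

public class SummingReducer implements ReduceFunction<Tuple2<Long, Long>> {
    @Override
    public Tuple2<Long, Long> reduce(Tuple2<Long, Long> value1, Tuple2<Long, Long> value2) {
        // keep the key (f0), sum the counts (f1)
        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
    }
}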

As you can see, the biggest difference from the batch API is that here we have a DataStream rather than a DataSet.

/**
 * A DataStream represents a stream of elements of the same type. A DataStream
 * can be transformed into another DataStream by applying a transformation as
 * for example:
 * <ul>
 * <li>{@link DataStream#map},
 * <li>{@link DataStream#filter}, or
 * </ul>
 *
 * @param <T> The type of the elements in this Stream
 */
public class DataStream<T> {
    
    protected final StreamExecutionEnvironment environment;
    
    protected final StreamTransformation<T> transformation;
    
    /**
     * Create a new {@link DataStream} in the given execution environment with
     * partitioning set to forward by default.
     *
     * @param environment The StreamExecutionEnvironment
     */
    public DataStream(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        this.environment = Preconditions.checkNotNull(environment, "Execution Environment must not be null.");
        this.transformation = Preconditions.checkNotNull(transformation, "Stream Transformation must not be null.");
    }    
    
    // the various operations on DataStream ...
    // map, reduce, keyBy, ...
}

The core of a DataStream is its StreamTransformation<T> transformation field: it captures how the data stream is produced.
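To make that concrete, here is the opening example again, annotated (as an assumption-level sketch, not quoted from Flink) with the transformation that each API call produces; the resulting chain of transformations is what StreamGraphGenerator later turns into a StreamGraph:

DataStream<Tuple2<Long, Long>> stream = env.addSource(...);     // SourceTransformation
stream
    .keyBy(0)                                                    // logical partitioning transformation, no physical operator
    .timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))
    .reduce(new SummingReducer())                                // OneInputTransformation wrapping a window operator
    .addSink(new SinkFunction<Tuple2<Long, Long>>() {...});      // SinkTransformation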

 

StreamTransformation

A StreamTransformation represents the operation that creates a DataStream. It does not necessarily correspond to a physical operation at runtime; some transformations are purely logical concepts, as the Javadoc below shows:

/**
 * A {@code StreamTransformation} represents the operation that creates a
 * {@link org.apache.flink.streaming.api.datastream.DataStream}. Every
 * {@link org.apache.flink.streaming.api.datastream.DataStream} has an underlying
 * {@code StreamTransformation} that is the origin of said DataStream.
 *
 * <p>
 * API operations such as {@link org.apache.flink.streaming.api.datastream.DataStream#map} create
 * a tree of {@code StreamTransformation}s underneath. When the stream program is to be executed this
 * graph is translated to a {@link StreamGraph} using
 * {@link org.apache.flink.streaming.api.graph.StreamGraphGenerator}.
 *
 * <p>
 * A {@code StreamTransformation} does not necessarily correspond to a physical operation
 * at runtime. Some operations are only logical concepts. Examples of this are union,
 * split/select data stream, partitioning.
 *
 * <p>
 * The following graph of {@code StreamTransformations}:
 *
 * <pre>{@code
 *   Source              Source        
 *      +                   +           
 *      |                   |           
 *      v                   v           
 *  Rebalance          HashPartition    
 *      +                   +           
 *      |                   |           
 *      |                   |           
 *      +------>Union<------+           
 *                +                     
 *                |                     
 *                v                     
 *              Split                   
 *                +                     
 *                |                     
 *                v                     
 *              Select                  
 *                +                     
 *                v                     
 *               Map                    
 *                +                     
 *                |                     
 *                v                     
 *              Sink 
 * }</pre>
 *
 * Would result in this graph of operations at runtime:
 *
 * <pre>{@code
 *  Source              Source
 *    +                   +
 *    |                   |
 *    |                   |
 *    +------->Map<-------+
 *              +
 *              |
 *              v
 *             Sink
 * }</pre>
 *
 * The information about partitioning, union, split/select end up being encoded in the edges
 * that connect the sources to the map operation.
 *
 * @param <T> The type of the elements that result from this {@code StreamTransformation}
 */
public abstract class StreamTransformation<T>

StreamTransformation itself only defines the output, i.e. the result stream produced by the transformation. It is an abstract class and cannot be used directly; the logic that actually produces the stream is encapsulated in a concrete operator.

The following examples illustrate the difference between a transformation and an operator; the design here is a bit convoluted.

 

OneInputTransformation adds an input on top of StreamTransformation:

/**
 * This Transformation represents the application of a
 * {@link org.apache.flink.streaming.api.operators.OneInputStreamOperator} to one input
 * {@link org.apache.flink.streaming.api.transformations.StreamTransformation}.
 *
 * @param <IN> The type of the elements in the input {@code StreamTransformation}
 * @param <OUT> The type of the elements that result from this {@code OneInputTransformation}
 */
public class OneInputTransformation<IN, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN> input;

    private final OneInputStreamOperator<IN, OUT> operator;

    private KeySelector<IN, ?> stateKeySelector;
    
    private TypeInformation<?> stateKeyType;
}

So it contains the StreamTransformation<IN> input that produces the input stream, and the OneInputStreamOperator<IN, OUT> operator that turns that input into the output.
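As a hedged sketch of how the two pieces fit together, DataStream#map is typically wired roughly as follows (simplified; exact helper calls may vary by Flink version): the user function is wrapped in a StreamMap operator, and transform() then wraps that operator in a OneInputTransformation whose input is this.transformation.

public <R> SingleOutputStreamOperator<R, ?> map(MapFunction<T, R> mapper) {
    // derive the output type from the user function
    TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType());
    // StreamMap is the OneInputStreamOperator that invokes the user function per record;
    // transform() builds the OneInputTransformation around it
    return transform("Map", outType, new StreamMap<>(clean(mapper)));
}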

For comparison, also look at TwoInputTransformation:

public class TwoInputTransformation<IN1, IN2, OUT> extends StreamTransformation<OUT> {

    private final StreamTransformation<IN1> input1;
    private final StreamTransformation<IN2> input2;

    private final TwoInputStreamOperator<IN1, IN2, OUT> operator;
}

 

Next, compare SourceTransformation and SinkTransformation:

public class SourceTransformation<T> extends StreamTransformation<T> {

    private final StreamSource<T> operator;
}

public class SinkTransformation<T> extends StreamTransformation<Object> {

    private final StreamTransformation<T> input;

    private final StreamSink<T> operator;
}

This makes the role of a transformation fairly clear:
a source has no input, so there is no transformation representing an input;
a sink does have an input, but its operator is not an ordinary stream operator; it is a StreamSink, the end point of the stream.

 

transform

This method takes a user-defined operator and transforms the current stream into a stream of the user-specified type.

/**
 * Method for passing user defined operators along with the type
 * information that will transform the DataStream.
 *
 * @param operatorName
 *            name of the operator, for logging purposes
 * @param outTypeInfo
 *            the output type of the operator
 * @param operator
 *            the object containing the transformation logic
 * @param <R>
 *            type of the return stream
 * @return the data stream constructed
 */
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName, TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform = new OneInputTransformation<T, R>(
            this.transformation,
            operatorName,
            operator,
            outTypeInfo,
            environment.getParallelism());

    @SuppressWarnings({ "unchecked", "rawtypes" })
    SingleOutputStreamOperator<R, ?> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}

So the parameters are user-defined: the output TypeInformation and the OneInputStreamOperator.

The implementation creates a OneInputTransformation, with this.transformation as the input and the operator passed in as the OneInputStreamOperator; through this resultTransform the current stream is converted into the target stream.

It then wraps the result in a SingleOutputStreamOperator. What is that?

/**
 * The SingleOutputStreamOperator represents a user defined transformation
 * applied on a {@link DataStream} with one predefined output type.
 *
 * @param <T> The type of the elements in this Stream
 * @param <O> Type of the operator.
 */
public class SingleOutputStreamOperator<T, O extends SingleOutputStreamOperator<T, O>> extends DataStream<T> {

    protected SingleOutputStreamOperator(StreamExecutionEnvironment environment, StreamTransformation<T> transformation) {
        super(environment, transformation);
    }
}

Put simply, it just wraps the user-defined transformation.

The naming in this part of Flink is a bit confusing: Operator and Transformation are two concepts that are easy to mix up.
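To make the distinction concrete, here is a hedged sketch that shows both concepts side by side: a hand-written OneInputStreamOperator (the operator, which processes records) passed to transform() (which creates the OneInputTransformation). The class name UpperCaseOperator and the wiring below are illustrative; only the transform() signature and the operator interface come from the code quoted in this post.

public class UpperCaseOperator extends AbstractStreamOperator<String>
        implements OneInputStreamOperator<String, String> {

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        // the operator does the per-record work and emits downstream via 'output'
        output.collect(element.replace(element.getValue().toUpperCase()));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // simply forward watermarks
        output.emitWatermark(mark);
    }
}

// usage: builds a OneInputTransformation with the current stream's transformation as its input
SingleOutputStreamOperator<String, ?> upper =
        input.transform("UpperCase", BasicTypeInfo.STRING_TYPE_INFO, new UpperCaseOperator());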

 

In the example above, keyBy(0) produces a KeyedStream.

KeyedStream

The key pieces of a KeyedStream are the keySelector and the keyType: how the key is extracted, and the type of that key.
/**
 * A {@code KeyedStream} represents a {@link DataStream} on which operator state is
 * partitioned by key using a provided {@link KeySelector}. Typical operations supported by a
 * {@code DataStream} are also possible on a {@code KeyedStream}, with the exception of
 * partitioning methods such as shuffle, forward and keyBy.
 *
 * <p>
 * Reduce-style operations, such as {@link #reduce}, {@link #sum} and {@link #fold} work on elements
 * that have the same key.
 *
 * @param <T> The type of the elements in the Keyed Stream.
 * @param <KEY> The type of the key in the Keyed Stream.
 */
public class KeyedStream<T, KEY> extends DataStream<T> {

    /** The key selector that can get the key by which the stream is partitioned from the elements */
    private final KeySelector<T, KEY> keySelector;

    /** The type of the key by which the stream is partitioned */
    private final TypeInformation<KEY> keyType;
}
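For comparison with keyBy(0) in the example, here is a hedged sketch of keying by an explicit KeySelector, which makes the keySelector and keyType fields above tangible (the selector itself is illustrative):

KeyedStream<Tuple2<Long, Long>, Long> keyed = stream
        .keyBy(new KeySelector<Tuple2<Long, Long>, Long>() {
            @Override
            public Long getKey(Tuple2<Long, Long> value) throws Exception {
                return value.f0;   // the first field acts as the key
            }
        });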
 
Look at its transform: it calls DataStream.transform and, in addition, injects the keySelector and keyType.
// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);
    
    return returnStream;
}

 

A key role of KeyedStream is to act as a stepping stone to WindowedStream, so it provides a set of methods for creating windowed streams.

// ------------------------------------------------------------------------
//  Windowing
// ------------------------------------------------------------------------

/**
 * Windows this {@code KeyedStream} into tumbling time windows.
 *
 * <p>
 * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
 * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
 * set using
 * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
 *
 * @param size The size of the window.
 */
public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
    return window(TumblingTimeWindows.of(size));
}
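The two-argument version used in the example follows the same pattern; as a hedged sketch (the exact sliding assigner class name depends on the Flink version), it delegates to a sliding window assigner:

public WindowedStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
    // assumption: SlidingTimeWindows is the sliding counterpart of TumblingTimeWindows
    return window(SlidingTimeWindows.of(size, slide));
}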

 

WindowedStream

In the example:

.timeWindow(Time.of(2500, MILLISECONDS), Time.of(500, MILLISECONDS))

 

/**
 * A {@code WindowedStream} represents a data stream where elements are grouped by
 * key, and for each key, the stream of elements is split into windows based on a
 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. Window emission
 * is triggered based on a {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
 *
 * <p>
 * The windows are conceptually evaluated for each key individually, meaning windows can trigger at
 * different points for each key.
 *
 * <p>
 * If an {@link Evictor} is specified it will be used to evict elements from the window after
 * evaluation was triggered by the {@code Trigger} but before the actual evaluation of the window.
 * When using an evictor window performance will degrade significantly, since
 * pre-aggregation of window results cannot be used.
 *
 * <p>
 * Note that the {@code WindowedStream} is purely an API construct; during runtime
 * the {@code WindowedStream} will be collapsed together with the
 * {@code KeyedStream} and the operation over the window into one single operation.
 * 
 * @param <T> The type of elements in the stream.
 * @param <K> The type of the key by which elements are grouped.
 * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns the elements to.
 */
public class WindowedStream<T, K, W extends Window> {

    /** The keyed data stream that is windowed by this stream */
    private final KeyedStream<T, K> input;

    /** The window assigner */
    private final WindowAssigner<? super T, W> windowAssigner;

    /** The trigger that is used for window evaluation/emission. */
    private Trigger<? super T, ? super W> trigger;

    /** The evictor that is used for evicting elements before window evaluation. */
    private Evictor<? super T, ? super W> evictor;

Note that WindowedStream does not extend DataStream directly; instead it takes a KeyedStream as its input.

And of course the pieces a window needs, the WindowAssigner, Trigger, and Evictor, are all here.
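To see where the WindowAssigner, Trigger, and Evictor plug in from user code, here is a hedged sketch; TumblingTimeWindows.of appears in the timeWindow code above, while CountTrigger and CountEvictor are assumed to exist in this Flink version and are used purely for illustration:

keyedStream
    .window(TumblingTimeWindows.of(Time.of(2500, MILLISECONDS)))   // WindowAssigner
    .trigger(CountTrigger.of(100))                                  // fire once 100 elements have arrived for a key
    .evictor(CountEvictor.of(10))                                   // keep only the last 10 elements before evaluation
    .reduce(new SummingReducer());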

 

Continuing with the example: .reduce(new SummingReducer())

Let's look at WindowedStream's reduce operation:

/**
 * Applies a reduce function to the window. The window function is called for each evaluation
 * of the window for each key individually. The output of the reduce function is interpreted
 * as a regular non-windowed stream.
 * <p>
 * This window will try and pre-aggregate data as much as the window policies permit. For example,
 * tumbling time windows can perfectly pre-aggregate the data, meaning that only one element per
 * key is stored. Sliding time windows will pre-aggregate on the granularity of the slide interval,
 * so a few elements are stored per key (one per slide interval).
 * Custom windows may not be able to pre-aggregate, or may need to store extra values in an
 * aggregation tree.
 * 
 * @param function The reduce function.
 * @return The data stream that is the result of applying the reduce function to the window. 
 */
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
    //clean the closure
    function = input.getExecutionEnvironment().clean(function);

    String opName = "TriggerWindow(" + windowAssigner + ", " + trigger + ", " + udfName + ")";
    KeySelector<T, K> keySel = input.getKeySelector();

    OneInputStreamOperator<T, T> operator;

    boolean setProcessingTime = input.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    if (evictor != null) {
        operator = new EvictingWindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                new HeapWindowBuffer.Factory<T>(),
                new ReduceWindowFunction<K, W, T>(function),
                trigger,
                evictor).enableSetProcessingTime(setProcessingTime);

    } else {
        operator = new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                new PreAggregatingHeapWindowBuffer.Factory<>(function), // pre-aggregating: instead of caching the raw elements, it stores the aggregated value directly, which saves space
                new ReduceWindowFunction<K, W, T>(function),
                trigger).enableSetProcessingTime(setProcessingTime);
    }

    return input.transform(opName, input.getType(), operator);
}

The key point is that, depending on whether an evictor is set, a different WindowOperator is created.

Then input.transform is called to turn the WindowedStream into a SingleOutputStreamOperator; here input is the KeyedStream.

// ------------------------------------------------------------------------
//  basic transformations
// ------------------------------------------------------------------------

@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<T, R> operator) {

    SingleOutputStreamOperator<R, ?> returnStream = super.transform(operatorName, outTypeInfo,operator);

    // inject the key selector and key type
    OneInputTransformation<T, R> transform = (OneInputTransformation<T, R>) returnStream.getTransformation();
    transform.setStateKeySelector(keySelector);
    transform.setStateKeyType(keyType);
    
    return returnStream;
}

Note that the parameter here is a OneInputStreamOperator, and WindowOperator in fact implements that interface.

For a OneInputStreamOperator we only need to implement two methods, processElement and processWatermark, which focus on how each input element is handled.

/**
 * Interface for stream operators with one input. Use
 * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
 * you want to implement a custom operator.
 * 
 * @param <IN> The input type of the operator
 * @param <OUT> The output type of the operator
 */
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {

    /**
     * Processes one element that arrived at this operator.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     */
    void processElement(StreamRecord<IN> element) throws Exception;

    /**
     * Processes a {@link Watermark}.
     * This method is guaranteed to not be called concurrently with other methods of the operator.
     *
     * @see org.apache.flink.streaming.api.watermark.Watermark
     */
    void processWatermark(Watermark mark) throws Exception;
}

It then calls super.transform, i.e. DataStream's transform.

 

At the end of the example,

.addSink(new SinkFunction<Tuple2<Long, Long>>() {...});

actually calls SingleOutputStreamOperator.addSink, i.e. DataStream.addSink:

/**
 * Adds the given sink to this DataStream. Only streams with sinks added
 * will be executed once the {@link StreamExecutionEnvironment#execute()}
 * method is called.
 *
 * @param sinkFunction
 *            The object containing the sink's invoke function.
 * @return The closed DataStream.
 */
public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

    StreamSink<T> sinkOperator = new StreamSink<T>(clean(sinkFunction));

    DataStreamSink<T> sink = new DataStreamSink<T>(this, sinkOperator);

    getExecutionEnvironment().addOperator(sink.getTransformation());
    return sink;
}

 

The SinkFunction interface:

public interface SinkFunction<IN> extends Function, Serializable {

    /**
     * Function for standard sink behaviour. This function is called for every record.
     *
     * @param value The input record.
     * @throws Exception
     */
    void invoke(IN value) throws Exception;
}
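As a hedged sketch, a trivial sink that simply prints every record (illustrative only, not the anonymous sink from the opening example):

stream.addSink(new SinkFunction<Tuple2<Long, Long>>() {
    @Override
    public void invoke(Tuple2<Long, Long> value) throws Exception {
        System.out.println(value);   // called once per record
    }
});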

 

StreamSink is a OneInputStreamOperator, so the important method is processElement:

public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
        implements OneInputStreamOperator<IN, Object> {

    public StreamSink(SinkFunction<IN> sinkFunction) {
        super(sinkFunction);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        userFunction.invoke(element.getValue());
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        // ignore it for now, we are a sink, after all
    }
}

 

DataStreamSink is simply a wrapper around a SinkTransformation:

/**
 * A Stream Sink. This is used for emitting elements from a streaming topology.
 *
 * @param <T> The type of the elements in the Stream
 */
public class DataStreamSink<T> {

    SinkTransformation<T> transformation;

    @SuppressWarnings("unchecked")
    protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
        this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
    }
}

 

Finally, the SinkTransformation is added to the environment's List<StreamTransformation<?>> transformations.
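For reference, a hedged sketch of how StreamExecutionEnvironment collects these transformations (simplified; the actual field layout and messages may differ):

protected final List<StreamTransformation<?>> transformations = new ArrayList<>();

public void addOperator(StreamTransformation<?> transformation) {
    Preconditions.checkNotNull(transformation, "transformation must not be null.");
    this.transformations.add(transformation);
}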

 

Execution finally reaches env.execute();

Original post: https://www.cnblogs.com/fxjwind/p/5706295.html