【转载】使用Flink低级处理函数ProcessFunction

转载链接:https://zhuanlan.zhihu.com/p/130708277

Flink的转换操作是无法访问事件的时间戳信息和水印信息的。例如我们常用的MapFunction转换操作就无法访问时间戳或者当前事件的事件时间。而这在一些应用场景下,极为重要。

因此,Flink DataStream API提供了一系列的Low-Level转换操作,可以访问时间戳、水印以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。这一类的低级API,被称为"Process Function"。

ProcessFunction

ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子无法实现)。例如,Flink SQL就是使用Process Function实现的。

ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:

  • events:数据流中的元素
  • state:状态,用于容错和一致性,仅用于keyed stream
  • timers:定时器,支持事件时间和处理时间,仅用于keyed stream

Flink提供了8个Process Function:

  • ProcessFunction:dataStream
  • KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理
  • CoProcessFunction:用于connect连接的流
  • ProcessJoinFunction:用于join流操作
  • BroadcastProcessFunction:用于广播
  • KeyedBroadcastProcessFunction:keyBy之后的广播
  • ProcessWindowFunction:窗口增量聚合
  • ProcessAllWindowFunction:全窗口聚合

可以将ProcessFunction看作是一个具有key state和定时器(timer)访问权的FlatMapFunction。对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件。

对于容错状态,ProcessFunction 可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。

定时器可让应用程序对在处理时间和事件时间中的变化进行响应。每次调用 processElement(...)函数时都可以获得一个Context对象,通过该对象可以访问元素的事件时间(event time)时间戳以及 TimerService。可以使用TimerService为将来的事件时间/处理时间实例注册回调。对于事件时间计时器,当当前水印被提升到或超过计时器的时间戳时,将调用onTimer(…)方法,而对于处理时间计时器,当挂钟时间达到指定时间时,将调用onTimer(…)方法。在调用期间,所有状态的范围再次限定为创建定时器所用的key,从而允许定时器操作keyed state。

如果想要在流处理过程中访问keyed state和定时器,就必须在一个keyed stream上应用ProcessFunction函数,代码如下:

stream.keyBy(...).process(new MyProcessFunction())

KeyedProcessFunction使用示例

作为ProcessFunction的扩展(即子类),KeyedProcessFunction在其onTimer(…)方法中提供对计时器key的访问。其模板代码如下所示:

@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {
    K key = ctx.getCurrentKey();
    // ...
}

在下面的示例中,KeyedProcessFunction维护每个key的计数,并在每过一分钟(以事件时间)而未更新该key时,发出一个key/count对:

  • 把计数、key和最后修改时间戳(last-modification-timestamp)存储在一个ValueState中, ValueState的作用域是通过key隐式确定的。
  • 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳。
  • 该函数还安排了一个一分钟后的回调(以事件时间)。
  • 在每次回调时,它根据存储的计数的最后修改时间检查回调的事件时间时间戳,并在它们匹配时发出key/count(即,在该分钟内没有进一步的更新)。

【示例】维护数据流中每个key的计数,并在每过一分钟(以事件时间)而未更新该key时,发出一个key/count对。

1)首先导入必须所依赖包

package com.xueai8;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.util.Collector;

2)定义存储状态数据的数据结构(数据模型)

/**
 * 存储在状态中的数据类型
 */
public class CountWithTimestamp {

    public String key;           // 存储key
    public long count;           // 存储计数值
    public long lastModified;    // 最后一次修改时间
}

3)自定义ProcessFunction,继承自KeyedProcessFunction:

public class CountWithTimeoutFunction
        extends KeyedProcessFunction<Tuple, Tuple2<String, String>, Tuple2<String, Long>> {

    /** 由这个处理函数负责维护的状态 */
    private ValueState<CountWithTimestamp> state;

    // 首先获得由这个处理函数(process function)维护的状态
        // 通过 RuntimeContext 访问Flink的keyed state
    @Override
    public void open(Configuration parameters) throws Exception {
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
    }

    // 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
    // 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳
    @Override
    public void processElement(
            Tuple2<String, String> value,
            Context ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {

        // 获取当前的计数
        CountWithTimestamp current = state.value();
        if (current == null) {
            current = new CountWithTimestamp();
            current.key = value.f0;
        }

        // 更新状态计数值
        current.count++;

        // 设置该状态的时间戳为记录的分配的事件时间时间时间戳
                if (ctx != null) {
                    current.lastModified = ctx.timestamp();
                }

                // 将状态写回
        state.update(current);

        // 从当前事件时间开始安排下一个计时器60秒
        ctx.timerService().registerEventTimeTimer(current.lastModified + 60000);
        }

    // 如果一分钟内没有进一步的更新,则发出 key/count对
    @Override
    public void onTimer(
            long timestamp,
            OnTimerContext ctx,
            Collector<Tuple2<String, Long>> out) throws Exception {

        // 获取调度此计时器的key的状态
        CountWithTimestamp result = state.value();

        // 检查这是一个过时的计时器还是最新的计时器
        if (timestamp == result.lastModified + 60000) {
            // 超时时发出状态
            out.collect(new Tuple2<String, Long>(result.key, result.count));
        }
    }
}

4)在流处理的主方法中应用自定义的处理函数

public class StreamingJob {
    public static void main(String[] args) throws Exception {
    // 设置流执行环境
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // 默认情况下,Flink将使用处理时间。要改变这个,可以设置时间特征:
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // 源数据流
        DataStream<Tuple2<String, String>> stream = env
                .fromElements("good good study","day day up","you see see you")
                .flatMap(new FlatMapFunction<String, Tuple2<String,String>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, String>> collector) throws Exception {
                        for(String word : line.split("\W+")){
                            collector.collect(new Tuple2<>(word,"1"));
                        }
                    }
                });

    // 因为模拟数据没有时间戳,所以用此方法添加时间戳和水印
        DataStream<Tuple2<String, String>> withTimestampsAndWatermarks =
                stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, String>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, String> element) {
                        return System.currentTimeMillis();
                    }
                });

    // 在keyed stream上应用该处理函数
    DataStream<Tuple2<String, Long>> result = withTimestampsAndWatermarks.keyBy(0).process(new CountWithTimeoutFunction());

    // 输出查看
        result.print();

    // 执行流程序
    env.execute("Process Function");
    }
}
原文地址:https://www.cnblogs.com/carsonwuu/p/14926695.html