Flink之ProcessFunction的使用(2):侧输出流的使用

相关文章链接

Flink之ProcessFunction的使用(1):定时器和状态管理的使用

Flink之ProcessFunction的使用(2):侧输出流的使用

具体实现代码如下所示:

main函数中代码如下:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)

val sensorStream: DataStream[SensorReading] = env
    .socketTextStream("localhost", 9999)
    .map(new MyMapToSensorReading)

// 调用process方法,传入自定义的实现了ProcessFunction抽象类的对象,返回的是主流
val highTempStream: DataStream[SensorReading] = sensorStream.process(new MySideOutputFunction(30))
// 可以通过主流,使用getSideOutput方法,传入OutputTag参数,获取侧输出流
val lowTempStream: DataStream[SensorReading] = highTempStream.getSideOutput(new OutputTag[SensorReading]("low-temp"))

// 打印
highTempStream.print("high")
lowTempStream.print("low")

env.execute("SideOutputDemo")

自定义类实现ProcessFunction接口:

/**
 * 当温度低于 threshold 时,将数据输出到侧输出流
 *
 * @param threshold 温度临界值
 */
class MySideOutputFunction(threshold: Double) extends ProcessFunction[SensorReading, SensorReading] {
    override def processElement(value: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = {
        // 判断当前数据的温度值,如果大于阈值,输出到主流;如果小于阈值,输出到侧输出流
        if (value.temperature > threshold) {
            out.collect(value)
        } else {
            ctx.output(new OutputTag[SensorReading]("low-temp"), value)
        }
    }
}
原文地址:https://www.cnblogs.com/yangshibiao/p/14133531.html