flink分流合流

数据:

sensor_1,1547718101,35.8
sensor_1,1547718102,22.2
sensor_1,1547718101,55.3
sensor_1,1547718102,24.1
sensor_1,1547718103,57
sensor_1,1547718103,58
sensor_1,1547718103,59
sensor_6,1547718101,15.4
sensor_7,1547718102,6.7
sensor_10,1547718205,38.1

  

代码

import org.apache.flink.api.common.functions.{FilterFunction, ReduceFunction, RichMapFunction}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._

object TransformTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    //0.从文件中读取数据
    val inputPath = "D:\ideaDemo\maven_flink\src\main\resources\sensor.txt";
    val inputStream = env.readTextFile(inputPath)

    //1.先转换成样例类类型(简单转换操作)
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      SensorReding(arr(0), arr(1).toLong, arr(2).toDouble)
    })


//4.多流转换
    //4.1分流了,将传感器温度数据分成低温、高温两条流
    val splitSteam = dataStream
      .split(data => {
        if (data.temperature > 30.0) Seq("high") else Seq("low")
      })
    val highTempSteam = splitSteam.select("high")
    val lowTempSteam = splitSteam.select("low")
    val allTempSteam = splitSteam.select("high", "low")

//    highTempSteam.print()
//    lowTempSteam.print()
//    allTempSteam.print()

    //4.2合流 connect
    val warningSteam = highTempSteam.map(data => (data.id, data.temperature))
    val connectedSteams = warningSteam.connect(lowTempSteam)

    //用coMap队数据进行分别处理
    val coMapResultStream: DataStream[Product with Serializable] = connectedSteams
      .map(
        waringData => (waringData._1, waringData._2, "warning"),
        lowTempData => (lowTempData.id, "healthy")
      )
//    coMapResultStream.print()
//
//    //4.3union合流
    val unionStream = highTempSteam.union(lowTempSteam)


    env.execute()
  }
author@nohert
原文地址:https://www.cnblogs.com/gzgBlog/p/14928190.html