Flink--Table和DataStream和DataSet的集成

将DataStream或DataSet转换为表

在上面的例子中,我们直接使用 registerTableSource 来注册表。

对于 Flink 来说,还有更灵活的方式:比如直接把 DataStream 或者 DataSet 注册为一张表。

注册之后,DataStream 或者 DataSet 就相当于一张表,这样就可以继续使用 SQL 来操作流数据或者批数据。

语法:

// get TableEnvironment 
// registration of a DataSet is equivalent
// env: the execution environment of the DataStream program (StreamExecutionEnvironment)
val tableEnv = TableEnvironment.getTableEnvironment(env)

val stream: DataStream[(Long, String)] = ...

// register the DataStream as Table "myTable" with fields "f0", "f1"
tableEnv.registerDataStream("myTable", stream)
/**
 * Example job: registers two `DataStream[Order]` instances as tables and
 * combines them with a SQL `UNION ALL` query, writing the result to a CSV sink.
 */
object SQLToDataSetAndStreamSet {

  /**
   * Builds the two sample streams, registers them as the tables "OrderA" and
   * "OrderB", runs the union query and hands the result to a `CsvTableSink`
   * before executing the environment.
   *
   * @param args command-line arguments; not read by this example
   */
  def main(args: Array[String]): Unit = {

    // Streaming execution environment and the table environment built on it.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = TableEnvironment.getTableEnvironment(env)

    // Sample data for the two input streams.
    val firstOrders = Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2))
    val secondOrders = Seq(
      Order(2L, "pen", 3),
      Order(2L, "rubber", 3),
      Order(4L, "beer", 1))
    val orderA: DataStream[Order] = env.fromCollection(firstOrders)
    val orderB: DataStream[Order] = env.fromCollection(secondOrders)

    // Expose both streams to SQL under the names used in the query below.
    tableEnv.registerDataStream("OrderA", orderA)
    tableEnv.registerDataStream("OrderB", orderB)

    // Rows of OrderA with amount > 2 together with rows of OrderB with amount < 2.
    val unionQuery =
      "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " +
        "SELECT * FROM OrderB WHERE amount < 2"

    // CSV sink at path "ccc" with "," as field delimiter and OVERWRITE write mode.
    val csvSink = new CsvTableSink("ccc", ",", 1, FileSystem.WriteMode.OVERWRITE)

    tableEnv.sqlQuery(unionQuery).writeToSink(csvSink)
    env.execute()
  }
}
/**
 * Element type of the example streams; its fields become the columns of the
 * tables registered from those streams.
 *
 * @param user    user value of the order (presumably a user id)
 * @param product product name
 * @param amount  amount of the order; the example query filters on this field
 */
case class Order(
  user: Long,
  product: String,
  amount: Int
)
原文地址:https://www.cnblogs.com/niutao/p/10548690.html