Flink之TableAPI和SQL(5):表的时间特性

相关文章链接

Flink之TableAPI和SQL(1):基本功能描述

Flink之TableAPI和SQL(2):表和外部系统的连接方式

Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)

Flink之TableAPI和SQL(4):表的Sink实现

Flink之TableAPI和SQL(5):表的时间特性

在FlinkTable中,时间特性可以通过如下3中方式指定:

1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)

2、定义TableSchema指定

3、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)

一般情况下请使用第一种,在blink  planner和old  planner的包中都能运行,后续2种当导错包时会报错。

如下代码所示:

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val sensorStream: DataStream[SensorReading] = env
    .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")
    .map(new MyMapToSensorReading)
    .assignAscendingTimestamps(_.timestamp * 1000L)

// 1、处理时间(processing time)(一般直接使用第一种,先在流中转换成样例类,再转表)
// 1.1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)
val ptTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature, 'timestamp, 'pt.proctime)
// 1.2、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)
val ptDDL: String =
    """
      |create table dataTable (
      |     id varchar(20) not null,
      |     ts bigint,
      |     temperature double,
      |     pt AS PROCTIME()
      |) with (
      |     'connector.type' = 'filesystem',
      |     'connector.path' = 'file///D://Project//IDEA//bigdata-study//flink-demo//src//main//resources//source.txt',
      |     'format.type' = 'csv'
      |)
      |""".stripMargin
//        tableEnv.sqlUpdate(ptDDL)
//        val ptTable_2: Table = tableEnv.from("dataTable")

// 2、事件时间(event time)(使用事件时间时,需要先在env执行环境中设置时间特性,再在流中指定时间戳代表的字段)
// 2.1、将DataStream转换成Table,并指定事件时间字段 或者追加 字段
val eventTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp.rowtime, 'temperature)
val eventTable_2: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp, 'temperature, 'et.rowtime)
// 2.2、定义TableSchema指定
tableEnv
    .connect(new FileSystem().path("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt"))
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
        .rowtime(new Rowtime()
            .timestampsFromField("timestamp")       // 从字段中提取时间戳
            .watermarksPeriodicBounded(1000)            // watermark延迟1秒
        )
    )
    .createTemporaryTable("eventTable_3")
val eventTable_3: Table = tableEnv.from("eventTable_3")
// 2.3、使用DDL指定
val eventDDL: String =
    """
      |create table eventDataTable (
      |     id varchar(20) not null,
      |     ts bigint,
      |     temperature double,
      |     rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)),
      |     watermark for rt as rt -interval '1' second
      |) with (
      |     'connector.type' = 'filesystem'
      |     'connect.path' = 'file:///...\source.txt',
      |     'format.type' = 'csv'
      |)
      |""".stripMargin
//        tableEnv.sqlUpdate(eventDDL)
//        val eventTable_4: Table = tableEnv.from("eventDataTable")

eventTable_1.printSchema()
eventTable_1.toRetractStream[Row].print()

tableEnv.execute("TimeCharacterTableDemo")
原文地址:https://www.cnblogs.com/yangshibiao/p/14081199.html