相关文章链接
Flink之TableAPI和SQL(2):表和外部系统的连接方式
Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)
Flink之TableAPI和SQL(4):表的Sink实现
在FlinkTable中,时间特性可以通过如下3中方式指定:
1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间)
2、定义TableSchema指定
3、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包)
一般情况下请使用第一种,在blink planner和old planner的包中都能运行,后续2种当导错包时会报错。
如下代码所示:
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sensorStream: DataStream[SensorReading] = env .readTextFile("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt") .map(new MyMapToSensorReading) .assignAscendingTimestamps(_.timestamp * 1000L) // 1、处理时间(processing time)(一般直接使用第一种,先在流中转换成样例类,再转表) // 1.1、在DataStream转换为Table时指定处理时间(使用名称指定,在最后面添加处理时间) val ptTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'temperature, 'timestamp, 'pt.proctime) // 1.2、创建表的DDL时指定(此DDL只能使用blink planner时才能正常运行,目前没有导入blink包,默认的planner是老的包) val ptDDL: String = """ |create table dataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | pt AS PROCTIME() |) with ( | 'connector.type' = 'filesystem', | 'connector.path' = 'file///D://Project//IDEA//bigdata-study//flink-demo//src//main//resources//source.txt', | 'format.type' = 'csv' |) |""".stripMargin // tableEnv.sqlUpdate(ptDDL) // val ptTable_2: Table = tableEnv.from("dataTable") // 2、事件时间(event time)(使用事件时间时,需要先在env执行环境中设置时间特性,再在流中指定时间戳代表的字段) // 2.1、将DataStream转换成Table,并指定事件时间字段 或者追加 字段 val eventTable_1: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp.rowtime, 'temperature) val eventTable_2: Table = tableEnv.fromDataStream(sensorStream, 'id, 'timestamp, 'temperature, 'et.rowtime) // 2.2、定义TableSchema指定 tableEnv .connect(new FileSystem().path("D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt")) .withFormat(new Csv()) .withSchema(new Schema() .field("id", DataTypes.STRING()) .field("timestamp", DataTypes.BIGINT()) .field("temperature", DataTypes.DOUBLE()) .rowtime(new Rowtime() .timestampsFromField("timestamp") // 从字段中提取时间戳 .watermarksPeriodicBounded(1000) // watermark延迟1秒 ) ) .createTemporaryTable("eventTable_3") val eventTable_3: Table = tableEnv.from("eventTable_3") // 2.3、使用DDL指定 val eventDDL: String = """ |create table eventDataTable ( | id varchar(20) not null, | ts bigint, | temperature double, | rt AS TO_TIMESTAMP(FROM_UNIXTIME(ts)), | watermark for rt as rt -interval '1' second |) with ( | 'connector.type' = 'filesystem' | 'connect.path' = 'file:///...\source.txt', | 'format.type' = 'csv' |) |""".stripMargin // tableEnv.sqlUpdate(eventDDL) // val eventTable_4: Table = tableEnv.from("eventDataTable") eventTable_1.printSchema() eventTable_1.toRetractStream[Row].print() tableEnv.execute("TimeCharacterTableDemo")