Fink1.13.1(五)

第11章 Flink SQL编程 

  如果用户需要同时流计算、批处理的场景下,用户需要维护两套业务代码,开发人员也要维护两套技术栈,非常不方便。Flink 社区很早就设想过将批数据看作一个有界流数据,将批处理看作流计算的一个特例,从而实现流批统一,Flink 社区的开发人员在多轮讨论后,基本敲定了Flink 未来的技术架构,Apache Flink 有两种关系型 API 来做流批统一处理Table API 和 SQL。

  Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。这两种 API 中的查询对于批(DataSet)和流(DataStream)的输入有相同的语义,也会产生同样的计算结果。

  Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream 和 DataSet API。你可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。比如,你可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者你可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。

  注意:Table API 和 SQL 现在还处于活跃开发阶段,还没有完全实现所有的特性。不是所有的 [Table API,SQL] 和 [流,批] 的组合都是支持的。

11.1 核心概念

  Flink 的 Table API 和 SQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解

11.1.1 动态表和连续查询

  动态表(Dynamic Tables)  Flink 的支持流数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。需要注意的是,连续查询的结果在语义上总是等价于以批处理模式在输入表快照上执行的相同查询的结果。

  1)将流转换为动态表。

  2)在动态表上计算一个连续查询,生成一个新的动态表。

  3)生成的动态表被转换回流。

11.1.2 在流上定义表(动态表)

  为了使用关系查询处理流,必须将其转换成 Table。从概念上讲,流的每条记录都被解释为对结果表的 INSERT 操作。

  假设有如下格式的数据:

[
  user:  VARCHAR,   // 用户名
  cTime: TIMESTAMP, // 访问 URL 的时间
  url:   VARCHAR    // 用户访问的 URL
]

  下图显示了单击事件流(左侧)如何转换为表(右侧)。当插入更多的单击流记录时,结果表的数据不断增长

  连续查询:在动态表上计算一个连续查询,并生成一个新的动态表。与批处理查询不同,连续查询从不终止,并根据其输入表上的更新更新其结果表。在任何时候,连续查询的结果在语义上与以批处理模式在输入表快照上执行的相同查询的结果相同。

  1)当查询开始,clicks 表(左侧)是空的。

  2)当第一行数据被插入到 clicks 表时,查询开始计算结果表。第一行数据 [Mary,./home] 插入后,结果表(右侧,上部)由一行 [Mary, 1] 组成。

  3)当第二行 [Bob, ./cart] 插入到 clicks 表时,查询会更新结果表并插入了一行新数据 [Bob, 1]。

  4)第三行 [Mary, ./prod?id=1] 将产生已计算的结果行的更新,[Mary, 1] 更新成 [Mary, 2]。

  5)最后,当第四行数据加入 clicks 表时,查询将第三行 [Liz, 1] 插入到结果表中。

11.2 Flink Table API

11.2.1 导入需要的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.13.1</version>
    <scope>provided</scope>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-csv</artifactId>
    <version>1.13.1</version>
</dependency>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-json</artifactId>
    <version>1.13.1</version>
</dependency>

11.2.2 基本使用:表与DataStream的混合使用 

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/25 21:21
 */
public class Flink_TableApi_BasicUse {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream = environment.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60)
        );

        //创建表的执行环境
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        //创建表: 将流转换成动态表. 表的字段名从pojo的属性名自动抽取
        Table table = tableEnvironment.fromDataStream(waterSensorStream);
        //对动态表进行查询
        Table result = table.where($("id").isEqual("sensor_1"))
                .select($("id"), $("ts"), $("vc"));

        //把动态表转换成流
        DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);

        rowDataStream.print();

        environment.execute();
    }
}

11.2.3 基本使用:聚合操作

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/25 21:21
 */
public class Flink_TableApi_BasicUse_Agg {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream = environment.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60)
        );

        //创建表的执行环境
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        //创建表: 将流转换成动态表. 表的字段名从pojo的属性名自动抽取
        Table table = tableEnvironment.fromDataStream(waterSensorStream);
        //对动态表进行查询
        Table result = table.where($("vc").isGreaterOrEqual(20))
                .groupBy($("id"))
                .aggregate($("vc").sum().as("vc_sum"))
                .select($("id"), $("vc_sum"));

        //把动态表转换成流 如果涉及到数据的更新, 要用到撤回流. 多个了一个boolean标记
        DataStream<Tuple2<Boolean, Row>> toRetractStream = tableEnvironment.toRetractStream(result, Row.class);

        toRetractStream.print();

        environment.execute();
    }
}

11.2.4 表到流的转换

  动态表可以像普通数据库表一样通过 INSERT、UPDATE 和 DELETE 来不断修改。它可能是一个只有一行、不断更新的表,也可能是一个 insert-only 的表,没有 UPDATE 和 DELETE 修改,或者介于两者之间的其他表。在将动态表转换为流或将其写入外部系统时,需要对这些更改进行编码。Flink的 Table API 和 SQL 支持三种方式来编码一个动态表的变化

  1)Append-only 流:仅通过 INSERT 操作修改的动态表可以通过输出插入的行转换为流。

  2)Retract 流:retract 流包含两种类型的 message: add messages 和 retract messages 。通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 流。下图显示了将动态表转换为 retract 流的过程。

  3)Upsert 流:upsert 流包含两种类型的 message: upsert messages 和delete messages。转换为 upsert 流的动态表需要(可能是组合的)唯一键。通过将 INSERT 和 UPDATE 操作编码为 upsert message,将 DELETE 操作编码为 delete message ,将具有唯一键的动态表转换为流。消费流的算子需要知道唯一键的属性,以便正确地应用 message。与 retract 流的主要区别在于 UPDATE 操作是用单个 message 编码的,因此效率更高。下图显示了将动态表转换为 upsert 流的过程。

  请注意,在将动态表转换为 DataStream 时,只支持 append 流和 retract 流。

11.2.5 通过Connector声明读入数据

  前面是先得到流, 再转成动态表, 其实动态表也可以直接连接到数据

  1)File source

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/25 21:21
 */
public class Flink_TableApi_Connector {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        //创建临时表
        tableEnvironment.connect(new FileSystem().path("input/sensor.txt"))
                .withFormat(new Csv())
                .withSchema(schema) //指定表中字段和字段的类型
                .createTemporaryTable("sensor");

        Table sensor = tableEnvironment.from("sensor");

        Table result = sensor.select($("id"), $("vc"));

        Schema schema1 = new Schema()
                .field("id", DataTypes.STRING())
                .field("vc", DataTypes.INT());

        // 关联文件的sink
        tableEnvironment
                .connect(new FileSystem().path("input/abc"))
                .withFormat(new Csv())
                .withSchema(schema1)
                .createTemporaryTable("s2");

        result.executeInsert("s2");

    }
}

  2)Kafka Source

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

/**
 * @作者:袁哥
 * @时间:2021/7/26 18:03
 */
public class Flink_TableApi_Kafka {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //表的元数据信息
        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        //连接文件, 并创建一个临时表, 其实就是一个动态表
        tableEnvironment
                .connect(
                    new Kafka()
                            .version("universal")
                            .property("group.id","Flink_TableApi_Kafka")
                            .property("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092")
                            .topic("s1")
                            .startFromLatest()
                )
//                .withFormat(new Csv())
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("s1");

        Table s1 = tableEnvironment.from("s1");

        tableEnvironment
                .connect(
                        new Kafka()
                                .version("universal")
                                .property("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092")
                                .topic("s2")
                                .sinkPartitionerRoundRobin()
                )
                .withFormat(new Csv())
//                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("s2");

        s1.executeInsert("s2");
    }
}

11.2.6 通过Connector声明写出数据

  1)File Sink

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Csv;
import org.apache.flink.table.descriptors.FileSystem;
import org.apache.flink.table.descriptors.Schema;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/26 18:34
 */
public class Flink_TableApi_ToFileSystem {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream =
                environment.fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));

        //创建表的执行环境
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromDataStream(waterSensorStream);

        Table table2 = table.where($("id").isEqual("sensor_1"))
                .select($("id"), $("ts"), $("vc"));

        //元数据信息
        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        tableEnvironment.connect(new FileSystem().path("output/sensor_id.txt"))
                .withFormat(new Csv().fieldDelimiter('|'))
                .withSchema(schema)
                .createTemporaryTable("sensor");

        //把数据写入到输出表中
        table2.executeInsert("sensor");
    }
}

  2)Kafka Sink

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/26 18:34
 */
public class Flink_TableApi_ToKafka {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream =
                environment.fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));

        //创建表的执行环境
        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromDataStream(waterSensorStream);

        Table table2 = table.where($("id").isEqual("sensor_1"))
                .select($("id"), $("ts"), $("vc"));

        //元数据信息
        Schema schema = new Schema()
                .field("id", DataTypes.STRING())
                .field("ts", DataTypes.BIGINT())
                .field("vc", DataTypes.INT());

        tableEnvironment
                .connect(new Kafka()
                        .version("universal")
                        .topic("sink_sensor")
                        .sinkPartitionerRoundRobin()
                        .property("bootstrap.servers","hadoop162:9092,hadoop163:9092,hadoop164:9092")
                )
                .withFormat(new Json())
                .withSchema(schema)
                .createTemporaryTable("sensor");

        //把数据写入到输出表中
        table2.executeInsert("sensor");
    }
}

11.2.7 其他Connector用法

   参考官方文档: https://ci.apache.org/projects/flink/flink-docs-stable/zh/

11.3 Flink SQL

11.3.1 基本使用

  1)查询未注册的表

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @作者:袁哥
 * @时间:2021/7/26 18:55
 */
public class Flink_SQL_BaseUse {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream =
                environment.fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_2", 3000L, 30),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50),
                        new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //使用sql查询未注册的表
        Table table = tableEnvironment.fromDataStream(waterSensorStream);

        Table table2 = tableEnvironment.sqlQuery("select * from " + table + " where id = 'sensor_1'");

//        tableEnvironment.toAppendStream(table2, Row.class).print();
        tableEnvironment.toAppendStream(table2, WaterSensor.class).print();

        environment.execute();
    }
}

  2)查询已注册的表

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:01
 */
public class Flink_SQL_BaseUse_Two {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorStream  = environment.fromElements(new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //从流得到一个表
        Table table = tableEnvironment.fromDataStream(waterSensorStream);

        //注册为一个临时视图
        tableEnvironment.createTemporaryView("sensor",table);

        //在临时视图查询数据, 并得到一个新表
        Table table1 = tableEnvironment.sqlQuery("select * from sensor where id = 'sensor_1'");

        //显示table1的数据
        tableEnvironment.toAppendStream(table1, Row.class).print();

        environment.execute();
    }
}

11.3.2 File到File

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:09
 */
public class Flink06_Sql_FileToFile {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //建一张动态表与文件进行关联(source)
        tableEnvironment.executeSql("create table sensor(" +
                " id string, " +
                " ts bigint, " +
                " vc int " +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/sensor.txt', " +
                " 'format' = 'csv' " +
                ")");

        Table table = tableEnvironment.sqlQuery("select id,vc from sensor");

        //建一张动态表与文件进行关联(sink)
        tableEnvironment.executeSql("create table s1(" +
                "  id string, " +
                "  vc int" +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'output/s1', " +
                " 'format' = 'csv' " +
                ")");

        //把table数据写入到sink表
        table.executeInsert("s1");
    }
}

11.3.3 KafkaKafka

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:22
 */
public class Flink_Sql_KafkaToKafka {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //建一张动态表与kafka的topic进行关联(source)
        tableEnvironment.executeSql("create table sensor(" +
                " id string, " +
                " ts bigint, " +
                " vc int " +
                ")with(" +
                " 'connector' = 'kafka', " +
                " 'topic' = 's1', " +
                " 'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092', " +
                " 'properties.group.id' = 'Flink_Sql_KafkaToKafka', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'format' = 'csv' " +
                ")");

//        Table table = tableEnvironment.sqlQuery("select * from sensor");

        //建一张动态表与kafka的topic进行关联(sink)
        tableEnvironment.executeSql("create table s2(" +
                " id string, " +
                " ts bigint, " +
                " vc int" +
                ")with(" +
                " 'connector' = 'kafka', " +
                " 'topic' = 's2', " +
                " 'properties.bootstrap.servers' = 'hadoop162:9092,hadoop163:9092,hadoop164:9092', " +
                " 'sink.partitioner' = 'fixed', " +
                " 'value.format' = 'json' " +
                ")");

        tableEnvironment.executeSql("insert into s2 select * from sensor");
    }
}

11.4 时间属性

  像窗口(在 Table API 和 SQL )这种基于时间的操作,需要有时间信息。因此,Table API 中的表就需要提供逻辑时间属性来表示时间,以及支持时间相关的操作。

11.4.1 处理时间

  1)DataStream 到 Table 转换时定义:处理时间属性可以在 schema 定义的时候用 .proctime 后缀来定义。时间属性一定不能定义在一个已有字段上,所以它只能定义在 schema 定义的最后

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:37
 */
public class Flink_Time_ProcessingTime {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterSensorDataStreamSource = environment.fromElements(
                new WaterSensor("sensor_1", 1L, 10),
                new WaterSensor("sensor_1", 2L, 20),
                new WaterSensor("sensor_2", 3L, 30),
                new WaterSensor("sensor_1", 4L, 40),
                new WaterSensor("sensor_1", 5L, 50)
        );

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //需要在schema的最后添加一个新的字段用来表示处理时间
        Table table = tableEnvironment.fromDataStream(waterSensorDataStreamSource,
                $("id"), $("ts"), $("vc"), $("pt").proctime());

        table.execute().print();
    }
}

  2)在创建表的 DDL 中定义

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:43
 */
public class Flink_Time_ProcessingTime_DDL {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //在DDL中指定字段作为处理时间
        tableEnvironment.executeSql("create table sensor(" +
                " id string, " +
                " ts bigint, " +
                " vc int, " +
                " ps as proctime()" +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/sensor.txt', " +
                " 'format' = 'csv' " +
                ")");

        tableEnvironment.sqlQuery("select * from sensor").execute().print();
    }
}

11.4.2 事件时间

  事件时间允许程序按照数据中包含的时间来处理,这样可以在有乱序或者晚到的数据的情况下产生一致的处理结果。它可以保证从外部存储读取数据后产生可以复现(replayable)的结果。

  除此之外,事件时间可以让程序在流式和批式作业中使用同样的语法。在流式程序中的事件时间属性,在批式程序中就是一个正常的时间字段。为了能够处理乱序的事件,并且区分正常到达和晚到的事件,Flink 需要从事件中获取事件时间并且产生 watermark(水印)。

  1)DataStream 到 Table 转换时定义:事件时间属性可以用 .rowtime 后缀在定义 DataStream schema 的时候来定义。时间戳和 watermark 在这之前一定是 DataStream 上已经定义好了。在从 DataStream 到 Table 转换时定义事件时间属性有两种方式。取决于用 .rowtime 后缀修饰的字段名字是否是已有字段,事件时间字段可以是:

    (1) schema 的结尾追加一个新的字段

    (2)替换一个已经存在的字段。

  不管在哪种情况下,事件时间字段都表示 DataStream 中定义的事件的时间戳。

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:50
 */
public class Flink_Time_EventTime {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStream<WaterSensor> stream = environment.fromElements(
                new WaterSensor("sensor_1", -1L, 10),
                new WaterSensor("sensor_1", 2L, 20),
                new WaterSensor("sensor_2", 3L, 30),
                new WaterSensor("sensor_1", 4L, 40),
                new WaterSensor("sensor_1", 5L, 50)
        ).assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(2))
                        .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000)
        );

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //使用已有的字段作为时间属性
//        Table table = tableEnvironment.fromDataStream(stream, $("id"), $("ts").rowtime(), $("vc"));

        //用一个额外的字段作为事件时间属性
        Table table = tableEnvironment.fromDataStream(stream, $("id"), $("ts"), $("vc"),$("et").rowtime());

        table.execute().print();
    }
}

  2)在创建表的 DDL 中定义:事件时间属性可以用 WATERMARK 语句在 CREATE TABLE DDL 中进行定义。WATERMARK 语句在一个已有字段上定义一个 watermark 生成表达式,同时标记这个已有字段为时间属性字段

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.ZoneOffset;

/**
 * @作者:袁哥
 * @时间:2021/7/26 19:57
 */
public class Flink_Time_EventTime_DDL {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //设置时区
        tableEnvironment.getConfig().setLocalTimeZone(ZoneOffset.ofHours(0));

        //事件时间的类型: 必须是 timestamp(3)
        tableEnvironment.executeSql("create table sensor(" +
                " id string, " +
                " ts bigint, " +
                " vc int, " +
                " et as to_timestamp(from_unixtime(ts / 1000,'yyyy-MM-dd HH:mm:ss')), " +
                " watermark for et as et - interval '3' second" +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/sensor.txt', " +
                " 'format' = 'csv' " +
                ")");
        tableEnvironment.sqlQuery("select * from sensor").execute().print();
    }
}

  说明:

    (1)把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。

    (2)严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column

    (3)递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL '0.001' SECOND

    (4)乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL 'string' timeUnit。

11.5 窗口(window)

  时间语义,要配合窗口操作才能发挥作用。最主要的用途,当然就是开窗口然后根据时间段做计算了,下面我们就来看看Table API和SQL中,怎么利用时间字段做窗口操作。

  在Table API和SQL中,主要有两种窗口:Group Windows和Over Windows

11.5.1 Table API中使用窗口

  1)Group Windows:分组窗口(Group Windows)会根据时间或行计数间隔,将行聚合到有限的组(Group)中,并对每个组的数据执行一次聚合函数。Table API中的Group Windows都是使用.window(w:GroupWindow)子句定义的,并且必须由as子句指定一个别名。为了按窗口对表进行分组,窗口的别名必须在group by子句中,像常规的分组字段一样引用。

    (1)滚动窗口

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Expressions;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.TumbleWithSizeOnTimeWithAlias;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.expressions.Expression;

import java.time.Duration;
import java.time.ZoneOffset;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/26 20:26
 */
public class Flink_Table_Window_Grouped {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        tableEnvironment.getConfig().setLocalTimeZone(ZoneOffset.ofHours(0));

        SingleOutputStreamOperator<WaterSensor> waterStream = environment
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_1", 4000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );

        Table table = tableEnvironment.fromDataStream(waterStream, $("id"), $("ts").rowtime().as("et"), $("vc"));

        //每2s统计最近2s每个传感器的水位和
        TumbleWithSizeOnTimeWithAlias w = Tumble.over(Expressions.lit(2).second()).on($("et")).as("w");

        table.window(w)
                .groupBy($("w"),$("id"))
                .select($("w").start().as("w_start"),$("w").end().as("w_end"),
                        $("id"),$("vc").sum().as("vc_sum"))
                .execute()
                .print();
    }
}

    (2)滑动窗口

//每2s统计最近5s每个传感器的水位和
        SlideWithSizeAndSlideOnTimeWithAlias w = Slide.over(lit(5).second()).every(lit(2).second()).on($("et")).as("w");

        table.window(w)
                .groupBy($("w"),$("id"))
                .select($("w").start().as("w_start"),$("w").end().as("w_end"),
                        $("id"),$("vc").sum().as("vc_sum"))
                .execute()
                .print();

    (3)会话窗口

SessionWithGapOnTimeWithAlias w = Session.withGap(lit(2).second()).on($("et")).as("w");

        table.window(w)
                .groupBy($("w"),$("id"))
                .select($("w").start().as("w_start"),$("w").end().as("w_end"),
                        $("id"),$("vc").sum().as("vc_sum"))
                .execute()
                .print();

  2)Over Windows:Over window聚合是标准SQL中已有的(Over子句),可以在查询的SELECT子句中定义。Over window 聚合,会针对每个输入行,计算相邻行范围内的聚合。Table API提供了Over类,来配置Over窗口的属性。可以在事件时间或处理时间,以及指定为时间间隔、或行计数的范围内,定义Over windows。无界的over window是使用常量指定的。也就是说,时间间隔要指定UNBOUNDED_RANGE,或者行计数间隔要指定UNBOUNDED_ROW。而有界的over window是用间隔的大小指定的。

    (1)Unbounded Over Windows

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Over;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;

import static org.apache.flink.table.api.Expressions.*;

/**
 * @作者:袁哥
 * @时间:2021/7/26 20:46
 */
public class Flink_Table_Window_Over {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tenv = StreamTableEnvironment.create(environment);
        tenv.getConfig().setLocalTimeZone(ZoneOffset.ofHours(0));

        SingleOutputStreamOperator<WaterSensor> waterSensorStream = environment
                .fromElements(new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_1", 2000L, 30),
                        new WaterSensor("sensor_1", 3000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );
        Table table = tenv.fromDataStream(waterSensorStream, $("id"), $("ts").rowtime().as("et"), $("vc"));

        //每条数据后面新增一个字段, 用来记录当前这个sensor的水位和
        table
                .window(Over.partitionBy($("id")).orderBy($("et")).preceding(UNBOUNDED_ROW).as("w"))    //UNBOUNDED_ROW:窗口大小是从起始行到当前行
                .select($("id"),$("et"),$("vc").sum().over($("w")).as("sum_vc"))
                .execute()
                .print();
    }
}
//每条数据后面新增一个字段, 用来记录当前这个sensor的水位和
        table
                .window(Over.partitionBy($("id")).orderBy($("et")).preceding(UNBOUNDED_RANGE).as("w"))  //按照实际元素值(时间戳)确定窗口大小
                .select($("id"),$("et"),$("vc").sum().over($("w")).as("sum_vc"))
                .execute()
                .print();

     (2)Bounded Over Windows

//每条数据后面新增一个字段, 用来记录当前这个sensor的水位和
        table
                .window(Over.partitionBy($("id")).orderBy($("et")).preceding(rowInterval(1L)).as("w"))  //当前行向前推1行算一个窗口
                .select($("id"),$("et"),$("vc").sum().over($("w")).as("sum_vc"))
                .execute()
                .print();
//每条数据后面新增一个字段, 用来记录当前这个sensor的水位和
        table
                .window(Over.partitionBy($("id")).orderBy($("et")).preceding(lit(1).second()).as("w"))  //当前事件时间向前推1s算一个窗口
                .select($("id"),$("et"),$("vc").sum().over($("w")).as("sum_vc"))
                .execute()
                .print();

11.5.2 SQL API中使用窗口

  1)Group Windows:SQL 查询的分组窗口是通过 GROUP BY 子句定义的。类似于使用常规 GROUP BY 语句的查询,窗口分组语句的 GROUP BY 子句中带有一个窗口函数为每个分组计算出一个结果。以下是批处理表和流处理表支持的分组窗口函数:

分组窗口函数

描述

TUMBLE(time_attr, interval)

定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

HOP(time_attr, interval, interval)

定义一个跳跃的时间窗口(在 Table API 中称为滑动窗口)。滑动窗口有一个固定的持续时间( 第二个 interval 参数 )以及一个滑动的间隔(第一个 interval 参数 )。若滑动间隔小于窗口的持续时间,滑动窗口则会出现重叠;因此,行将会被分配到多个窗口中。比如,一个大小为 15 分组的滑动窗口,其滑动间隔为 5 分钟,将会把每一行数据分配到 3 个 15 分钟的窗口中。滑动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

SESSION(time_attr, interval)

定义一个会话时间窗口。会话时间窗口没有一个固定的持续时间,但是它们的边界会根据 interval 所定义的不活跃时间所确定;即一个会话时间窗口在定义的间隔时间内没有时间出现,该窗口会被关闭。例如时间窗口的间隔时间是 30 分钟,当其不活跃的时间达到30分钟后,若观测到新的记录,则会启动一个新的会话时间窗口(否则该行数据会被添加到当前的窗口),且若在 30 分钟内没有观测到新纪录,这个窗口将会被关闭。会话时间窗口可以使用事件时间(批处理、流处理)或处理时间(流处理)。

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.ZoneOffset;

/**
 * @作者:袁哥
 * @时间:2021/7/26 21:20
 */
public class Flink_SQL_Window_Grouped {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        tableEnvironment.getConfig().setLocalTimeZone(ZoneOffset.ofHours(0));

        tableEnvironment.executeSql("create table sensor(" +
                " id string, " +
                " ts bigint, " +
                " vc int, " +
                " et as to_timestamp(from_unixtime(ts / 1000)), " +
                " watermark for et as et - interval '3' second" +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/sensor.txt', " +
                " 'format' = 'csv' " +
                ")");

        //sql语句中使用分组窗口
        //滚动
        tableEnvironment.sqlQuery("select " +
                " id," +
                " tumble_start(et,interval '5' second) w_start," +
                " tumble_end(et,interval '5' second) w_end," +
                " sum(vc) sum_vc " +
                " from sensor " +
                " group by id,tumble(et, interval '5' second)")
                .execute()
                .print();
    }
}
//滑动
tableEnvironment.sqlQuery("select " + " id, " + " hop_start(et, interval '2' second, interval '5' second) w_start, " + " hop_end(et, interval '2' second, interval '5' second) w_end, " + " sum(vc) sum_vc " + " from sensor " + " group by id,hop(et, interval '2' second, interval '5' second)") .execute() .print();
//session会话窗口
        tableEnvironment.sqlQuery("select " +
                " id, " +
                " session_start(et,interval '2' second) w_start," +
                " session_end(et,interval '2' second) w_end," +
                " sum(vc) sum_vc" +
                " from sensor " +
                " group by id, session(et, interval '2' second)")
                .execute()
                .print();

  2)Over Windows

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;
import java.time.ZoneOffset;

import static org.apache.flink.table.api.Expressions.$;

/**
 * @作者:袁哥
 * @时间:2021/7/26 23:07
 */
public class Flink_SQL_Window_Over {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);
        tableEnvironment.getConfig().setLocalTimeZone(ZoneOffset.ofHours(0));

        SingleOutputStreamOperator<WaterSensor> waterStream = environment
                .fromElements(
                        new WaterSensor("sensor_1", 1000L, 10),
                        new WaterSensor("sensor_1", 2000L, 20),
                        new WaterSensor("sensor_1", 2000L, 30),
                        new WaterSensor("sensor_1", 3000L, 40),
                        new WaterSensor("sensor_1", 5000L, 50)
                )
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs())
                );
        Table table = tableEnvironment.fromDataStream(waterStream, $("id"), $("ts").rowtime().as("et"), $("vc"));
        tableEnvironment.createTemporaryView("sensor",table);

     //写法一: tableEnvironment.sqlQuery(
"select " + " id," + " et," + " vc," + " sum(vc) over(partition by id order by et rows between unbounded preceding and current row) vc_sum, " + //窗口大小:起始行到当前行 " max(vc) over(partition by id order by et rows between unbounded preceding and current row) vc_max, " + " min(vc) over(partition by id order by et rows between unbounded preceding and current row) va_min " + // " sum(vc) over(partition by id order by et rows between 1 preceding and current row) vc_sum2 " + //窗口大小:从当前行往前取一行 // " sum(vc) over(partition by id order by et range between unbounded preceding and current row) vc_sum2 " + //窗口大小:起始行到当前行,若当前行的时间与后面行的时间相同,则也包括后面行 // " sum(vc) over(partition by id order by et range between interval '1' second preceding and current row) vc_sum2 " + //窗口大小:当前行到前面1秒范围内的所有行,包括所有与当前行时间相同的行 "from sensor") .execute() .print(); } }
//写法二
        tableEnvironment.sqlQuery("select " +
                " id," +
                " et," +
                " vc," +
                " sum(vc) over w sum_vc, " +
                " max(vc) over w max_vc, " +
                " min(vc) over w min_vc " +
                " from default_catalog.default_database.sensor " +
                " window w as (partition by id order by et rows between unbounded preceding and current row)")
                .execute().print();

11.6 Catalog

  Catalog 提供了元数据信息,例如数据库、表、分区、视图以及数据库或其他外部系统中存储的函数和信息。数据处理最关键的方面之一是管理元数据。 元数据可以是临时的,例如临时表、或者通过 TableEnvironment 注册的 UDF。 元数据也可以是持久化的,例如 Hive Metastore 中的元数据。Catalog 提供了一个统一的API,用于管理元数据,并使其可以从 Table API 和 SQL 查询语句中来访问。前面用到Connector其实就是在使用Catalog

11.6.1 Catalog类型

  1)GenericInMemoryCatalog:GenericInMemoryCatalog 是基于内存实现的 Catalog,所有元数据只在 session 的生命周期内可用。

  2)JdbcCatalog:JdbcCatalog 使得用户可以将 Flink 通过 JDBC 协议连接到关系数据库。PostgresCatalog 是当前实现的唯一一种 JDBC Catalog。

  3)HiveCatalog:HiveCatalog 有两个用途,一是作为原生 Flink 元数据的持久化存储,二是作为读写现有 Hive 元数据的接口。 Flink 的 Hive 文档 提供了有关设置 HiveCatalog 以及访问现有Hive 元数据的详细信息。

11.6.2 HiveCatalog

  1)导入需要的依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-hive_2.12</artifactId>
    <version>1.13.1</version>
</dependency>
<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>3.1.2</version>
</dependency>

  2)在hadoop162启动hive元数据

nohup hive --service metastore >/dev/null 2>&1 &

  3)连接 Hive(在此之前必须先创建gmall库,并新建person表,将hive配置文件上传至idea中的input目录下)

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @作者:袁哥
 * @时间:2021/7/26 23:53
 */
public class Flink_Hive {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //创建hive的catalog
        HiveCatalog hiveCatalog = new HiveCatalog("hive", "gmall", "input/");
        //注册catalog
        tableEnvironment.registerCatalog("hive",hiveCatalog);
        tableEnvironment.useCatalog("hive");    //设置默认的catalog
        tableEnvironment.useDatabase("gmall");
//        tableEnvironment.sqlQuery("select * from hive.gmall.person").execute().print();
        tableEnvironment.sqlQuery("select * from person").execute().print();
    }
}

11.7 函数(function)

  Flink 允许用户在 Table API 和 SQL 中使用函数进行数据的转换。

11.7.1 内置函数

  Flink Table APISQL给用户提供了大量的函数用于数据转换.

  1)Scalar Functions(标量函数)

package com.yuange.flink.day08;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @作者:袁哥
 * @时间:2021/7/27 11:05
 */
public class Flink01_Function_Scala {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf).setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromValues("yuange", "henniubi", "kelasi");

        //在tableApi中使用自定义函数
        //一、内联方式方式使用
        /*table.select($("f0"),call(MyUpperCase.class,$("f0")).as("u"))
                .execute()
                .print();*/

        //二、注册之后使用
        /*tableEnvironment.createTemporaryFunction("upper", MyUpperCase.class);
        table.select($("f0"),call("upper",$("f0")).as("u"))
                .execute()
                .print();*/

        //三、在SQL中使用(只能先注册再使用)
        tableEnvironment.createTemporaryFunction("upper", MyUpperCase.class);
        tableEnvironment.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
    }

    //自定义函数
    public static class MyUpperCase extends ScalarFunction {
        public String eval(String s) {
            if (s == null) {
                return null;
            }
            return s.toUpperCase();
        }
    }
}
    (1)Comparison Functions(比较函数)

    (2)Logical Functions(逻辑函数)

  2)Aggregate Functions(聚合函数)

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @作者:袁哥
 * @时间:2021/7/27 12:52
 */
public class Flink_Function_Agg {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterStream = environment.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromDataStream(waterStream);

        //一、直接使用
        /*table.groupBy($("id"))
                .select($("id"),call(MySum.class,$("vc")).as("vc_sum"))
                .execute()
                .print();*/

        //二、注册后使用
        /*tableEnvironment.createTemporaryFunction("my_sum",MySum.class);
        table.groupBy($("id"))
                .select($("id"),call("my_sum",$("vc")).as("vc_sum"))
                .execute()
                .print();*/

        //三、使用SQL的方式
        tableEnvironment.createTemporaryFunction("my_sum",MySum.class);
        tableEnvironment.sqlQuery("select " +
                " id," +
                " my_sum(vc) sum_vc" +
                " from " + table +
                " group by id")
                .execute()
                .print();
    }

    public static class MySum extends AggregateFunction<Integer,MyAcc> {
        // 返回最终聚合的结果
        @Override
        public Integer getValue(MyAcc accumulator) {
            return accumulator.sum;
        }

        // 初始化累加器
        @Override
        public MyAcc createAccumulator() {
            return new MyAcc();
        }

        // 累加
        public void accumulate(MyAcc acc, Integer v){
            acc.sum += v;
        }
    }

    public static class MyAcc {
        public Integer sum = 0;
    }
}

  3)其他所有内置函数:https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/functions/overview/

11.7.2 自定义函数 

  自定义函数(UDF)是一种扩展开发机制,可以用来在查询语句里调用难以用其他方式表达的频繁使用或自定义的逻辑。

  自定义函数分类:

    a) 标量函数(Scalar functions) 将标量值转换成一个新标量值;

    b) 表值函数(Table functions) 将标量值转换成新的行数据;

    c) 聚合函数(Aggregate functions) 将多行数据里的标量值转换成一个新标量值;

    d) 表值聚合函数(Table aggregate) 将多行数据里的标量值转换成新的行数据;

    e) 异步表值函数(Async table functions) 是异步查询外部数据系统的特殊函数。

  函数用于 SQL 查询前要先经过注册;而在用于 Table API 时,函数可以先注册后调用,也可以 内联 后直接使用。

  1)标量函数

    (1)介绍:用户定义的标量函数,可以将0、1或多个标量值,映射到新的标量值。为了定义标量函数,必须在org.apache.flink.table.functions中扩展基类Scalar Function,并实现(一个或多个)求值(evaluation,eval)方法。标量函数的行为由求值方法决定,求值方法必须公开声明并命名为eval(直接def声明,没有override)。求值方法的参数类型和返回类型,确定了标量函数的参数和返回类型。

    (2)定义函数

// 定义一个可以把字符串转成大写标量函数
public static class ToUpperCase extends ScalarFunction {
    public String eval(String s){
        return s.toUpperCase();
    }
}

    (3)TableAPI或SQL中使用

package com.yuange.flink.day08;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @作者:袁哥
 * @时间:2021/7/27 11:05
 */
public class Flink01_Function_Scala {

    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment(conf).setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromValues("yuange", "henniubi", "kelasi");

        //在tableApi中使用自定义函数
        //一、内联方式方式使用
        /*table.select($("f0"),call(MyUpperCase.class,$("f0")).as("u"))
                .execute()
                .print();*/

        //二、注册之后使用
        /*tableEnvironment.createTemporaryFunction("upper", MyUpperCase.class);
        table.select($("f0"),call("upper",$("f0")).as("u"))
                .execute()
                .print();*/

        //三、在SQL中使用(只能先注册再使用)
        tableEnvironment.createTemporaryFunction("upper", MyUpperCase.class);
        tableEnvironment.sqlQuery("select f0, upper(f0) u from " + table).execute().print();
    }

    //自定义函数
    public static class MyUpperCase extends ScalarFunction {
        public String eval(String s) {
            if (s == null) {
                return null;
            }
            return s.toUpperCase();
        }
    }
}

  2)表值函数

    (1)介绍:跟自定义标量函数一样,自定义表值函数的输入参数也可以是 0 到多个标量。但是跟标量函数只能返回一个值不同的是,它可以返回任意多行。返回的每一行可以包含 1 到多列,如果输出行只包含 1 列,会省略结构化信息并生成标量值,这个标量值在运行阶段会隐式地包装进行里。要定义一个表值函数,你需要扩展 org.apache.flink.table.functions 下的 TableFunction,可以通过实现多个名为 eval 的方法对求值方法进行重载。像其他函数一样,输入和输出类型也可以通过反射自动提取出来。表值函数返回的表的类型取决于 TableFunction 类的泛型参数 T,不同于标量函数,表值函数的求值方法本身不包含返回类型,而是通过 collect(T) 方法来发送要输出的行。在 Table API 中,表值函数是通过 .joinLateral(...) 或者 .leftOuterJoinLateral(...) 来使用的。joinLateral 算子会把外表(算子左侧的表)的每一行跟跟表值函数返回的所有行(位于算子右侧)进行 (cross)join。leftOuterJoinLateral 算子也是把外表(算子左侧的表)的每一行跟表值函数返回的所有行(位于算子右侧)进行(cross)join,并且如果表值函数返回 0 行也会保留外表的这一行。在 SQL 里面用 JOIN 或者 以 ON TRUE 为条件的 LEFT JOIN 来配合 LATERAL TABLE(<TableFunction>) 的使用。其实就是以前的UDTF函数

    (2)定义函数

@FunctionHint(output = @DataTypeHint("ROW(word string, len int)"))
public static class Split extends TableFunction<Row> {
    public void eval(String line) {
        if (line.length() == 0) {
            return;
        }
        for (String s : line.split(",")) {
            // 来一个字符串, 按照逗号分割, 得到多行, 每行为这个单词和他的长度
            collect(Row.of(s, s.length()));
        }
    }
}

    (3)TableAPI或SQL中使用

package com.yuange.flink.day08;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @作者:袁哥
 * @时间:2021/7/27 11:20
 */
public class Flink_Function_Table {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromValues("yuange hello", "yuange hello", "aa bb cc 123");

        //一、内联
/*        table
//                .joinLateral(call(SplitAndLen.class,$("f0")))  //内连接
                .leftOuterJoinLateral(call(SplitAndLen.class,$("f0")))  //左连接
                .select($("f0"),$("w"),$("len"))
                .execute()
                .print();*/

        //二、注册后使用
        /*tableEnvironment.createTemporaryFunction("split",SplitAndLen.class);
        table
                .joinLateral(call("split",$("f0")))    //内连接
                .select($("f0"),$("w"),$("len"))
                .execute()
                .print();*/

        tableEnvironment.createTemporaryView("t1",table);
        tableEnvironment.createTemporaryFunction("split",SplitAndLen.class);
        //三、SQL写法:内连接1
        /*tableEnvironment.sqlQuery("select " +
                " f0," +
                " w," +
                " len " +
                " from t1 " +
                " join lateral table (split(f0)) on true")
                .execute()
                .print();*/

        //四、SQL写法:内连接2
        tableEnvironment.sqlQuery("select " +
                " f0," +
                " w1," +
                " len1 " +
                " from t1, " +
                " lateral table(split(f0)) as t(w1,len1)")
                .execute()
                .print();
    }

    @FunctionHint(output = @DataTypeHint("row<w string, len int>")) //指定输出类型
    public static class SplitAndLen extends TableFunction {
        public void eval(String s){
            if (s.contains("yuange")) return;
            String[] split = s.split(" ");
            for (String str : split) {
                // 调用几次就生成几行
                collect(Row.of(str,str.length()));
            }
        }
    }
}

  3)聚合函数

    (1)介绍:用户自定义聚合函数(User-Defined Aggregate Functions,UDAGGs)可以把一个表中的数据,聚合成一个标量值。用户定义的聚合函数,是通过继承AggregateFunction抽象类实现的。

  假设现在有一张表,包含了各种饮料的数据。该表由三列(id、name和price)、五行组成数据。现在我们需要找到表中所有饮料的最高价格,即执行max()聚合,结果将是一个数值。

  AggregateFunction的工作原理如下。

    1) 首先,它需要一个累加器,用来保存聚合中间结果的数据结构(状态)。可以通过调用AggregateFunction的createAccumulator()方法创建空累加器。

    2) 随后,对每个输入行调用函数的accumulate()方法来更新累加器。

    3) 处理完所有行后,将调用函数的getValue()方法来计算并返回最终结果。

  AggregationFunction要求必须实现的方法:

    1) createAccumulator()

    2) accumulate()

    3) getValue()

  除了上述方法之外,还有一些可选择实现的方法。其中一些方法,可以让系统执行查询更有效率,而另一些方法,对于某些场景是必需的。例如,如果聚合函数应用在会话窗口(session group window)的上下文中,则merge()方法是必需的。

    1) retract()  bounded OVER 窗口中是必须实现的。

    2) merge() 

    3) resetAccumulator() 在许多批式聚合中是必须实现的。

    (2)定义函数

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @作者:袁哥
 * @时间:2021/7/27 12:52
 */
public class Flink_Function_Agg {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterStream = environment.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromDataStream(waterStream);

        //一、直接使用
        /*table.groupBy($("id"))
                .select($("id"),call(MySum.class,$("vc")).as("vc_sum"))
                .execute()
                .print();*/

        //二、注册后使用
        /*tableEnvironment.createTemporaryFunction("my_sum",MySum.class);
        table.groupBy($("id"))
                .select($("id"),call("my_sum",$("vc")).as("vc_sum"))
                .execute()
                .print();*/

        //三、使用SQL的方式
        tableEnvironment.createTemporaryFunction("my_sum",MySum.class);
        tableEnvironment.sqlQuery("select " +
                " id," +
                " my_sum(vc) sum_vc" +
                " from " + table +
                " group by id")
                .execute()
                .print();
    }

    public static class MySum extends AggregateFunction<Integer,MyAcc> {
        // 返回最终聚合的结果
        @Override
        public Integer getValue(MyAcc accumulator) {
            return accumulator.sum;
        }

        // 初始化累加器
        @Override
        public MyAcc createAccumulator() {
            return new MyAcc();
        }

        // 累加
        public void accumulate(MyAcc acc, Integer v){
            acc.sum += v;
        }
    }

    public static class MyAcc {
        public Integer sum = 0;
    }
}

  4)表值聚合函数

    (1)介绍:自定义表值聚合函数(UDTAGG)可以把一个表(一行或者多行,每行有一列或者多列)聚合成另一张表结果中可以有多行多列

    比如现在我们需要找到表中所有饮料的前2个最高价格,即执行top2()表聚合。我们需要检查5行中的每一行,得到的结果将是一个具有排序后前2个值的表。

    用户定义的表聚合函数,是通过继承TableAggregateFunction抽象类来实现的。

    TableAggregateFunction的工作原理如下。

      1) 首先,它同样需要一个累加器(Accumulator),它是保存聚合中间结果的数据结构。通过调用TableAggregateFunction的createAccumulator()方法可以创建空累加器。

      2) 随后,对每个输入行调用函数的accumulate()方法来更新累加器。

      3) 处理完所有行后,将调用函数的emitValue()方法来计算并返回最终结果。

    AggregationFunction要求必须实现的方法:

      1) createAccumulator()

      2) accumulate()

    除了上述方法之外,还有一些可选择实现的方法。

      1) retract() 

      2) merge()  

      3) resetAccumulator() 

      4) emitValue() 

      5) emitUpdateWithRetract()

    (2)定义函数

package com.yuange.flink.day08;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @作者:袁哥
 * @时间:2021/7/27 13:04
 */
public class Flink_Function_TableAgg {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        DataStreamSource<WaterSensor> waterStream = environment.fromElements(
                new WaterSensor("sensor_1", 1000L, 10),
                new WaterSensor("sensor_1", 2000L, 20),
                new WaterSensor("sensor_2", 3000L, 30),
                new WaterSensor("sensor_1", 4000L, 40),
                new WaterSensor("sensor_1", 5000L, 50),
                new WaterSensor("sensor_2", 6000L, 60));

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        Table table = tableEnvironment.fromDataStream(waterStream);

        //自定义TableAggregateFunction,用来提取每个sensor最高的两个温度值(两行)
        table.groupBy($("id"))
                .flatAggregate(call(Top22.class, $("vc")).as("v", "rank"))
                .select($("id"), $("v"), $("rank"))
                .execute()
                .print();
    }

    public static class Top22 extends TableAggregateFunction<Tuple2<Integer,Integer>,FirstSecond> {
        @Override
        public FirstSecond createAccumulator() {
            return new FirstSecond();
        }

        // 聚合函数
        public void accumulate(FirstSecond acc, Integer v) {
            if (v > acc.first) {
                acc.second = acc.first;
                acc.first = v;
            } else if (v > acc.second) {
                acc.second = v;
            }
        }

        // 指标
        public void emitValue(FirstSecond acc, Collector<Tuple2<Integer, Integer>> collector) {
            collector.collect(Tuple2.of(acc.first, 1));  // 调用几次就有几行

            if (acc.second > 0)
                collector.collect(Tuple2.of(acc.second, 2));  // 调用几次就有几行
        }
    }

    public static class FirstSecond {
        public Integer first = 0;
        public Integer second = 0;
    }
}

11.8 SqlClient

  启动换一个yarn-session,然后启动一个sql客户端

/opt/module/flink-yarn/bin/yarn-session.sh -d
/opt/module/flink-yarn/bin/sql-client.sh embedded

  1)建立到Kafka的连接

    (1)创建一个流表从Kafka读取数据,复制 flink-sql-connector-kafka_2.12-1.13.1.jar依赖到 flinklib 目录下,下载地址: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.13.1/flink-connector-kafka_2.12-1.13.1.jar

wget -P /opt/module/flink-yarn/lib https://repo1.maven.org/maven2/org/apache/flink/flink-connector-kafka_2.12/1.13.1/flink-connector-kafka_2.12-1.13.1.jar

    (2)创建与kafka的连接

create table sensor(id string, ts bigint, vc int)
with(
    'connector'='kafka',
    'topic'='flink_sensor',
    'properties.bootstrap.servers'='hadoop162:9092',
    'properties.group.id'='yuange',
    'format'='json',
    'scan.startup.mode'='latest-offset'
);

    (3)查询数据(若报错,则将kafka的客户端Jar包上传至lib目录,然后重启Yarn-session)

select * from sensor;
//拷贝Jar包
cp /opt/module/kafka-2.4.1/libs/kafka-clients-2.4.1.jar /opt/module/flink-yarn/lib

    (4)向Kafka中的 flink_sensor 主题中生产数据

producer flink_sensor
{"id": "sensor1", "ts": 1000, "vc": 10}

    (5)从流表查询数据(按住R即可刷新表中数据)

  2)建立到mysql的连接

    (1)依赖: flink-connector-jdbc_2.12-1.13.1.jar

    (2)下载地址: https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.1/flink-connector-jdbc_2.12-1.13.1.jar

    (3)copy mysql驱动到lib目录

#一步到位
wget -P /opt/module/flink-yarn/lib https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc_2.12/1.13.1/flink-connector-jdbc_2.12-1.13.1.jar
#MySQL驱动包
cp /opt/module/hive-3.1.2/lib/mysql-connector-java-5.1.27-bin.jar /opt/module/flink-yarn/lib

    (4)在mysql新建gmall库

     (5)新建sensor表并插入数据(若之前启动过yarn-session,则先将其重启)

CREATE TABLE sensor(
    id VARCHAR(100),
    ts BIGINT,
    vc INT
)ENGINE=INNODB DEFAULT CHARSET=utf8;
INSERT INTO sensor VALUES('sensor_1',1,10);
INSERT INTO sensor VALUES('sensor_1',2,20);
INSERT INTO sensor VALUES('sensor_2',1,30);

    (6)创建与MySQL的连接

create table sensor2(id string, ts bigint, vc int)
with(
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://hadoop162:3306/gmall',
    'username'='root',
    'password'='aaaaaa',
    'table-name' = 'sensor'
);

    (7)查询流中的数据(按住R即可刷新表中数据)

select * from sensor2;

第12章 Flink SQL编程实战

12.1 使用SQL实现热门商品TOP N

  目前仅 Blink 计划器支持 Top-N 。Flink 使用 OVER 窗口条件过滤条件相结合以进行 Top-N 查询。利用 OVER 窗口的 PARTITION BY 子句的功能,Flink 还支持逐组 Top-N 。 例如,每个类别中实时销量最高的前五种产品。批处理表和流处理表都支持基于SQL的 Top-N 查询。

  流处理模式需注意:  TopN 查询的结果会带有更新。 Flink SQL 会根据排序键对输入的流进行排序;若 top N 的记录发生了变化,变化的部分会以撤销、更新记录的形式发送到下游。 推荐使用一个支持更新的存储作为 Top-N 查询的 sink 。另外,若 top N 记录需要存储到外部存储,则结果表需要拥有与 Top-N 查询相同的唯一键。

12.1.1 需求描述

  每隔10min 统计最近 1hour的热门商品 top3, 并把统计的结果写入到mysql中

  思路: 

    1)统计每个商品的点击量, 开窗

    2)分组窗口分组

    3)over窗口

12.1.2 数据源

  input/UserBehavior.csv

12.1.3 mysql中创建表

CREATE DATABASE gmall;

USE gmall;

DROP TABLE IF EXISTS `hot_item`;

CREATE TABLE `hot_item` (
  `w_end` timestamp NOT NULL,
  `item_id` bigint(20) NOT NULL,
  `item_count` bigint(20) NOT NULL,
  `rk` bigint(20) NOT NULL,
  PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

12.1.4 导入JDBC Connector依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.13.1</version>
</dependency>

12.1.5 具体实现代码

package com.yuange.flink.day09;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @作者:袁哥
 * @时间:2021/7/27 17:34
 */
public class Flink_TopN {

    public static void main(String[] args) {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(environment);

        //建立一个表与source关联
        tableEnvironment.executeSql("create table ub(" +
                " user_id bigint," +
                " item_id bigint," +
                " category_id int," +
                " behavior string," +
                " ts bigint," +
                " et as to_timestamp(from_unixtime(ts))," +
                " watermark for et as et - interval '5' second" +
                ")with(" +
                " 'connector' = 'filesystem', " +
                " 'path' = 'input/UserBehavior.csv', " +
                " 'format' = 'csv' " +
                ")");
//        tableEnvironment.sqlQuery("select * from ub").execute().print();

        //开窗聚合: 计算每个商品在每个窗口内的点击量
        Table table = tableEnvironment.sqlQuery("select " +
                " item_id," +
                " hop_start(et,interval '10' minute,interval '1' hour) w_start," +
                " hop_end(et,interval '10' minute,interval '1' hour) w_end," +
                " count(*) item_count " +
                " from ub " +
                " where behavior = 'pv' " +
                " group by item_id,hop(et,interval '10' minute,interval '1' hour)");
        tableEnvironment.createTemporaryView("t1",table);
//        tableEnvironment.sqlQuery("select * from t1").execute().print();

        //使用over窗口, 按照点击量排序, 给每个点击量配置一个名次
        Table table2 = tableEnvironment.sqlQuery("select " +
                " *," +
                " row_number() over(partition by w_end order by item_count desc) rk " +
                " from t1");
        tableEnvironment.createTemporaryView("t2",table2);

        //过滤出来名次小于等于3的
        Table table3 = tableEnvironment.sqlQuery("select " +
                " item_id," +
                " w_end," +
                " item_count," +
                " rk" +
                " from t2" +
                " where rk <= 3");
        tableEnvironment.createTemporaryView("t3",table3);
//        tableEnvironment.sqlQuery("select * from t3").execute().print();

        //数据写入到mysql中,创建动态表与mysql进行关联
        tableEnvironment.executeSql("create table hot_item(" +
                " item_id bigint," +
                " w_end timestamp(3)," +
                " item_count bigint," +
                " rk bigint," +
                " primary key (w_end,rk) not enforced" +
                ")with(" +
                " 'connector' = 'jdbc', " +
                " 'url' = 'jdbc:mysql://hadoop162:3306/gmall?useSSL=false', " +
                " 'table-name' = 'hot_item', " +
                " 'username' = 'root', " +
                " 'password' = 'aaaaaa' " +
                ")");
        table3.executeInsert("hot_item");
    }
}

  若运行程序过程中包如下错误,则将缺少的类复制下来,去Mavan中搜索该类的依赖,下载最新版本的依赖即可

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/commons/compress/compressors/zstandard/ZstdCompressorInputStream
    at org.apache.flink.api.common.io.FileInputFormat.initDefaultInflaterInputStreamFactories(FileInputFormat.java:124)
    at org.apache.flink.api.common.io.FileInputFormat.<clinit>(FileInputFormat.java:90)
    at org.apache.flink.formats.csv.CsvFileSystemFormatFactory.createReader(CsvFileSystemFormatFactory.java:117)
    at org.apache.flink.table.filesystem.FileSystemTableSource.getInputFormat(FileSystemTableSource.java:162)
    at org.apache.flink.table.filesystem.FileSystemTableSource.getScanRuntimeProvider(FileSystemTableSource.java:126)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:453)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:161)
    at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:119)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:85)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3585)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2507)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2144)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2093)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2050)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:663)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:644)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3438)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:570)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:170)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:162)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:967)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:936)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:275)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    at com.yuange.flink.day09.Flink_TopN.main(Flink_TopN.java:32)
Caused by: java.lang.ClassNotFoundException: org.apache.commons.compress.compressors.zstandard.ZstdCompressorInputStream
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 26 more

12.1.6 查看数据

第13章 一些补充知识 

13.1 双流join

  在Flink中, 支持两种方式的流的Join: Window Join和Interval Join

13.1.1 Window Join

  窗口join会join具有相同的key并且处于同一个窗口中的两个流的元素,所有的窗口join都是 inner join, 意味着a流中的元素如果在b流中没有对应的, a流中这个元素就不会处理(就是忽略掉了)

,并且join成功后的元素的会以所在窗口的最大时间作为其时间戳.  例如窗口[5,10), 则元素会以9作为自己的时间戳

  1)滚动窗口Join

package com.yuange.flink.day09;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * @作者:袁哥
 * @时间:2021/7/27 18:27
 */
public class Flink_Window_Join {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> s1 = environment.socketTextStream("hadoop164", 8888)  //在socket终端只输入毫秒级别的时间戳
                .map(value -> {
                    String[] split = value.split(",");
                    return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000)
                );

        SingleOutputStreamOperator<WaterSensor> s2 = environment.socketTextStream("hadoop164", 9999)  //在socket终端只输入毫秒级别的时间戳
                .map(value -> {
                    String[] split = value.split(",");
                    return new WaterSensor(split[0], Long.valueOf(split[1]), Integer.valueOf(split[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forMonotonousTimestamps()
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTs() * 1000)
                );

        s1.join(s2)
                .where(WaterSensor::getId)
                .equalTo(WaterSensor::getId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<WaterSensor, WaterSensor, String>() {
                    @Override
                    public String join(WaterSensor first, WaterSensor second) throws Exception {
                        return first + ":" + second;
                    }
                })
                .print();
        environment.execute();
    }
}

  2)滑动窗口Join

  3)会话窗口Join

13.1.2 Interval Join

  间隔流join(Interval Join),是指使用一个流的数据按照keyjoin另外一条流的指定范围的数据,如下图: 橙色的流去join绿色的流,范围是由橙色流的event-time + lower boundevent-time + upper bound来决定的。

  orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound

  注意: 

    1) Interval Join只支持event-time

    2) 必须是keyBy之后的流才可以interval join

package com.yuange.flink.day09;

import com.yuange.flink.day02.WaterSensor;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

/**
 * @作者:袁哥
 * @时间:2021/7/27 18:47
 */
public class Flink_Interval_Join {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        SingleOutputStreamOperator<WaterSensor> s1 = env
                .socketTextStream("hadoop164", 8888)  // 在socket终端只输入毫秒级别的时间戳
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));

                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        return element.getTs() * 1000;
                                    }
                                })
                );

        SingleOutputStreamOperator<WaterSensor> s2 = env
                .socketTextStream("hadoop164", 9999)  // 在socket终端只输入毫秒级别的时间戳
                .map(value -> {
                    String[] datas = value.split(",");
                    return new WaterSensor(datas[0], Long.valueOf(datas[1]), Integer.valueOf(datas[2]));
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<WaterSensor>forMonotonousTimestamps()
                                .withTimestampAssigner(new SerializableTimestampAssigner<WaterSensor>() {
                                    @Override
                                    public long extractTimestamp(WaterSensor element, long recordTimestamp) {
                                        return element.getTs() * 1000;
                                    }
                                })
                );
        s1.keyBy(WaterSensor::getId)
                .intervalJoin(s2.keyBy(WaterSensor::getId))
                .between(Time.seconds(-5),Time.seconds(5))
                .process(new ProcessJoinFunction<WaterSensor, WaterSensor, String>() {
                    @Override
                    public void processElement(WaterSensor left,
                                               WaterSensor right,
                                               Context ctx,
                                               Collector<String> out) throws Exception {
                        out.collect(left + ":" + right);
                    }
                })
                .print();
        env.execute();
    }
}

13.2 海量数据实时去重

13.2.1 方案1: 借助redis的Set

  1)具体实现代码

  2)缺点:需要频繁连接Redis,如果数据量过大, redis的内存也是一种压力

13.2.2 方案2: 使用Flink的MapState

  1)具体实现代码

package com.yuange.flink.day05;

import com.yuange.flink.day02.WaterSensor;
import com.yuange.flink.utils.YuangeUtil;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.AggregatingState;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * @作者:袁哥
 * @时间:2021/7/20 22:33
 */
public class Flink_State_KV_Map {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger("rest.port", 20000);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setParallelism(2);

        env.socketTextStream("hadoop164",8888)
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0],Long.valueOf(split[1]),Integer.valueOf(split[2]));
                })
                .keyBy(WaterSensor::getId)
                .process(new KeyedProcessFunction<String, WaterSensor, String>() {
                    private MapState<Integer, Object> vcState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        vcState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Object>(
                                "vcState",
                                Integer.class,
                                Object.class
                        ));
                    }

                    @Override
                    public void processElement(WaterSensor value, Context ctx, Collector<String> out) throws Exception {
                        vcState.put(value.getVc(),new Object());

                        List<Integer> vcs = YuangeUtil.toList(vcState.keys());
                        out.collect(ctx.getCurrentKey() + " " + vcs);
                    }
                })
                .print();
        env.execute();
    }
}

  2)缺点:如果数据量过大, 状态后端最好选择 RocksDBStateBackend,如果数据量过大, 对存储也有一定压力

13.2.3 方案3: 使用布隆过滤器

  布隆过滤器:布隆过滤器可以大大减少存储的数据的数据量

  1)为什么需要布隆过滤器?

    如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。但是随着集合中元素的增加,我们需要的存储空间越来越大。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为。布隆过滤器即可以解决存储空间的问题, 又可以解决时间复杂度的问题。

    布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

  2)基本概念

    布隆过滤器(Bloom Filter,下文简称BF)由Burton Howard Bloom在1970年提出,是一种空间效率高概率型数据结构。它专门用来检测集合中是否存在特定的元素。它实际上是一个很长的二进制向量和一系列随机映射函数。

  3)实现原理

    布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

    BF是由一个长度为m比特位数组bit array)与k个哈希函数hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。

    下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。

  4)优点

    (1)不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;

    (2)时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是

    (3)哈希函数之间相互独立,可以在硬件指令层面并行计算。

  5)缺点

    (1)存在假阳性的概率,不适用于任何要求100%准确率的情境;

    (2)只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。

  6)使用场景

    BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适,另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。

  7)假阳性概率的计算

    假阳性的概率其实就是一个不在的元素,被k个函数函数散列到的k个位置全部都是1的概率。可以按照如下的步骤进行计算: p = f(m,n,k)

    其中各个字母的含义: 

    (1)n :放入BF中的元素的总个数;

    (2)m:BF的总长度,也就是bit数组的个数

    (3)k:哈希函数的个数;

    (4)p:表示BF将一个不在其中的元素错判为在其中的概率,也就是false positive的概率;

    (5)BF中的任何一个bit在第一个元素的第一个hash函数执行完之后为 0的概率是:

    a)BF中的任何一个bit在第一个元素的k个hash函数执行完之后为 0的概率是:

    b)BF中的任何一个bit在所有的n元素都添加完之后 0的概率是:

    c)BF中的任何一个bit在所有的n元素都添加完之后 1的概率是:

    d)一个不存在的元素被k个hash函数映射后k个bit都是1的概率是:

    结论:在哈数函数个数k一定的情况下,比特数组m长度越大, p越小, 表示假阳性率越低;已插入的元素个数n越大, p越大, 表示假阳性率越大

    经过各种数学推导:对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:

  8)使用布隆过滤器实现去重:Flink已经内置了布隆过滤器的实现(使用的是google的Guava)

package com.yuange.flink.day09;

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import com.yuange.flink.day03.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;

/**
 * @作者:袁哥
 * @时间:2021/7/27 19:01
 */
public class Flink_UV_Bloom {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        environment.readTextFile("input/UserBehavior.csv")
                .map(line -> {
                    String[] split = line.split(",");
                    return new UserBehavior(Long.valueOf(split[0]),
                            Long.valueOf(split[1]),
                            Integer.valueOf(split[2]),
                            split[3],
                            Long.parseLong(split[4]) * 1000);
                })
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp())
                )
                .filter(behavior -> "pv".equals(behavior.getBehavior()))
                .keyBy(UserBehavior::getBehavior)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {
                    ValueState<Long> sumState;
                    ValueState<BloomFilter<Long>> bfState;
                    @Override
                    public void open(Configuration parameters) throws Exception {
                        sumState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("sumState", Long.class));
                        bfState = getRuntimeContext().getState(
                                new ValueStateDescriptor<BloomFilter<Long>>(
                                        "bfState",
                                        TypeInformation.of(new TypeHint<BloomFilter<Long>>() {
                        })));
                    }

                    @Override
                    public void process(String key,
                                        Context context,
                                        Iterable<UserBehavior> elements,
                                        Collector<String> out) throws Exception {
                        sumState.clear();
                        bfState.clear();

                        sumState.update(0L);
                        bfState.update(BloomFilter.create(Funnels.longFunnel(),1000000000,0.01));

                        Long sum = sumState.value();
                        BloomFilter<Long> bf = bfState.value();

                        for (UserBehavior element : elements) {
                            if (bf.put(element.getUserId())){
                                sum++;  //这次存储的时候元素不存在才算一个新的uid
                            }
                        }
                        out.collect("窗口开始: " + new Date(context.window().getStart()) +
                                ", 窗口结束: " + new Date(context.window().getEnd()) +
                                "uv:" + sum);
                    }
                })
                .print();
        environment.execute();
    }
}
原文地址:https://www.cnblogs.com/LzMingYueShanPao/p/15055284.html