Flink之TableAPI和SQL(2):表和外部系统的连接方式

相关文章链接

Flink之TableAPI和SQL(1):基本功能描述

Flink之TableAPI和SQL(2):表和外部系统的连接方式

Flink之TableAPI和SQL(3):通过TableAPI和SQL表的一些操作(包括查询,过滤,聚集等)

Flink之TableAPI和SQL(4):表的Sink实现

Flink之TableAPI和SQL(5):表的时间特性

具体实现如下代码所示:

// 1、创建执行环境
// 1.1、创建flink流的执行环境(表的环境需要基于此环境,在新的blink planner中,已经批流统一,所以直接使用流环境即可)
val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
///1.2、设置并行度
env.setParallelism(2)
///1.3、创建表的环境(一般都是基于流,在flink中,批是特殊的流,用流环境也可以进行批处理)
// 创建表环境时,也可以传入配置对象,用于选择老的planner还是blink planner,也可以选择是基于批的方式 或者 流的方式
// (一般不用设置,直接基于env来创建表的环境即可)
val oldStreamSetting: EnvironmentSettings = EnvironmentSettings.newInstance()
    .useOldPlanner()
    .inStreamingMode()
    .build()
val blinkStreamSetting: EnvironmentSettings = EnvironmentSettings.newInstance()
    .useBlinkPlanner()
    .inStreamingMode()
    .build()
val tableEnv: StreamTableEnvironment = StreamTableEnvironment.create(env)

// 2、创建表
// 2.1、从外部系统(文件系统,比如文本的字符串数据)中读取数据,在环境中注册表
val filePath: String = "D:\Project\IDEA\bigdata-study\flink-demo\src\main\resources\source.txt"
tableEnv
    .connect(new FileSystem().path(filePath))               // 连接文件系统
    .withFormat(new Csv())                                  // 定义读取数据之后的格式化方法,使用新的Csv(用逗号分隔字段)
    .withSchema(new Schema()                                // 定义表结构
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("inputTable")   // 注册一张表(表名为传入的字符串,可以在sql中直接使用该表名)
// 2.2、连接到kafka,并在catalog中注册表
tableEnv
    .connect(new Kafka()
        .version("0.11")
        .topic("flinkTestTopic")
        .property("bootstrap.servers", "cdh1:9092")
        .property("zookeeper.connect", "cdh1:2181")
    )
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("kafkaInputTable")
// 2.3、连接到ES,并在catalog中注册表
tableEnv
    .connect(new Elasticsearch()
        .version("7")
        .host("cdh1", 9200, "http")
        .index("myUsers")
        .documentType("user")
    )
    .withFormat(new Csv())
    .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE())
    )
    .createTemporaryTable("esInputTable")
// 其他外部系统实现方式基本类似
// 可以参考官网:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connect.html

// 3、打印表结构 和 表
val inputTable: Table = tableEnv.from("inputTable")
inputTable.printSchema()
inputTable.toAppendStream[Row].print()

// 启动执行器
env.execute("CreateTableDemo")
原文地址:https://www.cnblogs.com/yangshibiao/p/14074208.html