flink-demo2

package cn.irisz.steam

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{EnvironmentSettings, TableResult}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/** Demo job: counts access-log rows with `status_code = 200` per (`t_hour`, `t_minute`)
  * using the Flink Table API, and prints the aggregated rows to stdout.
  *
  * The input is read from the CSV file registered as table `log`.
  */
object Demo2 {

  /** DDL for the CSV-backed source table `log`. */
  private val CreateLogTableSql: String =
    """
      |CREATE TABLE log (
      |   `id` Int,
      |   `i_city` String,
      |   `i_country` String,
      |   `i_isp` String,
      |   `i_province` String,
      |   `ip` String,
      |   `length` BigInt,
      |   `method` String,
      |   `referer` String,
      |   `status_code` Int,
      |   `t_hour` Int,
      |   `t_minute` Int,
      |   `t` TIMESTAMP,
      |   `ua` String,
      |   `url` String,
      |   `url_param` String,
      |   `url_path` String,
      |   `version` String,
      |   `xff` String
      |)WITH (
      |   'connector' = 'filesystem',
      |   'path' = 'data/aceess.log_20200914.csv',
      |   'format' = 'csv'
      |)
      |""".stripMargin

  /** DDL for the `result` table backed by the `print` connector.
    *
    * The table is registered in the catalog, but the query in [[main]] does not insert into it;
    * the query output is printed through `TableResult.print()` instead.
    */
  private val CreateResultTableSql: String =
    """
      |CREATE TABLE `result` (
      |   `t_hour` Int,
      |   `t_minute` Int,
      |   `cnt` BigInt
      |) WITH (
      |   'connector' = 'print'
      |)
      |""".stripMargin

  /** Aggregation: number of successful (HTTP 200) requests per hour/minute bucket. */
  private val CountPerMinuteSql: String =
    """
      |   SELECT t_hour, t_minute, COUNT(1) AS cnt
      |   FROM log
      |   WHERE status_code = 200
      |   GROUP BY t_hour, t_minute
      |""".stripMargin

  /** Entry point: sets up the environments, registers the tables, runs the query and prints it.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    // 1. env
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // NOTE(review): the DataStream runtime mode is BATCH while the table settings below use
    // inStreamingMode() -- confirm which execution mode is actually intended.
    env.setRuntimeMode(RuntimeExecutionMode.BATCH)
    env.setParallelism(1)
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tEnv = StreamTableEnvironment.create(env, settings)

    // 2. source / sink tables
    tEnv.executeSql(CreateLogTableSql)
    tEnv.executeSql(CreateResultTableSql)

    // 3. transform + 4. sink
    // Table.execute() submits the query as its own job; print() then consumes the result rows.
    val result: TableResult = tEnv.sqlQuery(CountPerMinuteSql).execute()
    result.print()

    // 5. execute
    // No env.execute() here: this program registers no DataStream operators on `env`, so there is
    // nothing left for it to run, and the previous `env.execute(...).wait()` additionally invoked
    // java.lang.Object.wait() without holding the object's monitor
    // (IllegalMonitorStateException). The job was already submitted by Table.execute() above.
  }
}


原文地址:https://www.cnblogs.com/zpzhue/p/14948086.html