// Flink file sink example

import it.bigdata.flink.study.SensorReding
import org.apache.flink.api.common.serialization.SimpleStringEncoder
import org.apache.flink.core.fs.Path
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._

object FileSink {
  /**
   * Reads sensor records from a local text file, parses each CSV line into a
   * [[SensorReding]], and writes the string form of each record to a
   * row-format StreamingFileSink (as well as printing it to stdout).
   */
  def main(args: Array[String]): Unit = {
    // Create the streaming execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the source data from a text file.
    // NOTE: backslashes in Scala string literals must be doubled ("\\");
    // the original single-backslash form ("D:\ideaDemo\...") does not compile.
    val inputPath = "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"
    val inputStream = env.readTextFile(inputPath)

    // Simple transformation: split each CSV line into (id, timestamp, value),
    // build a SensorReding, and render it back to a string for the sink.
    val dataStream = inputStream.map { line =>
      val fields = line.split(",")
      SensorReding(fields(0), fields(1).toLong, fields(2).toDouble).toString
    }

    // Legacy sink API (deprecated in favor of StreamingFileSink):
    // dataStream.writeAsCsv("D:\\ideaDemo\\maven_flink\\src\\main\\resources\\out.txt")

    // StreamingFileSink treats the given path as a base directory and writes
    // bucketed part files beneath it, encoding each element as a row.
    dataStream.addSink(
      StreamingFileSink.forRowFormat(
        new Path("D:\\ideaDemo\\maven_flink\\src\\main\\resources\\out1.txt"),
        new SimpleStringEncoder[String]()
      ).build()
    )

    dataStream.print()

    // Submit and run the job.
    env.execute("file sink test")
  }
}
// author@nohert
// Original source: https://www.cnblogs.com/gzgBlog/p/14928233.html