Flink: sinking a data stream to Kafka

import it.bigdata.flink.study.SensorReding
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer


/**
 * Reads sensor records from a local text file, parses them, and sinks the
 * resulting strings into a Kafka topic via Flink's Kafka producer connector.
 *
 * Expected input line format (CSV): id,timestamp,temperature
 * e.g. "sensor_1,1547718199,35.8"
 */
object KafkaSink {
  def main(args: Array[String]): Unit = {
    // Set up the streaming execution environment; parallelism 1 keeps
    // output ordering deterministic for this demo.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    // Read the source file.
    // NOTE: use a triple-quoted (raw) string so Windows backslashes are not
    // interpreted as escape sequences (`\i`, `\m`, `\s` are illegal escapes
    // and would not compile in a plain string literal).
    val inputPath = """D:\ideaDemo\maven_flink\src\main\resources\sensor.txt"""
    val inputStream = env.readTextFile(inputPath)

    // Simple transformation: parse each CSV line into a SensorReding and
    // serialize it back to a string for the Kafka sink.
    val dataStream = inputStream.map(data => {
      val arr = data.split(",")
      // Fields: arr(0)=id, arr(1)=timestamp, arr(2)=temperature.
      // (Original code mistakenly reused arr(1) for the temperature.)
      SensorReding(arr(0), arr(1).toLong, arr(2).toDouble).toString
    })

    // Sink each record to the "flink_sink_test" topic on the given broker,
    // using a plain UTF-8 string serialization schema.
    dataStream.addSink(new FlinkKafkaProducer[String](
      "192.168.0.20:9092",
      "flink_sink_test",
      new SimpleStringSchema()
    ))

    env.execute("Kafka sink test")
  }
}
Author: nohert
Original article: https://www.cnblogs.com/gzgBlog/p/14928289.html