// Flink: defining the various kinds of sources (flink定义各类source)

package it.bigdata.flink.study

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import scala.util.Random

/**
 * One reading from a temperature sensor; the record type produced by the sources in this file.
 *
 * The type keeps its original spelling (`SensorReding`) because it is referenced by that name.
 *
 * NOTE(review): the unit of `timestamp` looks inconsistent across producers in this file.
 * The collection demo uses second-sized values such as 1547718199, while the custom source
 * stores `System.currentTimeMillis()`. Confirm which unit downstream consumers expect.
 *
 * @param id          sensor identifier, e.g. `"sensor_1"`
 * @param timestamp   time of the reading (see the unit note above)
 * @param temperature measured temperature
 */
case class SensorReding(
    id: String,
    timestamp: Long,
    temperature: Double
)

object SourceTask {

  /** File read by the file-source demo when no path is passed as the first program argument. */
  private val DefaultInputPath = "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt"

  /** Kafka brokers used by the Kafka-source demo when no second program argument is passed. */
  private val DefaultBootstrapServers = "10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092"

  /**
   * Demonstrates four ways of creating a Flink source:
   * an in-memory collection, a text file, a Kafka topic and a custom `SourceFunction`.
   *
   * Only the file stream is printed; the `print()` calls of the other streams are commented out
   * so they can be toggled one at a time.
   * NOTE(review): a source with no downstream sink is presumably not part of the submitted job
   * graph, so only the file source actually runs. Confirm against the Flink version in use.
   *
   * @param args optional overrides: `args(0)` = input file path,
   *             `args(1)` = Kafka `bootstrap.servers` list
   */
  def main(args: Array[String]): Unit = {
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Set the global parallelism to 1
    env.setParallelism(1)

    // 1. Read data from an in-memory collection
    val dataList = List(
      SensorReding("sensor_1", 1547718199, 35.8),
      SensorReding("sensor_6", 1547718101, 15.4),
      SensorReding("sensor_7", 1547718102, 6.7),
      SensorReding("sensor_10", 1547718205, 38.1)
    )
    val stream1 = env.fromCollection(dataList)
    //    env.fromElements(15, 123, "hello")
    //    stream1.print()

    // 2. Read data from a text file.
    // The default path escapes its backslashes ("\\"): sequences such as "\i" or "\m" are not
    // valid escapes in a Scala string literal and would be a compile error.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)
    val stream2 = env.readTextFile(inputPath)

    stream2.print()

    // 3. Read data from Kafka
    val props = new Properties()
    props.setProperty("bootstrap.servers", args.drop(1).headOption.getOrElse(DefaultBootstrapServers))
    // Flink's Kafka connector documentation lists group.id as a required consumer property.
    props.setProperty("group.id", "consumer-group")
    val stream3 = env.addSource(new FlinkKafkaConsumer[String]("test_topic", new SimpleStringSchema(), props))
    //    stream3.print()

    // 4. Custom source
    val stream4 = env.addSource(new MySensorSource())
    //    stream4.print()

    // Execute the job
    env.execute("source test")
  }

}


/**
 * Custom `SourceFunction` that simulates a set of temperature sensors.
 *
 * Every `intervalMillis` ms it emits one [[SensorReding]] per sensor until `cancel()` is called.
 * Each sensor starts at a random temperature in [0, 100). On every round the temperature drifts
 * by a standard-Gaussian step, so it performs a random walk.
 *
 * @param sensorCount    number of simulated sensors, named `sensor_1` .. `sensor_<sensorCount>`;
 *                       must be positive
 * @param intervalMillis pause between two emission rounds, in milliseconds; must be non-negative
 */
class MySensorSource(sensorCount: Int, intervalMillis: Long) extends SourceFunction[SensorReding] {

  require(sensorCount > 0, s"sensorCount must be positive, got $sensorCount")
  require(intervalMillis >= 0, s"intervalMillis must be non-negative, got $intervalMillis")

  /** Original configuration: 10 sensors, one emission round every 100 ms. */
  def this() = this(10, 100L)

  /**
   * Whether `run` should keep emitting data.
   *
   * Flink invokes `cancel()` from a different thread than the one executing `run()`. The flag is
   * therefore `@volatile` so that the emitting loop is guaranteed to see the update.
   */
  @volatile var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(sourceContext: SourceFunction.SourceContext[SensorReding]): Unit = {
    val rand = new Random()

    // Random initial temperature for each sensor: (id, temp)
    var curTemp = (1 to sensorCount).map(i => ("sensor_" + i, rand.nextDouble() * 100))

    // Keep producing data until cancel() clears the flag
    while (running) {
      // Drift each temperature from its previous value. `map` returns a new collection,
      // so the result must be reassigned; discarding it would freeze every temperature.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }

      // All readings of one round share the same timestamp
      val curTime = System.currentTimeMillis()
      curTemp.foreach { case (id, temp) => sourceContext.collect(SensorReding(id, curTime, temp)) }

      Thread.sleep(intervalMillis)
    }
  }

}
// author@nohert
// 原文地址 (original article): https://www.cnblogs.com/gzgBlog/p/14928146.html