Flink 读取用户自定义source

package com.kpwong.aiptest

import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction

import scala.collection.immutable
import scala.util.Random

/** Demo job that reads from the user-defined `MySensorSource` and prints every reading. */
object MySourceTest {

  /**
   * Builds the streaming pipeline (custom source -> stdout sink) and runs it.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment

    // Attach the custom source and print each emitted reading.
    val sensorStream: DataStream[SensorReading] = streamEnv.addSource(new MySensorSource)
    sensorStream.print()

    // Nothing runs until the job is explicitly executed.
    streamEnv.execute()
  }

}

//自定义SourceFunction

/**
 * User-defined Flink source that emits simulated temperature readings for ten
 * sensors ("Sensor1" .. "Sensor10").
 *
 * Roughly once per second, every sensor's temperature is nudged by Gaussian
 * noise relative to its previous value (a random walk) and one `SensorReading`
 * per sensor is emitted, until `cancel()` is called.
 */
class MySensorSource() extends SourceFunction[SensorReading] {

  // Flag marking whether the source should keep emitting data.
  // cancel() is invoked from a different thread than run(), so the flag must be
  // volatile for the emitting loop to reliably observe the update.
  @volatile var running: Boolean = true

  /**
   * Emits one batch of ten readings per second while the source is running.
   *
   * @param sourceContext context used to hand records to the Flink runtime
   */
  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    // Random number generator for the initial values and the per-step noise.
    val random: Random = new Random()

    // Latest temperature of each sensor; updated every iteration so each new
    // value is based on the previous one rather than on the initial value.
    var temperatures: immutable.IndexedSeq[(String, Double)] =
      (1 to 10).map(i => (s"Sensor$i", random.nextDouble()))

    while (running) {
      Thread.sleep(1000)

      // Re-check after sleeping so nothing is emitted once the source is cancelled.
      if (running) {
        // Slightly adjust each temperature on top of its previous value.
        temperatures = temperatures.map { case (id, temperature) =>
          (id, temperature + random.nextGaussian())
        }

        val timestamp = System.currentTimeMillis()

        // foreach (not map): emission is a pure side effect, no result collection needed.
        temperatures.foreach { case (id, temperature) =>
          sourceContext.collect(SensorReading(id, timestamp, temperature))
        }
      }
    }
  }

  /** Signals the emitting loop in `run` to stop. */
  override def cancel(): Unit = {
    running = false
  }
}

运行结果:

原文地址:https://www.cnblogs.com/kpwong/p/14089985.html