package it.bigdata.flink.study

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer

import scala.util.Random

/**
 * A single temperature-sensor reading.
 *
 * The (misspelled) name `SensorReding` is kept as-is because other code in the project may
 * refer to it.
 *
 * @param id          sensor identifier, e.g. "sensor_1"
 * @param timestamp   reading time; note the hard-coded samples in [[SourceTask]] use values that
 *                    look like epoch seconds, while [[MySensorSource]] uses
 *                    `System.currentTimeMillis()` (epoch milliseconds)
 * @param temperature temperature value
 */
case class SensorReding(id: String, timestamp: Long, temperature: Double)

/** Demonstrates several ways of creating a Flink DataStream source. */
object SourceTask {

  /**
   * Builds four example streams (collection, text file, Kafka topic, custom source) and runs the job.
   * Only the text-file stream is printed; uncomment the other `print()` calls to inspect them.
   *
   * @param args optional; when present, `args(0)` overrides the path of the text file read in step 2
   */
  def main(args: Array[String]): Unit = {
    // Create the execution environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Set the global parallelism to 1.
    env.setParallelism(1)

    // 1. Read data from an in-memory collection.
    val dataList = List(
      SensorReding("sensor_1", 1547718199, 35.8),
      SensorReding("sensor_6", 1547718101, 15.4),
      SensorReding("sensor_7", 1547718102, 6.7),
      SensorReding("sensor_10", 1547718205, 38.1)
    )
    val stream1 = env.fromCollection(dataList)
    // env.fromElements(15, 123, "hello")
    // stream1.print()

    // 2. Read data from a text file.
    // Backslashes in a Windows path must be escaped: "\i", "\m", "\s", ... are not valid
    // escape sequences inside a Scala string literal.
    val inputPath = args.headOption.getOrElse(
      "D:\\ideaDemo\\maven_flink\\src\\main\\resources\\sensor.txt")
    val stream2 = env.readTextFile(inputPath)
    stream2.print()

    // 3. Read data from Kafka.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "10.18.35.155:9092,10.18.35.156:9092,10.18.35.157:9092")
    // props.setProperty("group.id", "consumer-group")
    val stream3 = env.addSource(
      new FlinkKafkaConsumer[String]("test_topic", new SimpleStringSchema(), props))
    // stream3.print()

    // 4. Custom source.
    val stream4 = env.addSource(new MySensorSource())
    // stream4.print()

    // Trigger job execution.
    env.execute("source test")
  }
}

/**
 * Custom [[SourceFunction]] that simulates 10 temperature sensors ("sensor_1" .. "sensor_10").
 *
 * Each sensor starts at a random temperature in [0, 100). Every 100 ms each temperature is shifted
 * by a Gaussian random step and one [[SensorReding]] per sensor is emitted, all stamped with the
 * same `System.currentTimeMillis()` value. The loop runs until [[cancel]] is called.
 */
class MySensorSource() extends SourceFunction[SensorReding] {

  // Flag that keeps run() producing data; cleared by cancel().
  // @volatile because cancel() is invoked from a different thread than the one executing run();
  // without it the loop is not guaranteed to ever observe the change.
  @volatile var running: Boolean = true

  override def cancel(): Unit = running = false

  override def run(sourceContext: SourceFunction.SourceContext[SensorReding]): Unit = {
    // Random number generator for initial values and per-tick drift.
    val rand = new Random()

    // Initial temperature for each of the 10 sensors: (id, temperature).
    var curTemp = (1 to 10).map(i => (s"sensor_$i", rand.nextDouble() * 100))

    // Produce data indefinitely until cancel() is called.
    while (running) {
      // Adjust every temperature relative to its previous value. `map` returns a new collection,
      // so the result must be assigned back; otherwise the temperatures would never change.
      curTemp = curTemp.map { case (id, temp) => (id, temp + rand.nextGaussian()) }

      // Stamp this batch with the current time and emit one reading per sensor.
      val curTime = System.currentTimeMillis()
      curTemp.foreach { case (id, temp) =>
        sourceContext.collect(SensorReding(id, curTime, temp))
      }

      // Wait 100 ms between batches.
      Thread.sleep(100)
    }
  }
}
Flink：定义各类 Source（数据源）
author@nohert