flink kafka source

 1 import java.util.Properties
 2 
 3 import org.apache.flink.api.common.serialization.SimpleStringSchema
 4 import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
 5 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
 6 
 7 object FlinkDemo05_KafkaSource {
 8     
 9     val prop = new Properties()
10     prop.setProperty("bootstrap.servers", "linux01:9092")
11     prop.setProperty("group.id", "flink-grp")
12     prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
13     prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
14     prop.setProperty("auto.offset.reset", "latest")
15     
16     def main(args: Array[String]): Unit = {
17         //1 创建环境
18         val env = StreamExecutionEnvironment.getExecutionEnvironment
19         //2 获取Stream
20         import org.apache.flink.api.scala._
21         val topic = "flink-topic"
22         val schema = new SimpleStringSchema()
23         val dStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](topic, schema, prop))
24         //3 计算
25         val result = dStream.flatMap(_.split("\s")).filter(_.nonEmpty).map((_,1)).keyBy(0).sum(1)
26         result.print()
27         
28         //4 执行
29         env.execute("kafka source job")
30         
31     }
32 }
View Code
原文地址:https://www.cnblogs.com/xiefeichn/p/13174975.html