IDEA Spark Streaming 操作(文件源)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Word count over a file-based Spark Streaming source.
 *
 * Monitors a local directory with `textFileStream`, counts the words found in each
 * 15-second batch, prints up to 100 `(word, count)` pairs per batch, and shuts the
 * streaming context down after waiting at most 100 seconds.
 */
object DStream_file {

  /**
   * Entry point: builds the streaming pipeline, runs it, and stops it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[4]")
    // One batch every 15 seconds over the monitored "streaming" directory.
    val ss = new StreamingContext(sparkConf, Seconds(15))
    try {
      val lines = ss.textFileStream("file:///usr/local2/spark/mycode/streaming")
      // Split on runs of whitespace and drop empty tokens, so blank lines and
      // repeated/leading spaces do not get counted as an empty-string "word".
      val words = lines.flatMap(_.split("\\s+")).filter(_.nonEmpty)
      val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
      wordCounts.print(100) // print at most 100 pairs per batch
      ss.start()
      ss.awaitTerminationOrTimeout(100000) // wait up to 100 seconds (value is in ms)
    } finally {
      // awaitTerminationOrTimeout only stops waiting; it does not stop the context.
      // Stop explicitly so the streaming job and the SparkContext are released
      // even when the timeout elapses or an error is thrown.
      ss.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}

运行结果(对应上面批次间隔为15秒的程序;textFileStream 只处理监听目录中新出现的文件,因此没有新文件的批次只打印时间戳):

-------------------------------------------
Time: 1508045550000 ms
-------------------------------------------

-------------------------------------------
Time: 1508045565000 ms
-------------------------------------------

-------------------------------------------
Time: 1508045580000 ms
-------------------------------------------
(88,2)
(4,1)
(8,1)
(ya,1)
(55,2)
(me,2)
(49,1)
(i,4)
(9,1)
(but,1)
(1,2)
(dont,1)
(2,2)
(79,1)
(you,4)
(know,2)
(3,2)
(like,2)
(76,1)

-------------------------------------------
Time: 1508045595000 ms
-------------------------------------------

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Word count over a file-based Spark Streaming source (10-second batches).
 *
 * Monitors a local directory with `textFileStream`, counts the words found in each
 * batch, prints up to 100 `(word, count)` pairs per batch, and shuts the streaming
 * context down after waiting at most 20 seconds.
 *
 * Created by soyo on 17-10-15.
 */
object DStream_file {

  /**
   * Entry point: builds the streaming pipeline, runs it, and stops it.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("wordCount").setMaster("local[2]")
    val ss = new StreamingContext(sparkConf, Seconds(10))
    try {
      val lines = ss.textFileStream("file:///usr/local2/spark/mycode/streaming")
      // Tokenize on any run of whitespace and discard empty tokens; splitting on a
      // single space would count "" for blank lines and consecutive spaces.
      val words = lines.flatMap(line => line.split("\\s+")).filter(_.nonEmpty)
      val wordCounts = words.map(word => (word, 1)).reduceByKey(_ + _)
      wordCounts.print(100)
      ss.start()
      ss.awaitTerminationOrTimeout(20000) // wait up to 20 seconds (value is in ms)
    } finally {
      // Returning from awaitTerminationOrTimeout does not stop the context, so stop
      // it here to release the streaming job and the underlying SparkContext.
      ss.stop(stopSparkContext = true, stopGracefully = true)
    }
  }
}
原文地址:https://www.cnblogs.com/soyo/p/7670710.html