Spark Streaming + socket wordCount: a small example

Consumer code

import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
object NetWorkStream {
  def main(args: Array[String]): Unit = {
    // Create the SparkConf object
    val conf = new SparkConf().setMaster("spark://192.168.177.120:7077").setAppName("netWorkStream")
    // Create the StreamingContext: the main entry point for all streaming functionality
    // Seconds(1) is the batch interval: one batch is processed every second
    val ssc = new StreamingContext(conf, Seconds(1))
    // Receive input data from 192.168.99.143
    val lines = ssc.socketTextStream("192.168.99.143", 9999)
    // Count the occurrences of each incoming word
    val words = lines.flatMap { line => line.split(" ") }
    val wordCount = words.map { w => (w, 1) }.reduceByKey(_ + _)
    // Print the result
    wordCount.print()
    ssc.start() // start the computation
    ssc.awaitTermination() // wait for the computation to terminate
  }
}
On the other machine (192.168.99.143, the host the code connects to), start nc and type some words:
nc -lk 9999
zhang xing sheng zhang
 
The consumer terminal then shows the result:
17/03/25 14:10:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 128.0 (TID 134) in 30 ms on 192.168.177.120 (1/1)
17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 128.0, whose tasks have all completed, from pool
17/03/25 14:10:33 INFO scheduler.DAGScheduler: ResultStage 128 (print at NetWorkStream.scala:18) finished in 0.031 s
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Job 64 finished: print at NetWorkStream.scala:18, took 0.080836 s
17/03/25 14:10:33 INFO spark.SparkContext: Starting job: print at NetWorkStream.scala:18
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Got job 65 (print at NetWorkStream.scala:18) with 1 output partitions
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Final stage: ResultStage 130 (print at NetWorkStream.scala:18)
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 129)
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Missing parents: List()
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Submitting ResultStage 130 (ShuffledRDD[131] at reduceByKey at NetWorkStream.scala:17), which has no missing parents
17/03/25 14:10:33 INFO memory.MemoryStore: Block broadcast_67 stored as values in memory (estimated size 2.8 KB, free 366.2 MB)
17/03/25 14:10:33 INFO memory.MemoryStore: Block broadcast_67_piece0 stored as bytes in memory (estimated size 1711.0 B, free 366.2 MB)
17/03/25 14:10:33 INFO storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on 192.168.177.120:37341 (size: 1711.0 B, free: 366.3 MB)
17/03/25 14:10:33 INFO spark.SparkContext: Created broadcast 67 from broadcast at DAGScheduler.scala:1012
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 130 (ShuffledRDD[131] at reduceByKey at NetWorkStream.scala:17)
17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Adding task set 130.0 with 1 tasks
17/03/25 14:10:33 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 130.0 (TID 135, 192.168.177.120, partition 1, NODE_LOCAL, 6468 bytes)
17/03/25 14:10:33 INFO cluster.CoarseGrainedSchedulerBackend$DriverEndpoint: Launching task 135 on executor id: 0 hostname: 192.168.177.120.
17/03/25 14:10:33 INFO storage.BlockManagerInfo: Added broadcast_67_piece0 in memory on 192.168.177.120:45262 (size: 1711.0 B, free: 366.3 MB)
17/03/25 14:10:33 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 130.0 (TID 135) in 14 ms on 192.168.177.120 (1/1)
17/03/25 14:10:33 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 130.0, whose tasks have all completed, from pool
17/03/25 14:10:33 INFO scheduler.DAGScheduler: ResultStage 130 (print at NetWorkStream.scala:18) finished in 0.014 s
17/03/25 14:10:33 INFO scheduler.DAGScheduler: Job 65 finished: print at NetWorkStream.scala:18, took 0.022658 s
-------------------------------------------
Time: 1490422233000 ms
-------------------------------------------
(xing,1)
(zhang,2)
(sheng,1)
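
To make the per-batch computation easier to follow, the same flatMap / map / reduce-by-key steps can be run on an ordinary Scala collection. This is only a sketch without Spark; the object name BatchWordCount and the helper countWords are hypothetical names chosen for this illustration.

```scala
object BatchWordCount {
  // Mirrors lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
  // for the lines of a single batch, using plain Scala collections.
  def countWords(lines: Seq[String]): Map[String, Int] =
    lines
      .flatMap(_.split(" "))   // split every line into words
      .map(w => (w, 1))        // pair each word with a count of 1
      .groupBy(_._1)           // group the pairs by word
      .map { case (word, pairs) => (word, pairs.map(_._2).sum) } // sum the counts per word

  def main(args: Array[String]): Unit = {
    // The line typed into nc in the example above
    val counts = countWords(Seq("zhang xing sheng zhang"))
    counts.foreach(println)
  }
}
```

For the input line used in this post it yields zhang -> 2, xing -> 1 and sheng -> 1; the print order of a Map is not guaranteed.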
 
 
Notes:
val conf = new SparkConf()
new StreamingContext(conf, Seconds(1)) // create the context
After a context is defined, you have to do the following:
  1. Define the input sources by creating input DStreams.
  2. Define the streaming computations by applying transformation and output operations to DStreams.
  3. Start receiving data and processing it using streamingContext.start().
  4. Wait for the processing to be stopped (manually or due to any error) using streamingContext.awaitTermination().
  5. The processing can be manually stopped using streamingContext.stop().
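
The steps above can be sketched end to end as follows. This is a minimal sketch under assumptions that are not part of the original example: it runs in local mode ("local[2]") instead of on the standalone cluster, it uses an in-memory queueStream in place of the socket so that no nc session is needed, and the 5-second timeout is arbitrary. The object name LifecycleSketch is hypothetical.

```scala
import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object LifecycleSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, not the cluster master used in the post
    val conf = new SparkConf().setMaster("local[2]").setAppName("lifecycleSketch")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Define the input source: a queue of RDDs stands in for the socket
    val queue = mutable.Queue[RDD[String]]()
    queue += ssc.sparkContext.makeRDD(Seq("zhang xing sheng zhang"))
    val lines = ssc.queueStream(queue)

    // Define the computation (transformations) and an output operation
    val wordCount = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
    wordCount.print()

    // Start receiving and processing data
    ssc.start()

    // Wait for termination, here for at most 5 seconds instead of indefinitely
    ssc.awaitTerminationOrTimeout(5000)

    // Stop manually; by default this also stops the underlying SparkContext
    ssc.stop()
  }
}
```

With the socket source from the post, only the input-source step changes: ssc.socketTextStream(host, port) replaces the queue.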
Points to remember:
  1. Once a context has been started, no new streaming computations can be set up or added to it.
  2. Once a context has been stopped, it cannot be restarted.
  3. Only one StreamingContext can be active in a JVM at the same time.
  4. stop() on StreamingContext also stops the SparkContext. To stop only the StreamingContext, set the optional parameter of stop() called stopSparkContext to false, e.g. ssc.stop(false).
  5. A SparkContext can be re-used to create multiple StreamingContexts, as long as the previous StreamingContext is stopped (without stopping the SparkContext) before the next StreamingContext is created.
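
The points about stopping and re-using contexts might look like this in practice. Again a sketch under assumptions: local mode, an in-memory queueStream as the source, and arbitrary 3-second timeouts. ContextReuseSketch and newContext are hypothetical names, not part of the original code.

```scala
import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ContextReuseSketch {
  // Hypothetical helper: builds a StreamingContext on an existing SparkContext,
  // with one queue-backed input stream and a print() output operation.
  def newContext(sc: SparkContext): StreamingContext = {
    val ssc = new StreamingContext(sc, Seconds(1))
    val queue = mutable.Queue[RDD[String]](sc.makeRDD(Seq("zhang xing sheng zhang")))
    ssc.queueStream(queue).flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads
    val conf = new SparkConf().setMaster("local[2]").setAppName("contextReuseSketch")
    val sc = new SparkContext(conf)

    val first = newContext(sc)
    first.start()
    first.awaitTerminationOrTimeout(3000)
    // Stop only the StreamingContext; the SparkContext stays usable
    first.stop(stopSparkContext = false, stopGracefully = false)
    println(s"first context state: ${first.getState()}, SparkContext stopped: ${sc.isStopped}")

    // A stopped context cannot be restarted, so a new one is created on the same SparkContext
    val second = newContext(sc)
    second.start()
    second.awaitTerminationOrTimeout(3000)
    // Default stop(): this time the SparkContext is stopped as well
    second.stop()
  }
}
```

The second context is created only after the first one has been stopped, which keeps at most one StreamingContext active in the JVM at any time.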
Original article: https://www.cnblogs.com/zhangXingSheng/p/6646913.html