Spark Streaming的wordcount案例

之前测试的一些spark案例都是采用离线处理,spark streaming的流处理一样可以运行经典的wordcount。

基本环境:

spark-2.0.0

scala-2.11.0

IDEA-15.0.6

创建项目,贴上代码:

package org.iie

import org.apache.log4j.{Level,Logger}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

/**
  * Created by hbwxcw on 2016/12/9.
  */
object NetworkWordCount {
  def main(args: Array[String]) {
    import org.apache.log4j.{Level,Logger}
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.sql").setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark.streaming").setLevel(Level.WARN)

    val sparkConf = new SparkConf().setAppName("nwc")
    val ssc = new StreamingContext(sparkConf,Seconds(1))
    val lines = ssc.socketTextStream(args(0),args(1).toInt,StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x,1)).reduceByKey(_+_)
    wordCounts.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
}

记得在pom.xml下引入依赖:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.0.0</version>
    </dependency>

再生成jar包传到集群上,用spark-submit进行提交。

但是得注意后面得加上主机名和端口号。。

你就会看到下面这个家伙= =。

表示上面那个ERROR对结果没什么影响啊,不管它= =。。

再在另外一个窗口运行:

nc -l -p 9999

我用的是9999端口。。你们随意。。。

网上好多博客都用的是nc -lk 9999,反正我是没用,疑似版本问题。。。

顺便贴一下结果:

在一端用nc输入:o o a a ss s aa aa

另外一端出现:

原文地址:https://www.cnblogs.com/hbwxcw/p/6149972.html