输入DStream之基础数据源以及基于HDFS的实时wordcount程序

输入DStream之基础数据源以及基于HDFS的实时wordcount程序

基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理,相当于处理实时的文件流。

	streamingContext.fileStream<KeyClass,ValueClass,InputFormatClass>(dataDirectory)
    streamingContext.fileStream[KeyClass,ValueClass,InputFormatClass](dataDirectory)

Spark Streaming会监控指定的HDFS目录,并且处理出现在目录中的文件。

所有放入HDFS目录中的文件,都必须有相同的格式,必须使用移动或者重命名的方式,将文件移入目录,一旦处理之后,文件的内容即使改变,也不会再处理了。

基于HDFS文件的数据源是没有Receiver的,因此也不会占用一个cpu core。

一、Java方式

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

/**
 * 基于HDFS文件的
 */
public class JavaHDFSWordCount {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("JavaSparkStreaming");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        //首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
        JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");
        JavaDStream<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) s -> {
                    return null;
                    //return Arrays.asList(line.spilt(" "));
                }
        );

        JavaPairDStream<String, Integer> pairs = words.mapToPair(
                (PairFunction<String, String, Integer>) word -> new Tuple2<String, Integer>(word, 1)
        );

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                (Function2<Integer, Integer, Integer>) (v1, v2) -> v1 + v2
        );

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();

    }
}

二、Scala方式

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ScalaHDFSWordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setMaster("ScalaHDFSWordCount")

    //scala中,创建的是StreamingContext
    val ssc = new StreamingContext(conf, Seconds(5))

    //必须保证有该目录,否则报错
    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
    val words = lines.flatMap {
      _.split(" ")
    }
    val pairs = words.map {
      word => (word, 1)
    }
    val wordCounts = pairs.reduceByKey {
      _ + _
    }
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
原文地址:https://www.cnblogs.com/aixing/p/13327434.html