Spark Streaming 实现接收网络传输数据并进行 WordCount 功能

package iie.udps.example.operator.spark;

import scala.Tuple2;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.Time;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;

import com.google.common.io.Files;

import org.apache.spark.api.java.JavaPairRDD;

import com.google.common.base.Optional;

/**
 * To run this on your local machine, you need to first run a Netcat server
 * 
 * `$ nc -lk 9999`
 * 
 * and run the example as
 * 
 * spark-submit --class iie.udps.example.operator.spark.JavaNetworkWordCount
 * --master local /home/xdf/test2.jar localhost 9999 /user/test/checkpoint/
 * /home/xdf/outputFile /home/xdf/totalOutputFile
 * 
 * 此示例接收Netcat server产生的数据,进行WordCount操作,分别输出当前结果和历史结果到本地文件中
 */
public final class JavaNetworkWordCount {

	@SuppressWarnings("serial")
	public static void main(String[] args) {

		if (args.length != 5) {
			System.err.println("You arguments were " + Arrays.asList(args));
			System.err
					.println("Usage: JavaNetworkWordCount <hostname> <port> <checkpoint-directory>
"
							+ "     <output-file> <total-output-file>. <hostname> and <port> describe the TCP server that Spark
"
							+ "     Streaming would connect to receive data. <checkpoint-directory> directory to
"
							+ "     HDFS-compatible file system which checkpoint data <output-file> file to which
"
							+ "     the word counts will be appended
"
							+ "     <total-output-file> file to which the total word counts will be appended
"
							+ "
"
							+ "In local mode, <master> should be 'local[n]' with n > 1
"
							+ "Both <checkpoint-directory> and <output-file> and <total-output-file> must be absolute paths");
			System.exit(1);
		}

		final String checkpointDirectory = args[2]; // 检查点目录
		final String curOutputPath = args[3];// 输出当前WordCount结果的路径
		final String totalOutputPath = args[4];// 输出全部累计WordCount结果的路径
		System.out.println("Creating new context");
		final File curOutputFile = new File(curOutputPath);
		if (curOutputFile.exists()) {
			curOutputFile.delete();
		}
		final File totalOutputFile = new File(totalOutputPath);
		if (totalOutputFile.exists()) {
			totalOutputFile.delete();
		}
		// Create a StreamingContext
		SparkConf conf = new SparkConf().setAppName("NetworkWordCount");
		final JavaStreamingContext jssc = new JavaStreamingContext(conf,
				new Duration(1000));

		jssc.checkpoint(checkpointDirectory);

		// Create a DStream that will connect to hostname:port, like
		// localhost:9999
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream(args[0],
				Integer.parseInt(args[1]));

		// Split each line into words
		JavaDStream<String> words = lines
				.flatMap(new FlatMapFunction<String, String>() {
					@Override
					public Iterable<String> call(String x) {
						return Arrays.asList(x.split(" "));
					}
				});

		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words
				.mapToPair(new PairFunction<String, String, Integer>() {
					@Override
					public Tuple2<String, Integer> call(String s)
							throws Exception {
						return new Tuple2<String, Integer>(s, 1);
					}
				});
		JavaPairDStream<String, Integer> runningCounts = pairs
				.reduceByKey(new Function2<Integer, Integer, Integer>() {
					@Override
					public Integer call(Integer i1, Integer i2)
							throws Exception {
						return i1 + i2;
					}
				});

		runningCounts
				.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
					@Override
					public Void call(JavaPairRDD<String, Integer> rdd, Time time)
							throws IOException {
						String counts = "Counts at time " + time + " "
								+ rdd.collect();
						System.out.println(counts);
						System.out.println("Appending to "
								+ curOutputFile.getAbsolutePath());
						Files.append(counts + "
", curOutputFile,
								Charset.defaultCharset());
						return null;
					}
				});

		Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction = new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
			@Override
			public Optional<Integer> call(List<Integer> values,
					Optional<Integer> state) {
				Integer newSum = state.or(0);
				for (Integer i : values) {
					newSum += i;
				}
				return Optional.of(newSum);
			}
		};

		JavaPairDStream<String, Integer> TotalCounts = words.mapToPair(
				new PairFunction<String, String, Integer>() {
					@Override
					public Tuple2<String, Integer> call(String s) {
						return new Tuple2<String, Integer>(s, 1);
					}
				}).updateStateByKey(updateFunction);

		TotalCounts
				.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
					@Override
					public Void call(JavaPairRDD<String, Integer> rdd, Time time)
							throws IOException {
						String counts = "Counts at time " + time + " "
								+ rdd.collect();
						System.out.println(counts);
						System.out.println("Appending to "
								+ totalOutputFile.getAbsolutePath());
						Files.append(counts + "
", totalOutputFile,
								Charset.defaultCharset());
						return null;
					}
				});

		jssc.start(); // Start the computation
		jssc.awaitTermination(); // Wait for the computation to terminate
		System.exit(0);
	}

}

  

原文地址:https://www.cnblogs.com/xiaodf/p/5027179.html