Holiday Study, Day 12

Today I worked through the second half of Lab 6.

Below is the core of the lab's code: a FlumeEventCount application that counts the Flume events received in each batch, plus a small helper that sets the logging level.

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam

object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args
    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount").setMaster("local[2]")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events.").print()

    ssc.start()
    ssc.awaitTermination()
  }
}
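
FlumeUtils.createStream sets up a push-based receiver: Spark Streaming itself listens as an Avro server on the given host and port, and the Flume agent has to push events to it through an avro sink. A minimal agent configuration sketch for this kind of setup (the agent name a1, the netcat source on port 33333, the avro sink on port 44444, and the file name are my assumptions, adjust them to your environment):

# flume-to-spark.conf -- hypothetical file name
# One source, one channel, one sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Netcat source: turns lines typed over telnet/nc into Flume events
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 33333
a1.sources.r1.channels = c1

# Avro sink: pushes events to the Spark Streaming receiver started by FlumeEventCount
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 44444
a1.sinks.k1.channel = c1

# In-memory channel between source and sink
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

Start FlumeEventCount first so the Avro receiver is already listening, then start the agent with flume-ng agent --conf-file flume-to-spark.conf --name a1; every line typed into telnet localhost 33333 should then show up in the per-batch event counts.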
The StreamingExamples helper referenced above lives in a second file; it quiets Spark's default logging so the per-batch output stays readable:

package org.apache.spark.examples.streaming

import org.apache.log4j.{Level, Logger}

import org.apache.spark.internal.Logging

object StreamingExamples extends Logging {

  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}
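
To actually run the example, note that spark-streaming-flume is not part of the core Spark distribution, so it has to be supplied on the classpath at submit time. One way is the --packages flag of spark-submit; a sketch, assuming Spark 2.4 with Scala 2.11 and a hypothetical application jar (match the coordinates and paths to your own installation):

# Versions, the Spark install path, and the jar name are assumptions
/usr/local/spark/bin/spark-submit \
  --packages org.apache.spark:spark-streaming-flume_2.11:2.4.0 \
  --class org.apache.spark.examples.streaming.FlumeEventCount \
  ./flume-event-count.jar localhost 44444

The last two arguments are the <host> <port> that FlumeEventCount parses, and they must match the hostname and port of the Flume agent's avro sink.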
Original article: https://www.cnblogs.com/Excusezuo/p/12315279.html