Using a Flume data source in Spark Streaming

There are two approaches: in the first, the Spark Streaming driver starts a listener and Flume pushes data to it; in the second, Spark Streaming polls Flume on a time-based schedule and pulls the data itself.

At first I thought only the first approach existed, but the problem is that the node on which the driver starts is unpredictable, so every time I restarted the streaming job I had to go back and modify Flume's sinks, which was a pain. Only later did I find the second approach. Below is the code for both; the differences are small. (The code is taken from the official Spark GitHub examples.)
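
Both variants rely on the spark-streaming-flume integration module, so the application has to be built with that dependency. A minimal sbt sketch, assuming a Scala 2.10 build and a Spark 1.x release (the version string is a placeholder; match it to your cluster's Spark version):

// spark-streaming-flume provides FlumeUtils; the version should match your Spark version
libraryDependencies += "org.apache.spark" %% "spark-streaming-flume" % "1.3.1"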

First approach: listen on a port and let Flume push events to it:

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam

/**
 *  Produces a count of events received from Flume.
 *
 *  This should be used in conjunction with an AvroSink in Flume. It will start
 *  an Avro server at the requested host:port address and listen for requests.
 *  Your Flume AvroSink should be pointed to this address.
 *
 *  Usage: FlumeEventCount <host> <port>
 *    <host> is the host the Flume receiver will be started on - a receiver
 *           creates a server and listens for flume events.
 *    <port> is the port the Flume receiver will listen on.
 *
 *  To run this example:
 *    `$ bin/run-example org.apache.spark.examples.streaming.FlumeEventCount <host> <port> `
 */
object FlumeEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumeEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumeEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream
    val stream = FlumeUtils.createStream(ssc, host, port, StorageLevel.MEMORY_ONLY_SER_2)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
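
With this push approach, the Flume agent needs an Avro sink pointing at the host and port where the Spark receiver starts. A minimal sketch of the sink section of a Flume agent configuration, assuming an agent named a1 with a channel c1 already defined; spark-host and 9999 are placeholders and must match the <host> <port> arguments passed to FlumeEventCount:

# Avro sink that pushes events to the Spark Streaming receiver
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = spark-host
a1.sinks.k1.port = 9999
a1.sinks.k1.channel = c1

This is exactly where the restart problem mentioned above shows up: if the driver (and thus the receiver) comes up on a different node, a1.sinks.k1.hostname has to be changed and the Flume agent reconfigured.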

Second approach: poll Flume and actively pull the data:

package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParam
import java.net.InetSocketAddress

/**
 *  Produces a count of events received from Flume.
 *
 *  This should be used in conjunction with the Spark Sink running in a Flume agent. See
 *  the Spark Streaming programming guide for more details.
 *
 *  Usage: FlumePollingEventCount <host> <port>
 *    `host` is the host on which the Spark Sink is running.
 *    `port` is the port at which the Spark Sink is listening.
 *
 *  To run this example:
 *    `$ bin/run-example org.apache.spark.examples.streaming.FlumePollingEventCount [host] [port] `
 */
object FlumePollingEventCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println(
        "Usage: FlumePollingEventCount <host> <port>")
      System.exit(1)
    }

    StreamingExamples.setStreamingLogLevels()

    val Array(host, IntParam(port)) = args

    val batchInterval = Milliseconds(2000)

    // Create the context and set the batch size
    val sparkConf = new SparkConf().setAppName("FlumePollingEventCount")
    val ssc = new StreamingContext(sparkConf, batchInterval)

    // Create a flume stream that polls the Spark Sink running in a Flume agent
    val stream = FlumeUtils.createPollingStream(ssc, host, port)

    // Print out the count of events received from this server in each batch
    stream.count().map(cnt => "Received " + cnt + " flume events." ).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
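
For the polling approach, the Flume agent runs Spark's custom sink and the streaming job pulls from it, so the Flume side no longer needs to know where the driver starts. A minimal sketch of the corresponding sink configuration, again with agent a1, channel c1, and placeholder host/port; the spark-streaming-flume-sink jar and its dependencies must be on the Flume agent's classpath:

# Spark sink that buffers events until Spark Streaming polls them
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = flume-host
a1.sinks.k1.port = 9999
a1.sinks.k1.channel = c1

If events are spread across several such sinks, FlumeUtils.createPollingStream also has an overload that takes a Seq[InetSocketAddress] (which is why the example imports java.net.InetSocketAddress), for example:

    // hypothetical hosts; each agent would run the SparkSink configured above
    val addresses = Seq(new InetSocketAddress("flume-host-1", 9999), new InetSocketAddress("flume-host-2", 9999))
    val stream = FlumeUtils.createPollingStream(ssc, addresses, StorageLevel.MEMORY_AND_DISK_SER_2)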

  

Original article: https://www.cnblogs.com/hark0623/p/4499503.html