Flume直接对接SaprkStreaming的两种方式

一、flume对接sparkStreaming的两种方式:

Push推送的方式

Poll拉取的方式

第一种Push方式:

代码如下:

package cn.itcast.spark.day5

import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * .
  */
object FlumePushWordCount {

  def main(args: Array[String]) {
    val host = args(0)
    val port = args(1).toInt
    LoggerLevels.setStreamingLogLevels()
    val conf = new SparkConf().setAppName("FlumeWordCount")//.setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    //推送方式: flume向spark发送数据
    val flumeStream = FlumeUtils.createStream(ssc, host, port)
    //flume中的数据通过event.getBody()才能拿到真正的内容
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_, 1))

    val results = words.reduceByKey(_ + _)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

  flume配置如下:

#agent名, source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#具体定义source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/monitor
#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#具体定义sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 10.1.9.102  (是本机IP)
a1.sinks.k1.port = 6666
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  启动flume:

/usr/java/flume/bin/flume-ng agent -n a1 -c conf -f /usr/java/flume/mytest/push.properties

第二种Poll的方式:

但是这种方法必须要引入Spark官方的一个jar包,见官方的文档:点击跳转,将jar下载下来放到flume安装包的lib目录下即可,点击直接下载jar包

代码如下:

package cn.itcast.spark.day5

import java.net.InetSocketAddress

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FlumePollWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("FlumePollWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))
    //从flume中拉取数据(flume的地址)
    val address = Seq(new InetSocketAddress("172.16.0.11", 8888))
    val flumeStream = FlumeUtils.createPollingStream(ssc, address, StorageLevel.MEMORY_AND_DISK)
    val words = flumeStream.flatMap(x => new String(x.event.getBody().array()).split(" ")).map((_,1))
    val results = words.reduceByKey(_+_)
    results.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
启动flume
#agent名, source、channel、sink的名称
a1.sources = r1
a1.channels = c1
a1.sinks = k1
#具体定义source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/monitor
#具体定义channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 100
#具体定义sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = 192.168.80.123
a1.sinks.k1.port = 10086
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

  启动flume:

/usr/java/flume/bin/flume-ng agent -n a1 -c conf -f /usr/java/flume/mytest/push.properties

原文地址:https://www.cnblogs.com/zmoumou/p/10310880.html