Spark Streaming实现实时流处理

一、Streaming与Flume的联调

Spark 2.2.0 对应于 Flume 1.6.0
 
两种模式:
 
1. Flume-style push-based approach:
 
Flume推送数据給Streaming
 
Streaming的receiver作为Flume的Avro agent
 
Spark workers应该跑在Flume这台机器上
 
Streaming先启动,receiver监听Flume push data的端口
 
 
实现:
 
写flume配置文件:
netcat source -> memory channel -> avro sink
 
IDEA开发:
添加Spark-flume依赖
对应的API是FlumeUtils
 
开发代码:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* Spark Streaming整合Flume的第一种方式
* */
object FlumePushWordCount {
  def main(args: Array[String]): Unit = {
 
    //外部传入参数
    if (args.length != 2) {
      System.out.println("Usage: FlumePushWordCount <hostname> <port>")
      System.exit(1)
    }
 
    val Array(hostname, port) = args  //外部args数组
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePushWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //选择输入ssc的createStream方法,生成一个InputDStream
    val flumeStream = FlumeUtils.createStream(ssc, hostname, port.toInt)
 
    //由于flume的内容有head有body, 需要先把内容拿出来, 并去掉空值
    flumeStream.map(x => new String(x.event.getBody.array()).trim)
        .flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
注意:为了不hard-core,选择外部传入hostname和port
 
在IDEA测试时,可以在
里面的program argument输入运行参数
 
在本地测试时:
 
先启动Streaming作业,然后启动flume agent,最后通过telnet输入数据,观察IDEA的控制台输出
 
 
在服务器测试时:
 
submit时一定要把maven依赖中在--packages加上,自动会在网络上下载依赖
当不能下载时,需要--jars才能把预先下载好的jar包加上
 
 
 
2. Pull-based approach using a custom sink:
 
Streaming拉数据
 
Flume推送的数据先放到sink缓冲区
 
Streaming使用一个reliable flume receiver,确保了数据的接收和备份
 
可靠性更高,支持容错,生产上面常用
 
一台机器运行Flume agent,Spark集群其他机器可访问这台机器的custom sink
 
实现:
 
Flume配置:
使用相关jars包,配置依赖:(参考Spark官网)
sink是一个独特的type
 
IDEA开发:
对应上面Flume的依赖,使用的是createPollStream,区别于第一种模式
其他地方都一样,体现了Spark代码的复用性
 
本地测试:
先启动flume!!后启动Streaming作业
 
 
 
二、Streaming与Kafka的联调
 
Spark2.2.0对应于Kafka 0.8.2.1或更新(本次使用的是0.9.0.0)
 
两种模式:
 
1. Receiver-based approach
 
使用Kafka高级用户API
为了确保零数据丢失,需要用到Write Ahead Logs(出现于Spark 1.2)
同步地保存接收到的数据到日志当中,出错时可以恢复(容错机制)
这是传统的方式,在ZK server中消费数据
 
用KafkaUtils和Streaming对接,一样需要加入kafka的各种依赖(见官网)
使用的API是createStream
 
注意:
  1. 此处的topic分区和RDD的分区不同概念
  2. 多个Kafka DStream可以并行接收
  3. 用write ahead logs时需要配置StorageLevel.MEMORY_AND_DISK_SER
 
 
准备工作:
 
启动ZK server
启动kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties
创建topic
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic
测试topic能否正确生产和消费
kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic
kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic
 
IDEA代码:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
/*
* SparkStreaming对接Kafka其中的Receiver-based方式
* */
object KafkaReceiverWordCount {
  def main(args: Array[String]): Unit = {
 
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
 
    val Array(zkQuorum, group, topics, numThreads) = args
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //createStream需要传入的其中一个参数是一个Map,就是topics对应的线程数
    val topicsMap = topics.split(",").map((_, numThreads.toInt)).toMap
 
    val message = KafkaUtils.createStream(ssc, zkQuorum, group, topicsMap)
 
    //一定要取Stream的第二位才是数据,可以print出来看看,在实际生产中只是更改这一行的业务逻辑!!!
    message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
本地测试/服务器测试:
 
从IDEA中输入参数,即可看到结果
 
从服务器测试也是打包submit就行,看web UI的时候留意验证receiver是占有一个Job的,证实了前面的理论
 
 
 
2. Direct Approach
 
No receiver!!!
 
Spark 1.3 版本开始有
 
没有了Receiver,而是周期性地检测Kafka的offset,用了kafka simple consumer API
 
优点:
  1. 简化了并行度,不需要创建多个input stream
  2. 性能更好,达到零数据丢失,且不需要保存副本于write ahead logs中
  3.  一次语义Exactly-once semantics
 
缺点:不能在zookeeper中更新offset,但可以自己设置让其更新
 
 使用的API是createDirectStream
 
准备工作和上面一样。
 
IDEA代码:
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
 
 
/*
* SparkStreaming对接Kafka其中的Direct方式
* */
object KafkaDirectWordCount {
  def main(args: Array[String]): Unit = {
 
    if (args.length != 4) {
      System.out.println("Usage: KafkaReceiverWordCount <brokers> <topics>")
      System.exit(1)
    }
 
    val Array(brokers, topics) = args
 
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("KafkaReceiverWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(5))
 
    //createDirectStream需要传入kafkaParams和topicsSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
    val topicsSet = topics.split(",").toSet
 
    val message = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, topicsSet
    )
 
 
    //一定要取Stream的第二位才是数据,可以print出来看看
    message.map(_._2).flatMap(_.split(",")).map((_, 1)).reduceByKey(_+_).print()
 
    ssc.start()
    ssc.awaitTermination()
  }
}
 
 
注意:StringDecoder有可能因为前面写Kafka java API时的包冲突而导入失败
 
在IDEA运行时报错:
这是由于之前在Kafka基础学习中我设置的kafka的依赖是0.9.0.0,和我们IDEA冲突,所以要把这一个依赖注释掉才能执行
 
调优时就是配置createDirectStream的参数嘛!!
 
 
 
三、Flume + Kafka + Spark Streaming常用流处理架构
 
实现的需求:实时(到现在为止)的日志访问统计操作
 
由于本人缺乏日志采集来源,故使用python语言来实现一个日志生成器,模拟生产环境中服务器不断生成日志的过程
本生成器产生的日志内容包括ip、time、url、status、referer
 
根据前面的知识,我们在实现的过程中有以下步骤:
1. Flume的选型,在本例中设为exec-memory-kafka
2. 打开kafka一个消费者,再启动flume读取日志生成器中的log文件,可看到kafka中成功读取到日志产生器的实时数据
3. 让Kafka接收到的数据传输到Spark Streaming当中,这样就可以在Spark对实时接收到的数据进行操作了
 
由于与前面一、二的操作基本一致,此处不再重复列出详细操作过程
 
 
下面直接进入Spark中对实时数据的操作:
 
分为数据清洗过程、统计功能实现过程两个步骤!其中统计功能的实现基本上和Spark SQL中的操作一致,这又体现了Spark的代码复用性,即能通用于多个框架中
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
原文地址:https://www.cnblogs.com/kinghey-java-ljx/p/8544405.html