SparkStreaming+Kafka 处理实时WIFI数据

转载自:http://www.cnblogs.com/bigbigtree/p/6908014.html

业务背景

通过实时抽取华为ESIGHT系统的wifi数据,与校园的学生数据、课程数据、地理位置数据等进行关联,进行校园大数据的流数据处理与分析。

技术选型

  • Kafka调用ESIGHT的resutful API,接入无线数据;
  • Sparkstreaming将流数据与Hive中的其他校园数据关联分析
  • 使用ES For Hadoop将分析结果导出到ES集群中

Kafka Producer

技术常规,使用kafka接入ESIGHT数据,只需要注意

  • 默认的分区方法是否产生数据偏移
  • 如果偏移需要自定义kafka.producer.Partitioner

SparkStreaming 接收Kafka数据流

用spark streaming流式处理kafka中的数据,第一步当然是先把数据接收过来,转换为spark streaming中的数据结构Dstream。
接收数据的方式有两种:

基于Receiver接收数据

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

需要注意的问题有:

  • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度。
  • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
  • 如果我们启用了Write Ahead Logs复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER

直连方式读取kafka数据

这种新的不基于Receiver的直接方式,是在Spark 1.3之后引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

这种方式有如下优点:

  • 简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

  • 高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

  • 一次且仅一次(extract-once)的事务机制: 基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。 基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

Direct连接示例

  1. import org.apache.spark.streaming.kafka.*;
  2.  
  3.  JavaPairInputDStream<String, String> directKafkaStream =
  4.      KafkaUtils.createDirectStream(streamingContext,
  5.          [key class], [value class], [key decoder class], [value decoder class],
  6.          [map of Kafka parameters], [set of topics to consume]);

但Direct连接方式为了能够进行异常恢复,需要考虑如何维护KafkaOffset的问题。通常由两种方式维护

  • 使用Spark的checkpoint机制,根据需要定期checkpoint并恢复。由于项目使用SparkSQL从Hive中拉取数据,可能由于SparkSQLContext的恢复处理不当,在恢复的时候会失败;
  • 通过SparkStreaming的API在Zookeeper中维护Kafka的Offset

使用Zookeeper维护KafkaOffset示例

  1. import java.util.HashMap;
  2. import java.util.HashSet;
  3. import java.util.Map;
  4. import java.util.Set;
  5. import java.util.concurrent.atomic.AtomicReference;
  6. import org.apache.spark.SparkConf;
  7. import org.apache.spark.api.java.JavaRDD;
  8. import org.apache.spark.api.java.function.Function;
  9. import org.apache.spark.api.java.function.VoidFunction;
  10. import org.apache.spark.broadcast.Broadcast;
  11. import org.apache.spark.streaming.Duration;
  12. import org.apache.spark.streaming.api.java.JavaDStream;
  13. import org.apache.spark.streaming.api.java.JavaInputDStream;
  14. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  15. import org.apache.spark.streaming.kafka.HasOffsetRanges;
  16. import org.apache.spark.streaming.kafka.KafkaCluster;
  17. import org.apache.spark.streaming.kafka.KafkaUtils;
  18. import org.apache.spark.streaming.kafka.OffsetRange;
  19.  
  20. import com.sugon.smartcampus.etl.wifi.conf.WIFIConfig;
  21.  
  22. import kafka.common.TopicAndPartition;
  23. import kafka.message.MessageAndMetadata;
  24. import kafka.serializer.StringDecoder;
  25. import scala.Predef;
  26. import scala.Tuple2;
  27. import scala.collection.JavaConversions;
  28. import lombok.extern.slf4j.*;
  29.  
  30. @Slf4j
  31. public class KafkaOffsetExample {
  32.     private static KafkaCluster kafkaCluster = null;
  33.     private static HashMap<String, String> kafkaParam = new HashMap<String, String>();
  34.     private static Broadcast<HashMap<String, String>> kafkaParamBroadcast = null;
  35.     private static scala.collection.immutable.Set<String> immutableTopics = null;
  36.  
  37.     /** * Create the Kafka Stream Directly With Offset in ZK * * @param jssc * SparkStreamContext * @param consumerOffsetsLong * Save the Offset of Kafka Topic * @return */
  38.     private static JavaInputDStream<String> createKafkaDStream(JavaStreamingContext jssc,
  39.             Map<TopicAndPartition, Long> consumerOffsetsLong) {
  40.         KafkaOffsetExample.log.warn("Create KafkaDriectStream with Offset");
  41.         JavaInputDStream<String> message = KafkaUtils.createDirectStream(jssc, String.class, String.class,
  42.                 StringDecoder.class, StringDecoder.class, String.class, kafkaParamBroadcast.getValue(),
  43.                 consumerOffsetsLong, new Function<MessageAndMetadata<String, String>, String>() {
  44.                     private static final long serialVersionUID = 1L;
  45.  
  46.                     @Override
  47.                     public String call(MessageAndMetadata<String, String> v1) throws Exception {
  48.                         return v1.message();
  49.                     }
  50.                 });
  51.         return message;
  52.     }
  53.  
  54.     private static Map<TopicAndPartition, Long> initConsumerOffset(String topic) {
  55.         Set<String> topicSet = new HashSet<String>();
  56.         topicSet.add(topic);
  57.         scala.collection.mutable.Set<String> mutableTopics = JavaConversions.asScalaSet(topicSet);
  58.         immutableTopics = mutableTopics.toSet();
  59.         scala.collection.immutable.Set<TopicAndPartition> topicAndPartitionSet = kafkaCluster
  60.                 .getPartitions(immutableTopics).right().get();
  61.          
  62.         // kafka direct stream 初始化时使用的offset数据
  63.         Map<TopicAndPartition, Long> consumerOffsetsLong = new HashMap<TopicAndPartition, Long>();
  64.         if (kafkaCluster.getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).isLeft()) {
  65.             KafkaOffsetExample.log.warn("没有保存offset, 各个partition offset 默认为0");
  66.             Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
  67.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
  68.                 consumerOffsetsLong.put(topicAndPartition, 0L);
  69.             }
  70.         }
  71.         else {
  72.             KafkaOffsetExample.log.warn("offset已存在, 使用保存的offset");
  73.             scala.collection.immutable.Map<TopicAndPartition, Object> consumerOffsetsTemp = kafkaCluster
  74.                     .getConsumerOffsets(kafkaParam.get("group.id"), topicAndPartitionSet).right().get();
  75.  
  76.             Map<TopicAndPartition, Object> consumerOffsets = JavaConversions.mapAsJavaMap(consumerOffsetsTemp);
  77.             Set<TopicAndPartition> topicAndPartitionSet1 = JavaConversions.setAsJavaSet(topicAndPartitionSet);
  78.  
  79.             KafkaOffsetExample.log.warn("put data in consumerOffsetsLong");
  80.             for (TopicAndPartition topicAndPartition : topicAndPartitionSet1) {
  81.                 Long offset = (Long) consumerOffsets.get(topicAndPartition);
  82.                 consumerOffsetsLong.put(topicAndPartition, offset);
  83.             }
  84.         }
  85.         return consumerOffsetsLong;
  86.     }
  87.      
  88.     private static JavaDStream<String> getAndUpdateKafkaOffset(JavaInputDStream<String> message,
  89.             AtomicReference<OffsetRange[]> offsetRanges) {
  90.         JavaDStream<String> javaDStream = message.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
  91.             private static final long serialVersionUID = 1L;
  92.             public JavaRDD<String> call(JavaRDD<String> rdd) throws Exception {
  93.                 OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  94.                 offsetRanges.set(offsets);
  95.                 for (int i = 0; i < offsets.length; i++)
  96.                     KafkaOffsetExample.log.warn("topic : {}, partitions: {}, fromoffset: {}, untiloffset: {}",
  97.                             offsets[i].topic(), offsets[i].partition(), offsets[i].fromOffset(),
  98.                             offsets[i].untilOffset());
  99.                 return rdd;
  100.             }
  101.         });
  102.         KafkaOffsetExample.log.warn("foreachRDD");
  103.         // output
  104.         javaDStream.foreachRDD(new VoidFunction<JavaRDD<String>>() {
  105.             private static final long serialVersionUID = 1L;
  106.  
  107.             public void call(JavaRDD<String> rdd) throws Exception {
  108.                 if (rdd.isEmpty()) {
  109.                     KafkaOffsetExample.log.warn("Empty RDD");
  110.                     return;
  111.                 }
  112.                 for (OffsetRange o : offsetRanges.get()) {
  113.                     // 封装topic.partition 与 offset对应关系 java Map
  114.                     TopicAndPartition topicAndPartition = new TopicAndPartition(o.topic(), o.partition());
  115.                     Map<TopicAndPartition, Object> topicAndPartitionObjectMap = new HashMap<TopicAndPartition, Object>();
  116.                     topicAndPartitionObjectMap.put(topicAndPartition, o.untilOffset());
  117.  
  118.                     KafkaOffsetExample.log.warn(
  119.                             "Topic: " + o.topic() + " partitions: " + o.partition() + " offset : " + o.untilOffset());
  120.  
  121.                     // 转换java map to scala immutable.map
  122.                     scala.collection.mutable.Map<TopicAndPartition, Object> testMap = JavaConversions
  123.                             .mapAsScalaMap(topicAndPartitionObjectMap);
  124.                     scala.collection.immutable.Map<TopicAndPartition, Object> scalatopicAndPartitionObjectMap = testMap
  125.                             .toMap(new Predef.$less$colon$less<Tuple2<TopicAndPartition, Object>, Tuple2<TopicAndPartition, Object>>() {
  126.                                 private static final long serialVersionUID = 1L;
  127.  
  128.                                 @Override
  129.                                 public Tuple2<TopicAndPartition, Object> apply(Tuple2<TopicAndPartition, Object> v1) {
  130.                                     return v1;
  131.                                 }
  132.                             });
  133.                     // 更新offset到kafkaCluster
  134.                     kafkaCluster.setConsumerOffsets(kafkaParamBroadcast.getValue().get("group.id"),
  135.                             scalatopicAndPartitionObjectMap);
  136.                 }
  137.             }
  138.         });
  139.         return javaDStream;
  140.     }
  141.      
  142.     private static void initKafkaParams() {
  143.         kafkaParam.put("metadata.broker.list", WIFIConfig.BROKER_LIST);
  144.         kafkaParam.put("zookeeper.connect", WIFIConfig.ZK_CONNECT);
  145.         kafkaParam.put("auto.offset.reset", WIFIConfig.AUTO_OFFSET_RESET);
  146.         kafkaParam.put("group.id", WIFIConfig.GROUP_ID);
  147.     }
  148.      
  149.     private static KafkaCluster initKafkaCluster() {
  150.         KafkaOffsetExample.log.warn("transform java Map to scala immutable.map");
  151.         // transform java Map to scala immutable.map
  152.         scala.collection.mutable.Map<String, String> testMap = JavaConversions.mapAsScalaMap(kafkaParam);
  153.         scala.collection.immutable.Map<String, String> scalaKafkaParam = testMap
  154.                 .toMap(new Predef.$less$colon$less<Tuple2<String, String>, Tuple2<String, String>>() {
  155.                     private static final long serialVersionUID = 1L;
  156.  
  157.                     @Override
  158.                     public Tuple2<String, String> apply(Tuple2<String, String> arg0) {
  159.                         return arg0;
  160.                     }
  161.                 });
  162.  
  163.         // init KafkaCluster
  164.         KafkaOffsetExample.log.warn("Init KafkaCluster");
  165.         return new KafkaCluster(scalaKafkaParam);
  166.     }
  167.      
  168.     public static void run() {
  169.         initKafkaParams();
  170.         kafkaCluster = initKafkaCluster();
  171.  
  172.         SparkConf sparkConf = new SparkConf().setMaster("local[4]").setAppName("tachyon-test-consumer");
  173.         JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(5000));
  174.          
  175.         // 得到rdd各个分区对应的offset, 并保存在offsetRanges中
  176.         KafkaOffsetExample.log.warn("initConsumer Offset");
  177.         Map<TopicAndPartition, Long> consumerOffsetsLong = initConsumerOffset(WIFIConfig.KAFKA_TOPIC);
  178.         kafkaParamBroadcast = jssc.sparkContext().broadcast(kafkaParam);
  179.          
  180.         JavaInputDStream<String> message = createKafkaDStream(jssc, consumerOffsetsLong);
  181.         final AtomicReference<OffsetRange[]> offsetRanges = new AtomicReference<OffsetRange[]>();
  182.         JavaDStream<String> javaDStream = getAndUpdateKafkaOffset(message, offsetRanges);
  183.          
  184.         javaDStream.print();
  185.          
  186.         jssc.start();
  187.         try {
  188.             jssc.awaitTermination();
  189.         } catch (InterruptedException e) {
  190.             e.printStackTrace();
  191.         }
  192.     }
  193.      
  194.     public static void main(String[] args) throws Exception {
  195.         String testPath = "E:\javaCodes\svn\SmartCampus\Trunk\smartcampus.etl.wifi\src\main\resources\WifiConfig.yaml";
  196.         WIFIConfig.init(testPath);
  197.         KafkaOffsetExample.log.warn(WIFIConfig.toStr());
  198.          
  199.         KafkaOffsetExample.run();
  200.     }
  201. }

SparkStreaming 数据处理

根据需要,将流式数据与Hive中的静态数据关联,结果通过Elasticsearch For Hadoop导出到ES集群中。

如果静态数据需要定时更新,可以在创建数据流后,在foreachRDD逻辑中,根据实际情况定期更新静态数据。

调优

由于个人经验较少,处理的数据量不大,以下内容大多是纸上谈兵,仅供参考。

合理的批处理时间(batchDuration)

  • 几乎所有的Spark Streaming调优文档都会提及批处理时间的调整,在StreamingContext初始化的时候,有一个参数便是批处理时间的设定。
  • 如果这个值设置的过短,即个batchDuration所产生的Job并不能在这期间完成处理,那么就会造成数据不断堆积,最终导致Spark Streaming发生阻塞。
  • 一般对于batchDuration的设置不会小于500ms,因为过小会导致SparkStreaming频繁的提交作业,对整个streaming造成额外的负担。
  • 在平时的应用中,根据不同的应用场景和硬件配置,我设在1~10s之间,我们可以根据SparkStreaming的可视化监控界面,观察Total Delay来进行batchDuration的调整,直达SparkStreaming刚刚能及时处理完上一个批处理的数据,这样就是目前情况的最优值。

合理的Kafka拉取量(maxRatePerPartition重要)

spark.streaming.kafka.maxRatePerPartition参数配置指定了每秒每一个topic的每一个分区获取的最大消息数。

对于Spark Streaming消费kafka中数据的应用场景,这个配置是非常关键的。这个参数默认是没有上限的,即kafka当中有多少数据它就会直接全部拉出。而根据生产者写入Kafka的速率以及消费者本身处理数据的速度,同时这个参数需要结合上面的batchDuration,使得每个partition拉取在每个batchDuration期间拉取的数据能够顺利的处理完毕,做到尽可能高的吞吐量,而这个参数的调整可以参考可视化监控界面中的Input Rate和Processing Time。

缓存反复使用的Dstream(RDD)

Spark中的RDD和SparkStreaming中的Dstream,如果被反复的使用,最好利用cache(),将该数据流缓存起来,防止过度的调度资源造成的网络开销。可以参考观察Scheduling Delay参数。

设置合理的GC

长期使用Java的小伙伴都知道,JVM中的垃圾回收机制,可以让我们不过多的关注与内存的分配回收,更加专注于业务逻辑,JVM都会为我们搞定。对JVM有些了解的小伙伴应该知道,在Java虚拟机中,将内存分为了初生代(eden generation)、年轻代(young generation)、老年代(old generation)以及永久代(permanent generation),其中每次GC都是需要耗费一定时间的,尤其是老年代的GC回收,需要对内存碎片进行整理,通常采用标记-清楚的做法。同样的在Spark程序中,JVM GC的频率和时间也是影响整个Spark效率的关键因素。在通常的使用中建议:

  1. 设置年老代为并发收集。
  2. --conf "spark.executor.extraJavaOptions=-XX:+UseConcMarkSweepGC"

设置合理的CPU资源数

CPU的core数量,每个executor可以占用一个或多个core,可以通过观察CPU的使用率变化来了解计算资源的使用情况,例如,很常见的一种浪费是一个executor占用了多个core,但是总的CPU使用率却不高(因为一个executor并不总能充分利用多核的能力),这个时候可以考虑让么个executor占用更少的core,同时worker下面增加更多的executor,或者一台host上面增加更多的worker来增加并行执行的executor的数量,从而增加CPU利用率。

但是增加executor的时候需要考虑好内存消耗,因为一台机器的内存分配给越多的executor,每个executor的内存就越小,以致出现过多的数据spill over甚至out of memory的情况。

设置合理的parallelism

partition和parallelism,partition指的就是数据分片的数量,每一次task只能处理一个partition的数据,这个值太小了会导致每片数据量太大,导致内存压力,或者诸多executor的计算能力无法利用充分;但是如果太大了则会导致分片太多,执行效率降低。在执行action类型操作的时候(比如各种reduce操作),partition的数量会选择parent RDD中最大的那一个。而parallelism则指的是在RDD进行reduce类操作的时候,默认返回数据的paritition数量(而在进行map类操作的时候,partition数量通常取自parent RDD中较大的一个,而且也不会涉及shuffle,因此这个parallelism的参数没有影响)。所以说,这两个概念密切相关,都是涉及到数据分片的,作用方式其实是统一的。通过spark.default.parallelism可以设置默认的分片数量,而很多RDD的操作都可以指定一个partition参数来显式控制具体的分片数量。 在SparkStreaming+kafka的使用中,我们采用了Direct连接方式,前文阐述过Spark中的partition和Kafka中的Partition是一一对应的,我们一般默认设置为Kafka中Partition的数量。

使用高性能的算子

这里参考了美团技术团队的博文,并没有做过具体的性能测试,其建议如下:

  • 使用reduceByKey/aggregateByKey替代groupByKey

  • 使用mapPartitions替代普通map

  • 使用foreachPartitions替代foreach

  • 使用filter之后进行coalesce操作

  • 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

  • 使用Kryo优化序列化性能 这个优化原则我本身也没有经过测试,但是好多优化文档有提到,这里也记录下来。 在Spark中,主要有三个地方涉及到了序列化:

  • 在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。

  • 将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。

  • 使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。

对于这三种出现序列化的地方,我们都可以通过使用Kryo序列化类库,来优化序列化和反序列化的性能。Spark默认使用的是Java的序列化机制,也就是ObjectOutputStream/ObjectInputStream API来进行序列化和反序列化。但是Spark同时支持使用Kryo序列化库,Kryo序列化类库的性能比Java序列化类库的性能要高很多。
官方介绍,Kryo序列化机制比Java序列化机制,性能高10倍左右。Spark之所以默认没有使用Kryo作为序列化类库,是因为Kryo要求最好要注册所有需要进行序列化的自定义类型,因此对于开发者来说,这种方式比较麻烦。

以下是使用Kryo的代码示例,我们只要设置序列化类,再注册要序列化的自定义类型即可(比如算子函数中使用到的外部变量类型、作为RDD泛型类型的自定义类型等):

  1. // 创建SparkConf对象。
  2. val conf = new SparkConf().setMaster(...).setAppName(...)
  3. // 设置序列化器为KryoSerializer。
  4. conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  5. // 注册要序列化的自定义类型。
  6. conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

参考

原文地址:https://www.cnblogs.com/yangcx666/p/8723704.html