【慕课网实战】Spark Streaming实时流处理项目实战笔记十二之铭文升级版

铭文一级:

======
Pull方式整合

Flume Agent的编写: flume_pull_streaming.conf

simple-agent.sources = netcat-source
simple-agent.sinks = spark-sink
simple-agent.channels = memory-channel

simple-agent.sources.netcat-source.type = netcat
simple-agent.sources.netcat-source.bind = hadoop000
simple-agent.sources.netcat-source.port = 44444

simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
simple-agent.sinks.spark-sink.hostname = hadoop000
simple-agent.sinks.spark-sink.port = 41414

simple-agent.channels.memory-channel.type = memory

simple-agent.sources.netcat-source.channels = memory-channel
simple-agent.sinks.spark-sink.channel = memory-channel

注意点:先启动flume 后启动Spark Streaming应用程序
flume-ng agent
--name simple-agent
--conf $FLUME_HOME/conf
--conf-file $FLUME_HOME/conf/flume_pull_streaming.conf
-Dflume.root.logger=INFO,console


spark-submit
--class com.imooc.spark.FlumePullWordCount
--master local[2]
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0
/home/hadoop/lib/sparktrain-1.0.jar
hadoop000 41414

铭文二级:

Pull方式与Flume对接(常用):

改flume的配置文件,改sink的名称以及必须属性

官网Advanced Source大标题下有路径:Flume Integration Guide

一、导入jar包三个(第二个如果是用maven构建的scala工程则自动有):

 groupId = org.apache.spark
 artifactId = spark-streaming-flume-sink_2.11
 version = 2.2.1
 groupId = org.scala-lang
 artifactId = scala-library
 version = 2.11.8
 groupId = org.apache.commons
 artifactId = commons-lang3
 version = 3.5

二、修改自定义sink:

 agent.sinks = spark                //自起名字
 agent.sinks.spark.type = org.apache.spark.streaming.flume.sink.SparkSink
 agent.sinks.spark.hostname = <hostname of the local machine>        //hadoop000
 agent.sinks.spark.port = <port to listen on for connection from Spark>  //41414
 agent.sinks.spark.channel = memoryChannel   //自起名字

三、将createStream=>改成createPollingStream即可

四、启动顺序:

启动Flume->启动代码->telnet localhost 44444

五、打包到服务器

注释掉.setMaster后面的代码,可先删除sparktrain-1.0.jar,重新打包。

spark-submit
--class com.imooc.spark.FlumePullWordCount
--master local[2]
--packages org.apache.spark:spark-streaming-flume_2.11:2.2.0
/home/hadoop/lib/sparktrain-1.0.jar
hadoop000 41414

(竟然没有--name??--name是指定UI界面的名称)

整合Spark Streaming与Kafka实战:

一、Receiver-based

二、Direct Approch(常用)

Receiver方法(会有数据丢失)

此处选版本:Kafka Integration Guide

Write Ahead Logs (spark1.2版本引入的)

先确定修改的配置文件能用再进行编码:

具体步骤=>

1.先启动zk:./zkServer.sh start

2.启动kafka:./kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties

3.创建topic

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic kafka_streaming_topic

./kafka-topics.sh --list --zookeeper localhost:2181

4.通过控制台测试是否能正常生产与消费

./kafka-console-producer.sh --broker-list localhost:9092 --topic kafka_streaming_topic

./kafka-console-consumer.sh --zookeeper localhost:2181 --topic kafka_streaming_topic

编码(与Flume相类似)=>

1.引入依赖(记得查看maven project是否真的导入了)

groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-8_2.11
version = 2.2.1

2.建KafkaReceiverWordCount类

main方法->最基础的四行代码->引入代码:

val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]) 

->引入数组,含四个数->val Array(zkQuorum,group,topics,numThreads) = args

->判断是否传入四个参数->构建topicMap:

val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

->topicMap带入KafkaUtils参数

->messages.map(_._2).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()

messages即官网代码的kafkaSteam

KafkaUtils

3.打包jar

4.部署到服务器

(一般生产上是不能联网的,所以--packages使用不了,只能下载后用--jars来完成)

原文地址:https://www.cnblogs.com/kkxwz/p/8385409.html