Spark-stream,kafka结合

先列参考文献:

Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher):http://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

kafka(Java Client端Producer API):http://kafka.apache.org/documentation/#producerapi

版本: 

spark:2.1.1
scala:2.11.12
kafka运行版本:2.3.0
spark-streaming-kafka-0-10_2.11:2.2.0

开发环境:

  3台虚拟机部署kafka,域名分别为coo1、coo2、coo3,部署版本如上,zookeeper版本3.4.7

  在kafka上创建topic:xzrz,replica为3,partition为4;

./kafka-topics.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --create --topic xzrz --replication-factor 3 --partitions 4

  准备代码环境:

  一个是Java端的kafka发送端:KafkaSender.java

  另一个是scale端的spark-streaming-kafka消费端,KafkaStreaming.scala

kafka发送端:

  maven配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>kafkaTest</groupId>
    <artifactId>kafkaTest</artifactId>
    <version>1.0-SNAPSHOT</version>
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13-beta-2</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
</project>
KafkaSender.java
 1 package gjm;
 2 
 3 import org.apache.kafka.clients.producer.KafkaProducer;
 4 import org.apache.kafka.clients.producer.Producer;
 5 import org.apache.kafka.clients.producer.ProducerConfig;
 6 import org.apache.kafka.clients.producer.ProducerRecord;
 7 import org.junit.Test;
 8 
 9 import java.util.Properties;
10 import java.util.concurrent.ExecutionException;
11 
12 public class KafkaSender {
13     @Test
14     public void producer() throws InterruptedException, ExecutionException {
15         Properties props = new Properties();
16         props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
17         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
18         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "coo1:9092,coo2:9092,coo3:9092");
19 //        props.put(ProducerConfig.BATCH_SIZE_CONFIG,"1024");
20 //        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,"0");
21         //配置kafka语义exacts once语义
22         props.put("acks", "all");
23         props.put("enable.idempotence", "true");
24         Producer<Integer, String> kafkaProducer = new KafkaProducer<Integer, String>(props);
25         for (int j = 0; j < 1; j++)
26             for (int i = 0; i < 100; i++) {
27                 ProducerRecord<Integer, String> message = new ProducerRecord<Integer, String>("xzrz", "{wo|2019-12-12|1|2|0|5}");
28                 kafkaProducer.send(message);
29             }
30         //这个flush和close一定要写,类似于流操作
31         //因为kafka自带betch和buffer,如果没有这两行代码,一是浪费资源,二是有可能消息没有发送到kafka中,依旧保留在本地betch中
32         kafkaProducer.flush();
33         kafkaProducer.close();
34     }
35 }

kafka消费端-->sparkstreaming-kafka-->KafkaStreaming.scala代码:

  maven:

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <project xmlns="http://maven.apache.org/POM/4.0.0"
 3          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 5     <modelVersion>4.0.0</modelVersion>
 6 
 7     <groupId>sparkMVN</groupId>
 8     <artifactId>sparkMVN</artifactId>
 9     <version>1.0-SNAPSHOT</version>
10 
11 
12     <properties>
13         <spark.version>2.1.1</spark.version>
14         <hadoop.version>2.7.3</hadoop.version>
15         <hbase.version>0.98.17-hadoop2</hbase.version>
16     </properties>
17 <dependencies>
18 
19     <dependency>
20         <groupId>junit</groupId>
21         <artifactId>junit</artifactId>
22         <version>4.12</version>
23     </dependency>
24 
25     <dependency>
26         <groupId>org.apache.spark</groupId>
27         <artifactId>spark-core_2.11</artifactId>
28         <version>${spark.version}</version>
29     </dependency>
30     <dependency>
31         <groupId>org.apache.spark</groupId>
32         <artifactId>spark-sql_2.11</artifactId>
33         <version>${spark.version}</version>
34     </dependency>
35     <dependency>
36         <groupId>org.apache.spark</groupId>
37         <artifactId>spark-streaming_2.11</artifactId>
38         <version>${spark.version}</version>
39         <!--这行在local模式中,一定要注销,否则会导致找不到spark context类的异常-->
40         <!--<scope>provided</scope>-->
41     </dependency>
42     <dependency>
43         <groupId>org.apache.spark</groupId>
44         <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
45         <version>2.2.0</version>
46     </dependency>
47 
48     <!-- hadoop -->
49     <dependency>
50         <groupId>org.apache.hadoop</groupId>
51         <artifactId>hadoop-client</artifactId>
52         <version>${hadoop.version}</version>
53     </dependency>
54     <dependency>
55         <groupId>org.apache.hadoop</groupId>
56         <artifactId>hadoop-common</artifactId>
57         <version>${hadoop.version}</version>
58     </dependency>
59     <dependency>
60         <groupId>org.apache.hadoop</groupId>
61         <artifactId>hadoop-hdfs</artifactId>
62         <version>${hadoop.version}</version>
63     </dependency>
64 
65     <!--hbase-->
66     <dependency>
67         <groupId>org.apache.hbase</groupId>
68         <artifactId>hbase-client</artifactId>
69         <version>${hbase.version}</version>
70     </dependency>
71     <dependency>
72         <groupId>org.apache.hbase</groupId>
73         <artifactId>hbase-server</artifactId>
74         <version>${hbase.version}</version>
75     </dependency>
76 </dependencies>
77 </project>
KafkaStreaming.scala代码:
 1 package gjm.sparkDemos
 2 
 3 import org.apache.kafka.common.serialization.StringDeserializer
 4 import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
 5 import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
 6 import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, KafkaUtils}
 7 import org.apache.spark.streaming.{Seconds, StreamingContext}
 8 import org.apache.spark.{SparkConf, SparkContext}
 9 import org.slf4j.LoggerFactory
10 
11 object KafkaStreaming {
12   def main(args: Array[String]): Unit = {
13     val LOG = LoggerFactory.getLogger(KafkaStreaming.getClass)
14     LOG.info("Streaming start----->")
15 
16     val conf = new SparkConf().setMaster("local[6]")//这里设置消费kafka的线程数为6,看看会有什么情况
17       .setAppName("KafkaStreaming")
18     val sc = new SparkContext(conf)
19     val ssc = new StreamingContext(sc, Seconds(3))
20     val topics = Array("xzrz")
21     val kafkaParams = Map[String, Object](
22       "bootstrap.servers" -> "coo1:9092,coo2:9092,coo3:9092",
23       "key.deserializer" -> classOf[StringDeserializer],
24       "value.deserializer" -> classOf[StringDeserializer],
25       "group.id" -> "fwjkcx",
26       "auto.offset.reset" -> "earliest",
27       "enable.auto.commit" -> (false: java.lang.Boolean)
28       //      "heartbeat.interval.ms" -> (90000: java.lang.Integer),
29       //      "session.timeout.ms" -> (120000: java.lang.Integer),
30       //      "group.max.session.timeout.ms" -> (120000: java.lang.Integer),
31       //      "request.timeout.ms" -> (130000: java.lang.Integer),
32       //      "fetch.max.wait.ms" -> (120000: java.lang.Integer)
33     )
34 
35     val stream = KafkaUtils.createDirectStream[String, String](
36       ssc,
37       PreferConsistent,
38       Subscribe[String, String](topics, kafkaParams)
39     )
40     LOG.info("Streaming had Created----->")
41     LOG.info("Streaming Consuming msg----->")
42     stream.foreachRDD { rdd =>
43       val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
44       rdd.foreachPartition(recordIt => {
45         for (record <- recordIt) {
46           LOG.info("Message recode info : Topics-->{},partition-->{}, checkNum-->{}, offset-->{}, value-->{}", record.topic(), record.partition().toString, record.checksum().toString, record.offset().toString, record.value())
47         }
48       })
49       // some time later, after outputs have completed
50       stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
51     }
52     ssc.start()
53     ssc.awaitTermination()
54   }
55 }

验证测试:

1、使用发送端发送100条消息;
2、启动kafka自带的consumer消费端,group id为test;
sh kafka-console-consumer.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --topic xzrz --from-beginning --group test
3、启动spark-stream-kafka,在代码中已经设置流的间隔时间为每3s一次;
4、使用kafka自带的group消费情况查询消费情况:
1 ./kafka-consumer-groups.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --describe --group test
2 ./kafka-consumer-groups.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --describe --group fwjkcx
结果:
1、首先是测试的test消费端:消费情况
[root@coo3 bin]# ./kafka-consumer-groups.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --describe --group test

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
test            xzrz            0          25              25              0               consumer-1-4adfdb85-45ef-40a5-9127-7bb6239e0e29 /192.168.0.217  consumer-1
test            xzrz            1          25              25              0               consumer-1-4adfdb85-45ef-40a5-9127-7bb6239e0e29 /192.168.0.217  consumer-1
test            xzrz            2          25              25              0               consumer-1-4adfdb85-45ef-40a5-9127-7bb6239e0e29 /192.168.0.217  consumer-1
test            xzrz            3          25              25              0               consumer-1-4adfdb85-45ef-40a5-9127-7bb6239e0e29 /192.168.0.217  consumer-1
观察发现,4个分区同一个消费者线程,一共消费了100条。
2、spark-streaming的消费端:消费情况
[root@coo3 bin]# ./kafka-consumer-groups.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --describe --group fwjkcx

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
fwjkcx          xzrz            0          25              25              0               consumer-1-0cca92be-5970-4030-abd1-b8552dea9718 /192.168.0.60   consumer-1
fwjkcx          xzrz            1          25              25              0               consumer-1-0cca92be-5970-4030-abd1-b8552dea9718 /192.168.0.60   consumer-1
fwjkcx          xzrz            2          25              25              0               consumer-1-0cca92be-5970-4030-abd1-b8552dea9718 /192.168.0.60   consumer-1
fwjkcx          xzrz            3          25              25              0               consumer-1-0cca92be-5970-4030-abd1-b8552dea9718 /192.168.0.60   consumer-1
观察发现,4个分区每个分区消费25条,符合正常认知
3、多增加一个实验,现在将spart-stream的local数量改为3,更改消费者组为fwjkcx01,观察消费情况
[root@coo3 bin]# ./kafka-consumer-groups.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --describe --group fwjkcx01

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
fwjkcx01        xzrz            0          25              25              0               consumer-1-03542086-c95d-41c2-b199-24a158708b65 /192.168.0.60   consumer-1
fwjkcx01        xzrz            1          25              25              0               consumer-1-03542086-c95d-41c2-b199-24a158708b65 /192.168.0.60   consumer-1
fwjkcx01        xzrz            2          25              25              0               consumer-1-03542086-c95d-41c2-b199-24a158708b65 /192.168.0.60   consumer-1
fwjkcx01        xzrz            3          25              25              0               consumer-1-03542086-c95d-41c2-b199-24a158708b65 /192.168.0.60   consumer-1
发现仍然是4个线程在消费,所以在local位置指定线程数量根本不生效。
4、这时候再发1000条消息,观察group:fwjkcx的消费情况
[root@coo3 bin]# ./kafka-consumer-groups.sh --bootstrap-server coo3:9092,coo2:9092,coo1:9092 --describe --group fwjkcx

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
fwjkcx          xzrz            0          275             275             0               consumer-1-fc238353-1b55-4efa-9c4f-54580ed81b0e /192.168.0.60   consumer-1
fwjkcx          xzrz            1          275             275             0               consumer-1-fc238353-1b55-4efa-9c4f-54580ed81b0e /192.168.0.60   consumer-1
fwjkcx          xzrz            2          275             275             0               consumer-1-fc238353-1b55-4efa-9c4f-54580ed81b0e /192.168.0.60   consumer-1
fwjkcx          xzrz            3          275             275             0               consumer-1-fc238353-1b55-4efa-9c4f-54580ed81b0e /192.168.0.60   consumer-1
一切正常。










原文地址:https://www.cnblogs.com/qfxydtk/p/11662591.html