SpringBoot与Kafka整合实现简单分布式消息队列
1、此处只是单纯的梳理一下SpringBoot整合kafka,其他像Zookeeper、kafka等环境的安装就不在详
细说明,kafka安装可参考https://www.cnblogs.com/jhtian/p/13708679.html Zookeeper安装里面也有
相应的链接。
环境说明: Kafka:192.168.232.3:9020
Zookeeper:192.168.232.3:2181
192.168.232.4:2181(master)
192.168.232.5:2181
2、工程目录(生产者-producer)
既然是SpringBoot整合kafka,那肯定是要搭建一个SpringBoot工程目录,SpringBoot的搭建就不在
此处说明,搭建好之后的生产端工程如下
引入相应的依赖到pom.xml文件中去
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.5.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.tianjh</groupId> 12 <artifactId>kafka-producer</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>kafka-producer</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <java.version>1.8</java.version> 21 </properties> 22 <!--SpringBoot 启动相关的依赖--> 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <!-- 日志文件依赖--> 33 <dependency> 34 <groupId>org.projectlombok</groupId> 35 <artifactId>lombok</artifactId> 36 </dependency> 37 <!-- ## SpringBoot 整合 kafka核心依赖 ##--> 38 <dependency> 39 <groupId>org.springframework.kafka</groupId> 40 <artifactId>spring-kafka</artifactId> 41 </dependency> 42 43 <!-- Springboot 单元测试--> 44 <dependency> 45 <groupId>org.springframework.boot</groupId> 46 <artifactId>spring-boot-starter-test</artifactId> 47 <scope>test</scope> 48 <exclusions> 49 <exclusion> 50 <groupId>org.junit.vintage</groupId> 51 <artifactId>junit-vintage-engine</artifactId> 52 </exclusion> 53 </exclusions> 54 </dependency> 55 </dependencies> 56 57 <build> 58 <plugins> 59 <plugin> 60 <groupId>org.springframework.boot</groupId> 61 <artifactId>spring-boot-maven-plugin</artifactId> 62 </plugin> 63 </plugins> 64 </build> 65 66 </project>
Springboot工程创建好之后会自动生成一个application.properties文件
1 server.servlet.context-path=/producer 2 server.port=8001 3 4 ## Spring 整合 kafka kafka的ip:port 5 spring.kafka.bootstrap-servers=192.168.232.3:9092 6 ## kafka producer 发送消息失败时的重试次数 7 spring.kafka.producer.retries=0 8 ## 批量发送数据的配置 9 spring.kafka.producer.batch-size=16384 10 ## 设置kafka 生产者内存缓存区的大小(32M) 11 spring.kafka.producer.buffer-memory=33554432 12 ## kafka消息的序列化配置 13 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer 14 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer 15 16 # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。 17 # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。 18 # acks=-1: 表示分区leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为producer请求成功。 19 # 这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。 20 21 ## 这个是kafka生产端最核心的配置 22 spring.kafka.producer.acks=1
启动类Application就很简单了
1 package com.tianjh.kafka; 2 3 import org.springframework.boot.SpringApplication; 4 import org.springframework.boot.autoconfigure.SpringBootApplication; 5 6 @SpringBootApplication 7 public class Application { 8 9 public static void main(String[] args) { 10 SpringApplication.run(Application.class, args); 11 } 12 13 }
接下来就是最重要的kafka发送消息的方法 KafkaProducerService.java
1 package com.tianjh.kafka.producer; 2 3 import lombok.extern.slf4j.Slf4j; 4 import org.springframework.beans.factory.annotation.Autowired; 5 import org.springframework.kafka.core.KafkaTemplate; 6 import org.springframework.kafka.support.SendResult; 7 import org.springframework.stereotype.Component; 8 import org.springframework.util.concurrent.ListenableFuture; 9 import org.springframework.util.concurrent.ListenableFutureCallback; 10 11 /** 12 * $KafkaProducerService 13 * @author tianjh 14 * Component 注解交由Spring管理 15 * Slf4j 引入日志 16 */ 17 @Slf4j 18 @Component 19 public class KafkaProducerService { 20 21 @Autowired 22 private KafkaTemplate<String, Object> kafkaTemplate; 23 24 /** 25 * 生产端真正发消息的方法 直接调用kafkaTemplate.send 26 * 方法进行发送 返回ListenableFuture<SendResult<K, V>> 27 * 使用future.addCallback 创建ListenableFutureCallback<SendResult<String, Object>>()对象 28 * 此处需要实现它的onSuccess、onFailure两个方法 29 * @param topic 发到哪个topic上面 30 * @param object 要发送的消息内容 31 */ 32 public void sendMessage(String topic, Object object) { 33 ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object); 34 future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() { 35 @Override 36 public void onSuccess(SendResult<String, Object> result) { 37 log.info("Kafka-Producer发放消息成功:" + result.toString()); 38 } 39 40 @Override 41 public void onFailure(Throwable throwable) { 42 log.info("Kafka-Producer发放消息失败:" + throwable.getMessage()); 43 } 44 45 }); 46 } 47 48 }
3、工程目录(消费端-consumer)
工程目录结构和生成端差不多,可以直接拷贝生产端代码进行简单修改
pom.xml文件
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 3 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> 4 <modelVersion>4.0.0</modelVersion> 5 <parent> 6 <groupId>org.springframework.boot</groupId> 7 <artifactId>spring-boot-starter-parent</artifactId> 8 <version>2.1.5.RELEASE</version> 9 <relativePath/> <!-- lookup parent from repository --> 10 </parent> 11 <groupId>com.tianjh</groupId> 12 <artifactId>kafka-consumer</artifactId> 13 <version>0.0.1-SNAPSHOT</version> 14 <name>kafka-consumer</name> 15 <description>Demo project for Spring Boot</description> 16 17 <properties> 18 <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> 19 <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> 20 <java.version>1.8</java.version> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter</artifactId> 27 </dependency> 28 <dependency> 29 <groupId>org.springframework.boot</groupId> 30 <artifactId>spring-boot-starter-web</artifactId> 31 </dependency> 32 <dependency> 33 <groupId>org.projectlombok</groupId> 34 <artifactId>lombok</artifactId> 35 </dependency> 36 <dependency> 37 <groupId>org.springframework.kafka</groupId> 38 <artifactId>spring-kafka</artifactId> 39 </dependency> 40 41 <dependency> 42 <groupId>org.springframework.boot</groupId> 43 <artifactId>spring-boot-starter-test</artifactId> 44 <scope>test</scope> 45 <exclusions> 46 <exclusion> 47 <groupId>org.junit.vintage</groupId> 48 <artifactId>junit-vintage-engine</artifactId> 49 </exclusion> 50 </exclusions> 51 </dependency> 52 </dependencies> 53 54 <build> 55 <plugins> 56 <plugin> 57 <groupId>org.springframework.boot</groupId> 58 <artifactId>spring-boot-maven-plugin</artifactId> 59 </plugin> 60 </plugins> 61 </build> 62 63 </project>
消费端的SpringBoot的配置文件application.properties
1 server.servlet.context-path=/consumer 2 server.port=8002 3 4 spring.kafka.bootstrap-servers=192.168.232.3:9092 5 6 ## consumer 消息的签收机制:手工签收 等于false 表示需要手工签收 7 spring.kafka.consumer.enable-auto-commit=false 8 spring.kafka.listener.ack-mode=manual 9 # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理: 10 # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录) 11 # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录 12 spring.kafka.consumer.auto-offset-reset=earliest 13 ## 序列化配置 14 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer 15 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer 16 17 spring.kafka.listener.concurrency=5
消费端消费消息的具体方法KafkaConsumerService
1 package com.tianjh.kafka.consumer; 2 3 import org.apache.kafka.clients.consumer.Consumer; 4 import org.apache.kafka.clients.consumer.ConsumerRecord; 5 import org.springframework.kafka.annotation.KafkaListener; 6 import org.springframework.kafka.support.Acknowledgment; 7 import org.springframework.stereotype.Component; 8 9 import lombok.extern.slf4j.Slf4j; 10 11 @Slf4j 12 @Component 13 public class KafkaConsumerService { 14 15 @KafkaListener(groupId = "group02", topics = "topic02") 16 public void onMessage(ConsumerRecord<String, Object> record, Acknowledgment acknowledgment, Consumer<?, ?> consumer) { 17 log.info("消费端接收消息: {}", record.value()); 18 // 手工签收机制 19 acknowledgment.acknowledge(); 20 } 21 22 23 }
4、测试
在生产端的src/test/java下新建一个Java测试类,我这儿新建的是ApplicationTests.java
1 package com.tianjh.kafka.test; 2 3 import com.tianjh.kafka.producer.KafkaProducerService; 4 import org.junit.Test; 5 import org.junit.runner.RunWith; 6 import org.springframework.beans.factory.annotation.Autowired; 7 import org.springframework.boot.test.context.SpringBootTest; 8 import org.springframework.test.context.junit4.SpringRunner; 9 10 /** 11 * 测试发现消息 12 */ 13 @RunWith(SpringRunner.class) 14 @SpringBootTest 15 public class ApplicationTests { 16 /** 17 * 依赖注入之前写好的发送 18 * 消息的实现类,调用send方法进行发送 19 */ 20 @Autowired 21 private KafkaProducerService kafkaProducerService; 22 23 @Test 24 public void send() { 25 try { 26 String topic = "topic02"; 27 for (int i = 0; i < 10; i++) { 28 kafkaProducerService.sendMessage(topic, "hello kafka" + i); 29 } 30 Thread.sleep(1000); 31 } catch (InterruptedException e) { 32 e.printStackTrace(); 33 } 34 } 35 }
生产端运行之后控制台打印:
说明消息发送成功,现在就看看消费端有没有进行消费消息,运行之后查看控制台发现收到了消息
虽然消费端收到了消息但是并没有真正的被消费,因为在消费端消费消息的时候注释了手工签收的代码
// acknowledgment.acknowledge();
通过kafka控制台查看 使用如下命令
./kafka-consumer-groups.sh --bootstrap-server 192.168.232.3:9092 --describe --group group02
现在把消费端注释掉的代码放开,确认手工签收 acknowledgment.acknowledge(); 接着再重新运行一下消费端项目
再去kafka使用上诉命令进行查看,如下:
这表明咋们发送的十条消息都被真正的进行消费了。