Springboot中使用kafka

注:kafka消息队列默认采用配置消息主题进行消费,一个topic中的消息只能被同一个组(groupId)的消费者中的一个消费者消费。

1.在pom.xml依赖下新添加一下kafka依赖ar包

<!--kafka-->
<dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
   <version>1.1.1.RELEASE</version>
</dependency>
<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka_2.10</artifactId>
   <version>0.10.0.1</version>
</dependency>


2.在application.properties增加配置:

#原始数据kafka读取
kafka.consumer.servers=IP:9092,IP:9092(kafka消费集群ip+port端口)
kafka.consumer.enable.auto.commit=true(是否自动提交)
kafka.consumer.session.timeout=20000(连接超时时间)
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest(实时生产,实时消费,不会从头开始消费)
kafka.consumer.topic=result(消费的topic)
kafka.consumer.group.id=test(消费组)
kafka.consumer.concurrency=10(设置消费线程数)

#协议转换后存储kafka
kafka.producer.servers=IP:9092,IP:9092(kafka生产集群ip+port端口)
kafka.producer.topic=result(生产的topic)
kafka.producer.retries=0
kafka.producer.batch.size=4096
kafka.producer.linger=1
kafka.producer.buffer.memory=40960


3.生产者配置类:

package com.mapbar.track_storage.config;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
* kafka生产配置
* @author Lvjiapeng
*
*/
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.producer.servers}")
private String servers;
@Value("${kafka.producer.retries}")
private int retries;
@Value("${kafka.producer.batch.size}")
private int batchSize;
@Value("${kafka.producer.linger}")
private int linger;
@Value("${kafka.producer.buffer.memory}")
private int bufferMemory;

public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
props.put(ProducerConfig.RETRIES_CONFIG, retries);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return props;
}

public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<String, String>(producerFactory());
}
}
View Code

4.消费者配置类:

package com.mapbar.track_storage.config;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
* kafka消费者配置
* @author Lvjiapeng
*
*/
@Configuration
@EnableKafka
public class KafkaConsumerConfig {

@Value("${kafka.consumer.servers}")
private String servers;
@Value("${kafka.consumer.enable.auto.commit}")
private boolean enableAutoCommit;
@Value("${kafka.consumer.session.timeout}")
private String sessionTimeout;
@Value("${kafka.consumer.auto.commit.interval}")
private String autoCommitInterval;
@Value("${kafka.consumer.group.id}")
private String groupId;
@Value("${kafka.consumer.auto.offset.reset}")
private String autoOffsetReset;
@Value("${kafka.consumer.concurrency}")
private int concurrency;

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(concurrency);
factory.getContainerProperties().setPollTimeout(1500);
return factory;
}

public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}


public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
return propsMap;
}
/**
* kafka监听
* @return
*/
@Bean
public RawDataListener listener() {
return new RawDataListener();
}

}
View Code

5.测试生产者:

package com.mapbar.track_storage.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

@RequestMapping(value = "/kafka")
@Controller
public class ProducerController {
@Autowired
private KafkaTemplate kafkaTemplate;

@RequestMapping(value = "/producer",method = RequestMethod.GET)
public void consume(HttpServletRequest request, HttpServletResponse response) throws IOException{
String value = "{"code":200,"dataVersion":"17q1","message":"","id":"364f79f28eea48eefeca8c85477a10d3","source":"didi","tripList":[{"subTripList":[{"startTimeStamp":1519879598,"schemeList":[{"distance":0.0,"ids":"94666702,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879598,"subTripId":0},{"startTimeStamp":1519879727,"schemeList":[{"distance":1395.0,"ids":"94666729,7298838,7291709,7291706,88613298,88613297,7297542,7297541,94698785,94698786,94698778,94698780,94698779,94698782,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879812,"subTripId":1},{"startTimeStamp":1519879836,"schemeList":[{"distance":0.0,"ids":"54123007,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879904,"subTripId":2},{"startTimeStamp":1519879959,"schemeList":[{"distance":0.0,"ids":"54190443,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519879959,"subTripId":3},{"startTimeStamp":1519880088,"schemeList":[{"distance":2885.0,"ids":"94698824,94698822,94698789,94698786,54123011,54123012,54123002,94698763,94698727,94698722,94698765,54123006,54123004,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519880300,"subTripId":4},{"startTimeStamp":1519880393,"schemeList":[{"distance":2398.0,"ids":"7309441,7303680,54123061,54123038,7309478,7309477,94698204,94698203,94698273,94698274,94698288,94698296,94698295,94698289,94698310,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519880636,"subTripId":5},{"startTimeStamp":1519881064,"schemeList":[{"distance":35.0,"ids":"7309474,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881204,"subTripId":6},{"startTimeStamp":1519881204,"schemeList":[{"distance":28.0,"ids":"7309476,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881266,"subTripId":7},{"startTimeStamp":1519881291,"schemeList":[{"distance":463.0,"ids":"7303683,","schemeId":0,"linkList":[{"score":72,"distance":1,"gpsList":[{"origLonLat":"116.321343,40.43242","grabLonLat":"112.32312,40.32132","timestamp":1515149926000}]}]}],"endTimeStamp":1519881329,"subTripId":8}],"startTimeStamp":1519879350,"unUseTime":1201,"totalTime":2049,"endTimeStamp":1519881399,"tripId":0}]}";
for (int i = 1; i<=500; i++){
kafkaTemplate.send("result",value);
}
}
}
View Code

6.测试消费者:

import net.sf.json.JSONObject;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;

/**
* kafka监听
* @author shangzz
*
*/
@Component
public class RawDataListener {
Logger logger=Logger.getLogger(RawDataListener.class);
@Autowired
private MatchRoadService matchRoadService;

/**
* 实时获取kafka数据(生产一条,监听生产topic自动消费一条)
* @param record
* @throws IOException
*/
@KafkaListener(topics = {"${kafka.consumer.topic}"})
public void listen(ConsumerRecord<?, ?> record) throws IOException {
String value = (String) record.value();
System.out.println(value);
}

}
View Code


总结:

         ①  生产者环境类配置好以后,@Autowired自动注入KafkaTemplate类,使用send方法生产消息

         ②  消费者环境类配置好以后,方法头前使用@KafkaListener(topics = {"${kafka.consumer.topic}"})注解监听topic并传入ConsumerRecord<?, ?> record对象即可自动消费topic

         ③  相关kafka配置只需在application.properties照葫芦画瓢添加,修改或者删除配置并在环境配置类中做出相应修改即可

二:怎么实现让一个topic可以让不同group消费呢

goupid不要用配置文件配置的方式,细心的话,会发现@KafkaListener 注解,里面有一个containerFactory参数,就是让你指定容器工厂的

栗子:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConsumerConfig {

private String brokers = "192.168.52.130:9092,192.168.52.131:9092,192.168.52.133:9092";

private String group1 = "test1";
private String group2 = "test2";

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory1() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory1());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(4000);
return factory;
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory2() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(consumerFactory2());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(4000);
return factory;
}

public Map<String, Object> getCommonPropertis() {
Map<String, Object> properties = new HashMap<String, Object>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return properties;
}


public ConsumerFactory<String, String> consumerFactory1() {
Map<String, Object> properties = getCommonPropertis();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group1);
return new DefaultKafkaConsumerFactory<String, String>(properties);
}

public ConsumerFactory<String, String> consumerFactory2() {
Map<String, Object> properties = getCommonPropertis();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, group2);
return new DefaultKafkaConsumerFactory<String, String>(properties);
}
}
View Code

最后,在@KafkaListener 中指定容器名称

@KafkaListener(id="test1",topics = "test-topic", containerFactory="kafkaListenerContainerFactory1")
@KafkaListener(id="test2",topics = "test-topic", containerFactory="kafkaListenerContainerFactory2")
高版本 在@KafkaListener 注解中有groupId属性可以设置


--------------------------------------------------------------------------------------------------
转载:https://blog.csdn.net/lv_1093964643/article/details/83177280

原文地址:https://www.cnblogs.com/lucas1024/p/9948067.html