Kafka学习(三)——Java工具类、Springboot集成批量消费、SparkStreaming集成

前言
本文内容全部来自工作和学习中的总结与测试,并非生产环境配置,仅供参考!

提示:以下是本篇文章正文内容,下面案例可供参考

一、Java中工具类
常用于测试使用~

1. 添加maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>1.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>1.0.0</version>
</dependency>
 
2.消费者:KafkaConsumerTest
/**
 * Test consumer: subscribes to a single topic, prints every received record to
 * stdout and appends each record value as one line to {@code data.txt}.
 *
 * <p>The loop runs until the consuming thread is interrupted.
 */
public class KafkaConsumerTest implements Runnable {

    private static final String GROUPID = "groupA";

    private final KafkaConsumer<String, String> consumer;
    private final String topic;

    /**
     * Builds the consumer and subscribes it to the given topic.
     *
     * @param topicName name of the topic to consume
     */
    public KafkaConsumerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
        // Consumer group name; different groups each receive the full stream.
        props.put("group.id", GROUPID);
        // Commit offsets automatically (true is also the client default).
        props.put("enable.auto.commit", "true");
        // Interval in milliseconds between two automatic offset commits.
        props.put("auto.commit.interval.ms", "1000");
        // Session timeout in milliseconds used to detect a dead consumer.
        props.put("session.timeout.ms", "30000");
        // Maximum number of records returned by a single poll().
        props.put("max.poll.records", 5000);
        // Offset reset policy (client default is "latest"):
        //   earliest: resume from the committed offset; without one, start from the beginning.
        //   latest:   resume from the committed offset; without one, read only new records.
        //   none:     resume from the committed offset; throw if any partition has none.
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    /**
     * Polls the topic until the thread is interrupted, echoing each record and
     * writing its value to {@code data.txt}. The consumer is always closed on exit.
     */
    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        // try-with-resources guarantees the file is flushed and closed on every exit path.
        try (PrintWriter printWriter = new PrintWriter(new FileWriter("data.txt"))) {
            // Keep polling while the thread has NOT been interrupted.
            while (!Thread.currentThread().isInterrupted()) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                if (records != null && records.count() > 0) {
                    for (ConsumerRecord<String, String> record : records) {
                        // Every record is printed, numbered by arrival order.
                        System.out.println(messageNo + "=======receive: key = " + record.key() + ", value = " + record.value() + " offset===" + record.offset());
                        printWriter.println(record.value());
                        messageNo++;
                    }
                    // Make the batch visible in the file without waiting for shutdown.
                    printWriter.flush();
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            // Restore the flag so the owner of the thread can observe the interruption.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }

    public static void main(String args[]) {
        KafkaConsumerTest test1 = new KafkaConsumerTest("KAFKA_TEST");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}
 
3.生产者:KafkaProducerTest
/**
 * Test producer: sends 10,000 numbered string messages with the fixed key
 * {@code "Message"} to a single topic, then closes the producer.
 */
public class KafkaProducerTest implements Runnable {

    private final KafkaProducer<String, String> producer;
    private final String topic;

    /**
     * Builds the producer for the given topic.
     *
     * @param topicName name of the topic to publish to
     */
    public KafkaProducerTest(String topicName) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
        // acks: acknowledgement mode (client default in this version is 1).
        //   acks=0:   the producer does not wait for any broker response.
        //   acks=1:   the leader writes the record to its local log and answers
        //             without waiting for the followers.
        //   acks=all: the leader waits for all in-sync replicas; the record survives
        //             as long as one in-sync replica stays alive (strongest durability).
        props.put("acks", "all");
        // A value greater than 0 makes the client resend records whose send failed.
        props.put("retries", 0);
        // Records for the same partition are grouped into batches of up to this many
        // bytes, which reduces the number of network requests.
        props.put("batch.size", 16384);
        // Serializer class for record keys.
        props.put("key.serializer", StringSerializer.class.getName());
        // Serializer class for record values.
        props.put("value.serializer", StringSerializer.class.getName());
        this.producer = new KafkaProducer<String, String>(props);
        this.topic = topicName;
    }

    /**
     * Sends messages until 10,000 have been handed to the producer or the thread
     * is interrupted. The producer is always closed on exit, which also flushes
     * records that are still buffered.
     */
    @Override
    public void run() {
        int messageNo = 1;
        try {
            while (!Thread.currentThread().isInterrupted()) {
                String messageStr = "你好,这是第" + messageNo + "条数据";
                // send() is asynchronous; without a callback a failed send goes unnoticed.
                producer.send(
                        new ProducerRecord<String, String>(topic, "Message", messageStr),
                        (metadata, exception) -> {
                            if (exception != null) {
                                exception.printStackTrace();
                            }
                        });
                // Print a progress line every 100 messages.
                if (messageNo % 100 == 0) {
                    System.out.println("发送的信息:" + messageStr);
                }
                // Stop after 10,000 messages.
                if (messageNo % 10000 == 0) {
                    System.out.println("成功发送了" + messageNo + "条");
                    break;
                }
                messageNo++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }

    public static void main(String args[]) {
        KafkaProducerTest test = new KafkaProducerTest("KAFKA_TEST");
        Thread thread = new Thread(test);
        thread.start();
    }
}
 
二、 SpringBoot中使用
4.1 引入依赖
compile group: 'org.springframework.kafka' ,name: 'spring-kafka'
4.2 application.yml
# Kafka settings (custom "kafka.*" namespace, read through @Value in KafkaConfiguration)
kafka:
  bootstrap-servers: 192.168.44.136:9092,192.168.44.137:9092,192.168.44.138:9092
  producer:
    batch-size: 16785 # upper bound of one per-partition send batch
    retries: 1 # number of resend attempts after a failed send
    buffer-memory: 33554432 # 32 MB buffer for records waiting to be sent
    linger: 1 # passed to linger.ms
  consumer:
    auto-offset-reset: latest # without a committed offset read only new records; use earliest to start from the oldest
    max-poll-records: 3100 # maximum number of records returned by one poll() in batch mode
    enable-auto-commit: false # automatic offset commit on/off
    auto-commit-interval: 1000 # interval between automatic commits (ms)
    session-timeout: 20000 # session timeout (ms)
    max-poll-interval: 15000 # passed to max.poll.interval.ms: longest allowed gap between two poll() calls (ms)
    max-partition-fetch-bytes: 15728640 # maximum data fetched per partition, 15 MB
  listener:
    batch-listener: true # true enables batch consumption
    concurrencys: 3,6 # listener thread counts; the first and second value are read separately
    poll-timeout: 1500 # poll timeout handed to the listener container (ms)
    topics: kafkaTest
    group-id: test1-consumer-group
 
4.3 KafkaConfiguration.java
/**
 * Spring configuration that wires the Kafka producer template and two batch
 * listener container factories from the {@code kafka.*} properties.
 */
@Configuration
public class KafkaConfiguration {

    @Value("${kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${kafka.producer.retries}")
    private Integer retries;

    @Value("${kafka.producer.batch-size}")
    private Integer batchSize;

    @Value("${kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    @Value("${kafka.producer.linger}")
    private Integer linger;

    @Value("${kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    @Value("${kafka.consumer.auto-commit-interval}")
    private Integer autoCommitInterval;

    @Value("${kafka.consumer.max-poll-records}")
    private Integer maxPollRecords;

    @Value("${kafka.consumer.auto-offset-reset}")
    private String autoOffsetReset;

    // First entry of the comma-separated kafka.listener.concurrencys list.
    @Value("#{'${kafka.listener.concurrencys}'.split(',')[0]}")
    private Integer concurrency3;

    // Second entry of the comma-separated kafka.listener.concurrencys list.
    @Value("#{'${kafka.listener.concurrencys}'.split(',')[1]}")
    private Integer concurrency6;

    @Value("${kafka.listener.poll-timeout}")
    private Long pollTimeout;

    @Value("${kafka.consumer.session-timeout}")
    private String sessionTimeout;

    @Value("${kafka.listener.batch-listener}")
    private Boolean batchListener;

    @Value("${kafka.consumer.max-poll-interval}")
    private Integer maxPollInterval;

    @Value("${kafka.consumer.max-partition-fetch-bytes}")
    private Integer maxPartitionFetchBytes;

    /**
     * @return a String/String template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplateString() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Builds the producer factory from the {@code kafka.producer.*} properties.
     *
     * @return producer factory using String serializers for key and value
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>(7);
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Diamond instead of the raw type keeps the generic return type checked.
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * @return a String/String template backed by {@link #producerFactory()}
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Builds a consumer factory from the {@code kafka.consumer.*} properties.
     *
     * @return consumer factory using String deserializers for key and value
     */
    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>(10);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, maxPartitionFetchBytes);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    /**
     * Creates a fresh listener container factory with the shared settings
     * (batch mode, poll timeout, manual acknowledgement). Concurrency is set by
     * the calling bean method.
     *
     * @return a new, not yet concurrency-configured container factory
     */
    private ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // Deliver the records of one poll() to the listener as a list.
        factory.setBatchListener(batchListener);
        // Timeout handed to poll(); the number of records per poll is bounded by
        // max.poll.records.
        factory.getContainerProperties().setPollTimeout(pollTimeout);
        // Offsets are committed only when the listener acknowledges.
        // NOTE(review): MANUAL_IMMEDIATE is expected to commit as soon as
        // acknowledge() is called, MANUAL to defer it - confirm against the
        // Spring Kafka version in use.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }

    /**
     * Batch listener container factory using the first configured concurrency.
     *
     * @return container factory referenced as {@code kafkaBatchListener3}
     */
    @Bean
    @ConditionalOnMissingBean(name = "kafkaBatchListener3")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener3() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
        factory.setConcurrency(concurrency3);
        return factory;
    }

    /**
     * Batch listener container factory using the second configured concurrency.
     *
     * @return container factory referenced as {@code kafkaBatchListener6}
     */
    @Bean
    @ConditionalOnMissingBean(name = "kafkaBatchListener6")
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaBatchListener6() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = kafkaListenerContainerFactory();
        factory.setConcurrency(concurrency6);
        return factory;
    }
}
 
4.4 ProducerService.java
/**
 * Publishes string messages to the topic configured under
 * {@code kafka.listener.topics} and logs the outcome of each send.
 */
@Service
public final class ProducerService {
    private static final Logger logger = LoggerFactory.getLogger(ProducerService.class);

    @Value("${kafka.listener.topics}")
    private String topic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * Sends one message asynchronously; the result is reported through the log.
     *
     * @param message payload to publish
     */
    public void sendMessage(String message) {
        // Parameterized logging avoids formatting the string when INFO is disabled.
        logger.info("Producing message: {}", message);
        ListenableFuture<SendResult<String, String>> future = this.kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {

            @Override
            public void onSuccess(SendResult<String, String> result) {
                logger.info("Sent message=[ {} ] with offset=[ {} ]", message, result.getRecordMetadata().offset());
            }

            @Override
            public void onFailure(Throwable ex) {
                // A failed send is an error; passing ex last keeps the stack trace in the log.
                logger.error("Unable to send message=[ {} ] due to : {}", message, ex.getMessage(), ex);
            }
        });
    }
}

 
4.5 ConsumerService.java
/**
 * Batch Kafka listener that logs every received record and then acknowledges
 * the whole batch.
 */
@Service
public final class ConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    /**
     * Handles one batch of records from the first topic listed in
     * {@code kafka.listener.topics}, using the {@code kafkaBatchListener6}
     * container factory and the consumer group from {@code kafka.listener.group-id}.
     *
     * @param records records delivered for this batch
     * @param ack     handle used to commit the offsets of the batch
     */
    @KafkaListener(containerFactory = "kafkaBatchListener6", topics = {"#{'${kafka.listener.topics}'.split(',')[0]}"}, groupId = "${kafka.listener.group-id}")
    public void batchListener(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
        try {
            logger.info("本次总消息数量:{}", records.size());
            // Pass the value itself so a null value is logged instead of throwing.
            records.forEach(record -> logger.info("消费消息:{}", record.value()));
        } catch (Exception e) {
            logger.error("Kafka监听异常" + e.getMessage(), e);
        } finally {
            // Acknowledged even after a failure, so a failed batch is not redelivered.
            ack.acknowledge();
        }
    }
}
 
三、SparkStreaming集成
1.引入库
compile group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.1.1'
compile group: 'org.apache.spark', name: 'spark-streaming_2.11', version: '2.1.1'
compile group: 'org.apache.spark', name: 'spark-streaming-kafka-0-10_2.11', version: '2.1.1'
 
2.代码
public class StartKafka {
public static void main(String[] args) throws InterruptedException {
SparkConf sparkConf = new SparkConf()
.setAppName("StartKafka")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//一个batch为10s内的数据
JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
//从kafka中获取数据
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "cdh1:9092,cdh2:9092,cdh3:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupB");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", true);

Collection<String> topics = Arrays.asList("KAFKA_TEST");

final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
jsc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
//打印
stream.print();
Thread.sleep(5000);
//开始执行
jsc.start();
//执行等待
jsc.awaitTermination();
//执行完毕后关闭
jsc.close();

}

总结
本章主要内容:java工具类、springboot集成、sparkstreaming集成使用~


————————————————
版权声明:本文为CSDN博主「笑里笑外~」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
原文链接:https://blog.csdn.net/qq_36488175/article/details/110234262

原文地址:https://www.cnblogs.com/javalinux/p/15060479.html