Spring Boot Study Notes (5): Integrating Kafka

(1) Add the Kafka dependency to pom.xml

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

(2) Add the Kafka-related settings to the configuration file

spring:
  kafka:
    consumer:
      group-id: test111
      enable-auto-commit: true
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      retries: 3
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    bootstrap-servers: localhost:9092
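
As a rough guide to what these settings mean, the sketch below lists the raw Kafka client property names that the YAML keys above correspond to. The class name `KafkaClientProps` and its methods are hypothetical and only for illustration; in a Spring Boot application the auto-configuration builds these maps for you, so this code is not needed in the project itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of the original post): shows the raw Kafka
// client property names behind the spring.kafka.* keys in the YAML above.
final class KafkaClientProps {

    // spring.kafka.consumer.* plus the shared bootstrap-servers entry.
    static Map<String, Object> consumerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test111");
        props.put("enable.auto.commit", true);
        // "latest": with no committed offset for the group, read only new messages.
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // spring.kafka.producer.* plus the shared bootstrap-servers entry.
    static Map<String, Object> producerProps() {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("retries", 3);
        props.put("batch.size", 16384);          // bytes per partition batch
        props.put("buffer.memory", 33554432L);   // bytes of total send buffer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((k, v) -> System.out.println("consumer " + k + " = " + v));
        producerProps().forEach((k, v) -> System.out.println("producer " + k + " = " + v));
        // The two size settings are plain byte counts.
        System.out.println("batch.size = " + (16384 / 1024) + " KiB");
        System.out.println("buffer.memory = " + (33554432L / (1024 * 1024)) + " MiB");
    }
}
```

So batch-size here is 16 KiB and buffer-memory is 32 MiB.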

(3) Consumer class that reads from Kafka

package com.vincent.config;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaListeners {

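    // Invoked for every record published to the "test" topic.
    @KafkaListener(topics = {"test"})
    public void roadConditionMessage(ConsumerRecord<?, ?> record) {
        try {
            long startTime = System.currentTimeMillis();
            log.info("Started receiving message, startTime: {}", startTime);
            // value() may be null (e.g. a tombstone record), so avoid Optional.get().
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            kafkaMessage.ifPresent(System.out::println);
        } catch (Exception e) {
            log.error("Error while handling message", e);
        }
    }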
}
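
One detail worth noting in the listener: a record's value can be null (for example a tombstone record), and calling `get()` on an empty `Optional` throws `NoSuchElementException`. Below is a minimal, standard-library-only sketch of null-safe handling. The class `NullSafeValue` and the method `describe` are hypothetical names used just for this illustration.

```java
import java.util.Optional;

// Hypothetical helper (not part of the original post): turns a possibly-null
// record value into a printable string without calling Optional.get().
final class NullSafeValue {

    static String describe(Object value) {
        return Optional.ofNullable(value)
                .map(v -> "received: " + v)
                .orElse("received: <null value>");
    }

    public static void main(String[] args) {
        System.out.println(describe("hello"));
        System.out.println(describe(null));
    }
}
```

Inside the listener, the same idea would apply to `record.value()`.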

(4) Summary

With the steps above, the Spring Boot and Kafka integration is complete.

Original article: https://www.cnblogs.com/vincentren/p/10465075.html