SpringBoot集成Kafka实践

一、准备

1、启动zookeeper

2、启动kafka

3、kafka创建主题。主题名称为:couponTopic

./kafka-topics.sh  --create --zookeeper localhost:2181  --replication-factor 1 --partitions 1 --topic couponTopic

二、生产者工程

1、增加引用

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>1.1.1.RELEASE</version>
        </dependency>

  

2、增加配置

spring:
  kafka:
    bootstrap-servers: 47.xx.xx.120:9092
    consumer:
      group-id:mygroup
    listener:
      concurrency: 4

  

3、服务层增加kafka调用

注入kafkaTemplate

    @Autowired
    public MerchantsServImpl(MerchantsDao merchantsDao,
                             KafkaTemplate<String, String> kafkaTemplate) {
        this.merchantsDao = merchantsDao;
        this.kafkaTemplate = kafkaTemplate;
    }

  

通过kafkaTemplate发送消息。Constants.TEMPLATE_TOPIC为couponTopic

            String passTemplate = JSON.toJSONString(template);
            kafkaTemplate.send(
                    Constants.TEMPLATE_TOPIC,
                    Constants.TEMPLATE_TOPIC,
                    passTemplate
            );

  

三、消费者工程

1、增加Kafka依赖

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
			<version>1.1.1.RELEASE</version>
		</dependency>

  

2、配置连接kafka

spring:
  application:
    name: kafkaConsume
  kafka:
    bootstrap-servers: 4x.xx.xx.xx:9092
    consumer:
      group-id: = mygroup
    listener:
      concurrency: 4

server:
  port: 9527

  

3、接收Kafka消息

主题名称:

public static final String TEMPLATE_TOPIC = "couponTopic";
@Slf4j
@Component
public class ConsumeTemplate {



    @KafkaListener(topics = {Constants.TEMPLATE_TOPIC})
    public void receive(@Payload String passTemplate,
                        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
                        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

        log.info("Consumer Receive PassTemplate: {}", passTemplate);

        PassTemplate pt;

        try {
            pt = JSON.parseObject(passTemplate, PassTemplate.class);
        } catch (Exception ex) {
            log.error("Parse PassTemplate Error: {}", ex.getMessage());
            return;
        }

        ...
    }
}

  

原文地址:https://www.cnblogs.com/linlf03/p/12956675.html