kafka 学习(五)spring boot + kafka + zookeeper

spring boot + kafka + zookeeper

环境搭建见上一篇

pom文件添加依赖

<!-- kafka -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application.yml 配置如下(简单测试足够)

spring: 
 kafka:
    bootstrap-servers: 10.250.23.213:9092,10.250.23.214:9092,10.250.23.215:9092
    consumer:
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test-consumer-group
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 65536
      buffer-memory: 524288
TestKafkaController.java
package com.jmev.cn.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @Author: shaoxin
 * @Date: 2019/3/29 9:28
 * @Email:airvicii@163.com
 */
@RestController
@RequestMapping("/kafka")
public class TestKafkaController {

    /**
     * 注入kafkaTemplate
     */
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    /**
     * 发送消息的方法
     *
     * @param key  推送数据的key
     * @param data 推送数据的data
     */
    private void send(String key, String data) {
        // topic 名称 key data 消息数据
        kafkaTemplate.send("kafka_test", key, data);

    }

    // test 主题 1 my_test 3

    @RequestMapping("/send")
    public String testKafka() {
        int iMax = 6;
        for (int i = 1; i < iMax; i++) {
            send("key" + i, "data" + i);
        }
        return "success";
    }

    /**
     * 消费者使用日志打印消息
     */
    @KafkaListener(topics = "kafka_test")
    public void receive(ConsumerRecord<?, ?> consumer) {
        System.out.println("topic名称:" + consumer.topic()
                + ",key:" + consumer.key() + ",分区位置:" + consumer.partition()
                + ", 下标" + consumer.offset());
    }
}
原文地址:https://www.cnblogs.com/ShaoXin/p/10621260.html