Basic usage of the kafka-clients Java API

First, as usual, add the Maven dependency:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>

The kafka-clients message producer:

import java.util.Properties;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaProducerClient {

    public static void pushMsg(String msg) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);
        // acks=0: fire-and-forget, the producer does not wait for any broker acknowledgement
        props.put("acks", "0");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        // send to partition 0 of the topic, with a fixed key "123"
        ProducerRecord<String, String> record = new ProducerRecord<>(KafkaConstant.KAFKA_TOPIC_NAME, 0, "123", msg);
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e != null) {
                    log.error("pushMsg error", e);
                }
                log.info("pushMsg of msg: {}, metadata: {}", msg, metadata);
            }
        });
        // close() flushes any buffered records before returning
        producer.close();
    }

}
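
Note that pushMsg creates and closes a KafkaProducer for every single message. That keeps the demo simple, but each call pays for new broker connections and metadata fetches. For real workloads a single long-lived producer is usually shared, since KafkaProducer is thread-safe. A minimal sketch of that variant (the KafkaProducerHolder class name and the acks=1 setting are my own assumptions, not part of the original post; KafkaConstant is the same constants class used above):

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaProducerHolder {

    // one producer instance shared by the whole application
    private static final Producer<String, String> PRODUCER = createProducer();

    private static Producer<String, String> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);
        // acks=1: wait for the partition leader to acknowledge the write
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return new KafkaProducer<>(props);
    }

    public static void pushMsg(String msg) {
        // no explicit partition or key here: the default partitioner picks the partition
        PRODUCER.send(new ProducerRecord<>(KafkaConstant.KAFKA_TOPIC_NAME, msg), (metadata, e) -> {
            if (e != null) {
                log.error("pushMsg failed", e);
            } else {
                log.info("pushMsg ok, metadata: {}", metadata);
            }
        });
    }
}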

The kafka-clients message consumer:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class KafkaConsumerClient extends Thread {

    // private: the class is only instantiated by Spring (via reflection) and by itself
    private KafkaConsumerClient() {
    }

    /**
     * Initialize the consumer: start a dedicated polling thread.
     */
    public void initKafkaConsumer() {
        log.info("init Kafka Consumer");
        new KafkaConsumerClient().start();
    }

    @Override
    public void run() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);
        props.put("group.id", "1");
        // offsets are committed automatically every second
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(KafkaConstant.KAFKA_TOPIC_NAME));
        // an empty collection means "all assigned partitions"; the seek is applied
        // lazily, once partitions have actually been assigned during poll()
        consumer.seekToBeginning(new ArrayList<>());

        // ===== list all topics (and their partitions) visible to this consumer ===== //
        Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
        log.info("topics: {}", listTopics.keySet());

        while (true) {
            // block for up to 60 seconds waiting for new records
            ConsumerRecords<String, String> records = consumer.poll(1000 * 60);
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("[fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value() + "]");
            }
        }
    }
}
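
One thing to keep in mind about the consumer above: with enable.auto.commit=true, offsets are committed in the background every auto.commit.interval.ms, so a record can be marked as consumed even if processing it afterwards fails. When that matters, auto commit can be turned off and the offsets committed manually after the records have been handled. A minimal sketch of that variant (the ManualCommitConsumer class name is only illustrative; the connection settings are reused from above):

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ManualCommitConsumer {

    public static void consume() {
        Properties props = new Properties();
        props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);
        props.put("group.id", "1");
        // turn off auto commit so offsets only advance after successful processing
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(KafkaConstant.KAFKA_TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                // process the record here, before committing
                log.info("consumed partition {} offset {}: {}", record.partition(), record.offset(), record.value());
            }
            // commit the offsets of everything returned by the last poll()
            consumer.commitSync();
        }
    }
}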

We need to start the consumer when the application starts:

<bean id="initKafkaConsumer" class="com.dywl.zhoushan.kafka.KafkaConsumerClient" init-method="initKafkaConsumer"></bean>
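
If the project does not use Spring XML, the same thing can be done by starting the consumer from a plain main method at startup. Note that the constructor of KafkaConsumerClient above is private (Spring instantiates the bean via reflection), so for this variant it would have to be made public or package-private. A minimal sketch under that assumption (the Application class is only illustrative):

public class Application {

    public static void main(String[] args) {
        // assumes KafkaConsumerClient's constructor has been made accessible
        new KafkaConsumerClient().initKafkaConsumer();
        // ... start the rest of the application ...
    }
}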

Then, whenever the producer is invoked, the consumer picks up the message.

For example:

@Override
public void pushMsgById(Long id) throws Exception {
    User user = new User();
    user.setId(id);
    user.setUsername("test11111111");
    user.setPassword("test22222222");
    String str = JsonUtil.toCompactJsonString(user);
    log.info("pushMsgById is user: {}", str);
    KafkaProducerClient.pushMsg(str);
}

The result:
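
The consumer's console output should then look roughly like the line below (the offset and the exact JSON produced by JsonUtil.toCompactJsonString are illustrative; the partition is 0 because the producer targets partition 0 explicitly):

[fetched from partition 0, offset: 42, message: {"id":1,"username":"test11111111","password":"test22222222"}]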

Original post: https://www.cnblogs.com/yanwu0527/p/9083386.html