java 架构之路(队列)kafka

什么是kafka

kafka是一种高吞吐量的分布式发布订阅消息系统
主要是三个功能

1. 发布和订阅记录的流,类似于消息队列或者企业级消息系统。
2. 以容错的、持久的方式存储记录流
3. 当发生时处理记录流。
复制代码

SpringBoot集成kafka

新建SpringBoot项目

引入kafka

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
复制代码

yml 文件中添加配置

spring:
  kafka:
    # Kafka集群
    bootstrap-servers: xxxxxxx
复制代码

Hello Kafka

生产者

public class KafkaProducer {
    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public void sendMessage(String normalMessage) {
        kafkaTemplate.send("topic", normalMessage);
    }
}
复制代码

消费者

@Component
public class KafkaConsumer {
    // 消费监听
    @KafkaListener(topics = {"topic"})
    public void onMessage1(ConsumerRecord<?, ?> record){
        // 消费的哪个topic、partition的消息,打印出消息内容
        System.out.println("简单消费:"+record.topic()+"-"+record.partition()+"-"+record.value());
    }
}
复制代码

生产者

1、带回调的生产者

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    public void sendMessage(String normalMessage) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate
                    .send("topic", normalMessage);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onSuccess(SendResult<String, String> result) {
                onSendSuccess(result, k, v);
            }

            @Override
            public void onFailure(Throwable ex) {
                System.out.println("发送消息成功:" + result.getRecordMetadata().topic() + "-"
                    + result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
            }
        });  
    }
复制代码

消费者


    public String getPubSubTopic() {
        return "topic";
    }

    public String getSubscriber() {
        return "group";
    }
    @SuppressWarnings({ "rawtypes", "unchecked" })
    @KafkaListener(groupId = "#{__listener.subscriber}", topics = "#{__listener.pubSubTopic.split(',')}", containerFactory = "batchFactory")
    public void listen(ConsumerRecord<String, String> records) {
    
    }
复制代码

属性解释:

① id:消费者ID;

② groupId:消费组ID;

③ topics:监听的topic,可监听多个;

④ topicPartitions:可配置更加详细的监听信息,可指定topic、parition、offset监听。

批量消费者

# 设置批量消费
spring.kafka.listener.type=batch
# 批量消费每次最多消费多少条消息
spring.kafka.consumer.max-poll-records=50
复制代码
public void onMessage(List<ConsumerRecord<?, ?>> records) {}
复制代码
 
原文地址:https://www.cnblogs.com/ming569/p/13693204.html