springboot整合kafka消息队列

一、概述

消息队列,我们“窥探”已久,终于将kafka集成到项目springboot项目里面了,这里记录下操作流程。知识的回顾;

二、kafka服务器的安装

服务端下载地址 ,Linux下,我选择安装最新的版本2.13,但是window系统下 ,该版本无法启动,只能选择安装kafka_2.11-2.0.0.tgz;

 Linux和window系统下解压该文件

Linux进入cmd启动命令:zk端口默认2181,kafka默认端口9092

./zookeeper-server-start.sh -daemon ../config/zookeeper.properties 

./kafka-server-start.sh -daemon ../config/server.properties  

window进入cmd启动命令cmd:

./window/zookeeper-server-start.sh  ../config/zookeeper.properties 

./window/kafka-server-start.sh -daemon ../config/server.properties  

三、java代码

由于我的项目是springboot版本是1.5.2.RELEASE,kafka的版本只能选择spring-kafka版本1.2.1.RELEASE,否则版本冲突;

<dependency>                    
   <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>1.2.1.RELEASE</version>
</dependency>

spring boot 项目配置

#制定kafka代理地址
spring.kafka.bootstrap-servers=localhost:9092
#Kafkaf--producer---消息发送失败重试次数
spring.kafka.producer.retries=0
#每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
#每次批量发送消息的缓冲区大小
spring.kafka.producer.buffer-memory=335554432
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#Kafkaf--consumer  =======================
# 指定默认消费者group id:commit手动提交
spring.kafka.consumer.group-id=user-log-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.listener.ack-mode=manual_immediate

Sprintboot代码编写

producter代码

package com.szdbgo.sale.invoicemgr.domain.service.kafka;

import com.szdbgo.framework.core.constant.SaleConstant;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
 * Description  发票日志处理
 * Author justin.jia
 * Date 2021/11/27 17:11
 **/
@Component("invoiceProducterService")
public class InvoiceProducterService {

    private static Logger logger = Logger.getLogger(InvoiceProducterService.class);


    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;


    //发票上传信息查询
    public void invoiceUploadSend(String key,String value) {
        logger.info("Kafka接口准备上传发送消息为发票ID:"+ key+"****"+value);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(SaleConstant.TOPIC_INVOICE_UPLOAD,value);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {            //发送失败的处理
                logger.error(SaleConstant.TOPIC_INVOICE_UPLOAD + " - 生产者 发送消息失败:" + throwable.getMessage());
            }
            @Override
            public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                //成功的处理
                logger.info(SaleConstant.TOPIC_INVOICE_UPLOAD + " - 生产者 发送消息成功:" + stringObjectSendResult.toString());
            }
        });
    }
}

customer代码

package com.szdbgo.sale.invoicemgr.domain.service.kafka;

import com.szdbgo.framework.core.constant.SaleConstant;
import com.szdbgo.framework.core.utils.common.CommonStringUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

/**
 * Description  发票上传消费者
 * Author justin.jia
 * Date 2021/11/27 17:12
 **/
@Component("invoiceUploadCustomerService")
public class InvoiceUploadCustomerService {

    private static Logger logger = Logger.getLogger(InvoiceUploadCustomerService.class);//kafka的监听器
    @KafkaListener(topics = SaleConstant.TOPIC_INVOICE_UPLOAD)
    public void invoiceUploadCmd(ConsumerRecord<String, String> record, Acknowledgment ack) {
        //手动提交offset
        ack.acknowledge();
        String value = record.value();
        logger.info("***********接受数据,开始上传发票,发票ID:"+value);
        try{
            if(CommonStringUtils.isNotEmpty(value)) {
                //do something
            }
        }
        catch (Exception exception){
            logger.error("发票上传处理失败");
        }
        finally {

        }
    }

}
原文地址:https://www.cnblogs.com/xibei666/p/15627910.html