Spring-Kafka —— KafkaListener手动启动和停止

一、KafkaListener消费

    /**
     * 手动提交监听.
     *
     * @param record 消息记录
     * @param ack    确认实例
     */
    @Override
    @KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING, topics = {"${kafka.app.topic.warning}"}, containerFactory = "ackContainerFactory", groupId = "warning")
    public void ackListener(ConsumerRecord record, Acknowledgment ack) {
        if (LOG.isInfoEnabled()) {
            LOG.info("###################预警ackListener接收到消息###################");
        }

        boolean ackFlag = true;
        long beginTime = System.currentTimeMillis();
        try {
            WarningInfo warningInfo = parseConsumerRecord(record);
            if (null == warningInfo) {
                dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_1, new Object[]{record.topic(), record.value()}));
            } else {
                warningBusinessHandle.doHandle(record, warningInfo);
            }
//      } catch (BusinessException ex) {
//            LOG.error(record.topic() + "消费失败:" + ex.getMessage(), ex);
//            // 业务处理失败(目前暂无此场景),把消息发送至重试主题
//            this.sendRetryTopic(record, this.interceptErrMessage(ex.getMessage()));
        } catch (Exception e) {
            LOG.error("[" + record.topic() + "]消费发生运行时异常:" + e.getMessage(), e);
            ackFlag = false;
            consumerListenerServiceImpl.stopListener(ConsumerConst.LISTENER_ID_WARNING);
            dingTalkService.sendMessage(MessageFormat.format(ConsumerConst.DING_TALK_MSG_2, new Object[]{record.topic()}));
        } finally {
            if (ackFlag) {
                // 手动提交offset
                ack.acknowledge();
            }
            LOG.info("###################预警ackListener处理完消息,耗时" + (System.currentTimeMillis()-beginTime) + "ms ###################");
        }
    }

二、使用KafkaListenerEndpointRegistry实现启动和停止功能

下面参数里面的listenerId值,必须是消费时@KafkaListener注解中指定的id值:@KafkaListener(id = ConsumerConst.LISTENER_ID_WARNING
package com.macaupass.kafka.consumer.service.impl;

import com.macaupass.kafka.consumer.service.KafkaConsumerListenerService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.stereotype.Service;

/**
 * Kafka消费监听服务实现类.
 *
 * @author weixiong.cao
 * @date 2019/7/2
 */
@Service
public class KafkaConsumerListenerServiceImpl implements KafkaConsumerListenerService {

    /**
     * LOG.
     */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerServiceImpl.class);

    /**
     * registry.
     */
    @Autowired
    private KafkaListenerEndpointRegistry registry;

    /**
     * 开启监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void startListener(String listenerId) {
        //判断监听容器是否启动,未启动则将其启动
        if (!registry.getListenerContainer(listenerId).isRunning()) {
            registry.getListenerContainer(listenerId).start();
        }
        //项目启动的时候监听容器是未启动状态,而resume是恢复的意思不是启动的意思
        registry.getListenerContainer(listenerId).resume();
        LOG.info(listenerId + "开启监听成功。");
    }

    /**
     * 停止监听.
     *
     * @param listenerId 监听ID
     */
    @Override
    public void stopListener(String listenerId) {
        registry.getListenerContainer(listenerId).stop();
        LOG.info(listenerId + "停止监听成功。");
    }

}

三、Controller

package com.macaupass.kafka.consumer.controller;

import com.macaupass.kafka.consumer.service.impl.KafkaConsumerListenerServiceImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

import java.util.HashMap;
import java.util.Map;

/**
 * Kafka消费监听Controller.
 *
 * @author weixiong.cao
 * @date 2019/7/2
 */
@Controller
@RequestMapping(value = "/listener")
public class KafkaConsumerListenerController {

    /**
     * LOG.
     */
    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerListenerController.class);

    /**
     * 注入监听服务.
     */
    @Autowired
    private KafkaConsumerListenerServiceImpl kafkaConsumerListenerService;

    /**
     * 开启监听.
     *
     * @param listenerId 监听ID
     */
    @RequestMapping("/start")
    @ResponseBody
    public Map<String, String> startListener(@RequestParam(required=false) String listenerId) {
        if (LOG.isInfoEnabled()) {
            LOG.info("开启监听...listenerId=" + listenerId);
        }

        Map<String, String> retMap = new HashMap<>();
        try {
            kafkaConsumerListenerService.startListener(listenerId);
            retMap.put("respCode", "0000");
            retMap.put("respMsg", "启动成功。");
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            retMap.put("respCode", "0001");
            retMap.put("respMsg", "启动失败:" + e.getMessage());
        }
        return retMap;
    }

    /**
     * 停止监听.
     *
     * @param listenerId 监听ID
     */
    @RequestMapping("/stop")
    @ResponseBody
    public Map<String, String> stopListener(@RequestParam(required=false) String listenerId) {
        if (LOG.isInfoEnabled()) {
            LOG.info("停止监听...listenerId=" + listenerId);
        }

        Map<String, String> retMap = new HashMap<>();
        try {
            kafkaConsumerListenerService.stopListener(listenerId);
            retMap.put("respCode", "0000");
            retMap.put("respMsg", "停止成功。");
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            retMap.put("respCode", "0001");
            retMap.put("respMsg", "停止失败:" + e.getMessage());
        }
        return retMap;
    }

    /**
     * 访问入口.
     */
    @RequestMapping("/index")
    public String index() {
        return "kafka/listener";
    }

}

四、JSP界面

<%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html>
<head>
    <title>消费监听管理</title>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
    <link rel="stylesheet" href="../css/common.css">
    <script src="../jquery/jquery-3.3.1.min.js"></script>
    <script type="text/javascript">
        var contextPath = "<%=request.getContextPath() %>";

        /**
         * 开启监听.
         *
         * @param listenerId 监听ID
         */
        function startListener(listenerId) {
            ajaxPostByJson(contextPath + "/listener/start?listenerId=" + listenerId);
        }

        /**
         * 停止监听.
         *
         * @param listenerId 监听ID
         */
        function stopListener(listenerId) {
            ajaxPostByJson(contextPath + "/listener/stop?listenerId=" + listenerId);
        }

        /**
         * ajax请求.
         *
         * @param url 请求url
         */
        function ajaxPostByJson(url) {
            $.ajax({
                type: "POST",
                url: url,
                dataType:"json",
                contentType : 'application/json;charset=utf-8',
                success: function(respData){
                    alert(respData.respMsg);
                },
                error: function(res){
                    alert("系統異常:" + res.responseText);
                }
            });
        }
    </script>
</head>
<body text=#000000 bgColor="#ffffff" leftMargin=0 topMargin=4>
<div id="main">
    <div id="head">
        <dl class="alipay_link">
            <a target="_blank" href=""><span>&nbsp;</span></a>
        </dl>
        <span class="title">Kafka消费手动管理</span>
    </div>
    <div class="cashier-nav">
    </div>
    <form name=query method=post>
        <div id="body" style="clear:left">
            <dl class="content">
                <dd>
                    <span class="new-btn-login-sp">
                            <button class="new-btn-login" type="button" style="text-align:center;" onclick="startListener('listenerIdWarning')">开启【预警】消费</button>
                    </span>
                    <span class="new-btn-login-sp">
                            <button class="new-btn-login" type="button" style="text-align:center;" onclick="stopListener('listenerIdWarning')">停止【预警】消费</button>
                    </span>
                </dd>
            </dl>
        </div>
    </form>
</div>
</body>
</html>

五、功能界面

原文地址:https://www.cnblogs.com/caoweixiong/p/11181386.html