spring-kafka手动提交offset

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans
 5          http://www.springframework.org/schema/beans/spring-beans.xsd">
 6 
 7 
 8     <bean id="consumerProperties" class="java.util.HashMap">
 9         <constructor-arg>
10             <map>
11                 <entry key="bootstrap.servers" value="${bootstrap.servers}"/>
12                 <!-- Consumer group name -->
13                 <entry key="group.id" value="friend-group"/>
14                 <entry key="enable.auto.commit" value="false"/>
15                 <entry key="auto.commit.interval.ms" value="1000"/>
16                 <entry key="session.timeout.ms" value="15000"/>
17                 <entry key="max.poll.records" value="1"/>
18                 <entry key="key.deserializer" value="org.apache.kafka.common.serialization.IntegerDeserializer"/>
19                 <!--<entry key="key.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>-->
20                 <entry key="value.deserializer" value="org.apache.kafka.common.serialization.StringDeserializer"/>
21             </map>
22         </constructor-arg>
23     </bean>
24 
25     <bean id="consumerFactory" class="org.springframework.kafka.core.DefaultKafkaConsumerFactory">
26         <constructor-arg>
27             <ref bean="consumerProperties"/>
28         </constructor-arg>
29     </bean>
30 
31     <!-- Service class that consumes the messages (the listener bean) -->
32     <bean id="messageListernerConsumerService" class="com.zhaopin.consumer.ConsumerService"/>
33 
34     <!-- Consumer container configuration: subscribed topic, listener and ack mode -->
35     <bean id="containerProperties" class="org.springframework.kafka.listener.config.ContainerProperties">
36         <constructor-arg value="friend"/>
37         <!--<constructor-arg>
38             <list>
39                 <value>zptopic</value>
40                 <value>ssmk</value>
41                 <value>friend</value>
42             </list>
43         </constructor-arg>-->
44         <property name="messageListener" ref="messageListernerConsumerService"/>
45 
46         <!-- How offsets are committed; used together with enable.auto.commit=false above -->
47         <property name="ackMode" value="MANUAL_IMMEDIATE"/>
48     </bean>
49 
50     <!-- Single-threaded message listener container (disabled alternative) -->
51     <!--<bean id="messageListenerContainer" class="org.springframework.kafka.listener.KafkaMessageListenerContainer" init-method="doStart">
52         <constructor-arg ref="consumerFactory"/>
53         <constructor-arg ref="containerProperties"/>
54     </bean>-->
55 
56     <!-- Multi-threaded message listener container. NOTE(review): init-method="doStart" calls a lifecycle template method directly; the container presumably auto-starts via the Spring lifecycle, so confirm this attribute is needed. -->
57     <bean id="messageListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer" init-method="doStart">
58         <constructor-arg ref="consumerFactory"/>
59         <constructor-arg ref="containerProperties"/>
60         <property name="concurrency" value="5"/>
61     </bean>
62 
63 </beans>

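消费者监听类实现 AcknowledgingMessageListener 接口后,即可在 onMessage 方法中调用 acknowledgment.acknowledge() 手动提交 offset(需配合上面配置中的 enable.auto.commit=false 和 ackMode=MANUAL_IMMEDIATE):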

package com.zhaopin.consumer;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.zhaopin.consumer.dto.FriendRelationDto;
import com.zhaopin.consumer.dto.MessageDto;
import com.zhaopin.consumer.service.FriendRelationService;
import com.zhaopin.pojo.TbPerson;
import com.zhaopin.service.PersonService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Kafka message listener that processes friend-relation messages and commits
 * offsets manually.
 *
 * <p>The class implements {@link AcknowledgingMessageListener}, so every record is
 * delivered together with an {@link Acknowledgment}. The offset is committed only
 * after {@code friendRelationService.process(...)} returns without throwing.
 *
 * <p>Thread-safety: the batch buffer is static and therefore shared by every thread
 * that calls {@link #insertBatch}; all access to it is guarded by a lock on the
 * buffer itself. (The accompanying container configuration uses a concurrency of 5.)
 *
 * Created by SYJ on 2017/3/21.
 */
@Service
public class ConsumerService implements AcknowledgingMessageListener<Integer, String> {

    private static final Logger logger = LoggerFactory.getLogger(ConsumerService.class);

    /** Buffer of persons waiting for the next batch insert; guarded by {@code synchronized (personList)}. */
    private static final List<TbPerson> personList = new ArrayList<TbPerson>();

    /** Number of buffered persons that triggers a batch insert. */
    private static final int INSERT_BATCH_COUNT = 50;

    @Autowired
    private PersonService personService;

    @Autowired
    private FriendRelationService friendRelationService;

    /**
     * Parses the record value as a single {@code TbPerson} and inserts it.
     *
     * <p>Not called by {@link #onMessage} in this class as written.
     *
     * @param record the consumed record whose value is a JSON-encoded {@code TbPerson}
     */
    public void insert(ConsumerRecord<Integer, String> record) {
        TbPerson person = JSON.parseObject(record.value(), TbPerson.class);
        personService.insert(person);
        logger.info("Single data writing to the database:{}", record);
    }

    /**
     * Parses the record value as a {@code TbPerson}, buffers it, and flushes the
     * buffer through {@code personService.insertBatch} once it holds at least
     * {@code INSERT_BATCH_COUNT} entries.
     *
     * <p>Not called by {@link #onMessage} in this class as written.
     *
     * @param record the consumed record whose value is a JSON-encoded {@code TbPerson}
     */
    public void insertBatch(ConsumerRecord<Integer, String> record) {
        TbPerson person = JSON.parseObject(record.value(), TbPerson.class);
        // The buffer is static and shared between consumer threads, so add/flush/clear
        // must happen atomically. The insert stays inside the lock on purpose: if it
        // throws, the buffer is left intact and the next call retries the flush.
        synchronized (personList) {
            personList.add(person);
            // ">=" instead of "==": after a failed flush the buffer is already past the
            // threshold, and an equality test would never fire again.
            if (personList.size() >= INSERT_BATCH_COUNT) {
                personService.insertBatch(personList);
                logger.info("Batch data writing to the database:{}", personList);
                personList.clear();
            }
        }
    }

    /**
     * Handles one consumed record: deserializes the JSON payload, hands the data to
     * {@code friendRelationService}, and acknowledges the record on success.
     *
     * <p>If the payload deserializes to {@code null} or processing throws an
     * {@link IOException}, the failure is logged and the record is not acknowledged.
     *
     * @param record         the consumed record; its value is a JSON-encoded
     *                       {@code MessageDto<FriendRelationDto>}
     * @param acknowledgment handle used to commit the offset of this record
     */
    @Override
    public void onMessage(ConsumerRecord<Integer, String> record, Acknowledgment acknowledgment) {
        logger.info("Before receiving:{}", record);
        MessageDto<FriendRelationDto> message =
                JSON.parseObject(record.value(), new TypeReference<MessageDto<FriendRelationDto>>() {});
        if (message == null) {
            // Guard against a NullPointerException on message.getData() below.
            logger.error("Message payload could not be parsed, record not acknowledged: {}", record);
            return;
        }
        try {
            friendRelationService.process(message.getData());
            // Commit the offset only after the message was processed successfully.
            acknowledgment.acknowledge();
        } catch (IOException e) {
            logger.error("Failed to process record, record not acknowledged: {}", record, e);
        }
    }
}
原文地址:https://www.cnblogs.com/jun1019/p/6628807.html