Kafka Study Notes 014 --- Consumer Multithreading Issues

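KafkaConsumer is not thread-safe. If multiple threads share a single KafkaConsumer instance, Kafka throws the following exception: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

The following program reproduces the problem by polling one consumer from two threads: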

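import java.time.Duration;
import java.util.Collections;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// KafkaTestUtil is the author's own helper class; import it from wherever it lives in your project.
public class MyConsumer5 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MyConsumer5.class);

    public static void main(String[] args) throws InterruptedException {
        // A single consumer instance, deliberately shared by both threads below.
        Consumer<String, String> consumer = KafkaTestUtil.createConsume2("group2");
        consumer.subscribe(Collections.singletonList("topic1"));

        // First thread: polls the shared consumer.
        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer51: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }).start();

        // Second thread: polls the same instance concurrently, which is what triggers the exception.
        new Thread(() -> {
            while (true) {
                ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(2));
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    LOGGER.error("consumer52: offset={}, partition={}, value={}", record.offset(), record.partition(), record.value());
                }
            }
        }).start();
    }
}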

Result: running this program throws the ConcurrentModificationException quoted above.
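
To make the failure mode easier to see without a broker, here is a minimal sketch of a single-thread access guard that rejects a second thread while the first is still inside a call. It is a simplified model written for this note, not Kafka's actual source; the names SingleThreadGuardDemo, GuardedClient and secondThreadRejected are all hypothetical, and the latches exist only to make the overlap deterministic.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

public class SingleThreadGuardDemo {

    /** Hypothetical stand-in for a client that allows only one thread inside at a time. */
    static class GuardedClient {
        private final AtomicReference<Thread> owner = new AtomicReference<>(null);
        private final AtomicInteger depth = new AtomicInteger(0);

        private void acquire() {
            Thread me = Thread.currentThread();
            // Allowed if we already own the guard (re-entrant call) or if nobody owns it.
            if (owner.get() != me && !owner.compareAndSet(null, me)) {
                throw new ConcurrentModificationException("client is not safe for multi-threaded access");
            }
            depth.incrementAndGet();
        }

        private void release() {
            // Give up ownership only when the outermost call returns.
            if (depth.decrementAndGet() == 0) {
                owner.set(null);
            }
        }

        /** Simulated poll: signals that it has entered, then stays inside until told to leave. */
        void poll(CountDownLatch entered, CountDownLatch mayLeave) {
            acquire();
            try {
                entered.countDown();
                mayLeave.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                release();
            }
        }
    }

    /** Returns true if a second thread is rejected while the first is still inside poll. */
    static boolean secondThreadRejected() {
        GuardedClient client = new GuardedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayLeave = new CountDownLatch(1);
        AtomicBoolean rejected = new AtomicBoolean(false);

        Thread first = new Thread(() -> client.poll(entered, mayLeave));
        Thread second = new Thread(() -> {
            try {
                // Latches already at zero, so this call would return at once if it were admitted.
                client.poll(new CountDownLatch(0), new CountDownLatch(0));
            } catch (ConcurrentModificationException e) {
                rejected.set(true);
            }
        });

        try {
            first.start();
            entered.await();        // the first thread now holds the guard
            second.start();
            second.join();          // the second thread has tried and finished
            mayLeave.countDown();   // let the first thread leave poll
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("second thread rejected: " + secondThreadRejected());
    }
}
```

The guard does not block the second caller; it fails fast. That matches the behaviour seen with the shared consumer above, where the exception surfaces instead of the two threads silently taking turns. The usual way around it is to give each thread its own consumer instance, or to keep all consumer calls on one thread and hand the records to worker threads.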

For usage, see: https://blog.csdn.net/clypm/article/details/80618036

Original article: https://www.cnblogs.com/sniffs/p/13203040.html