Kafka

Kafka添加依赖:

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.11.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.11.0.0</version>
        </dependency>

创建生产者(向Kafka写入数据):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Component;

import java.util.Properties;
import java.util.ResourceBundle;
import java.util.concurrent.Future;

/**
 * Kafka生产者
 */
@Component
public class CustomProducer {
    private final static Logger logger = Logger.getLogger(CustomProducer.class);

    // Reads kafka.ip / kafka.port from application.properties
    private static ResourceBundle resource = ResourceBundle.getBundle("application");
    private static KafkaProducer<String, String> producer;
    public final static String TOPIC = "XXX_JSON_TOPIC";

    /**
     * Initializes the shared Kafka producer from the "application" resource
     * bundle (keys: kafka.ip, kafka.port). On failure the error is logged and
     * {@code producer} remains null; callers must handle that case.
     */
    public void InitCustomProducer() {
        try {
            logger.debug("开始初始化KafKa生产者");
            Properties props = new Properties();
            // Kafka broker host and port
            props.put("bootstrap.servers", resource.getString("kafka.ip")+":"+resource.getString("kafka.port"));
            // Wait for acknowledgement from all in-sync replicas
            props.put("acks", "all");
            // Maximum number of send retries
            props.put("retries", 0);
            // Batch size in bytes
            props.put("batch.size", 16384);
            // Linger time (ms) before a batch is sent
            props.put("linger.ms", 1);
            // Total send-buffer memory in bytes
            props.put("buffer.memory", 33554432);
            // Key serializer
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            // Value serializer
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            producer = new KafkaProducer<>(props);
            logger.debug("初始化KafKa生产者成功");
        } catch (Exception e) {
            // Pass the throwable so log4j records the full stack trace
            // (the old string concatenation lost it)
            logger.error("Kafka生产者初始化异常", e);
        }
    }

    /**
     * Sends a message to {@link #TOPIC}, lazily initializing the producer on
     * first use.
     *
     * @param data message payload, sent as the record value (no key)
     */
    public void SendData(String data){
        if (producer == null){
            InitCustomProducer();
        }
        // Initialization may have failed; avoid a NullPointerException on send
        if (producer == null){
            logger.error("Kafka producer is not available, message dropped: " + data);
            return;
        }
        try {
            logger.debug("入Kafka数据:"+data);
            // The returned Future was never used, so don't keep it
            producer.send(new ProducerRecord<>(TOPIC, data));
        } catch (Exception e) {
            // e.getStackTrace() concatenated into a String only printed the
            // array's identity hash; log the throwable itself instead
            logger.error("send to kafka is error", e);
        }
    }
}

创建Kafka消费者(读取数据):

import org.apache.cxf.endpoint.Client;
import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.xml.namespace.QName;
import java.util.*;

/**
 * Kafka消费者
 */
@Component
public class CustomConsumer {
    private final static Logger logger = Logger.getLogger(CustomConsumer.class);
    private static ResourceBundle resource = ResourceBundle.getBundle("application");

    @Autowired
    private AsyncTask asyncTask;

    public void InitCustomConsumer(){
        logger.debug("开始初始化KafKa消费者");
        Properties props = new Properties();
        // 定义kakfa 服务的地址,不需要将所有broker指定上
        props.put("bootstrap.servers", resource.getString("kafka.ip")+":"+resource.getString("kafka.port"));
        // 制定consumer group
// 由于同一group.id会导致每次重新启动都会重复读取数据,这里暂时随机分配group.id(原因不明)
props.put("group.id", "test-consumer-group"+System.currentTimeMillis()); //消费规则 //props.put("auto.offset.reset","latest"); // 是否自动确认offset
//自动提交偏移量
props.put("enable.auto.commit", "true"); // 自动确认offset的时间间隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 定义consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Collections.singletonList("XXX_JSON_TOPIC")); logger.debug("初始化KafKa消费者成功"); try { Client client = asyncTask.getClient(); QName qName = asyncTask.getQName(); while (true) { // 读取数据,读取超时时间为100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ logger.debug(String.format("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value())); logger.debug("消费数据:"+record.value()); } /*
手动提交偏移量
try { consumer.commitSync(); } catch (Exception e) { logger.error("commit failed", e); }*/ } }catch (Exception e) { logger.error("consumer异常:"+e); }finally { consumer.close(); } } }
原文地址:https://www.cnblogs.com/lijianda/p/11857545.html