【Kafka】Producer API

Producer API


Kafka官网文档给了基本格式

地址:http://kafka.apachecn.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

基础模板
 Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer<String, String> producer = new KafkaProducer<>(props);
 for (int i = 0; i < 100; i++)
     producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

实际操作使用
package cn.itcast.kafka.demo1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class MyProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        //指定kafka服务器地址
        props.put("bootstrap.servers", "node01:9092");
        //消息确认机制
        props.put("acks", "all");
        //重试机制
        props.put("retries", 0);
        //批量发送大小
        props.put("batch.size", 16384);
        //消息延迟
        props.put("linger.ms", 1);
        //消息缓冲区大小
        props.put("buffer.memory", 33554432);
        //定义key和value的序列化
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        //for循环往消息队列发送数据
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord<String, String>("test", "这是第" + i + "条message"));
        }

        producer.close();
    }
}

运行代码就可以在Consumer控制台进行消费
在这里插入图片描述
但是我们可以发现数据是乱序的
这在【Kafka】Kafka简单介绍 讲内部架构的Partition中就有解释:当topic只有一个Partition的时候,Kafka可以保证Consumer消费数据时是有序的,但如果是多个Partition,Kafka则无法做到Consumer有序消费数据

想了解有关kafka数据分区策略的有关知识可以查看文章:【Kafka】数据分区策略

原文地址:https://www.cnblogs.com/zzzsw0412/p/12772445.html