kafka生产者

KafkaProducer实例是一个kafka客户端,是线程安全的。

简单用例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.100:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer(props);
        for (int i = 0; i < 100; i++) {
            producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

KafkaProducer实例包含一个缓冲区(socket buffer)和一个后台线程(sender thread,是个Sender实例)。缓冲区中存放还没有传输到kafka服务器的记录,后台线程负责把记录传输到kafka服务器。KafkaProducer实例在使用完之后要close,否则就会有资源泄露。send()方法是异步的,把记录放到缓冲区中后就立即返回。这样的操作使得kafkaProducer可以批量发送记录到服务器,这样更有效率。send()方法有两个重载:

Future<RecordMetadata> send(ProducerRecord record):记录在消费者这边用ProducerRecord实例表示。ProducerRecord有6个成员变量:String topic、Integer partition、Headers headers、K key、V value、Long timestamp。也就是说在发送记录的时候,可以指定要发送到的主题、要发送到的分区、记录的key、记录的value、记录的时间戳,topic和value必须指定,其他是可选的。

Future<RecordMetadata> send(ProducerRecord record, Callback callback):Callback是个回调函数,当记录发送到服务器(producer客户端收到kafka服务器的确认通知)后就会触发此函数。

如果想同步发送消息,只需调用Future<RecordMetadata>实例的get()方法即可。

消息会路由到哪个分区?如果指定分区,则消息会发送到这个指定的分区。如果不指定分区,则由partitioner.class值对应的类的partition()方法决定。partitioner.class值默认是DefaultPartitioner,如果指定了key,则分区和key的hash值有关;如果不指定key,则消息会轮询发送到所有的可用分区。当然,也可以自定义分区的路由策略,具体做法是自定义一个Partitioner接口实现类,并把partitioner.class值配置成这个类的全类名字符串。

可以调用producer的List<PartitionInfo> partitionsFor(String topic)方法获得某主题的分区信息,可以获取一共有多少个分区,每个分区的副本数,每个分区的leader,每个分区的ISR,每个分区的离线副本,这些都是PartitionInfo的成员变量。

可以在ProducerConfig类中找到生产者客户端的所有配置及其解释,常用的有:

1、bootstrap.servers

指定kafka集群的ip和端口,如值是192.168.56.100:9092,192.168.56.101:9092,192.168.56.102:9092,多个ip+端口用逗号分隔。

2、acks、retries、max.in.flight.requests.per.connection

指定producer客户端认为请求完成前需要收到的确认通知个数,值是字符串,可选有"all"、"-1"、"0"、"1"。"all"表示需要收到N个确认通知后才认为请求完成,N是ISR(in-sync replicas)的个数,这种情况下需要等的确认通知数最多,发送消息速度最慢,但也最有保证。如果请求失败了,就会重试,重试次数由retries指定,retries默认值是Integer.MAX_VALUE。"-1"效果和"all"一样。"0"表示producer不会等待任何确认通知就认为请求已经完成了,直接会继续下次请求,这种情况下发送消息速度最快,但没有任何保证。kafka服务器还没说话呢,客户端就说你别说了,我就当发送成功了,哈哈,好叼,其实成不成功没一定。此外,这种情况下,不会重试,哪怕retries值不为0。"1"表示收到1个确认通知后就认为请求已经成功。这1个确认通知是leader的确认通知,这种情况下只能保证leader收到了消息,不能保证其follower也收到了消息。这种情况下,leader宕机可能会导致消息丢失,因为新消息可能还没有拉取到follower,当某个follower变成leader后,新消息就丢了。

在有重试的情况下,可能会影响分区消息的顺序性。如果要保证顺序性,必须设置max.in.flight.requests.per.connection值为1,默认是5。解释是:The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).

3、batch.size、linger.ms

producer会为每一个主题的每一个分区都维护一个未发送记录的缓冲区。缓冲区的大小由batch.size指定,单位是byte,默认值是16384,即16K。缓冲区越大,sender线程单次向kafka服务器发送的消息就越多,这样对kafka服务器的请求数就会越少,对kafka服务器的压力就会小,当然缓冲区越大,占用的客户端内存也更多。默认情况下,只要缓冲区中有未发送的记录,就会立即发送。sender线程死循环向kafka服务器发送消息。这是因为默认情况下,linger.ms值为0。我们可以把linger.ms设置成一个大于0的值,如N,这样sender线程每次发送完消息后,就会等候N毫秒,以期望缓冲区有足够多的记录。N毫秒后,或者缓冲区大小达到batch.size后,只要满足任意一个条件,sender线程就会下一次发送消息。当然,在重负载情况下,即使linger.ms值为0,sender线程单次发送的消息也是很多的。

4、buffer.memory、max.block.ms

指定producer客户端缓冲区的总大小,单位是byte,默认值是33554432,即32M。如果消息发送到缓冲区的速度比从缓冲区发送到kafka服务器的速度快(可以通过设置batch.size和linger.ms都足够大来模拟这种情况),那么缓冲区就会逐渐被用完。当缓冲区用完后,send()方法就会阻塞,当阻塞超过max.block.ms后,就会抛出TimeoutException,max.block.ms默认值是60000,即1分钟。

5、key.serializer、value.serializer

指定序列化类,org.apache.kafka.common.serialization.Serializer接口实现类,如org.apache.kafka.common.serialization.StringSerializer、org.apache.kafka.common.serialization.BytesSerializer。

6、compression.type

消息压缩类型,默认不压缩,可选值有"gzip"、"snappy"、"lz4"、"zstd"。压缩的好处就是减少传输的数据量,减轻对网络传输的压力。

7、enable.idempotence、transactional.id

从kafka0.11版本开始,kafkaProducer支持两种额外的模式:幂等生产者和事务生产者。

幂等生产者增强了kafka的传输语义,由至少一次增强为刚好一次(at least once to exactly once),这种情况下重试不会产生重复的消息。为了使用幂等生产者,只需把enable.idempotence设为true即可,默认是false。此时还需要3个关联配置,不然会报org.apache.kafka.common.config.ConfigException,即acks必须为"all"、retries必须大于0、max.in.flight.requests.per.connection必须小于等于5。幂等生产者和普通生产者的编程API是一样的。

事务生产者允许原子性地向多个分区或者主题发送消息。为了使用事务生产者,需要设置transactional.id。注意,设置了transactional.id后,就自动启用了幂等。事务生产者操作的主题的replication.factor最少为3,min.insync.replicas最少为2,在topic创建时指定。此外,该主题的消费者必须配置为只读取已提交的消息(read only committed messages)???

事务可以在单个生产者实例的多个会话

The purpose of the transactional.id is to enable transaction recovery across multiple sessions of a single producer instance. It would typically be derived from the shard identifier in a partitioned, stateful, application.
As such, it should be unique to each producer instance running within a partitioned application.
All the new transactional APIs are blocking and will throw exceptions on failure. 

事务生产者用例:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

import java.util.Properties;

public class TransactionKafkaProducerTest {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.56.100:9092,192.168.56.101:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");
        props.put("retries", 3);
        props.put("linger.ms", 1000);
        props.put("max.block.ms", 5000);
        props.put("compression.type", "zstd");
        props.put("transactional.id", "my-transactional-id");
        Producer<String, String> producer = new KafkaProducer(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            for (int i = 0; i < 100; i++) {
                producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            }
            producer.commitTransaction();
        } catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
            // We can't recover from these exceptions, so our only option is to close the producer and exit.
            producer.close();
        } catch (KafkaException e) {
            // For all other exceptions, just abort the transaction and try again.
            producer.abortTransaction();
        }
//        producer.close();
    }
}

先调用生产者的initTransaction()方法,之后再beginTransaction()开启事务,send(),再调用commitTransaction()提交事务,假如抛KafkaException的话,就调用abortTransaction()中止事务。注意,initTransactions()方法仅可调用一次,且必须在调用其他事务方法之前。beginTransaction()、commitTransaction()是方法对,可以多次调用。

RecordAccumulator

在new KafkaProducer()方法中,会创建一个RecordAccumulator实例并赋值给producer的accumulator成员变量。在producer的send()方法中,会调用accumulator的append()方法,把记录放到对应主题对应分区(TopicPartition)对应的缓冲区队列中(Deque<ProducerBatch>,实现类是java.util.ArrayDeque)。队列里面元素是ProducerBatch实例,producerBatch最大大小由batch.size指定。

Sender、KafkaThread

在new KafkaProducer()方法中,会创建一个后台Sender线程,并封装成kafkaThread后启动。线程名以KafkaProducer.NETWORK_THREAD_PREFIX(kafka-producer-network-thread)开头。Sender线程的run()方法中有个死循环:

while (running) {
    try {
        run(time.milliseconds());
    } catch (Exception e) {
        log.error("Uncaught error in kafka producer I/O thread: ", e);
    }
}

用于把缓冲区中的记录发送至kafka服务器。

原文地址:https://www.cnblogs.com/koushr/p/10947209.html