Kafka client operations: AdminClient and Producer


一. Outline

  Build a Kafka Java client

  Understand the Kafka client types

  Master the basic Kafka client operations

二: Client types

1. The five client types

  (diagram omitted)

2. Kafka client API types

  AdminClient: manage and inspect topics, brokers, and other Kafka objects

  Producer: publish messages to topics

  Consumer: subscribe to topics and process the messages

  Streams: efficiently transform input streams into output streams

  Connectors: pull data from source systems or applications into Kafka
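
  For orientation, these are the entry points for each client type in the official Apache Kafka Java libraries (a sketch: AdminClient, Producer, and Consumer live in the kafka-clients artifact, while the Streams and Connect classes ship separately in kafka-streams and connect-api):

import org.apache.kafka.clients.admin.AdminClient;       // admin operations
import org.apache.kafka.clients.producer.KafkaProducer;  // publish messages
import org.apache.kafka.clients.consumer.KafkaConsumer;  // subscribe and process
import org.apache.kafka.streams.KafkaStreams;            // stream processing
import org.apache.kafka.connect.connector.Connector;     // base class for source/sink connectors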

三: AdminClient API

1. Sample program

package com.jun.kafka.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.jun.kafka.common.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * AdminClient demo
 */
@Slf4j
public class AdminSample {
    private static final String TOPIC_NAME = "jun-topic";

    public static void main(String[] args) {
//        alterConfig();
        describeTopic();
    }

    /**
     * Build the client
     */
    public static AdminClient adminClient(){
        Properties properties = new Properties();
        // Kafka broker address
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
        return AdminClient.create(properties);
    }

    /**
     * Create a topic
     */
    public static void createTopic(){
        AdminClient adminClient = adminClient();
        // replication factor
        short replicationFactor = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, replicationFactor);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        log.info("{}", JsonUtils.toJsonString(topics));
    }

    /**
     * List all topics
     */
    public static void topicLists(){
        AdminClient adminClient = adminClient();
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        try {
            Set<String> names = listTopicsResult.names().get();
            names.stream().forEach(System.out::println);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }


    /**
     * List all topics, including internal topics
     */
    public static void topicOptionLists(){
        AdminClient adminClient = adminClient();
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        try {
            Set<String> names = listTopicsResult.names().get();
            names.stream().forEach(System.out::println);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delete a topic
     */
    public static void deleteTopic(){
        AdminClient adminClient = adminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        try {
            // Block until the deletion completes so any error surfaces here
            deleteTopicsResult.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Describe a topic
     */
    public static void describeTopic(){
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(NEW_TOPIC_NAME));
        try {
            Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
            Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
            entries.stream().forEach(entry->{
                log.info("key={}", entry.getKey());
                log.info("value={}", entry.getValue());
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Describe a topic's configuration
     */
    public static void describeConfig(){
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        try {
            Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
            configResourceConfigMap.entrySet().stream().forEach(configResourceConfigEntry -> {
                log.info("key={}", configResourceConfigEntry.getKey());
                log.info("value={}", configResourceConfigEntry.getValue());
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Alter a topic's configuration
     */
    public static void alterConfig(){
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        Map<ConfigResource, Config> configMap = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
        Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
        configMap.put(configResource, config);
        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
        try {
            // Block until the change is applied
            alterConfigsResult.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Increase the number of partitions (partitions can only be increased, never decreased)
     */
    public static void incrPartitions(){
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(2);
        partitionsMap.put(NEW_TOPIC_NAME, newPartitions);
        CreatePartitionsResult partitions = adminClient.createPartitions(partitionsMap);
        try {
            Void aVoid = partitions.all().get();

        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }



}
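
  Note: in newer client versions (Kafka 2.3+) alterConfigs is deprecated in favor of incrementalAlterConfigs, which modifies only the entries you pass instead of replacing the whole topic config. A minimal sketch, reusing the adminClient() helper above:

        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "caojun-topic");
        AlterConfigOp op = new AlterConfigOp(new ConfigEntry("preallocate", "true"), AlterConfigOp.OpType.SET);
        adminClient().incrementalAlterConfigs(
                Collections.singletonMap(resource, Collections.singletonList(op))
        ).all().get();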


四: Producer API

 1. Send modes

  Synchronous send

  Asynchronous send (fire-and-forget)

  Asynchronous send with a callback
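
  The three modes differ only in how the result of send() is handled. A compact sketch (setup and checked-exception handling omitted; full runnable examples follow in sections 2-4):

        producer.send(record);                              // async, fire-and-forget
        RecordMetadata meta = producer.send(record).get();  // sync: block on the returned Future
        producer.send(record, (metadata, e) ->              // async with a callback
                log.info("partition:{}, offset:{}", metadata.partition(), metadata.offset()));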

2. Asynchronous send

    /**
     * Asynchronous send (fire-and-forget)
     */
    public static void producerSend(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // The main Producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // The message record

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");

        producer.send(record);
        // close() flushes any buffered records before shutting down
        producer.close();
    }

  Result: (screenshot omitted)

3. Synchronous send

    /**
     * Synchronous send: an asynchronous send followed by blocking on the Future
     */
    public static void producerSyncSend(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // The main Producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // The message record

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-sync", "value-sync");

        Future<RecordMetadata> send = producer.send(record);
        try {
            // Block until the broker acknowledges the record
            RecordMetadata recordMetadata = send.get();
            log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        producer.close();
    }

  

4. Asynchronous send with callback

    /**
     * Asynchronous send with callback
     */
    public static void producerSendWithCallback(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // The main Producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // The message record

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-callback", "value-callback");

        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                // Invoked on the producer's I/O thread once the send completes
                log.info("callback fired");
                log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
            }
        });
        producer.close();
    }

  Result: (screenshot omitted)

5. How it works

  The KafkaProducer constructor:

KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer, ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
        ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));

        try {
            Map<String, Object> userProvidedConfigs = config.originals();
            this.producerConfig = config;
            this.time = time;
            String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
            this.clientId = buildClientId(config.getString("client.id"), transactionalId);
            LogContext logContext;
            if (transactionalId == null) {
                logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
            } else {
                logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
            }

            this.log = logContext.logger(KafkaProducer.class);
            this.log.trace("Starting the Kafka producer");
            Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
       // Kafka metrics and monitoring setup
            MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
            List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
            reporters.add(new JmxReporter("kafka.producer"));
            this.metrics = new Metrics(metricConfig, reporters, time);
       // Load the partitioner
            this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
            long retryBackoffMs = config.getLong("retry.backoff.ms");
       // Initialize the serializers
            if (keySerializer == null) {
                this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
                this.keySerializer.configure(config.originals(), true);
            } else {
                config.ignore("key.serializer");
                this.keySerializer = keySerializer;
            }

            if (valueSerializer == null) {
                this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
                this.valueSerializer.configure(config.originals(), false);
            } else {
                config.ignore("value.serializer");
                this.valueSerializer = valueSerializer;
            }

            userProvidedConfigs.put("client.id", this.clientId);
            ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
            List<ProducerInterceptor<K, V>> interceptorList = configWithClientId.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
            if (interceptors != null) {
                this.interceptors = interceptors;
            } else {
                this.interceptors = new ProducerInterceptors(interceptorList);
            }

            ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
            this.maxRequestSize = config.getInt("max.request.size");
            this.totalMemorySize = config.getLong("buffer.memory");
            this.compressionType = CompressionType.forName(config.getString("compression.type"));
            this.maxBlockTimeMs = config.getLong("max.block.ms");
            this.transactionManager = configureTransactionState(config, logContext, this.log);
            int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
            this.apiVersions = new ApiVersions();
       // Initialize the record accumulator, which batches records per partition
            this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
            List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
            if (metadata != null) {
                this.metadata = metadata;
            } else {
                this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
                this.metadata.bootstrap(addresses, time.milliseconds());
            }

            this.errors = this.metrics.sensor("errors");
       // The sender runnable, executed by a daemon thread
            this.sender = this.newSender(logContext, kafkaClient, this.metadata);
            String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
            this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
       // Start the daemon I/O thread
            this.ioThread.start();
            config.logUnused();
            AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
            this.log.debug("Kafka producer started");
        } catch (Throwable var23) {
            this.close(Duration.ofMillis(0L), true);
            throw new KafkaException("Failed to construct kafka producer", var23);
        }
    }

  

  The send path:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }

  

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        try {
            this.throwIfProducerClosed();

            KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
            } catch (KafkaException var20) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var20);
                }

                throw var20;
            }

            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var19) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var19);
            }

            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var18) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var18);
            }
       // Compute which partition the record goes to
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            }
       // Wrap the user callback together with the interceptors
            Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.failIfNotReadyForSend();
            }
       // Append the record to a batch in the accumulator
            RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true);
            // If the append was aborted to start a new batch, recompute the partition and retry
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }

                interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false);
            }

            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                this.transactionManager.maybeAddPartitionToTransaction(tp);
            }

            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                // The actual network send is performed by the daemon sender thread
                this.sender.wakeup();
            }

            return result.future;
        } catch (ApiException var21) {
            this.log.debug("Exception occurred during message send:", var21);
            if (callback != null) {
                callback.onCompletion((RecordMetadata)null, var21);
            }

            this.errors.record();
            this.interceptors.onSendError(record, tp, var21);
            return new KafkaProducer.FutureFailure(var21);
        } catch (InterruptedException var22) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var22);
            throw new InterruptException(var22);
        } catch (BufferExhaustedException var23) {
            this.errors.record();
            this.metrics.sensor("buffer-exhausted-records").record();
            this.interceptors.onSendError(record, tp, var23);
            throw var23;
        } catch (KafkaException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw var24;
        } catch (Exception var25) {
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        }
    }

  Flow diagram (original image omitted): send() → interceptors → serializers → partitioner → RecordAccumulator (per-partition batches) → sender daemon thread → broker.
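
  The accumulator batches records per partition, and the sender wakes up when a batch fills or a new batch is created. Two producer settings govern this latency/throughput trade-off (a sketch with illustrative values):

        // Wait up to 5 ms for more records before sending a batch (default is 0)
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "5");
        // ...or send as soon as a batch reaches 16 KB, whichever comes first
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");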

6. Custom partitioner

package com.jun.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Custom partitioner: routes a record to partition 0 or 1 based on the
 * numeric suffix of its key.
 */
public class SamplePartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*
            Keys are expected to look like:
            key-1
            key-2
            key-3
         */
        String keyStr = key + "";
        // Strip the "key-" prefix; non-conforming keys will throw NumberFormatException
        String keyInt = keyStr.substring(4);
        System.out.println("keyStr: " + keyStr + ", keyInt: " + keyInt);

        int i = Integer.parseInt(keyInt);

        return i % 2;
    }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }

}

  Usage:

    /**
     * Send using the custom partitioner
     */
    public static void producerSendWithPartition(){
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
        properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
        // Register the custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.jun.kafka.producer.SamplePartition");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // The main Producer object
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        // The message record: "key-2" → 2 % 2 = partition 0

        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-2", "value");

        producer.send(record);
        producer.close();
    }


7. Message delivery guarantees

  Kafka offers three delivery guarantees:

    At most once

    At least once

    Exactly once

  The end-to-end guarantee depends on the Producer and the Consumer working together (the consumer's offset-commit strategy matters too).

  On the producer side, the guarantee hinges mainly on configuration, in particular the acknowledgement level:

properties.put(ProducerConfig.ACKS_CONFIG, "all");
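
  A rough mapping from guarantee level to producer settings (a sketch; the exact behavior also depends on retries and on how the consumer commits offsets):

        // At most once: don't wait for acks and don't retry; records can be lost but never duplicated
        properties.put(ProducerConfig.ACKS_CONFIG, "0");
        properties.put(ProducerConfig.RETRIES_CONFIG, "0");

        // At least once: wait for all in-sync replicas and retry on failure; records can be duplicated
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, "3");

        // Exactly once (producer side): idempotent writes de-duplicate broker-side retries
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");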


Original post: https://www.cnblogs.com/juncaoit/p/13407207.html