I: Outline
Build a Kafka Java client
Understand the Kafka client types
Master the basic operations of each Kafka client
II: Client Types
1. The five client categories
2. The Kafka client API types
AdminClient: manage and inspect topics, brokers, and other Kafka objects
Producer: publish messages to topics
Consumer: subscribe to topics and process messages (a minimal consumer sketch follows this list)
Streams: efficiently transform input streams into output streams
Connectors: pull data from source systems or applications into Kafka
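The rest of this post only demonstrates AdminClient and Producer, so here is a minimal Consumer sketch for completeness. It is a sketch under assumptions: the broker address and topic name match the samples below, while the package com.jun.kafka.consumer and the group id jun-group are hypothetical.

    package com.jun.kafka.consumer;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    public class ConsumerSample {
        public static void main(String[] args) {
            Properties properties = new Properties();
            properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            // hypothetical consumer group id
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "jun-group");
            properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties)) {
                // subscribe and poll in a loop; offsets are auto-committed by default
                consumer.subscribe(Collections.singletonList("jun-topic"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                                record.partition(), record.offset(), record.key(), record.value());
                    }
                }
            }
        }
    }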
III: AdminClient API
1. Sample program
package com.jun.kafka.admin;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.jun.kafka.common.utils.JsonUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.config.ConfigResource;

import java.util.*;
import java.util.concurrent.ExecutionException;

/**
 * AdminClient demo
 */
@Slf4j
public class AdminSample {
    private static final String TOPIC_NAME = "jun-topic";

    public static void main(String[] args) {
        // alterConfig();
        describeTopic();
    }

    /**
     * Build the client
     */
    public static AdminClient adminClient() {
        Properties properties = new Properties();
        // Kafka broker address
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
        return AdminClient.create(properties);
    }

    /**
     * Create a topic
     */
    public static void createTopic() {
        AdminClient adminClient = adminClient();
        // replication factor
        short replicationFactor = 1;
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1, replicationFactor);
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        log.info("{}", JsonUtils.toJsonString(topics));
    }

    /**
     * List all topics
     */
    public static void topicLists() {
        AdminClient adminClient = adminClient();
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        try {
            Set<String> names = listTopicsResult.names().get();
            names.forEach(System.out::println);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * List all topics, including internal topics
     */
    public static void topicOptionLists() {
        AdminClient adminClient = adminClient();
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        try {
            Set<String> names = listTopicsResult.names().get();
            names.forEach(System.out::println);
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Delete a topic
     */
    public static void deleteTopic() {
        AdminClient adminClient = adminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
    }

    /**
     * Describe a topic
     */
    public static void describeTopic() {
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(NEW_TOPIC_NAME));
        try {
            Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();
            stringTopicDescriptionMap.entrySet().forEach(entry -> {
                log.info("key={}", entry.getKey());
                log.info("value={}", entry.getValue());
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Describe a topic's configuration
     */
    public static void describeConfig() {
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        try {
            Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
            configResourceConfigMap.entrySet().forEach(entry -> {
                log.info("key={}", entry.getKey());
                log.info("value={}", entry.getValue());
            });
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    /**
     * Alter a topic's configuration
     */
    public static void alterConfig() {
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        Map<ConfigResource, Config> configMap = new HashMap<>();
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, NEW_TOPIC_NAME);
        Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
        configMap.put(configResource, config);
        AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
    }

    /**
     * Increase the number of partitions
     */
    public static void incrPartitions() {
        String NEW_TOPIC_NAME = "caojun-topic";
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();
        NewPartitions newPartitions = NewPartitions.increaseTo(2);
        partitionsMap.put(NEW_TOPIC_NAME, newPartitions);
        CreatePartitionsResult partitions = adminClient.createPartitions(partitionsMap);
        try {
            partitions.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }
}
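Note that deleteTopic above fires the request but never inspects DeleteTopicsResult, so a failed deletion passes silently. A minimal sketch of blocking on the returned KafkaFuture, in the same error-handling style as the other methods (deleteTopicSync is a hypothetical addition to AdminSample, not part of the original class):

    /**
     * Delete a topic and wait for the broker to confirm
     */
    public static void deleteTopicSync() {
        AdminClient adminClient = adminClient();
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        try {
            // all() returns a KafkaFuture<Void>; get() blocks until the deletion succeeds or fails
            deleteTopicsResult.all().get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }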
IV: Producer API
1. Send modes
Synchronous send
Asynchronous send
Asynchronous send with callback
2. Asynchronous send
/**
 * Asynchronous send (fire and forget)
 */
public static void producerSend() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // the main Producer object
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // the message
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key", "value");
    producer.send(record);
    producer.close();
}
Result: (screenshot omitted)
3. Synchronous send
/**
 * Synchronous send: send asynchronously, then block on the Future
 */
public static void producerSyncSend() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // the main Producer object
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // the message
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-sync", "value-sync");
    Future<RecordMetadata> send = producer.send(record);
    try {
        // get() blocks until the broker acknowledges the record
        RecordMetadata recordMetadata = send.get();
        log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
    producer.close();
}
4. Asynchronous send with callback
/**
 * Asynchronous send with callback
 */
public static void producerSendWithCallback() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // the main Producer object
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // the message
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-callback", "value-callback");
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            // on failure recordMetadata is null and e carries the error
            if (e != null) {
                log.error("send failed", e);
                return;
            }
            log.info("callback fired");
            log.info("partition:{}, offset:{}", recordMetadata.partition(), recordMetadata.offset());
        }
    });
    producer.close();
}
Result: (screenshot omitted)
5. How the producer works
The KafkaProducer constructor:
KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer,
              ProducerMetadata metadata, KafkaClient kafkaClient, ProducerInterceptors interceptors, Time time) {
    ProducerConfig config = new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer));
    try {
        Map<String, Object> userProvidedConfigs = config.originals();
        this.producerConfig = config;
        this.time = time;
        String transactionalId = userProvidedConfigs.containsKey("transactional.id") ? (String)userProvidedConfigs.get("transactional.id") : null;
        this.clientId = buildClientId(config.getString("client.id"), transactionalId);
        LogContext logContext;
        if (transactionalId == null) {
            logContext = new LogContext(String.format("[Producer clientId=%s] ", this.clientId));
        } else {
            logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", this.clientId, transactionalId));
        }
        this.log = logContext.logger(KafkaProducer.class);
        this.log.trace("Starting the Kafka producer");
        Map<String, String> metricTags = Collections.singletonMap("client-id", this.clientId);
        // Kafka metrics
        MetricConfig metricConfig = (new MetricConfig()).samples(config.getInt("metrics.num.samples")).timeWindow(config.getLong("metrics.sample.window.ms"), TimeUnit.MILLISECONDS).recordLevel(RecordingLevel.forName(config.getString("metrics.recording.level"))).tags(metricTags);
        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class, Collections.singletonMap("client.id", this.clientId));
        reporters.add(new JmxReporter("kafka.producer"));
        this.metrics = new Metrics(metricConfig, reporters, time);
        // load the partitioner
        this.partitioner = (Partitioner)config.getConfiguredInstance("partitioner.class", Partitioner.class);
        long retryBackoffMs = config.getLong("retry.backoff.ms");
        // initialize the serializers
        if (keySerializer == null) {
            this.keySerializer = (Serializer)config.getConfiguredInstance("key.serializer", Serializer.class);
            this.keySerializer.configure(config.originals(), true);
        } else {
            config.ignore("key.serializer");
            this.keySerializer = keySerializer;
        }
        if (valueSerializer == null) {
            this.valueSerializer = (Serializer)config.getConfiguredInstance("value.serializer", Serializer.class);
            this.valueSerializer.configure(config.originals(), false);
        } else {
            config.ignore("value.serializer");
            this.valueSerializer = valueSerializer;
        }
        userProvidedConfigs.put("client.id", this.clientId);
        ProducerConfig configWithClientId = new ProducerConfig(userProvidedConfigs, false);
        List<ProducerInterceptor<K, V>> interceptorList = configWithClientId.getConfiguredInstances("interceptor.classes", ProducerInterceptor.class);
        if (interceptors != null) {
            this.interceptors = interceptors;
        } else {
            this.interceptors = new ProducerInterceptors(interceptorList);
        }
        ClusterResourceListeners clusterResourceListeners = this.configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
        this.maxRequestSize = config.getInt("max.request.size");
        this.totalMemorySize = config.getLong("buffer.memory");
        this.compressionType = CompressionType.forName(config.getString("compression.type"));
        this.maxBlockTimeMs = config.getLong("max.block.ms");
        this.transactionManager = configureTransactionState(config, logContext, this.log);
        int deliveryTimeoutMs = configureDeliveryTimeout(config, this.log);
        this.apiVersions = new ApiVersions();
        // initialize the record accumulator (the batching buffer)
        this.accumulator = new RecordAccumulator(logContext, config.getInt("batch.size"), this.compressionType, lingerMs(config), retryBackoffMs, deliveryTimeoutMs, this.metrics, "producer-metrics", time, this.apiVersions, this.transactionManager, new BufferPool(this.totalMemorySize, config.getInt("batch.size"), this.metrics, time, "producer-metrics"));
        List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList("bootstrap.servers"), config.getString("client.dns.lookup"));
        if (metadata != null) {
            this.metadata = metadata;
        } else {
            this.metadata = new ProducerMetadata(retryBackoffMs, config.getLong("metadata.max.age.ms"), logContext, clusterResourceListeners, Time.SYSTEM);
            this.metadata.bootstrap(addresses, time.milliseconds());
        }
        this.errors = this.metrics.sensor("errors");
        // the sender, run on a daemon thread
        this.sender = this.newSender(logContext, kafkaClient, this.metadata);
        String ioThreadName = "kafka-producer-network-thread | " + this.clientId;
        this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
        // start the daemon I/O thread
        this.ioThread.start();
        config.logUnused();
        AppInfoParser.registerAppInfo("kafka.producer", this.clientId, this.metrics, time.milliseconds());
        this.log.debug("Kafka producer started");
    } catch (Throwable var23) {
        this.close(Duration.ofMillis(0L), true);
        throw new KafkaException("Failed to construct kafka producer", var23);
    }
}
How send() hands records to the sender:
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        this.throwIfProducerClosed();
        KafkaProducer.ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), this.maxBlockTimeMs);
        } catch (KafkaException var20) {
            if (this.metadata.isClosed()) {
                throw new KafkaException("Producer closed while send in progress", var20);
            }
            throw var20;
        }
        long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException var19) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var19);
        }
        byte[] serializedValue;
        try {
            serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException var18) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var18);
        }
        // compute which partition this record goes to
        int partition = this.partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);
        this.setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();
        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
        this.ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? this.time.milliseconds() : record.timestamp();
        if (this.log.isTraceEnabled()) {
            this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
        }
        // wrap the user callback together with the interceptors
        Callback interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.failIfNotReadyForSend();
        }
        // append the record to the current batch in the accumulator
        RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true);
        // the append was aborted so a new batch can be created (possibly on another partition)
        if (result.abortForNewBatch) {
            int prevPartition = partition;
            this.partitioner.onNewBatch(record.topic(), cluster, partition);
            partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (this.log.isTraceEnabled()) {
                this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
            }
            interceptCallback = new KafkaProducer.InterceptorCallback(callback, this.interceptors, tp);
            result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false);
        }
        if (this.transactionManager != null && this.transactionManager.isTransactional()) {
            this.transactionManager.maybeAddPartitionToTransaction(tp);
        }
        if (result.batchIsFull || result.newBatchCreated) {
            this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            // the actual network send is done by the background sender (daemon) thread
            this.sender.wakeup();
        }
        return result.future;
    } catch (ApiException var21) {
        this.log.debug("Exception occurred during message send:", var21);
        if (callback != null) {
            callback.onCompletion((RecordMetadata)null, var21);
        }
        this.errors.record();
        this.interceptors.onSendError(record, tp, var21);
        return new KafkaProducer.FutureFailure(var21);
    } catch (InterruptedException var22) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var22);
        throw new InterruptException(var22);
    } catch (BufferExhaustedException var23) {
        this.errors.record();
        this.metrics.sensor("buffer-exhausted-records").record();
        this.interceptors.onSendError(record, tp, var23);
        throw var23;
    } catch (KafkaException var24) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, var24);
        throw var24;
    } catch (Exception var25) {
        this.interceptors.onSendError(record, tp, var25);
        throw var25;
    }
}
Diagram of the send path: (image omitted)
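As the code above shows, doSend() only wakes the sender when a batch is full or a new batch was created, so the throughput/latency trade-off is tuned through batch.size and linger.ms. A minimal sketch of those two knobs (the class name and the values are illustrative, not recommendations):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class BatchingConfigSample {
        public static Properties batchingProps() {
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            // grow each per-partition batch up to 32 KB ...
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "32768");
            // ... or wait at most 5 ms for more records before the sender ships the batch
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "5");
            return properties;
        }
    }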
6. Custom partitioner (load balancing across partitions)
package com.jun.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

/**
 * Custom partitioner: routes keys of the form "key-N" to partition N % 2
 */
public class SamplePartition implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        /*
          expected keys: key-1, key-2, key-3, ...
         */
        String keyStr = key + "";
        // strip the "key-" prefix, leaving the numeric suffix
        String keyInt = keyStr.substring(4);
        System.out.println("keyStr : " + keyStr + ", keyInt : " + keyInt);
        int i = Integer.parseInt(keyInt);
        return i % 2;
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {
    }
}
Usage:
/**
 * Send with the custom partitioner
 */
public static void producerSendWithPartition() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.19.129:9092");
    properties.put(ProducerConfig.ACKS_CONFIG, "all");
    properties.put(ProducerConfig.RETRIES_CONFIG, "0");
    properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
    properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
    properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    // the custom partitioner
    properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.jun.kafka.producer.SamplePartition");
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

    // the main Producer object
    KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

    // the message
    ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, "key-2", "value");
    producer.send(record);
    producer.close();
}
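To actually see the routing, it helps to send a batch of keys and print the partition each record lands on. A minimal sketch, assuming the topic has at least 2 partitions; the sendManyKeys helper is hypothetical and reuses the producer built with the configuration above:

    /**
     * Send key-0 .. key-9 and log which partition each record was routed to
     */
    public static void sendManyKeys(KafkaProducer<String, String> producer) {
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, key, "value-" + i);
            producer.send(record, (metadata, e) -> {
                if (e == null) {
                    // with SamplePartition, even suffixes go to partition 0 and odd ones to partition 1
                    log.info("{} -> partition:{}, offset:{}", key, metadata.partition(), metadata.offset());
                }
            });
        }
        producer.close();
    }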
7. Message delivery guarantees
Kafka provides three delivery guarantees:
At most once
At least once
Exactly once
The end-to-end guarantee depends on the producer and the consumer working together.
On the producer side, the guarantee is mainly controlled by this configuration:
properties.put(ProducerConfig.ACKS_CONFIG, "all");
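Roughly: acks=0 with no retries gives at most once; acks=all with retries gives at least once (duplicates are possible on retry); exactly once on the producer side additionally requires the idempotent producer. A minimal sketch of the relevant settings (class name and values are illustrative, and only the guarantee-related properties are shown):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.ProducerConfig;

    public class DeliveryGuaranteeSample {
        /** at-most-once flavour: don't wait for acks, never retry */
        public static Properties atMostOnce() {
            Properties p = new Properties();
            p.put(ProducerConfig.ACKS_CONFIG, "0");
            p.put(ProducerConfig.RETRIES_CONFIG, "0");
            return p;
        }

        /** at-least-once flavour: wait for all in-sync replicas, retry on failure */
        public static Properties atLeastOnce() {
            Properties p = new Properties();
            p.put(ProducerConfig.ACKS_CONFIG, "all");
            p.put(ProducerConfig.RETRIES_CONFIG, "3");
            return p;
        }

        /** exactly-once flavour (per partition): idempotent producer, which implies acks=all */
        public static Properties exactlyOnce() {
            Properties p = new Properties();
            p.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
            return p;
        }
    }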