SimpleKafkaProducer with SSL

import static org.apache.kafka.clients.CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BATCH_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.BUFFER_MEMORY_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.LINGER_MS_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.RETRIES_CONFIG;
import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;

import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Minimal Kafka producer example that publishes one fixed test record to the
 * {@code test-input} topic, optionally over SSL.
 *
 * <p>All settings come from the hard-coded values in
 * {@link KafkaProducerConfiguration}. The instance owns a Kafka {@link Producer};
 * call {@link #close()} (or use try-with-resources) when done with it.
 */
public class SimpleKafkaProducer implements AutoCloseable {

  /** Property key under which the schema registry URL is passed to the producer. */
  private static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

  /** JVM-wide system property naming the trust store file. */
  private static final String TRUST_STORE_PROPERTY = "javax.net.ssl.trustStore";

  /** Topic every record is sent to. */
  private static final String KAFKA_ACK_TOPIC = "test-input";

  private final Producer<String, Object> producer;

  /** Creates the underlying Kafka producer from {@link KafkaProducerConfiguration}. */
  public SimpleKafkaProducer() {
    producer = new KafkaProducer<>(buildProperties());
  }

  /**
   * Sends the fixed test record and blocks until the future returned by the
   * producer completes.
   *
   * @return the metadata reported for the sent record
   * @throws IOException never thrown by the current implementation; kept in the
   *     signature so existing callers continue to compile
   * @throws ExecutionException if the send completed exceptionally
   * @throws InterruptedException if the calling thread is interrupted while waiting
   */
  public RecordMetadata send() throws IOException, ExecutionException, InterruptedException {
    RecordMetadata recordMetadata = producer.send(createProducerRecord()).get();
    System.out.println("to " + recordMetadata + " at " + recordMetadata.timestamp());

    return recordMetadata;
  }

  /** Closes the underlying Kafka producer so it is not leaked. */
  @Override
  public void close() {
    producer.close();
  }

  /** Assembles the producer properties, enabling SSL when configured. */
  private static Properties buildProperties() {
    Properties properties = new Properties();
    properties.put(BOOTSTRAP_SERVERS_CONFIG, KafkaProducerConfiguration.BOOTSTRAP_SERVER);
    properties.put(RETRIES_CONFIG, KafkaProducerConfiguration.RETRIES);
    properties.put(BATCH_SIZE_CONFIG, KafkaProducerConfiguration.BATCH_SIZE);
    properties.put(LINGER_MS_CONFIG, KafkaProducerConfiguration.LINGER_MS);
    properties.put(BUFFER_MEMORY_CONFIG, KafkaProducerConfiguration.BUFFER_MEMORY);
    properties.put(KEY_SERIALIZER_CLASS_CONFIG, KafkaProducerConfiguration.KEY_SERIALIZER);
    properties.put(VALUE_SERIALIZER_CLASS_CONFIG, KafkaProducerConfiguration.VALUE_SERIALIZER);
    properties.put(SCHEMA_REGISTRY_URL, KafkaProducerConfiguration.SCHEMA_REGISTRY_URL_VALUE);
    properties.put(
        MAX_REQUEST_SIZE_CONFIG, KafkaProducerConfiguration.PRODUCER_MAX_REQUEST_SIZE);
    if (KafkaProducerConfiguration.USE_SSL) {
      // Honour a trust store already supplied on the command line, e.g.
      // -Djavax.net.ssl.trustStore=C:/cacerts/DEV_cacerts.jks, and only fall
      // back to the built-in default path when none was given.
      if (System.getProperty(TRUST_STORE_PROPERTY) == null) {
        System.setProperty(TRUST_STORE_PROPERTY, KafkaProducerConfiguration.TRUST_STORE_PATH);
      }
      properties.put(SECURITY_PROTOCOL_CONFIG, "SSL");
    }
    return properties;
  }

  /** Builds the fixed key/value test record and echoes both to standard output. */
  private ProducerRecord<String, Object> createProducerRecord() {
    String key = "1584406824432";
    String value = "test";
    System.out.println("send key: " + key);
    System.out.println("send value: " + value);
    return new ProducerRecord<>(KAFKA_ACK_TOPIC, key, value);
  }

  /** Hard-coded producer settings used by this example. */
  private static final class KafkaProducerConfiguration {

    static final String BOOTSTRAP_SERVER = "localhost:9092";

    static final String SCHEMA_REGISTRY_URL_VALUE = "";

    static final int RETRIES = 0;

    static final int BATCH_SIZE = 16384;

    static final int LINGER_MS = 1;

    static final int BUFFER_MEMORY = 33554432;

    static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

    static final String VALUE_SERIALIZER =
        "org.apache.kafka.common.serialization.StringSerializer";

    static final int PRODUCER_MAX_REQUEST_SIZE = 20971520;

    static final boolean USE_SSL = true;

    /** Default trust store used when SSL is enabled and none was set via -D. */
    static final String TRUST_STORE_PATH = "C:/cacerts/DEV_cacerts.jks";

    private KafkaProducerConfiguration() {
      // Constants holder; not instantiable.
    }
  }

  /**
   * Sends one test record and then closes the producer.
   *
   * @param args ignored
   */
  public static void main(String[] args)
      throws IOException, ExecutionException, InterruptedException {
    // try-with-resources guarantees the producer is closed even if send() fails.
    try (SimpleKafkaProducer producer = new SimpleKafkaProducer()) {
      producer.send();
    }
  }
}
原文地址:https://www.cnblogs.com/tonggc1668/p/12511491.html