
Atitit Kafka Usage Summary

 

Contents

Kafka 2.0, 50M

Startup: ZooKeeper must be started first, which is much more trouble than ActiveMQ

Kafka producer

Kafka consumer

 

 

 

  1. Kafka 2.0, 50M

 

  2. Startup: ZooKeeper must be started first, which is much more trouble than ActiveMQ

 

 

Start ZooKeeper first. The scripts live in D:\kafka_2.11-2.1.0\bin\windows, so either run them from that directory or use the full path:

zookeeper-server-start.bat D:\kafka_2.11-2.1.0\config\zookeeper.properties

Then start the Kafka broker in a second console window:

D:\kafka_2.11-2.1.0\bin\windows\kafka-server-start.bat D:\kafka_2.11-2.1.0\config\server.properties
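Before running the clients, it can help to confirm that both processes are actually listening. The following is a minimal sketch using only the JDK. It assumes ZooKeeper is on port 2181 (the clientPort in the stock zookeeper.properties) and the broker is on 127.0.0.1:9092, the address used by the client code in this post. The class name PortCheck is hypothetical.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Connection refused or timed out: nothing is accepting on that port
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports: 2181 for ZooKeeper, 9092 for the Kafka broker
        System.out.println("ZooKeeper (2181): " + isListening("127.0.0.1", 2181, 1000));
        System.out.println("Kafka broker (9092): " + isListening("127.0.0.1", 9092, 1000));
    }
}
```

An open port only shows that something accepted the connection, not that the broker has finished starting, so treat a true result as a quick sanity check.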

 

 

  3. Kafka producer

 

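import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * Kafka producer: sends one string message to hello_topic and waits for the acknowledgement.
 */
public class KafkaProducerDemo {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // broker address
        props.put("acks", "all");                          // wait for all in-sync replicas
        props.put("retries", 0);                           // do not retry failed sends
        props.put("batch.size", 16384);                    // batch size in bytes
        props.put("linger.ms", 1);                         // wait up to 1 ms to fill a batch
        props.put("buffer.memory", 33554432);              // 32 MB send buffer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        String topic = "hello_topic";
        String message = "val332222";

        // try-with-resources closes the producer when the block exits
        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, message));
            System.out.println("Sent: " + message);
            // get() blocks until the broker acknowledges the record
            System.out.println(future.get());
        }
    }
}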
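The producer settings above can also be kept in .properties syntax instead of being hard-coded. The following is a minimal sketch using only the JDK. The class name ProducerConfigLoader is hypothetical, and the keys and values simply mirror the ones in the producer example. The text is inlined so the block runs without a file or a broker.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

public class ProducerConfigLoader {

    // Same settings as the producer example, written in .properties syntax
    public static final String CONFIG = String.join("\n",
            "bootstrap.servers=127.0.0.1:9092",
            "acks=all",
            "retries=0",
            "batch.size=16384",
            "linger.ms=1",
            "buffer.memory=33554432",
            "key.serializer=org.apache.kafka.common.serialization.StringSerializer",
            "value.serializer=org.apache.kafka.common.serialization.StringSerializer");

    /** Parses .properties text into a Properties object. */
    public static Properties load(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    public static void main(String[] args) throws IOException {
        Properties props = load(CONFIG);
        props.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

All values loaded this way are strings. As far as I know, the Kafka client parses numeric settings from strings, so the resulting Properties could be passed to the KafkaProducer constructor, but verify that against the client version you use.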

 

 

  4. Kafka consumer

  

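import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

/**
 * Kafka consumer: subscribes to hello_topic and prints every record it receives.
 */
public class KafkaConsumerCls {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092"); // broker address
        props.put("group.id", "test");                     // consumer group
        props.put("key.deserializer", StringDeserializer.class);
        props.put("value.deserializer", StringDeserializer.class);

        // try-with-resources closes the consumer if the loop exits with an exception
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Arrays.asList("hello_topic"));

            // Poll forever; each call waits up to 100 ms for new records
            while (true) {
                // poll(long) is deprecated in the Kafka 2.0 clients; use the Duration overload
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n",
                            record.offset(), record.key(), record.value());
                }
            }
        }
    }
}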

 

kafka_2.12-1.1.0 producer and consumer Java implementation example - cctext - cnblogs

 

 

 

Original URL: https://www.cnblogs.com/attilax/p/15197428.html