kafka_2.11-0.10.0.1生产者producer的Java实现

转载自:http://blog.csdn.net/qq_26479655/article/details/52555283

首先导入包

  1. 将kafka目录下的libs中的jar包导入
  2. 用maven建立

  1. <dependency>
  2.     <groupId>org.apache.kafka</groupId>
  3.     <artifactId>kafka-clients</artifactId>
  4.     <version>0.10.0.1</version>
  5. </dependency>

写好properties配置文件 
一下为项目结构 
这里写图片描述


  1. #kafka集群地址
  2. bootstrap.servers = 192.168.168.200:9092
  3. client.id = testProducer
  4. key.serializer = org.apache.kafka.common.serialization.IntegerSerializer
  5. value.serializer = org.apache.kafka.common.serialization.StringSerializer

然后上代码


  1. package kafka.producer;
  2.  
  3. import java.io.IOException;
  4. import java.util.Properties;
  5.  
  6. import java.util.concurrent.ExecutionException;
  7.  
  8. import org.apache.kafka.clients.producer.Callback;
  9. import org.apache.kafka.clients.producer.KafkaProducer;
  10. import org.apache.kafka.clients.producer.ProducerRecord;
  11. import org.apache.kafka.clients.producer.RecordMetadata;
  12.  
  13. public class ProducerTest extends Thread{
  14.      private final KafkaProducer<Integer, String> producer;
  15.         private final String topic;
  16.         private final Boolean isAsync;
  17.  
  18. /*isAsync同步、异步*/
  19.         public ProducerTest(String topic, Boolean isAsync) {
  20.             Properties properties = new Properties();
  21.             /*加载配置文件*/
  22.             try {
  23. properties.load(ProducerTest.class.getClassLoader().getResourceAsStream("conf/kafka.producer.properties"));
  24.             } catch (IOException e) {
  25.  
  26.                 e.printStackTrace();
  27.             }
  28.             producer = new KafkaProducer<>(properties);
  29.             this.topic = topic;
  30.             this.isAsync = isAsync;
  31.         }
  32.  
  33.         public void run() {
  34.             int messageNo = 1;
  35.             while (true) {
  36.                 String messageStr = "Message_" + messageNo;
  37.                 long startTime = System.currentTimeMillis();
  38.                 if (isAsync) { // Send asynchronously
  39.                     producer.send(new ProducerRecord<>(topic,
  40.                         messageNo,
  41.                         messageStr), new DemoCallBack(startTime, messageNo, messageStr));
  42.                 } else { // Send synchronously
  43.                     try {
  44.                         producer.send(new ProducerRecord<>(topic,
  45.                             messageNo,
  46.                             messageStr)).get();
  47.                         System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
  48.                     } catch (InterruptedException | ExecutionException e) {
  49.                         e.printStackTrace();
  50.                     }
  51.                 }
  52.                 ++messageNo;
  53.             }
  54.         }
  55.     }
  56.  
  57.     class DemoCallBack implements Callback {
  58.  
  59.         private final long startTime;
  60.         private final int key;
  61.         private final String message;
  62.  
  63.         public DemoCallBack(long startTime, int key, String message) {
  64.             this.startTime = startTime;
  65.             this.key = key;
  66.             this.message = message;
  67.         }
  68.  
  69.         /**
  70.          * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
  71.          *                  occurred.
  72.          * @param exception The exception thrown during processing of this record. Null if no error occurred.
  73.          */
  74.         public void onCompletion(RecordMetadata metadata, Exception exception) {
  75.             long elapsedTime = System.currentTimeMillis() - startTime;
  76.             if (metadata != null) {
  77.                 System.out.println(
  78.                     "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
  79.                         "), " +
  80.                         "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
  81.             } else {
  82.                 exception.printStackTrace();
  83.             }
  84.         }
  85.  
  86.  
  87. }

测试代码


  1. package kafka.producer;
  2.  
  3. public class Main {
  4.  
  5.     public static void main(String[] args) {
  6.         ProducerTest test = new ProducerTest("TestTopic", true);
  7.         test.start();
  8.     }
  9. }

 


运行结果


  1. message(51215, Message_51215) sent to partition(0), offset(161830) in 3497 ms
  2. message(51216, Message_51216) sent to partition(0), offset(161831) in 3497 ms
  3. message(51217, Message_51217) sent to partition(0), offset(161832) in 3497 ms
  4. message(51218, Message_51218) sent to partition(0), offset(161833) in 3497 ms
  5. message(51219, Message_51219) sent to partition(0), offset(161834) in 3497 ms
  6. message(51220, Message_51220) sent to partition(0), offset(161835) in 3497 ms
  7. message(51221, Message_51221) sent to partition(0), offset(161836) in 3497 ms
  8. message(51222, Message_51222) sent to partition(0), offset(161837) in 3497 ms
  9. message(51223, Message_51223) sent to partition(0), offset(161838) in 3497 ms
  10. message(51224, Message_51224) sent to partition(0), offset(161839) in 3497 ms
  11. message(51225, Message_51225) sent to partition(0), offset(161840) in 3497 ms
  12. message(51226, Message_51226) sent to partition(0), offset(161841) in 3497 ms
  13. message(51227, Message_51227) sent to partition(0), offset(161842) in 3497 ms
  14. message(51228, Message_51228) sent to partition(0), offset(161843) in 3497 ms
  15.  
  16. .............​
原文地址:https://www.cnblogs.com/yangcx666/p/8723852.html