Kafka学习笔记-Java简单操作

转自:http://www.cnblogs.com/edison2012/p/5759223.html

Maven依赖包:

  1. <dependency>  
  2.         <groupId>org.apache.kafka</groupId>  
  3.         <artifactId>kafka-clients</artifactId>  
  4.         <version>0.8.2.1</version>  
  5. </dependency>  
  6.           
  7. <dependency>  
  8.     <groupId>org.apache.kafka</groupId>  
  9.     <artifactId>kafka_2.11</artifactId>  
  10.     <version>0.8.2.1</version>  
  11. </dependency>  


生产者代码如下:

  1. import java.util.Properties;  
  2.   
  3. import org.apache.kafka.clients.producer.Callback;  
  4. import org.apache.kafka.clients.producer.KafkaProducer;  
  5. import org.apache.kafka.clients.producer.ProducerRecord;  
  6. import org.apache.kafka.clients.producer.RecordMetadata;  
  7. import org.slf4j.Logger;  
  8. import org.slf4j.LoggerFactory;  
  9.   
  10. public class KafkaProducerTest {  
  11.       
  12.     private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerTest.class);  
  13.       
  14.     private static Properties properties = null;  
  15.       
  16.     static {  
  17.         properties = new Properties();  
  18.         properties.put("bootstrap.servers", "centos.master:9092,centos.slave1:9092,centos.slave2:9092");  
  19.         properties.put("producer.type", "sync");  
  20.         properties.put("request.required.acks", "1");  
  21.         properties.put("serializer.class", "kafka.serializer.DefaultEncoder");  
  22.         properties.put("partitioner.class", "kafka.producer.DefaultPartitioner");  
  23.         properties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
  24. //      properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  25.         properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");  
  26. //      properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
  27.     }  
  28.       
  29.     public void produce() {  
  30.         KafkaProducer<byte[], byte[]> kafkaProducer = new KafkaProducer<byte[],byte[]>(properties);  
  31.         ProducerRecord<byte[],byte[]> kafkaRecord = new ProducerRecord<byte[],byte[]>(  
  32.                 "test", "kkk".getBytes(), "vvv".getBytes());  
  33.         kafkaProducer.send(kafkaRecord, new Callback() {  
  34.             public void onCompletion(RecordMetadata metadata, Exception e) {  
  35.                 if(null != e) {  
  36.                     LOG.info("the offset of the send record is {}", metadata.offset());  
  37.                     LOG.error(e.getMessage(), e);  
  38.                 }  
  39.                 LOG.info("complete!");  
  40.             }  
  41.         });  
  42.         kafkaProducer.close();  
  43.     }  
  44.   
  45.     public static void main(String[] args) {  
  46.         KafkaProducerTest kafkaProducerTest = new KafkaProducerTest();  
  47.         for (int i = 0; i < 10; i++) {  
  48.             kafkaProducerTest.produce();  
  49.         }  
  50.     }  
  51. }  

消费者代码如下(基于新的KafkaConsumer API):

  1. import java.util.List;
  2. import java.util.Map;  
  3. import java.util.Properties;  
  4.   
  5. import org.apache.kafka.clients.consumer.ConsumerConfig;  
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;  
  7. import org.apache.kafka.clients.consumer.ConsumerRecords;  
  8. import org.apache.kafka.clients.consumer.KafkaConsumer;  
  9. import org.slf4j.Logger;  
  10. import org.slf4j.LoggerFactory;  
  11.   
  12. public class KafkaConsumerTest {  
  13.       
  14.     private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerTest.class);  
  15.       
  16.     public static void main(String[] args) {  
  17.         Properties properties = new Properties();  
  18.         properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,   
  19.                 "centos.master:9092,centos.slave1:9092,centos.slave2:9092");  
  20.         properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");              
  21.         properties.put(ConsumerConfig.SESSION_TIMEOUT_MS, "1000");              
  22.         properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");  
  23.         properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "range");  
  24. //      properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY, "roundrobin");  
  25.         properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");    
  26.         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,   
  27.                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");  
  28.         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,   
  29.                 "org.apache.kafka.common.serialization.ByteArrayDeserializer");  
  30.           
  31.         KafkaConsumer<byte[], byte[]> kafkaConsumer = new KafkaConsumer<byte[], byte[]>(properties);  
  32.         kafkaConsumer.subscribe("test");  
  33. //      kafkaConsumer.subscribe("*");  
  34.         boolean isRunning = true;              
  35.         while(isRunning) {  
  36.             Map<String, ConsumerRecords<byte[], byte[]>> results = kafkaConsumer.poll(100);  
  37.             if (null != results) {  
  38.                 for (Map.Entry<String, ConsumerRecords<byte[], byte[]>> entry : results.entrySet()) {  
  39.                     LOG.info("topic {}", entry.getKey());  
  40.                     ConsumerRecords<byte[], byte[]> consumerRecords = entry.getValue();  
  41.                     List<ConsumerRecord<byte[], byte[]>> records = consumerRecords.records();  
  42.                     for (int i = 0, len = records.size(); i < len; i++) {  
  43.                         ConsumerRecord<byte[], byte[]> consumerRecord = records.get(i);  
  44.                         LOG.info("topic {} partition {}", consumerRecord.topic(), consumerRecord.partition());  
  45.                         try {  
  46.                             LOG.info("offset {} value {}", consumerRecord.offset(), new String(consumerRecord.value()));  
  47.                         } catch (Exception e) {  
  48.                             LOG.error(e.getMessage(), e);  
  49.                         }  
  50.                     }  
  51.                 }  
  52.             }  
  53.         }  
  54.           
  55.         kafkaConsumer.close();    
  56.           
  57.     }  
  58.   
  59. }  

运行后发现,kafka-clients 0.8.2.1 中 KafkaConsumer 的 poll 方法尚未实现,始终返回 null,源码如下:

  1. @Override  
  2. public Map<String, ConsumerRecords<K,V>> poll(long timeout) {  
  3.      // TODO Auto-generated method stub: not implemented, the timeout is ignored and null is always returned  
  4.      return null;  
  5. }  


因此后来改用 kafka.javaapi.consumer.SimpleConsumer 来实现消费者,可以正常运行,代码如下:

    1. import java.nio.ByteBuffer;  
    2. import java.util.ArrayList;  
    3. import java.util.Collections;  
    4. import java.util.HashMap;  
    5. import java.util.List;  
    6. import java.util.Map;  
    7.   
    8. import kafka.api.FetchRequest;  
    9. import kafka.api.FetchRequestBuilder;  
    10. import kafka.api.PartitionOffsetRequestInfo;  
    11. import kafka.cluster.Broker;  
    12. import kafka.common.ErrorMapping;  
    13. import kafka.common.TopicAndPartition;  
    14. import kafka.javaapi.FetchResponse;  
    15. import kafka.javaapi.OffsetRequest;  
    16. import kafka.javaapi.OffsetResponse;  
    17. import kafka.javaapi.PartitionMetadata;  
    18. import kafka.javaapi.TopicMetadata;  
    19. import kafka.javaapi.TopicMetadataRequest;  
    20. import kafka.javaapi.TopicMetadataResponse;  
    21. import kafka.javaapi.consumer.SimpleConsumer;  
    22. import kafka.message.MessageAndOffset;  
    23.   
    24. public class KafkaSimpleConsumerTest {  
    25.       
    26.     private List<String> borkerList = new ArrayList<String>();    
    27.         
    28.     public KafkaSimpleConsumerTest() {    
    29.         borkerList = new ArrayList<String>();    
    30.     }    
    31.     
    32.     public static void main(String args[]) {    
    33.         KafkaSimpleConsumerTest kafkaSimpleConsumer = new KafkaSimpleConsumerTest();    
    34.         // 最大读取消息数量    
    35.         long maxReadNum = Long.parseLong("3");    
    36.         // 订阅的topic    
    37.         String topic = "test";    
    38.         // 查找的分区    
    39.         int partition = Integer.parseInt("0");    
    40.         // broker节点  
    41.         List<String> seeds = new ArrayList<String>();    
    42.         seeds.add("centos.master");    
    43.         seeds.add("centos.slave1");    
    44.         seeds.add("centos.slave2");    
    45.         // 端口    
    46.         int port = Integer.parseInt("9092");    
    47.         try {    
    48.             kafkaSimpleConsumer.run(maxReadNum, topic, partition, seeds, port);    
    49.         } catch (Exception e) {    
    50.             System.out.println("Oops:" + e);    
    51.             e.printStackTrace();    
    52.         }    
    53.     }    
    54.     
    55.     public void run(long maxReadNum, String topic, int partition, List<String> seedBrokers, int port) throws Exception {    
    56.         // 获取指定topic partition的元数据    
    57.         PartitionMetadata metadata = findLeader(seedBrokers, port, topic, partition);    
    58.         if (metadata == null) {    
    59.             System.out.println("can't find metadata for topic and partition. exit");    
    60.             return;    
    61.         }    
    62.         if (metadata.leader() == null) {    
    63.             System.out.println("can't find leader for topic and partition. exit");    
    64.             return;    
    65.         }    
    66.         String leadBroker = metadata.leader().host();    
    67.         String clientName = "client_" + topic + "_" + partition;    
    68.     
    69.         SimpleConsumer consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);    
    70.         long readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.EarliestTime(), clientName);    
    71.         int numErrors = 0;    
    72.         while (maxReadNum > 0) {    
    73.             if (consumer == null) {    
    74.                 consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);    
    75.             }    
    76.             FetchRequest req = new FetchRequestBuilder().clientId(clientName).addFetch(topic, partition, readOffset, 100000).build();    
    77.             FetchResponse fetchResponse = consumer.fetch(req);    
    78.     
    79.             if (fetchResponse.hasError()) {    
    80.                 numErrors++;    
    81.                 short code = fetchResponse.errorCode(topic, partition);    
    82.                 System.out.println("error fetching data from the broker:" + leadBroker + " reason: " + code);    
    83.                 if (numErrors > 5)    
    84.                     break;    
    85.                 if (code == ErrorMapping.OffsetOutOfRangeCode()) {    
    86.                     readOffset = getLastOffset(consumer, topic, partition, kafka.api.OffsetRequest.LatestTime(), clientName);    
    87.                     continue;    
    88.                 }    
    89.                 consumer.close();    
    90.                 consumer = null;    
    91.                 leadBroker = findNewLeader(leadBroker, topic, partition, port);    
    92.                 continue;    
    93.             }    
    94.             numErrors = 0;    
    95.     
    96.             long numRead = 0;    
    97.             for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partition)) {    
    98.                 long currentOffset = messageAndOffset.offset();    
    99.                 if (currentOffset < readOffset) {    
    100.                     System.out.println("found an old offset: " + currentOffset + " expecting: " + readOffset);    
    101.                     continue;    
    102.                 }    
    103.     
    104.                 readOffset = messageAndOffset.nextOffset();    
    105.                 ByteBuffer payload = messageAndOffset.message().payload();    
    106.     
    107.                 byte[] bytes = new byte[payload.limit()];    
    108.                 payload.get(bytes);    
    109.                 System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));    
    110.                 numRead++;    
    111.                 maxReadNum--;    
    112.             }    
    113.     
    114.             if (numRead == 0) {    
    115.                 try {    
    116.                     Thread.sleep(1000);    
    117.                 } catch (InterruptedException ie) {    
    118.                 }    
    119.             }    
    120.         }    
    121.         if (consumer != null)    
    122.             consumer.close();    
    123.     }    
    124.      
    125.     /** 
    126.      * 从活跃的Broker列表中找出指定Topic、Partition中的Leader Broker 
    127.      * @param seedBrokers 
    128.      * @param port 
    129.      * @param topic 
    130.      * @param partition 
    131.      * @return 
    132.      */  
    133.     private PartitionMetadata findLeader(List<String> seedBrokers, int port, String topic, int partition) {    
    134.         PartitionMetadata partitionMetadata = null;    
    135.         loop: for (String seedBroker : seedBrokers) {    
    136.             SimpleConsumer consumer = null;    
    137.             try {    
    138.                 consumer = new SimpleConsumer(seedBroker, port, 100000, 64 * 1024, "leaderLookup");    
    139.                 List<String> topics = Collections.singletonList(topic);    
    140.                 TopicMetadataRequest topicMetadataRequest = new TopicMetadataRequest(topics);    
    141.                 TopicMetadataResponse topicMetadataResponse = consumer.send(topicMetadataRequest);    
    142.     
    143.                 List<TopicMetadata> topicMetadatas = topicMetadataResponse.topicsMetadata();    
    144.                 for (TopicMetadata topicMetadata : topicMetadatas) {    
    145.                     for (PartitionMetadata pMetadata : topicMetadata.partitionsMetadata()) {    
    146.                         if (pMetadata.partitionId() == partition) {    
    147.                             partitionMetadata = pMetadata;    
    148.                             break loop;    
    149.                         }    
    150.                     }    
    151.                 }    
    152.             } catch (Exception e) {    
    153.                 System.out.println("error communicating with broker [" + seedBroker + "] to find leader for [" + topic + ", " + partition + "] reason: " + e);    
    154.             } finally {    
    155.                 if (consumer != null)    
    156.                     consumer.close();    
    157.             }    
    158.         }    
    159.         if (partitionMetadata != null) {    
    160.             borkerList.clear();    
    161.             for (Broker replica : partitionMetadata.replicas()) {    
    162.                 borkerList.add(replica.host());    
    163.             }    
    164.         }    
    165.         return partitionMetadata;    
    166.     }    
    167.     
    168.     public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName) {    
    169.         TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);    
    170.         Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();    
    171.         requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));    
    172.         OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);    
    173.         OffsetResponse response = consumer.getOffsetsBefore(request);    
    174.         if (response.hasError()) {    
    175.             System.out.println("error fetching data offset data the broker. reason: " + response.errorCode(topic, partition));    
    176.             return 0;    
    177.         }    
    178.         long[] offsets = response.offsets(topic, partition);    
    179.         return offsets[0];    
    180.     }    
    181.     
    182.     private String findNewLeader(String oldLeader, String topic, int partition, int port) throws Exception {    
    183.         for (int i = 0; i < 3; i++) {    
    184.             boolean goToSleep = false;    
    185.             PartitionMetadata metadata = findLeader(borkerList, port, topic, partition);    
    186.             if (metadata == null) {    
    187.                 goToSleep = true;    
    188.             } else if (metadata.leader() == null) {    
    189.                 goToSleep = true;    
    190.             } else if (oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {    
    191.                 goToSleep = true;    
    192.             } else {    
    193.                 return metadata.leader().host();    
    194.             }    
    195.             if (goToSleep) {    
    196.                 try {    
    197.                     Thread.sleep(1000);    
    198.                 } catch (InterruptedException ie) {    
    199.                 }    
    200.             }    
    201.         }    
    202.         System.out.println("unable to find new leader after broker failure. exit");    
    203.         throw new Exception("unable to find new leader after broker failure. exit");    
    204.     }    
    205.     
    206. }   
原文地址:https://www.cnblogs.com/wq3435/p/7081897.html