(08) Example: connecting to Kafka from a Java program

  1. Import the jars Kafka needs

  When Kafka is installed on the server, the extracted distribution already contains the jars the client code needs (typically under the libs directory of the installation).
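  If the project is built with Maven instead, the old client classes used below (kafka.javaapi.*, kafka.consumer.*) live in the core kafka_2.xx artifact (for example org.apache.kafka:kafka_2.11, matching the broker's Scala and Kafka version) rather than in kafka-clients; either way, copying the jars from the server's libs directory works as well.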

  2. Create the producer class

package demo;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

public class ProducerDemo extends Thread {

    // The topic to send to
    private String topic;

    public ProducerDemo(String topic){
        this.topic = topic;
    }

    // Send one message every 5 seconds
    public void run(){
        // Create a producer instance
        Producer<String, String> producer = createProducer();
        // Send messages in a loop
        int i = 1;
        while(true){
            String data = "message " + i++;
            // Send the message with the producer
            producer.send(new KeyedMessage<String, String>(this.topic, data));
            // Print what was sent
            System.out.println("Sent: " + data);
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // Create the Producer instance
    private Producer<String, String> createProducer() {
        Properties prop = new Properties();
        // ZooKeeper ensemble address
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        prop.put("serializer.class", StringEncoder.class.getName());
        // Broker addresses
        prop.put("metadata.broker.list", "192.168.7.151:9092,192.168.7.151:9093");
        return new Producer<String, String>(new ProducerConfig(prop));
    }

    public static void main(String[] args) {
        // Start the thread that sends messages
        new ProducerDemo("mydemo1").start();
    }
}
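  The class above uses the old Scala producer API, which was removed in later Kafka releases. For reference, a roughly equivalent producer written against the newer kafka-clients API might look like the sketch below; the class name NewProducerDemo is made up for illustration, and the broker addresses and topic are taken from the code above:

package demo;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class NewProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        Properties prop = new Properties();
        // The new producer talks to the brokers directly; no ZooKeeper address is needed
        prop.put("bootstrap.servers", "192.168.7.151:9092,192.168.7.151:9093");
        prop.put("key.serializer", StringSerializer.class.getName());
        prop.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(prop)) {
            // Send a handful of messages, one every 5 seconds, instead of looping forever
            for (int i = 1; i <= 10; i++) {
                String data = "message " + i;
                producer.send(new ProducerRecord<>("mydemo1", data));
                System.out.println("Sent: " + data);
                Thread.sleep(5000);
            }
        }
    }
}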

  3. Create the consumer class

package demo;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class ConsumerDemo extends Thread {

    // The topic to consume from
    private String topic;

    public ConsumerDemo(String topic){
        this.topic = topic;
    }

    public void run(){
        // Create a consumer connector
        ConsumerConnector consumer = createConsumer();
        // Build a map describing what to consume
        // key: topic name, value: number of streams (threads) to open for that topic
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // Open a single stream for this topic
        topicCountMap.put(this.topic, 1);
        // Create the message streams
        // key: topic name, value: the list of streams for that topic
        Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
        // Take the single stream for this topic and iterate over its messages
        KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
        while(iterator.hasNext()){
            String message = new String(iterator.next().message());
            System.out.println("Received: " + message);
        }
    }

    // Create the consumer connector
    private ConsumerConnector createConsumer() {
        Properties prop = new Properties();
        // ZooKeeper ensemble address
        prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
        // The consumer group this consumer belongs to
        prop.put("group.id", "group1");
        // Too small a value here may cause connection timeouts
        prop.put("zookeeper.connection.timeout.ms", "60000");
        return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    }

    public static void main(String[] args) {
        new ConsumerDemo("mydemo1").start();
    }
}
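  Likewise, a rough equivalent written against the newer kafka-clients consumer API (version 2.0 or later for the Duration-based poll) could look like this sketch; the class name NewConsumerDemo is made up, while the broker list, group id and topic are taken from the code above:

package demo;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NewConsumerDemo {
    public static void main(String[] args) {
        Properties prop = new Properties();
        // The new consumer also talks to the brokers directly instead of ZooKeeper
        prop.put("bootstrap.servers", "192.168.7.151:9092,192.168.7.151:9093");
        prop.put("group.id", "group1");
        prop.put("key.deserializer", StringDeserializer.class.getName());
        prop.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(prop)) {
            consumer.subscribe(Collections.singletonList("mydemo1"));
            // Poll in a loop and print every record received
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received: " + record.value());
                }
            }
        }
    }
}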

  Running the two programs, the producer window prints each message as it is sent and the consumer window prints each message as it is received.

   Notes:

  1. In the consumer class, set the ZooKeeper connection timeout to a fairly large value, otherwise a connection timeout error may occur (it did for me).

  2. If you simply close the producer and consumer windows and then reopen the consumer window, duplicate messages show up; I have not found a fix for this yet.
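  The duplicates in point 2 are most likely an offset-commit effect: the old high-level consumer only commits its offsets to ZooKeeper periodically (auto.commit.interval.ms, which defaults to roughly a minute in these versions), so killing the consumer window before a commit causes the uncommitted messages to be re-delivered on the next start. A possible mitigation, sketched here without having verified it against this setup, is to commit more aggressively in createConsumer() and to shut the consumer down cleanly:

// In createConsumer(): commit offsets to ZooKeeper more frequently (values are illustrative)
prop.put("auto.commit.enable", "true");
prop.put("auto.commit.interval.ms", "1000");
// ...and/or call consumer.commitOffsets() after processing each message,
// then stop with consumer.shutdown() instead of just closing the window.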

Original article: https://www.cnblogs.com/javasl/p/12273968.html