1、导入kafka所需要的包
在服务器上安装kafka程序的时候,解压后就有kafka需要的jar包,如下图所示:
2、新建生产者类
1 package demo; 2 3 import java.util.Properties; 4 import java.util.concurrent.TimeUnit; 5 6 import kafka.javaapi.producer.Producer; 7 import kafka.producer.KeyedMessage; 8 import kafka.producer.ProducerConfig; 9 import kafka.serializer.StringEncoder; 10 11 public class ProducerDemo extends Thread { 12 13 //指定具体的topic 14 private String topic; 15 16 public ProducerDemo(String topic){ 17 this.topic = topic; 18 } 19 20 //每隔5秒发送一条消息 21 public void run(){ 22 //创建一个producer的对象 23 Producer producer = createProducer(); 24 //发送消息 25 int i = 1; 26 while(true){ 27 String data = "message " + i++; 28 //使用produer发送消息 29 producer.send(new KeyedMessage(this.topic, data)); 30 //打印 31 System.out.println("发送数据:" + data); 32 try { 33 TimeUnit.SECONDS.sleep(5); 34 } catch (Exception e) { 35 e.printStackTrace(); 36 } 37 } 38 } 39 40 //创建Producer的实例 41 private Producer createProducer() { 42 Properties prop = new Properties(); 43 //声明zk 44 prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181"); 45 prop.put("serializer.class",StringEncoder.class.getName()); 46 //声明Broker的地址 47 prop.put("metadata.broker.list","192.168.7.151:9092,192.168.7.151:9093"); 48 return new Producer(new ProducerConfig(prop)); 49 } 50 51 public static void main(String[] args) { 52 //启动线程发送消息 53 new ProducerDemo("mydemo1").start(); 54 } 55 }
3、新建消费者类
1 package demo; 2 3 import java.util.HashMap; 4 import java.util.List; 5 import java.util.Map; 6 import java.util.Properties; 7 8 9 import kafka.consumer.Consumer; 10 import kafka.consumer.ConsumerConfig; 11 import kafka.consumer.ConsumerIterator; 12 import kafka.consumer.KafkaStream; 13 import kafka.javaapi.consumer.ConsumerConnector; 14 15 public class ConsumerDemo extends Thread { 16 17 //指定具体的topic 18 private String topic; 19 20 public ConsumerDemo(String topic){ 21 this.topic = topic; 22 } 23 24 public void run(){ 25 //构造一个consumer的对象 26 ConsumerConnector consumer = createConsumer(); 27 //构造一个Map对象,代表topic 28 //String: topic的名称 Integer: 从这个topic中获取多少条记录 29 Map<String, Integer> topicCountMap = new HashMap<String, Integer>(); 30 //一次从这个topic中获取一条记录 31 topicCountMap.put(this.topic, 1); 32 //构造一个messageStream:输入流 33 //String: topic的名称 List: 获取的数据 34 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap); 35 //获取每次接受到的具体的数据 36 KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0); 37 ConsumerIterator<byte[], byte[]> iterator = stream.iterator(); 38 while(iterator.hasNext()){ 39 String message = new String(iterator.next().message()); 40 System.out.println("接受数据:" + message); 41 } 42 } 43 44 //创建具体的consumer 45 private ConsumerConnector createConsumer() { 46 Properties prop = new Properties(); 47 //指明zk的地址 48 prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181"); 49 //指明这个consumer的消费组 50 prop.put("group.id", "group1"); 51 //时间设置的过小可能会连接超时。。。 52 prop.put("zookeeper.connection.timeout.ms", "60000"); 53 return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop)); 54 } 55 56 public static void main(String[] args) { 57 new ConsumerDemo("mydemo1").start(); 58 } 59 60 }
运行程序如下:
注意:
1、在消费者的类中,时间要设置长一些,否则可能出现连接超时的错误(我就出现了。。。)
2、直接关闭生产者和消费者窗口,重新打开消费者窗口,会有重复数据。。。目前还没找到解决办法。。。