kafka_2.10-0.8.1.1
maven
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.1.1</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.15</version>
<exclusions>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>mail</artifactId>
<groupId>javax.mail</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
producer
1 package org.admln.kafka.test; 2 3 import java.util.Properties; 4 5 import kafka.javaapi.producer.Producer; 6 import kafka.producer.KeyedMessage; 7 import kafka.producer.ProducerConfig; 8 9 public class Producertest { 10 11 public static void main(String[] args) { 12 Properties props = new Properties(); 13 //props.put("zk.connect", "192.168.1.110:2181"); 14 // serializer.class为消息的序列化类 15 props.put("serializer.class", "kafka.serializer.StringEncoder"); 16 // 配置metadata.broker.list, 为了高可用, 最好配两个broker实例 17 props.put("metadata.broker.list", "192.168.1.113:9092"); 18 // 设置Partition类, 对队列进行合理的划分 19 //props.put("partitioner.class", "idoall.testkafka.Partitionertest"); 20 // ACK机制, 消息发送需要kafka服务端确认 21 props.put("request.required.acks", "1"); 22 23 props.put("num.partitions", "2"); 24 ProducerConfig config = new ProducerConfig(props); 25 Producer<String, String> producer = new Producer<String, String>(config); 26 for (int i = 0; i < 10; i++) 27 { 28 String msg = "hello" + i; 29 producer.send(new KeyedMessage<String, String>("test",msg)); 30 System.out.println("i:"+i+" msg:"+msg); 31 } 32 } 33 }
consumer
运行consumer一直接收不到消息,还没找到原因