所用环境: kafka_2.12-2.0.0.gz centos 6.9 nat动态ip
准备工作: (1).将防火墙关闭 service iptables stop 临时关闭 chkconfig iptables off 永久关闭 (2).修改C:WindowsSystem32driversetc 下的hosts文件 增加映射
启动zookeeper服务(采用kafka内置的zk)
/root/kafka_2.12-2.0.0/bin 在这个目录下启动 zookeeper-server-start.sh 命令 :bin/zookeeper-server-start.sh config/zookeeper.properties
当最后一行显示 INFO binding to port 0.0.0.0/0.0.0.0:2181 证明成功
启动kafka服务
进入到kafka目录下
bin/kafka-server-start.sh config/server.properties
创建一个topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic 我这里是 192.168.15.140 test localhost改为test也可以运行成功
(当出现副本什么的larger than n,就要关闭防火墙)
kafka生产者生产消息
bin/kafka-console-producer.sh --broker-list test:9092 --topic testTopic
消费者消费消息
bin/kafka-console-consumer.sh --bootstrap-server test:9092 --topic testTopic --from-beginning
代码测试:
这里用了idea
produce
1 package com.xuliugen.kafka.demo; 2 3 import org.apache.kafka.clients.producer.KafkaProducer; 4 import org.apache.kafka.clients.producer.ProducerRecord; 5 6 import java.util.Properties; 7 8 public class ProducerDemo { 9 10 // Topic 11 private static final String topic = "testTopic"; 12 13 public static void main(String[] args) throws Exception { 14 15 Properties props = new Properties(); 16 props.put("bootstrap.servers", "192.168.15.140:9092"); 17 props.put("acks", "0"); 18 props.put("group.id", "1111"); 19 props.put("retries", "0"); 20 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 21 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 22 23 //生产者实例 24 KafkaProducer producer = new KafkaProducer(props); 25 26 int i = 1; 27 28 // 发送业务消息 29 // 读取文件 读取内存数据库 读socket端口 30 while (true) { 31 Thread.sleep(1000); 32 producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i)); 33 System.out.println("key:" + i + " " + "value:" + i); 34 i++; 35 } 36 } 37 }
comsumer
package com.xuliugen.kafka.demo; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.Arrays; import java.util.Properties; public class ConsumerDemo { private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class); private static final String topic = "testTopic"; public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "192.168.15.140:9092"); props.put("group.id", "1111"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(1000); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } }
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.xuliugen.kafka</groupId> <artifactId>kafka.demo</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>1.7.12</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> <version>1.7.12</version> </dependency> </dependencies> </project>
1 抄袭自 2 https://blog.csdn.net/xlgen157387/article/details/77312569
代码地址 链接: https://pan.baidu.com/s/1hjJ7IRMTQEFdV-8SCf7VlA 提取码: 286w 复制这段内容后打开百度网盘手机App,操作更方便哦