Single-node Kafka setup, plus an API test

Environment:
  kafka_2.12-2.0.0.tgz
  CentOS 6.9, NAT networking with a dynamic IP

Preparation:
  (1) Turn off the firewall:
          service iptables stop     (stops it for the current session)
          chkconfig iptables off    (keeps it off after reboots)

  (2) Edit the hosts file under C:\Windows\System32\drivers\etc on the Windows host
      and add a hostname mapping for the virtual machine.
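For example, with the virtual machine at 192.168.15.140 (the address used later in this post), the line added to the hosts file would be:

192.168.15.140 test

If the console commands below are run on the VM itself using the hostname test, the same entry can also go into /etc/hosts on the CentOS machine so the name resolves on both sides.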

Start the ZooKeeper service (using the ZooKeeper bundled with Kafka)

The zookeeper-server-start.sh script lives under /root/kafka_2.12-2.0.0/bin.

Command (run from the Kafka root directory): bin/zookeeper-server-start.sh config/zookeeper.properties

When the last line of output shows INFO binding to port 0.0.0.0/0.0.0.0:2181, ZooKeeper has started successfully.
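The command above keeps the terminal attached; if you prefer, the start scripts also accept a -daemon flag to run in the background, for example (same config path assumed):

bin/zookeeper-server-start.sh -daemon config/zookeeper.properties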

Start the Kafka service

From the Kafka directory, run:
bin/kafka-server-start.sh config/server.properties
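If the broker starts but a client on the Windows host cannot connect, the broker may be advertising an address the client cannot reach. In that case it usually helps to set the listener addresses in config/server.properties before starting the broker; a minimal sketch, assuming the test hostname mapping from the preparation step:

listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://test:9092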

Create a topic

 bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic

My hosts mapping is 192.168.15.140 test, so the command also works with localhost replaced by test.
(If an error along the lines of "replication factor ... larger than available brokers" appears, make sure the firewall is turned off.)
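To confirm the topic exists, the same script can list and describe topics:

bin/kafka-topics.sh --list --zookeeper localhost:2181
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic testTopic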

Produce messages with the console producer

bin/kafka-console-producer.sh --broker-list test:9092 --topic testTopic

Consume messages with the console consumer

bin/kafka-console-consumer.sh --bootstrap-server test:9092 --topic testTopic --from-beginning
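Messages typed into the producer terminal should show up in the consumer terminal. If you want the console consumer to join a specific consumer group (for example the group id 1111 used by the Java consumer below), it also accepts a --group option; a sketch:

bin/kafka-console-consumer.sh --bootstrap-server test:9092 --topic testTopic --from-beginning --group 1111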

Code test:

    IntelliJ IDEA is used for the client code.

Producer

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerDemo {

    // Topic
    private static final String topic = "testTopic";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("acks", "0");
        props.put("retries", "0");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // Producer instance
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        int i = 1;

        // Send business messages; in practice these might come from a file,
        // an in-memory database, or a socket.
        while (true) {
            Thread.sleep(1000);
            producer.send(new ProducerRecord<String, String>(topic, "key:" + i, "value:" + i));
            System.out.println("key:" + i + " " + "value:" + i);
            i++;
        }
    }
}

Consumer

package com.xuliugen.kafka.demo;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerDemo {
    private static final Logger logger = LoggerFactory.getLogger(ConsumerDemo.class);
    private static final String topic = "testTopic";

    public static void main(String[] args) {

        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.15.140:9092");
        props.put("group.id", "1111");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        consumer.subscribe(Arrays.asList(topic));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            }
        }
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.xuliugen.kafka</groupId>
    <artifactId>kafka.demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>

    </dependencies>

</project>
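The slf4j-log4j12 binding above expects a Log4j configuration on the classpath; without one you only get a "no appenders" warning and no client logs. A minimal sketch of src/main/resources/log4j.properties (my own addition, not from the original post):

log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{HH:mm:ss} %-5p %c{1} - %m%n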
Adapted from https://blog.csdn.net/xlgen157387/article/details/77312569

Code download: https://pan.baidu.com/s/1hjJ7IRMTQEFdV-8SCf7VlA (extraction code: 286w)

Original post: https://www.cnblogs.com/tangsonghuai/p/10552359.html