kafka入门

基本概念:

1、什么是kafka?
Kafka是一个高吞吐量、分布式的发布订阅消息系统。据kafka官方网站介绍,当前的kafka已经定位为一个分布式流式处理平台( a distributed streaming platform),它最初由LinkedIn公司开发,后来成为Apache项目的一部分。
kafka核心模块使用scala语言开发,支持多语言(如java、c/c++、python、go、erlang、node.js等)客户端,它以可以水平扩展和具有高吞吐量等特性而被广泛使用。

2、流式处理平台的3大关键特性。
a、能够允许发布和订阅流数据;
b、存储流数据时提供相应的容错机制;
c、当流数据到达时能够被及时处理。

3.体系结构

4、kafka的名词
a、producer:生产者
b、consumer:消费者
c、topic:消息以topic为类别记录,Kafka将消息种子(Feed)分门别类,每一类的消息称之为一个主题(Topic)
d、broker:以集群的方式运行,可以由一个或多个服务组成,每个服务叫做一个broker;消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

e、分区和副本:kafka将一组消息归纳为一个主题,而每个主题又被分为一个或多个分区(partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。
每个分区在物理上对应为一个文件夹,分区命名规则为 主题名+“-”+分区编号,分区编号从0开始,到最大值减一。每个分区又有一到多个副本(replica),分区的副本分布在集群的不同代理上,一提高可用性。

每个消息(也叫作record记录,也被称为消息)是由一个key,一个value和时间戳构成。

安装部署:

1、JDK
a、最新的kafka要jdk1.7以上,官方推荐jdk1.8
b、环境变量,JAVA_HOME = D:ProgramFilesJavajdk1.8.0_144
c、jdk安装路径不能有空格,否则kafka启动报错

2、zookeeper
a、下载安装包: http://zookeeper.apache.org/releases.html#download
下载后不需要安装,直接解压就好,我的jdk安装在D盘,我把zookeeper也解压在那里。
b、环境变量,ZOOKEEPER_HOME = D:ProgramFileszookeeper-3.3.6
记得添加到path系统变量下,%ZOOKEEPER_HOME%in;
c、修改配置文件
(1)进入目录 D:ProgramFileszookeeper-3.3.6conf;
(2)将“zoo_sample.cfg”重命名为“zoo.cfg”;
(3)文本编辑器打开zoo.cfg,找到并编辑 dataDir=D:\ProgramFiles\zookeeper-3.3.6\tmp\zookeeper_logs
d、在zoo.cfg文件中修改默认的Zookeeper端口(默认端口2181)

e、启动zookeeper,两种方式
(1)打开新的cmd,输入zkServer,运行Zookeeper;
(2)进入目录D:ProgramFileszookeeper-3.3.6in,执行zkServer.cmd脚本。

3、kafka
a、下载安装包:
http://kafka.apache.org/downloads.html
下载后不需要安装,直接解压就好,我的jdk安装在D盘,我把kafka也解压在那里。

b、修改配置文件
(1)进入Kafka配置目录: D:ProgramFileskafka_2.12-1.1.0config
(2)文本编辑器打开 server.properties 文件,找到并编辑日志路径
log.dirs=D:\ProgramFiles\kafka_2.12-1.1.0\tmp\kafka-logs
(3)找到并编辑zookeeper.connect=localhost:2181。表示本地运行
(4)Kafka会按照默认,在9092端口上运行,并连接zookeeper的默认端口:2181。

c、运行
重要:请确保在启动Kafka服务器前,Zookeeper实例已经准备好并开始运行。
(1)进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
(2)按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
(3)输入
.inwindowskafka-server-start.bat .configserver.properties
然后回车

上面的Zookeeper和kafka一直打开。
1、创建主题
a、进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
b、按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
c、输入
.inwindowskafka-topics.bat --create --zookeeper localhost:2181 –repartition-factor 1 --partitions 1 --topic gl_test_topic
d、已创建的topic,不可以重复创建。

上面所有的窗口要一直打开。
2、创建生产者
a、进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
b、按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
c、输入
.inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic gl_test_topic_00

3、创建消费者
a、进入kafka安装目录, D:ProgramFileskafka_2.12-1.1.0
b、按下Shift键,同时单击鼠标右键,选择“打开命令窗口”选项,打开命令行;
c、输入
.inwindowskafka-console-consumer.bat --zookeeper localhost:2181 --topic gl_test_topic_00

一、安装kafka
1.安装zookeeper
    相关配置
    log_Dir
2.安装kafka
    相关配置
    log_Dir
3.启动zookeeper服务
    ./bin/zookeeper-server-start.sh config/zookeeper.properties
4.开启kafka服务
    .inwindowskafka-server-start.bat .configserver.properties 
    ./bin/kafka-server-start.sh config/server.properties
5.创建topic
    .inwindowskafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
  查看topic:
    ./bin/kafka-topics.sh --list --zookeeper localhost:2181
创建生产者
    .inwindowskafka-console-producer.bat --broker-list localhost:9092 --topic test_flink     
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_flink 
创建消费者
    .inwindowskafka-console-consumer.bat --zookeeper localhost:2181 --topic test_flink     
    ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_flink --from-beginning

二、搭建一个多个broker的集群
    首先为每个节点编写配置文件
    1.    config/server-1.properties:
        broker.id=1
        port=9093
        log.dir=/tmp/kafka-logs-1
    2.    config/server-2.properties:
        broker.id=2
        port=9094
        log.dir=/tmp/kafka-logs-2
    3.启动另外的节点
    ./bin/kafka-server-start.sh config/server-1.properties
    ./bin/kafka-server-start.sh config/server-2.properties
    4.创建一个拥有3个副本的topic
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
    ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
    ./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
    
三、搭建Kafka开发环境   
1.添加依赖    
    <dependency>
        <groupId> org.apache.kafka</groupId >
        <artifactId> kafka_2.10</artifactId >
        <version> 0.8.0</ version>
    </dependency>

    
2.  
public interface KafkaProperties {
    final static String ZKCONNECT = "10.22.10.139:2181";
    final static String GROUPID = "group1";
    final static String TOPIC = "topic1";
    final static String KAFKASERVERURL = "10.22.10.139";
    final static int KAFKASERVERPORT = 9092;
    final static int KAFKAPRODUCERBUFFERSIZE = 64 * 1024;
    final static int CONNECTIONTIMEOUT = 20000;
    final static int RECONNECTINTERVAL = 10000;
    final static String TOPIC2 = "topic2";
    final static String TOPIC3 = "topic3";
    final static String CLIENTID = "SimpleConsumerDemoClient";
}

3.生产者 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

import java.util.Properties;

public class KafkaProducer extends Thread {
    private final Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public KafkaProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "10.22.10.139:9092");
        producer = new Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    @Override
    public void run() {
        int messageNo = 1;
        while (true) {
            String messageStr = new String("Message_" + messageNo);
            System.out.println("Send:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            messageNo++;
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}   
    
4.消费者  
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

public class KafkaConsumer extends Thread {
    private final ConsumerConnector consumer;
    private final String topic;

    public KafkaConsumer(String topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig());
        this.topic = topic;
    }

    private static ConsumerConfig createConsumerConfig() {
        Properties props = new Properties();
        props.put("zookeeper.connect", KafkaProperties.ZKCONNECT);
        props.put("group.id", KafkaProperties.GROUPID);
        props.put("zookeeper.session.timeout.ms", "40000");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        return new ConsumerConfig(props);
    }

    @Override
    public void run() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream =consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            System.out.println("receive:" + new String(it.next().message()));
            try {
                sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
} 
View Code
原文地址:https://www.cnblogs.com/yin-fei/p/11208317.html