Kafka

1. Kafka简介


1.1 消息队列

1.1.1 消息队列简介

  • 消息Message:通讯设备之间传递的数据
  • 队列Queue:一种特殊的线性表(数据元素首尾相连),特殊之处在于只允许在首部删除元素和在尾部追加元素(fifo)
  • 消息队列:消息+队列,保存消息的队列,消息的传输过程中的容器,主要提供生产,消费接口供外部调用的存储和获取.

1.1.2 消息队列分类

message queue主要分为两类:点对点(peer to peer),发布订阅

  • 点对点(peer to peer):

    一般基于Pull或者Polling接收数据。发送到队列中的消息被一个而且仅仅一个接收者所接收,即使有多个接收者在同一个队列中监听同一消息;即支持即发即收的消息传递方式,也支持同步请求/应答传送方式。

  • 发布订阅:

    发布到同一个主题的消息,可被多个订阅者所接收。发布/订阅即可基于Push消费数据,也可基于Pull或者Polling消费数据

1.1.3 两种类型的比较

  • p2p模型包括:消息队列(Queue),发送者(Sender),接收者(Receiver);一个生产者只有一个消费者(Consumer)即一旦被消费,消息就不存在消息队列中.
  • pub/Sub包含:消息队列(Queue),主题(Topic),发布者(Publisher),订阅者(Subscriber);每个消息可以有多个消费者,彼此互不影响.

1.1.4 消息系统的使用场景

  1. 解耦

    各系统之间通过消息系统统一的接口交换数据,无需了解彼此的存在

  2. 异步通信

    在不需要立即处理请求的场景下,可以将请求放入消息系统,合适的时候在处理

  3. 峰值处理能力

    消息系统可顶住峰值流量,业务系统可根据处理能力从消息系统中获取并处理对应量的请求

  4. 扩展

    消息系统是统一的数据接口,各系统可独立扩展

  5. 冗余

    部分消息系统具有消息持久化能力,可规避消息处理前丢失的风险

  6. 可恢复性

    系统中部分键失效并不会影响整个系统,它恢复会仍然可从消息系统中获取并处理数据

1.1.5 常见的消息系统

  • RabbitMQ:erlang编写,支持amqp,xmpp,smtp,stomp。支持负载均衡,数据持久化。同时支持p2p和发布/订阅模式。
  • Redis :基于key-value的nosql数据库,同时支持mq功能,可作轻量级的队列服务使用。就入队操作而言,Redis对短消息(小于10kb)的性能比RabbitMQ好,长消息性能比RabbitMQ差。
  • ZeroMQ:轻量级,不需要单独的消息服务器或者中间件,应用程序本身扮演该角色,Peer-To-Peer。它实质上是一个库,需要开发人员自己组合多种技术,使用复杂度高。
  • ActiveMQ:JMS实现,p2p,支持持久化,XA(分布式)事务
  • Kafka/Jafka:高性能跨语言的分布式发布/订阅消息系统,数据持久化,全分布式,同时支持在线和离线处理。
  • MetaQ/RocketMQ:纯java实现,发布/订阅消息系统,支持本地事务和XA分布式事务。

1.2 kafka简介

1.2.1 简介

是分布式的发布-订阅消息系统.由Scala语言编写。Kafka是一个高吞吐量,持久化的,分布式订阅消息系统。主要用于处理活跃live数据(登录,浏览,点击,分享,喜欢等用户行为产生的数据)。 Kafka作为一个集群,运行在一台后者多台服务器上;kafka通过topic对存储的流数据进行分类;每条记录中包含一个key,一个value和一个timestamp(时间戳)

1.2.2 适用场景

  • 构建实时流数据管道,它可以在系统或应用之间可靠的获取数据(相当于message queue)
  • 构建实时流式应用程序,对这些流数据进行转换或影响。(即流处理,通过kafka stream topic和topic之间内部进行变化)

1.2.3 三大特点

  1. 高吞吐量:可以满足每秒百万级别消息的生产和消费-生产消费
  2. 持久化:有一套完善的消息存储机制,确保数据的高效安全的持久化-中间存储
  3. 分布式:基于分布式的扩展和容错机制;kafka的数据都会复制到几台服务器上.当某一个服务器发生故障时,生产者和消费者转而使用其它的机器-整体健壮性

1.2.4 四大核心API

  1. The Producer API

    允许一个应用程序发布一串流式的数据到一个或多个Kafka topic

  2. The Consumer API

    允许一个应用程序订阅一个或者多个topic,并且对发布给它们的流式数据进行处理

  3. The Streams API

    允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的数据流,然后生产一个输入流到一个或者多个topic中去,在输入输出流中进行有效的转换

  4. The Connector API

    允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统.比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容

1.2.5 核心概念

  1. 服务组成

    • Topic:主题,Kafka处理的消息的不同分类
    • Broker:消息服务器代理,Kafka集群中的一个kafka服务器节点称为一个broker,主要存储消息数据.存于硬盘中
    • Partition:Topic物理上的分组,一个topic在broker中被分为1个或者多个partition,分区在创建的时候指定
    • Message:消息,是通信的基本单位,每个消息都属于一个partition
  2. 服务相关

    • Producer:消息和数据的生产者,向Kafka的一个topic发布消息
    • Consumer:消息和数据的消费者,定于topic并处理其发布的信息
    • Zookeeper:协调kafka的正常运行

2. Kafka的分布式安装

2.1 安装

  1. 下载解压

  2. 添加环境变量

    etc/profile末尾添加:

    export KAFKA_HOME=/usr/lcoal/DevInstall/kafka
    export PATH=$KAFKA_HOME/bin:$PATH
    
  3. 刷新profile配置

    source /etc/profile

  4. 配置server.properties(/kafka/config)文件

    ##当前kafka实例的id,必须为整数,一个集群不可重复
    broker.id=1 
    ## 生产到kafka中的数据存储的目录,目录需要手动创建(kafka-logs是目录,不是文件)
    log.dirs=log.dirs=/usr/local/DevInstall/kafka-2.1.1/data/kafka-logs
    ## kafka连接zk的url和kafka数据在zk中的存储目录
    zookeeper.connect=172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafk
    
  5. 将kafka的安装文件同步到其它机器:

    scp -r /path/to/kafka root@ip:/path/to/kafka

  6. 修改其它机器上kafka的server.properties中的broker.id

2.2 启动

  1. 启动命令:

    cd kafka
    ./bin/kafka-server-start.sh -daemon ./config/server.properties
    
  2. 查看是否启动成功:

    jps
    

    使用jps命令查看显示列表有无kafka

  3. 查看启动日志:

    cd /usr/local/DevInstall/kafka-2.1.1/logs
    vim kafkaServer.out
    

2.3 服务测试

在zookeeper中查看kafka的服务ID:

[zk: 172.18.19.129(CONNECTED) 1] ls /kafka/brokers/ids

控制台输出了:

 [1, 2, 3]

证明服务均已注册上

2.4 目录详解

| --kafka
		|--cluster
				|--id	代表的是一个kafka集群中包含集群的版本和集群的id
		|--brokers #服务器的id,使用get [id] 后显示连接的封装信息
				|--ids	#存放当前kafka的broker实例列表
				|--topics	#当前kafka中的topic列表
				|--seqid	#系统的序列id        
        |--controller :#get /kafka/controller中信息的brokerid显示当前的leader
        |--controller_epoch: #代表的是controller的纪元,即表示controller的迭代;每当controller中的brokerid更换一次,controller_epoch就+1
        |--consumers:#老版本用于存储kafka消费者的信息,主要保存对应的offset;新版本中基本不用,此时用户的消费信息,保存在一个系统的topic中:_consumer_offsets
        |--config:#存放配置信息

3. Kafka的基本操作

3.1 KafKa的topic的操作

topic是kafka的核心概念,用来存储各种类型的数据。

关于topic操作的脚本命令:kafka-topics.sh

3.1.1 创建topic

  1. 命令:

    ./bin/kafka-topics.sh --create --topic topic1 --zookeeper dev-local-1:2181,dev-local-2:2181,dev-local-3:2181/kafka --partitions 3 --replication-factor 3
    
  2. 注意:指定副本因子(--replication-factor)的时候,不能超过brokers的数量

3.1.2 查看topic

  1. 查看所有的topic命令:

    ./bin/kafka-topics.sh --list --zookeeper dev-local-1:2181,dev-local-2:2181,dev-local-3:2181/kafka
    
  2. 查看指定topic的详尽信息(例如查看topic1的详尽信息)

    ./bin/kafka-topics.sh --zookeeper 172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafka --describe --topic topic1
    

    结果显示:

    Topic:topic1	PartitionCount:3	ReplicationFactor:3	Configs:
    Topic: topic1	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1
    Topic: topic1	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2
    Topic: topic1	Partition: 2	Leader: 2	Replicas: 3,1,2	Isr: 2
    # Partition:当前topic对应的分区编号
    # Replicas:副本因子,当前kafka对应的partition所在的broker实例的broker.id
    # Leader:该partition的所有副本中的leader领导者,处理所有kafka该partition读写请求
    # Isr:该partition的存活的副本对应的broker实例的broker.id列表
    

3.1.3 修改一个topic

[root@dev-local-3 kafka-2.1.1]./bin/kafka-topics.sh  --alter  --topic topic1 --partitions 4 --zookeeper 172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafka

注意:partition个数,只能增加,不能减少

3.1.4 删除一个topic

[root@dev-local-3 kafka-2.1.1]./bin/kafka-topics.sh  --delete --topic topic1 --zookeeper 172.18.19.129:2181,172.18.19.143:2181,172.18.19.15:2181/kafka

老版本是不能直接删除topic,除非你配置了delete.topic.enable=true,可以直接删除掉。若未配置,那么就不会直接删除,会做一个标记,表明这个topic不能再用了。在新版本中,不需要这些设置,可直接删除。

3.1.5 生产数据

生产及消费消息,默认监听端口未9092,防火墙需要开放此端口;并在/etc/hosts中配置主机名和对应的IP

./bin/kafka-console-producer.sh --broker-list dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3

3.1.6 消费数据

./bin/kafka-console-consumer.sh --bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3

若想从头消费,需要加上参数--from-beginning:

./bin/kafka-console-consumer.sh --bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3 --from-beginning

指定消费某个分区:--partition 0;从什么位置开始消费(消息的偏移量)--offset earliest:

./bin/kafka-console-consumer.sh --bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 --topic topic3 --from-beginning --partition 0 --offset earliest

数据消费的顺序不是按照发送消息的顺序;因为生产出的消息会发送至不同的分区。

3.2 Kafka的数据消费总结

​ Kafka消费者在 消费数据的时候,都是分组别的。不同组的消费不受影响。例如在两个消费者组内,同一时间可以有两个消费者同时消费一个分区的数据。但在同一组内,不会有两个消费者同时消费一个分区的数据。

​ 相同组内的消费,需要注意。若partition有3个,消费者有三个,那么便是每一个消费者消费其中一个partition对应的数据;若有两个消费者,此时一个消费者消费其中一个partition数据,另一个消费者消费2个partition的数据。若存在超过3个消费者,同一时间只能最多有3个消费者能消费得到数据。

./bin/kafka-console-consumer.sh 
--bootstrap-server dev-local-1:9092,dev-local-2:9092,dev-local-3:9092 
--topic topic3 
--group haha 
--partition 2 
--offset earliest

--group haha:消费者对应的消费者组。

offset是kafka的topic的partition中的每一条消息的标志,如何区分该条消息在kafka对应的partition的位置,就是用该偏移量。offset的数据 类型是Long,8个字节长度。offset在分区内是有序的,分区间是不一定有序。若想要kafka中的数据全局有序,就只能让partition个数为1.

​ 在组内,kafka的topic的partition个数,代表了kafka的topic的并行度。同一时间最多可以有多个线程来消费topic数据,所以如果想要提高kafka的topic的消费能力,应该增大partition的个数。

3.3 Kafka的编程api

3.3.1 创建kafka的项目

  1. 添加依赖:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.1</version>
</dependency>
  1. 复制linux上kafka/config/producer.properties文件至项目中并修改和添加如下:
bootstrap.servers=dev-local-1:9092,dev-local-2:9092,dev-local-3:9092
# key和value的序列化,最后面的StringSerializer应该和代码中的key value类型一致
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer

3.3.2 生产者的api操作

public class ProducerDemo {
    /**
     * kafka的数据是由key,value和timestamp组成
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        try {
            properties.load(ProducerDemo.class.getClassLoader().getResourceAsStream("producer.properties"));
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("未成功读取producer.properties文件");
        }
        // 创建执行入口
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        String topic = "topic3";
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, "111");
        // 发送消息
        producer.send(producerRecord);

        //释放资源
        producer.close();

    }

}
  • 创建producer时需要指定的配置信息
acks=[0|-1|1|all]
# 消息确认机制
# 0:不做确认,直接发送消息即可;1:只需要leader进行消息确认即可,后期follower可以从leader进行同步;
# -1/all:不仅需要leader进行消息确认,还需要等待follower进行确认

batch.size=1024 
# 每个分区内的用户缓存未发送record记录空间的大小
# 若缓存区中的数据,没有沾满,也就是仍然有未用的空间,那么也会将请求发送储区,为了减少请求次数,可以配置linger.ms大于0

linger.ms=10
# 不管缓冲区是否被沾满,延迟10ms发送request

buffer.memory=10240
# 控制的是一个producer中的所有缓存空间

retires=0
# 发送消息失败之后的重试次数
  • 开启幂等性

在producer.properties文件中添加:enable.idempotence=true

开启幂等却并未生效的原因:

其实主要原因在于大数据框架设置问题上:

​ 若有1000w条数据,在其中不断增加,要保证添加的数据不能重复。

​ 方案1:在添加的时候进行判断,若该条数据消息已经存在,直接覆盖掉对应的数据;

​ 方案2:在添加的时候先不进行判断,直接进行添加,在后续的操作中满足条件之后,再进行合并操作。

出于效率,必然选择方案2

  • 如何保证Kafka的数据的一致性?

答:Kafka生产者可以选择生产的两种模式(幂等和事务)

3.3.3 消费者的api操作

入口类:Consumer

  • 配置文件:
bootstrap.servers=dev-local-1:9092,dev-local-2:9092,dev-local-3:9092

# consumer group id
group.id=group1903

# What to do when there is no initial offset in Kafka or if the current
# offset does not exist any more on the server: latest, earliest, none
# 默认值未latest,即从最新的开始消费
auto.offset.reset=earliest
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
  • 执行代码:
public class ConsumerDemo {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        properties.load(ConsumerDemo.class.getClassLoader().getResourceAsStream("consumer.properties"));

        //构建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("topic3"));
        System.out.println("topic	partition	offset	key	value");
        try {
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic()+"	"+record.partition()+"	"+record.offset()+"	"+record.key()+"	"+record.value());
                }
            }
        } finally {
            consumer.close();
        }


    }
  • 从指定的位置开始消费,需要使用.assign().seek()api:(line8-13)
public class ConsumerDemo2 {
    public static void main(String[] args) throws Exception{
        Properties properties = new Properties();
        properties.load(ConsumerDemo.class.getClassLoader().getResourceAsStream("consumer.properties"));

        //构建消费者
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        TopicPartition partition1 = new TopicPartition("topic3", 0);
        TopicPartition partition2 = new TopicPartition("topic3", 1);
        consumer.assign(Arrays.asList(partition1, partition2));
        System.out.println("topic	partition	offset	key	value");
        consumer.seek(partition1,2);
        consumer.seek(partition2,5);
        try {
            while (true){
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.topic()+"	"+record.partition()+"	"+record.offset()+"	"+record.key()+"	"+record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}
  • offset消费的问题:

若没有手动管理offset,当程序崩溃时只能从头开始消费,导致消息的重复消费或丢失,效率较低。

手动管理offset:消费时将offset存入数据库。

3.3.4 record进入分区的策略

每一条producerRecord都有topic名称,可选的partition分区编号,以及一堆可选的key和value组成。

  • 三种策略进入分区:

    • 若指定了partition,那么直接进入该partition
    • 若没有指定partition,但指定了key,则使用key的hash选择partition
    • 若既没有指定partition,也没有指定key,则使用轮询的方式进入partition
  • 自定义分区需要实现Partitioner接口

public class RandomPartitions implements Partitioner {

    private Random random = new Random();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取总的分区数
        Integer countForTopic = cluster.partitionCountForTopic(topic);
        // 取0到countForTopic之间的随机数
        int index = random.nextInt(countForTopic);
        return index;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 随机分区方式
public class ProducerDemo2 {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("partitioner.class", com.kong.partition.RandomPartitions.class);
        properties.load(ProducerDemo2.class.getClassLoader().getResourceAsStream("producer.properties"));
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
        String topic = "topic3";

        int start = 100;
        int end = start + 10;
        for (int i = start; i < end; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, "random" + i, i + "");
            kafkaProducer.send(record);
        }

        kafkaProducer.close();
    }
}

或者将上面line4删除,在producer.properties文件中添加:

## 指定自定义分区
partitioner.class=com.kong.partition.RandomPartitions
  • Hash分区
/**
 * @author gedachao
 * @description 自定义分区之Hash分区
 * 计算方法:key.hashValue()%partitionerCount
 * @date 2020-08-03 9:47
 */
public class HashPartitions implements Partitioner {

    /**
     * Compute the partition for the given record.
     *
     * @param topic The topic name
     * @param key The key to partition on (or null if no key)
     * @param keyBytes The serialized key to partition on( or null if no key)
     * @param value The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster The current cluster metadata
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取topic分区的数量
        Integer countForTopic = cluster.partitionCountForTopic(topic);
        if (keyBytes != null) {
            return Math.abs(keyBytes.hashCode()) % countForTopic;
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 轮询分区
public class RoundRobinPartition implements Partitioner {

    private AtomicInteger counter =  new AtomicInteger();
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        Integer countForTopic = cluster.partitionCountForTopic(topic);
        return counter.getAndIncrement() % countForTopic;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}
  • 分组分区
public class GroupPartition implements Partitioner {
    private HashMap<String,Integer> map = new HashMap<>();
    {
        map.put("java.learn.com",0);
        map.put("ui.learn.com",0);
        map.put("data.learn.com",0);
        map.put("android.learn.com",0);
        map.put("h5.learn.com",0);
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String line = value.toString();
        String[] str = line.split("\s+");
        try {
            if (str == null || str.length != 2) {
                return 0;
            } else {
                URL url = new URL(str[1]);
                String host = url.getHost();
                return map.getOrDefault(host,0);
            }
        }catch (Exception e) {
            e.printStackTrace();
        }
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

4 Kafka架构之道

4.1 Kafka相关属于介绍

broker,topic,producer,partition,consumer,consumergroup等等不作介绍

replica

​ 每一个分区,根据副本因子N,会有N个副本。比如broker1上有一个topic,分区为topic-1,副本因子为2,那么在两个broker的数据目录里,都都有一个topic-1,其中一个是leader,一个follower。

Segment

​ partition物理上由多个segment组成,每个Segment存着message信息。

Leader

​ 每个partition有多个副本,其中有且仅有一个Leader,Leader是当前负责数据的读写的partiton。

Follower

​ Follower跟随Leader,所有写请求都通过Leader路由,数据变更会广播给所有Follower,Follower于Leader保持数据同步。若Leader失效,则从Follower中选举出一个新的Leader。当Follower于Leader挂掉、卡住或者同步太慢,leader会把这个follower从“in sync replicas”(ISR)列表中删除,重新创建一个Follower。

Offset

​ Kafka的存储文件都是按照offset.log来命令,用offset作名字的好处是方便查找。存放于~/kafka/data/kafka-log/topic-name/

4.2 Kafka的架构

​ 通常,一个典型的kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU,Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用Pull模式从broker订阅并消费信息。注意:follower同步不是本机同步,而是跨节点同步。比如上图中的broker0-partition1为follower,它将同步的是broker1-partition1(leader),而不是本地(broker0)中的leader。

4.3 Kafka的分布式模型

​ kafka分布式主要是指分区被分布在多台server(broker)上,同时每个分区都有leader和follower(不是必须),leader负责处理,follower负责同步。leader和follower之间身份可互相转化,形成分布式模型。

​ kafka的分区日志(message)被分布在kafka集群的服务器上,每一个服务器处理数据和共享分区请求。每个服务器处理和共享分区请求。每个分区是被复制到一系列配置好的服务器上来进行容错。

​ 每个分区有一个server节点来作为leader和零个或者多个server接待你来作为followers。leader处理指定分区的所有读写请求,同时follower被动复制leader。若leader失败,followers中的一个将会自动变成一个新的leader。每个服务器都能作为分区的一个leader和作为其它分区的follower,因此kafka集群能被很好的平衡。kafka集群是一个去中心化的集群。

​ kafka消费的并行度就是kafka topic分区的个数,或者分区的个数决定了同一时间同一消费者组内最多可以有多少个消费者消费数据。

4.4 Kafka的文件存储

  • 在kafka集群中,分单个broker和多个broker。每个broker中有多个topic,topic数量可以自己设定。在每个topic中又有0到多个partition,每个partition为一个分区。kafka分区命名规则为topic的名称+有序序号,这个序号从0开始一次增加。

  • 每个partition中有多个segment file。创建分区时,默认会生成一个segment file,kafka默认每个segment file的大小是1G。当生产者往partition中存储数据时,内存中已满,就会向segment file里刷新。在存储数据时,会生成一个segment file,当这个segment file到1G之后,在生成第二个segment file,以此类推。每个segment file 对应两个文件,分别是以.log结尾的数据文件和以.index结尾的索引文件。在服务器上,每个partition是一个目录,每个segment是分区目录下的一个文件。

  • 每个segment file也有自己的命名规则,每个名字有20个字符,不够用0填充。每个名字从0开始命名,下一个segment file文件的名字就是,上一个segment file中最后一条消息的索引值。在.index文件中,存储的是key-value格式的,key代表在.log中按顺序开始的第n条消息,value代表该消息的位置偏移。但是在.index中不是对每条消息都做记录,它是每隔一些消息记录一次,避免占用太多内存。即使消息不在index记录中,在已有的记录中查找,范围也大大缩小了。.index中存放的消息索引是一个稀疏索引列表

  • Kafka中的消息是以topic进行分类的,生产者和消费者都是面向topic。topic是逻辑上的概念,而partition是物理上的概念,每个 partition对应于一个log文件,该log文件中存储的就是producer生产的数据。Producer生产的数据会不断追加到该log文件末端,且每条数据都有自己的offset。消费者组中的每个消费者,都会实时记录自己消费到了哪个offset,以便出错恢复时,从上次的位置继续消费。

  • offset由Consumer自身维护。在0.8版本之前,offset存储于zookeeper上;在0.9之后,offset存储在Kafka内置的topic中,这个topic是专门用来存储offset,无需我们创建,kafka会自动创建。offset每个分区都是独立的,每个消费者的每次消费只能消费一个分区。消费顺序:不能保证全局是有序的,但是可以保证分区内部是有序的。

4.5 topic中的partition

为什么要分区?

​ 可以想象,若一个topic就一个分区,要是这个分区有1T的数据,那么kafka就想把大文件划分到更多的目录来管理,即Kafka所谓的分区。

分区的好处:

  1. 方便在集群中扩展。因为一个topic由一个或者多个partition构成,而每个节点中通常可以存储多个partition,这样就方便分区存储与移动,也就增加其扩展性。同时也可以增加其topic的数量。

  2. 可以提高并发。因为一个topic多个partition,而每个主题读写数据时,其实就是读写不同的partition。

4.6 partition中文件存储

  • 每个分区一个目录,该目录中是一堆segment file(默认一个segment是1G),该目录和file都是物理存储与磁盘。
  • 每个partion(目录)相当于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每个段segment file 消息数量不一定相等,这种特性方便old segment file 快速被删除。
  • 每个partition 只需要支持顺序读写就行了,segment文件生命周期由服务端配置参数决定。由于使用顺序读写,使得Kafka虽然将数据存储于物理内存中,但其存取速度可媲美对内存的读写。(Kafka不支持随机读写)。
  • 这样做的好处就是能快速删除无用文件,有效提高磁盘利用率。

4.7 Kafka分区中的segment

​ 由于生产者生产的消息会不断追加到log文件末尾,为防止log文件过大导致数据定位效率低下,Kafka采取了分片和索引机制,将每个partition分为多个segment。每个segment对应两个文件(.index和.log)。这些文件位于一个文件夹下,该文件夹的命名规则为:topic名称+分区序号。例如:first这个topic有3个分区,则其对应的文件夹有first-0,first-1,first-2。index和log文件以当前segment的第一条消息的offset命名。

  • segment file组成

由2个部分组成,分别为index file 和 log file(即数据文件)。这两个文件一一对应,成对出现。它们分别表示segment索引文件、数据文件。

  • segment 文件命名规则

partition全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,20位数字字符长度,不够的左边用0填充。

4.8 Kafka中消息查找流程

举例

查找offset=23066的message,需要通过以下两个步骤完成:

第一步查找segment file:

00000000000000000000.index
00000000000000000000.log
00000000000000023060.index
00000000000000023060.log

​ 根据.index和.log物理结构对应关系图可知,其中00000000000000000000.index表示最开始文件,起始偏移量(offset)为0。第二个文件00000000000000023060.index的消息起始偏移量为23060。同样,其它后续文件以此类推,以起始偏移量命名并排序这些文件,只要根据offset二分查找文件列表,就可以快速定位到具体文件。

​ 当offset=23060时定位到00000000000000023060.index和log文件。

第二步:通过segment file 查找message:

​ 通过第一步定位到segment file ,当offset=23060时,依次定位到00000000000000023060.index的元数据物理位置和00000000000000023060.log的物理偏移地址,然后通过00000000000000023060.log顺序查找直到offset=23066为止。 若.index中的偏移量没有此offset,则先找到小于此offset的索引,然后通过二分查找找到确切位置。

​ segment index file 采取稀疏索引存储方式,即<偏移量、位置>,它减少索引文件大小,通过map可以直接内存操作,稀疏索引为数据文件的每个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来需要消耗更多的时间。

4.9 Consumer Group架构

consumer group是kafka提供的可扩展具有容错性的消费机制。既然是一个组,那么组内必然可以有多个消费者或者消费者实例(cnsumer instance),它们共享一个公共的id,即group id。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。当然,每个分区只能由同一个消费组内的一个consumer来消费。3个特性:

  • consumer group 下可以有一个或多个consumer instance,consumer instance 可以是一个进程,也可以是一个线程。
  • group.id 是一个字符串,唯一标识一个consumer group
  • consumer group 下订阅的topic 下的每个分区只能分配给某个group 下的一个consumer(当然该分区还可以被分配给其它group ),即组合组之间的消费者不受彼此影响。

4.10 offset的维护

​ 由于consumer在消费过程中可能会出现断电宕机等故障,consumer恢复后,需要从故障前的位置继续消费,所以consuemr需要实时记录自己消费到了哪个offset,以便故障恢复后继续消费。

​ Kafka默认是定期自动提交位移的(enable.auto.commit=true),当然也可以手动提交位移实现自己的控制。另外Kafka会定期把group 消费情况保存起来,做成一个offset map。

两种提交方式详解:

  • 自动提交。设置enable.auto.commit=true,更新的频率根据参数auto.commit.interval.ms来定。这种方式也成为at most once,fetch到消息后就可以更新offset,无论是否消费成功。默认就是true。
  • 手动提交。设置enable.auto.commit=false,这种方式称为at least once。fetch到消息后,等消费完成再调用方法consumer.commitSync(),手动更新offset;若消费失败,则offset也不会更新,此条消息会被重复消费一次。

4.11 Kafka中Push 和 Pull

​ 一个较早的问题是我们应该考虑消费者从broker中Pull数据还是broker将数据push给消费者。kafka遵守传统设计和借鉴很多消息系统,这儿kafka选择producer向broker去push消息,并由consumer从broker pull消息。一些ogging-centric system,例如Facebook的Scribe和Cloudera的Flume,采用不同的push模式。事实上,push模式和pull模式各有优劣。

  • push模式很难适用消费速率不同的消费者,因为消息发送速率是由broker决定的。push模式的目标是尽可能以最快速度传递消息,但是这样很容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则可以根据consumer的消费能力以适当的速率消费消息。
  • pull模式的不足之处是,若kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka的消费者再消费数据时会传入一个时长参数timeout,若当前没有数据可供消费,consumer会等待一段时间之后返回,这段时间为timeout。(timeout官方案例为100毫秒)

4.12 Kafka中数据发送保障

​ 为保证producer发送的数据,能可靠的发送到指定的topic,topic的每个partition收到producer发送的数据后,都需要向producer发送ack(acknowledgement确认收到),若producer收到ack,就会进行下一轮的发送,否则重新发送数据。

  • 何时发送ack?

确保有follower与leader同步完成,leader再发送ack,这样才能保证leader挂掉之后,能在follower中选举出新的leader。

  • 多少个follower同步完成之后发送ack?
  1. 半数以上的follower同步完成,即可发送ack
  2. 全部的follower同步完成,即可发送ack

副本数据同步策略:

方案 优点 缺点
半数以上完成同步,就发送ack 延迟低 选举新的leader时,容忍n台节点的故障,需要2n+1个副本
全部完成同步,才发送ack 选举新的leader时,容忍n台节点的故障,需要n+1个副本 延迟高

kafka选择了第二种方案,理由如下:

  1. 同样为了容忍n台节点的故障,第一张方案需要2n+1个副本,而第二种方案只需要n+1个副本,而Kafka的每个分区都有大量的数据,第一张方案会造成大量数据的冗余。
  2. 虽然第二种方案的网络延迟较高,但网络延迟对Kafka的影响较小
  • ISR

采用第二种方案之后,设想以下情景:leader收到数据,所有follower都开始同步数据,但有一个follower因为某种故障,迟迟不能与leader进行同步,那么leader就要一直等下去,直到它完成同步,才能发送ack。如何解决此问题?

答:Leader维护了一个动态的in-sync replica set(ISR),意为和leader保持同步的follower集合。当ISR中的follower完成数据的同步之后,leader就会给follower发送ack。若follower长时间未向leader同步数据,则该follower将被踢出ISR,该事件阈值由replica.lag.time.max.ms 参数设定。Leader发生故障之后,就会从ISR中选举新的leader。

注:

  • 生产者发送到特定主题分区的消息是按照发送的顺序来追加。也就是说,若消息M1和消息M2由相同的生产者发送,并且M1是先发送的,那么M1的偏移量将比M2低,并出现在日志的前面。
  • 消费者是按照存储在日志中记录顺序来查询消息。
  • 对于具有n个副本的主题,我们将容忍最多N-1个服务器失败故障,从而不会丢失提交到日志的任何消息记录。

4.13 ack应答机制

对于某些不太重要的数据,对数据的可靠性要求不是很高,能够容忍数据的少量丢失,所以没必要等ISR中的follower全部接收成功。

所以Kafka为用户提供了三种可靠性级别,用户根据可靠性和延迟的要求进行权衡,选择以下的配置。

ack可以在kafka中进行配置acks参数配置

ack 说明
acks=0 producer不等待broker的ack,这样的操作是一种低延迟的操作,broker一接收到但还没写出成功就返回,当broker故障的时候可能会出现数据丢失(相当于异步发送)
acks=1 producer等待broker的ack,partition的leader将数据落地磁盘后返回ack,若在follower同步成功之前leader故障,此时数据会丢失。(此时Follower需要去同步消息,但是leader已宕机,Kafka集群就要重新进行leader选举,但原leader数据没有同步,所以会出现数据丢失问题)
acks=-1(all) producer等待broker的ack,partition的leader和follower完成全部的数据同步才返回ack(即会触发ISR),但若在follower同步完成之后,broker发送ack之前,leader宕机,会重复数据。
  • 故障处理细节

LEO(Log End Offset)每一个副本最后一个offset的位置,这里可以发现每天一个副本的LEO不一样,是因为每个副本的同步速度都不同。

HW(High watermark)即水位,leader中的HW是所有副本中最小的LEO。

这里可以看到HW后的数据还没有进行完全同步,即说明HW后的位置是可以变动的,因为还没有提交,所以在HW之前的数据对Consuemer可见,之后的数据Consumer是不可见的,并且Consumer只能消费HW之前的数据。

  1. follower故障

    follower发生故障后会被临时踢出ISR,待该follower恢复后,follower会读取本地磁盘记录的上次的HW,并将log文件高于HW的部分截取掉,从HW开始向leader进行同步。等待follower的LEO大于等于该Partition的HW,即follower追上leader之后,就可以重新加入ISR了。

  2. leader故障

    leader发生故障之后,会从ISR中选出一个新的leader,之后,为保证多个副本之间的数据一致性,其余的follower会先将各自的log文件高于HW的部门截取掉,然后从新的leader同步数据。

注意:这只能保证副本间数据的一致性,并不能保证数据不丢失或者不重复

4.14 Exactly Once(一次正好)语义

对于某些重要的消息,需要保证exactly once语义,即保证每条消息被发送且仅被发送一次。

在0.11版本之后,Kafka引入了幂等性机制(idempotent),配合acks=-1时的at least once(最少一次)语义,实现了producer到broker的exactly once 语义。

使用时,只需将enable.idempotence属性设置为true(在生产者位置),kafka自动将acks属性设置为-1。

  • 何为幂等性

简单的说1的几次幂都等于1,也就是说一条消息无论发送几次都只算一次,无论多少条消息但只实例化一次。

Kafka完成幂等性其实就是给消息添加了唯一的id,这个id的组成是PID(ProducerID)这样保证每一个Producer发送的时候是唯一的,还会为Producer中每条消息添加一个消息ID,即当前Producer中生产的消息会加入Producer的ID和消息的ID,这样就能保证消息唯一性了,这个消息发送到Kafka中的时候暂时缓存ID,写入数据后没有收到ack,那么会重新发送这个消息,新消息过来的时候会和缓存中ID进行比较,若发现已经存在就不会再次接受了。

  • 详细解析

​ 为实现producer的幂等性,Kafka引入了Producer ID(即PID)和Sequence Number。

​ PID:每个新的producer在初始化的收会被分配一个新的pid,这个pid对用户是不可见的。

​ Sequence Number。对于每个PID,该Producer发送数据的每个<Topic,Partition>都对应一个从0开始单调递增的Sequence Number。

​ Kafka可能存在多个生产者,会同时产生消息,但对Kafka来说,只需要保证每个生产者内部的消息幂等就可以了,所以引入了PID来标识不同的生产者。

​ 对于Kafka来说,要解决的是生产者发送消息的幂等问题。即需要区分每条消息是否重复。

​ Kafka通过为每条消息增加一个Sequence Number,通过Sequence Number来区分每条消息。每条消息对应一个分区,不同的分区产生的消息不可能重复。所有Sequence Number对应每个分区。

​ Broker端在缓存中保存了这seq number,对于接受的每条消息,若其序号比Broker缓存中序号大于1则接收它,否则将其丢弃。这样就可以实现消息重复提交了。但是,只能保证单个Producer对于同一个<Topic,Partition>的Exactly Once语义。不能保证同一个Producer一个topic不同的partition幂等。

4.15 Zookeeper在Kafka中的作用

​ Kafka集群中有一个broker会被选举为Controller,负责管理集群broker的上下线,所有topic的分区副本分配和leader选举等工作。

  • broker启动时会向Zookeeper注册一个临时节点/controller,此时谁先到就是controller leader。

  • broker还会向Zookeeper注册一个/brokers/ids/0,1,2;kafkaController会监听/brokers/ids/0,1,2

  • 当创建topic的时候会向Zookeeper中注册/brokers/topics/first/partitions/0/state

  • 节点中的数据是“leader”:0,“isr”:[0,1,2];这里的0对应的是brokere的序号leader诞生

  • 当leader宕机,因kafkaController监控着/brokers/ids/0,1,2节点并更改信息为[1,2]

  • KafkaController有MetadataCache信息,所以知道谁是leader,此时会向"leader":0,"isr":[0,1,2]获取ISR进行重新选举leader,选举后KafkaController更新leader及ISR。

    ​ 只有KafkaController Leader会向zookeeper上注册Watcher,其它broker几乎不用监听zookeeper的状态变化。

    ​ Kafka集群中多个broker,有一个会被选举为controller leader(谁先到就是谁),负责管理整个集群中分区和副本的状态,比如partition的leader副本故障,由controller负责为该partition重新选举新的leader副本;当检测到ISR列表发生变化,有controller通知集群中所有broker更新其MetadataCache信息;或者增加某个topic 分区的时候也会由controller管理分区的重新分配工作。

    ​ 当broker启动的时候,都会创建KafkaController对象,但是集群中只能有一个leader对外提供服务,这些每个节点上的KafkaController会在指定的zookeeper路径下创建临时节点,只有第一个成功创建的节点的KafkaController才可以称为leader,其余的都是follower。当leader发生故障后,所有的follower会收到通知,再次竞争在该路径下创建节点从而选举新的leader。

6 Kafka的Log

6.1 kafka的log

6.1.1 日志结构

6.1.2 Kafka的log写

日志允许序列附加,总是附加到最后一个文件。当该文件达到可配置的大小(比如1GB)时,就会将其刷新到一个新文件。日志采用两个配置参数:M和S。前者给出在强制OS将问价刷新到磁盘之前要写入的消息数量(条数),后者给出多少秒之后被强制刷新。这提供了一个持久性保证,在系统崩溃的情况下最多丢失M条消息或S秒的数据。

6.1.3 Kafka的log读

  1. 读取的实际过程是:首先根据offset 去定位数据文件中log segment文件,然后从全局的offset值中计算指定文件offset,然后从指定文件offset 读取消息。查找使用的是二分查找(基于快排对segment文件名进行排序),每一个文件的范围都被维护到内存中。
  2. 读取是通过提供消息的64位逻辑偏移量(8字节的offset)和s字节的最大块大小来完成。
  3. 读取将返回一个迭代器包含有s字节的缓冲区,缓冲区中含有消息。S字节应该比任何单个消息都大,但是在出现异常大的消息时,可以多次重试读取,每次都将缓冲区大小加倍,知道成功读取消息为止。
  4. 可以指定最大的消息和缓冲区大小,以使服务器拒绝的消息大于某个大小,并为客户机提供其获得完整消息所需的最大读取量。

6.1.4 Kafka的log的删除

通过设置参数log.retention.hours=xxx即可

6.1.5 Kafka的log保障

  1. 日志提供了一个配置参数M,该参数控制在强制刷新磁盘之前写入的消息的最大数量(M条)。
  2. 启动日志恢复去处理最近消息在总消息中是否有效,使用crc32来校验,若消息长度和offset总和小于文件长度且crc32和存储的消息能匹配上,则表示有效。

请注意:

必须处理两种类型的损坏:中断(由于崩溃而丢失未写的块)和损坏(向文件添加无意义块)。

原文地址:https://www.cnblogs.com/kongieg/p/13448876.html