kafka学习

Kafka概述

Kafka是分布式(点对点模式)(发布-订阅模式)消息系统,Scala写成, 它主要用于处理流式数据。本质是基于消息队列缓存数据.

Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)称为broker。组件依赖于zookeeper集群保存一些meta信息

为什么要用kafka,为什么要用消息队列

1)解耦:
2)冗余:
3)扩展性:
4)灵活性 & 峰值处理能力:
5)可恢复性:
6)顺序保证:
7)缓冲:
8)异步通信:

kafka构架

1)Producer :消息生产者,就是向kafka broker发消息的客户端;
2)Consumer :消息消费者,向kafka broker取消息的客户端;
3)Topic :可以理解为一个队列(就是同一个业务的数据放在一个topic下);
4) Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。
一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。
如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic;
5)Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic; 6)Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。
partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序;
7)Offset:偏移量。

kafka分区

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成
原因: (1) 方便在集群中扩展 (2)提高并发 因为可以以partition来进行读写

分区有多副本机制 可以通过选举leader来提高容

kafka中的broker 是干什么的

broker 是消息的代理,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉取指定Topic的消息,然后进行业务处理,broker在中间起到一个代理保存消息的中转站。

为什么Kafka不支持读写分离?

(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。
某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,
应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。 (
2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,
整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节
点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用

kafka中consumer group 是什么概念

是Kafka实现单播和广播两种消息模型的手段。同一个topic的数据,会广播给不同的group;
同一个group中的worker,只有一个worker能拿到这个数据。换句话说,对于同一个topic,每个group都可以拿到同样的所有数据,但是数据进入group后只能被其中的一个worker消费。
group内的worker可以使用多线程或多进程来实现,也可以将进程分散在多台机器上,worker的数量通常不超过partition的数量,且二者最好保持整数倍关系,
因为Kafka在设计时假定了一个partition只能被一个worker消费(同一group内)。

kafka命令行操作

修改配置文件
#broker的全局唯一编号,不能重复
broker.id=1
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的线程数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka运行日志存放的路径    
log.dirs=/opt/module/kafka/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop101:2181,hadoop102:2181,hadoop103:2181


分别在hadoop102和hadoop103上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=2、broker.id=3

启动集群kafka-server-start.sh -daemon server.properties  所有节点都要

关闭集群kafka-server-stop.sh stop

查看当前服务器中的所有topic kafka-topics.sh --zookeeper cc1:2181 --list
创建topic    kafka-topics.sh --zookeeper cc1:2181 
--create --replication-factor 3 --partitions 1 --topic first
删除topic  kafka-topics.sh --zookeeper hadoop101:2181 
--delete --topic first
消费消息 kafka-console-consumer.sh 
--bootstrap-server cc1:9092 --from-beginning --topic first

kafka存储策略

无论消息是否被消费,kafka都会保留所有消息。有两种策略可以删除旧数据:
1)基于时间:log.retention.hours=168
2)基于大小:log.retention.bytes=1073741824

kafka follower如何与leader同步数据 写流程

1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
2)producer将消息发送给该leader
3)leader将消息写入本地log
4)followers从leader pull消息,写入本地log后向leader发送ACK
5)leader收到所有ISR中的replication的ACK后向producer发送ACK
注:leader会维护一个与其一定程度保持同步的Replica列表,该列表称为ISR(in-sync Replica)。所说的“一定程度”是指可以忍受的滞后范围,这个范围可以通过参数replica.lag.time.max.ms(默认10秒)进行配置。与leader副本同步滞后过多的副本(不包括leader)副本,组成OSR(Out-Sync Relipcas)。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步。

flume和kafka集成

1)配置flume(flume-kafka.conf)
# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F  /opt/module/datas/flume.log
a1.sources.r1.shell = /bin/bash -c

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop101:9092,hadoop102:9092,hadoop103:9092
a1.sinks.k1.kafka.topic = second
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = -1


# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2) 启动kafkaIDEA消费者
3) 进入flume根目录下,启动flume
$ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
4) 向 /opt/module/datas/flume.log里追加数据,查看kafka消费者消费情况
$ echo hello > /opt/module/datas/flume.log

kafka中的 zookeeper 起到什么作用,可以不用zookeeper么

zookeeper 是一个分布式的协调组件,早期版本的kafka用zk做meta信息存储,consumer的消费状态,group的管理以及 offset的值。考虑到zk本身的一些因素以及整个架构较大概率存在单点问题,新版本中逐渐弱化了zookeeper的作用。新的consumer使用了kafka内部的group coordination协议,也减少了对zookeeper的依赖,

但是broker依然依赖于ZK,zookeeper 在kafka中还用来选举controller 和 检测broker是否存活等等。
我凝视这恒星,等待这那场风暴,我已经准备好了
原文地址:https://www.cnblogs.com/cheng5350/p/11877758.html