kafka笔记

kafka

kafka是什么?

  kafka  是一个分布式消息发布订阅系统,可以处理大量数据,常用在线消息消费,数据可以保存到磁盘里,当消费者去取数据中途发生断开,kafka会做断点,连接成功继续读取,减少数据丢失和冗余

kafka架构

  Broker
    Kafka集群包含一个或多个服务器,这种服务器被称为broker
  Topic
    每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不同Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic即可生产或消费数据而不必关心数据存于何处)
  Partition
    Parition是物理上的概念,每个Topic包含一个或多个Partition.
  Producer
    负责发布消息到Kafka broker
  Consumer
    消息消费者,向Kafka broker读取消息的客户端。
  Consumer Group
    每个Consumer属于一个特定的Consumer Group(可为每个Consumer指定group name,若不指定group name则属于默认的group)

配置文件

  Vi server.properies

  Port 9091默认端口号

  Broker.id =0  # ID自己定义,唯一

  #host.name=。。。。 #接收的ip,注释掉接收所有ip

  Num.network.threads=3 #kafka 处理网络请求线程数 一般情况下就是cpu的颗粒数

  num.io.threads=8   处理io请求线程数

  socket.send/receive.buffer.bytes=102400  发送接收缓存的大小

  num.partitions=1 默认一个分区  不用改

  Num.partitions=1    #分区

  Zookeeper.connect=xxxxx:xxx,yyyy:yyy,zzzz:zzz  #zookeeper连接ip

  zookeeper.connection.timeout.ms=1000000      连接最大超时时间

启动:

  kafka自带一个zk,但是我们还有用自己的,不用它的

  先起zookeeper(配置zookeeper请见zookeeper的基本配置

  bin/kafka-server-start.sh config/server .properties  #不能后台

  nohup bin/kafka-server-start.sh config/server.properties &

  内存不足可能起不来ps -ef |grep kafka 查看进程  或者看日志

  kafka-console-consumer.sh
  这个命令只是简单的将消息输出到标准输出中,通常使用该命令来获取kafka的topic中的数据。
  使用方法:kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
  kafka-console-producer.sh
  这个命令可以将文件或标准输入的内容发送到Kafka集群。
  使用方法:bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

kafka-topics.sh
描述topic的配置
  bin/kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name test_topic
设置保留时间
  如果您需要删除主题中的所有消息,则可以利用保留时间。先将保留时间设置为非常低(1000 ms),等待几秒钟,然后将保留时间恢复为上一个值。默认保留时间为24小时(86400000毫秒)。
  bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test_topic --add-config retention.ms=1000
删除topic
  bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic test_topic

kafka-topics.sh
topic信息
  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic
添加topic
  bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test_topic
列出topic
  bin/kafka-topics.sh --list --zookeeper localhost:2181

连接测试:生产者消费者演示:3个zookeeper分别对应下面端口个位的1,2,3

生产者:

  bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

消费者:

  bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic test --from-beginning

总结:作为9092端口代表的leader,可以向3个生产者发送信息,生产者一样可以向leader发送信息,但是不能发送给其他两个follower,当leader宕机或者挂掉,默认有新的leader启动,但是节点没有挂到,而是让其他两个follower的新leader去跑,我kill掉zookeeper2,发现消费者2182报错,其他两个消费者可继续发消息而发送9093成为leader,生产者依然是9092可以使用,但是我后台已经把他kill掉了,这就是zookeeper,此时杀掉,所有服务器个数少于一半zookeeper失效。在其他kafka扮演着消息列的离线收集,多集群,高稳定,低延迟的分布式消息系统

地区性:一般传json格式,生产者发送上海,接收者判断json值是上海还是北京,如果是上海接收,如果是北京不接受

版权声明:本文原创发表于 博客园,作者为 RainBol 本文欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则视为侵权。

原文地址:https://www.cnblogs.com/RainBol/p/9486108.html