kafka笔记

Kafka的特性

  1. 高吞吐量、低延迟:kafka每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个topic(话题)可以分多个partition(分区), consumer group(消费组) 对partition进行consume(消费)操作。
  2. 可扩展性:kafka集群支持热扩展
  3. 持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
  4. 容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)
  5. 高并发:支持数千个客户端同时读写

 

Kafka是一个分布式消息队列:生产者、消费者的功能。它提供了类似于JMS的特性,但是在设计实现上完全不同

kafka基本原理:

生产者将数据生产出来,交给 broker 进行存储,消费者需要消费数据了,就从broker中去拿出数据来,然后完成一系列对数据的处理操作。

这里写图片描述

多个 broker 协同合作,producer 和 consumer 部署在各个业务逻辑中被频繁的调用,三者通过 zookeeper管理协调请求和转发。这样一个高性能的分布式消息发布订阅系统就完成了。

消息系统的核心作用就是三点:解耦,异步和缓冲

 

 

Kafka核心组件

  • Topic :消息根据Topic进行归类
  • Producer:发送消息者
  • Consumer:消息接受者
  • broker:每个kafka实例(server)
  • Zookeeper:依赖集群保存meta信息。

Kafka的使用场景

  • 日志收集:Kafka可以收集各种日志文件,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 消息系统:解耦和生产者和消费者、缓存消息等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 运营指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 流式处理:比如spark streaming和storm
  • 事件源

kafka消费者的三种消费语义

  • At most once 消息可能会丢,但绝不会重复传输
  • At least one 消息绝不会丢,但可能会重复传输
  • Exactly once 每条消息肯定会被传输一次且仅传输一次

Kafka数据丢失问题

 

1)使用同步模式的时候,有3种状态保证消息被安全生产,在配置为1(只保证写入leader成功)的话,如果刚好leader partition挂了,数据就会丢失。

producer.type=sync

request.required.acks=1

2)还有一种情况可能会丢失消息,就是使用异步模式的时候,当缓冲区满了,如果配置为0(还没有收到确认的情况下,缓冲池一满,就清空缓冲池里的消息), 数据就会被立即丢弃掉。

producer.type=async

request.required.acks=1

batch.mun.messages=100//在异步模式下,一个Batch发送的消息数量,product会等待直到发送的消息数量达到这个值,之后才会发送,该配置默认值200,但是如果数量不够,达到queue.buffer.max.ms时也会直接发送

queue.buffering.max.ms=100//使用异步模式数据缓冲最大的时间,换句话说该配置会每隔100ms发送数据,这个会相应的提高吞吐量,但是会增加消息达到的时间,默认值为5000ms

queue.buffering.max.messages=100//发送队列缓冲长度,默认值10000条数据

queue.enqueue.timeout.ms=10000//当消息在达到queue.buffering.max.messages配置时候,会阻塞一定时间后,如果队列任然没有enqueue(producter然仍没有发送任何消息),此时producer可以继续阻塞或者将消息抛出,timeout值用于控制阻塞的时间,如果值为-1(默认值)则 无阻塞超时限制,消息不会被抛弃;如果值为0 则立即清空队列,消息被抛弃。

而对于配置Product Configs最重要4个参数

  • metadata.broker.list
  • request.required.acks
  • producer.type
  • serializer.class

############################# Producer Basics #############################

# list of brokers used for bootstrapping knowledge about the rest of the cluster

# format: host1:port1,host2:port2 ...

metadata.broker.list=localhost:9092

# name of the partitioner class for partitioning events; default partition spreads data randomly

#partitioner.class=

# specifies whether the messages are sent asynchronously (async) or synchronously (sync)

producer.type=sync//支持设置sync同步和异步async

request.required.acks=0

//默认值为0,该值延迟性最低但持久性保证最低,一旦服务器宕机数据有丢失的风险,设置该值后,product不会等待brocker返回的ack,leader已死,但producer并不知情,发出去的信息broker就收不到)。

//1:这意味着producer在leader已成功收到的数据并得到确认后发送下一条message。此选项提供了更好的耐久性为客户等待服务器确认请求成功(被写入死亡leader但尚未复制将失去了唯一的消息)。

//-1:这意味着producer在follower副本确认接收到数据后才算一次发送完成。  此选项提供最好的耐久性,我们保证没有信息将丢失,只要至少一个同步副本保持存活。 三种机制,性能依次递减 (producer吞吐量降低),数据健壮性则依次递增。

# specify the compression codec for all data generated: none, gzip, snappy, lz4.

# the old config values work as well: 0, 1, 2, 3 for none, gzip, snappy, lz4, respectively

compression.codec=none

# message encoder

serializer.class=kafka.serializer.DefaultEncoder

# allow topic level compression

#compressed.topics=

原文地址:https://www.cnblogs.com/dummyly/p/10041779.html