Kafka核心技术与实战——12 | 客户端都有哪些不常见但是很高级的功能?

    • 什么是拦截器?
      • 其基本思想就是允许应用程序在不修改逻辑的情况下,动态地实现一组可插拔的事件处理逻辑链
      • 它能够在主业务操作的前后多个时间点上插入对应的“拦截”逻辑
      • 这些功能都是以配置拦截器类的方式动态插入到应用程序中的,故可以快速地切换不同的拦截器而不影响主程序逻辑
      • Kafka 拦截器借鉴了这样的设计思路。你可以在消息处理的前后多个时点动态植入不同的处理逻辑,比如在消息发送前或者在消息被消费后
    • Kafka 拦截器
      • Kafka 拦截器分为生产者拦截器和消费者拦截器
        • 生产者拦截器允许你在发送消息前以及消息提交成功后植入你的拦截器逻辑
        • 而消费者拦截器支持在消费消息前以及提交位移后编写特定逻辑
        • 值得一提的是,这两种拦截器都支持链的方式,即你可以将一组拦截器串连成一个大的拦截器,Kafka 会按照添加顺序依次执行拦截器逻辑
      • 例子
        • 假设你想在生产消息前执行两个“前置动作”:
        • 第一个是为消息增加一个头信息,封装发送该消息的时间
        • 第二个是更新发送消息数字段
        • 那么当你将这两个拦截器串联在一起统一指定给 Producer 后,Producer 会按顺序执行上面的动作,然后再发送消息
      • 当前 Kafka 拦截器的设置方法是通过参数配置完成的。生产者和消费者两端有一个相同的参数,名字叫 interceptor.classes
      • 生产者拦截器
        • onSend:该方法会在消息发送之前被调用
        • onAcknowledgement:该方法会在消息成功提交或发送失败之后被调用
      • 消费者拦截器
        • onConsume:该方法在消息返回给 Consumer 程序之前调用
        • onCommit:Consumer 在提交位移之后调用该方法
    • 典型使用场景
      • Kafka 拦截器可以应用于包括客户端监控、端到端系统性能检测、消息审计等多种功能在内的场景
      • 客户端监控,系统性能检测
        • 通过实现拦截器的逻辑以及可插拔的机制,我们能够快速地观测、验证以及监控集群间的客户端性能指标,特别是能够从具体的消息层面上去收集这些数据
      • 消息审计
        • 你肯定要能够随时查看每条消息是哪个业务方在什么时间发布的,之后又被哪些业务方在什么时刻消费。一个可行的做法就是你编写一个拦截器类,实现相应的消息审计逻辑,然后强行规定所有接入你的 Kafka 服务的客户端程序必须设置该拦截器
    • 案例分享
      • 编写拦截器类来统计消息端到端处理的延时
        • 想知道该业务消息从被生产出来到最后被消费的平均总时长是多少
        • 既然是要计算总延时,那么一定要有个公共的地方来保存它,并且这个公共的地方还是要让生产者和消费者程序都能访问的。在这个例子中,我们假设数据被保存在 Redis 中
      • 生产者拦截器
        public class AvgLatencyProducerInterceptor implements ProducerInterceptor<String, String> { private Jedis jedis; // 省略 Jedis 初始化@Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { jedis.incr("totalSentMessage"); return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { }@Override public void close() { }@Override public void configure(Map<java.lang.String, ?> configs) { }
        • 代码比较关键的是在发送消息前更新总的已发送消息数
      • 消费者拦截器
        public class AvgLatencyConsumerInterceptor implements ConsumerInterceptor<String, String> { private Jedis jedis; // 省略 Jedis 初始化@Override public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) { long lantency = 0L; for (ConsumerRecord<String, String> record : records) { lantency += (System.currentTimeMillis() - record.timestamp()); } jedis.incrBy("totalLatency", lantency); long totalLatency = Long.parseLong(jedis.get("totalLatency")); long totalSentMsgs = Long.parseLong(jedis.get("totalSentMessage")); jedis.set("avgLatency", String.valueOf(totalLatency / totalSentMsgs)); return records; } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) { } @Overridepublic void close() { } @Override public void configure(Map<String, ?> configs) {
        • 我们在真正消费一批消息前首先更新了它们的总延时,方法就是用当前的时钟时间减去封装在消息中的创建时间,然后累计得到这批消息总的端到端处理延时并更新到 Redis 中。之后的逻辑就很简单了,我们分别从 Redis 中读取更新过的总延时和总消息数,两者相除即得到端到端消息的平均处理延时
      • 创建好生产者和消费者拦截器后,我们按照上面指定的方法分别将它们配置到各自的 Producer 和 Consumer 程序中,这样就能计算消息从 Producer 端到 Consumer 端平均的处理延时了。这种端到端的指标监控能够从全局角度俯察和审视业务运行情况,及时查看业务是否满足端到端的 SLA 目标
    • 小结
      • 今天我们花了一些时间讨论 Kafka 提供的冷门功能:拦截器
      • 我们可以利用拦截器满足实际的需求,比如端到端系统性能检测、消息审计等
原文地址:https://www.cnblogs.com/minimalist/p/12880601.html