一些Go操作Kafka的问题

1)包的选择

confluent-kafka-go使用了rdkafka的c库,破坏了go代码的收敛,不使用;

sarama不支持groud id 的功能,写consumer需要自己管理消费的partition,offset;很难用;

sarama-cluster是对sarama的一层封装,实现了group id 功能

2)关于offset问题

sarama-cluster有auto commit的功能,默认是一秒;但最好自己管理,如每100条数据MarkOffset,并CommitOffsets

3)实现consumer的Priority MQ功能

如1-5优先级的5个Topic,传入

map[string]int32 {

  topic1: 1,

  topic2: 2,

  ....

}

按Priority生成排序的consumerList,for循环遍历consume,<-consumer.Messages(),select之并设置default分支

4)producer

producer使用的AsyncProducer的对象池;测试:本机1K以上message大小,producer池可提升效率,原因是I/O时间长,单一Producer发送效率受限;小message(10byte),单个producer发送效率要高,瓶颈在producer池的频繁Get与Put

5)网络问题时,consumer会自动重连;

https://github.com/Shopify/sarama/issues/72

6)接收producer的Errors() chan一定要用for _, err := range producer.Errors();勿用for{}否则producer意外关闭,这里会死循环;

for {

  err := <-producer.Errors()  // 错误示例;若producer意外关掉,此外err一直返回nil,跑满CPU

  if err != nil {

    // print log

  }

}

原文地址:https://www.cnblogs.com/gm-201705/p/7944362.html