1)包的选择
confluent-kafka-go使用了rdkafka的c库,破坏了go代码的收敛,不使用;
sarama不支持groud id 的功能,写consumer需要自己管理消费的partition,offset;很难用;
sarama-cluster是对sarama的一层封装,实现了group id 功能
2)关于offset问题
sarama-cluster有auto commit的功能,默认是一秒;但最好自己管理,如每100条数据MarkOffset,并CommitOffsets
3)实现consumer的Priority MQ功能
如1-5优先级的5个Topic,传入
map[string]int32 {
topic1: 1,
topic2: 2,
....
}
按Priority生成排序的consumerList,for循环遍历consume,<-consumer.Messages(),select之并设置default分支
4)producer
producer使用的AsyncProducer的对象池;测试:本机1K以上message大小,producer池可提升效率,原因是I/O时间长,单一Producer发送效率受限;小message(10byte),单个producer发送效率要高,瓶颈在producer池的频繁Get与Put
5)网络问题时,consumer会自动重连;
https://github.com/Shopify/sarama/issues/72
6)接收producer的Errors() chan一定要用for _, err := range producer.Errors();勿用for{}否则producer意外关闭,这里会死循环;
for {
err := <-producer.Errors() // 错误示例;若producer意外关掉,此外err一直返回nil,跑满CPU
if err != nil {
// print log
}
}