c语言使用librdkafka库实现kafka的生产和消费实例(转)

关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的生产、消费

一、producer

librdkafka进行kafka生产操作的大致步骤如下:

1、创建kafka配置

[cpp] view plain copy
 
  1. rd_kafka_conf_t *rd_kafka_conf_new (void)  

2、配置kafka各项参数

[cpp] view plain copy
 
  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
  2.                                        const char *name,  
  3.                                        const char *value,  
  4.                                        char *errstr, size_t errstr_size)  

3、设置发送回调函数

[cpp] view plain copy
 
  1. void rd_kafka_conf_set_dr_msg_cb (rd_kafka_conf_t *conf,  
  2.                                   void (*dr_msg_cb) (rd_kafka_t *rk,  
  3.                                   const rd_kafka_message_t *  
  4.                                   rkmessage,  
  5.                                   void *opaque))  

4、创建producer实例

[cpp] view plain copy
 
  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf,char *errstr, size_t errstr_size)  

5、实例化topic

[cpp] view plain copy
 
  1. rd_kafka_topic_t *rd_kafka_topic_new (rd_kafka_t *rk, const char *topic, rd_kafka_topic_conf_t *conf)  

6、异步调用将消息发送到指定的topic

[cpp] view plain copy
 
  1. int rd_kafka_produce (rd_kafka_topic_t *rkt, int32_t partition,  
  2.               int msgflags,  
  3.               void *payload, size_t len,  
  4.               const void *key, size_t keylen,  
  5.               void *msg_opaque)  

7、阻塞等待消息发送完成

[cpp] view plain copy
 
  1. int rd_kafka_poll (rd_kafka_t *rk, int timeout_ms)  

8、等待完成producer请求完成

[cpp] view plain copy
 
  1. rd_kafka_resp_err_t rd_kafka_flush (rd_kafka_t *rk, int timeout_ms)  

9、销毁topic

[cpp] view plain copy
 
  1. void rd_kafka_topic_destroy (rd_kafka_topic_t *app_rkt)  

10、销毁producer实例

[cpp] view plain copy
 
  1. void rd_kafka_destroy (rd_kafka_t *rk)  


完整代码如下my_producer.c:

[cpp] view plain copy
 
  1. #include <stdio.h>  
  2. #include <signal.h>  
  3. #include <string.h>  
  4.   
  5. #include "../src/rdkafka.h"  
  6.   
  7. static int run = 1;  
  8.   
  9. static void stop(int sig){  
  10.     run = 0;  
  11.     fclose(stdin);  
  12. }  
  13.   
  14. /* 
  15.     每条消息调用一次该回调函数,说明消息是传递成功(rkmessage->err == RD_KAFKA_RESP_ERR_NO_ERROR) 
  16.     还是传递失败(rkmessage->err != RD_KAFKA_RESP_ERR_NO_ERROR) 
  17.     该回调函数由rd_kafka_poll()触发,在应用程序的线程上执行 
  18.  */  
  19. static void dr_msg_cb(rd_kafka_t *rk,  
  20.                       const rd_kafka_message_t *rkmessage, void *opaque){  
  21.         if(rkmessage->err)  
  22.             fprintf(stderr, "%% Message delivery failed: %s ",   
  23.                     rd_kafka_err2str(rkmessage->err));  
  24.         else  
  25.             fprintf(stderr,  
  26.                         "%% Message delivered (%zd bytes, "  
  27.                         "partition %"PRId32") ",  
  28.                         rkmessage->len, rkmessage->partition);  
  29.         /* rkmessage被librdkafka自动销毁*/  
  30. }  
  31.   
  32. int main(int argc, char **argv){  
  33.     rd_kafka_t *rk;            /*Producer instance handle*/  
  34.     rd_kafka_topic_t *rkt;     /*topic对象*/  
  35.     rd_kafka_conf_t *conf;     /*临时配置对象*/  
  36.     char errstr[512];            
  37.     char buf[512];               
  38.     const char *brokers;         
  39.     const char *topic;           
  40.   
  41.     if(argc != 3){  
  42.         fprintf(stderr, "%% Usage: %s <broker> <topic> ", argv[0]);  
  43.         return 1;  
  44.     }  
  45.   
  46.     brokers = argv[1];  
  47.     topic = argv[2];  
  48.   
  49.     /* 创建一个kafka配置占位 */  
  50.     conf = rd_kafka_conf_new();  
  51.   
  52.     /*创建broker集群*/  
  53.     if (rd_kafka_conf_set(conf, "bootstrap.servers", brokers, errstr,  
  54.                 sizeof(errstr)) != RD_KAFKA_CONF_OK){  
  55.         fprintf(stderr, "%s ", errstr);  
  56.         return 1;  
  57.     }  
  58.   
  59.     /*设置发送报告回调函数,rd_kafka_produce()接收的每条消息都会调用一次该回调函数 
  60.      *应用程序需要定期调用rd_kafka_poll()来服务排队的发送报告回调函数*/  
  61.     rd_kafka_conf_set_dr_msg_cb(conf, dr_msg_cb);  
  62.   
  63.     /*创建producer实例 
  64.       rd_kafka_new()获取conf对象的所有权,应用程序在此调用之后不得再次引用它*/  
  65.     rk = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, sizeof(errstr));  
  66.     if(!rk){  
  67.         fprintf(stderr, "%% Failed to create new producer:%s ", errstr);  
  68.         return 1;  
  69.     }  
  70.   
  71.     /*实例化一个或多个topics(`rd_kafka_topic_t`)来提供生产或消费,topic 
  72.     对象保存topic特定的配置,并在内部填充所有可用分区和leader brokers,*/  
  73.     rkt = rd_kafka_topic_new(rk, topic, NULL);  
  74.     if (!rkt){  
  75.         fprintf(stderr, "%% Failed to create topic object: %s ",   
  76.                 rd_kafka_err2str(rd_kafka_last_error()));  
  77.         rd_kafka_destroy(rk);  
  78.         return 1;  
  79.     }  
  80.   
  81.     /*用于中断的信号*/  
  82.     signal(SIGINT, stop);  
  83.   
  84.     fprintf(stderr,  
  85.                 "%% Type some text and hit enter to produce message "  
  86.                 "%% Or just hit enter to only serve delivery reports "  
  87.                 "%% Press Ctrl-C or Ctrl-D to exit ");  
  88.   
  89.      while(run && fgets(buf, sizeof(buf), stdin)){  
  90.         size_t len = strlen(buf);  
  91.   
  92.         if(buf[len-1] == ' ')  
  93.             buf[--len] = '';  
  94.   
  95.         if(len == 0){  
  96.             /*轮询用于事件的kafka handle, 
  97.             事件将导致应用程序提供的回调函数被调用 
  98.             第二个参数是最大阻塞时间,如果设为0,将会是非阻塞的调用*/  
  99.             rd_kafka_poll(rk, 0);  
  100.             continue;  
  101.         }  
  102.   
  103.      retry:  
  104.          /*Send/Produce message. 
  105.            这是一个异步调用,在成功的情况下,只会将消息排入内部producer队列, 
  106.            对broker的实际传递尝试由后台线程处理,之前注册的传递回调函数(dr_msg_cb) 
  107.            用于在消息传递成功或失败时向应用程序发回信号*/  
  108.         if (rd_kafka_produce(  
  109.                     /* Topic object */  
  110.                     rkt,  
  111.                     /*使用内置的分区来选择分区*/  
  112.                     RD_KAFKA_PARTITION_UA,  
  113.                     /*生成payload的副本*/  
  114.                     RD_KAFKA_MSG_F_COPY,  
  115.                     /*消息体和长度*/  
  116.                     buf, len,  
  117.                     /*可选键及其长度*/  
  118.                     NULL, 0,  
  119.                     NULL) == -1){  
  120.             fprintf(stderr,   
  121.                 "%% Failed to produce to topic %s: %s ",   
  122.                 rd_kafka_topic_name(rkt),  
  123.                 rd_kafka_err2str(rd_kafka_last_error()));  
  124.   
  125.             if (rd_kafka_last_error() == RD_KAFKA_RESP_ERR__QUEUE_FULL){  
  126.                 /*如果内部队列满,等待消息传输完成并retry, 
  127.                 内部队列表示要发送的消息和已发送或失败的消息, 
  128.                 内部队列受限于queue.buffering.max.messages配置项*/  
  129.                 rd_kafka_poll(rk, 1000);  
  130.                 goto retry;  
  131.             }     
  132.         }else{  
  133.             fprintf(stderr, "%% Enqueued message (%zd bytes) for topic %s ",   
  134.                 len, rd_kafka_topic_name(rkt));  
  135.         }  
  136.   
  137.         /*producer应用程序应不断地通过以频繁的间隔调用rd_kafka_poll()来为 
  138.         传送报告队列提供服务。在没有生成消息以确定先前生成的消息已发送了其 
  139.         发送报告回调函数(和其他注册过的回调函数)期间,要确保rd_kafka_poll() 
  140.         仍然被调用*/  
  141.         rd_kafka_poll(rk, 0);  
  142.      }  
  143.   
  144.      fprintf(stderr, "%% Flushing final message..  ");  
  145.      /*rd_kafka_flush是rd_kafka_poll()的抽象化, 
  146.      等待所有未完成的produce请求完成,通常在销毁producer实例前完成 
  147.      以确保所有排列中和正在传输的produce请求在销毁前完成*/  
  148.      rd_kafka_flush(rk, 10*1000);  
  149.   
  150.      /* Destroy topic object */  
  151.      rd_kafka_topic_destroy(rkt);  
  152.   
  153.      /* Destroy the producer instance */  
  154.      rd_kafka_destroy(rk);  
  155.   
  156.      return 0;  
  157. }  



二、consumer

librdkafka进行kafka消费操作的大致步骤如下:

1、创建kafka配置

[cpp] view plain copy
 
  1. rd_kafka_conf_t *rd_kafka_conf_new (void)  

2、创建kafka topic的配置

[cpp] view plain copy
 
  1. rd_kafka_topic_conf_t *rd_kafka_topic_conf_new (void)   

3、配置kafka各项参数

[cpp] view plain copy
 
  1. rd_kafka_conf_res_t rd_kafka_conf_set (rd_kafka_conf_t *conf,  
  2.                                        const char *name,  
  3.                                        const char *value,  
  4.                                        char *errstr, size_t errstr_size)  

4、配置kafka topic各项参数

[cpp] view plain copy
 
  1. rd_kafka_conf_res_t rd_kafka_topic_conf_set (rd_kafka_topic_conf_t *conf,  
  2.                          const char *name,  
  3.                          const char *value,  
  4.                          char *errstr, size_t errstr_size)  

5、创建consumer实例

[cpp] view plain copy
 
  1. rd_kafka_t *rd_kafka_new (rd_kafka_type_t type, rd_kafka_conf_t *conf, char *errstr, size_t errstr_size)  

6、为consumer实例添加brokerlist

[cpp] view plain copy
 
  1. int rd_kafka_brokers_add (rd_kafka_t *rk, const char *brokerlist)  

7、开启consumer订阅

[cpp] view plain copy
 
  1. rd_kafka_subscribe (rd_kafka_t *rk, const rd_kafka_topic_partition_list_t *topics)  

8、轮询消息或事件,并调用回调函数

[cpp] view plain copy
 
  1. rd_kafka_message_t *rd_kafka_consumer_poll (rd_kafka_t *rk,int timeout_ms)  

9、关闭consumer实例

[cpp] view plain copy
 
  1. rd_kafka_resp_err_t rd_kafka_consumer_close (rd_kafka_t *rk)  

10、释放topic list资源

[cpp] view plain copy
 
  1. rd_kafka_topic_partition_list_destroy (rd_kafka_topic_partition_list_t *rktparlist)  

11、销毁consumer实例

[cpp] view plain copy
 
  1. void rd_kafka_destroy (rd_kafka_t *rk)   

12、等待consumer对象的销毁

[cpp] view plain copy
 
  1. int rd_kafka_wait_destroyed (int timeout_ms)  

完整代码如下my_consumer.c

[cpp] view plain copy
 
  1. #include <string.h>  
  2. #include <stdlib.h>  
  3. #include <syslog.h>  
  4. #include <signal.h>  
  5. #include <error.h>  
  6. #include <getopt.h>  
  7.   
  8. #include "../src/rdkafka.h"  
  9.   
  10. static int run = 1;  
  11. //`rd_kafka_t`自带一个可选的配置API,如果没有调用API,Librdkafka将会使用CONFIGURATION.md中的默认配置。  
  12. static rd_kafka_t *rk;  
  13. static rd_kafka_topic_partition_list_t *topics;  
  14.   
  15. static void stop (int sig) {  
  16.   if (!run)  
  17.     exit(1);  
  18.   run = 0;  
  19.   fclose(stdin); /* abort fgets() */  
  20. }  
  21.   
  22. static void sig_usr1 (int sig) {  
  23.   rd_kafka_dump(stdout, rk);  
  24. }  
  25.   
  26. /** 
  27.  * 处理并打印已消费的消息 
  28.  */  
  29. static void msg_consume (rd_kafka_message_t *rkmessage,  
  30.        void *opaque) {  
  31.   if (rkmessage->err) {  
  32.     if (rkmessage->err == RD_KAFKA_RESP_ERR__PARTITION_EOF) {  
  33.       fprintf(stderr,  
  34.         "%% Consumer reached end of %s [%"PRId32"] "  
  35.              "message queue at offset %"PRId64" ",  
  36.              rd_kafka_topic_name(rkmessage->rkt),  
  37.              rkmessage->partition, rkmessage->offset);  
  38.   
  39.       return;  
  40.     }  
  41.   
  42.     if (rkmessage->rkt)  
  43.             fprintf(stderr, "%% Consume error for "  
  44.                     "topic "%s" [%"PRId32"] "  
  45.                     "offset %"PRId64": %s ",  
  46.                     rd_kafka_topic_name(rkmessage->rkt),  
  47.                     rkmessage->partition,  
  48.                     rkmessage->offset,  
  49.                     rd_kafka_message_errstr(rkmessage));  
  50.     else  
  51.             fprintf(stderr, "%% Consumer error: %s: %s ",  
  52.                     rd_kafka_err2str(rkmessage->err),  
  53.                     rd_kafka_message_errstr(rkmessage));  
  54.   
  55.     if (rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION ||  
  56.         rkmessage->err == RD_KAFKA_RESP_ERR__UNKNOWN_TOPIC)  
  57.           run = 0;  
  58.     return;  
  59.   }  
  60.   
  61.   fprintf(stdout, "%% Message (topic %s [%"PRId32"], "  
  62.                       "offset %"PRId64", %zd bytes): ",  
  63.                       rd_kafka_topic_name(rkmessage->rkt),  
  64.                       rkmessage->partition,  
  65.     rkmessage->offset, rkmessage->len);  
  66.   
  67.   if (rkmessage->key_len) {  
  68.     printf("Key: %.*s ",  
  69.              (int)rkmessage->key_len, (char *)rkmessage->key);  
  70.   }  
  71.   
  72.   printf("%.*s ",  
  73.            (int)rkmessage->len, (char *)rkmessage->payload);  
  74.     
  75. }  
  76.   
  77. /* 
  78.   init all configuration of kafka 
  79.  */  
  80. int initKafka(char *brokers, char *group,char *topic){  
  81.   rd_kafka_conf_t *conf;  
  82.   rd_kafka_topic_conf_t *topic_conf;  
  83.   rd_kafka_resp_err_t err;  
  84.   char tmp[16];  
  85.   char errstr[512];  
  86.   
  87.   /* Kafka configuration */  
  88.   conf = rd_kafka_conf_new();  
  89.   
  90.   //quick termination  
  91.   snprintf(tmp, sizeof(tmp), "%i", SIGIO);  
  92.   rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);  
  93.   
  94.   //topic configuration  
  95.   topic_conf = rd_kafka_topic_conf_new();  
  96.   
  97.   /* Consumer groups require a group id */  
  98.   if (!group)  
  99.           group = "rdkafka_consumer_example";  
  100.   if (rd_kafka_conf_set(conf, "group.id", group,  
  101.                         errstr, sizeof(errstr)) !=  
  102.       RD_KAFKA_CONF_OK) {  
  103.           fprintf(stderr, "%% %s ", errstr);  
  104.           return -1;  
  105.   }  
  106.   
  107.   /* Consumer groups always use broker based offset storage */  
  108.   if (rd_kafka_topic_conf_set(topic_conf, "offset.store.method",  
  109.                               "broker",  
  110.                               errstr, sizeof(errstr)) !=  
  111.       RD_KAFKA_CONF_OK) {  
  112.           fprintf(stderr, "%% %s ", errstr);  
  113.           return -1;  
  114.   }  
  115.   
  116.   /* Set default topic config for pattern-matched topics. */  
  117.   rd_kafka_conf_set_default_topic_conf(conf, topic_conf);  
  118.   
  119.   //实例化一个顶级对象rd_kafka_t作为基础容器,提供全局配置和共享状态  
  120.   rk = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));  
  121.   if(!rk){  
  122.     fprintf(stderr, "%% Failed to create new consumer:%s ", errstr);  
  123.     return -1;  
  124.   }  
  125.   
  126.   //Librdkafka需要至少一个brokers的初始化list  
  127.   if (rd_kafka_brokers_add(rk, brokers) == 0){  
  128.     fprintf(stderr, "%% No valid brokers specified ");  
  129.     return -1;  
  130.   }  
  131.   
  132.   //重定向 rd_kafka_poll()队列到consumer_poll()队列  
  133.   rd_kafka_poll_set_consumer(rk);  
  134.   
  135.   //创建一个Topic+Partition的存储空间(list/vector)  
  136.   topics = rd_kafka_topic_partition_list_new(1);  
  137.   //把Topic+Partition加入list  
  138.   rd_kafka_topic_partition_list_add(topics, topic, -1);  
  139.   //开启consumer订阅,匹配的topic将被添加到订阅列表中  
  140.   if((err = rd_kafka_subscribe(rk, topics))){  
  141.       fprintf(stderr, "%% Failed to start consuming topics: %s ", rd_kafka_err2str(err));  
  142.       return -1;  
  143.   }  
  144.   
  145.   return 1;  
  146. }  
  147.   
  148. int main(int argc, char **argv){  
  149.   char *brokers = "localhost:9092";  
  150.   char *group = NULL;  
  151.   char *topic = NULL;  
  152.     
  153.   int opt;  
  154.   rd_kafka_resp_err_t err;  
  155.   
  156.   while ((opt = getopt(argc, argv, "g:b:t:qd:eX:As:DO")) != -1){  
  157.     switch (opt) {  
  158.       case 'b':  
  159.         brokers = optarg;  
  160.         break;  
  161.       case 'g':  
  162.         group = optarg;  
  163.         break;  
  164.       case 't':  
  165.         topic = optarg;  
  166.         break;  
  167.       default:  
  168.         break;  
  169.     }  
  170.   }   
  171.   
  172.   signal(SIGINT, stop);  
  173.   signal(SIGUSR1, sig_usr1);  
  174.   
  175.   if(!initKafka(brokers, group, topic)){  
  176.     fprintf(stderr, "kafka server initialize error ");  
  177.   }else{  
  178.     while(run){  
  179.       rd_kafka_message_t *rkmessage;  
  180.       /*-轮询消费者的消息或事件,最多阻塞timeout_ms 
  181.         -应用程序应该定期调用consumer_poll(),即使没有预期的消息,以服务 
  182.         所有排队等待的回调函数,当注册过rebalance_cb,该操作尤为重要, 
  183.         因为它需要被正确地调用和处理以同步内部消费者状态 */  
  184.       rkmessage = rd_kafka_consumer_poll(rk, 1000);  
  185.       if(rkmessage){  
  186.         msg_consume(rkmessage, NULL);  
  187.         /*释放rkmessage的资源,并把所有权还给rdkafka*/  
  188.         rd_kafka_message_destroy(rkmessage);  
  189.       }  
  190.     }  
  191.   }  
  192.   
  193. done:  
  194.     /*此调用将会阻塞,直到consumer撤销其分配,调用rebalance_cb(如果已设置), 
  195.     commit offset到broker,并离开consumer group 
  196.     最大阻塞时间被设置为session.timeout.ms 
  197.     */  
  198.     err = rd_kafka_consumer_close(rk);  
  199.     if(err){  
  200.       fprintf(stderr, "%% Failed to close consumer: %s ", rd_kafka_err2str(err));  
  201.     }else{  
  202.       fprintf(stderr, "%% Consumer closed ");  
  203.     }  
  204.   
  205.     //释放topics list使用的所有资源和它自己  
  206.     rd_kafka_topic_partition_list_destroy(topics);  
  207.   
  208.     //destroy kafka handle  
  209.     rd_kafka_destroy(rk);  
  210.     
  211.     run = 5;  
  212.     //等待所有rd_kafka_t对象销毁,所有kafka对象被销毁,返回0,超时返回-1  
  213.     while(run-- > 0 && rd_kafka_wait_destroyed(1000) == -1){  
  214.       printf("Waiting for librdkafka to decommission ");  
  215.     }  
  216.     if(run <= 0){  
  217.       //dump rdkafka内部状态到stdout流  
  218.       rd_kafka_dump(stdout, rk);  
  219.     }  
  220.   
  221.     return 0;  
  222. }  


在linux下编译producer和consumer的代码:

[cpp] view plain copy
 
  1. gcc my_producer.c -o my_producer  -lrdkafka -lz -lpthread -lrt  
  2. gcc my_consumer.c -o my_consumer  -lrdkafka -lz -lpthread -lrt  

在运行my_producer或my_consumer时可能会报错"error while loading shared libraries xxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录


在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test_topic”的topic
启动方式可参考 kafka0.8.2集群的环境搭建并实现基本的生产消费

启动consumer:

启动producer,并发送一条数据“hello world”:


consumer处成功收到producer发送的“hello world”:

http://orchome.com/5

https://github.com/edenhill/librdkafka

https://github.com/mfontanini/cppkafka

https://github.com/zengyuxing007/kafka_test_cpp

原文地址:https://www.cnblogs.com/wangbin/p/8192372.html