欢迎来到贱贱的博客

扩大
缩小

kafka之c接口常用API------librdkafka

1 安装方法以及相关库文件

  https://github.com/edenhill/librdkafka

  • High-level producer
  • High-level consumer
  • Simple (Low-level) consumer
  • 压缩:snappy, gzip, lz4
  • SSL
  • SASL

  consumer有两套API,高级(high-level)和底层(simple)的,应该叫底层API或者低级API,它跟高级API的区别是没有自动负载均衡,而高级API会自动进行负载均衡。

3 kafka主要的用途

  发数据---->producer

    //发一条

    rd_kafka_produce()

    //发多条

    rd_kafka_produce_batch()

  收数据---->consumer

 

 

 

 

 

  在收发数据之前至少需要一个统一的句柄,方便kafka内部准备好链接brokers集群,初始化kafka内部结构等

  

  建立这个kafka句柄需要知道连接到哪个broker

  

  发布消息使用rd_kafka_top_t, 

 1 // 对rd_kafka_topic_partition_list_t结构的操作
 2 // 创建
 3 rd_kafka_topic_partition_list_new();
 4 // 增加元素
 5 rd_kafka_topic_partition_list_add();
 6 // 删除元素
 7 rd_kafka_topic_partition_list_del();
 8 // 查找元素
 9 rd_kafka_topic_partition_list_find();
10 
11 // 对rd_kafka_topic_t的操作
12 // 创建
13 rd_kafka_topic_new();
14 // 删除
15 rd_kafka_topic_destroy();
16 // 获取该topic的名字
17 rd_kafka_topic_name();
18 // 获取该topic传入的应用参数
19 rd_kafka_topic_opaque();
20 
21 // 使用rd_kafka_topic_partition_list_t的时候,topic+partition是连在一起的,
22 // 所以给kafka句柄的时候只用一个参数就够了
23 // 订阅消息
24 rd_kafka_subscribe (rd_kafka_t *rk,
25                             const rd_kafka_topic_partition_list_t *topics);
26 // 指定消费的partition,可以在运行时更换
27 rd_kafka_assign (rd_kafka_t *rk,
28                             const rd_kafka_topic_partition_list_t *partitions);
29 
30 // 用rd_kafka_topic_t比较麻烦,需要配合一个partition才行
31 // 直接启动consumer了
32 rd_kafka_consume_start(rd_kafka_topic_t *rkt, int32_t partition,
33                             int64_t offset);
34 // 每次接收也要带上partition
35 rd_kafka_message_t *rd_kafka_consume(rd_kafka_topic_t *rkt, int32_t partition,
36                             int timeout_ms);
37 
38 
39 // 发送的时候使用 rd_kafka_topic_t
40 int rd_kafka_produce(rd_kafka_topic_t *rkt, int32_t partition,
41                             int msgflags,
42                             void *payload, size_t len,
43                             const void *key, size_t keylen,
44                             void *msg_opaque);
View Code

后续用了继续做笔记,关于错误处理,topic_partition_list操作等

https://blog.csdn.net/lijinqi1987/article/details/76582067

http://suntus.github.io/2016/07/07/librdkafka--kafka%20C%20api%E4%BB%8B%E7%BB%8D/

 

posted on 2019-04-01 15:46  L的存在  阅读(6568)  评论(0编辑  收藏  举报

导航