Running and Using the Official librdkafka Demo on Windows

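It took me almost two days, but I finally got the official librdkafka demo running! Here is a record of the pitfalls I ran into.
1. Installation
In the ZooKeeper-based setup used here, Kafka depends on ZooKeeper, so install the JDK, ZooKeeper and Kafka in that order and configure their environment variables. I followed the tutorial "Installing and Running Kafka on Windows", and it worked without any real problems.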
2. git clone the official source code
https://github.com/edenhill/librdkafka
3. Use CMake to generate the Rdkafka.sln solution into the librdkafka-build folder
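Open the generated solution in Visual Studio:
[Screenshot: the Rdkafka.sln solution opened in Visual Studio]

After the steps above, the directory layout looks like this:
[Screenshot: resulting directory layout]

[Note] vcpkg is an officially documented way to install the librdkafka library, similar to installing a package with apt-get install on Linux. You do not need it for this walkthrough.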

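4. Build the libraries
Build the ALL_BUILD target in Debug or Release to get rdkafka.dll and rdkafka++.dll. (The DLLs produced by Debug and Release have the same names, so do not mix them up.)
5. Make the libraries available to the demo
In the solution there is a project called rdkafka_example_cpp; this is the official demo. Running it directly will fail, because it needs the librdkafka runtime libraries. Copy the two DLLs built in the previous step, rdkafka.dll and rdkafka++.dll, into the directory containing the demo's .exe. After that, the rdkafka_example_cpp project can run. The rest of this post focuses on how to run it and what the code does.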

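6. Run the demo
6.1 Prerequisites

  1. Make sure the DLLs were built in Debug, so that debugging information is available.
  2. Start the ZooKeeper server: in cmd, run: zkServer
  3. Start the Kafka server: open PowerShell in the Kafka installation directory and run: .\bin\windows\kafka-server-start.bat .\config\server.properties
  4. Create a test topic: with ZooKeeper and Kafka still running, open a second PowerShell window in the Kafka installation directory and create a topic named test with 1 partition:
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
     (Kafka 3.0 and later no longer accept --zookeeper here; pass --bootstrap-server localhost:9092 instead.)
  5. Create a producer and a consumer. The general form is: ./kafka-console-producer.bat --broker-list [ip]:[port] --topic [name] (newer Kafka versions also accept --bootstrap-server for the producer).
    For example, to create a producer and a consumer for the topic test on the local broker at port 9092:
# Create a producer
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test
# Create a consumer
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning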

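[Note] From what I read online about how Kafka stores data: a cluster usually holds many topics, and each topic is split into one or more partitions. Each partition is an ordered, append-only log in which every message is assigned a sequential offset. On disk, a partition's log is stored as a series of segment files. You can think of the log as an array indexed by offset: producers append to the end, and consumers read from whatever offset they choose.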
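To make that picture concrete, here is a toy in-memory model of one partition: an append-only log addressed by offsets and split into segments that each start at a base offset. It is an illustration under simplifying assumptions, not how the broker works internally: a segment here "rolls" after a fixed number of messages (a real broker rolls by size and time, e.g. segment.bytes), nothing is persisted, and the class names are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <vector>

// One segment: a contiguous run of messages starting at base_offset.
struct ToySegment {
    int64_t base_offset;
    std::vector<std::string> messages;
};

// A partition is an ordered, append-only log; offsets are assigned
// sequentially and never reused.
class ToyPartition {
public:
    explicit ToyPartition(std::size_t max_per_segment)
        : max_per_segment_(max_per_segment) {}

    // Appends a message and returns the offset it was assigned.
    int64_t append(const std::string &msg) {
        if (segments_.empty() ||
            segments_.back().messages.size() >= max_per_segment_) {
            // Start a new segment whose base offset is the next offset.
            segments_.push_back(ToySegment{next_offset_, {}});
        }
        segments_.back().messages.push_back(msg);
        return next_offset_++;
    }

    // Reads the message at an offset by first finding its segment.
    const std::string &read(int64_t offset) const {
        for (const auto &seg : segments_) {
            const int64_t end =
                seg.base_offset + static_cast<int64_t>(seg.messages.size());
            if (offset >= seg.base_offset && offset < end)
                return seg.messages[static_cast<std::size_t>(offset - seg.base_offset)];
        }
        throw std::out_of_range("offset not in log");
    }

    std::size_t segment_count() const { return segments_.size(); }
    int64_t next_offset() const { return next_offset_; }

private:
    std::size_t max_per_segment_;
    int64_t next_offset_ = 0;
    std::vector<ToySegment> segments_;
};

// A topic is a set of independent partitions, each with its own offsets.
struct ToyTopic {
    std::string name;
    std::vector<ToyPartition> partitions;
};
```

Appending five messages with two per segment yields offsets 0 to 4 spread over three segments; offsets in different partitions of the same topic are independent of each other.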

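After these steps, you can type messages into the producer console window and read them back in the consumer window.

6.2 Now let's walk through the demo's source code, starting with main: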

// Excerpt from examples/rdkafka_example.cpp: the includes, the globals
// (run, exit_eof) and the callback classes are defined earlier in that file.
int main(int argc, char **argv) {
    std::string brokers = "localhost";  // broker list; without a port librdkafka uses 9092 (2181 is ZooKeeper, not a broker)
    std::string errstr;
    std::string topic_str;  // e.g. "wgm-kafka"
    std::string mode;
    std::string debug;
    int32_t partition    = RdKafka::Topic::PARTITION_UA;  // unassigned: let the partitioner choose
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;
    bool do_conf_dump    = false;
    int opt;
    MyHashPartitionerCb hash_partitioner;
    int use_ccb = 0;

    // Global (client-level) and topic-level configuration objects
    RdKafka::Conf *conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    while ((opt = getopt(argc, argv, "PCLt:p:b:z:qd:o:eX:AM:f:")) != -1) {
        switch (opt) {
        case 'P':
        case 'C':
        case 'L':
            mode = opt;  // "P" = producer, "C" = consumer, "L" = metadata list
            break;
        case 't':
            topic_str = optarg;
            break;
        case 'p':
            if (!strcmp(optarg, "random"))
                ;  // default partitioner, nothing to configure
            else if (!strcmp(optarg, "hash")) {
                if (tconf->set("partitioner_cb", &hash_partitioner, errstr) !=
                    RdKafka::Conf::CONF_OK) {
                    std::cerr << errstr << std::endl;
                    exit(1);
                }
            } else
                partition = std::atoi(optarg);  // explicit partition number
            break;
        case 'b':
            brokers = optarg;
            break;
        case 'z':
            if (conf->set("compression.codec", optarg, errstr) !=
                RdKafka::Conf::CONF_OK) {
                std::cerr << errstr << std::endl;
                exit(1);
            }
            break;
        case 'o':
            if (!strcmp(optarg, "end"))
                start_offset = RdKafka::Topic::OFFSET_END;
            else if (!strcmp(optarg, "beginning"))
                start_offset = RdKafka::Topic::OFFSET_BEGINNING;
            else if (!strcmp(optarg, "stored"))
                start_offset = RdKafka::Topic::OFFSET_STORED;
            else
                start_offset = strtoll(optarg, NULL, 10);
            break;
        case 'e':
            exit_eof = true;
            break;
        case 'd':
            debug = optarg;
            break;
        case 'M':
            if (conf->set("statistics.interval.ms", optarg, errstr) !=
                RdKafka::Conf::CONF_OK) {
                std::cerr << errstr << std::endl;
                exit(1);
            }
            break;
        case 'X': {
            char *name, *val;

            if (!strcmp(optarg, "dump")) {
                do_conf_dump = true;
                continue;
            }

            name = optarg;
            if (!(val = strchr(name, '='))) {
                std::cerr << "%% Expected -X property=value, not " << name << std::endl;
                exit(1);
            }

            // Split "name=value" in place: end name at '=', point val past it
            *val = '\0';
            val++;

            // Properties prefixed with "topic." go to the topic config
            RdKafka::Conf::ConfResult res;
            if (!strncmp(name, "topic.", strlen("topic.")))
                res = tconf->set(name + strlen("topic."), val, errstr);
            else
                res = conf->set(name, val, errstr);

            if (res != RdKafka::Conf::CONF_OK) {
                std::cerr << errstr << std::endl;
                exit(1);
            }
        } break;

        case 'f':
            if (!strcmp(optarg, "ccb"))
                use_ccb = 1;
            else {
                std::cerr << "Unknown option: " << optarg << std::endl;
                exit(1);
            }
            break;

        default:
            goto usage;
        }
    }

    // A mode is required, a topic is required except in -L mode,
    // and no non-option arguments may be left over
    if (mode.empty() || (topic_str.empty() && mode != "L") || optind != argc) {
    usage:
        std::string features;
        conf->get("builtin.features", features);
        fprintf(stderr,
                "Usage: %s [-C|-P] -t <topic> "
                "[-p <partition>] [-b <host1:port1,host2:port2,..>]\n"
                "\n"
                "librdkafka version %s (0x%08x, builtin.features \"%s\")\n"
                "\n"
                " Options:\n"
                "  -C | -P         Consumer or Producer mode\n"
                "  -L              metadata list mode\n"
                "  -t <topic>      Topic to fetch / produce\n"
                "  -p <num>        Partition (random partitioner)\n"
                "  -p <func>       Use partitioner:\n"
                "                  random (default), hash\n"
                "  -b <brokers>    Broker address (localhost:9092)\n"
                "  -z <codec>      Enable compression:\n"
                "                  none|gzip|snappy|lz4|zstd\n"
                "  -o <offset>     Start offset (consumer)\n"
                "  -e              Exit consumer when last message\n"
                "                  in partition has been received.\n"
                "  -d [facs..]     Enable debugging contexts:\n"
                "                  %s\n"
                "  -M <intervalms> Enable statistics\n"
                "  -X <prop=name>  Set arbitrary librdkafka "
                "configuration property\n"
                "                  Properties prefixed with \"topic.\" "
                "will be set on topic object.\n"
                "                  Use '-X list' to see the full list\n"
                "                  of supported properties.\n"
                "  -f <flag>       Set option:\n"
                "                     ccb - use consume_callback\n"
                "\n"
                " In Consumer mode:\n"
                "  writes fetched messages to stdout\n"
                " In Producer mode:\n"
                "  reads messages from stdin and sends to broker\n"
                "\n"
                "\n"
                "\n",
                argv[0], RdKafka::version_str().c_str(), RdKafka::version(),
                features.c_str(), RdKafka::get_debug_contexts().c_str());
        exit(1);
    }

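This block is really just command-line parsing, because the program is normally launched from the command line. Here:

  • argc is the number of command-line arguments
  • argv holds the argument strings; argv[0] is normally the program's own name (the .exe)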
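Two branches of the switch above do more than store optarg: -o maps keywords to librdkafka's logical offsets, and -X splits name=value and sends names starting with "topic." to the topic configuration. The sketch below restates that logic with std::string so it can be tried without librdkafka. The constants reproduce the values of RD_KAFKA_OFFSET_BEGINNING (-2), RD_KAFKA_OFFSET_END (-1) and RD_KAFKA_OFFSET_STORED (-1000) from rdkafka.h, which the C++ RdKafka::Topic::OFFSET_* constants mirror; the function and struct names are hypothetical.

```cpp
#include <cstdint>
#include <cstdlib>
#include <optional>
#include <string>

// Logical offsets as defined in librdkafka's rdkafka.h.
constexpr int64_t kOffsetBeginning = -2;
constexpr int64_t kOffsetEnd = -1;
constexpr int64_t kOffsetStored = -1000;

// Same decision as the demo's case 'o': a keyword or an absolute offset.
int64_t parse_start_offset(const std::string &arg) {
    if (arg == "end") return kOffsetEnd;
    if (arg == "beginning") return kOffsetBeginning;
    if (arg == "stored") return kOffsetStored;
    return std::strtoll(arg.c_str(), nullptr, 10);
}

// Result of splitting one -X argument.
struct XProperty {
    bool topic_level;   // true: goes to tconf, false: goes to conf
    std::string name;   // name with any "topic." prefix removed
    std::string value;
};

// Same decision as the demo's case 'X' (the "dump" keyword aside):
// split at the first '=', then strip a leading "topic.".
std::optional<XProperty> split_x_property(const std::string &arg) {
    const auto eq = arg.find('=');
    if (eq == std::string::npos) return std::nullopt;  // the demo exits with an error
    std::string name = arg.substr(0, eq);
    std::string value = arg.substr(eq + 1);
    const std::string prefix = "topic.";
    const bool topic_level = name.compare(0, prefix.size(), prefix) == 0;
    if (topic_level) name.erase(0, prefix.size());
    return XProperty{topic_level, name, value};
}
```

Unlike the demo, which writes '\0' into argv in place, this version copies the two halves; the routing decision is the same.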

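To understand the parsing loop, you need to know this function: int getopt(int nargc, char *const *nargv, const char *options)
It scans the arguments according to an option string made of single characters, and returns -1 when there are no more options. Each character can be written in three ways:

  1. A single character: a plain option (a flag).
  2. A character followed by one colon: the option requires an argument, which may follow it directly or be separated by a space.
  3. A character followed by two colons: the option takes an optional argument. If present, the argument must directly follow the option with no space in between. (This form is a GNU extension.)

Whenever an option has an argument, getopt() stores a pointer to it in the global variable optarg.

For example, with getopt(argc, argv, "ab:c:de::"):
the options -a, -b, -c, -d and -e can be given on the command line;
-b and -c must be followed by an argument, e.g. -b 123 or -c wgm;
-e may take an argument written directly after it (e.g. -evalue), or none, in which case optarg is NULL.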
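Here is a minimal sketch of that example using the POSIX getopt() from <unistd.h>, with -a and -d as flags, -b and -c requiring an argument, and -e taking an optional attached argument (option string "ab:c:de::"). It assumes a POSIX system with a GNU-compatible getopt: the Windows CRT does not provide getopt(), which is why librdkafka ships its own copy for Windows (win32/wingetopt.c). The struct and function names are hypothetical.

```cpp
#include <string>
#include <unistd.h>  // POSIX getopt(), optarg, optind

// Which options were seen, and their arguments.
struct Opts {
    bool a = false, d = false, e = false;
    std::string b, c;
    std::string e_arg;        // stays empty when -e has no attached value
    bool bad_option = false;  // getopt() returned '?'
};

Opts parse_opts(int argc, char *argv[]) {
    Opts o;
    optind = 1;  // restart scanning at argv[1]
    int opt;
    // a, d: flags; b:, c: required argument; e:: optional attached argument
    while ((opt = getopt(argc, argv, "ab:c:de::")) != -1) {
        switch (opt) {
        case 'a': o.a = true; break;
        case 'b': o.b = optarg; break;
        case 'c': o.c = optarg; break;
        case 'd': o.d = true; break;
        case 'e':
            o.e = true;
            if (optarg) o.e_arg = optarg;  // NULL when nothing is attached
            break;
        default:  // '?': unknown option or missing required argument
            o.bad_option = true;
            break;
        }
    }
    return o;
}
```

For example, prog -a -b 123 -cwgm -evalue sets a, b = "123", c = "wgm" and e_arg = "value", while prog -d -e sets d and e but leaves e_arg empty.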

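For this demo, I can pass the following command-line arguments:

./rdkafka_example_cpp.exe -P -b 127.0.0.1:9092 -t test

This runs the program in Producer mode (-P) against the broker at 127.0.0.1:9092 (-b), producing to the topic test (-t).

Continuing with the source: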

	
   conf->set("metadata.broker.list", brokers, errstr);

   if (!debug.empty()) {
       if (conf->set("debug", debug, errstr) != RdKafka::Conf::CONF_OK) {
          std::cerr << errstr << std::endl;
          exit(1);
        }
   }

   ExampleEventCb ex_event_cb;
   conf->set("event_cb", &ex_event_cb, errstr);

   if (do_conf_dump) {
       int pass;

       for (pass = 0; pass < 2; pass++) {
           std::list *dump;
           if (pass == 0) {
               dump = conf->dump();
               std::cout << "# Global config" << std::endl;
           } else {
               dump = tconf->dump();
               std::cout << "# Topic config" << std::endl;
           }

           for (std::list::iterator it = dump->begin();it != dump->end();) {
               std::cout << *it << " = ";
               it++;
               std::cout << *it << std::endl;
               it++;
           }
           std::cout << std::endl;
       }
       exit(0);
   }

   signal(SIGINT, sigterm);
   signal(SIGTERM, sigterm);

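The conf objects were created in the previous snippet, so this part mainly sets configuration properties through conf->set(): the broker list, the debug contexts and the event callback. With -X dump it prints both configurations and exits; otherwise it installs SIGINT/SIGTERM handlers so the main loop can stop cleanly.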
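Two idioms in this block are easier to see in isolation. Conf::dump() returns a flat list in which names and values alternate, which is why the loop advances the iterator twice per printed line. And sigterm only clears the global run flag that the producer loop tests. The sketch below reproduces both with the standard library only. The run/sigterm pair is assumed to match the declarations at the top of the full example, which this excerpt does not show; dump_to_pairs is a hypothetical helper.

```cpp
#include <csignal>
#include <list>
#include <string>
#include <utility>
#include <vector>

// Flag polled by the main loop; volatile sig_atomic_t is the classic
// type a signal handler may safely write to.
static volatile std::sig_atomic_t run = 1;

// Handler installed for SIGINT/SIGTERM: only asks the loop to stop.
static void sigterm(int /*sig*/) { run = 0; }

// Turns a flat [name, value, name, value, ...] list (the shape that
// Conf::dump() returns) into pairs; a trailing name without a value
// is dropped instead of dereferencing end().
std::vector<std::pair<std::string, std::string>>
dump_to_pairs(const std::list<std::string> &flat) {
    std::vector<std::pair<std::string, std::string>> out;
    for (auto it = flat.begin(); it != flat.end();) {
        const std::string &name = *it++;
        if (it == flat.end()) break;
        out.emplace_back(name, *it++);
    }
    return out;
}
```

The extra end() check is a defensive addition: the demo's loop assumes the list always has an even length.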

if (mode == "P") {
    // Producer mode

    if (topic_str.empty())
        goto usage;

    ExampleDeliveryReportCb ex_dr_cb;

    // Delivery-report callback and default topic configuration
    conf->set("dr_cb", &ex_dr_cb, errstr);
    conf->set("default_topic_conf", tconf, errstr);

    // Create the producer handle
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
        std::cerr << "Failed to create producer: " << errstr << std::endl;
        exit(1);
    }

    std::cout << "% Created producer " << producer->name() << std::endl;

    // Read messages from stdin, one message per line
    for (std::string line; run && std::getline(std::cin, line);) {
        if (line.empty()) {
            producer->poll(0);
            continue;
        }

        RdKafka::Headers *headers = RdKafka::Headers::create();
        headers->add("my header", "header value");
        headers->add("other header", "yes");

        // Send the line as one message
        RdKafka::ErrorCode resp =
            producer->produce(topic_str, partition,
                              RdKafka::Producer::RK_MSG_COPY /* copy payload */,
                              /* value */
                              const_cast<char *>(line.c_str()), line.size(),
                              /* key */
                              NULL, 0,
                              /* timestamp (0 = now) */
                              0,
                              /* message headers */
                              headers,
                              /* per-message opaque passed to the delivery report */
                              NULL);
        if (resp != RdKafka::ERR_NO_ERROR) {
            std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl;
            delete headers;  // on success, produce() takes ownership of headers
        } else {
            std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
        }

        producer->poll(0);  // serve delivery reports and events
    }
    run = 1;

    // Wait for outstanding messages to be delivered before exiting
    while (run && producer->outq_len() > 0) {
        std::cerr << "Waiting for " << producer->outq_len() << std::endl;
        producer->poll(1000);
    }
    delete producer;
}  // the consumer (-C) and metadata (-L) branches follow in the full example

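When the program is started with -P, this code creates a Producer with RdKafka::Producer::create(). Each line typed into the console is read with std::getline() and sent as one message; after input ends, the loop keeps polling until the outgoing queue is empty.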

Test results

In the …/example/Debug directory, open PowerShell and run:

./rdkafka_example_cpp.exe -P -b 127.0.0.1:9092 -t wgm-kafka

Since I had already created the wgm-kafka topic, the messages sent by this Producer could be read by a consumer on the same topic.

Reprint notice: this article is reprinted from www.mshxw.com
Article URL: https://www.mshxw.com/it/433803.html