栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka 生产者 DEMO

Kafka 生产者 DEMO

Kafka 生产者 DEMO



#include 
#include 
#include 
#include 
#include 
#include 

#if _AIX
#include 
#endif


#include "rdkafkacpp.h"


static volatile sig_atomic_t run = 1;

static void sigterm(int sig) {
  run = 0;
}


class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb(RdKafka::Message &message) {
    
    if (message.err())
      std::cerr << "% Message delivery failed: " << message.errstr()
                << std::endl;
    else
      std::cerr << "% Message delivered to topic " << message.topic_name()
                << " [" << message.partition() << "] at offset "
                << message.offset() << std::endl;
  }
};

int main(int argc, char **argv) {
  if (argc != 3) {
    std::cerr << "Usage: " << argv[0] << "  n";
    exit(1);
  }

  std::string brokers = argv[1];
  std::string topic   = argv[2];

  
  RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

  std::string errstr;

  
  if (conf->set("bootstrap.servers", brokers, errstr) !=
      RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    exit(1);
  }

  signal(SIGINT, sigterm);
  signal(SIGTERM, sigterm);

  
  ExampleDeliveryReportCb ex_dr_cb;

  if (conf->set("dr_cb", &ex_dr_cb, errstr) != RdKafka::Conf::CONF_OK) {
    std::cerr << errstr << std::endl;
    exit(1);
  }

  
  RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
  if (!producer) {
    std::cerr << "Failed to create producer: " << errstr << std::endl;
    exit(1);
  }

  delete conf;

  
  std::cout << "% Type message value and hit enter "
            << "to produce message." << std::endl;

  for (std::string line; run && std::getline(std::cin, line);) {
    if (line.empty()) {
      producer->poll(0);
      continue;
    }

    
  retry:
       RdKafka::ErrorCode err = producer->produce(
        
        topic,
        
        RdKafka::Topic::PARTITION_UA,
        
        RdKafka::Producer::RK_MSG_COPY ,
        
        const_cast(line.c_str()), line.size(),
        
        NULL, 0,
        
        0,
        
        NULL,
        
        NULL);

    if (err != RdKafka::ERR_NO_ERROR) {
      std::cerr << "% Failed to produce to topic " << topic << ": "
                << RdKafka::err2str(err) << std::endl;

      if (err == RdKafka::ERR__QUEUE_FULL) {
        
        producer->poll(1000 );
        goto retry;
      }

    } else {
      std::cerr << "% Enqueued message (" << line.size() << " bytes) "
                << "for topic " << topic << std::endl;
    }

    
    producer->poll(0);
  }

  
  std::cerr << "% Flushing final messages..." << std::endl;
  producer->flush(10 * 1000 );

  if (producer->outq_len() > 0)
    std::cerr << "% " << producer->outq_len()
              << " message(s) were not delivered" << std::endl;

  delete producer;

  return 0;
}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/748470.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号