栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Java

使用librdkafka的C++接口实现Kafka生产者和消费者客户端

Java 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

使用librdkafka的C++接口实现Kafka生产者和消费者客户端

1. librdkafka简介:

librdkafka 是 Apache Kafka 的 C/C++ 开发包,提供 生产者、消费者 和 管理客户端。

其设计目标是可靠且高性能的消息传输,目前可支持每秒生产超过100万条消息、每秒消费300万条消息。

官方README 文档对librdkafka的介绍:
“librdkafka — the Apache Kafka C/C++ client library”

librdkafka/INTRODUCTION.md
https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md

librdkafka/examples/
https://github.com/edenhill/librdkafka/tree/master/examples

Usage:
使用时,需要在源程序中包含头文件:C 接口为 "rdkafka.h",C++ 接口为 "rdkafkacpp.h"


2. librdkafka的C++接口: 2.1 RdKafka::Conf::create():

创建Conf配置实例,用于填充用户指定的各配置项:

//namespace RdKafka;

//brief Create configuration object:
//RdKafka::Conf ---> configuration interface class, used to set the values of the producer, consumer and broker configuration properties
static Conf *create(ConfType type);	

enum ConfType {
	CONF_GLOBAL,	//Global configuration
	CONF_TOPIC		//Topic specific configuration
};

使用举例:

//Create the global (client-level) configuration object.
RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if(m_config == nullptr) {
	std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
}

//Create the topic-level configuration object.
RdKafka::Conf *m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
if(m_topicConfig == nullptr) {	//check the topic conf that was just created, not the global one
	std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
}
2.2 Conf::ConfResult set():

Conf类中的多个set成员函数,用于对不同的配置项进行赋值:

//Overloads of Conf::set(): each one assigns the property called `name`
//and returns a ConfResult; `errstr` is the error-string reference parameter.
class Conf {
public:
	//Property given as a string value.
	virtual Conf::ConfResult set(const std::string &name, const std::string &value, std::string &errstr);
	//Property given as a delivery-report callback object.
	virtual Conf::ConfResult set(const std::string &name, DeliveryReportCb *dr_cb, std::string &errstr);
	//Property given as an event callback object.
	virtual Conf::ConfResult set(const std::string &name, EventCb *event_cb, std::string &errstr);
	//Property given as a partitioner callback object.
	virtual Conf::ConfResult set(const std::string &name, PartitionerCb *partitioner_cb, std::string &errstr);
	//...
	//...
};

//Result codes returned by Conf::set().
enum ConfResult {
	CONF_UNKNOWN = -2,	//Unknown configuration property
	CONF_INVALID = -1,	//Invalid configuration value
	CONF_OK = 0			//Configuration property was successfully set
};

使用举例:

RdKafka::Conf::ConfResult  	result;
std::string					error_str;

//The configuration object must exist before set() is called on it;
//calling through an uninitialized pointer is undefined behavior.
RdKafka::Conf *m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);

//Set the "bootstrap.servers" property (broker address in "ip:port" form):
result = m_config->set("bootstrap.servers", "127.0.0.1:9092", error_str);
if(result != RdKafka::Conf::CONF_OK) {
    std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
}

//Set the "event_cb" property:
RdKafka::EventCb* m_event_cb = new ProducerEventCb;
result = m_config->set("event_cb", m_event_cb, error_str);
if(result != RdKafka::Conf::CONF_OK) {
    std::cout << "Global Conf set 'event_cb' failed: " << error_str << std::endl;
}
2.3 RdKafka::Producer::create():

创建Producer生产者客户端:

class Producer : public virtual Handle {
public:
	static Producer *create(Conf *conf, std::string &errstr);
};

使用举例:

RdKfka::Producer *m_producer;

m_producer = RdKafka::Producer::create(m_config, error_str);
if(m_producer == nullptr) {
    std::cout << "Create Topic failed: " << error_str << std::endl;
}
2.4 RdKafka::Topic::create():

创建Topic主题对象:

class Topic {
public:
	//Static factory for a topic handle bound to the client handle `base`.
	//`topic_str` is the topic name and `conf` the topic-level configuration;
	//the usage code checks the result for nullptr and reads the reason from errstr.
	static Topic *create(Handle *base, const std::string &topic_str, const Conf *conf, std::string &errstr);
};

使用举例:

//Build the topic handle in one step and report a failed creation.
RdKafka::Topic *m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str);
if(!m_topic) {
    std::cout << "Create Topic failed: " << error_str << std::endl;
}
2.5 RdKafka::Producer::produce():
class Producer : public virtual Handle {
public:
	//Hand one message over to the client for delivery to `topic`.
	//  partition  : target partition, or Topic::PARTITION_UA to let the partitioner decide
	//  msgflags   : e.g. RK_MSG_COPY (as used in the usage example)
	//  payload/len: message body
	//  key        : optional message key (pointer)
	//  msg_opaque : opaque per-message pointer supplied by the application
	virtual ErrorCode produce(Topic *topic, int32_t partition, int msgflags, 
						void *payload, size_t len, const std::string *key, void *msg_opaque);

	//... (further produce() overloads omitted)
};


//Use RdKafka::err2str() to translate an error code into a human readable string
enum ErrorCode {
	//Internal errors to rdkafka (names carry a double underscore):
	ERR__BEGIN = -200,		//Begin internal error codes
	ERR__BAD_MSG = -199,	//Received message is incorrect
	//...
	ERR__END = -100,		//End internal error codes

	//Kafka broker errors: 
	ERR_UNKNOWN = -1,		//Unknown broker error
	ERR_NO_ERROR = 0,		//Success
	//...
};

使用举例:

RdKafka::ErrorCode error_code = m_producer->produce(m_topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY, 
										payload, len, key, nullptr);

m_producer->poll(0);	//poll(0) does not block; it is called mainly to trigger the application's callbacks

if(error_code != RdKafka::ERR_NO_ERROR) {
	std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
	if(error_code == RdKafka::ERR__QUEUE_FULL) {
		m_producer->poll(1000);		//the local queue is full: block for a while so it can drain
	}
	else if(error_code == RdKafka::ERR_MSG_SIZE_TOO_LARGE) {
		//the message exceeds the configured maximum size: trim it and send it again
	}
	else {
		std::cerr << "ERR_UNKNOWN_PARTITION or ERR_UNKNOWN_TOPIC" << std::endl;
	}
}
2.6 RdKafka::KafkaConsumer::create():

创建Consumer消费者客户端:

class KafkaConsumer : public virtual Handle {
public:
	//Static factory for a consumer client: takes the configuration and an
	//error-string reference, and returns a KafkaConsumer pointer.
	static KafkaConsumer *create(const Conf *conf, std::string &errstr);
};

使用举例:

//Create the consumer client in one step and report a failed creation.
RdKafka::KafkaConsumer *m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
if(!m_consumer) {
    std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
}
2.7 RdKafka::KafkaConsumer::subscribe():

Consumer消费者订阅Topic主题:

class KafkaConsumer : public virtual Handle {
public:
	//Subscribe to the given topic names; returns an ErrorCode.
	//(The vector's element type had been lost; topics are passed as strings.)
	virtual ErrorCode subscribe(const std::vector<std::string> &topics);
};

使用举例:

std::vector topics;
topics.push_back(topic_str);

RdKafka::ErrorCode error_code = m_consumer->subscribe(topics);
if(error_code != ERROR_NO_ERROR) {
	std::cerr << "Consumer Subscribe Topics Failed: " << RdKafka::err2str(error_code) << std::endl;
}
2.8 RdKafka::KafkaConsumer::consume():

Consumer消费者拉取消息进行消费:

class KafkaConsumer : public virtual Handle {
public:
	//Fetch one message, waiting at most timeout_ms milliseconds; returns a Message pointer.
	virtual Message *consume(int timeout_ms);
};

使用举例:

RdKafka::Message *m_message = m_consumer->consume(5000);	//waits up to 5000 ms; if no message arrives in that time the returned Message reports RdKafka::ERR__TIMED_OUT through err()

2. 使用librdkafka的C++接口实现生产者客户端: 2.1 main_producer.cpp
#include "producer_kafka.h"

using namespace std;


int main() {
    KafkaProducer producer("127.0.0.1:9092", "topic-demo", 0);

    sleep(5);

    for(int i = 0; i < 10; i++) {
        char msg[64] = {0};
        sprintf(msg, "%s%4d", "Hello Kafka ", i);   //msg = "Hello Kafka 0001";

        char key[8] = {0};
        sprintf(key, "%d", i);  //key = "1";

        producer.pushMessage(msg, key);
    }

    KafkaProducer::wait_destroyed(50000);

    return 0;
}
2.2 producer_kafka.h
#ifndef __KAFKAPRODUCER_H_
#define __KAFKAPRODUCER_H_

// The header names had been stripped from the two standard includes; these
// are the standard headers the declarations in this file rely on
// (snprintf/sprintf, std::cout/std::cerr, std::string).
#include <cstdio>
#include <iostream>
#include <string>
#include "rdkafkacpp.h"


/**
 * Thin wrapper around an RdKafka::Producer bound to a single topic.
 *
 * The object owns every raw pointer it holds (they are released in the
 * destructor), so copying is disabled to rule out double deletion.
 */
class KafkaProducer {
public:
    // explicit: forbids implicit conversions into a KafkaProducer.
    explicit KafkaProducer(const std::string& brokers, const std::string& topic, int partition);
    ~KafkaProducer();

    KafkaProducer(const KafkaProducer&)            = delete;
    KafkaProducer& operator=(const KafkaProducer&) = delete;

    // Publish one message `msg` with message key `key`.
    void pushMessage(const std::string& msg, const std::string& key);

protected:
    std::string     m_brokers;      // broker list, "ip:port" form
    std::string     m_topicStr;     // topic name
    int             m_partition;    // partition number passed to the constructor

    RdKafka::Conf*      m_config;           // global (client-level) configuration
    RdKafka::Conf*      m_topicConfig;      // topic-level configuration

    RdKafka::Producer*  m_producer;         // producer client handle
    RdKafka::Topic*     m_topic;            // topic handle used by produce()

    RdKafka::DeliveryReportCb*      m_dr_cb;            // reports the delivery result of each produced message
    RdKafka::EventCb*               m_event_cb;         // generic channel for errors, statistics and logs from librdkafka
    RdKafka::PartitionerCb*         m_partitioner_cb;   // custom partitioner (the interface is named PartitionerCb)
};




// Delivery-report callback: logs the outcome of each produced message to std::cerr.
class ProducerDeliveryReportCb : public RdKafka::DeliveryReportCb {
public:
    // Implements the dr_cb() virtual of RdKafka::DeliveryReportCb.
    void dr_cb(RdKafka::Message &message) {
        const bool delivered = (message.err() == 0);
        if(delivered) {
            // Success: report where the message landed.
            std::cerr << "Message delivered to topic: " << message.topic_name();
            std::cerr << " [" << message.partition();
            std::cerr << "] at offset " << message.offset() << std::endl;
            return;
        }
        // Failure: report the error text carried by the message.
        std::cerr << "Message delivery failed: " << message.errstr() << std::endl;
    }
};



// Event callback for the producer: prints one line per event to std::cout.
class ProducerEventCb : public RdKafka::EventCb {
public:
    // Dispatches on the event type; the type constants live in RdKafka::Event.
    void event_cb(RdKafka::Event &event) {
        switch(event.type()) {
            case RdKafka::Event::EVENT_ERROR:
                std::cout << "RdKafka::EVENT::EVENT_ERROR: " << RdKafka::err2str(event.err()) << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                std::cout << "RdKafka::EVENT::EVENT_STATS: " << event.str() << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG:
                std::cout << "RdKafka::EVENT::EVENT_LOG: " << event.fac() << std::endl;
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cout << "RdKafka::EVENT::EVENT_THROTTLE: " << event.broker_name() << std::endl;
                break;
            default:
                // Other event types are deliberately ignored.
                break;
        }
    }
};



/**
 * Custom producer partitioner: hashes the message key and maps the hash
 * onto a partition id.
 */
class HashPartitionerCb : public RdKafka::PartitionerCb {
public:
    /**
     * @param topic          Topic the message is produced to (only its name is logged).
     * @param key            Message key; a null key is hashed as an empty key.
     * @param partition_cnt  Number of partitions of the topic.
     * @param msg_opaque     Per-message opaque pointer (unused).
     * @return A partition id in [0, partition_cnt), or
     *         RdKafka::Topic::PARTITION_UA when partition_cnt is not positive.
     */
    int32_t partitioner_cb( const RdKafka::Topic *topic, const std::string *key,
                            int32_t partition_cnt, void *msg_opaque)
    {
        (void)msg_opaque;

        // A modulo by zero (or a negative count) would be undefined/meaningless.
        if(partition_cnt <= 0) {
            return RdKafka::Topic::PARTITION_UA;
        }

        // Tolerate keyless messages instead of dereferencing a null pointer.
        const char  *key_data = (key != nullptr) ? key->c_str() : "";
        const size_t key_len  = (key != nullptr) ? key->size()  : 0;

        // Diagnostic line only; snprintf truncates instead of overflowing msg.
        char msg[128] = {0};
        snprintf(msg, sizeof(msg), "HashPartitionCb:[%s][%s][%d]", topic->name().c_str(), key_data, partition_cnt);
        std::cout << msg << std::endl;

        // The actual partitioning decision.
        const unsigned int bucket = generate_hash(key_data, key_len) % static_cast<unsigned int>(partition_cnt);
        return static_cast<int32_t>(bucket);
    }
private:
    // Hash over the key bytes: starts at 5381 and computes hash * 33 + byte per character.
    static inline unsigned int generate_hash(const char *str, size_t len) {
        unsigned int hash = 5381;
        for (size_t i = 0; i < len; i++) {
            hash = ( (hash << 5) + hash ) + str[i];
        }
        return hash;
    }
};


#endif



2.3 producer_kafka.cpp
#include "producer_kafka.h"


//("192.168.0.105:9092", "topic_demo", 0)
/**
 * Builds the configuration objects, the producer client and the topic handle.
 *
 * @param brokers    Broker address in "ip:port" form.
 * @param topic      Name of the topic to produce to.
 * @param partition  Stored in m_partition; not otherwise used here.
 *
 * Every failure is logged to std::cout. If a configuration object or the
 * producer cannot be created, construction stops early with the remaining
 * handles left as nullptr rather than calling through a null pointer.
 */
KafkaProducer::KafkaProducer(const std::string& brokers, const std::string& topic, int partition)
    : m_brokers(brokers),
      m_topicStr(topic),
      m_partition(partition),
      m_config(nullptr),
      m_topicConfig(nullptr),
      m_producer(nullptr),
      m_topic(nullptr),
      m_dr_cb(nullptr),
      m_event_cb(nullptr),
      m_partitioner_cb(nullptr)
{
    // Create the two configuration objects first.
    m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    if(m_config == nullptr) {
        std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
        return;
    }

    m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
    if(m_topicConfig == nullptr) {
        std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
        return;
    }

    // Fill in the individual configuration properties.
    RdKafka::Conf::ConfResult   result;
    std::string                 error_str;

    // Broker address the producer connects to. The property is spelled
    // "bootstrap.servers"; a misspelled name is not a known property.
    result = m_config->set("bootstrap.servers", m_brokers, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
    }

    result = m_config->set("statistics.interval.ms", "10000", error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set ‘statistics.interval.ms’ failed: " << error_str << std::endl;
    }

    // Upper bound on the size of a produced message; larger messages fail.
    result = m_config->set("message.max.bytes", "10240000", error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set 'message.max.bytes' failed: " << error_str << std::endl;
    }

    // Per-message delivery result callback.
    m_dr_cb = new ProducerDeliveryReportCb;
    result = m_config->set("dr_cb", m_dr_cb, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set ‘dr_cb’ failed: " << error_str << std::endl;
    }

    // Event callback.
    m_event_cb = new ProducerEventCb;
    result = m_config->set("event_cb", m_event_cb, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Global Conf set ‘event_cb’ failed: " << error_str << std::endl;
    }

    // Custom partitioner (set on the topic-level configuration).
    m_partitioner_cb = new HashPartitionerCb;
    result = m_topicConfig->set("partitioner_cb", m_partitioner_cb, error_str);
    if(result != RdKafka::Conf::CONF_OK) {
        std::cout << "Topic Conf set ‘partitioner_cb’ failed: " << error_str << std::endl;
    }

    // Create the producer client.
    m_producer = RdKafka::Producer::create(m_config, error_str);
    if(m_producer == nullptr) {
        std::cout << "Create Producer failed: " << error_str << std::endl;
        return;     // a topic cannot be created without a client handle
    }

    // Create the topic handle that produce() needs later on.
    m_topic = RdKafka::Topic::create(m_producer, m_topicStr, m_topicConfig, error_str);
    if(m_topic == nullptr) {
        std::cout << "Create Topic failed: " << error_str << std::endl;
    }
}




void KafkaProducer::pushMessage(const std::string& msg, const std::string& key) {
    int32_t len = str.length();
    void *payload = const_cast(static_cast(str.data()));

    RdKafka::ErrorCode error_code = m_producer->prodce( m_topic, RdKafka::Topic::PARTITION_UA,
                                                        RdKafka::Producer::RK_MSG_COPY,
                                                        payload, len, key, NULL);
    m_producer->poll(0);        //poll()参数为0意味着不阻塞;poll(0)主要是为了触发应用程序提供的回调函数
    if(error_code != ERR_NO_ERROR) {
        std::cerr << "Produce failed: " << RdKafka::err2str(error_code) << std::endl;
        if(error_code == ERR_QUEUE_FULL) {
            m_producer->poll(1000);     //如果发送失败的原因是队列正满,则阻塞等待一段时间
        } else if(error_code == ERR_MSG_SIZE_TOO_LARGE) {
            //如果发送消息过大,超过了max.size,则需要裁减后重新发送
        } else {
            std::cerr << "ERR_UNKNOWN_PARTITION or ERR_UNKNOWN_TOPIC" << std::endl;
        }
    }
}



/**
 * Flushes any messages still queued in the client, then releases every
 * object owned by this wrapper.
 */
KafkaProducer::~KafkaProducer() {
    // The producer may be null if its creation failed; do not call through it.
    if(m_producer != nullptr) {
        while(m_producer->outq_len() > 0) {         // the client's out-queue still holds entries
            std::cerr << "Waiting for: " << m_producer->outq_len() << std::endl;
            m_producer->flush(5000);
        }
    }

    // delete on a null pointer is a no-op, so no further checks are needed.
    delete m_topic;
    delete m_producer;
    delete m_config;
    delete m_topicConfig;
    delete m_dr_cb;
    delete m_event_cb;
    delete m_partitioner_cb;
}



3. 使用librdkafka的C++接口实现消费者客户端: 3.1 main_consumer.cpp
#include "kafka_consumer.h"

int main()
{
    std::string brokers = "127.0.0.1:9092";
    
    std::vector topics;		//待消费主题的集合
    topics.push_back("topic-demo");

    std::string group = "consumer-group-demo";	//消费组
    
    KafkaConsumer consumer(brokers, group, topics, RdKafka::Topic::OFFSET_BEGINNING);
    
    consumer.pullMessage();

    RdKafka::wait_destroyed(5000);
    
    return 0;
}


3.2 kafka_consumer.h
#ifndef __KAFKACONSUMER_H_
#define __KAFKACONSUMER_H_

// The header names had been stripped from the four standard includes; these
// are the standard headers the declarations in this file rely on
// (fprintf, std::cerr, std::string, std::vector).
#include <cstdio>
#include <iostream>
#include <string>
#include <vector>
#include "rdkafkacpp.h"

/**
 * Thin wrapper around an RdKafka::KafkaConsumer subscribed to a set of topics.
 *
 * The object holds raw pointers to the objects it creates, so copying is
 * disabled to rule out two wrappers managing the same handles.
 */
class KafkaConsumer {
public:
    explicit KafkaConsumer(const std::string& brokers, const std::string& groupID,
                           const std::vector<std::string>& topics, int partition);
    ~KafkaConsumer();

    KafkaConsumer(const KafkaConsumer&)            = delete;
    KafkaConsumer& operator=(const KafkaConsumer&) = delete;

    void pullMessage();

protected:
    std::string                 m_brokers;          // broker list, "ip:port" form
    std::string                 m_groupId;          // consumer group id
    std::vector<std::string>    m_topicVector;      // a consumer may subscribe to several topics, hence a vector
    int                         m_partition;        // partition value passed to the constructor

    RdKafka::Conf*              m_config;           // GLOBAL-level configuration (consumer client level)
    RdKafka::Conf*              m_topicConfig;      // TOPIC-level configuration

    RdKafka::KafkaConsumer*     m_consumer;         // consumer client instance

    RdKafka::EventCb*           m_event_cb;         // event callback
    RdKafka::RebalanceCb*       m_rebalance_cb;     // rebalance callback
};


// Event callback for the consumer: writes one line per event to stderr.
class ConsumerEventCb : public RdKafka::EventCb {
public:
    void event_cb(RdKafka::Event& event) {
        switch (event.type())
        {
            case RdKafka::Event::EVENT_ERROR:
                if (event.fatal())              // prefix fatal errors
                    std::cerr << "FATAL ";
                std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " << event.str() << std::endl;
                break;
            case RdKafka::Event::EVENT_STATS:
                // The inner quotes must be escaped to stay inside the literal.
                std::cerr << "\"STATS\": " << event.str() << std::endl;
                break;
            case RdKafka::Event::EVENT_LOG:
                // "\n" (newline) — the backslash had been lost, leaving a literal 'n'.
                fprintf(stderr, "LOG-%i-%s: %s\n", event.severity(), event.fac().c_str(), event.str().c_str());
                break;
            case RdKafka::Event::EVENT_THROTTLE:
                std::cerr << "THROTTLED: " << event.throttle_time() << "ms by " << event.broker_name() << " id " << static_cast<int>(event.broker_id()) << std::endl;
                break;
            default:
                std::cerr << "EVENT " << event.type() << " (" << RdKafka::err2str(event.err()) << "): " <<  event.str() << std::endl;
                break;
        }
    }
};

/**
 * Rebalance callback: logs the event, then assigns or unassigns the
 * partitions it is handed and tracks how many are currently assigned.
 */
class ConsumerRebalanceCb : public RdKafka::RebalanceCb {
public:
    /**
     * @param consumer    Consumer the rebalance applies to.
     * @param err         Which rebalance event this is (assign / revoke / error).
     * @param partitions  Partitions being assigned or revoked.
     */
    void rebalance_cb( RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err,
                       std::vector<RdKafka::TopicPartition*> &partitions)
    {
        // printTopicPartition() writes to std::cerr itself and returns void,
        // so it is called as a statement rather than streamed.
        std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";
        printTopicPartition(partitions);

        if(err == RdKafka::ERR__ASSIGN_PARTITIONS) {            // after the rebalance, before consumption starts
            consumer->assign(partitions);                       // take over the newly assigned partitions
            partition_count = static_cast<int>(partitions.size());
        } else if(err == RdKafka::ERR__REVOKE_PARTITIONS) {     // after consumption stopped, before the rebalance
            consumer->unassign();                               // give up the current partitions
            partition_count = 0;                                // nothing is assigned any more
        } else {
            std::cerr << "Rebalancing error: " << RdKafka::err2str(err) << std::endl;
        }
    }

private:
    // Print every topic/partition pair in `partitions`, followed by a newline.
    static void printTopicPartition(const std::vector<RdKafka::TopicPartition*> &partitions) {
        for(size_t i = 0 ; i < partitions.size() ; i++) {
            std::cerr << partitions[i]->topic() << "[" << partitions[i]->partition() << "], ";
        }
        std::cerr << "\n";
    }
private:
    int partition_count = 0;        // number of partitions currently assigned to the consumer
};



#endif
3.3 kafka_consumer.cpp
#include "kafka_consumer.h"

/**
 * Builds the configuration objects and the consumer client.
 *
 * @param brokers    Broker address in "ip:port" form.
 * @param groupId    Consumer group id.
 * @param topics     Names of the topics this consumer is meant to read.
 * @param partition  Stored in m_partition; not otherwise used here.
 *
 * Every failure is logged to std::cout. If a configuration object or the
 * consumer cannot be created, construction stops early with the remaining
 * handles left as nullptr rather than calling through a null pointer.
 */
KafkaConsumer::KafkaConsumer(const std::string& brokers, const std::string& groupId, 
							 const std::vector<std::string>& topics, int partition) 
	: m_brokers(brokers),
	  m_groupId(groupId),
	  m_topicVector(topics),
	  m_partition(partition),
	  m_config(nullptr),
	  m_topicConfig(nullptr),
	  m_consumer(nullptr),
	  m_event_cb(nullptr),
	  m_rebalance_cb(nullptr)
{
	// Create the configuration objects.
	m_config = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
	if(m_config == nullptr) {
		std::cout << "Create Rdkafka Global Conf Failed." << std::endl;
		return;
	}

	m_topicConfig = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
	if(m_topicConfig == nullptr) {
		std::cout << "Create Rdkafka Topic Conf Failed." << std::endl;
		return;
	}

	// Fill in the individual configuration properties.
	RdKafka::Conf::ConfResult   result;
	std::string                 error_str;

	result = m_config->set("bootstrap.servers", m_brokers, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'bootstrap.servers' failed: " << error_str << std::endl;
	}

	// Consumer group name (string property).
	result = m_config->set("group.id", m_groupId, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'group.id' failed: " << error_str << std::endl;
	}

	// Maximum amount of data fetched per partition.
	result = m_config->set("max.partition.fetch.bytes", "1024000", error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'max.partition.fetch.bytes' failed: " << error_str << std::endl;
	}

	// enable.partition.eof: whether an RD_KAFKA_RESP_ERR__PARTITION_EOF event is
	// emitted when the consumer reaches the end of a partition.
	// NOTE(review): the default value depends on the librdkafka version — confirm.
	result = m_config->set("enable.partition.eof", "false", error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'enable.partition.eof' failed: " << error_str << std::endl;
	}

	m_event_cb = new ConsumerEventCb;
	result = m_config->set("event_cb", m_event_cb, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'event_cb' failed: " << error_str << std::endl;
	}

	m_rebalance_cb = new ConsumerRebalanceCb;
	result = m_config->set("rebalance_cb", m_rebalance_cb, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'rebalance_cb' failed: " << error_str << std::endl;
	}

	// Topic-level properties.
	result = m_topicConfig->set("auto.offset.reset", "latest", error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Topic Conf set 'auto.offset.reset' failed: " << error_str << std::endl;
	}

	result = m_config->set("default_topic_conf", m_topicConfig, error_str);
	if(result != RdKafka::Conf::CONF_OK) {
		std::cout << "Conf set 'default_topic_conf' failed: " << error_str << std::endl;
	}

	// Create the consumer client.
	m_consumer = RdKafka::KafkaConsumer::create(m_config, error_str);
	if(m_consumer == nullptr) {
		std::cout << "Create KafkaConsumer failed: " << error_str << std::endl;
		return;		// do not call name() through a null handle
	}
	std::cout << "Create KafkaConsumer succeed, consumer name : " << m_consumer->name() << std::endl;
}


void RdKafkaConsumer::pullMessage() {

}

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/298985.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号