Kafka 是一个分布式流处理平台,通过 Kafka 我们可以发布和订阅流式记录(消息)。关于 Kafka 的介绍可以参考官网,或者这篇文章:https://juejin.im/post/6844903495670169607 ,介绍得非常详细。
Kafka 中的多个 broker(每个 broker 是一个独立的 Kafka 服务器)组成一个集群。集群中可以创建不同的 topic(主题),每个 topic 下可以建立多个 partition(分区),这些分区分布在集群中的各个 broker 上。数据存放在分区中:partition 可以近似理解为一个只能在末尾追加的数组,写入 Kafka 的每条记录依次存放在 partition1[0]、partition1[1]…… 中,这个下标就是记录在分区内的 offset(偏移量)。
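Kafka 的安装我个人喜欢用 Docker(docker-compose)方式,安装方便。下面的 docker-compose.yml 会启动 1 个 ZooKeeper 和 5 个 Kafka broker;其中的 192.168.1.102 是 broker 对外公布给客户端的地址,需要换成你自己宿主机的 IP 地址: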
# WARNING: This docker-compose.yml is only for testing purpose.
# Parameters:
#   - name: CONFLUENT_PLATFORM_VERSION
#     default: 3.0.0
#     reference: https://hub.docker.com/u/confluentinc/
#   - name: KAFKA_HOST_IP
#     default: 192.168.1.102
#     description: Address every broker advertises to clients. Set it to an
#       address of the Docker host that your clients can reach, e.g.
#       $ KAFKA_HOST_IP=10.0.0.5 docker-compose up -d
# Ports:
#   - description: Major ports are exposed to host computer
#   - zookeeper: 2181
#     kafka1: 9091
#     kafka2: 9092
#     kafka3: 9093
#     kafka4: 9094
#     kafka5: 9095
# Tips:
#   - You can up part of the cluster with below command.
#     $ docker-compose up -d kafka1 kafka2 kafka3
version: '3.3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    ports:
      - "2181:2181"
      # 2888/3888 are the peer and leader-election ports listed in
      # ZOOKEEPER_SERVERS below.
      - "2888:2888"
      - "3888:3888"
    healthcheck:
      # Sends ZooKeeper's `stat` four-letter command to the client port.
      test: echo stat | nc localhost 2181
      interval: 10s
      timeout: 10s
      retries: 3
    environment:
      - ZOOKEEPER_SERVER_ID=1
      - ZOOKEEPER_CLIENT_PORT=2181
      - ZOOKEEPER_TICK_TIME=2000
      - ZOOKEEPER_INIT_LIMIT=5
      - ZOOKEEPER_SYNC_LIMIT=2
      - ZOOKEEPER_SERVERS=zookeeper:2888:3888

  # kafka1..kafka5 are identical except for the listener port and broker id.
  kafka1:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      # `[S]upportedKafka` matches "SupportedKafka" but not this grep's own
      # command line. NOTE(review): assumes the CP 3.x broker process shows up
      # as SupportedKafka; re-check when changing CONFLUENT_PLATFORM_VERSION.
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9091:9091"
    environment:
      # The advertised address is what clients are told to connect to, so it
      # must be reachable from outside the container (the host IP).
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_HOST_IP:-192.168.1.102}:9091
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9091
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=1
      - BOOTSTRAP_SERVERS=${KAFKA_HOST_IP:-192.168.1.102}:9091,${KAFKA_HOST_IP:-192.168.1.102}:9092,${KAFKA_HOST_IP:-192.168.1.102}:9093,${KAFKA_HOST_IP:-192.168.1.102}:9094,${KAFKA_HOST_IP:-192.168.1.102}:9095
      - ZOOKEEPER=zookeeper:2181

  kafka2:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_HOST_IP:-192.168.1.102}:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=2
      - BOOTSTRAP_SERVERS=${KAFKA_HOST_IP:-192.168.1.102}:9091,${KAFKA_HOST_IP:-192.168.1.102}:9092,${KAFKA_HOST_IP:-192.168.1.102}:9093,${KAFKA_HOST_IP:-192.168.1.102}:9094,${KAFKA_HOST_IP:-192.168.1.102}:9095
      - ZOOKEEPER=zookeeper:2181

  kafka3:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9093:9093"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_HOST_IP:-192.168.1.102}:9093
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9093
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=3
      - BOOTSTRAP_SERVERS=${KAFKA_HOST_IP:-192.168.1.102}:9091,${KAFKA_HOST_IP:-192.168.1.102}:9092,${KAFKA_HOST_IP:-192.168.1.102}:9093,${KAFKA_HOST_IP:-192.168.1.102}:9094,${KAFKA_HOST_IP:-192.168.1.102}:9095
      - ZOOKEEPER=zookeeper:2181

  kafka4:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9094:9094"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_HOST_IP:-192.168.1.102}:9094
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9094
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=4
      - BOOTSTRAP_SERVERS=${KAFKA_HOST_IP:-192.168.1.102}:9091,${KAFKA_HOST_IP:-192.168.1.102}:9092,${KAFKA_HOST_IP:-192.168.1.102}:9093,${KAFKA_HOST_IP:-192.168.1.102}:9094,${KAFKA_HOST_IP:-192.168.1.102}:9095
      - ZOOKEEPER=zookeeper:2181

  kafka5:
    image: confluentinc/cp-kafka:${CONFLUENT_PLATFORM_VERSION:-3.0.0}
    healthcheck:
      test: ps augwwx | egrep [S]upportedKafka
    depends_on:
      - zookeeper
    ports:
      - "9095:9095"
    environment:
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${KAFKA_HOST_IP:-192.168.1.102}:9095
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9095
      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
      - KAFKA_BROKER_ID=5
      - BOOTSTRAP_SERVERS=${KAFKA_HOST_IP:-192.168.1.102}:9091,${KAFKA_HOST_IP:-192.168.1.102}:9092,${KAFKA_HOST_IP:-192.168.1.102}:9093,${KAFKA_HOST_IP:-192.168.1.102}:9094,${KAFKA_HOST_IP:-192.168.1.102}:9095
      - ZOOKEEPER=zookeeper:2181
Kafka 生产/消费
我个人喜欢用 C/C++ 操作 Kafka,需要安装 Kafka 客户端库 librdkafka(C 语言库)和 cppkafka(基于 librdkafka 的 C++ 封装)。下面的示例程序还使用了 Boost.Program_options 来解析命令行参数。
# Install the librdkafka C client, a build toolchain, and Boost
# (cppkafka needs the Boost headers; the example programs below also
# need Boost.Program_options).
sudo apt install librdkafka-dev libboost-dev libboost-program-options-dev build-essential cmake git

# Build and install cppkafka from source.
git clone https://github.com/mfontanini/cppkafka.git
cd cppkafka
mkdir build && cd build
cmake -DCMAKE_BUILD_TYPE=RELEASE ..
# A bare `make -j` starts an unlimited number of parallel jobs; bound it.
make -j"$(nproc)"
sudo make install
# Refresh the dynamic linker cache so the newly installed libcppkafka
# is found when the programs run.
sudo ldconfig
在 CMake 项目中使用 cppkafka:
find_package(CppKafka REQUIRED)
# Both example programs below parse their command line with Boost.Program_options.
find_package(Boost REQUIRED COMPONENTS program_options)

# target_link_libraries() takes the target to link *into* as its first
# argument; rename the targets/sources to match your project.
add_executable(kafka_producer producer.cpp)
target_link_libraries(kafka_producer PRIVATE CppKafka::cppkafka Boost::program_options)

add_executable(kafka_consumer consumer.cpp)
target_link_libraries(kafka_consumer PRIVATE CppKafka::cppkafka Boost::program_options)
生产者示例(从标准输入逐行读取,并把每一行作为一条消息写入指定 topic):
#include#include #include #include "cppkafka/utils/buffered_producer.h" #include "cppkafka/configuration.h" using std::string; using std::exception; using std::getline; using std::cin; using std::cout; using std::endl; using cppkafka::BufferedProducer; using cppkafka::Configuration; using cppkafka::Topic; using cppkafka::MessageBuilder; using cppkafka::Message; namespace po = boost::program_options; int main(int argc, char* argv[]) { string brokers; string topic_name; int partition_value = -1; po::options_description options("Options"); options.add_options() ("help,h", "produce this help message") ("brokers,b", po::value (&brokers)->required(), "the kafka broker list") ("topic,t", po::value (&topic_name)->required(), "the topic in which to write to") ("partition,p", po::value (&partition_value), "the partition to write into (unassigned if not provided)") ; po::variables_map vm; try { po::store(po::command_line_parser(argc, argv).options(options).run(), vm); po::notify(vm); } catch (exception& ex) { cout << "Error parsing options: " << ex.what() << endl; cout << endl; cout << options << endl; return 1; } // Create a message builder for this topic MessageBuilder builder(topic_name); // Get the partition we want to write to. 
If no partition is provided, this will be // an unassigned one if (partition_value != -1) { builder.partition(partition_value); } // Construct the configuration Configuration config = { { "metadata.broker.list", brokers } }; // Create the producer BufferedProducer producer(config); // Set a produce success callback producer.set_produce_success_callback([](const Message& msg) { cout << "Successfully produced message with payload " << msg.get_payload() << endl; }); // Set a produce failure callback producer.set_produce_failure_callback([](const Message& msg) { cout << "Failed to produce message with payload " << msg.get_payload() << endl; // Return false so we stop trying to produce this message return false; }); cout << "Producing messages into topic " << topic_name << endl; // Now read lines and write them into kafka string line; while (getline(cin, line)) { // Set the payload on this builder builder.payload(line); // Add the message we've built to the buffered producer producer.add_message(builder); // Now flush so we: // * emit the buffered message // * poll the producer so we dispatch on delivery report callbacks and // therefore get the produce failure/success callbacks producer.flush(); } }
消费者示例(以消费者组方式订阅指定 topic,打印收到的每条消息并提交 offset,按 Ctrl+C 退出):
#include#include #include #include #include "cppkafka/consumer.h" #include "cppkafka/configuration.h" using std::string; using std::exception; using std::cout; using std::endl; using cppkafka::Consumer; using cppkafka::Configuration; using cppkafka::Message; using cppkafka::TopicPartitionList; namespace po = boost::program_options; bool running = true; int main(int argc, char* argv[]) { string brokers; string topic_name; string group_id; po::options_description options("Options"); options.add_options() ("help,h", "produce this help message") ("brokers,b", po::value (&brokers)->required(), "the kafka broker list") ("topic,t", po::value (&topic_name)->required(), "the topic in which to write to") ("group-id,g", po::value (&group_id)->required(), "the consumer group id") ; po::variables_map vm; try { po::store(po::command_line_parser(argc, argv).options(options).run(), vm); po::notify(vm); } catch (exception& ex) { cout << "Error parsing options: " << ex.what() << endl; cout << endl; cout << options << endl; return 1; } // Stop processing on SIGINT signal(SIGINT, [](int) { running = false; }); // Construct the configuration Configuration config = { { "metadata.broker.list", brokers }, { "group.id", group_id }, // Disable auto commit { "enable.auto.commit", false } }; // Create the consumer Consumer consumer(config); // Print the assigned partitions on assignment consumer.set_assignment_callback([](const TopicPartitionList& partitions) { cout << "Got assigned: " << partitions << endl; }); // Print the revoked partitions on revocation consumer.set_revocation_callback([](const TopicPartitionList& partitions) { cout << "Got revoked: " << partitions << endl; }); // Subscribe to the topic consumer.subscribe({ topic_name }); cout << "Consuming messages from topic " << topic_name << endl; // Now read lines and write them into kafka while (running) { // Try to consume a message Message msg = consumer.poll(); if (msg) { // If we managed to get a message if (msg.get_error()) { 
// Ignore EOF notifications from rdkafka if (!msg.is_eof()) { cout << "[+] Received error notification: " << msg.get_error() << endl; } } else { // Print the key (if any) if (msg.get_key()) { cout << msg.get_key() << " -> "; } // Print the payload cout << msg.get_payload() << endl; // Now commit the message consumer.commit(msg); } } } }



