栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Kafka生产者的客户端PHP版本

Kafka生产者的客户端PHP版本

一、准备工作

虽然 Kafka 是用 Java/Scala 语言编写的,但这不妨碍它对多语言的支持。可以在 Kafka 官网的 CLIENTS 查看 Kafka 支持的语言,其中包括 C/C++、Python、Go 等语言。

PHP 操作 Kafka 需要安装 librdkafka 库和 kafka 的 PHP 扩展。
1.安装 librdkafka 库

git clone https://github.com/edenhill/librdkafka.git
./configure
 make
 sudo make install

2.安装 php-kafka 扩展

$ git clone https://github.com/arnaud-lb/php-rdkafka.git
$ cd librdkafka/
$ phpize
$ ./configure 
$ make
$ sudo make install

#在php.ini 文件中配置 rdkafka扩展
extension=rdkafka.so

#查看扩展是否生效
php -m | grep kafka

二、代码实现

demo 来源于 https://github.com/arnaud-lb/php-rdkafka#examples

正常的生产逻辑如下:
1.配置生产者客户端参数及创建相应的生产者实例;

$conf = new RdKafkaConf();
$conf->set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafkaProducer($conf);
$rk->addBrokers("127.0.0.1");

2.构建主题;

$topic = $rk->newTopic("test");

3.发送消息;

$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");

4.关闭生产者实例。

$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);

检验消息是否发送成功
终端开启一个消费者:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

在另一个窗口执行 php producer.php,
正在上传…重新上传取消

可看到消费者终端接收到消息。
正在上传…重新上传取消

完整代码如下:

set('log_level', LOG_DEBUG);
//$conf->set('debug', 'all');
$rk = new RdKafkaProducer($conf);
$rk->addBrokers("127.0.0.1");


$topic = $rk->newTopic("test");


$topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message payload");


$timeout_ms = 60000; // 1 minute
$rk->flush($timeout_ms);

echo 'finished';exit;
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/751322.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号