栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

详解PHP实现生产者与消费者(Kafka应用)

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

详解PHP实现生产者与消费者(Kafka应用)

本篇文章给大家介绍PHP实现生产者与消费者,希望对需要的朋友有所帮助!

前言

PHP中使用Kafka需要RdKafka扩展,而RdKafka依赖于librdkafka,所以这两个我们都需要安装,具体安装方法自行百度,本篇不做说明了。

生产者(测试)

创建消费者需要步骤:

  • 生产者配置参数
  • 创建生产者实例
  • 创建主题实例(依赖生产者)
  • 生产主题消息
  • 推送消息

具体代码如下:

 $conf = new RdKafkaConf();
 // 绑定服务节点
 $conf->set('metadata.broker.list', '127.0.0.1:32772');

 // 创建生产者
 $kafka = new RdKafkaProducer($conf);

 // 创建主题实例
 $topic = $kafka->newTopic('p1r1');
 // 生产主题数据,此时消息在缓冲区中,并没有真正被推送
 $topic->produce(RD_KAFKA_PARTITION_UA, 0, 'Message');
 // 阻塞时间(毫秒), 0为非阻塞
 $kafka->poll(0); 

 // 推送消息,如果不调用此函数,消息不会被发送且会丢失
 $result = $kafka->flush(5000);

 if (RD_KAFKA_RESP_ERR_NO_ERROR !== $result) {
     throw new RuntimeException('Was unable to flush, messages might be lost!');
 }
消费者

创建一个消费者需要几个步骤:

  • 消费者配置参数
  • 应用配置参数创建消费者实例
  • 订阅对应主题
  • 拉取数据
  • 提交位移

具体代码如下:

 $conf = new RdKafkaConf();
 // 绑定消费者组
 $conf->set('group.id', 'ceshi');
 // 绑定服务节点,多个用,分隔
 $conf->set('metadata.broker.list', '127.0.0.1:32787');
 // 设置自动提交为false
 $conf->set('enable.auto.commit', 'false');
 // 设置当前消费者拉取数据时的偏移量, 可选参数:
 // earliest: 如果消费者组是新创建的,从头开始消费,否则从消费者组当前消费位移开始。
 // latest:如果消费者组是新创建的,从最新偏移量开始,否则从消费者组当前消费位移开始。
 $conf->set('auto.offset.reset', 'earliest');

 // 创建消费者实例
 $consumer = new RdKafkaKafkaConsumer($conf);
 // 消费者订阅主题,数组形式
 $consumer->subscribe(['topic1','topic2']);
 while (true) {
     // 消费数据,阻塞5秒(5秒内有数据就消费,没有数据等待5秒进入下一轮循环)
     $message = $consumer->consume(5000);
     switch ($message->err) {
  case RD_KAFKA_RESP_ERR_NO_ERROR:
      // 业务逻辑
      var_dump($message);

      // 提交位移
      $consumer->commit($message);
      break;
  case RD_KAFKA_RESP_ERR__PARTITION_EOF:
      echo "No more messages; will wait for moren";
      break;
  case RD_KAFKA_RESP_ERR__TIMED_OUT:
      echo "Timed outn";
      break;
  default:
      throw new Exception($message->errstr(), $message->err);
      break;
     }
 }
 // 关闭消费者(一般用在脚本中,不需要关闭)
 $conumser->close();

只消费指定分区中的数据:

    // 对消费者指定分区,注意此方式不能与subscribe一同使用
    $consumer->assign([
 new RdKafkaTopicPartition("topic", 0),
 new RdKafkaTopicPartition("topic", 1),
    ]);

以上就是详解PHP实现生产者与消费者(Kafka应用)的详细内容,更多请关注考高分网其它相关文章!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/262553.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号