
Handling Timed-Out Unpaid Orders with Delay Queues in the PHP Framework Hyperf

Delay queue

  • DelayProducer.php

  • AmqpBuilder.php

AmqpBuilder.php

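<?php
declare(strict_types=1);

use Hyperf\Amqp\Builder\QueueBuilder;

// Assumption: the class header and the setArguments() signature are reconstructed
// from the methods used below (Hyperf 2.x style); the namespace is omitted.
class AmqpBuilder extends QueueBuilder
{
    // Merge new arguments into the existing ones instead of replacing them.
    public function setArguments($arguments) : self
    {
        $this->arguments = array_merge($this->arguments, $arguments);
        return $this;
    }

    public function setDelayedQueue(string $queueName, int $xMessageTtl, string $xDeadLetterExchange, string $xDeadLetterRoutingKey) : self
    {
        $this->setArguments([
            'x-message-ttl'             => ['I', $xMessageTtl * 1000], // seconds converted to milliseconds
            'x-dead-letter-exchange'    => ['S', $xDeadLetterExchange],
            'x-dead-letter-routing-key' => ['S', $xDeadLetterRoutingKey],
        ]);
        $this->setQueue($queueName);
        return $this;
    }
}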
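
To make the effect of setDelayedQueue() concrete, the following standalone sketch builds the same three queue arguments as a plain array, with no dependency on Hyperf or php-amqplib. The function name delayedQueueArguments is hypothetical and exists only for this illustration; the 'I' and 'S' tags are the integer and string type markers used in the listing above.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper (not part of Hyperf): returns the queue arguments that
 * setDelayedQueue() merges in, for a TTL given in seconds.
 */
function delayedQueueArguments(int $ttlSeconds, string $deadLetterExchange, string $deadLetterRoutingKey): array
{
    if ($ttlSeconds < 0) {
        throw new InvalidArgumentException('TTL must not be negative');
    }

    return [
        // RabbitMQ expects the TTL in milliseconds.
        'x-message-ttl'             => ['I', $ttlSeconds * 1000],
        // Where expired messages are republished.
        'x-dead-letter-exchange'    => ['S', $deadLetterExchange],
        'x-dead-letter-routing-key' => ['S', $deadLetterRoutingKey],
    ];
}

// Example: a 30-minute payment window.
$arguments = delayedQueueArguments(1800, 'delay_exchange', 'delay_route');
var_export($arguments);
```

A consumer bound to delay_exchange with the routing key delay_route would then receive each message once its TTL has elapsed.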

DelayProducer.php

<?php
declare(strict_types=1);

use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Builder;
use Hyperf\Amqp\Message\ProducerMessageInterface;
use Hyperf\Di\Annotation\AnnotationCollector;
use PhpAmqpLib\Message\AMQPMessage;

// Assumption: the class header and the produce() signature are reconstructed from
// the call below and from the demo; they mirror Hyperf's own Producer class.
class DelayProducer extends Builder
{
    public function produce(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        // retry() is the Hyperf helper function; one retry on failure.
        return retry(1, function () use ($producerMessage, $queueBuilder, $confirm, $timeout) {
            return $this->produceMessage($producerMessage, $queueBuilder, $confirm, $timeout);
        });
    }

    private function produceMessage(ProducerMessageInterface $producerMessage, AmqpBuilder $queueBuilder, bool $confirm = false, int $timeout = 5) : bool
    {
        $result = false;
        $this->injectMessageProperty($producerMessage);
        $message = new AMQPMessage($producerMessage->payload(), $producerMessage->getProperties());
        $pool    = $this->getConnectionPool($producerMessage->getPoolName());

        $connection = $pool->get();
        if ($confirm) {
            $channel = $connection->getConfirmChannel();
        } else {
            $channel = $connection->getChannel();
        }
        // In confirm mode, the broker's ack marks the publish as successful.
        $channel->set_ack_handler(function () use (&$result) {
            $result = true;
        });
        try {
            // Set up the delay queue.
            $exchangeBuilder = $producerMessage->getExchangeBuilder();
            // Declare the queue (carries the TTL and dead-letter arguments).
            $channel->queue_declare($queueBuilder->getQueue(), $queueBuilder->isPassive(), $queueBuilder->isDurable(), $queueBuilder->isExclusive(), $queueBuilder->isAutoDelete(), $queueBuilder->isNowait(), $queueBuilder->getArguments(), $queueBuilder->getTicket());
            // Declare the exchange.
            $channel->exchange_declare($exchangeBuilder->getExchange(), $exchangeBuilder->getType(), $exchangeBuilder->isPassive(), $exchangeBuilder->isDurable(), $exchangeBuilder->isAutoDelete(), $exchangeBuilder->isInternal(), $exchangeBuilder->isNowait(), $exchangeBuilder->getArguments(), $exchangeBuilder->getTicket());
            // Bind the queue to the exchange.
            $channel->queue_bind($queueBuilder->getQueue(), $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            // Publish the message.
            $channel->basic_publish($message, $producerMessage->getExchange(), $producerMessage->getRoutingKey());
            $channel->wait_for_pending_acks_returns($timeout);
        } catch (Throwable $exception) {
            // Reconnect the connection before release.
            $connection->reconnect();
            throw $exception;
        } finally {
            $connection->release();
        }
        return $confirm ? $result : true;
    }

    // Copy the routing key and exchange from the @Producer annotation, if present.
    private function injectMessageProperty(ProducerMessageInterface $producerMessage) : void
    {
        if (class_exists(AnnotationCollector::class)) {
            $annotation = AnnotationCollector::getClassAnnotation(get_class($producerMessage), Producer::class);
            if ($annotation) {
                $annotation->routingKey && $producerMessage->setRoutingKey($annotation->routingKey);
                $annotation->exchange && $producerMessage->setExchange($annotation->exchange);
            }
        }
    }
}
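
The producer above relies on a queue-level TTL combined with a dead-letter exchange: a message waits in the delay queue and, once its TTL has elapsed, the broker republishes it to the dead-letter target. The following in-memory sketch imitates that flow so the timing is easy to follow. The class SimulatedDelayQueue and its methods are hypothetical, time is passed in explicitly as milliseconds, and the expiry rule is a simplification of what a real broker does.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical in-memory stand-in for a queue declared with x-message-ttl
 * and a dead-letter target. For illustration only; not a RabbitMQ client.
 */
final class SimulatedDelayQueue
{
    /** @var int */
    private $ttlMs;

    /** @var array<int, array{body: string, expiresAt: int}> messages still waiting, oldest first */
    private $waiting = [];

    /** @var string[] bodies that have been dead-lettered, in order */
    private $deadLettered = [];

    public function __construct(int $ttlMs)
    {
        $this->ttlMs = $ttlMs;
    }

    public function publish(string $body, int $nowMs): void
    {
        $this->waiting[] = ['body' => $body, 'expiresAt' => $nowMs + $this->ttlMs];
    }

    /** Moves every message at the head of the queue whose TTL has elapsed. */
    public function expire(int $nowMs): void
    {
        while ($this->waiting !== [] && $this->waiting[0]['expiresAt'] <= $nowMs) {
            $message = array_shift($this->waiting); // array_shift reindexes from 0
            $this->deadLettered[] = $message['body'];
        }
    }

    /** @return string[] */
    public function deadLettered(): array
    {
        return $this->deadLettered;
    }
}

$queue = new SimulatedDelayQueue(1000);   // 1-second TTL, as in the demo
$queue->publish('order-1', 0);
$queue->publish('order-2', 400);

$queue->expire(999);
$before = $queue->deadLettered();         // nothing has expired yet

$queue->expire(1000);
$afterFirst = $queue->deadLettered();     // order-1 reached its TTL

$queue->expire(1400);
$afterSecond = $queue->deadLettered();    // order-2 follows 400 ms later
```

Because every message in the queue shares the same TTL, messages expire in the order they were published, which is why checking only the head of the queue is enough in this sketch.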

Handling timed-out orders

  • OrderQueueConsumer.php

  • OrderQueueProducer.php

OrderQueueProducer.php

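<?php
use Hyperf\Amqp\Builder\ExchangeBuilder;
use Hyperf\Amqp\Message\ProducerMessage;
class OrderQueueProducer extends ProducerMessage // header and constructor signature reconstructed from the demo (assumption)
{
    public function __construct($data)
    {
        $this->payload = $data;
    }
    public function getExchangeBuilder() : ExchangeBuilder
    {
        return parent::getExchangeBuilder(); // TODO: Change the autogenerated stub
    }
}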

OrderQueueConsumer.php
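
No listing for the consumer appears under this heading. As a minimal sketch of the check such a consumer would typically need when a delayed message arrives, the function below closes an order only if it is still unpaid. It is not the author's code: handleOrderTimeout, the two callbacks, and the status values 'unpaid', 'paid' and 'closed' are all hypothetical, and the Hyperf consumer wiring is deliberately left out.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical decision step for a timed-out order message.
 *
 * @param array    $message    decoded payload, expected to contain 'order_sn'
 * @param callable $findStatus assumed lookup: order_sn => status string or null
 * @param callable $closeOrder assumed action: closes the order with that order_sn
 * @return string 'closed', 'skipped' or 'not_found'
 */
function handleOrderTimeout(array $message, callable $findStatus, callable $closeOrder): string
{
    $orderSn = (string) ($message['order_sn'] ?? '');
    $status  = $orderSn === '' ? null : $findStatus($orderSn);

    if ($status === null) {
        return 'not_found';
    }
    if ($status !== 'unpaid') {
        // Already paid or already closed: the delayed message is stale.
        return 'skipped';
    }

    $closeOrder($orderSn);
    return 'closed';
}

// In-memory stand-in for an orders table.
$orders = ['10001' => 'unpaid', '10002' => 'paid'];

$find = function (string $sn) use (&$orders): ?string {
    return $orders[$sn] ?? null;
};
$close = function (string $sn) use (&$orders): void {
    $orders[$sn] = 'closed';
};

$first  = handleOrderTimeout(['order_sn' => '10001'], $find, $close);
$second = handleOrderTimeout(['order_sn' => '10002'], $find, $close);
$third  = handleOrderTimeout(['order_sn' => '99999'], $find, $close);
```

Checking the status before closing keeps the handler safe to run more than once for the same order, which matters because a message may be redelivered.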

Demo

$builder = new AmqpBuilder();
// Queue 'order_exchange' with a 1-second TTL; expired messages go to 'delay_exchange' with routing key 'delay_route'.
$builder->setDelayedQueue('order_exchange', 1, 'delay_exchange', 'delay_route');
$que = ApplicationContext::getContainer()->get(DelayProducer::class);
var_dump($que->produce(new OrderQueueProducer(['order_sn' => (string) mt_rand(10000, 90000)]), $builder));
