栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

RabbitMQ延时队列实现(PHP)

RabbitMQ延时队列实现(PHP)

linux 下安装RabbitMQ
转载: https://blog.csdn.net/qq_39135287/article/details/95725385

本教程为 windows 示例:
转载: https://www.jianshu.com/p/a6f21317722a

自测:
服务 + 延迟队列插件 (注意版本)
RabbitMq Server 3.7.4
rabbitmq_delayed_message_exchange-3.8.0.ez [适用于3.7 ~ 3.8]

插件地址: https://github.rc1844.workers.dev/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez

下载完毕: 放在plugins目录

常用命令:

进入到RabbitMq Server 3.7.4 sbin 目录下

rabbitmq-service start			// 启动
rabbitmq-service stop         // 停止
rabbitmq-plugins enable rabbitmq_management    // 启用管理界面 (web管理界面)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange     // 启用延迟队列插件

安装完插件, 重新启动rabbitmq服务, 出现如下图所示, 即插件安装成功

PHP项目代码

    protected $connection; //连接
    protected $channel; //频道
    protected $config = [
        'host'  => 'localhost',
        'port'  => 5672,
        'user'  => 'guest',
        'pass'  => 'guest',
        'vhost' => '/',
    ];

    //  过期时间
    const TIMEOUT_5_S   = 5;     // 5s
    private $exchange_logs      = "logs";
    private $exchange_direct    = "direct";
    private $exchange_delayed   = "delayed";
    private $queue_delayed      = "delayedQueue";

    CONST EXCHANGETYPE_FANOUT   = "fanout";
    CONST EXCHANGETYPE_DIRECT   = "direct";
    CONST EXCHANGETYPE_DELAYED  = "x-delayed-message";
	
	// 生命连接
	protected function _initialize()
	{
	    $this->connection = new AMQPStreamConnection($this->config['host'], $this->config['port'], $this->config['user'], $this->config['pass'], $this->config['vhost']);
	
	    $this->channel = $this->connection->channel();
	
	
	    // 声明Exchange
	    $this->channel->exchange_declare($this->exchange_delayed, self::EXCHANGETYPE_DELAYED, false, true, false,false,false,new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]));
	    $this->channel->queue_declare($this->queue_delayed, false, true, false, false);
	    $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed,$this->queue_delayed);
	}

	// 创建消息 , 发送消息第三行 会调用此方法
	public function createMessageDelay($msg,$time) {
	    $delayConfig = [
	        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
	        'application_headers' => new AMQPTable(['x-delay' => $time * 1000])
	    ];
	    $msg  = new AMQPMessage($msg,$delayConfig);
	    return $msg;
	}

	
	// public function sendDelay($msg,$time =  self::TIMEOUT_10_S) {
	public function sendDelay() {
	    $msg = '555555';
	    $time = 10;
	    $msg = $this->createMessageDelay($msg,$time);
	    $this->channel->basic_publish($msg,$this->exchange_delayed,$this->queue_delayed);
	    $this->channel->close();
	    $this->connection->close();
	}

	
	public function consumDelay(){
	    $callback = function($msg){
	        echo ' [x] ', $msg->body, "n";
	        $this->channel->basic_ack($msg->delivery_info['delivery_tag'],false);
	    };
	    $this->channel->basic_qos(null, 1, null);
	    $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
	    echo ' [*] Waiting for logs. To exit press CTRL+C', "n";
	    while (count($this->channel->callbacks)) {
	        $this->channel->wait();
	    }
	
	    $this->channel->close();
	    $this->connection->close();
	}
  

测试类
1: 接收监听

2: 执行发送

ps:

mq修改完代码, 接收接听需要 重新监听

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/774888.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号