linux 下安装RabbitMQ
转载: https://blog.csdn.net/qq_39135287/article/details/95725385
本教程为 windows 示例:
转载: https://www.jianshu.com/p/a6f21317722a
自测:
服务 + 延迟队列插件 (注意版本)
RabbitMq Server 3.7.4
rabbitmq_delayed_message_exchange-3.8.0.ez [适用于3.7 ~ 3.8]
插件地址: https://github.rc1844.workers.dev/rabbitmq/rabbitmq-delayed-message-exchange/releases/download/v3.8.0/rabbitmq_delayed_message_exchange-3.8.0.ez
下载完毕后: 将该 .ez 文件放到 RabbitMQ Server 安装目录下的 plugins 目录
常用命令:
进入到RabbitMq Server 3.7.4 sbin 目录下
rabbitmq-service start // 启动
rabbitmq-service stop // 停止
rabbitmq-plugins enable rabbitmq_management // 启用管理界面 (web管理界面)
rabbitmq-plugins enable rabbitmq_delayed_message_exchange // 启用延迟队列插件
安装完插件后, 重新启动 rabbitmq 服务; 若出现如下图所示的内容 (原文此处为截图), 即表示插件安装成功。也可以执行 rabbitmq-plugins list, 确认 rabbitmq_delayed_message_exchange 已处于启用状态
PHP项目代码
// AMQP connection object; assigned in _initialize().
protected $connection; // connection
// Channel opened on $connection; all exchange/queue/publish/consume calls go through it.
protected $channel; // channel
// Broker connection settings passed positionally to AMQPStreamConnection in _initialize().
// NOTE(review): host and credentials are hard-coded here — presumably fine for a local
// demo only; confirm before using outside a development machine.
protected $config = [
'host' => 'localhost',
'port' => 5672,
'user' => 'guest',
'pass' => 'guest',
'vhost' => '/',
];
// Expiry / delay time, in seconds. Not referenced by the code visible in this section.
const TIMEOUT_5_S = 5; // 5s
// Exchange names. Only $exchange_delayed is used by the methods visible in this section;
// $exchange_logs and $exchange_direct are not referenced here.
private $exchange_logs = "logs";
private $exchange_direct = "direct";
private $exchange_delayed = "delayed";
// Queue bound to the delayed exchange; its name is also used as the routing key.
private $queue_delayed = "delayedQueue";
// Exchange type identifiers passed to exchange_declare().
CONST EXCHANGETYPE_FANOUT = "fanout";
CONST EXCHANGETYPE_DIRECT = "direct";
// Type used for the delayed exchange — presumably the type registered by the
// rabbitmq_delayed_message_exchange plugin; the plugin must be enabled on the broker.
CONST EXCHANGETYPE_DELAYED = "x-delayed-message";
// Declare the connection, channel, delayed exchange and queue
/**
 * Open the AMQP connection and channel, then declare the delayed exchange,
 * declare the delayed queue, and bind the two together.
 *
 * The queue name doubles as the routing key of the binding.
 */
protected function _initialize()
{
    $settings = $this->config;

    $this->connection = new AMQPStreamConnection(
        $settings['host'],
        $settings['port'],
        $settings['user'],
        $settings['pass'],
        $settings['vhost']
    );
    $this->channel = $this->connection->channel();

    // Flags are spelled out by name because both declare calls take a long
    // run of positional booleans (see the php-amqplib AMQPChannel signatures).
    $passive = false;
    $durable = true;
    $autoDelete = false;
    $internal = false;
    $nowait = false;
    $exclusive = false;

    // The delayed exchange routes like a direct exchange once the delay elapses.
    $exchangeArguments = new AMQPTable(["x-delayed-type" => self::EXCHANGETYPE_DIRECT]);

    $this->channel->exchange_declare(
        $this->exchange_delayed,
        self::EXCHANGETYPE_DELAYED,
        $passive,
        $durable,
        $autoDelete,
        $internal,
        $nowait,
        $exchangeArguments
    );

    $this->channel->queue_declare(
        $this->queue_delayed,
        $passive,
        $durable,
        $exclusive,
        $autoDelete
    );

    $routingKey = $this->queue_delayed;
    $this->channel->queue_bind($this->queue_delayed, $this->exchange_delayed, $routingKey);
}
// Build the delayed message; sendDelay() calls this method before publishing
/**
 * Build a persistent AMQP message that carries an `x-delay` application header.
 *
 * @param mixed $msg  Message body handed to AMQPMessage unchanged.
 * @param mixed $time Delay in seconds; multiplied by 1000 before being stored
 *                    in the `x-delay` header (presumably milliseconds, as the
 *                    delayed-message plugin expects — confirm against plugin docs).
 * @return AMQPMessage The message, ready to be published.
 */
public function createMessageDelay($msg, $time)
{
    $headers = new AMQPTable(['x-delay' => $time * 1000]);

    return new AMQPMessage($msg, [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
        'application_headers' => $headers,
    ]);
}
// public function sendDelay($msg,$time = self::TIMEOUT_10_S) {
/**
 * Publish one delayed message to the delayed exchange, then close the channel
 * and the connection.
 *
 * Both parameters are optional and default to the values that used to be
 * hard-coded, so existing zero-argument calls behave exactly as before.
 *
 * @param mixed $msg  Message body. Defaults to '555555'.
 * @param mixed $time Delay in seconds before delivery. Defaults to 10.
 * @return void
 */
public function sendDelay($msg = '555555', $time = 10)
{
    try {
        $message = $this->createMessageDelay($msg, $time);
        // The queue name is used as the routing key, matching the binding
        // created in _initialize().
        $this->channel->basic_publish($message, $this->exchange_delayed, $this->queue_delayed);
    } finally {
        // Always release the channel and connection, even when building or
        // publishing the message throws.
        $this->channel->close();
        $this->connection->close();
    }
}
/**
 * Consume messages from the delayed queue, printing each body and
 * acknowledging it, until no consumer callbacks remain on the channel.
 * The channel and connection are closed on the way out.
 *
 * @return void
 */
public function consumDelay()
{
    // Print the body on its own line, then acknowledge that single delivery.
    $callback = function ($msg) {
        echo ' [x] ', $msg->body, "\n";
        $this->channel->basic_ack($msg->delivery_info['delivery_tag'], false);
    };

    try {
        // Prefetch count of 1: at most one unacknowledged message at a time.
        $this->channel->basic_qos(null, 1, null);
        // Fourth argument (no_ack) is false, so the manual ack in the callback is required.
        $this->channel->basic_consume($this->queue_delayed, '', false, false, false, false, $callback);
        echo ' [*] Waiting for logs. To exit press CTRL+C', "\n";

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    } finally {
        // Always release the channel and connection, even when consuming throws.
        $this->channel->close();
        $this->connection->close();
    }
}
测试类
1: 接收监听
2: 执行发送
ps:
修改完 MQ 相关代码后, 接收监听 (消费者进程) 需要重新启动, 改动才会生效



