栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 系统运维 > 运维 > Linux

RabbitMQ的简单理解及PHP使用RabbitMQ(附yii2使用方法)

Linux 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ的简单理解及PHP使用RabbitMQ(附yii2使用方法)

最近问了几个小伙伴,发现大家都仅仅是会调用操作RabbitMQ的方法,或者仅停留在使用RabbitMQ的层次上。稍微深入一点的知识,就不太清楚了。所以在此记录一下个人对它的理解及PHP怎么使用。

注:才疏学浅,若有不对之处,敬请指正!

首先谈一下RabbitMQ的整体架构(图示):

docker安装RabbitMQ的教程可以参考:使用docker安装RabbitMQ

windows环境PHP安装amqp扩展可以参考:windows环境PHP使用RabbitMq安装amqp扩展

关于rabbitMq的四种交换机类型(Direct exchange、Fanout exchange、Topic exchange、Headers exchange)在此就不做一一说明了,网上资料有很多。文中示例使用较常用的Direct类型。 可以参考这篇博客:RabbitMQ四种交换机类型介绍

在日常开发中需要注意关于生产者(producter )和消费者(consumer )下面几点注意点:

    虚拟地址(vhost) 用于逻辑的隔离,是最上层的消息路由,用来表明某个虚拟机。一个vhost里面可以有很多exchange和queue,但不能有相同名称的exchange或queue。网络信道(channel ),进行消息读写的通道。个人理解:它类似于MySQL里面的会话(Session)。如果虚拟机中不存在路由键(RouteKey)绑定的队列(queue)名,则生产者发出的消息会被抛弃。无论生产者与消费者两者谁先启动,都需要先保证vhost中routeKey存在。否则生产者发出的消息会被抛弃。一般看到视频讲解中或者别人的代码示例,都是先运行consumer,这是因为consumer中指定了队列和路由键(路由键不指定也可以,可以看第一个示例)。两者都不指定交换机,消费者只指定队列名、 生产者只指定路由键,交换机会默认为“Exchange: (AMQP default) ” ,消息会发送到 名称 和生产者中指定的路由键的名称相同的队列 中去。(这句话有点绕,可以结合下面第一个示例来看)对于同一个交换机,生产者与消费者的路由键一样,生产者的消息会分发到各队列中。假如两个消费者,一个监听队列A,一个监听队列B,绑定的路由键都是key1,生产者的路由键也是key1,那么生产者发送10条消息,队列AB 各收到10条,两个消费者分别消费10条来自生产者的消息。

第一个示例(不指定交换机、生产者只指定路由键、消费者只指定队列名):

consumer 代码(consumer.php ):

 '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/'
);   

$queueName = 'queueTest'; //队列名 

//创建连接
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!n");   
}

//创建channel ,进行消息读写的通道 类似于MySQL里面的会话(Session)
$channel = new AMQPChannel($connection);    
   
//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化  
echo "Message Total:".$queue->declareQueue()."n";   
 
//阻塞模式接收消息 
echo "Waiting for message...:n";   

while(True){ 
    $queue->consume('processMessage');   
    //$queue->consume('processMessage', AMQP_AUTOACK); //加上参数AMQP_AUTOACK,自动ACK应答  
} 

//断开连接
$conn->disconnect();   
 
 
function processMessage($envelope, $queue) { 
    $msg = $envelope->getBody(); 
    echo "Received:".$msg."n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
}

?>

producter 代码(producter.php):

 '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   

$routeKey = 'queueTest'; //路由键

//创建连接
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!n");   
}

//创建channel
$channel = new AMQPChannel($connection);
   
//创建交换机
$exchange = new AMQPExchange($channel);

//发送消息 
for($i=0; $i<10; ++$i){
    sleep(2);
    //消息内容 
    $message = "Hello RabbitMq! send time:".date("h:i:sa");   
    echo "Send Message:".$exchange->publish($message, $routeKey)."n";  
}

$connection->disconnect();
?>

然后在命令行中运行消费者代码 : php ./consumer.php

 此时,看一下rabbitMq后台,可以看到:

 

 然后再起一个命令行窗口运行生产者代码 : php ./producter.php

 再看运行消费者代码的命令行:

 点击队列名,queueTest:可以发现

 现在,再回头看上面的第6点,两者都不指定交换机,消费者指定的队列名和生产者指定的路由键名称相同都是“queueTest”,交换机会默认为“AMQP default” ,生产者的消息会发送到队列名称 为queueTest的队列 中去。可以点击(AMQP default)交换机,可以看到bindings下面有句话,其实就是第6点的描述。


第二个示例(指定各个参数,使用一个direct类型的交换机):

注:代码中的类中的方法可以在此查询:http://docs.php.net/manual/da/book.amqp.php

consumer 代码(consumer.php ):

 '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/'
); 
  
$exchangeName = 'test'; //交换机名  交换机会根据路由键转发消息到绑定的队列
$queueName = 'queue1'; //队列名 
$routeKey = 'key1'; //路由键 

//创建连接 类中的方法可以从http://docs.php.net/manual/da/book.amqp.php 查询
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!n");   
}

//创建channel ,进行消息读写的通道,类似于MySQL里面的会话(Session)
$channel = new AMQPChannel($connection);   
 
//创建交换机    
$exchange = new AMQPExchange($channel);   
$exchange->setName($exchangeName); 
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型  
$exchange->setFlags(AMQP_DURABLE); //持久化 
echo "Exchange Status:".$exchange->declareExchange()."n";   
   
//创建队列    
$queue = new AMQPQueue($channel); 
$queue->setName($queueName);   
$queue->setFlags(AMQP_DURABLE); //队列持久化  
echo "Message Total:".$queue->declareQueue()."n";   
 
//绑定交换机与队列,并指定路由键 
echo 'Queue Bind: '.$queue->bind($exchangeName, $routeKey)."n"; 
 
//阻塞模式接收消息 
echo "Waiting for message...:n";   
while(True){ 
    $queue->consume('processMessage');   
    //$queue->consume('processMessage', AMQP_AUTOACK); //加上参数AMQP_AUTOACK,自动ACK应答  
} 

//断开连接
$conn->disconnect();   
 
 
function processMessage($envelope, $queue) { 
    $msg = $envelope->getBody(); 
    echo "Received:".$msg."n"; //处理消息 
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答 
}

?>

producter 代码(producter.php):

 '127.0.0.1',  
    'port' => '5672',  
    'login' => 'guest',  
    'password' => 'guest', 
    'vhost'=>'/' 
);   

$exchangeName = 'test'; //交换机名  可以不选择
$routeKey = 'key1'; //路由键  如果vhost中不存在RouteKey中指定的队列名,则该消息会被抛弃

//创建连接
$connection = new AMQPConnection($configParams);   
if (!$connection->connect()) {   
    die("Cannot connection rabbitMq!n");   
}

//创建channel
$channel = new AMQPChannel($connection);   
 
//创建交换机对象    
$exchange = new AMQPExchange($channel);   
$exchange->setName($exchangeName);    
$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$exchange->setFlags(AMQP_DURABLE); //持久化

//发送消息 
for($i=0; $i<10; ++$i){
    sleep(2);
    //消息内容 
    $message = "Hello RabbitMq! send time:".date("h:i:sa");   
    echo "Send Message:".$exchange->publish($message, $routeKey)."n";  
}

$connection->disconnect();
?>

这个运行结果就不一一截图了,有兴趣的可以自己尝试一下(还有其他类型的交换机)。


yii2中怎么使用rabbitMq呢?先要用composer 加载扩展包

composer require php-amqplib/php-amqplib:* 

具体版本可以在此查询下面

https://packagist.org/packages/php-amqplib/php-amqplib

引入包后,可以参照这篇博客->YII2 框架如何使用rabbitMq,要注意不要把消费者写成函数放到postman中请求,因为肯定会报“Error: Response timed out”,最好写成脚本运行。

时间有限,我会抽空慢慢修改或增加关于rabbitMq的内容。

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/756920.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号