目录
前言
composer安装扩展
遇到的问题
代码展示
1.api接口,进行消息发布
2.生产者类库
3.消费者类库
4.目录结构
5.PHP中 register_shutdown_function 函数的基础介绍与用法详解
windows上运行消费者类库
1.项目根目录下执行
2.执行后代码展示
3.修改下config/console.php
4.项目根目录下执行
5.效果展示
6.tp6.0自定义指令
前言
昨天用树莓派搭建的rabbitmq今天就迫不及待的上手试试了!哈哈哈,(附上树莓派搭建步骤树莓派(Raspberry Pi)上安装RabbitMQ(一)_zk_jy520的博客-CSDN博客)赶紧熬夜把操作过程整理了一遍分享给大家,哪个小伙伴想试试的话,也可以按照这个这个步骤来,能帮到的话,记得点赞收藏哦,没有帮到,也可以点赞收藏,给我点鼓励呀!!!
composer安装扩展
1.composer安装
composer require php-amqplib/php-amqplib
遇到的问题
代码展示
1.api接口,进行消息发布
//RabbitMQ使用
public function test4(){
$client = Producer::getInstance();
$res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
dump($res);
}
代码展示
1.api接口,进行消息发布
//RabbitMQ使用
public function test4(){
$client = Producer::getInstance();
$res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
dump($res);
}
//RabbitMQ使用
public function test4(){
$client = Producer::getInstance();
$res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
dump($res);
}
发送成功
2.生产者类库
host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
}else{
$this->host = $host;
$this->port = $port;
$this->user = $user;
$this->pwd = $pwd;
$this->vhost = $vhost;
}
self::$client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
}
public static function getInstance($host = '',$port = '',$user = '',$pwd = '',$vhost = '') {
if (!(self::$instance instanceof self)) {
self::$instance = new self($host,$port,$user,$pwd,$vhost);
}
return self::$instance;
}
public function publishMsg($exchange,$QueueArr,$msg,$message_id,$route_key = '',$expiration = 3600 * 90){
$channel = self::$client->channel();
$channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, [], null);
//绑定多个对列
foreach ($QueueArr as $key=>$value){
$channel->queue_declare($value,false,true,false,false,false);
//队列和交换机绑定
$channel->queue_bind($value, $exchange,$route_key);
}
//发送消息
$message = new AMQPMessage($msg,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>$expiration * 1000,'message_id'=>$message_id));
$channel->basic_publish($message,$exchange,$route_key);
$channel->close();
self::$client->close();
return true;
}
}
3.消费者类库
host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
$this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
$this->channel = $this->client->channel();
}
public function start(){
$this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']);
register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function shutdown($channel, $connection){
$channel->close();
$connection->close();
save_log('close');
}
public function process_message($message){
echo $message->body."n";
//手动发送ack
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
}
4.目录结构
host = $info['address'];
$this->port = $info['port'];
$this->user = $info['user'];
$this->pwd = $info['pwd'];
$this->vhost = $info['vhost'];
$this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
$this->channel = $this->client->channel();
}
public function start(){
$this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']);
register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
public function shutdown($channel, $connection){
$channel->close();
$connection->close();
save_log('close');
}
public function process_message($message){
echo $message->body."n";
//手动发送ack
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
}
4.目录结构
考虑到大家会不理解这个函数哈,下面这链接详解,通俗易懂
5.PHP中 register_shutdown_function 函数的基础介绍与用法详解
参考链接:https://www.jb51.net/article/129213.htm
windows上运行消费者类库
1.项目根目录下执行
php think make:command Consumer
php think make:command Consumer
2.执行后代码展示
setName('consumer')
->setDescription('the consumer command');
}
protected function execute(Input $input, Output $output)
{
$consumer = new servicesrabbitmqConsumer();
$consumer->start();
// 指令输出
$output->writeln('consumer');
}
}
3.修改下config/console.php
添加下面代码
[
'consumer' => 'appcommandConsumer',
],
];
4.项目根目录下执行
php think consumer
5.效果展示
6.tp6.0自定义指令
参考链接:自定义指令 · ThinkPHP6.0完全开发手册 · 看云



