栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

PHP + RabbitMQ + TP6实现,记录操作全过程

PHP + RabbitMQ + TP6实现,记录操作全过程

目录

前言

composer安装扩展

遇到的问题 

代码展示

1.api接口,进行消息发布

2.生产者类库

3.消费者类库

4.目录结构

5.PHP中 register_shutdown_function 函数的基础介绍与用法详解

windows上运行消费者类库

1.项目根目录下执行

2.执行后代码展示

3.修改下config/console.php

4.项目根目录下执行

5.效果展示

6.tp6.0自定义指令


前言

昨天用树莓派搭建的rabbitmq今天就迫不及待的上手试试了!哈哈哈,(附上树莓派搭建步骤树莓派(Raspberry Pi)上安装RabbitMQ(一)_zk_jy520的博客-CSDN博客)赶紧熬夜把操作过程整理了一遍分享给大家,哪个小伙伴想试试的话,也可以按照这个这个步骤来,能帮到的话,记得点赞收藏哦,没有帮到,也可以点赞收藏,给我点鼓励呀!!!

composer安装扩展
1.composer安装
composer require  php-amqplib/php-amqplib

遇到的问题 

代码展示

1.api接口,进行消息发布
    //RabbitMQ使用
    public function test4(){

        $client = Producer::getInstance();

        $res = $client->publishMsg('exchange_name',['queue_name'],'111111','','');
        dump($res);
    }

发送成功

2.生产者类库
host = $info['address'];
            $this->port = $info['port'];
            $this->user = $info['user'];
            $this->pwd = $info['pwd'];
            $this->vhost = $info['vhost'];
        }else{
            $this->host = $host;
            $this->port = $port;
            $this->user = $user;
            $this->pwd = $pwd;
            $this->vhost = $vhost;
        }
        self::$client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
    }

    public static function getInstance($host = '',$port = '',$user = '',$pwd = '',$vhost = '') {
        if (!(self::$instance instanceof self)) {
            self::$instance = new self($host,$port,$user,$pwd,$vhost);
        }
        return self::$instance;
    }

    public function publishMsg($exchange,$QueueArr,$msg,$message_id,$route_key = '',$expiration = 3600 * 90){
        $channel = self::$client->channel();

        
        $channel->exchange_declare($exchange, AMQPExchangeType::DIRECT, false, true, false, false, false, [], null);

        //绑定多个对列
        foreach ($QueueArr as $key=>$value){

            
            $channel->queue_declare($value,false,true,false,false,false);
            //队列和交换机绑定
            $channel->queue_bind($value, $exchange,$route_key);
        }

        //发送消息
        $message = new AMQPMessage($msg,array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,'expiration'=>$expiration * 1000,'message_id'=>$message_id));
        $channel->basic_publish($message,$exchange,$route_key);
        $channel->close();
        self::$client->close();
        return true;
    }


}

3.消费者类库
host = $info['address'];
        $this->port = $info['port'];
        $this->user = $info['user'];
        $this->pwd = $info['pwd'];
        $this->vhost = $info['vhost'];

        $this->client = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pwd, $this->vhost);
        $this->channel = $this->client->channel();
    }

    public function start(){

        
        $this->channel->basic_consume('queue_name','',false, false, false, false,[$this,'process_message']);
        register_shutdown_function([$this, 'shutdown'], $this->channel, $this->client);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

    }

    public function shutdown($channel, $connection){
        $channel->close();
        $connection->close();
        save_log('close');
    }

    public function process_message($message){
        echo  $message->body."n";
        //手动发送ack
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }


}

4.目录结构

考虑到大家会不理解这个函数哈,下面这链接详解,通俗易懂

5.PHP中 register_shutdown_function 函数的基础介绍与用法详解

参考链接:https://www.jb51.net/article/129213.htm

windows上运行消费者类库

1.项目根目录下执行
php think make:command Consumer

2.执行后代码展示
setName('consumer')
            ->setDescription('the consumer command');        
    }

    protected function execute(Input $input, Output $output)
    {

        $consumer = new servicesrabbitmqConsumer();

        $consumer->start();

    	// 指令输出
    	$output->writeln('consumer');
    }
}

3.修改下config/console.php

添加下面代码

 [
        'consumer' => 'appcommandConsumer',
    ],
];
4.项目根目录下执行
php think consumer

5.效果展示

6.tp6.0自定义指令

参考链接:自定义指令 · ThinkPHP6.0完全开发手册 · 看云 

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/670929.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号