栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 前沿技术 > 大数据 > 大数据系统

Laravel 8.63.0 之 RabbitMQ 生产&消费案例

Laravel 8.63.0 之 RabbitMQ 生产&消费案例

场景:微服务中,会遇到这样的案例:用户申请提现,总后台(后台服务)审核通过,通知资金服务 更新数据;

1.安装 composer 包
composer requires php-amqplib/php-amqplib ^2.12
2. env 追加配置
RABBITMQ_HOST=xb_rabbitmq
RABBITMQ_PORT=5672
#通过15672创建的rabbitmq虚拟主机,默认是'/'
RABBITMQ_VHOST=/
RABBITMQ_USER=guest
RABBITMQ_PASSWORD=guest
#通过15672创建的rabbitmq队列
RABBITMQ_QUEUE=withdrawal-queue
RABBITMQ_EXCHANGE=withdrawal-exchange


QUEUE_ConNECTION=rabbitmq # 更新
3.追加配置 config/queue.php connections 下 后 执行 php artisan config:cache
'rabbitmq' => [
            'driver'                => 'rabbitmq',

            'host'                  => env('RABBITMQ_HOST', '127.0.0.1'),
            'port'                  => env('RABBITMQ_PORT', 5672),

            'vhost'                 => env('RABBITMQ_VHOST', '/'),
            'login'                 => env('RABBITMQ_LOGIN', 'guest'),
            'password'              => env('RABBITMQ_PASSWORD', 'guest'),

            'queue'                 => env('RABBITMQ_QUEUE'), // name of the default queue,
            'exchange_name'         => env('RABBITMQ_QUEUE'),

            'exchange_declare'      => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists
            'queue_declare_bind'    => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange

            'queue_params'          => [
                'passive'           => env('RABBITMQ_QUEUE_PASSIVE', false),
                'durable'           => env('RABBITMQ_QUEUE_DURABLE', true),
                'exclusive'         => env('RABBITMQ_QUEUE_EXCLUSIVE', false),
                'auto_delete'       => env('RABBITMQ_QUEUE_AUTODELETE', false),
            ],

            'exchange_params' => [
                'name'        => env('RABBITMQ_EXCHANGE_NAME', null),
                'type'        => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html
                'passive'     => env('RABBITMQ_EXCHANGE_PASSIVE', false),
                'durable'     => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts
                'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false),
            ],

        ],
4.审核通过方法
    public function adopt()
    {
        //我将用户提现申请,审核通过 我生产一条消息 等待资金服务消费去做资金改动
        $withdrawalId = 100;//提现id
        event(new WithdrawalEvent(['withdrawal_id' => $withdrawalId]));
    }

5.创建 监听事件 Event
withdrawal= $withdrawal;
    }

    
    public function broadcastOn()
    {
        return new PrivateChannel('channel-name');
    }
}


5.创建 监听事件 Listener
config = config('queue.connections.rabbitmq');
    }

    
    public function handle(WithdrawalEvent $event)
    {
        try {
            $connect = new AMQPStreamConnection( //建立生产者与mq之间的连接
                $this->config['host'],$this->config['port'],$this->config['login'],$this->config['password'], '/'
            );
            $channel = $connect->channel(); //在已连接基础上建立生产者与mq之间的通道
            $channel->exchange_declare($this->config['exchange_name'], 'direct', false, true, false); //声明初始化交换机
            $channel->queue_declare($this->config['queue'], false, true, false, false); //声明初始化一条队列
            $channel->queue_bind($this->config['queue'], $this->config['exchange_name']); //将队列与某个交换机进行绑定,并使用路由关键字
            $msgBody = json_encode($event->withdrawal);
            $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成消息
            $channel->basic_publish($msg, $this->config['exchange_name']); //推送消息到某个交换机
            $channel->close();
            $connect->close();
        }catch (Exception $exception){
            Log::info($exception->getMessage());
        }
    }
}
6.创建服务层
connection = new AMQPStreamConnection(
            config('queue.connections.rabbitmq.host'),
            config('queue.connections.rabbitmq.port'),
            config('queue.connections.rabbitmq.login'),
            config('queue.connections.rabbitmq.password'),
            config('queue.connections.rabbitmq.vhost')
        );
        $this->channel = $this->connection->channel();
        $this->message = new AMQPMessage('', ['content_type' => 'json', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]);
    }

    
    private function __clone()
    {}

    
    public function __destruct()
    {
        $this->channel->close();
        $this->connection->close();
        self::$instance = null;
    }

    
    public static function getInstance()
    {
        if (!self::$instance instanceof self) {
            self::$instance = new self();
        }
        return self::$instance;
    }

    
    public function consumer(string $queue, bool $bForceDelete = false)
    {
        try {
            // 取数据
            // 声明队列
            // 不检测同名队列,持久化,不允许其他队列访问,不自动删除队列
            $this->channel->queue_declare($queue, false, true, false, false);
            $message = $this->channel->basic_get($queue);
            if ($message && $bForceDelete) {
                // 回复确认信息
                $this->channel->basic_ack($message->delivery_info['delivery_tag']);
            }
        } catch (Exception $exception) {
            Log::info($exception->getMessage());
        }
        return $message;
    }

    
    public function ack($nTag)
    {
        try {
            $this->channel->basic_ack($nTag);
        } catch (Exception $e) {
            dd($e->getMessage());
        }
        return null;//success 个人响应
    }
}

7.创建 Command
consumer($queue, true);
            if ($response) {
                $result = json_decode($response->body,1);
                dd($result);
                //资金服务后续操作
            }
            dd('service error');
        }
    }
}

8.测试
1.通过 ‘提现审核通过‘ 的路由 生产消息
	http://test.test/api/user/adopt
	
2.执行 php artisan withdrawal:consumer #如下图 生产环境使用 Supvervisor 等进程管理 常驻监听 (请查看 10)

规则说明
direct精准推送
fanout 广播推送到绑定到此交换机下的所有队列
topic 组播比如上面我绑定的关键字是sms_send,那么他可以推送到*.sms_send的所有队列
headers这个目前不知道是如何推送的
9.在创建交换机和队列的时候各个常用参数说明 地址

   name: $queue    // should be unique in fanout exchange. [队列名称]
   passive: false  // don't check if a queue with the same name exists [是否检测同名队列]
   durable: false // the queue will not survive server restarts [是否开启队列持久化]
   exclusive: false // the queue might be accessed by other channels [队列是否可以被其他队列访问]
   auto_delete: true //the queue will be deleted once the channel is closed. [通道关闭后是否删除队列]
   
   
   
   name: $exchange [交换机名称]
   type: direct [路由类型]
   passive: false []
   durable: true [交换机是否开启持久化]
   auto_delete: false //the exchange won't be deleted once the channel is closed.
10.Supvervisor 守护进程 消费 ,进入容器内

10-1.安装 supervisor

apt-get install supervisor

10-2.切换到配置目录

cd /etc/supervisor/conf.d

10-3.写入配置 vim mq.conf

[program:withdrawal_consumer]                                       #管理进程的命名
   command=php artisan withdrawal:consumer     #执行的命令
   stderr_logfile=/var/log/supervisor/error.log      #错误日志输出路径
   stdout_logfile=/var/log/supervisor/supervisor.log   #日志输出路径
   directory=/workspace/xiaoba/oldLiuCms      #命令执行的工作空间
   autostart=true                 #自动启动
   user=root                   #指定用户
   autorestart=true                #自动重启

10-4.重新加载

service  supervisor   force-reload

10-5.启动进程

 service  supervisor   start withdrawal_consumer
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/467259.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号