栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

RabbitMQ入门(PHP语言描述)

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

RabbitMQ入门(PHP语言描述)

一 "Hello World!"

 生产者:

  public function actionSendMq($argv=''){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->queue_declare('hello', false, false, false, false);     $msg = new AMQPMessage($argv);     $channel->basic_publish($msg, '', 'hello');     echo " [x] Sent '$argv'".PHP_EOL;     $channel->close();     $connection->close(); } 消费者: public function actionReceiveMq(){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->queue_declare('hello', false, false, false, false);     echo ' [*] Waiting for messages. To exit press CTRL+C', "n";     $callback = function($msg) {         echo " [x] Received ", $msg->body, "n";     };     $channel->basic_consume('hello', '', false, true, false, false, $callback);     while(count($channel->callbacks)) {         $channel->wait();     } } 二 Work queues 生产者: public function actionNewTask($data='Hello World!'){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->queue_declare('my_task_queue', false, true, false, false);     $msg = new AMQPMessage($data,         array('delivery_mode' => 2) # make message persistent     );     $channel->basic_publish($msg, '', 'my_task_queue');     echo " [x] Sent ", $data, "n";     $channel->close();     $connection->close(); } 消费者: public function actionWorker(){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->queue_declare('my_task_queue', false, true, false, false);     echo ' [*] Waiting for messages. To exit press CTRL+C', "n";     $callback = function($msg){         echo " [x] Received ", $msg->body, "n";         sleep(substr_count($msg->body, '.'));         echo " [x] Done", "n";         $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);     };     $channel->basic_qos(null, 1, null);     $channel->basic_consume('my_task_queue', '', false, false, false, false, $callback);     while(count($channel->callbacks)) {         $channel->wait();     }     $channel->close();     $connection->close(); } 三 Publish/Subscribe 生产者: public function actionEmitLog($data='Hello World!'){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->exchange_declare('logs', 'fanout', false, false, false);     if(empty($data)) $data = "info: Hello World!";     $msg = new AMQPMessage($data);     $channel->basic_publish($msg, 'logs');     echo " [x] Sent ", $data, "n";     $channel->close();     $connection->close(); } 消费者: public function actionReceiveLogs(){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->exchange_declare('logs', 'fanout', false, false, false);     list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);     $channel->queue_bind($queue_name, 'logs');     echo ' [*] Waiting for logs. To exit press CTRL+C', "n";     $callback = function($msg){         echo ' [x] ', $msg->body, "n";     };     $channel->basic_consume($queue_name, '', false, true, false, false, $callback);     while(count($channel->callbacks)) {         $channel->wait();     }     $channel->close();     $connection->close(); } 四 Routing 生产者: public function actionEmitLogDirect($argv, $data='Hello World!'){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->exchange_declare('direct_logs', 'direct', false, false, false);     $severity = isset($argv) && !empty($argv) ? $argv : 'info';     $msg = new AMQPMessage($data);     $channel->basic_publish($msg, 'direct_logs', $severity);     echo " [x] Sent ",$severity,':',$data," n";     $channel->close();     $connection->close(); } 消费者: public function actionReceiveLogsDirect($argv){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->exchange_declare('direct_logs', 'direct', false, false, false);     list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);     $severities = explode(',', $argv);     if(empty($severities)) {         file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]n");         exit(1);     }     foreach($severities as $severity) {         $channel->queue_bind($queue_name, 'direct_logs', $severity);     }     echo ' [*] Waiting for logs. To exit press CTRL+C', "n";     $callback = function($msg){         echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n";     };     $channel->basic_consume($queue_name, '', false, true, false, false, $callback);     while(count($channel->callbacks)) {         $channel->wait();     }     $channel->close();     $connection->close(); } 五 Topics 生产者: public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->exchange_declare('topic_logs', 'topic', false, false, false);     $msg = new AMQPMessage($data);     $channel->basic_publish($msg, 'topic_logs', $routing_key);     echo " [x] Sent ",$routing_key,':',$data," n";     $channel->close();     $connection->close(); } 消费者: public function actionTopicsReceiveLogsDirect($binding_keys=''){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->exchange_declare('topic_logs', 'topic', false, false, false);     list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);     $binding_keys = explode(',', $binding_keys);     if( empty($binding_keys )) {         file_put_contents('php://stderr', "Usage: $binding_keysn");         exit(1);     }     foreach($binding_keys as $binding_key) {         $channel->queue_bind($queue_name, 'topic_logs', $binding_key);     }     echo ' [*] Waiting for logs. To exit press CTRL+C', "n";     $callback = function($msg){         echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n";     };     $channel->basic_consume($queue_name, '', false, true, false, false, $callback);     while(count($channel->callbacks)) {         $channel->wait();     }     $channel->close();     $connection->close(); } 六 RPC 生产者: public function actionRpcClient($fib=10){     $fibonacci_rpc = new FibonacciRpcClient();     $response = $fibonacci_rpc->call($fib);     echo " [.] Got ", $response, "n"; } 消费者: public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){     $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');     $channel = $connection->channel();     $channel->queue_declare('rpc_queue', false, false, false, false);     function fib($n) {         if ($n == 0)             return 0;         if ($n == 1)             return 1;         return fib($n-1) + fib($n-2);     }     echo " [x] Awaiting RPC requestsn";     $callback = function($req) {         $n = intval($req->body);         echo " [.] fib(", $n, ")n";         $msg = new AMQPMessage(             (string) fib($n),             array('correlation_id' => $req->get('correlation_id'))         );         $req->delivery_info['channel']->basic_publish(             $msg, '', $req->get('reply_to'));         $req->delivery_info['channel']->basic_ack(             $req->delivery_info['delivery_tag']);     };     $channel->basic_qos(null, 1, null);     $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback);     while(count($channel->callbacks)) {         $channel->wait();     }     $channel->close();     $connection->close(); } 相关类: class FibonacciRpcClient {     private $connection;     private $channel;     private $callback_queue;     private $response;     private $corr_id;     public function __construct() {         $this->connection = new AMQPStreamConnection(             'localhost', 5672, 'guest', 'guest');         $this->channel = $this->connection->channel();         list($this->callback_queue, ,) = $this->channel->queue_declare(             "", false, false, true, false);         $this->channel->basic_consume(             $this->callback_queue, '', false, false, false, false,             array($this, 'on_response'));     }     public function on_response($rep) {         if($rep->get('correlation_id') == $this->corr_id) {             $this->response = $rep->body;         }     }     public function call($n) {         $this->response = null;         $this->corr_id = uniqid();         $msg = new AMQPMessage(             (string) $n,             array('correlation_id' => $this->corr_id,                 'reply_to' => $this->callback_queue)         );         $this->channel->basic_publish($msg, '', 'rpc_queue');         while(!$this->response) {             $this->channel->wait();         }         return intval($this->response);     } }

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/228899.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号