一 "Hello World!"
生产者:
public function actionSendMq($argv=''){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); $msg = new AMQPMessage($argv); $channel->basic_publish($msg, '', 'hello'); echo " [x] Sent '$argv'".PHP_EOL; $channel->close(); $connection->close(); } 消费者: public function actionReceiveMq(){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('hello', false, false, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "n"; $callback = function($msg) { echo " [x] Received ", $msg->body, "n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } } 二 Work queues 生产者: public function actionNewTask($data='Hello World!'){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('my_task_queue', false, true, false, false); $msg = new AMQPMessage($data, array('delivery_mode' => 2) # make message persistent ); $channel->basic_publish($msg, '', 'my_task_queue'); echo " [x] Sent ", $data, "n"; $channel->close(); $connection->close(); } 消费者: public function actionWorker(){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('my_task_queue', false, true, false, false); echo ' [*] Waiting for messages. To exit press CTRL+C', "n"; $callback = function($msg){ echo " [x] Received ", $msg->body, "n"; sleep(substr_count($msg->body, '.')); echo " [x] Done", "n"; $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('my_task_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } 三 Publish/Subscribe 生产者: public function actionEmitLog($data='Hello World!'){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); if(empty($data)) $data = "info: Hello World!"; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'logs'); echo " [x] Sent ", $data, "n"; $channel->close(); $connection->close(); } 消费者: public function actionReceiveLogs(){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('logs', 'fanout', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $channel->queue_bind($queue_name, 'logs'); echo ' [*] Waiting for logs. To exit press CTRL+C', "n"; $callback = function($msg){ echo ' [x] ', $msg->body, "n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } 四 Routing 生产者: public function actionEmitLogDirect($argv, $data='Hello World!'){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); $severity = isset($argv) && !empty($argv) ? $argv : 'info'; $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'direct_logs', $severity); echo " [x] Sent ",$severity,':',$data," n"; $channel->close(); $connection->close(); } 消费者: public function actionReceiveLogsDirect($argv){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('direct_logs', 'direct', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $severities = explode(',', $argv); if(empty($severities)) { file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]n"); exit(1); } foreach($severities as $severity) { $channel->queue_bind($queue_name, 'direct_logs', $severity); } echo ' [*] Waiting for logs. To exit press CTRL+C', "n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } 五 Topics 生产者: public function actionTopicsEmitLogDirect($routing_key='kern.critical', $data='Hello World!'){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); $msg = new AMQPMessage($data); $channel->basic_publish($msg, 'topic_logs', $routing_key); echo " [x] Sent ",$routing_key,':',$data," n"; $channel->close(); $connection->close(); } 消费者: public function actionTopicsReceiveLogsDirect($binding_keys=''){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->exchange_declare('topic_logs', 'topic', false, false, false); list($queue_name, ,) = $channel->queue_declare("", false, false, true, false); $binding_keys = explode(',', $binding_keys); if( empty($binding_keys )) { file_put_contents('php://stderr', "Usage: $binding_keysn"); exit(1); } foreach($binding_keys as $binding_key) { $channel->queue_bind($queue_name, 'topic_logs', $binding_key); } echo ' [*] Waiting for logs. To exit press CTRL+C', "n"; $callback = function($msg){ echo ' [x] ',$msg->delivery_info['routing_key'], ':', $msg->body, "n"; }; $channel->basic_consume($queue_name, '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } 六 RPC 生产者: public function actionRpcClient($fib=10){ $fibonacci_rpc = new FibonacciRpcClient(); $response = $fibonacci_rpc->call($fib); echo " [.] Got ", $response, "n"; } 消费者: public function actionRpcServer($routing_key='kern.critical', $data='Hello World!'){ $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); $channel = $connection->channel(); $channel->queue_declare('rpc_queue', false, false, false, false); function fib($n) { if ($n == 0) return 0; if ($n == 1) return 1; return fib($n-1) + fib($n-2); } echo " [x] Awaiting RPC requestsn"; $callback = function($req) { $n = intval($req->body); echo " [.] fib(", $n, ")n"; $msg = new AMQPMessage( (string) fib($n), array('correlation_id' => $req->get('correlation_id')) ); $req->delivery_info['channel']->basic_publish( $msg, '', $req->get('reply_to')); $req->delivery_info['channel']->basic_ack( $req->delivery_info['delivery_tag']); }; $channel->basic_qos(null, 1, null); $channel->basic_consume('rpc_queue', '', false, false, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); } 相关类: class FibonacciRpcClient { private $connection; private $channel; private $callback_queue; private $response; private $corr_id; public function __construct() { $this->connection = new AMQPStreamConnection( 'localhost', 5672, 'guest', 'guest'); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, false, true, false); $this->channel->basic_consume( $this->callback_queue, '', false, false, false, false, array($this, 'on_response')); } public function on_response($rep) { if($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } public function call($n) { $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string) $n, array('correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue) ); $this->channel->basic_publish($msg, '', 'rpc_queue'); while(!$this->response) { $this->channel->wait(); } return intval($this->response); } }



