栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

基于 Hyperf + RabbitMQ + WebSocket 实现消息推送

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

基于 Hyperf + RabbitMQ + WebSocket 实现消息推送

基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。

思路

利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,

保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

WebSocket 服务

composer require hyperf/websocket-server

配置文件 [config/autoload/server.php]

 SWOOLE_PROCESS,
    'servers' => [
 [
     'name' => 'http',
     'type' => Server::SERVER_HTTP,
     'host' => '0.0.0.0',
     'port' => 11111,
     'sock_type' => SWOOLE_SOCK_TCP,
     'callbacks' => [
  SwooleEvent::ON_REQUEST => [HyperfHttpServerServer::class, 'onRequest'],
     ],
 ],
 [
     'name' => 'ws',
     'type' => Server::SERVER_WEBSOCKET,
     'host' => '0.0.0.0',
     'port' => 12222,
     'sock_type' => SWOOLE_SOCK_TCP,
     'callbacks' => [
  SwooleEvent::ON_HAND_SHAKE => [HyperfWebSocketServerServer::class, 'onHandShake'],
  SwooleEvent::ON_MESSAGE => [HyperfWebSocketServerServer::class, 'onMessage'],
  SwooleEvent::ON_CLOSE => [HyperfWebSocketServerServer::class, 'onClose'],
     ],
 ],
    ],

WebSocket 服务器端代码示例

container->get(Redis::class);
 //获取所有的客户端id
 $fdList = $redis->sMembers('websocket_sjd_1');
 //如果当前客户端在客户端集合中,就刷新
 if (in_array($frame->fd, $fdList)) {
     $redis->sAdd('websocket_sjd_1', $frame->fd);
     $redis->expire('websocket_sjd_1', 7200);
 }
 $server->push($frame->fd, 'Recv: ' . $frame->data);
    }
    
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
 //删掉客户端id
 $redis = $this->container->get(Redis::class);
 //移除集合中指定的value
 $redis->sRem('websocket_sjd_1', $fd);
 var_dump('closed');
    }
    
    public function onOpen(WebSocketServer $server, Request $request): void
    {
 //保存客户端id
 $redis = $this->container->get(Redis::class);
 $res1 = $redis->sAdd('websocket_sjd_1', $request->fd);
 var_dump($res1);
 $res = $redis->expire('websocket_sjd_1', 7200);
 var_dump($res);
 $server->push($request->fd, 'Opened');
    }
}

WebSocket 前端代码

function WebSocketTest() {
 if ("WebSocket" in window) {
     console.log("您的浏览器支持 WebSocket!");
     var num = 0
     // 打开一个 web socket
     var ws = new WebSocket("ws://127.0.0.1:12222");
     ws.onopen = function () {
  // Web Socket 已连接上,使用 send() 方法发送数据
  //alert("数据发送中...");
  //ws.send("发送数据");
     };
     window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开
  var ping = {"type": "ping"};
  ws.send(JSON.stringify(ping));
     }, 5000);
     ws.onmessage = function (evt) {
  var d = JSON.parse(evt.data);
  console.log(d);
  if (d.code == 300) {
      $(".address").text(d.address)
  }
  if (d.code == 200) {
      var v = d.data
      console.log(v);
      num++
      var str = `
 

${v.recordOutTime}

${v.userOutName}

${v.userOutNum}

${v.doorOutName}

` $(".tableHead").after(str) if (num > 7) { num-- $(".table .item:nth-last-child(1)").remove() } } }; ws.error = function (e) { console.log(e) alert(e) } ws.onclose = function () { // 关闭 websocket alert("连接已关闭..."); }; } else { alert("您的浏览器不支持 WebSocket!"); } }

AMQP 组件

composer require hyperf/amqp

配置文件 [config/autoload/amqp.php]

 [
 'host' => 'localhost',
 'port' => 5672,
 'user' => 'guest',
 'password' => 'guest',
 'vhost' => '/',
 'pool' => [
     'min_connections' => 1,
     'max_connections' => 10,
     'connect_timeout' => 10.0,
     'wait_timeout' => 3.0,
     'heartbeat' => -1,
 ],
 'params' => [
     'insist' => false,
     'login_method' => 'AMQPLAIN',
     'login_response' => null,
     'locale' => 'en_US',
     'connection_timeout' => 3.0,
     'read_write_timeout' => 6.0,
     'context' => null,
     'keepalive' => false,
     'heartbeat' => 3,
 ],
    ],
];

MQ 消费者代码

container->get(Redis::class);
 $fdList=$redis->sMembers('websocket_sjd_1');
 $server=$this->container->get(ServerFactory::class)->getServer()->getServer();
 foreach($fdList as $key=>$v){
     if(!empty($v)){
  $server->push((int)$v, $data);
     }
 }
 return Result::ACK;
    }
}

控制器代码

  
    public function test()
    {
 $data = array(
     'code' => 200,
     'data' => [
  'userOutName' => 'ccflow',
  'userOutNum' => '9999',
  'recordOutTime' => date("Y-m-d H:i:s", time()),
  'doorOutName' => '教师公寓',
     ]
 );
 $data = GuzzleHttpjson_encode($data);
 $message = new DemoProducer($data);
 $producer = ApplicationContext::getContainer()->get(Producer::class);
 $result = $producer->produce($message);
 var_dump($result);
 $user = $this->request->input('user', 'Hyperf');
 $method = $this->request->getMethod();
 return [
     'method' => $method,
     'message' => "{$user}.",
 ];
    }

最终效果


微信截图_20200605091315.png

推荐教程:《PHP教程》

以上就是基于 Hyperf + RabbitMQ + WebSocket 实现消息推送的详细内容,更多请关注考高分网其它相关文章!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/263102.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号