栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

PHP模拟supervisor的进程管理

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

推荐:《PHP视频教程》

前言

模拟supervisor进程管理DEMO(简易实现)

没错,是造轮子!目的在于学习!

截图:

模拟supervisor的进程管理

在图中自己实现了一个Copy子进程的功能。如果用在AMQP增减消费者时,我觉得应该会很有用。

实现

1、在主进程循环内启动子进程执行命令
2、在web输入 127.0.0.1:7865 获取子进程状态
3、socket接收请求消息,并且执行相应操作,返回web页面
4、回收子进程,防止称为僵尸进程

不足:无法持续监听错误页面。由于socket得到的响应是通过include函数加载的,所以在加载的页面内不能出现tail -f命令,否则stream就会掉入了死循环了~。我想应该有方案解决(写了socket+多进程模式,模仿fpm在接收到请求之后就启动一个子进程去处理的模式,但是执行有问题。因此将代码贴出来希望得到大家的指点)。
延伸:由于对进程可以很好的管理(期望如此),那么就可以定制化自己的一些需求,比如:(1)定制AMQP的消费者进程管理服务。(2)模拟crontab定时服务。

知识点

代码实现的过程中,有很多的细节是值得学习的。
1、在while()循环中,启用了stream的非阻塞模式。所以不能在循环中使用sleep(1),而是用stream_select($read, $write, $except, 1)让stream内部阻塞。
关于阻塞非阻塞模式,可以参阅这里
2、能够执行外部程序的函数很多,但是都稍有不同。这里采用的是proc_open,是一个很强大的函数。在这之前我曾用pcntl_exec执行过外部程序,但是需要先pcntl_fork。而用其他的如exec,shell_exec无法对子进程进行管理。
3、重启或停止等操作子进程时,只是先更改主进程中该子进程在内存中的的状态,并不是真正的对子进程操作。在统一处init()处理子进程。如此才能防止因为子进程启动时的上下文导致的一些怪异的现象。

代码

由于代码过多,所以如果你对我的方案有更好的建议可以在github这里看。

主进程代码:Process.php

consumers = $this->getConsumers();
    }

    // 这里是个DEMO,实际可以用读取配置文件的方式。
    public function getConsumers()
    {
 $consumer = new Consumer([
     'program' => 'test',
     'command' => '/usr/bin/php test.php',
     'directory' => __DIR__,
     'logfile' => __DIR__ . '/test.log',
     'uniqid' => uniqid(),
     'auto_restart' => false,
 ]);
 return [
     $consumer->uniqid => $consumer,
 ];
    }

    public function run()
    {
 if (empty($this->consumers)) {
     // consumer empty
     return;
 }
 if ($this->_notifyMaster()) {
     // master alive
     return;
 }

 $pid = pcntl_fork();
 if ($pid < 0) {
     exit;
 } elseif ($pid > 0) {
     exit;
 }
 if (!posix_setsid()) {
     exit;
 }

 $stream = new StreamConnection('tcp://0.0.0.0:7865');
 @cli_set_process_title('AMQP Master Process');
 // 将主进程ID写入文件
 file_put_contents(self::PPID_FILE, getmypid());
 // master进程继续
 while (true) {
     $this->init();
     pcntl_signal_dispatch();
     $this->waitpid();
     // 如果子进程被全部回收,则主进程退出
     // if (empty($this->childPids)) {
     //     $stream->close($stream->getSocket());
     //     break;
     // }
     $stream->accept(function ($uniqid, $action) {
  $this->handle($uniqid, $action);
  return $this->display();
     });
 }
    }

    protected function init()
    {
 foreach ($this->consumers as &$c) {
     switch ($c->state) {
  case Consumer::RUNNING:
  case Consumer::STOP:
      break;
  case Consumer::NOMINAL:
  case Consumer::STARTING:
      $this->fork($c);
      break;
  case Consumer::STOPING:
      if ($c->pid && posix_kill($c->pid, SIGTERM)) {
   $this->reset($c, Consumer::STOP);
      }
      break;
  case Consumer::RESTART:
      if (empty($c->pid)) {
   $this->fork($c);
   break;
      }
      if (posix_kill($c->pid, SIGTERM)) {
   $this->reset($c, Consumer::STOP);
   $this->fork($c);
      }
      break;
  default:
      break;
     }
 }
    }

    protected function reset(Consumer $c, $state)
    {
 $c->pid = '';
 $c->uptime = '';
 $c->state = $state;
 $c->process = null;
    }

    protected function waitpid()
    {
 foreach ($this->childPids as $uniqid => $pid) {
     $result = pcntl_waitpid($pid, $status, WNOHANG);
     if ($result == $pid || $result == -1) {
  unset($this->childPids[$uniqid]);
  $c = &$this->consumers[$uniqid];
  $state = pcntl_wifexited($status) ? Consumer::EXITED : Consumer::STOP;
  $this->reset($c, $state);
     }
 }
    }


    
    private function _notifyMaster()
    {
 $ppid = file_get_contents(self::PPID_FILE );
 $isAlive = $this->checkProcessAlive($ppid);
 if (!$isAlive) return false;
 return true;
    }

    public function checkProcessAlive($pid)
    {
 if (empty($pid)) return false;
 $pidinfo = `ps co pid {$pid} | xargs`;
 $pidinfo = trim($pidinfo);
 $pattern = "/.*?PID.*?(d+).*?/";
 preg_match($pattern, $pidinfo, $matches);
 return empty($matches) ? false : ($matches[1] == $pid ? true : false);
    }

    
    protected function fork(Consumer $c)
    {
 $descriptorspec = [2 => ['file', $c->logfile, 'a'],];
 $process = proc_open('exec ' . $c->command, $descriptorspec, $pipes, $c->directory);
 if ($process) {
     $ret = proc_get_status($process);
     if ($ret['running']) {
  $c->state = Consumer::RUNNING;
  $c->pid = $ret['pid'];
  $c->process = $process;
  $c->uptime = date('m-d H:i');
  $this->childPids[$c->uniqid] = $ret['pid'];
     } else {
  $c->state = Consumer::EXITED;
  proc_close($process);
     }
 } else {
     $c->state = Consumer::ERROR;
 }
 return $c;
    }

    public function display()
    {
 $location = 'http://127.0.0.1:7865';
 $basePath = Http::$basePath;
 $scriptName = isset($_SERVER['script_NAME']) &&
     !empty($_SERVER['script_NAME']) &&
     $_SERVER['script_NAME'] != '/' ? $_SERVER['script_NAME'] : '/index.php';
 if ($scriptName == '/index.html') {
     return Http::status_301($location);
 }

 $sourcePath = $basePath . $scriptName;
 if (!is_file($sourcePath)) {
     return Http::status_404();
 }

 ob_start();
 include $sourcePath;
 $response = ob_get_contents();
 ob_clean();

 return Http::status_200($response);
    }

    public function handle($uniqid, $action)
    {
 if (!empty($uniqid) && !isset($this->consumers[$uniqid])) {
     return;
 }
 switch ($action) {
     case 'refresh':
  break;
     case 'restartall':
  $this->killall(true);
  break;
     case 'stopall':
  $this->killall();
  break;
     case 'stop':
  $c = &$this->consumers[$uniqid];
  if ($c->state != Consumer::RUNNING) break;
  $c->state = Consumer::STOPING;
  break;
     case 'start':
  $c = &$this->consumers[$uniqid];
  if ($c->state == Consumer::RUNNING) break;
  $c->state = Consumer::STARTING;
  break;
     case 'restart':
  $c = &$this->consumers[$uniqid];
  $c->state = Consumer::RESTART;
  break;
     case 'copy':
  $c = $this->consumers[$uniqid];
  $newC = clone $c;
  $newC->uniqid = uniqid('C');
  $newC->state = Consumer::NOMINAL;
  $newC->pid = '';
  $this->consumers[$newC->uniqid] = $newC;
  break;
     default:
  break;
 }
    }

    protected function killall($restart = false)
    {
 foreach ($this->consumers as &$c) {
     $c->state = $restart ? Consumer::RESTART : Consumer::STOPING;
 }
    }}$cli = new Process();$cli->run();

Consumer消费者对象

stream相关代码:StreamConnection.php

socket = $this->connect($host);
    }

    public function connect($host)
    {
 $socket = stream_socket_server($host, $errno, $errstr);
 if (!$socket) {
     exit('stream error');
 }
 stream_set_timeout($socket, $this->timeout);
 stream_set_chunk_size($socket, 1024);
 stream_set_blocking($socket, false);
 $this->client = [$socket];
 return $socket;
    }

    public function accept(Closure $callback)
    {
 $read = $this->client;
 if (stream_select($read, $write, $except, 1) < 1) return;
 if (in_array($this->socket, $read)) {
     $cs = stream_socket_accept($this->socket);
     $this->client[] = $cs;
 }
 foreach ($read as $s) {
     if ($s == $this->socket) continue;
     $header = fread($s, 1024);
     if (empty($header)) {
  $index = array_search($s, $this->client);
  if ($index)
      unset($this->client[$index]);
  $this->close($s);
  continue;
     }
     Http::parse_http($header);
     $uniqid = isset($_GET['uniqid']) ? $_GET['uniqid'] : '';
     $action = isset($_GET['action']) ? $_GET['action'] : '';
     $response = $callback($uniqid, $action);
     $this->write($s, $response);
     $index = array_search($s, $this->client);
     if ($index)
  unset($this->client[$index]);
     $this->close($s);
 }
    }

    public function write($socket, $response)
    {
 $ret = fwrite($socket, $response, strlen($response));
    }

    public function close($socket)
    {
 $flag = fclose($socket);
    }

    public function getSocket()
    {
 return $this->socket;
    }}

Http响应代码:Http.php

 '',
     'REQUEST_METHOD' => '',
     'REQUEST_URI' => '',
     'SERVER_PROTOCOL' => '',
     'SERVER_SOFTWARE' => '',
     'SERVER_NAME' => '',
     'HTTP_HOST' => '',
     'HTTP_USER_AGENT' => '',
     'HTTP_ACCEPT' => '',
     'HTTP_ACCEPT_LANGUAGE' => '',
     'HTTP_ACCEPT_ENCODING' => '',
     'HTTP_cookie' => '',
     'HTTP_CONNECTION' => '',
     'REMOTE_ADDR' => '',
     'REMOTE_PORT' => '0',
     'script_NAME' => '',
     'HTTP_REFERER' => '',
     'CONTENT_TYPE' => '',
     'HTTP_IF_NONE_MATCH' => '',
 );

 // 将header分割成数组
 list($http_header, $http_body) = explode("rnrn", $http, 2);
 $header_data = explode("rn", $http_header);

 list($_SERVER['REQUEST_METHOD'], $_SERVER['REQUEST_URI'], $_SERVER['SERVER_PROTOCOL']) = explode(' ', $header_data[0]);

 unset($header_data[0]);
 foreach ($header_data as $content) {
     // rnrn
     if (empty($content)) {
  continue;
     }
     list($key, $value) = explode(':', $content, 2);
     $key = strtolower($key);
     $value = trim($value);
     switch ($key) {
  case 'host':
      $_SERVER['HTTP_HOST'] = $value;
      $tmp = explode(':', $value);
      $_SERVER['SERVER_NAME'] = $tmp[0];
      if (isset($tmp[1])) {
   $_SERVER['SERVER_PORT'] = $tmp[1];
      }
      break;
  case 'cookie':
      $_SERVER['HTTP_cookie'] = $value;
      parse_str(str_replace('; ', '&', $_SERVER['HTTP_cookie']), $_cookie);
      break;
  case 'user-agent':
      $_SERVER['HTTP_USER_AGENT'] = $value;
      break;
  case 'accept':
      $_SERVER['HTTP_ACCEPT'] = $value;
      break;
  case 'accept-language':
      $_SERVER['HTTP_ACCEPT_LANGUAGE'] = $value;
      break;
  case 'accept-encoding':
      $_SERVER['HTTP_ACCEPT_ENCODING'] = $value;
      break;
  case 'connection':
      $_SERVER['HTTP_CONNECTION'] = $value;
      break;
  case 'referer':
      $_SERVER['HTTP_REFERER'] = $value;
      break;
  case 'if-modified-since':
      $_SERVER['HTTP_IF_MODIFIED_SINCE'] = $value;
      break;
  case 'if-none-match':
      $_SERVER['HTTP_IF_NONE_MATCH'] = $value;
      break;
  case 'content-type':
      if (!preg_match('/boundary="?(S+)"?/', $value, $match)) {
   $_SERVER['CONTENT_TYPE'] = $value;
      } else {
   $_SERVER['CONTENT_TYPE'] = 'multipart/form-data';
   $http_post_boundary = '--' . $match[1];
      }
      break;
     }
 }

 // script_name
 $_SERVER['script_NAME'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_PATH);

 // QUERY_STRING
 $_SERVER['QUERY_STRING'] = parse_url($_SERVER['REQUEST_URI'], PHP_URL_QUERY);
 if ($_SERVER['QUERY_STRING']) {
     // $GET
     parse_str($_SERVER['QUERY_STRING'], $_GET);
 } else {
     $_SERVER['QUERY_STRING'] = '';
 }

 // REQUEST
 $_REQUEST = array_merge($_GET, $_POST);

 return array('get' => $_GET, 'post' => $_POST, 'cookie' => $_cookie, 'server' => $_SERVER, 'files' => $_FILES);
    }

    public static function status_404()
    {
 return <<

待执行的脚本:test.php

在当前目录下的视图页面:
|- Process.php
|- Http.php
|- StreamConnection.php
|- Consumer.php
|- baseObject.php
|- views/

更多编程相关知识,请访问:编程教学!!

以上就是PHP模拟supervisor的进程管理的详细内容,更多请关注考高分网其它相关文章!

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/262633.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号