栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

PHP异步任务之swoole

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

PHP异步任务之swoole

一、安装swoole

  1. 下载地址:https://github.com/swoole/swoole-src/releases/tag/1.8.12-stable

  2. 下载压缩包,上传到服务器(测试centos),解压缩


  3. cd swoole-src-1.8.12-stable

  4. phpize(phpize 用于为 PHP 扩展生成 configure 脚本等编译所需的构建文件,借助它可以在不重新编译 PHP 的情况下单独编译、安装扩展模块)

  5. ./configure

  6. make && make install

  7. 添加php.ini扩展extension=swoole.so

  8. 通过php -m查看扩展信息如果出现swoole则为成功


  9. 运行 make test 进行测试,测试用例通过即表示安装成功

二、问题解决

  1. 若出现 proc_open 相关报错,通常是该函数被 php.ini 中的 disable_functions 禁用所致;修改 php.ini 文件,将 proc_open 从 disable_functions 列表中去除即可

三、测试方案

  1. swoole下examples  service.php start 启动swoole服务

  2. 推荐测试工具

四、服务端Demo(CI框架,以发送邮件为例)

/**
 * CodeIgniter controller that runs a Swoole TCP server on 127.0.0.1:9501 and
 * dispatches JSON-encoded commands received from clients to task workers.
 *
 * The running server instance is kept in self::$serv so that helpers such as
 * my_log() can reach it without relying on a PHP global.
 */
class Swoole extends CI_Controller
{
    public static $index = 0;

    /** @var swoole_server|null Server instance, assigned in service_start(). */
    public static $serv;

    /** @var array Per-connection buffers, keyed by connection fd. */
    private static $buffers = array();

    /**
     * Look up the buffer attached to a connection, optionally creating it.
     *
     * @param int  $fd     Connection file descriptor.
     * @param bool $create When true (default) a missing buffer is allocated.
     * @return swoole_buffer|false The buffer, or false when absent and $create is false.
     */
    public static function getBuffer($fd, $create = true)
    {
        if (!isset(self::$buffers[$fd])) {
            if (!$create) {
                return false;
            }
            self::$buffers[$fd] = new swoole_buffer(1024 * 128);
        }

        return self::$buffers[$fd];
    }

    /**
     * Logging hook. Records the current process id on the server object the
     * first time it is called in a process; the actual output line is disabled.
     *
     * @param string $msg Message to log.
     */
    public function my_log($msg)
    {
        // The server lives in self::$serv; a `global $serv` is never assigned.
        $serv = self::$serv;
        if ($serv === null) {
            return;
        }

        if (empty($serv->worker_pid)) {
            $serv->worker_pid = posix_getpid();
        }

        // Output intentionally disabled; uncomment to trace on stdout.
        // echo "#{$serv->worker_pid}\t[" . date('H:i:s') . "]\t{$msg}" . PHP_EOL;
    }

    /**
     * 'Start' callback: names the master process and logs the pids.
     */
    public function my_onStart(swoole_server $serv)
    {
        global $argv;
        swoole_set_process_name("php {$argv[0]}: master");
        $this->my_log("Server: start.Swoole version is [" . SWOOLE_VERSION . "]");
        $this->my_log("MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}");
    }

    /**
     * Start a child process (with an empty body) and attach it to the server
     * object so that it stays referenced.
     */
    public function forkChildInWorker()
    {
        $serv = self::$serv;
        if ($serv === null) {
            // Nothing to attach the child to before service_start() has run.
            return;
        }

        $this->my_log("on worker start\n");
        $process = new swoole_process(function (swoole_process $worker) use ($serv) {
        });

        $pid = $process->start();
        if ($pid === false) {
            $this->my_log("Fork child process failed.\n");
            return;
        }
        $this->my_log("Fork child process success. pid={$pid}\n");

        // Keep a reference to the process object: if it is not stored, the
        // object is destroyed and its pipe is closed.
        $serv->childprocess = $process;
    }

    /**
     * Set the process title to "task" or "worker" depending on the process kind.
     *
     * @param swoole_server $serv
     * @param int           $worker_id Unused; kept for callback compatibility.
     */
    public function processRename(swoole_server $serv, $worker_id)
    {
        global $argv;
        $role = $serv->taskworker ? 'task' : 'worker';
        swoole_set_process_name("php {$argv[0]}: {$role}");

        $this->my_log("WorkerStart: MasterPid={$serv->master_pid}|Manager_pid={$serv->manager_pid}|WorkerId={$serv->worker_id}|WorkerPid={$serv->worker_pid}");
    }

    /**
     * Log a start timestamp from worker 0 only.
     */
    public function setTimerInWorker(swoole_server $serv, $worker_id)
    {
        if ($worker_id == 0) {
            $this->my_log("Start: " . microtime(true) . "\n");
        }
    }

    /**
     * 'Shutdown' callback.
     */
    public function my_onShutdown($serv)
    {
        $this->my_log("Server: onShutdown\n");
    }

    /**
     * 'Close' callback: releases the buffer that belongs to the closed connection.
     */
    public function my_onClose(swoole_server $serv, $fd, $from_id)
    {
        $this->my_log("Client[$fd@$from_id]: fd=$fd is closed");

        // Do not allocate a buffer just to clear it, and drop the registry
        // entry so closed connections do not accumulate in self::$buffers.
        $buffer = self::getBuffer($fd, false);
        if ($buffer) {
            $buffer->clear();
            unset(self::$buffers[$fd]);
        }
    }

    /**
     * 'Connect' callback.
     */
    public function my_onConnect(swoole_server $serv, $fd, $from_id)
    {
        $this->my_log("Client: Connect --- {$fd}");
    }

    /**
     * Log a timer tick.
     *
     * @param int $id Timer identifier.
     */
    public function timer_show($id)
    {
        $this->my_log("Timer#$id");
    }

    /**
     * 'WorkerStart' callback: renames the process and, for non-task workers,
     * installs a SIGUSR2 handler and schedules a deferred log call.
     */
    public function my_onWorkerStart(swoole_server $serv, $worker_id)
    {
        $this->processRename($serv, $worker_id);

        if ($serv->taskworker) {
            return;
        }

        swoole_process::signal(SIGUSR2, function ($signo) {
            $this->my_log("SIGNAL: {$signo}\n");
        });
        $serv->defer(function () {
            $this->my_log("defer call\n");
        });
    }

    /**
     * 'WorkerStop' callback.
     */
    public function my_onWorkerStop($serv, $worker_id)
    {
        $this->my_log("WorkerStop[$worker_id]|pid=" . $serv->worker_pid . ".\n");
    }

    /**
     * 'Packet' callback: echoes the datagram back to its sender, prefixed with "Server ".
     */
    public function my_onPacket($serv, $data, $clientInfo)
    {
        $serv->sendto($clientInfo['address'], $clientInfo['port'], "Server " . $data);
        var_dump($clientInfo);
    }

    /**
     * 'Finish' callback. Expects a task result of the form "<text>-<fd>" and
     * notifies that connection with 'taskok'.
     *
     * @param swoole_server $serv
     * @param int           $task_id
     * @param mixed         $data Task result.
     */
    public function my_onFinish(swoole_server $serv, $task_id, $data)
    {
        $str = null;
        $fd = null;

        // Only destructure when the result really has the "<text>-<fd>" shape;
        // otherwise there is no connection to notify.
        if (is_string($data)) {
            $parts = explode('-', $data);
            $str = $parts[0];
            if (isset($parts[1]) && ctype_digit($parts[1])) {
                $fd = (int) $parts[1];
            }
        }

        if ($fd !== null) {
            $serv->send($fd, 'taskok');
        }
        var_dump($str, $fd);

        $result = is_scalar($data) ? $data : json_encode($data);
        $this->my_log("AsyncTask Finish: result={$result}. PID=" . $serv->worker_pid . PHP_EOL);
    }

    /**
     * 'WorkerError' callback.
     */
    public function my_onWorkerError(swoole_server $serv, $worker_id, $worker_pid, $exit_code, $signo)
    {
        $this->my_log("worker abnormal exit. WorkerId=$worker_id|Pid=$worker_pid|ExitCode=$exit_code|Signal={$signo}\n");
    }

    /**
     * Send $data to every connection except $fd, paging through the
     * connection list 10 entries at a time.
     *
     * @param swoole_server $serv
     * @param int           $fd   Connection to skip.
     * @param string        $data Payload to send.
     */
    public function broadcast(swoole_server $serv, $fd = 0, $data = "hello")
    {
        $this->my_log("broadcast\n");

        $start_fd = 0;
        while (true) {
            $conn_list = $serv->connection_list($start_fd, 10);
            // Stop on failure (false) AND on an empty page; with an empty
            // array end() yields false and the loop would never terminate.
            if (empty($conn_list)) {
                break;
            }

            $start_fd = end($conn_list);
            foreach ($conn_list as $conn) {
                if ($conn === $fd) {
                    continue;
                }
                $serv->send($conn, $data);
            }
        }
    }

    /**
     * Create the TCP server, register all event callbacks and start it.
     * Server settings are read from the 'swoole' config item.
     */
    public function service_start()
    {
        $serv = new swoole_server("127.0.0.1", 9501, SWOOLE_PROCESS, SWOOLE_SOCK_TCP);
        $serv->set(config_item('swoole'));
        self::$serv = $serv;

        $serv->on('PipeMessage', function ($serv, $src_worker_id, $msg) {
            $this->my_log("PipeMessage: Src={$src_worker_id},Msg=" . trim($msg));
            if ($serv->taskworker) {
                $serv->sendMessage("hello user process", $src_worker_id);
            }
        });

        $serv->on('Start', array($this, 'my_onStart'));
        $serv->on('Connect', array($this, 'my_onConnect'));
        $serv->on('Receive', array($this, 'my_onReceive'));
        $serv->on('Packet', array($this, 'my_onPacket'));
        $serv->on('Close', array($this, 'my_onClose'));
        $serv->on('Shutdown', array($this, 'my_onShutdown'));
        $serv->on('WorkerStart', array($this, 'my_onWorkerStart'));
        $serv->on('WorkerStop', array($this, 'my_onWorkerStop'));
        $serv->on('Task', array($this, 'my_onTask'));
        $serv->on('Finish', array($this, 'my_onFinish'));
        $serv->on('WorkerError', array($this, 'my_onWorkerError'));
        $serv->on('ManagerStart', function ($serv) {
            global $argv;
            swoole_set_process_name("php {$argv[0]}: manager");
        });

        $serv->start();
    }

    /**
     * 'Receive' callback: hands the raw payload to a task worker.
     *
     * NOTE(review): the payload is forwarded as received; whether one call
     * carries exactly one JSON message depends on the server's packet
     * settings, which are not visible here — confirm.
     */
    public function my_onReceive(swoole_server $serv, $fd, $from_id, $data)
    {
        print_r($data);
        $this->my_log("Worker#{$serv->worker_pid} Client[$fd@$from_id]: received: $data");
        $serv->task($data, -1, function (swoole_server $serv, $task_id, $data) {
            $this->my_log("Task Callback: ");
            var_dump($task_id, $data);
        });
    }

    /**
     * 'Task' callback: decodes the JSON payload and dispatches on its 'cmd' field.
     * Malformed payloads and unknown commands are ignored.
     */
    public function my_onTask(swoole_server $serv, $task_id, $from_id, $data)
    {
        $payload = json_decode($data, true);
        if (!is_array($payload) || !isset($payload['cmd'])) {
            $this->my_log("Task#{$task_id}: ignoring malformed payload");
            return;
        }

        // Strict comparison: with a loose switch, a non-string cmd such as
        // 0 or true would match 'sendemail' on PHP versions before 8.
        if ($payload['cmd'] === 'sendemail') {
            $this->sendEmail($payload);
        }
    }

    /**
     * Send the e-mail described by $data (not implemented yet).
     *
     * @param array $data Decoded task payload.
     */
    private function sendEmail($data)
    {
        // TODO: send the e-mail
    }
}
五、客户端Demo

// Build the 'sendemail' command; $to, $subject, $message and $type are
// expected to be defined by the surrounding code.
$data = array(
    'cmd'     => 'sendemail',
    'to'      => $to,
    'subject' => $subject,
    'message' => $message,
    'type'    => $type
);

$client = new swoole_client(SWOOLE_SOCK_TCP);
// connect() returns false on failure; only send and close on a live connection.
if ($client->connect('127.0.0.1', 9501)) {
    $client->send(json_encode($data));
    $client->close();
} else {
    error_log("swoole client: connect to 127.0.0.1:9501 failed, errCode={$client->errCode}");
}

六、swoole服务查看

七、swoole服务关闭

  1. kill 29193(29193 为示例进程号,请替换为实际的 swoole 主进程 PID)

八、swoole后台PHP进程查看

  1. ps aux|grep php


转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/227556.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号