栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > PHP

教你使用mixphp打造多进程异步邮件发送

PHP 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力
注意:这个是 MixPHP V1 的范例

邮件发送是很常见的需求,由于发送邮件的操作一般是比较耗时的,所以我们一般采用异步处理来提升用户体验,而异步通常我们使用消息队列来实现。

传统 MVC 框架由于缺少多进程开发能力,通常是采用同一个脚本执行多次,产生多个进程的方式,mixphp 封装了 TaskExecutor 专用于多进程开发,用户能非常简单的开发出功能完善的高可用多进程应用。

推荐:《PHP视频教程》

下面演示一个异步邮件发送系统的开发过程,涉及知识点:

  • 异步
  • 消息队列
  • 多进程
  • 守护进程

如何使用消息队列实现异步

PHP 使用消息队列通常是使用中间件来实现,常用的消息中间件有:

  • redis
  • rabbitmq
  • kafka

本次我们选用 redis 来实现异步邮件发送,redis 的数据类型中有一个 list 类型,可实现消息队列,使用以下命令:

// 入列
$redis->lpush($key, $data);
// 出列
$data = $redis->rpop($key);
// 阻塞出列
$data = $redis->brpop($key, 10);

架构设计

本实例由传统 MVC 框架投递邮件发送需求,MixPHP 多进程执行发送任务。

邮件发送库选型

以往我们通常使用框架提供的邮件发送库,或者网上下载别的用户分享的库,composer 出现后,https://packagist.org/ 上有大量优质的库,我们只需选择一个最好的即可,本例选择 swiftmailer。

由于发送任务是由 MixPHP 执行,所以 swiftmailer 是安装在 MixPHP 项目中,在项目根目录中执行以下命令安装:

composer require swiftmailer/swiftmailer

生产者开发

在邮件发送这个需求中生产者是指投递发送任务的一方,这一方通常是一个接口或网页,这个部分并不一定需 mixphp 开发,TP、CI、YII 这些都可以,只需在接口或网页中把任务信息投递到消息队列中即可。

在传统 MVC 框架的控制器中增加如下代码:

通常框架中使用 redis 会安装一个类库来使用,本例使用原生代码,便于理解。
// 连接
$redis = new Redis();
if (!$redis->connect('127.0.0.1', 6379)) {
    throw new Exception('Redis Connect Failure');
}
$redis->auth('');
$redis->select(0);
// 投递任务
$data = [
    'to'      => ['***@qq.com' => 'A name'],
    'body'    => 'Here is the message itself',
    'subject' => 'The title content',
];
$redis->lpush('queue:email', serialize($data));

通常异步开发中,投递完成后就会立即响应一个消息给用户,当然此时该任务并没有执行。

消费者开发

本例我们使用 MixPHP 的多进程开发工具 TaskExecutor 来完成这个需求,通常使用常驻进程来处理队列的消费,所以我们使用 TaskExecutor 的 TYPE_DAEMON 类型,MODE_PUSH 模式。

TaskExecutor 的 MODE_PUSH 模式有二种进程:

  • 左进程:负责从消息队列取出任务数据,投放给中进程。

  • 中进程:负责执行邮件发送任务。

PushCommand.php 代码如下:

programName = Input::getCommandName();
 // 设置pidfile
 $this->pidFile = "/var/run/{$this->programName}.pid";
    }

    
    public function getTaskService()
    {
 return create_object(
     [
  // 类路径
  'class'  => 'mixtaskTaskExecutor',
  // 服务名称
  'name'   => "mix-daemon: {$this->programName}",
  // 执行类型
  'type'   => mixtaskTaskExecutor::TYPE_DAEMON,
  // 执行模式
  'mode'   => mixtaskTaskExecutor::MODE_PUSH,
  // 左进程数
  'leftProcess'   => 1,
  // 中进程数
  'centerProcess' => 5,
  // 任务超时时间 (秒)
  'timeout'=> 5,
     ]
 );
    }

    // 启动
    public function actionStart()
    {
 // 预处理
 if (!parent::actionStart()) {
     return ExitCode::UNSPECIFIED_ERROR;
 }
 // 启动服务
 $service = $this->getTaskService();
 $service->on('LeftStart', [$this, 'onLeftStart']);
 $service->on('CenterStart', [$this, 'onCenterStart']);
 $service->start();
 // 返回退出码
 return ExitCode::OK;
    }

    // 左进程启动事件回调函数
    public function onLeftStart(LeftProcess $worker)
    {
 try {
     // 模型内使用长连接版本的数据库组件,这样组件会自动帮你维护连接不断线
     $queueModel = Redis::getInstance();
     // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
     for ($j = 0; $j < 16000; $j++) {
  // 从消息队列中间件阻塞获取一条消息
  $data = $queueModel->brpop('queue:email', 10);
  if (empty($data)) {
      continue;
  }
  list(, $data) = $data;
  // 将消息推送给中进程去处理,push有长度限制 (https://wiki.swoole.com/wiki/page/290.html)
  $worker->push($data, false);
     }
 } catch (Exception $e) {
     // 休息一会,避免 CPU 出现 100%
     sleep(1);
     // 抛出错误
     throw $e;
 }
    }

    // 中进程启动事件回调函数
    public function onCenterStart(CenterProcess $worker)
    {
 // 保持任务执行状态,循环结束后当前进程会退出,主进程会重启一个新进程继续执行任务,这样做是为了避免长时间执行内存溢出
 for ($j = 0; $j < 16000; $j++) {
     // 从进程消息队列中抢占一条消息
     $data = $worker->pop();
     if (empty($data)) {
  continue;
     }
     // 处理消息
     try {
  // 处理消息,比如:发送短信、发送邮件、微信推送
  var_dump($data);
  $ret = self::sendEmail($data);
  var_dump($ret);
     } catch (Exception $e) {
  // 回退数据到消息队列
  $worker->rollback($data);
  // 休息一会,避免 CPU 出现 100%
  sleep(1);
  // 抛出错误
  throw $e;
     }
 }
    }

    // 发送邮件
    public static function sendEmail($data)
    {
 // Create the Transport
 $transport = (new Swift_SmtpTransport(self::HOST, self::PORT, self::SECURITY))
     ->setUsername(self::USERNAME)
     ->setPassword(self::PASSWORD);
 // Create the Mailer using your created Transport
 $mailer = new Swift_Mailer($transport);
 // Create a message
 $message = (new Swift_Message($data['subject']))
     ->setFrom([self::USERNAME => '**网'])
     ->setTo($data['to'])
     ->setBody($data['body']);
 // Send the message
 $result = $mailer->send($message);
 return $result;
    }

}

测试

1.在 shell 中启动 push 常驻程序。
[root@localhost bin]# ./mix-daemon push start
mix-daemon 'push' start successed.
1.调用接口往消息队列投放任务。

此时 shell 终端将打印:


企业微信截图_15979008134242.png

成功收到测试邮件:


企业微信截图_15979008195228.png

MixPHP

GitHub: https://github.com/mix-php/mix
官网:http://www.mixphp.cn/

转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/262694.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号