栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 软件开发 > 后端开发 > Python

Python 的多进程编程(二)

Python 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

Python 的多进程编程(二)

原文出处

这里顺带说下刷新和关闭文件吧。

POSIX 系统提供了非常多的函数用于刷新:

  • fflush() 函数:用于刷新 stdio 流(stream)。对于输出流,它强制将用户空间的缓冲区(buffer),通过这个流底层的 write()函数写入到该流。对于关联到可定位(seekable)文件的输入流,它丢弃掉已经被底层文件读取(fetch),但还没被应用消费(consume)的buffer(因为对于 seekable 的文件而言,只需要重新定位再读一次即可)。如果它的 FILE *stream 参数为NULL,则刷新所有打开的流。简单来说,它的作用主要是将用户空间的缓冲区写入内核空间。但是用户空间的缓冲区是和所用的语言和库相关的,fflush() 刷新的只是 C 语言的 stdio 库创建的 FILE 类型的 buffer。对于 Python 而言,你应该使用 file.flush() 方法;如果没有输出缓冲区的话,直接使用os.write()和 socket.send() 就能写入内核空间了,不需要刷新。
  • fsync() 和 fdatasync()系统调用:用于将内核中缓存的对文件的修改写入磁盘。两者的区别在于后者不写入元数据(metadata)。它们都是 async-signal-safe 的函数。很显然,这两个函数都只对磁盘文件有效,不能用于刷新socket。此外,磁盘也有自己的缓冲区,内核也只是将数据写入磁盘缓冲区而已,并不一定真正被写入磁盘了。macOS 的fcntl()系统调用提供了 F_FULLFSYNC 选项,可以要求将数据写入磁盘的永久存储设备中(只对 HFS、FAT 和 UDF文件系统有效)。

    大部分的程序都不需要用到它们,因为只要不是断电等意外原因导致非正常关机的话,内核和磁盘驱动最终都会将文件写入磁盘的(尽管那时你可能已经关闭文件或者退出进程了)。要考虑到断电时的数据安全的话,主要也就是数据库之类的应用了。
    Python 提供了 os.fsync() 函数来封装 fsync() 系统调用。

  • aio_fsync() 函数:用于刷新异步 IO 操作到磁盘。功能类似于 fsync() 和 fdatasync(2),但是只发出请求,不等待实际的操作完成就返回。
  • sync() 和 syncfs() 系统调用:将文件系统的缓存写入磁盘。二者区别在于后者只刷新该 fd 所对应的文件的文件系统。
    很显然,这是两个很重的系统调用,一般不需要使用。

而 POSIX 的 close() 和 _exit() 系统调用并不会进行任何刷新操作,fclose()、exit() 和 abort() 函数可以看成隐式地调用了 fflush() 和 close()。

对应到 Python 中,文件对象的 close() 方法会刷新用户空间的缓冲区;os.close()、os._exit() 和 os.abort() 与对应的 C 函数行为一致,并且因为 abort() 函数并不知道 Python 是怎样使用用户空间的缓冲区的,所以并不会刷新 Python 的文件对象;其他正常的退出方式会刷新用户空间的缓冲区,并进行其他的清理工作。

最后,我们来利用上述知识来实现一个需求:编写一个长期运行的程序,它每隔一段时间(如 5 秒)访问一次远程服务器,查询一次是否有新的命令。有的话就执行该命令,并且上报命令的执行结果和输出。不会有并发的任务需要执行,但对于执行时间较长的命令,需要每隔一段时间上报它当前的输出。

很显然,这个程序应该是一个多进程程序,使用子进程来执行该命令;如果是多线程程序的话,执行时很可能导致主线程挂掉。

于是先来看看这个主进程的架构:

class Daemon(object):
    def __init__(self):
 self._action_id = None
 self._done_action = False

    def run(self):
 while True:
     start_time = time.time()

     status = self.collect_status()
     if status and self.report_status(status) and self._done_action:
  self.clean_status()

     if not self._action_id:
  action = self.fetch_action()
  if action and self.accept_action(action):
      self.handle_action(action)

     wait_time = FETCH_ACTION_INTERVAL - (time.time() - start_time)
     if wait_time > 0:
  self.wait(wait_time)

    def fetch_action(self):
 raise NotImplementedError

    def handle_action(self, action):
 raise NotImplementedError

    def accept_action(self, action):
 raise NotImplementedError

    def collect_status(self):
 raise NotImplementedError

    def report_status(self, status):
 raise NotImplementedError

    def clean_status(self):
 raise NotImplementedError

    @staticmethod
    def wait(wait_time):
 time.sleep(wait_time)

FETCH_ACTION_INTERVAL = 5

先来看 wait() 方法,如果子进程提前执行结束了,应该尽快上报结果,而不该傻傻地 sleep。考虑到 signal 模块有个定时器的功能,先用它来实现吧:

class Daemon(object):
    def run(self):
 try:
     self.register_signals()

     while True:
  start_time = time.time()

  status = self.collect_status()
  if status and self.report_status(status) and self._done_action:
      self.clean_status()

  if not self._action_id:
      action = self.fetch_action()
      if action and self.accept_action(action):
   self.handle_action(action)

  wait_time = FETCH_ACTION_INTERVAL - (time.time() - start_time)
  if wait_time > 0:
      self.wait(wait_time)
 finally:
     self.deregister_signals()

    @staticmethod
    def ignore_signal(signum, frame):
 return

    def register_signals(self):
 signal.signal(signal.SIGALRM, self.ignore_signal)
 signal.signal(signal.SIGCHLD, self.ignore_signal)

    def deregister_signals(self):
 signal.setitimer(signal.ITIMER_REAL, 0)
 signal.signal(signal.SIGALRM, signal.SIG_DFL)
 signal.signal(signal.SIGCHLD, signal.SIG_DFL)

    def wait(self):
 signal.setitimer(signal.ITIMER_REAL, wait_time)
 signal.pause()
 signal.setitimer(signal.ITIMER_REAL, 0)

因为 signal.pause() 会在收到任意信号后继续运行,所以定时器的 SIGALRM 信号和子进程退出的 SIGCHLD 都可以唤醒它,也就实现了子进程退出时立刻唤醒,否则等待最多 5 秒的功能。但是我们并不知道这次唤醒是不是由 SIGALRM 信号造成的,所以安全起见还是清除一下定时器,避免它可能打断之后的系统调用。

然而这种实现仍然有不足之处,如果使用 IDE 来调试的话,暂停在 signal.pause() 这步可能导致错过定时器发出的 SIGALRM 信号,使得程序没法继续执行,有时还可能出现各种意外的退出情况。更可靠的方式是使用唤醒 fd,再用 select 等方式监听 fd 是否可读,并设置超时时间。为了使用比较高效的 select 模型,又不用考虑平台的可移植性(例如我经常在 macOS 和 Linux 之间切换),我采用了 selectors 标准库(Python 3.4 之前的版本可以使用 selectors2 或 selectors34 这两个第三方库来替代):

class Daemon(object):
    def __init__(self):
 self._action_id = None
 self._done_action = False
 self._selector = None
 r, w = os.pipe()
 set_non_blocking(r)
 set_non_blocking(w)  # wakeup fd 需要是非阻塞的
 self._waker = r, w

    def register_signals(self):
 signal.signal(signal.SIGCHLD, self.ignore_signal)
 self._selector = selectors.DefaultSelector()
 self._selector.register(self._waker[0], selectors.EVENT_READ)
 signal.set_wakeup_fd(self._waker[1])

    def deregister_signals(self):
 signal.signal(signal.SIGALRM, signal.SIG_DFL)
 signal.set_wakeup_fd(-1)
 if self._selector:
     self._selector.close()
     self._selector = None

    def wait(self, wait_time):
 ready = self._selector.select(wait_time)
 if ready:
     waker = self._waker[0]
     # 读完并忽略所有的数据,避免 _waker[1] 被阻塞而无法写入
     try:
  while True:
      data = os.read(waker, BUF_SIZE)
      if not data or len(data) < BUF_SIZE:  # 没有更多数据了
   break
     except OSError:  # 忽略 EAGAIN 和 EINTR
  pass

def set_non_blocking(fd):
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

BUF_SIZE = 1024

经过这样改造后,就没有上述缺点了。顺带还获得了一个好处,如果和远程服务器之间是长链接,那就可以监听该 socket 的 fd,一有消息就立刻跳到下个循环,加快 fetch_action() 的调用。

再来看子进程的实现:

class Worker(object):
    def __init__(self, action):
 try:
     os.setsid()
 except OSError:
     worker_logger.warning('failed to setsid, the worker should be created in a forked process')

 user_name = action.get('user_name')
 if not user_name:
     raise InvalidAction('user_name is required')
 if not isinstance(user_name, basestring):
     raise InvalidAction('user_name must be a string object')

 try:
     user = pwd.getpwnam(user_name)
 except KeyError:
     raise UserNotFound

 group_name = action.get('group_name')
 if group_name:
     if not isinstance(group_name, basestring):
  raise InvalidAction('group_name must be a string object')
     try:
  user_group = grp.getgrnam(group_name)
     except KeyError:
  raise GroupNotFound
     if user_name not in user_group.gr_mem:
  raise UserNotInGroup('user %s not in group %s' % (user_name, group_name))
     os.setgid(user_group.gr_gid)
 else:
     os.setgid(user.pw_gid)

 os.setuid(user.pw_uid)
 os.chdir(user.pw_dir)

 redirect_fd(0, os.devnull, os.O_RDONLY)  # 避免运行一些需要 TTY 的命令,如 top
 redirect_fd(1, STDOUT_FILE_PATH), os.O_WRonLY | os.O_CREAT | os.O_TRUNC)
 redirect_fd(2, STDERR_FILE_NAME), os.O_WRonLY | os.O_CREAT | os.O_TRUNC)

 self.action = action

 self.save_worker_pid()

    def do_action(self):
 gc.disable()  # https://bugs.python.org/issue1336
 try:
     pid = os.fork()
 except OSError:
     worker_logger.exception('failed to fork executor for action ' + self.action.id)
     gc.enable()
     raise
 else:
     if pid == 0:  # executor
  worker_logger.handlers[0].close()
  os.execve('/bin/bash', ['/bin/bash', '--norc', '-c', self.action.execution], {})  # 使用干净的环境来执行
     else:  # worker
  gc.enable()
  result = wait_pid(pid, 0)
  executor_exit_status = result[1]
  self.save_executor_exit_status(exit_status)

    def save_worker_pid(self):
 raise NotImplementedError

    def save_executor_exit_status(self, exit_status):
 raise NotImplementedError

def redirect_fd(fd, path, mode):
    try:
 os.close(fd)
    except OSError:
 pass
    new_fd = os.open(path, mode)
    if new_fd != fd:
 os.dup2(new_fd, fd)
 os.close(new_fd)

def wait_pid(pid, options):
    while True:
 try:
     return os.waitpid(pid, options)
 except OSError as e:
     if e.errno == errno.EINTR:  # 忽略等待时被中断
  continue
     raise

这里先做的工作是设置 UID 和 GID 等,因为执行的用户和 woker 的运行用户可能不一样(后者一般是用 root 用户)。
而在 do_action() 里,worker 并没有直接调用 os.execve(),而是让它的子进程去执行,为的是执行完毕后有机会调用 save_executor_exit_status() 方法。这个方法和 save_worker_pid() 方法的实现我就不列出了,只需要写入文件即可。

为什么要将退出状态和 PID 写入文件,而不是直接通知 daemon 进程呢?因为 daemon 进程虽然是长期运行的,但是有可能因为意外情况退出,也可能需要重启。在它重启之后,新的 daemon 已经和正在运行的 worker 没有联系了,就只能通过文件读取到 worker 的 PID 和 executor 的退出状态,从而恢复运行。这也是我不用 subprocess 模块的原因,因为进程退出后 PIPE 就丢失了。FIFO 也是不可靠的,因为读出来后如果 daemon 进程崩溃了,数据也没法找回;甚至也有可能两个进程都崩溃了,这个 FIFO 就被销毁了。

最后,daemon 剩下的一些繁琐的功能实现我也不列出了,只把 handle_action() 实现一下:

class Daemon(object):
    def handle_action(self, action):
 gc.disable()
 try:
     pid = os.fork()
 except OSError:
     daemon_logger.exception('failed to create worker for action ' + action.id)
     gc.enable()
     # 保存 action exit status
 else:
     if pid == 0:  # worker
  self.deregister_signals()
  os.close(self._waker[0])
  os.close(self._waker[1])
  daemon_logger.handlers[0].close()
  gc.enable()
  exit_code = 0
  try:
      worker = Worker(action)
      worker.do_action()
  except SystemExit as e:
      exit_code = e.code
  except:
      exit_code = 1  # 根据实际的设置返回不同的错误码
  finally:
      try:
   logging.shutdown()  # 刷新未写入的日志
      except:
   pass
      os._exit(exit_code)
     else:  # daemon
  gc.enable()
  # 保存 worker PID,因为 worker 有可能还没保存 PID 就退出了
转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/225546.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号