栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

为什么要显式调用asyncio.StreamWriter.drain?

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

为什么要显式调用asyncio.StreamWriter.drain?

据我了解,(1)每次调用write时都需要调用rain。(2)如果不是我猜,写会阻塞循环线程

两者都不是正确的,但是这种混乱是可以理解的。

write()
工作方式如下:

  • 调用

    write()
    只会将数据存储到缓冲区中,然后将其留给事件循环以在以后的时间实际将其写出,而无需程序的进一步干预。就应用程序而言,数据在后台写入的速度与另一端能够接收数据的速度一样快。换句话说,每个人都
    write()
    将使用所需的尽可能多的OS级别写入来调度要传输的数据,而这些写入实际上是在相应文件描述符可写时发出的。所有这一切都是自动发生的,甚至无需等待
    drain()

  • write()
    不是协程,它绝对 不会 阻塞事件循环。

第二个属性听起来方便-您可以拨打

write()
你需要的地方,甚至从一个功能,这不是
async def
-但它实际上是一个重大 缺陷
write()
。流API公开的写入方式与接收数据的操作系统完全脱钩,因此,如果写入数据的速度快于对等方读取数据的速度,则内部缓冲区将不断增长,并且您将面临内存泄漏。
drain()
解决了该问题:如果写入缓冲区太大,则等待它暂停协程,并
os.write()
在后台成功执行并且缓冲区缩小后再次恢复协程。

您不需要

drain()
每次 写操作之后 等待,但是您确实需要偶尔等待它,通常在
write()
调用该循环的迭代之间等待。例如:

while True:    response = await peer1.readline()    peer2.write(b'<response>')    peer2.write(response)    peer2.write(b'</response>')    await peer2.drain()

drain()
如果未决的未写入数据量很少,则立即返回。如果数据超过高阈值,
drain()
将暂停调用协程直到待处理的未写入数据量降至低阈值以下。暂停会导致协程停止从读取数据
peer1
,这反过来又会导致对等方放慢其向我们发送数据的速度。这种反馈称为背压。

缓冲应在写函数内部处理,应用程序不应在意。

这几乎就是

write()
现在的工作方式-
它确实可以处理缓冲,并且它使应用程序不管好坏都不在乎。另请参阅此答案以获取其他信息。


解决问题的编辑部分:

再次阅读答案和链接,我认为这些功能就是这样工作的。

write()
仍然比那聪明。它不会尝试只写入一次,而是会安排数据继续写入,直到没有数据可写入为止。即使您从未等待也将发生这种情况
drain()
-应用程序必须做的唯一事情就是让事件循环运行其过程足够长的时间,以将所有内容写出。

一个更正确的伪代码

write
,并
drain
可能是这样的:

class ToyWriter:    def __init__(self):        self._buf = bytearray()        self._empty = asyncio.Event(True)    def write(self, data):        self._buf.extend(data)        loop.add_writer(self._fd, self._do_write)        self._empty.clear()    def _do_write(self):        # Automatically invoked by the event loop when the        # file descriptor is writable, regardless of whether        # anyone calls drain()        while self._buf: try:     nwritten = os.write(self._fd, self._buf) except OSError as e:     if e.errno == errno.EWOULDBLOCK:         return  # continue once we're writable again     raise self._buf = self._buf[nwritten:]        self._empty.set()        loop.remove_writer(self._fd, self._do_write)    async def drain(self):        if len(self._buf) > 64*1024: await self._empty.wait()

实际的实现更为复杂,因为:

  • 它写在具有自己复杂的流控制的Twisted样式的传输/协议层之上,而不是在;
    os.write
  • drain()
    并不是真正地等到缓冲区为空时,才等到达到低水位线;
  • 除了
    EWOULDBLOCK
    引发的异常之外,其他异常
    _do_write
    都存储在中并重新引发
    drain()

最后一点是调用的 另一个 很好的理由

drain()
-实际上由于写入失败而注意到对等端已消失。



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/647022.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号