据我了解,(1)每次调用write时都需要调用rain。(2)如果不是我猜,写会阻塞循环线程
两者都不是正确的,但是这种混乱是可以理解的。
write()工作方式如下:
调用
write()
只会将数据存储到缓冲区中,然后将其留给事件循环以在以后的时间实际将其写出,而无需程序的进一步干预。就应用程序而言,数据在后台写入的速度与另一端能够接收数据的速度一样快。换句话说,每个人都write()
将使用所需的尽可能多的OS级别写入来调度要传输的数据,而这些写入实际上是在相应文件描述符可写时发出的。所有这一切都是自动发生的,甚至无需等待drain()
。write()
不是协程,它绝对 不会 阻塞事件循环。
第二个属性听起来方便-您可以拨打
write()你需要的地方,甚至从一个功能,这不是
async def-但它实际上是一个重大 缺陷
的
write()。流API公开的写入方式与接收数据的操作系统完全脱钩,因此,如果写入数据的速度快于对等方读取数据的速度,则内部缓冲区将不断增长,并且您将面临内存泄漏。
drain()解决了该问题:如果写入缓冲区太大,则等待它暂停协程,并
os.write()在后台成功执行并且缓冲区缩小后再次恢复协程。
您不需要
drain()在 每次 写操作之后 都 等待,但是您确实需要偶尔等待它,通常在
write()调用该循环的迭代之间等待。例如:
while True: response = await peer1.readline() peer2.write(b'<response>') peer2.write(response) peer2.write(b'</response>') await peer2.drain()
drain()如果未决的未写入数据量很少,则立即返回。如果数据超过高阈值,
drain()将暂停调用协程直到待处理的未写入数据量降至低阈值以下。暂停会导致协程停止从读取数据
peer1,这反过来又会导致对等方放慢其向我们发送数据的速度。这种反馈称为背压。
缓冲应在写函数内部处理,应用程序不应在意。
这几乎就是
write()现在的工作方式-
它确实可以处理缓冲,并且它使应用程序不管好坏都不在乎。另请参阅此答案以获取其他信息。
解决问题的编辑部分:
再次阅读答案和链接,我认为这些功能就是这样工作的。
write()仍然比那聪明。它不会尝试只写入一次,而是会安排数据继续写入,直到没有数据可写入为止。即使您从未等待也将发生这种情况
drain()-应用程序必须做的唯一事情就是让事件循环运行其过程足够长的时间,以将所有内容写出。
一个更正确的伪代码
write,并
drain可能是这样的:
class ToyWriter: def __init__(self): self._buf = bytearray() self._empty = asyncio.Event(True) def write(self, data): self._buf.extend(data) loop.add_writer(self._fd, self._do_write) self._empty.clear() def _do_write(self): # Automatically invoked by the event loop when the # file descriptor is writable, regardless of whether # anyone calls drain() while self._buf: try: nwritten = os.write(self._fd, self._buf) except OSError as e: if e.errno == errno.EWOULDBLOCK: return # continue once we're writable again raise self._buf = self._buf[nwritten:] self._empty.set() loop.remove_writer(self._fd, self._do_write) async def drain(self): if len(self._buf) > 64*1024: await self._empty.wait()
实际的实现更为复杂,因为:
- 它写在具有自己复杂的流控制的Twisted样式的传输/协议层之上,而不是在;
os.write
drain()
并不是真正地等到缓冲区为空时,才等到达到低水位线;- 除了
EWOULDBLOCK
引发的异常之外,其他异常_do_write
都存储在中并重新引发drain()
。
最后一点是调用的 另一个 很好的理由
drain()-实际上由于写入失败而注意到对等端已消失。



