栏目分类:
子分类:
返回
名师互学网用户登录
快速导航关闭
当前搜索
当前分类
子分类
实用工具
热门搜索
名师互学网 > IT > 面试经验 > 面试问答

在Python Asyncio中限制异步功能

面试问答 更新时间: 发布时间: IT归档 最新发布 模块sitemap 名妆网 法律咨询 聚返吧 英语巴士网 伯小乐 网商动力

在Python Asyncio中限制异步功能

您可以通过实现漏斗算法来做到这一点:

import asyncioimport contextlibimport collectionsimport timefrom types import TracebackTypefrom typing import Dict, Optional, Typetry:  # Python 3.7    base = contextlib.AbstractAsyncContextManager    _current_task = asyncio.current_taskexcept AttributeError:    base = object  # type: ignore    _current_task = asyncio.Task.current_task  # type: ignoreclass AsyncLeakyBucket(base):    """A leaky bucket rate limiter.    Allows up to max_rate / time_period acquisitions before blocking.    time_period is measured in seconds; the default is 60.    """    def __init__(        self,        max_rate: float,        time_period: float = 60,        loop: Optional[asyncio.AbstractEventLoop] = None    ) -> None:        self._loop = loop        self._max_level = max_rate        self._rate_per_sec = max_rate / time_period        self._level = 0.0        self._last_check = 0.0        # queue of waiting futures to signal capacity to        self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict()    def _leak(self) -> None:        """Drip out capacity from the bucket."""        if self._level: # drip out enough level for the elapsed time since # we last checked elapsed = time.time() - self._last_check decrement = elapsed * self._rate_per_sec self._level = max(self._level - decrement, 0)        self._last_check = time.time()    def has_capacity(self, amount: float = 1) -> bool:        """Check if there is enough space remaining in the bucket"""        self._leak()        requested = self._level + amount        # if there are tasks waiting for capacity, signal to the first        # there there may be some now (they won't wake up until this task        # yields with an await)        if requested < self._max_level: for fut in self._waiters.values():     if not fut.done():         fut.set_result(True)         break        return self._level + amount <= self._max_level    async def acquire(self, amount: float = 1) -> None:        """Acquire space in the bucket.        If the bucket is full, block until there is space.        """        if amount > self._max_level: raise ValueError("Can't acquire more than the bucket capacity")        loop = self._loop or asyncio.get_event_loop()        task = _current_task(loop)        assert task is not None        while not self.has_capacity(amount): # wait for the next drip to have left the bucket # add a future to the _waiters map to be notified # 'early' if capacity has come up fut = loop.create_future() self._waiters[task] = fut try:     await asyncio.wait_for(         asyncio.shield(fut),         1 / self._rate_per_sec * amount,         loop=loop     ) except asyncio.TimeoutError:     pass fut.cancel()        self._waiters.pop(task, None)        self._level += amount        return None    async def __aenter__(self) -> None:        await self.acquire()        return None    async def __aexit__(        self,        exc_type: Optional[Type[baseException]],        exc: Optional[baseException],        tb: Optional[TracebackType]    ) -> None:        return None

请注意,我们是有机会从存储桶中泄漏容量,因此无需运行单独的异步任务即可降低该级别;而是在测试足够的剩余容量时泄漏了容量。

请注意,等待容量的任务保存在有序字典中,并且当可能有容量再次空闲时,第一个仍在等待的任务会提前唤醒。

您可以将其用作上下文管理器;尝试在存储块已满时获取存储桶,直到再次释放足够的容量为止:

bucket = AsyncLeakyBucket(100)# ...async with bucket:    # only reached once the bucket is no longer full

或者您可以

acquire()
直接致电:

await bucket.acquire()  # blocks until there is space in the bucket

或者您可以简单地先测试是否有空间:

if bucket.has_capacity():    # reject a request due to rate limiting

请注意,您可以通过增加或减少“滴入”存储桶的数量来将某些请求视为“较重”或“较轻”:

await bucket.acquire(10)if bucket.has_capacity(0.5):

不过要小心点;当混合大小液滴时,在达到或接近最大速率时,小液滴往往先于大液滴流失,因为存在较大可能性的可能性是,在有较大液滴空间之前,有足够的自由容量用于较小液滴。

演示:

>>> import asyncio, time>>> bucket = AsyncLeakyBucket(5, 10)>>> async def task(id):...     await asyncio.sleep(id * 0.01)...     async with bucket:...         print(f'{id:>2d}: Drip! {time.time() - ref:>5.2f}')...>>> ref = time.time()>>> tasks = [task(i) for i in range(15)]>>> result = asyncio.run(asyncio.wait(tasks)) 0: Drip!  0.00 1: Drip!  0.02 2: Drip!  0.02 3: Drip!  0.03 4: Drip!  0.04 5: Drip!  2.05 6: Drip!  4.06 7: Drip!  6.06 8: Drip!  8.06 9: Drip! 10.0710: Drip! 12.0711: Drip! 14.0812: Drip! 16.0813: Drip! 18.0814: Drip! 20.09

开始时,存储桶会突然装满,导致其余任务更均匀地分配;每2秒释放出足够的容量以处理另一个任务。

在上面的演示中,最大突发大小等于最大速率值,该示例已将其设置为5。如果您不想允许突发,请将最大速率设置为1,并将时间间隔设置为两次滴水之间的最小时间:

>>> bucket = AsyncLeakyBucket(1, 1.5)  # no bursts, drip every 1.5 seconds>>> async def task():...     async with bucket:...         print(f'Drip! {time.time() - ref:>5.2f}')...>>> ref = time.time()>>> tasks = [task() for _ in range(5)]>>> result = asyncio.run(asyncio.wait(tasks))Drip!  0.00Drip!  1.50Drip!  3.01Drip!  4.51Drip!  6.02

我已经将其打包为Python项目:https :
//github.com/mjpieters/aiolimiter



转载请注明:文章转载自 www.mshxw.com
本文地址:https://www.mshxw.com/it/610825.html
我们一直用心在做
关于我们 文章归档 网站地图 联系我们

版权所有 (c)2021-2022 MSHXW.COM

ICP备案号:晋ICP备2021003244-6号