您可以通过实现漏桶(leaky bucket)算法来做到这一点:
import asyncio
import collections
import contextlib
import time
from types import TracebackType
from typing import Dict, Optional, Type

try:  # Python 3.7+
    base = contextlib.AbstractAsyncContextManager
    _current_task = asyncio.current_task
except AttributeError:  # Python 3.6 fallback
    base = object  # type: ignore
    _current_task = asyncio.Task.current_task  # type: ignore


class AsyncLeakyBucket(base):
    """A leaky bucket rate limiter.

    Allows up to ``max_rate`` acquisitions per ``time_period`` seconds
    (default 60) before blocking; the full ``max_rate`` may be consumed in
    a single burst.

    Capacity is leaked lazily: every ``has_capacity()`` call lowers the
    level by whatever has drained since the previous check, so no separate
    background task is needed to empty the bucket.

    Tasks waiting for capacity are kept in insertion order; when capacity
    may have become free, the first still-waiting task is woken early.
    """

    def __init__(
        self,
        max_rate: float,
        time_period: float = 60,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Create a bucket.

        :param max_rate: maximum amount that can be acquired per
            ``time_period``; also the bucket size (maximum burst).
        :param time_period: length of the rate period, in seconds.
        :param loop: event loop to use; when ``None`` the current event
            loop is looked up on each ``acquire()``.
        """
        self._loop = loop
        self._max_level = max_rate
        self._rate_per_sec = max_rate / time_period
        self._level = 0.0
        self._last_check = 0.0
        # task -> future used to wake that task early when capacity may be
        # available; insertion order determines which waiter is woken first
        self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict()

    def _leak(self) -> None:
        """Drip out the capacity that drained since the last check."""
        # Monotonic clock: unaffected by wall-clock adjustments. Read it once
        # so the time between two separate reads is not silently dropped.
        now = time.monotonic()
        if self._level:
            elapsed = now - self._last_check
            decrement = elapsed * self._rate_per_sec
            self._level = max(self._level - decrement, 0)
        self._last_check = now

    def has_capacity(self, amount: float = 1) -> bool:
        """Return True if ``amount`` more fits in the bucket right now.

        Side effect: when there is room, the first waiter whose future is
        still pending is signalled so it can re-check. It only actually
        runs once the current task yields at an ``await``.
        """
        self._leak()
        requested = self._level + amount
        if requested < self._max_level:
            for fut in self._waiters.values():
                if not fut.done():
                    fut.set_result(True)
                    break
        return requested <= self._max_level

    async def acquire(self, amount: float = 1) -> None:
        """Acquire ``amount`` of space in the bucket, blocking until it fits.

        :raises ValueError: if ``amount`` exceeds the bucket capacity, since
            it could never fit.
        """
        if amount > self._max_level:
            raise ValueError("Can't acquire more than the bucket capacity")

        loop = self._loop or asyncio.get_event_loop()
        task = _current_task(loop)
        assert task is not None
        while not self.has_capacity(amount):
            # Sleep for about as long as ``amount`` takes to drain, but
            # register a future so has_capacity() can wake us sooner.
            fut = loop.create_future()
            self._waiters[task] = fut
            try:
                # The ``loop`` argument of wait_for was removed in 3.10.
                await asyncio.wait_for(
                    asyncio.shield(fut), 1 / self._rate_per_sec * amount
                )
            except asyncio.TimeoutError:
                pass
            finally:
                # Deregister even when cancelled; a stale pending future left
                # in _waiters would swallow the next wake-up signal.
                fut.cancel()
                self._waiters.pop(task, None)

        self._level += amount

    async def __aenter__(self) -> None:
        """Acquire one unit of capacity on entering ``async with``."""
        await self.acquire()
        return None

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc: Optional[BaseException],
        tb: Optional[TracebackType],
    ) -> None:
        """Nothing to release: capacity drains over time on its own."""
        return None
请注意,等待容量的任务保存在有序字典中;当可能再次有空闲容量时,第一个仍在等待的任务会被提前唤醒。
您可以将其用作异步上下文管理器:当桶已满时,尝试获取它会一直阻塞,直到再次释放出足够的容量为止:
bucket = AsyncLeakyBucket(100)

# ...

async with bucket:
    # only reached once the bucket is no longer full
或者,您也可以直接调用 acquire():
await bucket.acquire() # blocks until there is space in the bucket
或者您可以简单地先测试是否有空间:
if not bucket.has_capacity():
    # reject a request due to rate limiting
请注意,您可以通过增加或减少“滴入”桶中的量,把某些请求视为“较重”或“较轻”:
await bucket.acquire(10)
if bucket.has_capacity(0.5):
不过要小心:混合使用大小不同的“水滴”时,如果速率已达到或接近上限,小水滴往往会抢在大水滴之前流出,因为在桶中腾出足以容纳大水滴的空间之前,更有可能先腾出足以容纳小水滴的空间。
演示:
>>> import asyncio, time
>>> bucket = AsyncLeakyBucket(5, 10)
>>> async def task(id):
...     await asyncio.sleep(id * 0.01)
...     async with bucket:
...         print(f'{id:>2d}: Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task(i) for i in range(15)]
>>> result = asyncio.run(asyncio.wait(tasks))
 0: Drip!  0.00
 1: Drip!  0.02
 2: Drip!  0.02
 3: Drip!  0.03
 4: Drip!  0.04
 5: Drip!  2.05
 6: Drip!  4.06
 7: Drip!  6.06
 8: Drip!  8.06
 9: Drip! 10.07
10: Drip! 12.07
11: Drip! 14.08
12: Drip! 16.08
13: Drip! 18.08
14: Drip! 20.09

开始时,桶会被一次突发迅速装满,此后其余任务被更均匀地分散开;每 2 秒就会释放出足够处理另一个任务的容量。
在上面的演示中,最大突发大小等于最大速率值,该示例已将其设置为5。如果您不想允许突发,请将最大速率设置为1,并将时间间隔设置为两次滴水之间的最小时间:
>>> bucket = AsyncLeakyBucket(1, 1.5)  # no bursts, drip every 1.5 seconds
>>> async def task():
...     async with bucket:
...         print(f'Drip! {time.time() - ref:>5.2f}')
...
>>> ref = time.time()
>>> tasks = [task() for _ in range(5)]
>>> result = asyncio.run(asyncio.wait(tasks))
Drip!  0.00
Drip!  1.50
Drip!  3.01
Drip!  4.51
Drip!  6.02

我已经将其打包为一个 Python 项目:https://github.com/mjpieters/aiolimiter



