import asyncio
from threading import Lock
class PTBNL:
def __init__(self):
self._req_id_seq = 0
self._futures = {}
self._results = {}
self.token_bucket = TokenBucket()
self.token_bucket.set_rate(100)
def run(self, *awaitables):
loop = asyncio.get_event_loop()
if not awaitables:
loop.run_forever()
elif len(awaitables) == 1:
return loop.run_until_complete(*awaitables)
else:
future = asyncio.gather(*awaitables)
return loop.run_until_complete(future)
def sleep(self, secs) -> True:
self.run(asyncio.sleep(secs))
return True
def get_req_id(self) -> int:
new_id = self._req_id_seq
self._req_id_seq += 1
return new_id
def start_req(self, key):
loop = asyncio.get_event_loop()
future = loop.create_future()
self._futures[key] = future
return future
def end_req(self, key, result=None):
future = self._futures.pop(key, None)
if future:
if result is None:
result = self._results.pop(key, [])
if not future.done():
future.set_result(result)
def req_data(self, req_id, obj):
# Do Some Work Here
self.req_data_end(req_id)
pass
def req_data_end(self, req_id):
print(req_id, " has ended")
self.end_req(req_id)
async def req_data_async(self, obj):
req_id = self.get_req_id()
future = self.start_req(req_id)
self.req_data(req_id, obj)
await future
return future.result()
async def req_data_batch_async(self, contracts):
futures = []
FLAG = False
for contract in contracts:
req_id = self.get_req_id()
future = self.start_req(req_id)
futures.append(future)
nap = self.token_bucket.consume(1)
if FLAG is False:
FLAG = True
start = asyncio.get_event_loop().time()
asyncio.get_event_loop().call_later(nap, self.req_data, req_id, contract)
await asyncio.gather(*futures)
elapsed = asyncio.get_event_loop().time() - start
return futures, len(contracts)/elapsed
class TokenBucket:
def __init__(self):
self.tokens = 0
self.rate = 0
self.last = asyncio.get_event_loop().time()
self.lock = Lock()
def set_rate(self, rate):
with self.lock:
self.rate = rate
self.tokens = self.rate
def consume(self, tokens):
with self.lock:
if not self.rate:
return 0
now = asyncio.get_event_loop().time()
lapse = now - self.last
self.last = now
self.tokens += lapse * self.rate
if self.tokens > self.rate:
self.tokens = self.rate
self.tokens -= tokens
if self.tokens >= 0:
return 0
else:
return -self.tokens / self.rate
if __name__ == '__main__':
asyncio.get_event_loop().set_debug(True)
app = PTBNL()
objs = [obj for obj in range(500)]
l,t = app.run(app.req_data_batch_async(objs))
print(l)
print(t)
import asyncio
import contextlib
import collections
import time
from types import TracebackType
from typing import Dict, Optional, Type
try: # Python 3.7
base = contextlib.AbstractAsyncContextManager
_current_task = asyncio.current_task
except AttributeError:
base = object # type: ignore
_current_task = asyncio.Task.current_task # type: ignore
class AsyncLeakyBucket(base):
"""A leaky bucket rate limiter.
Allows up to max_rate / time_period acquisitions before blocking.
time_period is measured in seconds; the default is 60.
"""
def __init__(
self,
max_rate: float,
time_period: float = 60,
loop: Optional[asyncio.AbstractEventLoop] = None
) -> None:
self._loop = loop
self._max_level = max_rate
self._rate_per_sec = max_rate / time_period
self._level = 0.0
self._last_check = 0.0
# queue of waiting futures to signal capacity to
self._waiters: Dict[asyncio.Task, asyncio.Future] = collections.OrderedDict()
def _leak(self) -> None:
"""Drip out capacity from the bucket."""
if self._level:
# drip out enough level for the elapsed time since
# we last checked
elapsed = time.time() - self._last_check
decrement = elapsed * self._rate_per_sec
self._level = max(self._level - decrement, 0)
self._last_check = time.time()
def has_capacity(self, amount: float = 1) -> bool:
"""Check if there is enough space remaining in the bucket"""
self._leak()
requested = self._level + amount
# if there are tasks waiting for capacity, signal to the first
# there there may be some now (they won't wake up until this task
# yields with an await)
if requested < self._max_level:
for fut in self._waiters.values():
if not fut.done():
fut.set_result(True)
break
return self._level + amount <= self._max_level
async def acquire(self, amount: float = 1) -> None:
"""Acquire space in the bucket.
If the bucket is full, block until there is space.
"""
if amount > self._max_level:
raise ValueError("Can't acquire more than the bucket capacity")
loop = self._loop or asyncio.get_event_loop()
task = _current_task(loop)
assert task is not None
while not self.has_capacity(amount):
# wait for the next drip to have left the bucket
# add a future to the _waiters map to be notified
# 'early' if capacity has come up
fut = loop.create_future()
self._waiters[task] = fut
try:
await asyncio.wait_for(
asyncio.shield(fut),
1 / self._rate_per_sec * amount,
loop=loop
)
except asyncio.TimeoutError:
pass
fut.cancel()
self._waiters.pop(task, None)
self._level += amount
return None
async def __aenter__(self) -> None:
await self.acquire()
return None
async def __aexit__(
self,
exc_type: Optional[Type[BaseException]],
exc: Optional[BaseException],
tb: Optional[TracebackType]
) -> None:
return None