mirror of
https://github.com/yasirarism/MissKatyPyro.git
synced 2025-12-29 17:44:50 +00:00
- AutoFix Code Using DeepSource - Fix os not defined when got error - Fix Set Chat Photo (Only support photo) - Fix Admins Permission Error - Fix KeyError in Scraper - Fix Help Module in Eval - Fix Media Caption Too Long in IMDB - Remove heroku support - Some minor fix..
49 lines
1.2 KiB
Python
49 lines
1.2 KiB
Python
from typing import Union
|
|
|
|
from pyrate_limiter import (
|
|
BucketFullException,
|
|
Duration,
|
|
Limiter,
|
|
MemoryListBucket,
|
|
RequestRate,
|
|
)
|
|
|
|
|
|
class RateLimiter:
|
|
"""
|
|
Implement rate limit logic using leaky bucket
|
|
algorithm, via pyrate_limiter.
|
|
(https://pypi.org/project/pyrate-limiter/)
|
|
"""
|
|
|
|
def __init__(self) -> None:
|
|
# 1 requests per seconds
|
|
self.second_rate = RequestRate(1, Duration.SECOND)
|
|
|
|
# 15 requests per minute.
|
|
self.minute_rate = RequestRate(15, Duration.MINUTE)
|
|
|
|
# 100 requests per hour
|
|
self.hourly_rate = RequestRate(100, Duration.HOUR)
|
|
|
|
# 500 requests per day
|
|
self.daily_rate = RequestRate(500, Duration.DAY)
|
|
|
|
self.limiter = Limiter(
|
|
self.minute_rate,
|
|
self.hourly_rate,
|
|
self.daily_rate,
|
|
bucket_class=MemoryListBucket,
|
|
)
|
|
|
|
async def acquire(self, userid: Union[int, str]) -> bool:
|
|
"""
|
|
Acquire rate limit per userid and return True / False
|
|
based on userid ratelimit status.
|
|
"""
|
|
|
|
try:
|
|
self.limiter.try_acquire(userid)
|
|
return False
|
|
except BucketFullException:
|
|
return True
|