8월, 2018의 게시물 표시

Rate limit function calls in python

#!/usr/bin/env python import functools from typing import Callable, Deque import time import threading from collections import deque import inspect MAX_CALLS = 10 PERIOD_SEC = 3 def rate_limited(func: Callable, max_calls: int, period: int) -> Callable:     print("Max calls : " + str(max_calls) + " / " + str(period))     calls: Deque = deque()     lock = threading.RLock()     @functools.wraps(func)     def wrapper(*args, **kargs):         with lock:             if len(calls) >= max_calls:                 until = time.time() + period - (calls[-1] - calls[0])                 sleeptime = until - time.time()                 if sleeptime > 0:                     print("Rate Limit Reached => sleep with : " + str(sleeptime))                     time.sleep(sleeptime)                 while len(calls) > 0:                     calls.popleft()             _time = time.time()             calls.append(_time)