Source code for ds_protocol_http_py_lib.utils.http.token_bucket
"""
**File:** ``token_bucket.py``
**Region:** ``ds_protocol_http_py_lib/utils/http/token_bucket``
Token Bucket Rate Limiter
This module implements a cooperative token bucket algorithm for rate limiting HTTP requests.
A token bucket maintains a reservoir of tokens that are consumed when requests are made.
Tokens are replenished at a constant rate (tokens per second), providing smooth rate limiting
that allows burst traffic while maintaining an overall rate cap.
Key features:
- Thread-safe: all state updates are guarded by a threading.Lock
- Configurable requests per second (RPS) rate
- Adjustable capacity for burst handling
- Global RPS cap across all concurrent operations sharing one bucket
Example:
>>> limiter = TokenBucket(rps=10.0, capacity=20)
>>> limiter.acquire()
>>> # Make your HTTP request here
"""
import threading
import time
from ds_common_logger_py_lib import Logger
# Module-level logger; TokenBucket.acquire uses it to report how long it waits for a token.
logger = Logger.get_logger(__name__, package=True)
[docs]
class TokenBucket:
    """
    Token Bucket Rate Limiter

    Implements the classic token bucket algorithm for controlling request rates in
    threaded environments. Each request consumes one token from the bucket. If no
    tokens are available, the caller sleeps until enough tokens have been
    replenished based on the configured rate.

    The bucket starts full and refills continuously at the specified RPS rate. This
    allows burst traffic (up to the bucket capacity) while maintaining the overall
    rate limit. All reads and writes of the bucket state happen under a
    ``threading.Lock``, so a single instance can be shared between threads.

    :param rps: Target requests per second rate. Determines token refill rate.
        Must be greater than zero.
    :param capacity: Maximum number of tokens the bucket can hold. Defaults to 20.
        A falsy value (``0`` or ``None``) selects ``2 * rps`` instead (at least 1).
    :raises ValueError: If ``rps`` is not positive, or the resolved capacity is
        smaller than 1 (such a bucket could never hand out a token).

    Example:
        # Limit to 10 requests per second with burst capacity of 20
        limiter = TokenBucket(rps=10.0, capacity=20)
        # Acquire permission for a request
        limiter.acquire()
        # Make your HTTP request here
    """

    def __init__(
        self,
        rps: float = 10.0,
        capacity: int = 20,
    ) -> None:
        """
        Initialize the TokenBucket, starting with a full bucket.

        :param rps: Target requests per second rate. Determines token refill rate.
            Must be greater than zero.
        :param capacity: Maximum number of tokens the bucket can hold. Defaults to 20.
            A falsy value (``0`` or ``None``) selects ``2 * rps`` (at least 1).
        :raises ValueError: If ``rps`` is not positive or the capacity is below 1.
        :return: None
        """
        self.rps = float(rps)
        # ``not x > 0`` also rejects NaN. A non-positive rate would divide by zero
        # (or never refill) once the bucket runs dry.
        if not self.rps > 0:
            raise ValueError(f"rps must be a positive number, got {rps!r}")

        if capacity:
            self.capacity = int(capacity)
        else:
            # Clamp to 1: for rps < 0.5, int(rps * 2) would be 0, and a
            # zero-capacity bucket makes acquire() spin forever.
            self.capacity = max(1, int(self.rps * 2))
        if self.capacity < 1:
            raise ValueError(f"capacity must be at least 1, got {capacity!r}")

        self.tokens = float(self.capacity)
        self.last = time.perf_counter()
        self._lock = threading.Lock()

    def _refilled_tokens(self, now: float) -> float:
        """
        Return the token count after refilling for the time elapsed up to ``now``.

        Pure computation: it does not modify ``tokens`` or ``last``. Callers must
        hold ``self._lock``.

        :param now: Current ``time.perf_counter()`` reading.
        :return: Token count, capped at ``capacity``, always as a float.
        """
        return min(float(self.capacity), self.tokens + (now - self.last) * self.rps)

    def acquire(self) -> None:
        """
        Acquire a token from the bucket, waiting if necessary.

        Blocks the calling thread with ``time.sleep`` until at least one whole
        token is available, then consumes it. The lock is released while
        sleeping, so other threads can keep using the bucket.

        :return: None
        """
        while True:
            with self._lock:
                now = time.perf_counter()
                self.tokens = self._refilled_tokens(now)
                self.last = now
                if self.tokens >= 1.0:
                    self.tokens -= 1.0
                    return
                # Time until the deficit to one full token is refilled.
                wait = (1.0 - self.tokens) / self.rps
            # Sleep outside the lock, then re-check: another thread may take the
            # refilled token first.
            logger.debug(f"Waiting {wait} seconds for token")
            time.sleep(wait)

    def available(self) -> float:
        """
        Return the current available token count *without mutating* internal state.

        This computes how many tokens would be available if we refilled based on
        elapsed time, but it does not update `tokens` or `last`.

        :return: Available tokens as a float in ``[0, capacity]``.
        """
        with self._lock:
            return self._refilled_tokens(time.perf_counter())