ds_protocol_http_py_lib.utils.http¶
File: __init__.py
Region: ds_protocol_http_py_lib/utils/http
HTTP utility subpackage.
This file exists to ensure tools (like Sphinx AutoAPI) treat this directory as a proper Python package, so intra-package relative imports resolve correctly.
Submodules¶
Classes¶
Configuration for the HTTP client. |
|
Retry policy (urllib3 Retry via requests). |
|
Minimal synchronous HTTP client with: |
|
Token Bucket Rate Limiter |
Package Contents¶
- class ds_protocol_http_py_lib.utils.http.HttpConfig[source]¶
Configuration for the HTTP client. - headers: applied to all requests (overridable per call) - timeout_seconds: connect+read timeout seconds (or (connect, read) per call) - user_agent: user agent for all requests - pool_maxsize: maximum number of connections in the pool - pool_connections: maximum number of connections in the pool - raise_for_status: raise for status for all requests - retry: RetryConfig
- headers: collections.abc.Mapping[str, str]¶
- timeout_seconds: int | float = 10¶
- user_agent: str = 'Http/1.0'¶
- pool_maxsize: int = 32¶
- pool_connections: int = 10¶
- retry: RetryConfig¶
- class ds_protocol_http_py_lib.utils.http.RetryConfig[source]¶
Retry policy (urllib3 Retry via requests).
total: max attempts (includes first request)
backoff_factor: sleep = factor * (2 ** (retry_num - 1))
status_forcelist: statuses that trigger retry
allowed_methods: methods eligible for retry
respect_retry_after_header: honor Retry-After on 429/503
- total: int = 3¶
- backoff_factor: float = 0.2¶
- status_forcelist: tuple[int, Ellipsis] = (429, 500, 502, 503, 504)¶
- allowed_methods: tuple[str, Ellipsis] = ('GET', 'POST', 'PUT', 'DELETE', 'PATCH')¶
- raise_on_status: bool = False¶
- respect_retry_after_header: bool = True¶
- class ds_protocol_http_py_lib.utils.http.Http(*, config: ds_protocol_http_py_lib.utils.http.config.HttpConfig | None = None, bucket: ds_protocol_http_py_lib.utils.http.token_bucket.TokenBucket | None = None, session: requests.Session | None = None)[source]¶
- Minimal synchronous HTTP client with:
requests.Session + urllib3.Retry (429/5xx, backoff, Retry-After)
optional TokenBucket for simple RPS throttling
context-managed lifetime
tiny API: request/get/post/close
- _cfg¶
- _bucket¶
- _session¶
- _build_session() requests.Session[source]¶
Build the session. :returns: The session. :rtype: requests.Session
- _response_info(response: requests.Response) dict[str, Any][source]¶
Get information about a response. Extracts safe metadata only (no request/response bodies).
- Parameters:
response – The HTTP response object to extract info from.
- Returns:
Dictionary containing the response information.
- Return type:
dict[str, Any]
- property session: requests.Session¶
Get the underlying requests session for direct use. Allows direct access to session properties like headers.
- Returns:
The requests session.
- Return type:
requests.Session
Example
>>> http = Http() >>> http.session.headers.update({"Authorization": "Bearer token"})
- request(method: str, url: str, **kwargs: Any) requests.Response[source]¶
Send an HTTP request with rate limiting, retry logic, and comprehensive logging.
- Parameters:
method – HTTP method (GET, POST, PUT, DELETE, etc.).
url – Target URL for the request.
**kwargs – Additional keyword arguments passed to requests (timeout, headers, data, etc.).
- Returns:
The HTTP response object.
- Return type:
requests.Response
- Raises:
requests.HTTPError – If the response status code indicates an error.
requests.RequestException – For other request-related errors.
Example
>>> with Http() as client: ... response = client.request('GET', 'https://api.example.com/data', timeout=30) ... data = response.json()
- get(url: str, **kwargs: Any) requests.Response[source]¶
Send a GET request with enhanced logging.
- Parameters:
url – Target URL for the GET request.
**kwargs – Additional keyword arguments passed to requests.
- Returns:
The HTTP response object.
- Return type:
requests.Response
Example
>>> with Http() as client: ... response = client.get("https://api.example.com/data")
- post(url: str, **kwargs: Any) requests.Response[source]¶
Send a POST request with enhanced logging.
- Parameters:
url – Target URL for the POST request.
**kwargs – Additional keyword arguments passed to requests.
- Returns:
The HTTP response object.
- Return type:
requests.Response
Example
>>> with Http() as client: ... response = client.post("https://api.example.com/data", json={"key": "value"})
- put(url: str, **kwargs: Any) requests.Response[source]¶
Send a PUT request with enhanced logging.
- Parameters:
url – Target URL for the PUT request.
**kwargs – Additional keyword arguments passed to requests.
- Returns:
The HTTP response object.
- Return type:
requests.Response
Example
>>> with Http() as client: ... response = client.put("https://api.example.com/data/1", json={"key": "value"})
- delete(url: str, **kwargs: Any) requests.Response[source]¶
Send a DELETE request with enhanced logging.
- Parameters:
url – Target URL for the DELETE request.
**kwargs – Additional keyword arguments passed to requests.
- Returns:
The HTTP response object.
- Return type:
requests.Response
Example
>>> with Http() as client: ... response = client.delete("https://api.example.com/data/1")
- class ds_protocol_http_py_lib.utils.http.TokenBucket(rps: float = 10.0, capacity: int = 20)[source]¶
Token Bucket Rate Limiter
Implements the classic token bucket algorithm for controlling request rates in threading environments. Each request consumes one token from the bucket. If no tokens are available, the request waits until tokens are replenished based on the configured rate.
The bucket starts full and refills continuously at the specified RPS rate. This allows for burst traffic (up to the bucket capacity) while maintaining the overall rate limit.
- Parameters:
rps – Target requests per second rate. Determines token refill rate.
capacity – Maximum number of tokens the bucket can hold. Defaults to 2x RPS.
- Returns:
None
Example
# Limit to 10 requests per second with burst capacity of 20 limiter = TokenBucket(rps=10.0, capacity=20)
# Acquire permission for a request limiter.acquire() # Make your HTTP request here
- rps¶
- capacity = 20¶
- tokens¶
- last¶
- _lock¶