ds_protocol_http_py_lib.utils.http

File: __init__.py Region: ds_protocol_http_py_lib/utils/http

HTTP utility subpackage.

This file exists to ensure tools (like Sphinx AutoAPI) treat this directory as a proper Python package, so intra-package relative imports resolve correctly.

Submodules

Classes

HttpConfig

Configuration for the HTTP client.

RetryConfig

Retry policy (urllib3 Retry via requests).

Http

Minimal synchronous HTTP client with:

TokenBucket

Token Bucket Rate Limiter

Package Contents

class ds_protocol_http_py_lib.utils.http.HttpConfig[source]

Configuration for the HTTP client. - headers: applied to all requests (overridable per call) - timeout_seconds: connect+read timeout seconds (or (connect, read) per call) - user_agent: user agent for all requests - pool_maxsize: maximum number of connections in the pool - pool_connections: maximum number of connections in the pool - raise_for_status: raise for status for all requests - retry: RetryConfig

headers: collections.abc.Mapping[str, str]
timeout_seconds: int | float = 10
user_agent: str = 'Http/1.0'
pool_maxsize: int = 32
pool_connections: int = 10
retry: RetryConfig
class ds_protocol_http_py_lib.utils.http.RetryConfig[source]

Retry policy (urllib3 Retry via requests).

  • total: max attempts (includes first request)

  • backoff_factor: sleep = factor * (2 ** (retry_num - 1))

  • status_forcelist: statuses that trigger retry

  • allowed_methods: methods eligible for retry

  • respect_retry_after_header: honor Retry-After on 429/503

total: int = 3
backoff_factor: float = 0.2
status_forcelist: tuple[int, Ellipsis] = (429, 500, 502, 503, 504)
allowed_methods: tuple[str, Ellipsis] = ('GET', 'POST', 'PUT', 'DELETE', 'PATCH')
raise_on_status: bool = False
respect_retry_after_header: bool = True
class ds_protocol_http_py_lib.utils.http.Http(*, config: ds_protocol_http_py_lib.utils.http.config.HttpConfig | None = None, bucket: ds_protocol_http_py_lib.utils.http.token_bucket.TokenBucket | None = None, session: requests.Session | None = None)[source]
Minimal synchronous HTTP client with:
  • requests.Session + urllib3.Retry (429/5xx, backoff, Retry-After)

  • optional TokenBucket for simple RPS throttling

  • context-managed lifetime

  • tiny API: request/get/post/close

_cfg
_bucket
_session
_build_session() requests.Session[source]

Build the session. :returns: The session. :rtype: requests.Session

_response_info(response: requests.Response) dict[str, Any][source]

Get information about a response. Extracts safe metadata only (no request/response bodies).

Parameters:

response – The HTTP response object to extract info from.

Returns:

Dictionary containing the response information.

Return type:

dict[str, Any]

property session: requests.Session

Get the underlying requests session for direct use. Allows direct access to session properties like headers.

Returns:

The requests session.

Return type:

requests.Session

Example

>>> http = Http()
>>> http.session.headers.update({"Authorization": "Bearer token"})
__enter__() Http[source]
__exit__(exc_type: Any, exc: Any, tb: Any) None[source]
close() None[source]
request(method: str, url: str, **kwargs: Any) requests.Response[source]

Send an HTTP request with rate limiting, retry logic, and comprehensive logging.

Parameters:
  • method – HTTP method (GET, POST, PUT, DELETE, etc.).

  • url – Target URL for the request.

  • **kwargs – Additional keyword arguments passed to requests (timeout, headers, data, etc.).

Returns:

The HTTP response object.

Return type:

requests.Response

Raises:
  • requests.HTTPError – If the response status code indicates an error.

  • requests.RequestException – For other request-related errors.

Example

>>> with Http() as client:
...     response = client.request('GET', 'https://api.example.com/data', timeout=30)
...     data = response.json()
get(url: str, **kwargs: Any) requests.Response[source]

Send a GET request with enhanced logging.

Parameters:
  • url – Target URL for the GET request.

  • **kwargs – Additional keyword arguments passed to requests.

Returns:

The HTTP response object.

Return type:

requests.Response

Example

>>> with Http() as client:
...     response = client.get("https://api.example.com/data")
post(url: str, **kwargs: Any) requests.Response[source]

Send a POST request with enhanced logging.

Parameters:
  • url – Target URL for the POST request.

  • **kwargs – Additional keyword arguments passed to requests.

Returns:

The HTTP response object.

Return type:

requests.Response

Example

>>> with Http() as client:
...     response = client.post("https://api.example.com/data", json={"key": "value"})
put(url: str, **kwargs: Any) requests.Response[source]

Send a PUT request with enhanced logging.

Parameters:
  • url – Target URL for the PUT request.

  • **kwargs – Additional keyword arguments passed to requests.

Returns:

The HTTP response object.

Return type:

requests.Response

Example

>>> with Http() as client:
...     response = client.put("https://api.example.com/data/1", json={"key": "value"})
delete(url: str, **kwargs: Any) requests.Response[source]

Send a DELETE request with enhanced logging.

Parameters:
  • url – Target URL for the DELETE request.

  • **kwargs – Additional keyword arguments passed to requests.

Returns:

The HTTP response object.

Return type:

requests.Response

Example

>>> with Http() as client:
...     response = client.delete("https://api.example.com/data/1")
class ds_protocol_http_py_lib.utils.http.TokenBucket(rps: float = 10.0, capacity: int = 20)[source]

Token Bucket Rate Limiter

Implements the classic token bucket algorithm for controlling request rates in threading environments. Each request consumes one token from the bucket. If no tokens are available, the request waits until tokens are replenished based on the configured rate.

The bucket starts full and refills continuously at the specified RPS rate. This allows for burst traffic (up to the bucket capacity) while maintaining the overall rate limit.

Parameters:
  • rps – Target requests per second rate. Determines token refill rate.

  • capacity – Maximum number of tokens the bucket can hold. Defaults to 2x RPS.

Returns:

None

Example

# Limit to 10 requests per second with burst capacity of 20 limiter = TokenBucket(rps=10.0, capacity=20)

# Acquire permission for a request limiter.acquire() # Make your HTTP request here

rps
capacity = 20
tokens
last
_lock
acquire() None[source]

Acquire a token from the bucket, waiting if necessary. :return: None

available() float[source]

Return the current available token count without mutating internal state.

This computes how many tokens would be available if we refilled based on elapsed time, but it does not update tokens or last.