"""
**File:** ``provider.py``
**Region:** ``ds_protocol_http_py_lib/utils/http/provider``
HTTP Provider
This module implements a synchronous HTTP client with:
- requests.Session + urllib3.Retry (429/5xx, backoff, Retry-After)
- optional TokenBucket for simple RPS throttling
- context-managed lifetime
- tiny API: request/get/post/close
Example:
>>> with Http() as client:
... response = client.get("https://api.example.com/data")
... data = response.json()
"""
import time
from dataclasses import dataclass
from typing import Any
import requests
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.errors import ResourceException
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
AuthenticationError,
AuthorizationError,
ConnectionError,
)
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from ...utils.http.config import HttpConfig
from ...utils.http.token_bucket import TokenBucket
# Module-level logger that receives this module's debug/warning/error output.
logger = Logger.get_logger(__name__, package=True)
# ---- client ----
@dataclass(kw_only=True)
class Http:
"""
Minimal synchronous HTTP client with:
- requests.Session + urllib3.Retry (429/5xx, backoff, Retry-After)
- optional TokenBucket for simple RPS throttling
- context-managed lifetime
- tiny API: request/get/post/close
"""
def __init__(
self,
*,
config: HttpConfig | None = None,
bucket: TokenBucket | None = None,
session: requests.Session | None = None,
) -> None:
self._cfg = config or HttpConfig()
self._bucket = bucket or TokenBucket()
self._session = session or self._build_session()
logger.debug(f"HTTP client initialized with timeout={self._cfg.timeout_seconds}s")
[docs]
def _build_session(self) -> requests.Session:
"""
Build the session.
Returns:
requests.Session: The session.
"""
session = requests.Session()
headers = dict(self._cfg.headers or {})
headers.setdefault("User-Agent", self._cfg.user_agent)
session.headers.update(headers)
retry = Retry(
total=self._cfg.retry.total,
backoff_factor=self._cfg.retry.backoff_factor,
status_forcelist=self._cfg.retry.status_forcelist,
allowed_methods=self._cfg.retry.allowed_methods,
raise_on_status=self._cfg.retry.raise_on_status,
respect_retry_after_header=self._cfg.retry.respect_retry_after_header,
)
logger.debug(
f"Retry config: total={retry.total}, "
f"allowed_methods={retry.allowed_methods}, "
f"status_forcelist={retry.status_forcelist}, "
f"backoff_factor={retry.backoff_factor}"
)
adapter = HTTPAdapter(
max_retries=retry,
pool_maxsize=self._cfg.pool_maxsize,
pool_connections=self._cfg.pool_connections,
)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
[docs]
def _response_info(self, response: requests.Response) -> dict[str, Any]:
"""
Get information about a response.
Extracts safe metadata only (no request/response bodies).
Args:
response: The HTTP response object to extract info from.
Returns:
dict[str, Any]: Dictionary containing the response information.
"""
try:
info: dict[str, Any] = {
"status_code": response.status_code,
"url": response.url,
"method": response.request.method,
"reason": response.reason,
"content_type": response.headers.get("Content-Type"),
"content_length": response.headers.get("Content-Length"),
"request_id": (
response.headers.get("X-Request-ID")
or response.headers.get("X-Amzn-RequestId")
or response.headers.get("X-Correlation-ID")
),
}
return info
except AttributeError:
logger.warning("Failed to get full response info, returning partial info")
return {
"status_code": getattr(response, "status_code", None),
"url": getattr(response, "url", None),
"reason": getattr(response, "reason", None),
}
# ---- context ----
@property
def session(self) -> requests.Session:
"""
Get the underlying requests session for direct use.
Allows direct access to session properties like headers.
Returns:
requests.Session: The requests session.
Example:
>>> http = Http()
>>> http.session.headers.update({"Authorization": "Bearer token"})
"""
return self._session
[docs]
def __enter__(self) -> "Http":
return self
[docs]
def __exit__(self, exc_type: Any, exc: Any, tb: Any) -> None:
self.close()
[docs]
def close(self) -> None:
self._session.close()
# ---- request ----
[docs]
def request(self, method: str, url: str, **kwargs: Any) -> requests.Response:
"""
Send an HTTP request with rate limiting, retry logic, and comprehensive logging.
Args:
method: HTTP method (GET, POST, PUT, DELETE, etc.).
url: Target URL for the request.
**kwargs: Additional keyword arguments passed to requests (timeout, headers, data, etc.).
Returns:
requests.Response: The HTTP response object.
Raises:
requests.HTTPError: If the response status code indicates an error.
requests.RequestException: For other request-related errors.
Example:
>>> with Http() as client:
... response = client.request('GET', 'https://api.example.com/data', timeout=30)
... data = response.json()
"""
start_time = time.time()
request_id = f"{method}_{int(start_time * 1000)}"
logger.debug(
f"[{request_id}] Initiating {method} request to {url} with timeout={kwargs.get('timeout', self._cfg.timeout_seconds)}s"
)
kwargs.setdefault("timeout", self._cfg.timeout_seconds)
logger.debug(f"[{request_id}] Acquiring rate limit token (available: {self._bucket.available()})")
self._bucket.acquire()
logger.debug(f"[{request_id}] Rate limit token acquired (remaining: {self._bucket.tokens})")
response_info = None
try:
response = self._session.request(method, url, **kwargs)
duration = time.time() - start_time
response_info = self._response_info(response)
logger.debug(
f"[{request_id}] Request completed in {duration:.3f}s "
f"with status {response.status_code} "
f"and response: {response_info}"
)
response.raise_for_status()
except requests.HTTPError as exc:
logger.error(f"HTTP error: {exc} with response: {response_info}")
if exc.response.status_code == 401:
raise AuthenticationError(
message=f"Authentication error: {exc}",
details={
"response_body": exc.response.text,
"reason": exc.response.reason,
"url": exc.response.url,
"method": exc.response.request.method,
},
) from exc
elif exc.response.status_code == 403:
raise AuthorizationError(
message=f"Authorization error: {exc}",
details={
"response_body": exc.response.text,
"reason": exc.response.reason,
"url": exc.response.url,
"method": exc.response.request.method,
},
) from exc
raise ResourceException(
message=f"HTTP error: {exc}",
status_code=exc.response.status_code,
details={
"response_body": exc.response.text,
"reason": exc.response.reason,
"url": exc.response.url,
"method": exc.response.request.method,
},
) from exc
except requests.exceptions.ConnectionError as exc:
logger.error(f"Connection error: {exc} with response: {response_info}")
raise ConnectionError(
message=f"Connection error: {exc}",
details={
"url": url,
"method": method,
"error_type": type(exc).__name__,
"error_message": str(exc),
},
) from exc
except Exception as exc:
logger.error(f"HTTP request error: {exc} with response: {response_info}")
raise ResourceException(
message=f"HTTP error: {exc}",
details={
"url": url,
"method": method,
"error_type": type(exc).__name__,
"error_message": str(exc),
},
) from exc
return response
# ---- convenience methods ----
[docs]
def get(self, url: str, **kwargs: Any) -> requests.Response:
"""
Send a GET request with enhanced logging.
Args:
url: Target URL for the GET request.
**kwargs: Additional keyword arguments passed to requests.
Returns:
requests.Response: The HTTP response object.
Example:
>>> with Http() as client:
... response = client.get("https://api.example.com/data")
"""
logger.debug(f"GET request to {url}")
return self.request("GET", url, **kwargs)
[docs]
def post(self, url: str, **kwargs: Any) -> requests.Response:
"""
Send a POST request with enhanced logging.
Args:
url: Target URL for the POST request.
**kwargs: Additional keyword arguments passed to requests.
Returns:
requests.Response: The HTTP response object.
Example:
>>> with Http() as client:
... response = client.post("https://api.example.com/data", json={"key": "value"})
"""
logger.debug(f"POST request to {url}")
return self.request("POST", url, **kwargs)
[docs]
def put(self, url: str, **kwargs: Any) -> requests.Response:
"""
Send a PUT request with enhanced logging.
Args:
url: Target URL for the PUT request.
**kwargs: Additional keyword arguments passed to requests.
Returns:
requests.Response: The HTTP response object.
Example:
>>> with Http() as client:
... response = client.put("https://api.example.com/data/1", json={"key": "value"})
"""
logger.debug(f"PUT request to {url}")
return self.request("PUT", url, **kwargs)
[docs]
def delete(self, url: str, **kwargs: Any) -> requests.Response:
"""
Send a DELETE request with enhanced logging.
Args:
url: Target URL for the DELETE request.
**kwargs: Additional keyword arguments passed to requests.
Returns:
requests.Response: The HTTP response object.
Example:
>>> with Http() as client:
... response = client.delete("https://api.example.com/data/1")
"""
logger.debug(f"DELETE request to {url}")
return self.request("DELETE", url, **kwargs)