"""
**File:** ``http.py``
**Region:** ``ds_protocol_http_py_lib/dataset/http``
HTTP Dataset
This module implements a dataset for HTTP APIs.
Example:
>>> from ds_protocol_http_py_lib.enums import AuthType
>>> from ds_protocol_http_py_lib.linked_service import OAuth2AuthSettings
>>> dataset = HttpDataset(
...     deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
...     serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
...     settings=HttpDatasetSettings(
...         url="https://api.example.com/data",
...         method=HttpMethod.GET,
...     ),
...     linked_service=HttpLinkedService(
...         settings=HttpLinkedServiceSettings(
...             host="api.example.com",
...             auth_type=AuthType.OAUTH2,
...             oauth2=OAuth2AuthSettings(
...                 token_endpoint="https://auth.example.com/token",
...                 client_id="my-client",
...                 client_secret="secret",
...             ),
...         ),
...     ),
... )
>>> dataset.read()
>>> data = dataset.output
"""
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import Any, Generic, NoReturn, TypeVar
import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
DatasetStorageFormatType,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
CreateError,
ReadError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ResourceException
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
AuthenticationError,
AuthorizationError,
ConnectionError,
)
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer
from ..enums import HttpMethod, ResourceType
from ..linked_service.http import HttpLinkedService
from ..models import Files
# Module-level logger; the dataset uses it to emit a debug line before each request.
logger = Logger.get_logger(__name__, package=True)
[docs]
@dataclass(kw_only=True)
class HttpDatasetSettings(DatasetSettings):
    """
    Request configuration for an HTTP dataset.

    Every field is keyword-only. ``url`` is the only field without a default,
    so it must always be supplied.
    """

    method: HttpMethod = HttpMethod.GET
    """HTTP method used for the request."""

    url: str
    """Target URL; may be a template containing ``{param}`` placeholders."""

    data: Any | None = None
    """Request body passed as ``data``."""

    json: dict[str, Any] | None = None
    """Request body passed as ``json``."""

    params: dict[str, Any] | None = None
    """Parameters passed as ``params`` with the request."""

    files: list[Files] | None = None
    """Multipart file descriptors sent with the request."""

    headers: dict[str, Any] | None = None
    """Headers sent with the request."""

    path_params: dict[str, Any] | None = None
    """Values interpolated into the URL template using {param} syntax.

    Example:
        url="https://api.example.com/documents/{document_guid}/original"
        path_params={"document_guid": "abc123"}
        # → https://api.example.com/documents/abc123/original
    """
# Settings type parameter of HttpDataset: HttpDatasetSettings or any subclass of it.
HttpDatasetSettingsType = TypeVar("HttpDatasetSettingsType", bound=HttpDatasetSettings)

# Linked-service type parameter of HttpDataset: any HttpLinkedService specialisation.
HttpLinkedServiceType = TypeVar("HttpLinkedServiceType", bound=HttpLinkedService[Any])
[docs]
@dataclass(kw_only=True)
class HttpDataset(
    TabularDataset[
        HttpLinkedServiceType,
        HttpDatasetSettingsType,
        PandasSerializer,
        PandasDeserializer,
    ],
    Generic[HttpLinkedServiceType, HttpDatasetSettingsType],
):
    """
    Tabular dataset backed by an HTTP endpoint.

    ``create`` and ``read`` send the request described by ``settings`` through
    the linked service's connection and store the deserialized response body
    in ``output``. ``delete``, ``update``, ``rename``, ``upsert``, ``purge``
    and ``list`` are not supported and always raise ``NotSupportedError``.
    """

    # Linked service whose ``connection`` performs the actual HTTP request.
    linked_service: HttpLinkedServiceType
    # Request description (method, URL template, body, headers, ...).
    settings: HttpDatasetSettingsType

    # Both default to JSON; a falsy deserializer makes ``output`` an empty DataFrame.
    serializer: PandasSerializer | None = field(
        default_factory=lambda: PandasSerializer(format=DatasetStorageFormatType.JSON),
    )
    deserializer: PandasDeserializer | None = field(
        default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.JSON),
    )

    @property
    def type(self) -> ResourceType:
        """Resource type of this dataset (always ``ResourceType.DATASET``)."""
        return ResourceType.DATASET

    def _resolve_url(self) -> str:
        """
        Resolve the request URL by substituting any path parameters.

        Returns:
            ``settings.url`` unchanged when ``settings.path_params`` is ``None``,
            otherwise the URL template formatted with the path parameters.

        Raises:
            ResourceException: With status code 400 when a placeholder has no
                matching path parameter or the URL template itself is invalid.
        """
        path_params = self.settings.path_params
        if path_params is None:
            return self.settings.url

        try:
            return self.settings.url.format(**path_params)
        except (KeyError, IndexError, ValueError) as exc:
            # Normalize all URL template resolution issues into a ResourceException.
            details: dict[str, Any] = {
                "type": self.type.value,
                "url_template": self.settings.url,
                "path_params": path_params,
            }
            if isinstance(exc, KeyError):
                message = "Failed to resolve URL: missing path parameter"
                details["missing_path_param"] = str(exc)
            else:
                # ValueError: malformed template (e.g. an unbalanced brace).
                # IndexError: positional field such as "{}" or "{0}", which can
                # never be satisfied because only keyword values are supplied.
                message = "Failed to resolve URL: invalid URL template"
                details["template_error"] = str(exc)
            raise ResourceException(
                message=message,
                status_code=400,
                details=details,
            ) from exc

    # NOTE: ``error_cls`` is annotated ``Any`` on purpose. Inside this class body
    # the name ``type`` refers to the ``type`` property defined above, so an
    # annotation such as ``type[...]`` would not resolve to the builtin.
    def _send_request(self, error_cls: Any) -> None:
        """
        Send the configured request and store the result in ``output``.

        Shared implementation of ``create`` and ``read``; the two operations
        differ only in the error class used to wrap a ``ResourceException``.

        Args:
            error_cls: Error class to raise when a ``ResourceException`` occurs
                (``CreateError`` or ``ReadError``). It is called with
                ``message``, ``status_code`` and ``details`` keyword arguments.

        Raises:
            AuthenticationError: If the authentication fails.
            AuthorizationError: If the authorization fails.
            ConnectionError: If the connection fails.
            Exception: An ``error_cls`` instance for any other ``ResourceException``.
        """
        try:
            url = self._resolve_url()
            logger.debug(f"Sending {self.settings.method} request to {url}")
            response = self.linked_service.connection.request(
                method=self.settings.method,
                url=url,
                data=self.settings.data,
                json=self.settings.json,
                files=self._map_files(self.settings.files),
                params=self.settings.params,
                headers=self.settings.headers,
            )
        except (AuthenticationError, AuthorizationError, ConnectionError):
            # Listed first so these propagate unchanged instead of being
            # wrapped by the ResourceException handler below.
            raise
        except ResourceException as exc:
            # Copy instead of mutating the caught exception; tolerate empty details.
            details = {**(exc.details or {}), "type": self.type.value}
            raise error_cls(
                message=exc.message,
                status_code=exc.status_code,
                details=details,
            ) from exc

        if response.content and self.deserializer:
            self.output = self.deserializer(response.content)
        else:
            # No body, or no deserializer configured: expose an empty frame.
            self.output = pd.DataFrame()

    def create(self) -> None:
        """
        Create data at the specified endpoint.

        The response body, if any, is deserialized into ``output``.

        Raises:
            AuthenticationError: If the authentication fails.
            AuthorizationError: If the authorization fails.
            ConnectionError: If the connection fails.
            CreateError: If any other resource error occurs.
        """
        self._send_request(CreateError)

    def read(self) -> None:
        """
        Read data from the specified endpoint.

        The response body, if any, is deserialized into ``output``.

        Raises:
            AuthenticationError: If the authentication fails.
            AuthorizationError: If the authorization fails.
            ConnectionError: If the connection fails.
            ReadError: If any other resource error occurs.
        """
        self._send_request(ReadError)

    def delete(self) -> NoReturn:
        """
        Delete entity using http.

        Raises:
            NotSupportedError: Always; the operation is not supported.
        """
        raise NotSupportedError("Delete operation is not supported for Http datasets")

    def update(self) -> NoReturn:
        """
        Update entity using http.

        Raises:
            NotSupportedError: Always; the operation is not supported.
        """
        raise NotSupportedError("Update operation is not supported for Http datasets")

    def rename(self) -> NoReturn:
        """
        Rename entity using http.

        Raises:
            NotSupportedError: Always; the operation is not supported.
        """
        raise NotSupportedError("Rename operation is not supported for Http datasets")

    def upsert(self) -> NoReturn:
        """
        Upsert entity using http.

        Raises:
            NotSupportedError: Always; the operation is not supported.
        """
        raise NotSupportedError("Upsert operation is not supported for Http datasets")

    def purge(self) -> NoReturn:
        """
        Purge entity using http.

        Raises:
            NotSupportedError: Always; the operation is not supported.
        """
        raise NotSupportedError("Purge operation is not supported for Http datasets")

    def list(self) -> NoReturn:
        """
        List entity using http.

        Raises:
            NotSupportedError: Always; the operation is not supported.
        """
        raise NotSupportedError("List operation is not supported for Http datasets")

    def _map_files(self, files: Sequence[Files] | None) -> Any:
        """
        Convert typed `Files` descriptors into `requests` compatible `files=...`.

        `HttpDatasetSettings.files` is expected to already be deserialized
        into the correct typed model, so this method focuses purely on the
        `requests` shape conversion.

        Args:
            files: File descriptors to convert, or ``None``.

        Returns:
            ``None`` when ``files`` is ``None`` or empty, otherwise a list of
            ``(field, file_tuple)`` pairs, one per descriptor.
        """
        if not files:
            return None
        return [(file.field, file.to_requests_file_tuple()) for file in files]

    def close(self) -> None:
        """
        Close the dataset by closing its linked service.
        """
        self.linked_service.close()