# Source code for ds_protocol_http_py_lib.dataset.http

"""
**File:** ``http.py``
**Region:** ``ds_protocol_http_py_lib/dataset/http``

HTTP Dataset

This module implements a dataset for HTTP APIs.

Example:
    >>> from ds_protocol_http_py_lib.enums import AuthType
    >>> from ds_protocol_http_py_lib.linked_service import OAuth2AuthSettings
    >>> dataset = HttpDataset(
    ...     deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
    ...     serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
    ...     settings=HttpDatasetSettings(
    ...         url="https://api.example.com/data",
    ...         method=HttpMethod.GET,
    ...     ),
    ...     linked_service=HttpLinkedService(
    ...         settings=HttpLinkedServiceSettings(
    ...             host="api.example.com",
    ...             auth_type=AuthType.OAUTH2,
    ...             oauth2=OAuth2AuthSettings(
    ...                 token_endpoint="https://auth.example.com/token",
    ...                 client_id="my-client",
    ...                 client_secret="secret",
    ...             ),
    ...         ),
    ...     ),
    ... )
    >>> dataset.read()
    >>> data = dataset.output
"""

from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import Any, Generic, NoReturn, TypeVar

import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.dataset import (
    DatasetSettings,
    DatasetStorageFormatType,
    TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
    CreateError,
    ReadError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ResourceException
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
    AuthenticationError,
    AuthorizationError,
    ConnectionError,
)
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer

from ..enums import HttpMethod, ResourceType
from ..linked_service.http import HttpLinkedService
from ..models import Files

# Module-level logger; the dataset request methods in this file use it for debug output.
logger = Logger.get_logger(__name__, package=True)


@dataclass(kw_only=True)
class HttpDatasetSettings(DatasetSettings):
    """
    Describe the HTTP request issued by an HTTP dataset.

    All fields are keyword-only. ``url`` is the only field without a default;
    every optional field defaults to ``None`` and ``method`` defaults to
    ``HttpMethod.GET``.
    """

    method: HttpMethod = HttpMethod.GET
    """HTTP method of the request (``GET`` unless overridden)."""

    url: str
    """Request URL; may contain ``{name}`` placeholders filled from ``path_params``."""

    data: Any | None = None
    """Request body, forwarded as the ``data`` argument of the request call."""

    json: dict[str, Any] | None = None
    """JSON payload, forwarded as the ``json`` argument of the request call."""

    params: dict[str, Any] | None = None
    """Parameters forwarded as the ``params`` argument of the request call."""

    files: list[Files] | None = None
    """Multipart file descriptors to send with the request."""

    headers: dict[str, Any] | None = None
    """Headers forwarded as the ``headers`` argument of the request call."""

    path_params: dict[str, Any] | None = None
    """Values substituted into the ``{param}`` placeholders of ``url``.

    Example:
        url="https://api.example.com/documents/{document_guid}/original"
        path_params={"document_guid": "abc123"}
        # → https://api.example.com/documents/abc123/original
    """


# Type parameter for the settings object carried by a dataset; any
# HttpDatasetSettings subclass is accepted.
HttpDatasetSettingsType = TypeVar("HttpDatasetSettingsType", bound=HttpDatasetSettings)

# Type parameter for the linked service that owns the connection; any
# HttpLinkedService specialization is accepted.
HttpLinkedServiceType = TypeVar("HttpLinkedServiceType", bound=HttpLinkedService[Any])


@dataclass(kw_only=True)
class HttpDataset(
    TabularDataset[
        HttpLinkedServiceType,
        HttpDatasetSettingsType,
        PandasSerializer,
        PandasDeserializer,
    ],
    Generic[HttpLinkedServiceType, HttpDatasetSettingsType],
):
    """
    Tabular dataset backed by a single HTTP request.

    ``create()`` and ``read()`` send the request described by ``settings``
    through ``linked_service.connection`` and store the response body in
    ``output`` (deserialized when a deserializer is configured, otherwise an
    empty ``DataFrame``). Every other dataset operation raises
    ``NotSupportedError``.
    """

    # NOTE: this class defines members named ``type`` and ``list``. Inside the
    # class body they shadow the builtins, so signature annotations written
    # after them must not use ``type[...]`` or ``list[...]``.

    # Linked service whose ``connection`` performs the request.
    linked_service: HttpLinkedServiceType
    # Request description (method, URL, payload, headers, ...).
    settings: HttpDatasetSettingsType

    # Both default to the JSON storage format; either may be set to None.
    serializer: PandasSerializer | None = field(
        default_factory=lambda: PandasSerializer(format=DatasetStorageFormatType.JSON),
    )
    deserializer: PandasDeserializer | None = field(
        default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.JSON),
    )

    @property
    def type(self) -> ResourceType:
        """Return the resource type of this object (always ``ResourceType.DATASET``)."""
        return ResourceType.DATASET

    def _resolve_url(self) -> str:
        """
        Return the request URL with ``path_params`` substituted into it.

        When ``settings.path_params`` is ``None`` the URL is returned untouched,
        so literal braces in it are left alone.

        Raises:
            ResourceException: With status code 400 when the template cannot be
                resolved (missing parameter or malformed/unsupported template).
        """
        path_params = self.settings.path_params
        if path_params is None:
            return self.settings.url

        try:
            return self.settings.url.format(**path_params)
        except (KeyError, IndexError, ValueError, AttributeError) as exc:
            # Normalize every template resolution failure into a ResourceException.
            # KeyError    -> a named placeholder has no matching path parameter.
            # IndexError  -> positional placeholder ("{}" / "{0}") or bad index.
            # ValueError  -> malformed template (e.g. unbalanced braces).
            # AttributeError -> attribute lookup in a placeholder ("{a.b}") failed.
            details: dict[str, Any] = {
                "type": self.type.value,
                "url_template": self.settings.url,
                "path_params": path_params,
            }
            if isinstance(exc, KeyError):
                message = "Failed to resolve URL: missing path parameter"
                details["missing_path_param"] = str(exc)
            else:
                message = "Failed to resolve URL: invalid URL template"
                details["template_error"] = str(exc)
            raise ResourceException(
                message=message,
                status_code=400,
                details=details,
            ) from exc

    def _send_request(self) -> Any:
        """Resolve the URL, send the request described by ``settings`` and return the response."""
        url = self._resolve_url()
        logger.debug(f"Sending {self.settings.method} request to {url}")
        return self.linked_service.connection.request(
            method=self.settings.method,
            url=url,
            data=self.settings.data,
            json=self.settings.json,
            files=self._map_files(self.settings.files),
            params=self.settings.params,
            headers=self.settings.headers,
        )

    def _set_output_from_response(self, response: Any) -> None:
        """Store the deserialized response body in ``output``, or an empty frame when there is none."""
        if response.content and self.deserializer:
            self.output = self.deserializer(response.content)
        else:
            self.output = pd.DataFrame()

    def create(self) -> None:
        """
        Create data at the specified endpoint.

        Sends the configured request and stores the response in ``output``.

        Raises:
            AuthenticationError: If the authentication fails.
            AuthorizationError: If the authorization fails.
            ConnectionError: If the connection fails.
            CreateError: If any other resource error occurs, including a URL
                template that cannot be resolved.
        """
        try:
            response = self._send_request()
        except (AuthenticationError, AuthorizationError, ConnectionError):
            # Listed before ResourceException so these propagate unwrapped.
            raise
        except ResourceException as exc:
            exc.details.update({"type": self.type.value})
            raise CreateError(
                message=exc.message,
                status_code=exc.status_code,
                details=exc.details,
            ) from exc
        self._set_output_from_response(response)

    def read(self) -> None:
        """
        Read data from the specified endpoint.

        Sends the configured request and stores the response in ``output``.

        Raises:
            AuthenticationError: If the authentication fails.
            AuthorizationError: If the authorization fails.
            ConnectionError: If the connection fails.
            ReadError: If any other resource error occurs, including a URL
                template that cannot be resolved.
        """
        try:
            response = self._send_request()
        except (AuthenticationError, AuthorizationError, ConnectionError):
            # Listed before ResourceException so these propagate unwrapped.
            raise
        except ResourceException as exc:
            exc.details.update({"type": self.type.value})
            raise ReadError(
                message=exc.message,
                status_code=exc.status_code,
                details=exc.details,
            ) from exc
        self._set_output_from_response(response)

    def delete(self) -> NoReturn:
        """
        Reject the delete operation.

        Raises:
            NotSupportedError: Always; HTTP datasets do not support delete.
        """
        raise NotSupportedError("Delete operation is not supported for Http datasets")

    def update(self) -> NoReturn:
        """
        Reject the update operation.

        Raises:
            NotSupportedError: Always; HTTP datasets do not support update.
        """
        raise NotSupportedError("Update operation is not supported for Http datasets")

    def rename(self) -> NoReturn:
        """
        Reject the rename operation.

        Raises:
            NotSupportedError: Always; HTTP datasets do not support rename.
        """
        raise NotSupportedError("Rename operation is not supported for Http datasets")

    def upsert(self) -> NoReturn:
        """
        Reject the upsert operation.

        Raises:
            NotSupportedError: Always; HTTP datasets do not support upsert.
        """
        raise NotSupportedError("Upsert operation is not supported for Http datasets")

    def purge(self) -> NoReturn:
        """
        Reject the purge operation.

        Raises:
            NotSupportedError: Always; HTTP datasets do not support purge.
        """
        raise NotSupportedError("Purge operation is not supported for Http datasets")

    def list(self) -> NoReturn:
        """
        Reject the list operation.

        Raises:
            NotSupportedError: Always; HTTP datasets do not support list.
        """
        raise NotSupportedError("List operation is not supported for Http datasets")

    def _map_files(self, files: Sequence[Files] | None) -> Any:
        """
        Convert typed `Files` descriptors into `requests` compatible `files=...`.

        `HttpDatasetSettings.files` is expected to already be deserialized into
        the correct typed model, so this method focuses purely on the
        `requests` shape conversion.

        Args:
            files: File descriptors to convert, or ``None``.

        Returns:
            ``None`` when ``files`` is ``None`` or empty; otherwise a list of
            ``(field, file_tuple)`` pairs in the same order as ``files``.
        """
        if not files:
            return None
        return [(file.field, file.to_requests_file_tuple()) for file in files]

    def close(self) -> None:
        """
        Close the dataset by closing its linked service.
        """
        self.linked_service.close()