"""
**File:** ``soap.py``
**Region:** ``ds_protocol_soap_py_lib/dataset/soap``
SOAP Dataset
This module implements a dataset for SOAP APIs.
Example:
>>> import uuid
>>> from ds_protocol_soap_py_lib import SoapLinkedService, SoapLinkedServiceSettings
>>> from ds_protocol_soap_py_lib.dataset.soap import SoapDataset, SoapDatasetSettings
>>> from ds_protocol_soap_py_lib.enums import AuthType
>>> from ds_protocol_soap_py_lib.linked_service.soap import ParameterBasedAuthSettings
>>> linked_service = SoapLinkedService(
... id=uuid.uuid4(),
... name="example::linked_service",
... version="1.0.0",
... settings=SoapLinkedServiceSettings(
... wsdl="https://example.com/service?wsdl",
... auth_type=AuthType.PARAMETER_BASED,
... auth_test_method="Ping",
... parameter_based=ParameterBasedAuthSettings(
... auth_param_key1="apiKey",
... auth_param_value1="my-token",
... ),
... ),
... )
>>> linked_service.connect()
>>> dataset = SoapDataset(
... id=uuid.uuid4(),
... name="example::dataset",
... version="1.0.0",
... settings=SoapDatasetSettings(method="GetRecords"),
... linked_service=linked_service,
... )
>>> dataset.read()
>>> data = dataset.output
"""
import builtins
from dataclasses import dataclass, field
from typing import Any, Generic, NoReturn, TypeVar
import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
DatasetStorageFormatType,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
CreateError,
ReadError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer
from zeep.helpers import serialize_object
from ..enums import ResourceType
from ..linked_service.soap import SoapLinkedService
logger = Logger.get_logger(__name__, package=True)
[docs]
@dataclass(kw_only=True)
class SoapDatasetSettings(DatasetSettings):
"""
Settings for SOAP dataset.
"""
method: str
"""The SOAP method to call."""
kwargs: dict[str, Any] = field(default_factory=dict)
"""
Additional keyword arguments to pass to the SOAP method alongside
the auth parameters from the linked service.
"""
SoapDatasetSettingsType = TypeVar(
"SoapDatasetSettingsType",
bound=SoapDatasetSettings,
)
SoapLinkedServiceType = TypeVar(
"SoapLinkedServiceType",
bound=SoapLinkedService[Any],
)
[docs]
@dataclass(kw_only=True)
class SoapDataset(
TabularDataset[
SoapLinkedServiceType,
SoapDatasetSettingsType,
PandasSerializer,
PandasDeserializer,
],
Generic[SoapLinkedServiceType, SoapDatasetSettingsType],
):
"""
Dataset for SOAP APIs.
Calls a configured SOAP method via the linked service connection.
Authentication parameters from the linked service are automatically
injected into each call.
``read()`` fetches data from the SOAP endpoint and populates ``self.output``.
``create()`` sends data to the SOAP endpoint.
All other operations raise ``NotSupportedError``.
"""
linked_service: SoapLinkedServiceType
settings: SoapDatasetSettingsType
deserializer: PandasDeserializer | None = field(
default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.SEMI_STRUCTURED_JSON),
)
@property
def type(self) -> ResourceType:
return ResourceType.DATASET
[docs]
def _invoke_method(self, error_cls: builtins.type[ReadError | CreateError]) -> Any:
"""
Call the configured SOAP method and return the serialized response.
Returns ``None`` if the SOAP response is empty.
Args:
error_cls: The error class to raise on failure (``ReadError`` or ``CreateError``).
Raises:
ReadError | CreateError: If the SOAP call fails.
"""
try:
method = getattr(self.linked_service.connection.service, self.settings.method)
response = method(
**self.linked_service.body_auth_params,
**self.settings.kwargs,
)
except Exception as exc:
logger.exception("SOAP call failed")
raise error_cls(
message=f"SOAP call failed for method {self.settings.method}: {exc}",
details={"type": self.type.value, "method": self.settings.method},
) from exc
serialized = serialize_object(response) # type: ignore[no-untyped-call]
if serialized is None:
logger.info(f"SOAP method {self.settings.method} returned empty response")
return None
return serialized
[docs]
def read(self) -> None:
"""
Call the configured SOAP method and populate ``self.output``.
The zeep response is serialised to native Python types via
``zeep.helpers.serialize_object`` and normalised into a DataFrame.
Raises:
ReadError: If the SOAP call fails or no deserializer is configured.
"""
logger.info(f"Calling SOAP method {self.settings.method}")
serialized = self._invoke_method(ReadError)
if serialized is None:
self.output = pd.DataFrame()
return
if not self.deserializer:
raise ReadError(
message="No deserializer configured for SOAP dataset",
details={"type": self.type.value, "method": self.settings.method},
)
self.output = self.deserializer(serialized)
[docs]
def create(self) -> None:
"""
Call the configured SOAP method to create an entity.
Returns immediately if ``self.input`` is empty (no-op).
Calls the SOAP method with auth params and ``settings.kwargs``.
Sets ``self.output`` to the deserialized backend response, or a
copy of ``self.input`` if the response is empty or no deserializer is configured.
Raises:
CreateError: If the SOAP call fails.
"""
if self.input is None or self.input.empty:
return
logger.info(f"Calling SOAP method {self.settings.method}")
serialized = self._invoke_method(CreateError)
if serialized is None or not self.deserializer:
self.output = self.input.copy()
return
self.output = self.deserializer(serialized)
[docs]
def update(self) -> NoReturn:
raise NotSupportedError("Update operation is not supported for SOAP datasets")
[docs]
def delete(self) -> NoReturn:
raise NotSupportedError("Delete operation is not supported for SOAP datasets")
[docs]
def upsert(self) -> NoReturn:
raise NotSupportedError("Upsert operation is not supported for SOAP datasets")
[docs]
def purge(self) -> NoReturn:
raise NotSupportedError("Purge operation is not supported for SOAP datasets")
[docs]
def list(self) -> NoReturn:
raise NotSupportedError("List operation is not supported for SOAP datasets")
[docs]
def rename(self) -> NoReturn:
raise NotSupportedError("Rename operation is not supported for SOAP datasets")
[docs]
def close(self) -> None:
"""
Close the linked service connection.
"""
self.linked_service.close()