Source code for ds_provider_simployer_py_lib.dataset.simployer

"""
**File:** ``simployer.py``
**Region:** ``ds_provider_simployer_py_lib/dataset/simployer.py``

Simployer Dataset

This module implements a dataset for Simployer APIs.

Example:
    >>> from uuid import uuid4
    >>> dataset = SimployerDataset(
    ...     id=uuid4(),
    ...     name="employees_dataset",
    ...     version="1.0.0",
    ...     settings=SimployerDatasetSettings(
    ...         data_product=SimployerDataProducts.EMPLOYEES,
    ...         read=ReadSettings(page_size=100),
    ...     ),
    ...     linked_service=SimployerLinkedService(
    ...         id=uuid4(),
    ...         name="simployer_connection",
    ...         version="1.0.0",
    ...         settings=SimployerLinkedServiceSettings(
    ...             client_id="your_client_id",
    ...             client_secret="your_client_secret",
    ...         ),
    ...     ),
    ... )
    >>> linked_service = dataset.linked_service
    >>> linked_service.connect()
    >>> dataset.read()
    >>> data = dataset.output
"""

import re
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar

import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib import Serializable
from ds_resource_plugin_py_lib.common.resource.dataset import DatasetSettings, DatasetStorageFormatType, TabularDataset
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
    CreateError,
    DeleteError,
    ReadError,
    UpdateError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer

from ..endpoint_info import EndpointInfo
from ..enums import ResourceType, SimployerDataProducts
from ..linked_service.simployer import SimployerLinkedService

logger = Logger.get_logger(__name__, package=True)


[docs] @dataclass(kw_only=True) class ReadSettings(Serializable): """Settings specific to the read() operation. These settings only apply when reading data from the API and do not affect create(), update() or delete() operations When set, pagination settings are ignored and a single GET request is made to the resource-specific endpoint (e.g., /v1/persons/{id}). Note: Not all endpoints support single-record lookup (e.g., /v1/employees). """ resource_id: str | None = None """For read operations, if set, indicates a single resource lookup by ID (e.g., /v1/persons/{id}). If None, read() performs based on read settings (pagination, filters, etc.) on the collection endpoint (e.g., /v1/employees). Note: Not all data products support single-record lookup. """ page: int = 1 """Page number for pagination. Default is 1.""" page_size: int = 100 """Number of records per page for pagination. Default is 100.""" from_date: str | None = None """Start date for filtering data, in YYYY-MM-DD format. Optional.""" to_date: str | None = None """End date for filtering data, in YYYY-MM-DD format. Optional.""" filters: dict[str, Any] | None = None """Additional filters for the API request. Optional."""
[docs] @dataclass(kw_only=True) class SimployerDatasetSettings(DatasetSettings): data_product: SimployerDataProducts | None = None """Data product associated with this dataset (e.g., "employees"). Used to determine the API endpoint and other settings. """ read: ReadSettings = field(default_factory=ReadSettings) """Settings for read()."""
SimployerDatasetSettingsType = TypeVar( "SimployerDatasetSettingsType", bound=SimployerDatasetSettings, ) SimployerLinkedServiceType = TypeVar( "SimployerLinkedServiceType", bound=SimployerLinkedService[Any], )
[docs] @dataclass(kw_only=True) class SimployerDataset( TabularDataset[SimployerLinkedServiceType, SimployerDatasetSettingsType, PandasSerializer, PandasDeserializer], Generic[SimployerLinkedServiceType, SimployerDatasetSettingsType], ): linked_service: SimployerLinkedServiceType settings: SimployerDatasetSettingsType serializer: PandasSerializer | None = field( default_factory=lambda: PandasSerializer(format=DatasetStorageFormatType.JSON), ) deserializer: PandasDeserializer | None = field( default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.JSON), ) @property def type(self) -> ResourceType: return ResourceType.SIMPLOYER_DATASET @property def supports_checkpoint(self) -> bool: """ Whether this provider supports incremental loads via ``self.checkpoint``. This implementation uses a simple dictionary-based checkpoint structure to support resuming paginated reads: - On a full load, ``self.checkpoint`` is expected to be empty (``{}``) or ``None``. In this case, :meth:`read` starts from page ``1``. - After each successfully read page, :meth:`read` sets ``self.checkpoint = {"last_page": page}``, where ``page`` is the last completed page number. - On a subsequent run, if ``self.checkpoint`` contains a ``"last_page"`` entry, :meth:`read` resumes from ``last_page + 1`` and continues fetching data from the Simployer API. This allows consumers to perform incremental loads by persisting and reusing the checkpoint between executions, avoiding re-reading pages that were already processed successfully. Returns: bool: True if checkpointing is supported, False otherwise. """ return True
[docs] def read(self) -> None: """ Read data from the requested endpoint of the Simployer API. Raises: ReadError: If reading data fails. """ logger.info("Reading data from Simployer API for product: %s", self.settings.data_product) session = self.linked_service.connection # Ensure data_product is not None if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") # Single resource lookup by ID if self.settings.read.resource_id: self._read_single_resource(session) return # Collection read with pagination self._read_collection(session)
[docs] def _read_single_resource(self, session: Any) -> None: """Fetch a single resource by ID.""" resource_id = self.settings.read.resource_id if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") url = self._build_url(self.settings.data_product, mode="read") logger.info("Fetching single resource: %s", url) try: response = session.get(url=url) record = response.json() self.output = pd.DataFrame([record] if isinstance(record, dict) else record) except Exception as exc: logger.error("Failed to read single resource: %s", exc) raise ReadError( message=f"Failed to read {self.settings.data_product} with ID {resource_id}", details={ "resource_id": resource_id, "data_product": self.settings.data_product, }, ) from exc
[docs] def _read_collection(self, session: Any) -> None: """Fetch collection with pagination and update checkpoint with max 'updated' value.""" # If checkpoint has from_date, set it in settings for incremental load if self.checkpoint and "from_date" in self.checkpoint: self.settings.read.from_date = self.checkpoint["from_date"] page = self.checkpoint.get("last_page", 0) + 1 if self.checkpoint else self.settings.read.page logger.info("%s load from page %s", "Resuming incremental" if self.checkpoint else "Starting full", page) all_records: list[dict[str, Any]] = [] last_successful_page = page - 1 # Track last completed page try: if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") if not EndpointInfo.supports_method(self.settings.data_product, "GET"): raise NotSupportedError(f"Read (GET) not supported for data product '{self.settings.data_product.value}'.") while True: params = self._build_params(page) url = self._build_url(self.settings.data_product) response = session.get(url=url, params=params) records = response.json() if isinstance(records, list): all_records.extend(records) elif records is not None: all_records.extend([records]) last_successful_page = page # Pagination info is in response headers (x-has-next-page) has_next = response.headers.get("x-has-next-page", "false").lower() == "true" if not has_next: break page += 1 except Exception as exc: logger.error("Failed to read data from Simployer API: %s", exc) raise ReadError( message=f"Failed to read data from Simployer API for product {self.settings.data_product} at page {page}", details={ "last_successful_page": last_successful_page, "failed_page": page, "data_product": self.settings.data_product, "settings": self.settings.read.serialize(), }, ) from exc finally: # Always set output as a DataFrame, even if all_records is empty self.output = pd.DataFrame(all_records) self.checkpoint = self._build_checkpoint(last_successful_page)
[docs] def create(self) -> None: """ Insert rows into Simployer API. Reads from self.input (which must be a pandas DataFrame) and POSTs to the configured endpoint. Results are stored in self.output. Input Requirement: - self.input must be a pandas DataFrame with columns matching the Simployer endpoint schema. - Only one record per create() call is allowed (one row in the DataFrame). - Users must convert their data (dict, JSON, etc.) to a DataFrame before assigning to self.input. Example: import pandas as pd data = { "firstName": "John", "lastName": "Doe", "primaryEmail": "john.doe@example.com", "affiliatedOrganizationId": "org-12345" } dataset.input = pd.DataFrame([data]) dataset.create() The caller can also provide raw data and use the deserializer to convert: dataset.input = dataset.deserializer.deserialize(raw_bytes) dataset.create() Raises: NotSupportedError: If the configured data product does not support create (POST). CreateError: If creating records fails. """ if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") if EndpointInfo.supports_method(self.settings.data_product, "POST") is False: raise NotSupportedError(f"Create (POST) not supported for data product '{self.settings.data_product.value}'.") if self.input is None or self.input.empty: logger.info("No input data to create, returning empty output") self.output = pd.DataFrame() return # Capacity limit: Simployer accepts 1 record per POST for atomicity if len(self.input) > 1: raise CreateError( message="Simployer API accepts 1 record per request. Caller must batch.", details={"input_rows": len(self.input), "capacity": 1}, ) # Don't mutate self.input - work on copy row = self.input.iloc[0].to_dict() session = self.linked_service.connection # Use data_product from settings url = self._build_url(self.settings.data_product, mode="create") logger.info("Creating record at %s", url) try: response = session.post(url=url, json=row) result = response.json() # Populate self.output with backend response self.output = pd.DataFrame([result] if isinstance(result, dict) else result) except Exception as exc: logger.error("Failed to create record: %s", exc) raise CreateError( message=f"Failed to create {self.settings.data_product.value}", details={"data_product": self.settings.data_product.value}, ) from exc
[docs] def delete(self) -> None: """ Delete rows from Simployer API. Reads from self.input (which must be a pandas DataFrame) and issues a DELETE to the configured endpoint. Results are stored in self.output. Capacity Limit: The Simployer API accepts only 1 record per DELETE request. If self.input contains more than 1 row, this method raises DeleteError. The caller must batch: split self.input into single-row chunks and call delete() once per chunk. Input Requirement: - self.input must be a pandas DataFrame. - Only one record per delete() call is allowed (one row in the DataFrame). - Users must convert their data (dict, JSON, etc.) to a DataFrame before assigning to self.input. Example: import pandas as pd dataset.input = pd.DataFrame([{"id": "12345"}]) dataset.delete() Per contract: - Empty input is a no-op (returns immediately without contacting the backend). - Deleting a row that does not exist is not an error and is idempotent. Raises: NotSupportedError: If the configured data product does not support delete (DELETE). DeleteError: If the input exceeds capacity (more than 1 row) or if deletion fails. """ if self.input is None or self.input.empty: logger.info("No input data to delete, returning empty output") self.output = pd.DataFrame() return if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") if not EndpointInfo.supports_method(self.settings.data_product, "DELETE"): raise NotSupportedError(f"Delete (DELETE) not supported for data product '{self.settings.data_product.value}'.") # Capacity limit: Simployer accepts 1 record per DELETE for atomicity if len(self.input) > 1: raise DeleteError( message="Simployer API accepts 1 record per request. Caller must batch.", details={"input_rows": len(self.input), "capacity": 1}, ) url = self._build_url(self.settings.data_product, mode="delete") logger.info("Deleting resource at %s", url) session = self.linked_service.connection try: response = session.delete(url=url) result = response.json() if response.content else None if result: self.output = pd.DataFrame([result] if isinstance(result, dict) else result) else: self.output = self.input.copy() except Exception as exc: logger.error("Failed to delete resource: %s", exc) raise DeleteError( message=f"Failed to delete {self.settings.data_product.value}", details={"data_product": self.settings.data_product.value}, ) from exc
[docs] def update(self) -> None: """ Update an existing row in Simployer API. Reads from self.input (which must be a pandas DataFrame) and PUTs to the configured endpoint. Results are stored in self.output. Capacity Limit: The Simployer API accepts only 1 record per PUT request. If self.input contains more than 1 row, this method raises UpdateError. The caller must batch: split self.input into single-row chunks and call update() once per chunk. Input Requirement: - self.input must be a pandas DataFrame with one row. - Only one record per update() call is allowed (one row in the DataFrame). - Users must convert their data (dict, JSON, etc.) to a DataFrame before assigning to self.input. Example: import pandas as pd dataset.input = pd.DataFrame([{"id": "12345", "status": "active"}]) dataset.update() Per contract: - Empty input is a no-op (returns immediately without contacting the backend). - Must not insert new rows (non-existent resources result in an error). - Idempotent: Yes. Updating a row to the same values has no effect. Raises: NotSupportedError: If the configured data product does not support update (PUT). UpdateError: If the input exceeds capacity (more than 1 row) or if the update fails. """ if self.input is None or self.input.empty: logger.info("No input data to update, returning empty output") self.output = pd.DataFrame() return if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") if not EndpointInfo.supports_method(self.settings.data_product, "PUT"): raise NotSupportedError(f"Update (PUT) not supported for data product '{self.settings.data_product.value}'.") # Capacity limit: Simployer accepts 1 record per PUT for atomicity if len(self.input) > 1: raise UpdateError( message="Simployer API accepts 1 record per request. Caller must batch.", details={"input_rows": len(self.input), "capacity": 1}, ) # Don't mutate self.input - work on copy row = self.input.iloc[0].to_dict() session = self.linked_service.connection url = self._build_url(self.settings.data_product, mode="update") logger.info("Updating resource at %s", url) try: response = session.put(url=url, json=row) status = response.status_code status_map = { 404: "NotFoundError", 400: "BadRequest", 401: "Unauthorized", } if status in status_map: raise UpdateError( message=f"Failed to update {self.settings.data_product.value}", details={"status": status, "error": status_map[status], "detail": response.text}, ) elif status in (200, 201, 204): if response.content: try: result = response.json() self.output = pd.DataFrame([result] if isinstance(result, dict) else result) except ValueError: logger.warning("Non-JSON response for successful update: %s", response.text) self.output = self.input.copy() else: self.output = self.input.copy() else: raise UpdateError( message=f"Failed to update {self.settings.data_product.value}", details={"status": status, "detail": response.text}, ) except UpdateError: raise except Exception as exc: logger.error("Failed to update resource: %s", exc) raise UpdateError( message=f"Failed to update {self.settings.data_product.value}", details={"data_product": self.settings.data_product.value}, ) from exc
[docs] def close(self) -> None: """Release any resources held by the dataset. For Simployer, the dataset holds no resources directly. Connection lifecycle is managed by the linked service. """
[docs] def rename(self) -> None: raise NotSupportedError("Method (rename) not supported by Simployer provider.")
[docs] def list(self) -> None: raise NotSupportedError("Method (list) not supported by Simployer provider.")
[docs] def upsert(self) -> None: raise NotSupportedError("Method (upsert) not supported by Simployer provider.")
[docs] def purge(self) -> None: raise NotSupportedError("Method (purge) not supported by Simployer provider.")
[docs] def _build_params(self, page: int) -> dict[str, Any]: """Build query parameters for the API request.""" params: dict[str, Any] = {"page": page, "pageSize": self.settings.read.page_size} if self.settings.read.from_date: params["fromDate"] = self.settings.read.from_date if self.settings.read.to_date: params["toDate"] = self.settings.read.to_date if self.settings.read.filters: params.update(self.settings.read.filters) return params
[docs] def _build_checkpoint(self, last_page: int) -> dict[str, Any]: """Build checkpoint dictionary for incremental load support. Sets from_date to the ISO-8601 string representation of the maximum 'updated' value in self.output if available. This ensures the checkpoint is JSON-serializable. Checkpoint structure: { "last_page": int, "page_size": int, "to_date": str | None (ISO-8601 format), "from_date": str | None (ISO-8601 format), "data_product": str } """ if self.settings.data_product is None: raise NotSupportedError("Data product must be specified.") checkpoint = { "last_page": last_page, "page_size": self.settings.read.page_size, "to_date": self.settings.read.to_date, "data_product": self.settings.data_product.value, "from_date": None, } # Set from_date from max 'updated' in self.output if available if ( hasattr(self, "output") and isinstance(self.output, pd.DataFrame) and not self.output.empty and "updated" in self.output.columns ): updated_values = self.output["updated"].dropna() if not updated_values.empty: max_updated = updated_values.max() # Convert pandas.Timestamp to ISO-8601 string for JSON serialization # (if it's already a string, use it as-is) if pd.notna(max_updated): if isinstance(max_updated, str): checkpoint["from_date"] = max_updated else: checkpoint["from_date"] = max_updated.isoformat() return checkpoint
[docs] def _build_url(self, data_product: SimployerDataProducts, mode: str = "read") -> str: """Construct the API endpoint URL based on the data product and operation mode. Handles path parameters for read, create, update, and delete operations. :param data_product: The SimployerDataProducts enum value indicating which API endpoint to target. :param mode: Operation mode ("read", "create", "update", "delete"). :return: The full URL for the API request. :raises ReadError: If mode is "read" and endpoint is missing or parameter cannot be resolved. :raises CreateError: If mode is "create" and endpoint is missing or parameter cannot be resolved. :raises UpdateError: If mode is "update" and endpoint is missing or parameter cannot be resolved. :raises DeleteError: If mode is "delete" and endpoint is missing or parameter cannot be resolved. """ # Map mode to the appropriate error class error_map = { "read": ReadError, "create": CreateError, "update": UpdateError, "delete": DeleteError, } error_class = error_map.get(mode, ReadError) base_endpoint = EndpointInfo.get_endpoint_for_product(data_product) if base_endpoint is None: raise error_class(message=f"No endpoint configured for data product '{data_product!s}'.") host = self.linked_service.settings.host.rstrip("/") endpoint = base_endpoint pattern = re.compile(r"{([^}]+)}") matches = pattern.findall(base_endpoint) if matches: for param_name in matches: param_value = None if mode in ("create", "delete", "update"): if ( hasattr(self, "input") and self.input is not None and not self.input.empty and param_name in self.input.columns ): param_value = self.input.iloc[0][param_name] elif mode == "read" and hasattr(self.settings.read, "resource_id") and self.settings.read.resource_id: param_value = self.settings.read.resource_id if param_value is None: raise error_class( message=( f"Cannot build URL: path parameter '{{{param_name}}}' requires a value " f"but none was provided for mode '{mode}'." ) ) endpoint = endpoint.replace(f"{{{param_name}}}", str(param_value)) elif mode == "read" and hasattr(self.settings.read, "resource_id") and self.settings.read.resource_id: endpoint = f"{endpoint}/{self.settings.read.resource_id}" return f"{host}{endpoint}"