# Source code for ds_provider_xledger_py_lib.dataset.xledger

"""
**File:** ``xledger.py``
**Region:** ``ds_provider_xledger_py_lib/dataset``

Description
-----------
Xledger dataset implementation.

This module follows the same high-level architecture as the HTTP
protocol dataset:

- linked service owns transport and authentication
- dataset owns operation intent and contract behavior
- serializer builds GraphQL query strings from tabular input
- deserializer converts API response content to ``pandas.DataFrame``
"""

from dataclasses import dataclass, field
from typing import Any, Generic, NoReturn, TypeVar

import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.dataset import DatasetSettings, TabularDataset
from ds_resource_plugin_py_lib.common.resource.dataset.errors import CreateError, DeleteError, ReadError, UpdateError
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ResourceException

from ..enums import ObjectStatus, OperationType, OwnerSet, ResourceType
from ..linked_service.xledger import XledgerLinkedService
from ..serde.deserializer import XledgerDeserializer
from ..serde.serializer import XledgerSerializer
from ..utils.graphql import raise_for_graphql_errors
from ..utils.introspection import IntrospectionService
from .engines.read import ReadEngine

# Module-level logger, named after this module via ``__name__``.
# NOTE(review): the effect of ``package=True`` is defined by ds_common_logger_py_lib — confirm there.
logger = Logger.get_logger(__name__, package=True)


@dataclass(kw_only=True)
class XledgerReadSettings(DatasetSettings):
    """Options that shape an Xledger read operation.

    All fields are keyword-only and carry a default, so the class can be
    instantiated without arguments.
    """

    first: int = 1000
    """Number of records to return; defaults to ``1000``."""

    last: int | None = None
    """Last record to return; unset (``None``) by default."""

    before: str | None = None
    """Cursor used to return the previous page of results."""

    after: str | None = None
    """Cursor used to return the next page of results."""

    filter: dict[str, Any] | None = None
    """Filter applied to the query, if any."""

    owner_set: OwnerSet | None = None
    """Owner set to return, if restricted."""

    object_status: ObjectStatus | None = None
    """Object status to return, if restricted."""

    columns: list[str] | None = None
    """Names of the columns to return."""

    pagination: bool = False
    """Whether pagination information is returned; off by default."""
@dataclass(kw_only=True)
class XledgerCreateSettings(DatasetSettings):
    """Options for an Xledger create operation."""

    return_columns: list[str] | None = None
    """Names of the columns to return; unset (``None``) by default."""
@dataclass(kw_only=True)
class XledgerUpdateSettings(DatasetSettings):
    """Options for an Xledger update operation."""

    return_columns: list[str] | None = None
    """Names of the columns to return; unset (``None``) by default."""
@dataclass(kw_only=True)
class XledgerDeleteSettings(DatasetSettings):
    """Options for an Xledger delete operation."""

    return_columns: list[str] | None = None
    """Names of the columns to return; unset (``None``) by default."""
@dataclass(kw_only=True)
class XledgerDatasetSettings(DatasetSettings):
    """Top-level settings for an Xledger dataset.

    ``entrypoint`` is the only required field. Each per-operation settings
    object is built by its ``default_factory`` when not supplied, so every
    instance gets its own copy.
    """

    entrypoint: str
    """Name of the Xledger entrypoint targeted by dataset operations."""

    read: XledgerReadSettings = field(default_factory=XledgerReadSettings)
    """Settings applied to read operations."""

    create: XledgerCreateSettings = field(default_factory=XledgerCreateSettings)
    """Settings applied to create operations."""

    update: XledgerUpdateSettings = field(default_factory=XledgerUpdateSettings)
    """Settings applied to update operations."""

    delete: XledgerDeleteSettings = field(default_factory=XledgerDeleteSettings)
    """Settings applied to delete operations."""
# Type parameters used to make the dataset class generic.

# Any settings class that is (a subclass of) ``XledgerDatasetSettings``.
XledgerDatasetSettingsType = TypeVar("XledgerDatasetSettingsType", bound=XledgerDatasetSettings)

# Any linked service that is (a subclass of) ``XledgerLinkedService[Any]``.
XledgerLinkedServiceType = TypeVar("XledgerLinkedServiceType", bound=XledgerLinkedService[Any])
@dataclass(kw_only=True)
class XledgerDataset(
    TabularDataset[
        XledgerLinkedServiceType,
        XledgerDatasetSettingsType,
        XledgerSerializer,
        XledgerDeserializer,
    ],
    Generic[XledgerLinkedServiceType, XledgerDatasetSettingsType],
):
    """Tabular dataset for Xledger GraphQL operations.

    ``read`` delegates to :class:`ReadEngine`. ``create``, ``update`` and
    ``delete`` serialize ``self.input`` into a request payload, POST it
    through the linked service connection and deserialize the response into
    ``self.output``. ``rename``, ``upsert``, ``purge`` and ``list`` raise
    ``NotSupportedError``.
    """

    # Provides the connection and the host URL used for every request.
    linked_service: XledgerLinkedServiceType
    # Entrypoint name plus per-operation settings.
    settings: XledgerDatasetSettingsType

    # Both are replaced with fresh instances in ``__post_init__``.
    serializer: XledgerSerializer | None = field(default_factory=XledgerSerializer)
    deserializer: XledgerDeserializer | None = field(default_factory=XledgerDeserializer)
    # Not an ``__init__`` argument and hidden from ``repr``; built in ``__post_init__``.
    introspection: IntrospectionService = field(init=False, repr=False, metadata={"serialize": False})

    def __post_init__(self) -> None:
        """Install default serde objects and load introspection metadata.

        Creates an ``IntrospectionService`` for ``settings.entrypoint`` and
        calls its ``load()`` method.
        """
        # NOTE(review): any serializer/deserializer passed to the constructor is
        # overwritten here — confirm this is intentional.
        self.serializer = XledgerSerializer()
        self.deserializer = XledgerDeserializer()
        self.introspection = IntrospectionService(entrypoint=self.settings.entrypoint)
        self.introspection.load()

    @property
    def supports_checkpoint(self) -> bool:
        """Whether this dataset supports checkpointing (always ``True``)."""
        return True

    @property
    def type(self) -> ResourceType:
        """Return the dataset resource type (``ResourceType.DATASET``)."""
        return ResourceType.DATASET

    def read(self) -> None:
        """Execute a GraphQL read and store the result in ``self.output``.

        ``self.output`` and ``self.checkpoint`` are assigned in a ``finally``
        clause, so they reflect whatever the reader collected even when the
        read raises.

        Raises:
            ReadError: If the deserializer is missing, or if the reader raises
                a ``ResourceException`` (re-raised as ``ReadError`` with the
                original as its cause).

        NOTE(review): the previous docstring also listed AuthenticationError,
        AuthorizationError and ConnectionError; whether those surface
        unwrapped depends on whether they derive from ``ResourceException`` —
        confirm.
        """
        logger.debug("Sending GraphQL read requests to Xledger")
        if self.deserializer is None:
            raise ReadError(
                message="Deserializer is not set",
                status_code=500,
                details={"type": self.type.value},
            )

        reader = ReadEngine(
            connection=self.linked_service.connection,
            host=self.linked_service.settings.host,
            deserializer=self.deserializer,
            metadata=self.introspection.metadata.get(operation=OperationType.READ),
        )
        try:
            reader.execute(
                read_settings=self.settings.read,
                checkpoint=self.checkpoint,
            )
        except ResourceException as exc:
            exc.details.update({"type": self.type.value})
            raise ReadError(
                message=exc.message,
                status_code=exc.status_code,
                details=exc.details,
            ) from exc
        finally:
            # Runs on success and on failure: publish collected frames and checkpoint.
            self.output = pd.concat(reader.output, ignore_index=True) if reader.output else pd.DataFrame()
            self.checkpoint = reader.checkpoint.serialize()

    def _execute_mutation(
        self,
        *,
        operation: OperationType,
        operation_settings: DatasetSettings,
        error_cls: Any,
        label: str,
    ) -> None:
        """Run one mutation (create/update/delete) built from ``self.input``.

        Shared implementation behind ``create``, ``update`` and ``delete``.

        Args:
            operation: Operation passed to the serializer and used to look up
                introspection metadata.
            operation_settings: Settings object forwarded to the serializer
                and the deserializer.
            error_cls: Exception class raised on failure. Annotated ``Any``
                because the name ``type`` is shadowed by the ``type`` property
                inside this class body, so ``type[...]`` cannot be used here.
            label: Lower-case operation name used in log messages.

        Raises:
            error_cls: If the serializer or deserializer is missing, or if a
                ``ResourceException`` is raised while serializing, posting or
                checking the response for GraphQL errors.
        """
        if self.input.empty:
            logger.debug(f"Input is empty, skipping {label} operation")
            self.output = self.input.copy()
            return

        if self.serializer is None or self.deserializer is None:
            raise error_cls(
                message="Serializer or deserializer is not set",
                status_code=500,
                details={"type": self.type.value},
            )

        logger.debug(f"Sending GraphQL {label} request to Xledger")
        try:
            payload = self.serializer(
                self.input,
                operation=operation,
                metadata=self.introspection.metadata.get(operation=operation),
                operation_settings=operation_settings,
            )
            response = self.linked_service.connection.post(
                url=self.linked_service.settings.host,
                json=payload,
            )
            # Decode the body once; it is reused for the error check and for
            # deserialization below.
            body = response.json()
            raise_for_graphql_errors(body=body)
        except ResourceException as exc:
            exc.details.update({"type": self.type.value})
            raise error_cls(
                message=exc.message,
                status_code=exc.status_code,
                details=exc.details,
            ) from exc

        logger.debug("Deserializing response to dataframe")
        self.output = self.deserializer(
            body,
            metadata=self.introspection.metadata.get(operation=operation),
            operation_settings=operation_settings,
        )

    def create(self) -> None:
        """Execute a GraphQL create mutation built from ``self.input``.

        For empty input this is a no-op: ``self.output`` becomes a copy of
        ``self.input`` and no request is sent.

        Raises:
            CreateError: If the serializer or deserializer is missing, or if
                a ``ResourceException`` occurs while executing the mutation.
        """
        self._execute_mutation(
            operation=OperationType.CREATE,
            operation_settings=self.settings.create,
            error_cls=CreateError,
            label="create",
        )

    def update(self) -> None:
        """Execute a GraphQL update mutation built from ``self.input``.

        For empty input this is a no-op: ``self.output`` becomes a copy of
        ``self.input`` and no request is sent.

        Raises:
            UpdateError: If the serializer or deserializer is missing, or if
                a ``ResourceException`` occurs while executing the mutation.
        """
        self._execute_mutation(
            operation=OperationType.UPDATE,
            operation_settings=self.settings.update,
            error_cls=UpdateError,
            label="update",
        )

    def delete(self) -> None:
        """Execute a GraphQL delete mutation built from ``self.input``.

        For empty input this is a no-op: ``self.output`` becomes a copy of
        ``self.input`` and no request is sent.

        Raises:
            DeleteError: If the serializer or deserializer is missing, or if
                a ``ResourceException`` occurs while executing the mutation.
        """
        self._execute_mutation(
            operation=OperationType.DELETE,
            operation_settings=self.settings.delete,
            error_cls=DeleteError,
            label="delete",
        )

    def rename(self) -> NoReturn:
        """Rename is not supported by this dataset.

        Raises:
            NotSupportedError: Always.
        """
        raise NotSupportedError("Rename operation is not supported for Xledger dataset")

    def upsert(self) -> NoReturn:
        """Upsert is not supported by this dataset.

        Raises:
            NotSupportedError: Always.
        """
        raise NotSupportedError("Upsert operation is not supported for Xledger dataset")

    def purge(self) -> NoReturn:
        """Purge is not supported by this dataset.

        Raises:
            NotSupportedError: Always.
        """
        raise NotSupportedError("Purge operation is not supported for Xledger dataset")

    def list(self) -> NoReturn:
        """List is not supported by this dataset.

        Raises:
            NotSupportedError: Always.
        """
        raise NotSupportedError("List operation is not supported for Xledger dataset")

    def close(self) -> None:
        """Close the linked-service connection by calling ``linked_service.close()``."""
        self.linked_service.close()