Source code for ds_provider_xledger_py_lib.linked_service.xledger

"""Xledger linked service implementation.

This module provides a linked service for the Xledger GraphQL API. It
reuses the shared HTTP protocol provider for retries, timeouts, and
rate limiting, while exposing a provider-specific settings model.

Example:
    >>> linked_service = XledgerLinkedService(
    ...     settings=XledgerLinkedServiceSettings(
    ...         host="https://demo.xledger.net/graphql",
    ...         token="my-token",
    ...         timeout=60,
    ...     ),
    ... )
    >>> linked_service.connect()
    >>> linked_service.connection.post(
    ...     linked_service.settings.host,
    ...     json={"query": "query { __typename }"},
    ... )
"""

from dataclasses import dataclass, field
from typing import Generic, TypeVar

from ds_protocol_http_py_lib.utils.http.config import HttpConfig, RetryConfig
from ds_protocol_http_py_lib.utils.http.provider import Http
from ds_protocol_http_py_lib.utils.http.token_bucket import TokenBucket
from ds_resource_plugin_py_lib.common.resource.linked_service import (
    LinkedService,
    LinkedServiceSettings,
)
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
    AuthenticationError,
    ConnectionError,
)

from .. import __version__
from ..enums import ResourceType
from ..utils.graphql import raise_for_graphql_errors


@dataclass(kw_only=True)
class XledgerLinkedServiceSettings(LinkedServiceSettings):
    """Settings required to reach the Xledger GraphQL API.

    All fields are keyword-only.

    Attributes:
        host: Full URL of the GraphQL endpoint that requests are posted to.
        token: API token sent in the ``Authorization`` header. Required;
            it has no default value.
        headers: Extra default headers to send with every request, or
            ``None`` when there are none.
        timeout: Default request timeout, in seconds.
    """

    # Defaults to the Xledger demo environment.
    host: str = "https://demo.xledger.net/graphql"
    # Tagged ``mask`` in the field metadata -- presumably so the framework
    # redacts the value when settings are logged or serialized (TODO confirm).
    token: str = field(metadata={"mask": True})
    headers: dict[str, str] | None = None
    timeout: int | float = 60
# Type variable for the settings accepted by the linked service; bounding it
# to XledgerLinkedServiceSettings lets subclasses of the settings model be
# used while keeping ``settings`` precisely typed.
XledgerLinkedServiceSettingsType = TypeVar(
    "XledgerLinkedServiceSettingsType",
    bound=XledgerLinkedServiceSettings,
)
@dataclass(kw_only=True)
class XledgerLinkedService(
    LinkedService[XledgerLinkedServiceSettingsType],
    Generic[XledgerLinkedServiceSettingsType],
):
    """Linked service for Xledger GraphQL API.

    Wraps a shared ``Http`` provider that is created on ``connect()`` and
    released on ``close()``.

    Attributes:
        settings: Connection settings (endpoint, token, headers, timeout).
    """

    settings: XledgerLinkedServiceSettingsType
    # Created by ``connect()``; not an ``__init__`` argument and hidden from
    # ``repr``. The ``serialize: False`` metadata presumably keeps it out of
    # the framework's serialized form (TODO confirm against the base class).
    _http: Http | None = field(default=None, init=False, repr=False, metadata={"serialize": False})

    @property
    def type(self) -> ResourceType:
        """Return the resource type for this linked service.

        Returns:
            ResourceType: The linked-service resource identifier enum.
        """
        return ResourceType.LINKED_SERVICE

    @property
    def connection(self) -> Http:
        """Return the established HTTP provider instance.

        Returns:
            Http: The HTTP provider created by ``connect()``.

        Raises:
            ConnectionError: If no HTTP provider is currently attached,
                i.e. ``connect()`` has not been called or ``close()`` has
                been called since.
        """
        if self._http is None:
            raise ConnectionError(
                message="Session is not initialized",
                details={"type": self.type.value},
            )
        return self._http

    def _init_http(self) -> Http:
        """Build the internal HTTP provider.

        Returns:
            Http: A new HTTP provider configured with the settings' headers
            and timeout, a retry policy, and a token bucket.
        """
        # NOTE(review): POST is in ``allowed_methods``, so a GraphQL mutation
        # may be re-sent when a retryable status comes back -- confirm that
        # is acceptable for non-idempotent operations.
        retry_config = RetryConfig(
            total=5,
            backoff_factor=0.5,
            status_forcelist=(429, 500, 502, 503, 504),
            allowed_methods=("GET", "POST", "PUT", "DELETE", "PATCH"),
            raise_on_status=True,
            respect_retry_after_header=True,
        )
        config = HttpConfig(
            # Copy so the provider never shares the settings' dict.
            headers=dict(self.settings.headers or {}),
            timeout_seconds=self.settings.timeout,
            user_agent=f"ds-provider-xledger-py-lib/{__version__}",
            retry=retry_config,
        )
        token_bucket = TokenBucket(rps=10, capacity=20)
        return Http(config=config, bucket=token_bucket)

    def connect(self) -> None:
        """Connect to Xledger by preparing an authenticated HTTP session.

        Creates the HTTP provider on first use and (re)applies the
        ``Authorization: token <value>`` and JSON ``Content-Type`` default
        headers on every call, so calling it repeatedly reuses the existing
        provider.

        Raises:
            AuthenticationError: If the token is ``None``, empty, or blank.
        """
        # ``or ""`` turns a missing (None) token into the documented
        # AuthenticationError instead of an AttributeError on ``.strip()``.
        token = (self.settings.token or "").strip()
        if not token:
            raise AuthenticationError(
                message="Xledger token is missing",
                details={"type": self.type.value, "host": self.settings.host},
            )

        if self._http is None:
            self._http = self._init_http()

        self._http.session.headers.update(
            {
                "Authorization": f"token {token}",
                "Content-Type": "application/json",
            }
        )

    def test_connection(self) -> tuple[bool, str]:
        """Run a lightweight GraphQL probe to verify connectivity.

        Connects first if no HTTP provider exists, then posts
        ``query { __typename }`` to the configured host and checks the
        response body for GraphQL errors.

        Returns:
            tuple[bool, str]: ``(True, "")`` on success, otherwise
            ``(False, error_message)``. The message is never empty: when the
            exception carries no text, its class name is used instead.
        """
        try:
            if self._http is None:
                self.connect()
            response = self.connection.post(
                url=self.settings.host,
                json={"query": "query { __typename }"},
            )
            raise_for_graphql_errors(body=response.json())
        except Exception as exc:
            # Deliberately broad: this probe reports any failure to the
            # caller as a result tuple rather than raising.
            return False, str(exc) or type(exc).__name__
        return True, ""

    def close(self) -> None:
        """Close and clear the underlying HTTP provider.

        This method is safe to call multiple times; it does nothing when no
        provider is attached.
        """
        http = self._http
        if http is None:
            return
        # Detach before closing so a failing ``close()`` cannot leave a
        # half-closed provider attached for a later ``connect()`` to reuse.
        self._http = None
        http.close()