"""
**File:** ``introspection.py``
**Region:** ``ds_provider_xledger_py_lib/utils``
Description
-----------
Metadata loading helpers for dataset operation contracts.
"""
from __future__ import annotations
import json
from dataclasses import dataclass, field
from functools import lru_cache
from importlib.resources import files
from typing import Any
from ds_common_serde_py_lib.serializable import Serializable
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ValidationError
from ..enums import OperationType
[docs]
@dataclass(frozen=True, kw_only=True)
class EntryPointMetaData:
"""Container for all loaded operations for one entrypoint."""
entrypoint: str
operations: dict[OperationType, MetaData]
[docs]
def get(self, *, operation: OperationType) -> MetaData:
"""Return metadata for a specific operation.
Args:
operation: Operation to resolve.
Returns:
Metadata for the requested operation.
Raises:
NotSupportedError: If metadata for the operation is unavailable.
"""
metadata = self.operations.get(operation)
if metadata is not None:
return metadata
available = ", ".join(sorted(item.value for item in self.operations)) or "none"
raise NotSupportedError(
message=(
f"Metadata operation '{operation.value}' is unavailable for "
f"entrypoint '{self.entrypoint}'. Available operations: {available}."
),
)
@property
def read(self) -> MetaData | None:
"""Metadata for read operation, when available."""
return self.operations.get(OperationType.READ)
@property
def create(self) -> MetaData | None:
"""Metadata for create operation, when available."""
return self.operations.get(OperationType.CREATE)
@property
def update(self) -> MetaData | None:
"""Metadata for update operation, when available."""
return self.operations.get(OperationType.UPDATE)
@property
def delete(self) -> MetaData | None:
"""Metadata for delete operation, when available."""
return self.operations.get(OperationType.DELETE)
[docs]
@dataclass(kw_only=True)
class IntrospectionService:
"""Load and cache entrypoint metadata for dataset lifecycle reuse."""
entrypoint: str
_metadata: EntryPointMetaData | None = field(default=None, init=False, repr=False)
@property
def metadata(self) -> EntryPointMetaData:
"""Loaded entrypoint metadata.
Returns:
Cached metadata snapshot for the configured entrypoint.
"""
return self.load()
[docs]
def load(self) -> EntryPointMetaData:
"""Load metadata snapshot if needed and return cached value.
Returns:
Cached metadata snapshot for the configured entrypoint.
"""
if self._metadata is None:
self._metadata = _load_entrypoint_metadata(self.entrypoint)
return self._metadata
def _load_entrypoint_metadata(entrypoint: str) -> EntryPointMetaData:
    """Load all available operations for an entrypoint from packaged assets.

    Args:
        entrypoint: Dataset entrypoint (supports nested paths).

    Returns:
        EntryPointMetaData containing all discovered operations.

    Raises:
        ValidationError: If entrypoint is missing.
        NotSupportedError: If the entrypoint is unsupported.
    """
    if not entrypoint.strip():
        raise ValidationError(message="Dataset entrypoint must be provided.")
    # Probe every known operation; a missing asset simply means that operation
    # is not offered for this entrypoint.
    operations: dict[OperationType, MetaData] = {}
    for operation in OperationType:
        metadata = _load_operation_metadata(entrypoint=entrypoint, operation=operation)
        if metadata is not None:
            operations[operation] = metadata
    if operations:
        return EntryPointMetaData(entrypoint=entrypoint, operations=operations)
    # No operation at all was found: the entrypoint itself is unsupported.
    raise NotSupportedError(
        message=f"Entrypoint '{entrypoint}' is not supported by this provider.",
    )
def _read_operation_assets(*, entrypoint: str, operation: OperationType) -> tuple[dict[str, Any], str] | None:
    """Read raw metadata and query files from package assets.

    Args:
        entrypoint: Dataset entrypoint.
        operation: Operation to load.

    Returns:
        Tuple of (metadata_json_dict, query_graphql_text) when found,
        otherwise ``None``.
    """
    asset_texts = _read_operation_asset_texts(entrypoint=entrypoint, operation=operation)
    if asset_texts is None:
        return None
    # Only the metadata file is JSON; the GraphQL query stays raw text.
    metadata_raw, query_raw = asset_texts
    return json.loads(metadata_raw), query_raw
@lru_cache(maxsize=256)
def _read_operation_asset_texts(*, entrypoint: str, operation: OperationType) -> tuple[str, str] | None:
    """Read raw metadata/query text files with process-local memoization.

    Args:
        entrypoint: Dataset entrypoint.
        operation: Operation to load.

    Returns:
        Tuple of (metadata_json_text, query_graphql_text) when found,
        otherwise ``None``.
    """
    # Try the entrypoint verbatim first, then its lowercase form; dict.fromkeys
    # deduplicates so an already-lowercase entrypoint is probed only once.
    for candidate in dict.fromkeys((entrypoint, entrypoint.lower())):
        path_tokens = [token for token in candidate.split("/") if token]
        base = files("ds_provider_xledger_py_lib").joinpath(
            "assets", *path_tokens, operation.value
        )
        try:
            metadata_raw = base.joinpath("metadata.json").read_text(encoding="utf-8")
            query_raw = base.joinpath("query.graphql").read_text(encoding="utf-8")
        except (FileNotFoundError, NotADirectoryError):
            # NotADirectoryError covers a nested path token resolving to a file.
            continue
        return metadata_raw, query_raw
    return None