# Source code for ds_resource_plugin_py_lib.common.resource.client

"""
**File:** ``client.py``
**Region:** ``ds_resource_plugin_py_lib/common/resource``

Description
-----------
Resource client for discovering and managing datasets and linked services using entry points.

Example
-------
.. code-block:: python

    from ds_resource_plugin_py_lib.common.resource.client import ResourceClient

    client = ResourceClient()

    # Inspect discovered resources (populated via Python entry points).
    print(client.resources.keys())
    print(client.linked_services.keys())
    print(client.datasets.keys())

    dataset = client.dataset(config={"type": "dataset.example", "version": "1.0.0"})
    linked_service = client.linked_service(config={"type": "linked_service.example", "version": "1.0.0"})

    linked_service.connect()
    print(linked_service.connection)
    print(dataset.read())
"""

from functools import lru_cache
from importlib import import_module
from importlib.metadata import entry_points
from pathlib import Path
from typing import Any, cast

import yaml
from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib.errors import DeserializationError

from ...libs.utils import import_string, sanitize_version
from ..resource.dataset.base import Dataset, DatasetInfo
from ..resource.linked_service.base import LinkedService, LinkedServiceInfo

logger = Logger.get_logger(__name__, package=True)


class ResourceClient:
    """Discover and manage datasets and linked services via Python entry points."""

    # Entry-point groups scanned for resource definitions.
    PROTOCOL_GROUP = "ds.protocols"
    PROVIDER_GROUP = "ds.providers"

    def __init__(self) -> None:
        """Populate the resource registries from installed entry points."""
        super().__init__()
        # Raw resource.yaml contents, keyed by resource name.
        self._resource_dict: dict[str, dict[str, Any]] = {}
        # Registries keyed by the (type, version) composite key.
        self._linked_services: dict[tuple[str, str], LinkedServiceInfo] = {}
        self._datasets: dict[tuple[str, str], DatasetInfo] = {}
        for group in (self.PROTOCOL_GROUP, self.PROVIDER_GROUP):
            self._discover_resources(group)
        logger.debug(f"Loaded {len(self._resource_dict)} resources")
        logger.debug(f"Loaded {len(self._linked_services)} linked services")
        logger.debug(f"Loaded {len(self._datasets)} datasets")
    @classmethod
    @lru_cache(maxsize=1)
    def get_instance(cls) -> "ResourceClient":
        """Get the singleton instance of ResourceClient.

        The ``lru_cache(maxsize=1)`` keyed on ``cls`` means the first call
        constructs the client (triggering entry-point discovery) and every
        later call returns that same cached instance.
        """
        return cls()
    @property
    def resources(self) -> dict[str, dict[str, Any]]:
        """Raw resource configurations, keyed by resource name."""
        return self._resource_dict

    @property
    def linked_services(self) -> dict[tuple[str, str], LinkedServiceInfo]:
        """Discovered linked services, keyed by (type, version)."""
        return self._linked_services

    @property
    def datasets(self) -> dict[tuple[str, str], DatasetInfo]:
        """Discovered datasets, keyed by (type, version)."""
        return self._datasets
[docs] def _discover_resources(self, group: str) -> None: """ Discover protocol/provider packages via entry points. Each entry point must target a Python package that contains resource.yaml in its root. """ try: eps = entry_points(group=group) except Exception as exc: logger.warning(f"Failed to read entry points for {group}: {exc}") return for ep in eps: try: module = import_module(ep.module) module_path = getattr(module, "__file__", None) if not module_path: logger.warning(f"Entry point {ep.name} has no __file__; skipping.") continue real_path = str(Path(module_path).parent.resolve()) self._scan_resource_directory(real_path) except Exception as exc: logger.error(f"Error when loading entry point {ep.name} ({group}): {exc}")
[docs] def _scan_resource_directory(self, resource_dir: str) -> None: """ Scan a resource directory for resource.yaml. Checks root first (new behavior), then subdirectories (old behavior). Args: resource_dir: Path to the resource directory. """ resource_path = Path(resource_dir) if not resource_path.exists(): logger.debug(f"Resource directory {resource_dir} does not exist") return self._load_resource_from_path(str(resource_path))
[docs] def _load_resource_from_path(self, path: str) -> None: """ Load resource configuration from a directory path. Args: path: Path to the resource directory. """ resource_dir = Path(path) resource_yaml = resource_dir / "resource.yaml" if not resource_yaml.exists(): logger.debug(f"No resource.yaml found in {path}") return try: with Path(resource_yaml).open() as f: resource_config = yaml.safe_load(f) if not resource_config: logger.warning(f"Empty resource configuration in {resource_yaml}") return resource_name = resource_config.get("name", resource_dir.name) self._resource_dict[resource_name] = resource_config self._parse_linked_services(resource_config) self._parse_datasets(resource_config) except Exception as exc: logger.error(f"Error loading resource configuration from {resource_yaml}: {exc}")
[docs] def _parse_linked_services(self, config: dict[str, Any]) -> None: """ Parse linked services from resource configuration. Args: config: Resource configuration dictionary. """ linked_services = config.get("linked_service", []) for service in linked_services: service_name = service.get("name") if service_name: type = service.get("type") version = service.get("version", "1.0.0") service_info = LinkedServiceInfo( type=type, name=service_name, class_name=service.get("class_name"), version=version, description=service.get("description"), ) # Store by composite key (type, version) to support multiple versions self._linked_services[service_info.key] = service_info
[docs] def _parse_datasets(self, config: dict[str, Any]) -> None: """ Parse datasets from resource configuration. Args: config: Resource configuration dictionary. """ datasets = config.get("dataset", []) for dataset in datasets: dataset_name = dataset.get("name") if dataset_name: type = dataset.get("type") version = dataset.get("version", "1.0.0") dataset_info = DatasetInfo( type=type, name=dataset_name, class_name=dataset.get("class_name"), version=version, description=dataset.get("description"), ) self._datasets[dataset_info.key] = dataset_info
[docs] def _get_dataset_model_cls(self, _type: str, version: str) -> type[Dataset[Any, Any, Any, Any]]: """ Get a dataset model class by type and optionally version. Args: _type: str version: str Returns: Type[Dataset] """ cls_name = self.datasets[(_type, version)].class_name logger.debug("Dataset Class Name: %s", cls_name) return cast("type[Dataset[Any, Any, Any, Any]]", import_string(cls_name))
[docs] def _get_linked_service_model_cls(self, _type: str, version: str) -> type[LinkedService[Any]]: """ Get a linked service model class by type and version. Args: _type: The type of the linked service. version: str version of the linked service. Returns: Type[LinkedService] """ cls_name = self.linked_services[(_type, version)].class_name logger.debug("Linked Service Class Name: %s", cls_name) return cast("type[LinkedService[Any]]", import_string(cls_name))
[docs] def linked_service(self, config: dict[str, Any]) -> LinkedService[Any]: """ Get a linked service instance by configuration. Args: config: dict containing at least 'type' and 'version' Returns: LinkedService Raises: DeserializationError: If the linked service cannot be deserialized. """ try: type = config["type"] version = sanitize_version(config["version"]) model_cls = self._get_linked_service_model_cls(type, version) linked_service: LinkedService[Any] = model_cls.deserialize(config) return linked_service except (TypeError, KeyError) as exc: logger.error(f"Error deserializing linked service: {exc}") raise DeserializationError( message=f"Error deserializing linked service: {exc}", details={"config": config, "error": str(exc)}, ) from exc
[docs] def dataset(self, config: dict[str, Any]) -> Dataset[Any, Any, Any, Any]: """ Get a dataset instance by configuration. Args: config: dict containing at least 'type' and 'version' Returns: Dataset Raises: DeserializationError: If the dataset cannot be deserialized. """ try: type = config["type"] version = sanitize_version(config["version"]) dataset_cls = self._get_dataset_model_cls(type, version) dataset: Dataset[Any, Any, Any, Any] = dataset_cls.deserialize(config) return dataset except (TypeError, KeyError) as exc: logger.error(f"Error deserializing dataset: {exc}") raise DeserializationError( message=f"Error deserializing dataset: {exc}", details={"config": config, "error": str(exc)}, ) from exc