Source code for ds_provider_grasp_py_lib.dataset.ingress

"""
**File:** ``ingress.py``
**Region:** ``ds_provider_grasp_py_lib/dataset/ingress``

Grasp Ingress Dataset

This module implements a dataset for Grasp Ingress.

Example:
    >>> dataset = GraspIngressDataset(
    ...     id=uuid.uuid4(),
    ...     name="ingress-dataset",
    ...     version="1.0.0",
    ...     deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
    ...     serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
    ...     settings=GraspIngressDatasetSettings(
    ...         owner_id="owner_id",
    ...         product_group_name="product_group_name",
    ...         product_name="product_name",
    ...         version="version",
    ...         include_history=True,
    ...     ),
    ...     linked_service=GraspAwsLinkedService(
    ...         id=uuid.uuid4(),
    ...         name="aws-linked-service",
    ...         version="1.0.0",
    ...         settings=GraspAwsLinkedServiceSettings(
    ...             access_key_id="access_key_id",
    ...             access_key_secret="access_key_secret",
    ...             region="region",
    ...         ),
    ...     ),
    ... )
    >>> dataset.read()
    >>> data = dataset.output
"""

from dataclasses import dataclass, field
from os import getenv
from typing import Any, Generic, NoReturn, TypeVar

from awswrangler.exceptions import NoFilesFound
from ds_common_logger_py_lib import Logger
from ds_provider_aws_py_lib.linked_service.aws import AWSLinkedService
from ds_resource_plugin_py_lib.common.resource.dataset import (
    DatasetSettings,
    DatasetStorageFormatType,
    TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import NotFoundError, ReadError
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import AuthorizationError
from ds_resource_plugin_py_lib.common.serde.deserialize import AwsWranglerDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import AwsWranglerSerializer

from ..enums import ResourceType
from ..utils import get_bucket_name

logger = Logger.get_logger(__name__, package=True)


[docs] @dataclass(kw_only=True) class GraspIngressDatasetSettings(DatasetSettings): """ Settings for Grasp Ingress dataset operations. """
GraspIngressDatasetSettingsType = TypeVar( "GraspIngressDatasetSettingsType", bound=GraspIngressDatasetSettings, ) AWSLinkedServiceType = TypeVar( "AWSLinkedServiceType", bound=AWSLinkedService[Any], )
[docs] @dataclass(kw_only=True) class GraspIngressDataset( TabularDataset[ AWSLinkedServiceType, GraspIngressDatasetSettingsType, AwsWranglerSerializer, AwsWranglerDeserializer, ], Generic[AWSLinkedServiceType, GraspIngressDatasetSettingsType], ): linked_service: AWSLinkedServiceType settings: GraspIngressDatasetSettingsType serializer: AwsWranglerSerializer | None = field( default_factory=lambda: AwsWranglerSerializer(format=DatasetStorageFormatType.JSON), ) deserializer: AwsWranglerDeserializer | None = field( default_factory=lambda: AwsWranglerDeserializer(format=DatasetStorageFormatType.JSON), ) @property def type(self) -> ResourceType: return ResourceType.DATASET_INGRESS
[docs] def _get_s3_path(self, tenant_id: str, session_id: str) -> str: """ Get the S3 path for the Grasp Ingress dataset. Returns: str: The S3 path for the Grasp Ingress dataset. """ bucket = get_bucket_name() return f"s3://{bucket}/datalake/workflows/{tenant_id}/{self.id}/{session_id}.json"
[docs] def create(self) -> None: raise AuthorizationError( message="You are not authorized to create a Grasp Ingress dataset", status_code=403, details={"settings": self.settings.serialize()}, )
[docs] def read(self) -> None: """ Read data from the Grasp Ingress dataset. Raises: ReadError: If the read operation fails, including when no files are found at the S3 path or when the S3 path is invalid. """ tenant_id = getenv("TENANT_ID") session_id = getenv("SESSION_ID") if tenant_id is None or session_id is None: logger.error("TENANT_ID and SESSION_ID environment variables are required") raise ReadError( message="TENANT_ID and SESSION_ID environment variables are required", status_code=400, details={"type": self.type.value, "settings": self.settings.serialize()}, ) if not self.deserializer: logger.error("Deserializer is not set.") raise ReadError( message="Deserializer is not set.", status_code=400, details={ "type": self.type.value, "settings": self.settings.serialize(), }, ) s3_path = self._get_s3_path( tenant_id=tenant_id, session_id=session_id, ) logger.debug(f"Reading data from S3 path: {s3_path}") try: self.output = self.deserializer( s3_path, boto3_session=self.linked_service.connection, ) except NoFilesFound as exc: logger.error(f"No files found at S3 path: {s3_path}") raise NotFoundError( message=f"No ingress found for tenant_id: {tenant_id}, dataset_id: {self.id}, session_id: {session_id}", status_code=404, details={ "s3_path": s3_path, "type": self.type.value, "settings": self.settings.serialize(), "error": str(exc), }, ) from exc except Exception as exc: logger.exception(f"Failed to read data from S3 path: {s3_path}: {exc!s}") raise ReadError( message=f"Failed to read data from S3 path: {s3_path}: {exc!s}", status_code=500, details={ "s3_path": s3_path, "type": self.type.value, "settings": self.settings.serialize(), }, ) from exc logger.debug(f"Successfully read {len(self.output)} rows from {s3_path}")
[docs] def delete(self) -> NoReturn: raise AuthorizationError( message="You are not authorized to delete a Grasp Ingress dataset", status_code=403, details={ "type": self.type.value, "settings": self.settings.serialize(), }, )
[docs] def update(self) -> NoReturn: raise AuthorizationError( message="You are not authorized to update a Grasp Ingress dataset", status_code=403, details={ "type": self.type.value, "settings": self.settings.serialize(), }, )
[docs] def upsert(self) -> NoReturn: raise AuthorizationError( message="You are not authorized to upsert a Grasp Ingress dataset", status_code=403, details={ "type": self.type.value, "settings": self.settings.serialize(), }, )
[docs] def rename(self) -> NoReturn: raise AuthorizationError( message="You are not authorized to rename a Grasp Ingress dataset", status_code=403, details={ "type": self.type.value, "settings": self.settings.serialize(), }, )
[docs] def purge(self) -> NoReturn: raise AuthorizationError( message="You are not authorized to purge a Grasp Ingress dataset", status_code=403, details={ "type": self.type.value, "settings": self.settings.serialize(), }, )
[docs] def list(self) -> NoReturn: raise AuthorizationError( message="You are not authorized to list a Grasp Ingress dataset", status_code=403, details={ "type": self.type.value, "settings": self.settings.serialize(), }, )
[docs] def close(self) -> None: """ Close the dataset. """ self.linked_service.close()