"""
**File:** ``cart.py``
**Region:** ``ds_provider_grasp_py_lib/dataset/cart``
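Grasp Cart Dataset

This module implements a dataset that reads Grasp Cart data from S3. Only
``read`` and ``close`` are supported; ``create``, ``delete``, ``update``,
``upsert``, ``rename``, ``purge`` and ``list`` raise ``AuthorizationError``.

Note that ``GraspCartDataset.__post_init__`` always installs a Parquet
``AwsWranglerDeserializer``, so a ``deserializer`` passed to the constructor
(as in the example below) is replaced.
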
Example:
>>> dataset = GraspCartDataset(
... id=uuid.uuid4(),
... name="cart-dataset",
... version="1.0.0",
... deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
... serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
... settings=GraspCartDatasetSettings(
... owner_id="owner_id",
... product_group_name="product_group_name",
... product_name="product_name",
... version="version",
... include_history=True,
... ),
... linked_service=GraspAwsLinkedService(
... id=uuid.uuid4(),
... name="aws-linked-service",
... version="1.0.0",
... settings=GraspAwsLinkedServiceSettings(
... access_key_id="access_key_id",
... access_key_secret="access_key_secret",
... region="region",
... ),
... ),
... )
>>> dataset.read()
>>> data = dataset.output
"""
from dataclasses import dataclass
from os import getenv
from typing import Any, Generic, NoReturn, TypeVar
from awswrangler.exceptions import NoFilesFound
from ds_common_logger_py_lib import Logger
from ds_provider_aws_py_lib.linked_service.aws import AWSLinkedService
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
DatasetStorageFormatType,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import NotFoundError, ReadError
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import AuthorizationError
from ds_resource_plugin_py_lib.common.serde.deserialize import AwsWranglerDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import AwsWranglerSerializer
from ..enums import ResourceType
from ..utils import get_bucket_name
logger = Logger.get_logger(__name__, package=True)
@dataclass(kw_only=True)
class GraspCartDatasetSettings(DatasetSettings):
    """
    Configuration describing which Grasp Cart data a dataset addresses.

    The string fields are interpolated into the S3 prefix that the cart
    dataset reads from; ``include_history`` controls row filtering after the
    read. All fields are keyword-only.
    """

    owner_id: str
    """Identifier of the cart's owner."""

    product_group_name: str
    """Name of the product group the cart belongs to."""

    product_name: str
    """Name of the product the cart belongs to."""

    version: str = "1.0"
    """Cart version; defaults to ``"1.0"``."""

    include_history: bool = False
    """Keep historical rows when reading; defaults to ``False``."""
# Type variable for the settings object carried by the cart dataset. The bound
# lets subclasses of GraspCartDatasetSettings be used as well.
GraspCartDatasetSettingsType = TypeVar(
    "GraspCartDatasetSettingsType", bound=GraspCartDatasetSettings
)

# Type variable for the linked service carried by the cart dataset. Any
# AWSLinkedService specialisation satisfies the bound.
AWSLinkedServiceType = TypeVar("AWSLinkedServiceType", bound=AWSLinkedService[Any])
@dataclass(kw_only=True)
class GraspCartDataset(
    TabularDataset[
        AWSLinkedServiceType,
        GraspCartDatasetSettingsType,
        AwsWranglerSerializer,
        AwsWranglerDeserializer,
    ],
    Generic[AWSLinkedServiceType, GraspCartDatasetSettingsType],
):
    """
    Dataset that reads Grasp Cart data from S3.

    Only :meth:`read` and :meth:`close` perform work. Every other operation
    (``create``, ``delete``, ``update``, ``upsert``, ``rename``, ``purge``,
    ``list``) raises ``AuthorizationError`` with status code 403.
    """

    linked_service: AWSLinkedServiceType
    """Linked service whose ``connection`` is passed to the deserializer as the boto3 session."""

    settings: GraspCartDatasetSettingsType
    """Settings used to build the S3 path and to decide whether history rows are kept."""

    def __post_init__(self) -> None:
        """
        Force the deserializer to a Parquet ``AwsWranglerDeserializer``.

        Any deserializer supplied to the constructor is overwritten here.
        """
        # NOTE(review): this does not call super().__post_init__(); confirm that
        # none of the base classes define one that must run.
        self.deserializer = AwsWranglerDeserializer(format=DatasetStorageFormatType.PARQUET)

    @property
    def type(self) -> ResourceType:
        """The resource type of this dataset (``ResourceType.DATASET_CART``)."""
        return ResourceType.DATASET_CART

    def _get_s3_path(self, tenant_id: str) -> str:
        """
        Build the S3 prefix holding the cart data for a tenant.

        Args:
            tenant_id: Tenant identifier placed between the version and the
                product name in the path.

        Returns:
            A path of the form ``s3://<bucket>/datalake/cart/<product_group_name>/
            <version>/<tenant_id>/<product_name>/<owner_id>/data/``, where the
            bucket comes from ``get_bucket_name()``.
        """
        bucket = get_bucket_name()
        return (
            f"s3://{bucket}/datalake/cart/{self.settings.product_group_name}/"
            f"{self.settings.version}/{tenant_id}/{self.settings.product_name}/"
            f"{self.settings.owner_id}/data/"
        )

    def _raise_unauthorized(self, operation: str) -> NoReturn:
        """
        Raise the ``AuthorizationError`` shared by all unsupported operations.

        Args:
            operation: Verb inserted into the error message (e.g. ``"create"``).

        Raises:
            AuthorizationError: Always, with status code 403 and the dataset
                type and serialized settings as details.
        """
        raise AuthorizationError(
            message=f"You are not authorized to {operation} a Grasp Cart dataset",
            status_code=403,
            details={
                "type": self.type.value,
                "settings": self.settings.serialize(),
            },
        )

    def create(self) -> NoReturn:
        """
        Creating a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("create")

    def read(self) -> None:
        """
        Read the cart data from S3 into ``self.output``.

        The S3 prefix is derived from the dataset settings and the ``TENANT_ID``
        environment variable. Unless ``settings.include_history`` is true, only
        rows whose ``_valid_to`` value is null are kept.

        Raises:
            ReadError: If ``TENANT_ID`` is unset or empty, if no deserializer is
                set, or if the deserializer fails for any reason other than
                missing files.
            NotFoundError: If no files are found at the S3 path.
        """
        tenant_id = getenv("TENANT_ID")
        # An empty value is rejected as well: it would produce an S3 prefix
        # with an empty path segment.
        if not tenant_id:
            logger.error("TENANT_ID environment variable is required")
            raise ReadError(
                message="TENANT_ID environment variable is required",
                status_code=400,
                details={"type": self.type.value, "settings": self.settings.serialize()},
            )

        if not self.deserializer:
            logger.error("Deserializer is not set.")
            raise ReadError(
                message="Deserializer is not set.",
                status_code=400,
                details={"type": self.type.value, "settings": self.settings.serialize()},
            )

        s3_path = self._get_s3_path(tenant_id=tenant_id)
        logger.debug(f"Reading data from S3 path: {s3_path}")

        try:
            self.output = self.deserializer(
                s3_path,
                boto3_session=self.linked_service.connection,
            )
        except NoFilesFound as exc:
            # Missing data is reported separately (404) from other read failures.
            logger.error(f"No files found at S3 path: {s3_path}")
            raise NotFoundError(
                message=f"No files found at S3 path: {s3_path}",
                status_code=404,
                details={
                    "s3_path": s3_path,
                    "type": self.type.value,
                    "settings": self.settings.serialize(),
                    "error": str(exc),
                },
            ) from exc
        except Exception as exc:
            logger.exception(f"Failed to read data from table: {exc!s}")
            raise ReadError(
                message=f"Failed to read data from table: {exc!s}",
                status_code=500,
                details={
                    "s3_path": s3_path,
                    "type": self.type.value,
                    "settings": self.settings.serialize(),
                },
            ) from exc

        if not self.settings.include_history:
            # NOTE(review): if the data has no "_valid_to" column, the lookup
            # below would presumably raise KeyError rather than ReadError;
            # confirm the column is guaranteed to exist.
            logger.debug("Dropping _valid_to rows")
            self.output = self.output.loc[self.output["_valid_to"].isna()]

        logger.debug(f"Successfully read {len(self.output)} rows from {s3_path}")

    def delete(self) -> NoReturn:
        """
        Deleting a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("delete")

    def update(self) -> NoReturn:
        """
        Updating a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("update")

    def upsert(self) -> NoReturn:
        """
        Upserting a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("upsert")

    def rename(self) -> NoReturn:
        """
        Renaming a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("rename")

    def purge(self) -> NoReturn:
        """
        Purging a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("purge")

    def list(self) -> NoReturn:
        """
        Listing a Grasp Cart dataset is not permitted.

        Raises:
            AuthorizationError: Always.
        """
        self._raise_unauthorized("list")

    def close(self) -> None:
        """
        Close the dataset by closing its linked service.
        """
        self.linked_service.close()