"""
**File:** ``table.py``
**Region:** ``ds_provider_azure_py_lib/dataset/table``
Azure Dataset - Table Storage
This module implements a dataset for Azure Table Storage, allowing for CRUD operations
on table entities using pandas DataFrames for data representation.
Example:
>>> azure_table = AzureTable(
... settings=AzureTableDatasetSettings(
... table_name="users",
... partition_key="partition_key",
... row_key="row_key",
... read=ReadSettings(query_filter="additional query filter"),
... purge=PurgeSettings(delete_table=False),
... ),
... linked_service=AzureLinkedService(
... settings=AzureLinkedServiceSettings(
... account_name="account name",
... access_key="access key"
... ),
... id=uuid.uuid4(),
... name="testazurepackage",
... version="0.0.1",
... description="testazurepackage",
... ),
... id=uuid.uuid4(),
... name="testazurepackage",
... version="0.0.1",
... description="testazurepackage"
... )
>>> azure_table.read()
>>> table_data = azure_table.output
"""
import builtins
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from typing import Any, Generic, NoReturn, TypeVar
import pandas as pd
from azure.core.exceptions import (
HttpResponseError,
ResourceExistsError,
)
from azure.data.tables import TableClient, TableTransactionError, UpdateMode
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
CreateError,
DatasetException,
DeleteError,
ReadError,
UpdateError,
UpsertError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ValidationError
from ..enums import ResourceType
from ..linked_service.storage_account import AzureLinkedService
from ..serde import AzureTableDeserializer, AzureTableSerializer
logger = Logger.get_logger(__name__, package=True)
TransactionEntry = tuple[str, dict[str, Any]] | tuple[str, dict[str, Any], Mapping[str, Any]]
[docs]
@dataclass(kw_only=True)
class ReadSettings:
"""
Settings specific to the read() operation.
These settings only apply when reading data from the database
and do not affect other operations like: create(), delete(), update(), or rename().
"""
query_filter: str | None = None
"""
An OData-compliant string to filter the entities returned by the read() operation.
If None, no filter is applied and all entities are returned.
Example: "PartitionKey eq '{self.partition_key}' and RowKey eq '{self.row_key}'"
"""
[docs]
@dataclass(kw_only=True)
class PurgeSettings:
"""
Settings specific to the purge() operation.
These settings only apply when deleting data from the database
and do not affect other operations like: create(), read(), update(), or rename().
"""
delete_table: bool = False
"""
If True, the entire table will be deleted when purge() is called.
If False, only the table content will be deleted.
"""
[docs]
@dataclass(kw_only=True)
class AzureTableDatasetSettings(DatasetSettings):
    """Settings describing an Azure Table Storage dataset.

    Operation-specific configuration is nested: ``read`` only affects the
    read() operation and ``purge`` only affects the purge() operation;
    neither influences create(), delete(), update(), etc.
    """

    # Name of the target table in the storage account.
    table_name: str

    # Settings consumed only by purge().
    purge: PurgeSettings = field(default_factory=PurgeSettings)

    # Settings consumed only by read(); the default reads without a filter.
    read: ReadSettings = field(default_factory=ReadSettings)
# Type variable for AzureTable's settings generic parameter; any subclass of
# AzureTableDatasetSettings is accepted.
AzureTableDatasetSettingsType = TypeVar(
    "AzureTableDatasetSettingsType",
    bound=AzureTableDatasetSettings,
)
# Type variable for AzureTable's linked-service generic parameter; any
# AzureLinkedService specialization (or subclass) is accepted.
AzureLinkedServiceType = TypeVar(
    "AzureLinkedServiceType",
    bound=AzureLinkedService[Any],
)
[docs]
@dataclass(kw_only=True)
class AzureTable(
    TabularDataset[
        AzureLinkedServiceType,
        AzureTableDatasetSettingsType,
        AzureTableSerializer,
        AzureTableDeserializer,
    ],
    Generic[AzureLinkedServiceType, AzureTableDatasetSettingsType],
):
    """
    Tabular dataset backed by Azure Table Storage.

    Implements CRUD operations (read/create/update/upsert/delete/purge) on
    table entities, using pandas DataFrames as the in-memory representation
    via the Azure table serializer/deserializer pair.
    """
    linked_service: AzureLinkedServiceType  # provides the table_service_client connection
    settings: AzureTableDatasetSettingsType  # table name plus read/purge sub-settings
[docs]
def __post_init__(self) -> None:
    """Attach the table-specific serializer and deserializer after init."""
    self.deserializer = AzureTableDeserializer()
    self.serializer = AzureTableSerializer()
@property
def type(self) -> ResourceType:
    """
    Resource type of this dataset.

    Returns:
        ResourceType: always ``ResourceType.TABLE``.
    """
    return ResourceType.TABLE
[docs]
def _prepare_content(self, content: pd.DataFrame) -> dict[str, Any]:
    """
    Validate ``content`` and serialize it for Azure Table Storage.

    Args:
        content (pd.DataFrame): The data to validate and serialize.

    Returns:
        dict: The serialized content produced by the serializer.

    Raises:
        DatasetException: If the content is not a DataFrame, is empty,
            or the serializer is not initialized.
        ValidationError: If the required key columns are missing.
    """
    if not isinstance(content, pd.DataFrame):
        raise DatasetException(
            f"The content must be a pandas DataFrame, got {type(content)} instead.",
            status_code=400,
            details=self.get_details(),
        )
    if len(content.index) == 0:
        raise DatasetException(
            "The DataFrame is empty. Cannot prepare content for Azure Table Storage.",
            status_code=400,
            details=self.get_details(),
        )
    # Every entity needs both Azure Table key columns.
    required_columns = {"PartitionKey", "RowKey"}
    if not required_columns.issubset(content.columns):
        raise ValidationError(
            f"The DataFrame must contain the columns: {', '.join(required_columns)}",
            status_code=400,
            details=self.get_details(),
        )
    if self.serializer is None:
        raise DatasetException("Serializer is not initialized.", status_code=400, details=self.get_details())
    return self.serializer(content)
[docs]
def _get_table_client(self) -> TableClient:
    """
    Build a TableClient bound to the configured table.

    Returns:
        TableClient: Client for ``settings.table_name``.
    """
    service_client = self.linked_service.connection.table_service_client
    return service_client.get_table_client(table_name=self.settings.table_name)
[docs]
def _submit_transaction(self, transaction: Iterable[TransactionEntry], error_cls: builtins.type[DatasetException]) -> None:
    """
    Submit a transaction, mapping Azure SDK errors to ``error_cls``.

    Args:
        transaction (Iterable[TransactionEntry]): The operations to submit.
            An empty transaction is a no-op.
        error_cls (builtins.type[DatasetException]): The exception class to
            raise on error.

    Raises:
        error_cls: If the Azure SDK reports a transaction or HTTP error.
    """
    # Empty transaction: nothing to submit, skip the round trip entirely.
    if not transaction:
        return
    table_client = self._get_table_client()
    try:
        table_client.submit_transaction(transaction)
    except (TableTransactionError, HttpResponseError) as exc:
        # Bug fix: error_cls is itself a class, so its name is
        # error_cls.__name__; the previous error_cls.__class__.__name__
        # logged the metaclass name ("type") instead of e.g. "CreateError".
        logger.error(f"{error_cls.__name__}: {exc.message}")
        if exc.status_code:
            raise error_cls(message=exc.message, status_code=exc.status_code, details=self.get_details()) from exc
        raise error_cls(message=exc.message, details=self.get_details()) from exc
[docs]
def _delete_table(self) -> None:
    """
    Drop the entire table from Azure Table Storage.

    Returns:
        None

    Raises:
        DeleteError: If the service rejects the deletion.
    """
    table_name = self.settings.table_name
    logger.debug(f"Deleting table: {table_name}.")
    service_client = self.linked_service.connection.table_service_client
    try:
        service_client.delete_table(table_name=table_name)
    except HttpResponseError as exc:
        logger.error(f"Failed to delete table ({table_name})")
        raise DeleteError(f"Failed to delete table in Azure Table Storage: {exc!s}", details=self.get_details()) from exc
    logger.info(f"Successfully deleted table: {table_name}.")
[docs]
def _create_table(self) -> None:
    """
    Create the configured table if it does not already exist.

    An already-existing table is treated as success, not an error.

    Returns:
        None

    Raises:
        CreateError: On any HTTP failure other than "already exists".
    """
    service_client = self.linked_service.connection.table_service_client
    try:
        service_client.create_table(table_name=self.settings.table_name)
    except ResourceExistsError:
        logger.debug(f"Table ({self.settings.table_name}) already exists.")
    except HttpResponseError as exc:
        raise CreateError(f"Failed to create table in Azure Table Storage: {exc!s}", details=self.get_details()) from exc
    else:
        logger.info(f"Table ({self.settings.table_name}) successfully created.")
[docs]
def read(self, **_kwargs: Any) -> None:
    """
    Read the Azure Table Storage dataset into ``self.output``.

    Applies ``settings.read.query_filter`` when set; otherwise lists every
    entity in the table.

    Args:
        _kwargs: Additional keyword arguments (ignored).

    Returns:
        None

    Raises:
        ReadError: If the deserializer is missing or the read fails.
    """
    # Consistency: reuse the shared helper instead of re-deriving the
    # client from the linked service inline (as the other methods do).
    table_client: TableClient = self._get_table_client()
    try:
        query_filter = self.settings.read.query_filter
        if query_filter:
            entities = table_client.query_entities(query_filter=query_filter)
        else:
            entities = table_client.list_entities()
        if self.deserializer is None:
            raise ReadError("Deserializer is not initialized.", status_code=400, details=self.get_details())
        self.output = self.deserializer(entities)
    except HttpResponseError as exc:
        raise ReadError(f"Failed to read from Table Storage: {exc!s}", details=self.get_details(), status_code=500) from exc
    logger.debug(f"Read data from Table Storage: {len(self.output)} items")
[docs]
def create(self, **_kwargs: Any) -> None:
    """
    Create entities in Azure Table Storage from ``self.input``.

    Ensures the table exists first. Empty input is a no-op.

    Returns:
        None

    Raises:
        CreateError: If the table or the entities could not be created.
    """
    # Empty input is a no-op per contract
    if self.input is None or len(self.input) == 0:
        self.output = self.input.copy() if self.input is not None else pd.DataFrame()
        return
    self._create_table()
    transaction = self._build_transaction_from_input("create")
    # _submit_transaction already maps TableTransactionError and
    # HttpResponseError to CreateError, so the extra except clauses the
    # previous version carried here were unreachable; dropping them keeps
    # this consistent with update()/upsert()/delete().
    self._submit_transaction(transaction, CreateError)
    self.output = self.input.copy()
[docs]
def update(self, **_kwargs: Any) -> None:
    """
    Merge-update entities in Azure Table Storage from ``self.input``.

    Uses ``UpdateMode.MERGE`` so only the provided properties change.
    Empty input is a no-op.

    Returns:
        None
    """
    if self.input is None or len(self.input) == 0:
        # Empty input is a no-op per contract
        logger.debug("Input DataFrame is empty. No entities to update in Azure Table Storage.")
        self.output = pd.DataFrame() if self.input is None else self.input.copy()
        return
    entries = self._build_transaction_from_input("update", {"mode": UpdateMode.MERGE})
    self._submit_transaction(entries, UpdateError)
    logger.info("Successfully updated entities.")
    self.output = self.input.copy()
[docs]
def delete(self, **_kwargs: Any) -> None:
    """
    Delete specific entities from Azure Table Storage.

    Only entities specified in ``self.input`` are deleted, matched by
    PartitionKey and RowKey. A 404 from the service is tolerated — the
    entities are already gone. Empty input is a no-op.

    Args:
        _kwargs: Additional keyword arguments

    Returns:
        None

    Raises:
        DeleteError: If there is a non-404 error deleting from Azure Table Storage.
    """
    # For entity deletion, empty input is a no-op per contract
    if self.input is None or len(self.input) == 0:
        logger.debug("Input DataFrame is empty. No entities to delete in Azure Table.")
        self.output = self.input.copy() if self.input is not None else pd.DataFrame()
        return
    transaction = self._build_transaction_from_input("delete")
    logger.debug(f"Deleting entities: {len(transaction)} items")
    try:
        self._submit_transaction(transaction, DeleteError)
    except DeleteError as exc:
        if exc.status_code != 404:
            # Re-raise the original: wrapping it in a second, identical
            # DeleteError (as before) only obscured the traceback.
            raise
        # Entity already absent server-side: treat as success.
        logger.warning("Entity not found during delete operation.")
    logger.info("Successfully deleted entities.")
    self.output = self.input.copy()
[docs]
def rename(self) -> NoReturn:
    """Always raise: renaming is not supported for Azure Table datasets."""
    raise NotSupportedError("Rename operation is not supported for Azure Table datasets")
[docs]
def close(self) -> None:
    """
    Intentionally a no-op: the linked service needs no closing; this
    method exists only to satisfy the dataset interface.

    Returns:
        None
    """
[docs]
def list(self) -> NoReturn:
    """Always raise: listing is not supported for Azure Table datasets."""
    raise NotSupportedError("List operation is not supported for Azure Table datasets")
[docs]
def purge(self, **_kwargs: Any) -> None:
    """
    Purge all entities from the table or drop the entire table.

    If ``settings.purge.delete_table`` is True, deletes the entire table.
    Otherwise deletes all entities from the table, leaving it empty.
    A missing table (404) is treated as already purged.

    Returns:
        None

    Raises:
        DeleteError: If there is an error purging from Azure Table Storage.
    """
    if self.settings.purge.delete_table:
        # Delete the entire table
        self._delete_table()
        return
    # Delete all entities from the table
    try:
        table_client = self._get_table_client()
        # Azure Table transactions require every operation in a batch to
        # share one PartitionKey and allow at most 100 operations per
        # batch; submitting all entities in a single transaction (as the
        # previous version did) fails on multi-partition tables or on
        # more than 100 entities. Group by partition and chunk instead.
        by_partition: dict[Any, list[TransactionEntry]] = {}
        for entity in table_client.list_entities():
            entity_dict = dict(entity)
            by_partition.setdefault(entity_dict.get("PartitionKey"), []).append(("delete", entity_dict))
        for operations in by_partition.values():
            for start in range(0, len(operations), 100):
                self._submit_transaction(operations[start : start + 100], DeleteError)
        logger.info(f"Successfully purged all entities from table {self.settings.table_name}.")
    except HttpResponseError as exc:
        if exc.status_code == 404:
            logger.warning(f"Table {self.settings.table_name} not found during purge. Treating as already purged.")
        else:
            raise DeleteError(
                f"Failed to purge entities from table {self.settings.table_name}: {exc!s}",
                details=self.get_details(),
            ) from exc
[docs]
def upsert(self, **_kwargs: Any) -> None:
    """
    Insert-or-replace entities in Azure Table Storage from ``self.input``.

    Uses ``UpdateMode.REPLACE`` so each input row fully replaces any
    existing entity with the same keys. Empty input is a no-op.

    Returns:
        None

    Raises:
        UpsertError: If the transaction fails.
    """
    # Empty input is a no-op per contract
    if self.input is None or len(self.input) == 0:
        # Fixed log text: previous messages said "update" (copy-paste from update()).
        logger.debug("Input DataFrame is empty. No entities to upsert in Azure Table Storage.")
        self.output = self.input.copy() if self.input is not None else pd.DataFrame()
        return
    transaction = self._build_transaction_from_input("upsert", {"mode": UpdateMode.REPLACE})
    self._submit_transaction(transaction, UpsertError)
    logger.info("Successfully upserted entities.")
    self.output = self.input.copy()
[docs]
def get_details(self) -> dict[str, Any]:
    """
    Assemble diagnostic details about the dataset (used in error reports).

    Returns:
        dict[str, Any]: Table name, dataset type, and — when configured —
        the read filter and the purge delete-table flag.
    """
    info: dict[str, Any] = {
        "table_name": self.settings.table_name,
        "dataset_type": self.type.value,
    }
    read_cfg = getattr(self.settings, "read", None)
    if read_cfg is not None and read_cfg.query_filter is not None:
        info["query_filter"] = read_cfg.query_filter
    purge_cfg = getattr(self.settings, "purge", None)
    if purge_cfg is not None:
        info["delete_table"] = str(purge_cfg.delete_table)
    return info