Source code for ds_provider_azure_py_lib.serde.table
"""
**File:** ``table.py``
**Region:** ``ds_provider_azure_py_lib/serde/table``
Serde for Azure Table Storage data.
"""
from typing import Any
import pandas as pd
from azure.core.paging import ItemPaged
from azure.data.tables import TableEntity
from ds_resource_plugin_py_lib.common.serde.deserialize import DataDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import DataSerializer
from ds_provider_azure_py_lib.serde.coercion import _coerce_value
[docs]
class AzureTableSerializer(DataSerializer):
    """
    Serializer for Azure Table Storage payloads.

    Turns a DataFrame into a plain dict built from its first row, with the
    ``RowKey`` and ``PartitionKey`` columns cast to strings beforehand.
    """

    def __call__(self, obj: pd.DataFrame, **_kwargs: Any) -> Any | dict[str, Any]:
        """
        Build a dict from the first row of ``obj``.

        Only the row at position 0 is used; any further rows are not part
        of the result.

        Args:
            obj: Input DataFrame. It must contain ``RowKey`` and
                ``PartitionKey`` columns and at least one row.
            _kwargs: Additional keyword arguments (unused).

        Returns:
            A dict mapping each column name to the first row's value after
            it has been passed through ``_coerce_value``.
        """
        # Both key columns are cast to str before the row is extracted.
        key_columns = {
            name: obj[name].astype(str) for name in ("RowKey", "PartitionKey")
        }
        first_row = obj.assign(**key_columns).iloc[0]

        serialized: dict[str, Any] = {}
        for column, raw_value in first_row.to_dict().items():
            serialized[column] = _coerce_value(raw_value)
        return serialized
[docs]
class AzureTableDeserializer(DataDeserializer):
    """
    Deserialize Azure Table Storage data.

    The deserializer is responsible for converting the paged entities
    returned by an Azure Table Storage query into a DataFrame.
    """

    def __call__(self, value: ItemPaged[TableEntity], **_kwargs: Any) -> Any:
        """
        Deserialize the data from an item-paged result into a DataFrame.

        Every entity becomes one row whose columns are the entity's keys.
        When an entity has no ``Timestamp`` key but its ``metadata`` holds a
        ``timestamp`` entry, that value is copied into a ``Timestamp`` column.

        Args:
            value: The paged table entities.
            _kwargs: Additional keyword arguments (unused).

        Returns:
            A DataFrame with one row per entity, or an empty DataFrame when
            ``value`` yields no entities.
        """
        rows = []
        for entity in value:
            row = {key: entity[key] for key in entity}
            if "Timestamp" not in row:
                # Read metadata once and tolerate a missing or ``None``
                # attribute instead of failing on the membership test.
                metadata = getattr(entity, "metadata", None) or {}
                if "timestamp" in metadata:
                    row["Timestamp"] = metadata["timestamp"]
            rows.append(row)

        if not rows:
            return pd.DataFrame()
        return pd.DataFrame(rows)