# Source code for ds_resource_plugin_py_lib.common.serde.deserialize.awswrangler

"""
**File:** ``awswrangler.py``
**Region:** ``ds_resource_plugin_py_lib/common/serde/deserialize``

Description
-----------
Deserialize an object stored in S3 (given by its S3 path) into a pandas DataFrame using awswrangler.

Example
-------
.. code-block:: python

    import boto3

    from ds_resource_plugin_py_lib.common.serde.deserialize.awswrangler import AwsWranglerDeserializer
    from ds_resource_plugin_py_lib.common.resource.dataset.storage_format import DatasetStorageFormatType

    boto3_session = boto3.Session()
    deserializer = AwsWranglerDeserializer(format=DatasetStorageFormatType.PARQUET)

    df = deserializer("s3://my-bucket/path/to/data.parquet", boto3_session=boto3_session)
"""

import json
from dataclasses import dataclass, field
from io import BytesIO
from typing import Any, cast

import awswrangler as wr
import pandas as pd
from ds_common_logger_py_lib import Logger

from ....common.resource.dataset.storage_format import DatasetStorageFormatType
from ...serde.deserialize.base import DataDeserializer

# Module-level logger, used by the deserializer below for debug output.
# NOTE(review): the meaning of ``package=True`` is defined by ds_common_logger_py_lib — confirm there.
logger = Logger.get_logger(__name__, package=True)


@dataclass(kw_only=True)
class AwsWranglerDeserializer(DataDeserializer):
    """
    Deserialize an S3 object into a pandas DataFrame using awswrangler.

    Attributes:
        format: Storage format of the object; selects which reader is used.
        kwargs: Extra keyword arguments forwarded unchanged to the selected
            reader (``wr.s3.read_csv``, ``wr.s3.read_parquet``,
            ``wr.s3.read_json``, ``wr.s3.read_excel``, ``pd.json_normalize``
            or ``pd.read_xml``).
    """

    format: DatasetStorageFormatType
    kwargs: dict[str, Any] = field(default_factory=dict)

    def __call__(self, value: Any, **kwargs: Any) -> pd.DataFrame:
        """
        Deserialize a value into a pandas DataFrame.

        Args:
            value: The S3 path of the object to read (passed as ``path`` to awswrangler).
            **kwargs: Additional keyword arguments. Only ``boto3_session`` is
                read (and required); any other keys are ignored.

        Returns:
            A pandas DataFrame.

        Raises:
            ValueError: If ``boto3_session`` is missing or falsy, or if
                ``self.format`` is not one of the supported formats.
        """
        logger.debug(f"AwsWranglerDeserializer __call__ with format: {self.format} and args: {self.kwargs}")
        boto3_session = kwargs.get("boto3_session")
        if not boto3_session:
            raise ValueError("AWS boto3 Session is required.")

        if self.format == DatasetStorageFormatType.CSV:
            return cast(
                "pd.DataFrame",
                wr.s3.read_csv(path=value, boto3_session=boto3_session, **self.kwargs),
            )
        if self.format == DatasetStorageFormatType.PARQUET:
            return cast(
                "pd.DataFrame",
                wr.s3.read_parquet(path=value, boto3_session=boto3_session, **self.kwargs),
            )
        if self.format == DatasetStorageFormatType.JSON:
            return cast(
                "pd.DataFrame",
                wr.s3.read_json(path=value, boto3_session=boto3_session, **self.kwargs),
            )
        if self.format == DatasetStorageFormatType.SEMI_STRUCTURED_JSON:
            raw = self._download_bytes(value, boto3_session)
            # json.loads accepts bytes and auto-detects UTF-8/16/32 (including a
            # UTF-8 BOM); decoding to str first would keep a BOM that json.loads
            # then rejects.
            return pd.json_normalize(json.loads(raw), **self.kwargs)
        if self.format == DatasetStorageFormatType.EXCEL:
            return cast(
                "pd.DataFrame",
                wr.s3.read_excel(path=value, boto3_session=boto3_session, **self.kwargs),
            )
        if self.format == DatasetStorageFormatType.XML:
            raw = self._download_bytes(value, boto3_session)
            # A fresh BytesIO starts at position 0, so read_xml sees the whole
            # document (reading the download buffer directly would start at its
            # end). Passing literal XML text to read_xml is deprecated in pandas.
            with BytesIO(raw) as buffer:
                return pd.read_xml(buffer, **self.kwargs)
        raise ValueError(f"Unsupported format: {self.format}")

    @staticmethod
    def _download_bytes(path: Any, boto3_session: Any) -> bytes:
        """Download the S3 object at ``path`` into memory and return its raw bytes."""
        with BytesIO() as buffer:
            wr.s3.download(path=path, boto3_session=boto3_session, local_file=buffer)
            # getvalue() returns the full contents regardless of the stream
            # position left behind by the download.
            return buffer.getvalue()