Source code for ds_resource_plugin_py_lib.common.serde.serialize.awswrangler

"""
**File:** ``awswrangler.py``
**Region:** ``ds_resource_plugin_py_lib/common/serde/serialize``

Description
-----------
Serialize a pandas DataFrame into a value using awswrangler.

Example
-------
.. code-block:: python

    from ds_resource_plugin_py_lib.common.serde.serialize.awswrangler import AwsWranglerSerializer
    from ds_resource_plugin_py_lib.common.resource.dataset.storage_format import DatasetStorageFormatType

    serializer = AwsWranglerSerializer(format=DatasetStorageFormatType.CSV)
    result = serializer(df, boto3_session=boto3_session)
"""

from dataclasses import dataclass, field
from typing import Any

import awswrangler as wr
import pandas as pd
from ds_common_logger_py_lib import Logger

from ....common.resource.dataset.storage_format import DatasetStorageFormatType
from ...serde.serialize.base import DataSerializer

logger = Logger.get_logger(__name__, package=True)


[docs] @dataclass(kw_only=True) class AwsWranglerSerializer(DataSerializer): format: DatasetStorageFormatType kwargs: dict[str, Any] = field(default_factory=dict)
[docs] def __call__(self, obj: pd.DataFrame, **kwargs: Any) -> Any: """ Serialize a pandas DataFrame into a value. Args: obj: The pandas DataFrame to serialize. **kwargs: Additional keyword arguments. Returns: A value. """ logger.debug(f"AwsWranglerSerializer __call__ with format: {self.format} and args: {self.kwargs}") boto3_session = kwargs.get("boto3_session") if not boto3_session: raise ValueError("AWS boto3 Session is required.") if self.format == DatasetStorageFormatType.CSV: return wr.s3.to_csv( obj, boto3_session=boto3_session, **self.kwargs, ) elif self.format == DatasetStorageFormatType.PARQUET: return wr.s3.to_parquet( obj, boto3_session=boto3_session, **self.kwargs, ) elif self.format == DatasetStorageFormatType.JSON: return wr.s3.to_json( obj, boto3_session=boto3_session, **self.kwargs, ) elif self.format == DatasetStorageFormatType.EXCEL: return wr.s3.to_excel( obj, boto3_session=boto3_session, **self.kwargs, ) elif self.format == DatasetStorageFormatType.XML: return wr.s3.upload( obj.to_xml(), boto3_session=boto3_session, **self.kwargs, ) else: raise ValueError(f"Unsupported format: {self.format}")