Source code for ds_resource_plugin_py_lib.common.serde.deserialize.pandas

"""
**File:** ``pandas.py``
**Region:** ``ds_resource_plugin_py_lib/common/serde/deserialize``

Description
-----------
Deserialize a value into a pandas DataFrame.

Example
-------
.. code-block:: python

    from ds_resource_plugin_py_lib.common.resource.dataset.storage_format import DatasetStorageFormatType
    from ds_resource_plugin_py_lib.common.serde.deserialize.pandas import PandasDeserializer

    deserializer = PandasDeserializer(format=DatasetStorageFormatType.JSON)
    df = deserializer('{"a":[1,2],"b":["x","y"]}')
"""

import io
import json
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any

from ds_common_logger_py_lib import Logger

if TYPE_CHECKING:  # pragma: no cover
    from collections.abc import Callable

import pandas as pd

from ....common.resource.dataset.storage_format import DatasetStorageFormatType
from ...serde.deserialize.base import DataDeserializer

logger = Logger.get_logger(__name__, package=True)


[docs] @dataclass(kw_only=True) class PandasDeserializer(DataDeserializer): format: DatasetStorageFormatType kwargs: dict[str, Any] = field(default_factory=dict)
[docs] def __call__(self, value: Any, **_kwargs: Any) -> pd.DataFrame: """ Deserialize a value into a pandas DataFrame. Args: value: The value to deserialize. **kwargs: Additional keyword arguments. Returns: A pandas DataFrame. """ logger.debug(f"PandasDeserializer __call__ with format: {self.format} and args: {self.kwargs}") if isinstance(value, bytes): value = io.BytesIO(value) elif isinstance(value, str): value = io.StringIO(value) elif isinstance(value, (dict, list)): value = io.StringIO(json.dumps(value)) format_readers: dict[DatasetStorageFormatType, Callable[[Any], pd.DataFrame]] = { DatasetStorageFormatType.CSV: lambda v: pd.read_csv(v, **self.kwargs), DatasetStorageFormatType.PARQUET: lambda v: pd.read_parquet(v, **self.kwargs), DatasetStorageFormatType.JSON: lambda v: pd.read_json(v, **self.kwargs), DatasetStorageFormatType.EXCEL: lambda v: pd.read_excel(v, **self.kwargs), DatasetStorageFormatType.XML: lambda v: pd.read_xml(v, **self.kwargs), } if self.format == DatasetStorageFormatType.SEMI_STRUCTURED_JSON: if isinstance(value, io.BytesIO): json_str = value.getvalue().decode("utf-8") value = json.loads(json_str) elif isinstance(value, io.StringIO): json_str = value.getvalue() value = json.loads(json_str) elif isinstance(value, str): value = json.loads(value) return pd.json_normalize(value, **self.kwargs) reader = format_readers.get(self.format) if reader: return reader(value) raise ValueError(f"Unsupported format: {self.format}")