"""
**File:** ``deserializer.py``
**Region:** ``ds_provider_xledger_py_lib/serde``
Description
-----------
Deserialize GraphQL responses into tabular dataframe outputs.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any, cast
import pandas as pd
from ds_resource_plugin_py_lib.common.serde.deserialize.base import DataDeserializer
from ..enums import OperationType
from ..utils.dataframe import edges_to_dataframe
if TYPE_CHECKING:
from ..utils.introspection import MetaData
[docs]
class XledgerDeserializer(DataDeserializer):
    """Turn GraphQL response bodies into dataframes, driven by operation metadata."""

    def __call__(
        self,
        value: Any,
        **kwargs: Any,
    ) -> pd.DataFrame:
        """Convert a GraphQL response body into a dataframe.

        Args:
            value: GraphQL response body.
            **kwargs: Compatibility kwargs. ``metadata`` and
                ``operation_settings`` are mandatory.

        Returns:
            Dataframe restricted to the resolved output columns. It is empty
            when the operation root is not a mapping.

        Raises:
            KeyError: If ``metadata`` or ``operation_settings`` is missing
                from ``kwargs``.
        """
        meta = cast("MetaData", kwargs["metadata"])
        settings = kwargs["operation_settings"]
        output_columns = _resolve_output_columns(metadata=meta, operation_settings=settings)
        node = _root_payload(payload=cast("dict[str, Any]", value), metadata=meta)

        if isinstance(node, dict):
            rows = node.get("edges")
            if isinstance(rows, list):
                # Connection-shaped root: the edge list is handed to the shared helper.
                return edges_to_dataframe(edges=rows, columns=output_columns)
            # Any other mapping is flattened into a single row; nested keys
            # are joined with "_" before the column selection is applied.
            flattened = pd.json_normalize([node], sep="_")
            return flattened.reindex(columns=output_columns)

        # No usable root: return an empty frame that still carries the columns.
        return pd.DataFrame(columns=output_columns)

    def get_next(  # type: ignore[override]
        self,
        value: Any,
        **kwargs: Any,
    ) -> bool:
        """Report whether the read response announces another page.

        Args:
            value: GraphQL response body.
            **kwargs: Compatibility kwargs. ``metadata`` is mandatory.

        Returns:
            Truthiness of ``pageInfo.hasNextPage``; ``False`` when
            ``pageInfo`` is absent or not a mapping.
        """
        meta = cast("MetaData", kwargs["metadata"])
        body = cast("dict[str, Any]", value)
        info = _connection_payload(payload=body, metadata=meta).get("pageInfo")
        if not isinstance(info, dict):
            return False
        return bool(info.get("hasNextPage", False))

    def get_end_cursor(  # type: ignore[override]
        self,
        value: Any,
        **kwargs: Any,
    ) -> str | None:
        """Return the cursor of the last edge in a read response.

        Args:
            value: GraphQL response body.
            **kwargs: Compatibility kwargs. ``metadata`` is mandatory.

        Returns:
            The last edge's cursor as a string, or ``None`` when there are no
            edges, the last edge is not a mapping, or it has no cursor.
        """
        meta = cast("MetaData", kwargs["metadata"])
        body = cast("dict[str, Any]", value)
        edge_list = _connection_payload(payload=body, metadata=meta).get("edges")
        if not isinstance(edge_list, list) or not edge_list:
            return None

        tail = edge_list[-1]
        if not isinstance(tail, dict):
            return None

        token = tail.get("cursor")
        return None if token is None else str(token)
[docs]
def _root_payload(*, payload: dict[str, Any], metadata: MetaData) -> Any:
    """Extract the operation root payload stored under ``data[metadata.name]``.

    Args:
        payload: GraphQL response payload. Anything that is not a ``dict``
            is treated as a response without a root.
        metadata: Operation metadata carrying the root field name.

    Returns:
        Root payload object when present; otherwise ``None`` (non-dict
        payload, missing or non-dict ``data``, or missing root field).
    """
    # Callers only ``cast`` the response body, so a non-mapping body (for
    # example ``None``) can reach this point. Honour the "otherwise None"
    # contract instead of raising ``AttributeError`` on ``.get``.
    if not isinstance(payload, dict):
        return None

    data = payload.get("data")
    if not isinstance(data, dict):
        return None
    return data.get(metadata.name)
[docs]
def _connection_payload(*, payload: dict[str, Any], metadata: MetaData) -> dict[str, Any]:
    """Return the operation root as a mapping for read operations.

    Args:
        payload: GraphQL response payload.
        metadata: Read operation metadata.

    Returns:
        The root payload when it is a ``dict``; an empty dictionary in every
        other case, so callers can use ``.get`` without further checks.
    """
    candidate = _root_payload(payload=payload, metadata=metadata)
    return candidate if isinstance(candidate, dict) else {}
[docs]
def _resolve_output_columns(*, metadata: MetaData, operation_settings: Any) -> list[str]:
    """Resolve output columns from operation settings and metadata.

    Resolution order:

    1. ``operation_settings.return_columns`` when truthy, otherwise
       ``operation_settings.columns``.
    2. For read operations, the metadata fields flagged as ``default``.
    3. Every metadata field.

    Args:
        metadata: Loaded operation metadata.
        operation_settings: Operation settings object. Both column attributes
            are optional.

    Returns:
        Ordered output columns to apply in dataframe construction.
    """
    explicit_columns: Any | None = getattr(operation_settings, "return_columns", None) or getattr(
        operation_settings,
        "columns",
        None,
    )
    if explicit_columns is not None:
        # A bare string is one column name, not an iterable of characters.
        if isinstance(explicit_columns, str):
            explicit_columns = [explicit_columns]
        selected = [str(name) for name in explicit_columns]
        # An empty selection counts as "not specified" for both attributes;
        # returning it would yield a dataframe without any columns.
        if selected:
            return selected

    if metadata.type == OperationType.READ.value:
        defaults = [field.name for field in metadata.fields if field.default]
        if defaults:
            return defaults
    return [field.name for field in metadata.fields]