Source code for ds_provider_xledger_py_lib.dataset.engines.read

"""
**File:** ``read.py``
**Region:** ``ds_provider_xledger_py_lib/dataset/engines``

Description
-----------
Read engine for paginated GraphQL execution.
"""

from __future__ import annotations

from dataclasses import dataclass, field, replace
from typing import TYPE_CHECKING, Any

from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib.serializable import Serializable

from ...utils.graphql import raise_for_graphql_errors
from ...utils.query_builder import build_query

if TYPE_CHECKING:
    import pandas as pd
    from ds_protocol_http_py_lib.utils.http.provider import Http

    from ...serde.deserializer import XledgerDeserializer
    from ...utils.introspection import MetaData
    from ..xledger import XledgerReadSettings

logger = Logger.get_logger(__name__, package=True)


@dataclass(kw_only=True)
class Checkpoint(Serializable):
    """Pagination position carried between read executions.

    Attributes:
        after: Cursor the next read continues from; ``None`` means start from
            the first page.
        has_next_page: Whether the most recently processed page reported that
            more data is available.
    """

    # Cursor passed as ``after`` when a read is resumed.
    after: str | None = field(default=None)
    # Mirrors the "next page" flag of the last processed response.
    has_next_page: bool = field(default=False)
@dataclass(kw_only=True)
class ReadEngine:
    """Execute Xledger read requests including pagination handling.

    This engine is intentionally stateful:

    - ``output`` accumulates collected rows page-by-page
    - ``checkpoint`` tracks the last valid pagination state

    Both attributes are reset at the start of every :meth:`execute` call.
    """

    connection: Http
    host: str
    deserializer: XledgerDeserializer
    metadata: MetaData
    output: list[pd.DataFrame] = field(default_factory=list, init=False)
    checkpoint: Checkpoint = field(default_factory=Checkpoint, init=False)

    def _build_payload(self, read_settings: XledgerReadSettings) -> dict[str, Any]:
        """Render the GraphQL request body for one page described by ``read_settings``."""
        return {
            "query": build_query(
                metadata=self.metadata,
                first=read_settings.first,
                last=read_settings.last,
                before=read_settings.before,
                after=read_settings.after,
                filter=read_settings.filter,
                owner_set=read_settings.owner_set,
                object_status=read_settings.object_status,
                fields=read_settings.columns,
            ),
            "variables": {},
        }

    def execute(
        self,
        *,
        read_settings: XledgerReadSettings,
        checkpoint: dict[str, Any] | None = None,
    ) -> None:
        """Execute read flow and update ``output``/``checkpoint`` state.

        Each iteration posts one GraphQL query, appends the deserialized page
        to ``output`` and records the pagination state in ``checkpoint``. The
        loop stops when pagination is disabled, the response reports no next
        page, no end cursor is returned, or a cursor repeats.

        Args:
            read_settings: Effective read settings for query rendering/pagination.
            checkpoint: Existing checkpoint state to continue from. When it
                carries an ``after`` cursor, reading resumes from that cursor
                and any ``before`` cursor in ``read_settings`` is cleared.
        """
        total_rows = 0
        self.output = []
        self.checkpoint = Checkpoint.deserialize(checkpoint or {})

        logger.debug(
            "Starting read execution (pagination_enabled=%s, checkpoint_after=%s, settings=%s).",
            read_settings.pagination,
            self.checkpoint.after,
            read_settings.serialize(),
        )

        if self.checkpoint.after:
            logger.debug("Resuming paginated read from checkpoint cursor.")
            read_settings = replace(read_settings, after=self.checkpoint.after, before=None)

        # Seed with the starting cursor so a server that echoes it back (i.e. no
        # progress) is caught on the first page rather than re-requesting it.
        seen_cursors: set[str] = {read_settings.after} if read_settings.after else set()

        while True:
            payload = self._build_payload(read_settings)
            logger.debug("Payload: %s", payload)

            response = self.connection.post(
                url=self.host,
                json=payload,
            )
            body = response.json()
            raise_for_graphql_errors(body=body)

            frame = self.deserializer(
                body,
                metadata=self.metadata,
                operation_settings=read_settings,
            )
            self.output.append(frame)
            page_rows = len(frame.index)
            total_rows += page_rows

            has_next_page = self.deserializer.get_next(
                body,
                metadata=self.metadata,
            )
            end_cursor = self.deserializer.get_end_cursor(
                body,
                metadata=self.metadata,
            )

            # Only advance the stored cursor when the page actually returned one.
            # A page without an end cursor (e.g. an empty page) must not erase the
            # last valid cursor, otherwise the next resume restarts from scratch.
            if end_cursor:
                self.checkpoint.after = end_cursor
            self.checkpoint.has_next_page = has_next_page

            logger.debug(
                "Read page processed (rows=%d, total_rows=%d, has_next_page=%s).",
                page_rows,
                total_rows,
                has_next_page,
            )

            if not (read_settings.pagination and has_next_page and end_cursor):
                if read_settings.pagination and has_next_page and not end_cursor:
                    logger.warning("Pagination requested next page but response did not include end_cursor; stopping read.")
                logger.debug(
                    "Stopping pagination (pagination_enabled=%s, has_next_page=%s, has_end_cursor=%s).",
                    read_settings.pagination,
                    has_next_page,
                    bool(end_cursor),
                )
                break

            # Guard against servers that cycle cursors instead of advancing.
            if end_cursor in seen_cursors:
                logger.warning("Detected repeated pagination cursor; stopping to prevent loop.")
                break
            seen_cursors.add(end_cursor)

            read_settings = replace(read_settings, after=end_cursor, before=None)
            logger.debug(
                "Continuing pagination (page_index=%d, next_cursor=%s, seen_cursors=%d).",
                len(self.output),
                end_cursor,
                len(seen_cursors),
            )