Source code for ds_provider_postgresql_py_lib.dataset.postgresql

"""
**File:** ``postgresql.py``
**Region:** ``ds_provider_postgresql_py_lib/dataset/postgresql``

PostgreSQL Dataset

This module implements a dataset for PostgreSQL databases.

Example:
    >>> dataset = PostgreSQLDataset(
    ...     deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
    ...     serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
    ...     settings=PostgreSQLDatasetSettings(
    ...         table="users",
    ...         read=ReadSettings(
    ...             columns=["id", "name"],
    ...             filters={"status": "active"},
    ...             order_by=["created_at"],
    ...             limit=100,
    ...         )
    ...     ),
    ...     linked_service=PostgreSQLLinkedService(
    ...         settings=PostgreSQLLinkedServiceSettings(
    ...             uri="postgresql://user:password@localhost:5432/mydb",
    ...         ),
    ...     ),
    ... )
    >>> dataset.read()
    >>> data = dataset.output
"""

from collections.abc import Sequence
from dataclasses import dataclass, field
from io import StringIO
from typing import Any, Generic, TypeVar, cast

import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib import Serializable
from ds_resource_plugin_py_lib.common.resource.dataset import (
    DatasetSettings,
    DatasetStorageFormatType,
    TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
    CreateError,
    DeleteError,
    PurgeError,
    ReadError,
    UpdateError,
    UpsertError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ValidationError
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer
from sqlalchemy import (
    Column,
    MetaData,
    Table,
    and_,
    asc,
    desc,
    inspect,
    quoted_name,
    select,
    text,
)
from sqlalchemy import (
    delete as sa_delete,
)
from sqlalchemy import (
    update as sa_update,
)
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.sql import Select

from ..enums import ResourceType
from ..linked_service.postgresql import PostgreSQLLinkedService
from ..utils.dataset_identity import validate_duplicate_identity_rows, validate_identity_columns
from ..utils.dataset_rows import execute_returning_rows, output_from_rows
from ..utils.dataset_types import pandas_dtype_to_sqlalchemy
from ..utils.sql import quote_identifier

logger = Logger.get_logger(__name__, package=True)


@dataclass(kw_only=True)
class CreateSettings(Serializable):
    """
    Settings specific to the create() operation.

    These settings only apply when writing data to the database and do not
    affect read(), delete(), update(), or rename() operations.
    """

    index: bool = False
    """Whether to include the DataFrame index as columns during create() writes."""

    primary_key: bool = False
    """Whether to create a primary key when creating a new table."""

    primary_key_columns: Sequence[str] | None = None
    """Primary key columns to create when `primary_key` is enabled."""

@dataclass(kw_only=True)
class UpdateSettings(Serializable):
    """
    Settings specific to the update() operation.
    """

    identity_columns: Sequence[str]
    """Columns that uniquely identify each row."""

@dataclass(kw_only=True)
class UpsertSettings(Serializable):
    """
    Settings specific to the upsert() operation.
    """

    identity_columns: Sequence[str]
    """Columns that uniquely identify each row."""

@dataclass(kw_only=True)
class DeleteSettings(Serializable):
    """
    Settings specific to the delete() operation.
    """

    identity_columns: Sequence[str]
    """Columns that uniquely identify each row."""

@dataclass(kw_only=True)
class ReadSettings(Serializable):
    """
    Settings specific to the read() operation.

    These settings only apply when reading data from the database and do not
    affect create(), delete(), update(), or rename() operations.
    """

    limit: int | None = None
    """The limit of the data to read."""

    columns: Sequence[str] | None = None
    """
    Specific columns to select. If None, selects all columns (*).

    Example:
        columns=["id", "name", "created_at"]
    """

    filters: dict[str, Any] | None = None
    """
    Dictionary of column filters for the WHERE clause. Uses equality comparison.
    Multiple filters are combined with AND.

    Example:
        filters={"status": "active", "amount": 100}
    """

    order_by: Sequence[str | tuple[str, str]] | None = None
    """
    Columns to order by. Can be:

    - A list of column names (defaults to ascending)
    - A list of (column_name, 'asc'/'desc') tuples

    Example:
        order_by=["created_at"]                    # ascending
        order_by=[("created_at", "desc"), "name"]  # created_at desc, name asc
    """

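# Illustration only: the table and column names below ("users", "id", "name",
# "status", "created_at") are hypothetical and not part of this module. Once
# read() assembles the statement, a ReadSettings like this corresponds roughly to
#   SELECT id, name FROM public.users WHERE status = 'active'
#   ORDER BY created_at DESC LIMIT 50
#
#     ReadSettings(
#         columns=["id", "name"],
#         filters={"status": "active"},
#         order_by=[("created_at", "desc")],
#         limit=50,
#     )
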
@dataclass(kw_only=True)
class PurgeSettings(Serializable):
    """
    Settings specific to the purge() operation.
    """

    drop_table: bool = False
    """Drop the table object instead of deleting rows."""

    cascade: bool = False
    """Apply CASCADE when dropping the table."""

@dataclass(kw_only=True)
class PostgreSQLDatasetSettings(DatasetSettings):
    """
    Settings for PostgreSQL dataset operations.

    The `read` settings contain read-specific configuration that only applies
    to the read() operation, not to create(), delete(), update(), etc.
    """

    schema: str = "public"
    """Schema for dataset operations."""

    table: str
    """Table for dataset operations."""

    read: ReadSettings = field(default_factory=ReadSettings)
    """Settings for read()."""

    create: CreateSettings = field(default_factory=CreateSettings)
    """Settings for create()."""

    update: UpdateSettings | None = None
    """Settings for update()."""

    upsert: UpsertSettings | None = None
    """Settings for upsert()."""

    delete: DeleteSettings | None = None
    """Settings for delete()."""

    purge: PurgeSettings = field(default_factory=PurgeSettings)
    """Settings for purge()."""

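# Sketch of a fully configured settings object; the table and column names
# ("users", "id") are illustrative, not part of this module:
#
#     PostgreSQLDatasetSettings(
#         schema="public",
#         table="users",
#         read=ReadSettings(limit=100),
#         create=CreateSettings(primary_key=True, primary_key_columns=["id"]),
#         update=UpdateSettings(identity_columns=["id"]),
#         upsert=UpsertSettings(identity_columns=["id"]),
#         delete=DeleteSettings(identity_columns=["id"]),
#         purge=PurgeSettings(drop_table=True, cascade=False),
#     )
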
PostgreSQLDatasetSettingsType = TypeVar(
    "PostgreSQLDatasetSettingsType",
    bound=PostgreSQLDatasetSettings,
)
PostgreSQLLinkedServiceType = TypeVar(
    "PostgreSQLLinkedServiceType",
    bound=PostgreSQLLinkedService[Any],
)

@dataclass(kw_only=True)
class PostgreSQLDataset(
    TabularDataset[
        PostgreSQLLinkedServiceType,
        PostgreSQLDatasetSettingsType,
        PandasSerializer,
        PandasDeserializer,
    ],
    Generic[PostgreSQLLinkedServiceType, PostgreSQLDatasetSettingsType],
):
    """Tabular dataset backed by a PostgreSQL table, using pandas for (de)serialization."""

    linked_service: PostgreSQLLinkedServiceType
    settings: PostgreSQLDatasetSettingsType
    serializer: PandasSerializer | None = field(
        default_factory=lambda: PandasSerializer(format=DatasetStorageFormatType.JSON),
    )
    deserializer: PandasDeserializer | None = field(
        default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.JSON),
    )

    @property
    def type(self) -> ResourceType:
        """
        Get the type of the dataset.

        Returns:
            ResourceType: The dataset resource type.
        """
        return ResourceType.DATASET

    def create(self, **_kwargs: Any) -> None:
        """
        Create/write data to the configured table.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            CreateError: If writing data fails.
        """
        logger.debug("Starting create operation for %s.%s", self.settings.schema, self.settings.table)
        if self.input is None or self.input.empty:
            logger.debug("Create skipped because input is empty.")
            self.output = self._output_from_empty_input()
            return
        try:
            create_input = self.input.reset_index() if self.settings.create.index else self.input.copy()
            logger.debug(
                "Create input prepared with %d rows and columns=%s",
                len(create_input),
                list(create_input.columns),
            )
            with self.linked_service.connection.begin() as conn:
                table_exists = bool(inspect(conn).has_table(self.settings.table, schema=self.settings.schema))
                logger.debug("Table exists=%s for %s.%s", table_exists, self.settings.schema, self.settings.table)
                if table_exists:
                    table = self._get_table()
                else:
                    logger.debug("Table does not exist; creating new table for create operation.")
                    table = self._build_table_from_input(create_input)
                    table.create(bind=conn)
                self._copy_into_table(conn, table, create_input)
            self.output = self.input.copy()
            logger.debug("Create completed successfully. Rows written=%d", len(self.output))
        except ValidationError as exc:
            logger.error("Create validation failed: %s", exc.message)
            raise CreateError(
                message=exc.message,
                status_code=exc.status_code,
                details={**(exc.details or {}), "settings": self.settings.create.serialize()},
            ) from exc
        except Exception as exc:
            logger.error("Create failed: %s", exc)
            raise CreateError(
                message=f"Failed to write data to table: {exc!s}",
                status_code=500,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "settings": self.settings.create.serialize(),
                },
            ) from exc

    def read(self, **_kwargs: Any) -> None:
        """
        Read rows from the configured table into `self.output`.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            ReadError: If reading data fails.
        """
        logger.debug("Starting read operation for %s.%s", self.settings.schema, self.settings.table)
        stmt: Select[Any] | None = None
        try:
            self._validate_read_settings()
            table = self._get_table()
            stmt = self._build_select_columns(table)
            stmt = self._build_filters(stmt, table)
            stmt = self._build_order_by(stmt, table)
            if self.settings.read.limit is not None:
                stmt = stmt.limit(self.settings.read.limit)
            logger.debug("Executing query: %s", stmt)
            with self.linked_service.connection.connect() as conn:
                rows = conn.execute(stmt).mappings().all()
            self.output = pd.DataFrame.from_records(rows)
            logger.debug("Read completed successfully. Rows read=%d", len(self.output))
        except NoSuchTableError as exc:
            logger.error(
                "Table '%s' does not exist in schema '%s'.",
                self.settings.table,
                self.settings.schema,
            )
            raise ReadError(
                message=f"Table '{self.settings.table}' does not exist in schema '{self.settings.schema}'.",
                status_code=404,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "settings": self.settings.read.serialize(),
                },
            ) from exc
        except ValidationError as exc:
            logger.error("Validation error: %s", exc)
            details = {**(exc.details or {}), "settings": self.settings.read.serialize()}
            raise ReadError(
                message=exc.message,
                status_code=exc.status_code,
                details=details,
            ) from exc
        except Exception as exc:
            logger.error("Failed to read data from table: %s", exc)
            raise ReadError(
                message=f"Failed to read data from table: {exc!s}",
                status_code=500,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "query": str(stmt) if stmt is not None else None,
                    "settings": self.settings.read.serialize(),
                },
            ) from exc

    def delete(self, **_kwargs: Any) -> None:
        """
        Delete rows matching configured identity columns.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            DeleteError: If deleting rows fails.
        """
        logger.debug("Starting delete operation for %s.%s", self.settings.schema, self.settings.table)
        if self.input is None or self.input.empty:
            logger.debug("Delete skipped because input is empty.")
            self.output = self._output_from_empty_input()
            return
        if self.settings.delete is None:
            logger.error("Delete settings are missing.")
            raise DeleteError(
                message="Missing delete settings. Configure settings.delete.identity_columns.",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "settings": self.settings.delete,
                },
            )
        try:
            table = self._get_table()
            validate_identity_columns(
                table=table,
                identity_columns=self.settings.delete.identity_columns,
                content=self.input,
            )
            validate_duplicate_identity_rows(
                content=self.input,
                identity_columns=self.settings.delete.identity_columns,
            )
            logger.debug(
                "Delete validated with identity_columns=%s and input_rows=%d",
                list(self.settings.delete.identity_columns),
                len(self.input),
            )
            deleted_rows: list[dict[str, Any]] = []
            with self.linked_service.connection.begin() as conn:
                for record in self.input.to_dict("records"):
                    where_clause = and_(*[table.c[col] == record[col] for col in self.settings.delete.identity_columns])
                    stmt = sa_delete(table).where(where_clause).returning(*table.c)
                    deleted_rows.extend(execute_returning_rows(conn, stmt))
            self.output = output_from_rows(table, deleted_rows)
            logger.debug("Delete completed successfully. Rows deleted=%d", len(self.output))
        except Exception as exc:
            logger.error("Delete failed: %s", exc)
            raise DeleteError(
                message=f"Failed to delete rows from table: {exc!s}",
                status_code=500,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "identity_columns": list(self.settings.delete.identity_columns),
                },
            ) from exc

    def update(self, **_kwargs: Any) -> None:
        """
        Update rows matching configured identity columns.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            UpdateError: If updating rows fails.
        """
        logger.debug("Starting update operation for %s.%s", self.settings.schema, self.settings.table)
        if self.input is None or self.input.empty:
            logger.debug("Update skipped because input is empty.")
            self.output = self._output_from_empty_input()
            return
        if self.settings.update is None:
            logger.error("Update settings are missing.")
            raise UpdateError(
                message="Missing update settings. Configure settings.update.identity_columns.",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "settings": self.settings.update,
                },
            )
        try:
            table = self._get_table()
            validate_identity_columns(
                table=table,
                identity_columns=self.settings.update.identity_columns,
                content=self.input,
            )
            validate_duplicate_identity_rows(
                content=self.input,
                identity_columns=self.settings.update.identity_columns,
            )
            logger.debug(
                "Update validated with identity_columns=%s and input_rows=%d",
                list(self.settings.update.identity_columns),
                len(self.input),
            )
            update_columns = [col for col in self.input.columns if col not in self.settings.update.identity_columns]
            if not update_columns:
                logger.error("Update input has no non-identity columns.")
                raise UpdateError(
                    message="No non-identity columns provided for update.",
                    status_code=400,
                    details={
                        "table": self.settings.table,
                        "schema": self.settings.schema,
                        "identity_columns": list(self.settings.update.identity_columns),
                    },
                )
            updated_rows: list[dict[str, Any]] = []
            with self.linked_service.connection.begin() as conn:
                for record in self.input.to_dict("records"):
                    where_clause = and_(*[table.c[col] == record[col] for col in self.settings.update.identity_columns])
                    values = {col: record[col] for col in update_columns}
                    stmt = sa_update(table).where(where_clause).values(**values).returning(*table.c)
                    updated_rows.extend(execute_returning_rows(conn, stmt))
            self.output = output_from_rows(table, updated_rows)
            logger.debug("Update completed successfully. Rows updated=%d", len(self.output))
        except Exception as exc:
            logger.error("Update failed: %s", exc)
            raise UpdateError(
                message=f"Failed to update rows in table: {exc!s}",
                status_code=500,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "identity_columns": list(self.settings.update.identity_columns),
                },
            ) from exc

    def upsert(self, **_kwargs: Any) -> None:
        """
        Insert or update rows using PostgreSQL ON CONFLICT semantics.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            UpsertError: If upserting rows fails.
        """
        logger.debug("Starting upsert operation for %s.%s", self.settings.schema, self.settings.table)
        if self.input is None or self.input.empty:
            logger.debug("Upsert skipped because input is empty.")
            self.output = self._output_from_empty_input()
            return
        if self.settings.upsert is None:
            logger.error("Upsert settings are missing.")
            raise UpsertError(
                message="Missing upsert settings. Configure settings.upsert.identity_columns.",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "settings": self.settings.upsert,
                },
            )
        try:
            table = self._get_table()
            validate_identity_columns(
                table=table,
                identity_columns=self.settings.upsert.identity_columns,
                content=self.input,
            )
            validate_duplicate_identity_rows(
                content=self.input,
                identity_columns=self.settings.upsert.identity_columns,
            )
            logger.debug(
                "Upsert validated with identity_columns=%s and input_rows=%d",
                list(self.settings.upsert.identity_columns),
                len(self.input),
            )
            rows = self.input.to_dict("records")
            non_identity_columns = [col for col in self.input.columns if col not in self.settings.upsert.identity_columns]
            logger.debug("Upsert non-identity columns=%s", non_identity_columns)
            insert_stmt = pg_insert(table).values(rows)
            if non_identity_columns:
                upsert_stmt = insert_stmt.on_conflict_do_update(
                    index_elements=[table.c[col] for col in self.settings.upsert.identity_columns],
                    set_={col: insert_stmt.excluded[col] for col in non_identity_columns},
                )
            else:
                upsert_stmt = insert_stmt.on_conflict_do_nothing(
                    index_elements=[table.c[col] for col in self.settings.upsert.identity_columns]
                )
            stmt = upsert_stmt.returning(*table.c)
            with self.linked_service.connection.begin() as conn:
                upserted_rows = execute_returning_rows(conn, stmt)
            self.output = output_from_rows(table, upserted_rows)
            logger.debug("Upsert completed successfully. Rows returned=%d", len(self.output))
        except Exception as exc:
            logger.error("Upsert failed: %s", exc)
            raise UpsertError(
                message=f"Failed to upsert rows in table: {exc!s}",
                status_code=500,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "identity_columns": list(self.settings.upsert.identity_columns),
                },
            ) from exc

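    # Illustration only (hypothetical table "users" with identity column "id" and
    # payload column "name"): upsert() builds roughly
    #   INSERT INTO public.users (id, name) VALUES (...)
    #   ON CONFLICT (id) DO UPDATE SET name = EXCLUDED.name
    #   RETURNING *
    # When the input has no non-identity columns, ON CONFLICT DO NOTHING is used instead.
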
    def purge(self, **_kwargs: Any) -> None:
        """
        Purge table contents or drop the table.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            PurgeError: If purging table data fails.
        """
        logger.debug("Starting purge operation for %s.%s", self.settings.schema, self.settings.table)
        logger.debug(
            "Purge settings: drop_table=%s, cascade=%s",
            self.settings.purge.drop_table,
            self.settings.purge.cascade,
        )
        try:
            with self.linked_service.connection.begin() as conn:
                if self.settings.purge.drop_table:
                    cascade = " CASCADE" if self.settings.purge.cascade else ""
                    logger.debug("Dropping table %s.%s", self.settings.schema, self.settings.table)
                    conn.execute(
                        text(
                            f"DROP TABLE IF EXISTS {quote_identifier(self.settings.schema)}."
                            f"{quote_identifier(self.settings.table)}{cascade}"
                        )
                    )
                else:
                    logger.debug("Deleting all rows from %s.%s", self.settings.schema, self.settings.table)
                    inspector = inspect(self.linked_service.connection)
                    if not inspector.has_table(self.settings.table, schema=self.settings.schema):
                        logger.debug(
                            "Purge skipped because table %s.%s does not exist.",
                            self.settings.schema,
                            self.settings.table,
                        )
                        return
                    table = self._get_table()
                    conn.execute(sa_delete(table))
            logger.debug("Purge completed successfully.")
        except Exception as exc:
            logger.error("Purge failed: %s", exc)
            raise PurgeError(
                message=f"Failed to purge table: {exc!s}",
                status_code=500,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "drop_table": self.settings.purge.drop_table,
                    "cascade": self.settings.purge.cascade,
                },
            ) from exc

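    # Illustration only (hypothetical table public.users): with drop_table=True and
    # cascade=True, purge() issues roughly
    #   DROP TABLE IF EXISTS public.users CASCADE
    # (identifiers are passed through quote_identifier); otherwise it deletes all rows
    # from the reflected table with a plain DELETE, leaving the table definition intact.
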
    def list(self, **_kwargs: Any) -> None:
        """
        List operation is not supported for this provider.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            NotSupportedError: Always, as list is not supported.
        """
        logger.error("List operation is not supported by PostgreSQLDataset.")
        raise NotSupportedError(
            message="Method 'list' is not supported by this provider.",
            details={"method": "list", "provider": self.type.value},
        )

    def rename(self, **_kwargs: Any) -> None:
        """
        Rename operation is not supported for this provider.

        Args:
            _kwargs: Additional keyword arguments for interface compatibility.

        Returns:
            None

        Raises:
            NotSupportedError: Always, as rename is not supported.
        """
        logger.error("Rename operation is not supported by PostgreSQLDataset.")
        raise NotSupportedError(
            message="Method 'rename' is not supported by this provider.",
            details={"method": "rename", "provider": self.type.value},
        )

    def close(self) -> None:
        """
        Close the dataset and underlying linked service.

        Returns:
            None
        """
        logger.debug("Closing dataset linked service for %s.%s", self.settings.schema, self.settings.table)
        self.linked_service.close()

    def _output_from_empty_input(self) -> pd.DataFrame:
        """
        Build a consistent empty-operation output while preserving input schema.

        Returns:
            pd.DataFrame: Empty dataframe or a schema-preserving input copy.
        """
        input_value = cast("Any", self.input)
        if input_value is None:
            return pd.DataFrame()
        return cast("pd.DataFrame", input_value.copy())

    def _get_table(self) -> Table:
        """
        Get the reflected SQLAlchemy table for the configured schema and table.

        Returns:
            Table: Reflected table object.
        """
        logger.debug("Reflecting table metadata for %s.%s", self.settings.schema, self.settings.table)
        schema_name = quoted_name(self.settings.schema, quote=True)
        table_name = quoted_name(self.settings.table, quote=True)
        metadata = MetaData(schema=schema_name)
        return Table(
            table_name,
            metadata,
            schema=schema_name,
            autoload_with=self.linked_service.connection,
        )

    def _build_table_from_input(
        self,
        content: pd.DataFrame,
    ) -> Table:
        """
        Build a SQLAlchemy Table definition from input DataFrame dtypes.

        Args:
            content: Input DataFrame to build the table from.

        Returns:
            Table: SQLAlchemy Table definition.
        """
        schema_name = quoted_name(self.settings.schema, quote=True)
        table_name = quoted_name(self.settings.table, quote=True)
        metadata = MetaData(schema=schema_name)
        dtype_map = pandas_dtype_to_sqlalchemy(content.dtypes)
        primary_key_columns = self._resolve_create_primary_key_columns(content)
        primary_key_set = set(primary_key_columns or [])
        logger.debug(
            "Building table from input with columns=%s and primary_key_columns=%s",
            list(content.columns),
            list(primary_key_set),
        )
        columns = [
            Column(
                str(col_name),
                cast("Any", dtype_map[str(col_name)]),
                primary_key=str(col_name) in primary_key_set,
                nullable=str(col_name) not in primary_key_set,
            )
            for col_name in content.columns
        ]
        return Table(
            table_name,
            metadata,
            *columns,
            schema=schema_name,
        )

    def _resolve_create_primary_key_columns(
        self,
        content: pd.DataFrame,
    ) -> Sequence[str] | None:
        """
        Resolve and validate create-time primary key columns.

        Args:
            content: Input DataFrame used for table creation.

        Returns:
            Sequence[str] | None: Primary key columns for new table creation.

        Raises:
            ValidationError: If `primary_key` is enabled but columns are invalid.
        """
        if not self.settings.create.primary_key:
            logger.debug("Create primary key disabled in settings.")
            return None
        if not self.settings.create.primary_key_columns:
            logger.error("Create primary key is enabled but primary_key_columns is missing.")
            raise ValidationError(
                message="Missing primary key columns for create().",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "create_settings": self.settings.create.serialize(),
                },
            )
        missing_columns = [col for col in self.settings.create.primary_key_columns if col not in content.columns]
        if missing_columns:
            logger.error("Create primary key columns missing from input: %s", missing_columns)
            raise ValidationError(
                message="Primary key columns do not exist in create input.",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "missing_columns": missing_columns,
                    "primary_key_columns": list(self.settings.create.primary_key_columns),
                },
            )
        logger.debug("Resolved create primary key columns: %s", list(self.settings.create.primary_key_columns))
        return list(self.settings.create.primary_key_columns)

    def _copy_into_table(self, conn: Any, table: Table, content: pd.DataFrame) -> None:
        """
        Insert rows using PostgreSQL COPY.

        Args:
            conn: Active SQLAlchemy connection participating in the transaction.
            table: Target SQLAlchemy table.
            content: Rows to stream into the table.

        Returns:
            None
        """
        if content.empty:
            return
        table_name = f"{quote_identifier(str(table.schema or self.settings.schema))}.{quote_identifier(str(table.name))}"
        column_names = ", ".join(quote_identifier(str(col)) for col in content.columns)
        copy_sql = f"COPY {table_name} ({column_names}) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, NULL '\\N')"
        buffer = StringIO()
        content.to_csv(buffer, index=False, header=False, na_rep="\\N")
        buffer.seek(0)
        dbapi_connection = conn.connection
        with dbapi_connection.cursor() as cursor:
            cursor.copy_expert(copy_sql, buffer)

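    # Illustration only (hypothetical table public.users with columns id and name):
    # the statement assembled above is roughly
    #   COPY public.users (id, name) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, NULL '\N')
    # with identifiers passed through quote_identifier, and the DataFrame is streamed
    # as CSV text through the driver cursor's copy_expert().
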
    def _validate_columns(self, table: Table, column_names: Sequence[str]) -> None:
        """
        Validate that all requested columns exist in the reflected table.

        Args:
            table: Reflected SQLAlchemy table.
            column_names: Column names to validate.

        Returns:
            None

        Raises:
            ValidationError: If one or more columns do not exist in the table.
        """
        available_columns = list(table.c.keys())
        missing_columns = list(dict.fromkeys(col for col in column_names if col not in table.c))
        if missing_columns:
            raise ValidationError(
                message=f"Column(s) not found in table '{self.settings.table}'.",
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "missing_columns": missing_columns,
                    "available_columns": available_columns,
                },
            )

    def _validate_read_settings(self) -> None:
        """
        Validate read settings before query construction.

        Returns:
            None

        Raises:
            ValidationError: If limit or order direction is invalid.
        """
        read_settings = self.settings.read
        if read_settings.limit is not None and read_settings.limit <= 0:
            raise ValidationError(
                message="Read limit must be greater than 0.",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "limit": read_settings.limit,
                },
            )
        if not read_settings.order_by:
            return
        invalid_order_specs: list[dict[str, str]] = []
        for order_spec in read_settings.order_by:
            if not isinstance(order_spec, tuple):
                continue
            col_name, direction = order_spec
            if direction.lower() not in {"asc", "desc"}:
                invalid_order_specs.append(
                    {
                        "column": col_name,
                        "direction": direction,
                    }
                )
        if invalid_order_specs:
            raise ValidationError(
                message="Invalid order_by direction. Use 'asc' or 'desc'.",
                status_code=400,
                details={
                    "table": self.settings.table,
                    "schema": self.settings.schema,
                    "invalid_order_by": invalid_order_specs,
                },
            )

    def _build_select_columns(self, table: Table) -> Select[Any]:
        """
        Build a SELECT statement for configured columns or all columns.

        Args:
            table: Reflected SQLAlchemy table.

        Returns:
            Select[Any]: SELECT statement with chosen columns.

        Raises:
            ValidationError: If any selected column does not exist.
        """
        if self.settings.read.columns:
            self._validate_columns(table, self.settings.read.columns)
            selected_columns = [table.c[col_name] for col_name in self.settings.read.columns]
            return select(*selected_columns)
        return select(table)

    def _build_filters(self, stmt: Select[Any], table: Table) -> Select[Any]:
        """
        Apply equality filters from read settings to the SELECT statement.

        Args:
            stmt: Current SELECT statement.
            table: Reflected SQLAlchemy table.

        Returns:
            Select[Any]: SELECT statement with WHERE conditions applied.

        Raises:
            ValidationError: If any filter column does not exist.
        """
        if not self.settings.read.filters:
            return stmt
        self._validate_columns(table, list(self.settings.read.filters.keys()))
        filter_conditions = [table.c[col_name] == value for col_name, value in self.settings.read.filters.items()]
        return stmt.where(and_(*filter_conditions))

    def _build_order_by(self, stmt: Select[Any], table: Table) -> Select[Any]:
        """
        Apply ORDER BY clauses from read settings to the SELECT statement.

        Args:
            stmt: Current SELECT statement.
            table: Reflected SQLAlchemy table.

        Returns:
            Select[Any]: SELECT statement with ORDER BY applied.

        Raises:
            ValidationError: If any order-by column does not exist.
        """
        if not self.settings.read.order_by:
            return stmt
        order_columns = [
            order_spec[0] if isinstance(order_spec, tuple) else order_spec
            for order_spec in self.settings.read.order_by
        ]
        self._validate_columns(table, order_columns)
        order_clauses = []
        for order_spec in self.settings.read.order_by:
            if isinstance(order_spec, tuple):
                col_name, direction = order_spec
                col = table.c[col_name]
                if direction.lower() == "desc":
                    order_clauses.append(desc(col))
                else:
                    order_clauses.append(asc(col))
            else:
                order_clauses.append(asc(table.c[order_spec]))
        return stmt.order_by(*order_clauses)