"""
PostgreSQL Dataset
This module implements a dataset for PostgreSQL databases.
Example:
>>> dataset = PostgreSQLDataset(
... deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
... serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
... settings=PostgreSQLDatasetSettings(
... table="users",
... read=ReadSettings(
... columns=["id", "name"],
... filters={"status": "active"},
... order_by=["created_at"],
... limit=100,
... )
... ),
... linked_service=PostgreSQLLinkedService(
... settings=PostgreSQLLinkedServiceSettings(
... uri="postgresql://user:password@localhost:5432/mydb",
... ),
... ),
... )
>>> dataset.read()
>>> data = dataset.output
"""
from collections.abc import Sequence
from dataclasses import dataclass, field
from io import StringIO
from typing import Any, Generic, TypeVar, cast
import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib import Serializable
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
DatasetStorageFormatType,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
CreateError,
DeleteError,
PurgeError,
ReadError,
UpdateError,
UpsertError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ValidationError
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer
from sqlalchemy import (
Column,
MetaData,
Table,
and_,
asc,
desc,
inspect,
quoted_name,
select,
text,
)
from sqlalchemy import delete as sa_delete
from sqlalchemy import update as sa_update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.sql import Select
from ..enums import ResourceType
from ..linked_service.postgresql import PostgreSQLLinkedService
from ..utils.dataset_identity import validate_duplicate_identity_rows, validate_identity_columns
from ..utils.dataset_rows import execute_returning_rows, output_from_rows
from ..utils.dataset_types import pandas_dtype_to_sqlalchemy
from ..utils.sql import quote_identifier
logger = Logger.get_logger(__name__, package=True)
@dataclass(kw_only=True)
class CreateSettings(Serializable):
"""
Settings specific to the create() operation.
These settings only apply when writing data to the database
and do not affect read(), delete(), update(), or rename() operations.
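Example (minimal sketch; ``id`` is a placeholder for a column in the create input):
>>> settings = CreateSettings(primary_key=True, primary_key_columns=["id"])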
"""
index: bool = False
"""Whether to include the DataFrame index as columns during create() writes."""
primary_key: bool = False
"""Whether to create a primary key when creating a new table."""
primary_key_columns: Sequence[str] | None = None
"""Primary key columns to create when `primary_key` is enabled."""
@dataclass(kw_only=True)
class UpdateSettings(Serializable):
"""
Settings specific to the update() operation.
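Example (minimal sketch; ``id`` is a placeholder identity column):
>>> settings = UpdateSettings(identity_columns=["id"])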
"""
identity_columns: Sequence[str]
"""Columns that uniquely identify each row."""
@dataclass(kw_only=True)
class UpsertSettings(Serializable):
"""
Settings specific to the upsert() operation.
"""
identity_columns: Sequence[str]
"""Columns that uniquely identify each row."""
@dataclass(kw_only=True)
class DeleteSettings(Serializable):
"""
Settings specific to the delete() operation.
"""
identity_columns: Sequence[str]
"""Columns that uniquely identify each row."""
@dataclass(kw_only=True)
class ReadSettings(Serializable):
"""
Settings specific to the read() operation.
These settings only apply when reading data from the database
and do not affect create(), delete(), update(), or rename() operations.
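Example (illustrative; column names are placeholders):
>>> settings = ReadSettings(
... columns=["id", "name", "created_at"],
... filters={"status": "active"},
... order_by=[("created_at", "desc"), "name"],
... limit=100,
... )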
"""
limit: int | None = None
"""The limit of the data to read."""
columns: Sequence[str] | None = None
"""
Specific columns to select. If None, selects all columns (*).
Example:
columns=["id", "name", "created_at"]
"""
filters: dict[str, Any] | None = None
"""
Dictionary of column filters for WHERE clause. Uses equality comparison.
Example:
filters={"status": "active", "amount": 100}
Multiple filters are combined with AND.
"""
order_by: Sequence[str | tuple[str, str]] | None = None
"""
Columns to order by. Can be:
- List of column names (defaults to ascending)
- List of (column_name, 'asc'/'desc') tuples
Example:
order_by=["created_at"] # ascending
order_by=[("created_at", "desc"), "name"] # created_at desc, name asc
"""
@dataclass(kw_only=True)
class PurgeSettings(Serializable):
"""
Settings specific to the purge() operation.
"""
drop_table: bool = False
"""Drop the table object instead of deleting rows."""
cascade: bool = False
"""Apply CASCADE when dropping the table."""
@dataclass(kw_only=True)
class PostgreSQLDatasetSettings(DatasetSettings):
"""
Settings for PostgreSQL dataset operations.
The `read` settings contain read-specific configuration that applies only
to the read() operation, not to create(), delete(), update(), etc.
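Example (minimal sketch; table and column names are placeholders):
>>> settings = PostgreSQLDatasetSettings(
... table="users",
... read=ReadSettings(limit=10),
... update=UpdateSettings(identity_columns=["id"]),
... )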
"""
schema: str = "public"
"""Schema for dataset operations."""
table: str
"""Table for dataset operations."""
read: ReadSettings = field(default_factory=ReadSettings)
"""Settings for read()."""
create: CreateSettings = field(default_factory=CreateSettings)
"""Settings for create()."""
update: UpdateSettings | None = None
"""Settings for update()."""
upsert: UpsertSettings | None = None
"""Settings for upsert()."""
delete: DeleteSettings | None = None
"""Settings for delete()."""
purge: PurgeSettings = field(default_factory=PurgeSettings)
"""Settings for purge()."""
PostgreSQLDatasetSettingsType = TypeVar(
"PostgreSQLDatasetSettingsType",
bound=PostgreSQLDatasetSettings,
)
PostgreSQLLinkedServiceType = TypeVar(
"PostgreSQLLinkedServiceType",
bound=PostgreSQLLinkedService[Any],
)
@dataclass(kw_only=True)
class PostgreSQLDataset(
TabularDataset[
PostgreSQLLinkedServiceType,
PostgreSQLDatasetSettingsType,
PandasSerializer,
PandasDeserializer,
],
Generic[PostgreSQLLinkedServiceType, PostgreSQLDatasetSettingsType],
):
linked_service: PostgreSQLLinkedServiceType
settings: PostgreSQLDatasetSettingsType
serializer: PandasSerializer | None = field(
default_factory=lambda: PandasSerializer(format=DatasetStorageFormatType.JSON),
)
deserializer: PandasDeserializer | None = field(
default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.JSON),
)
@property
def type(self) -> ResourceType:
"""
Get the type of the dataset.
Returns:
ResourceType: The dataset resource type.
"""
return ResourceType.DATASET
def create(self, **_kwargs: Any) -> None:
"""
Create/write data to the configured table.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
CreateError: If writing data fails.
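Example (illustrative sketch; assumes ``dataset`` is constructed as in the
module example and that ``input`` is assigned the DataFrame to write):
>>> dataset.input = pd.DataFrame({"id": [1], "name": ["alice"]})
>>> dataset.create()
>>> written = dataset.output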
"""
logger.debug("Starting create operation for %s.%s", self.settings.schema, self.settings.table)
if self.input is None or self.input.empty:
logger.debug("Create skipped because input is empty.")
self.output = self._output_from_empty_input()
return
try:
create_input = self.input.reset_index() if self.settings.create.index else self.input.copy()
logger.debug(
"Create input prepared with %d rows and columns=%s",
len(create_input),
list(create_input.columns),
)
with self.linked_service.connection.begin() as conn:
table_exists = bool(inspect(conn).has_table(self.settings.table, schema=self.settings.schema))
logger.debug("Table exists=%s for %s.%s", table_exists, self.settings.schema, self.settings.table)
if table_exists:
table = self._get_table()
else:
logger.debug("Table does not exist; creating new table for create operation.")
table = self._build_table_from_input(create_input)
table.create(bind=conn)
self._copy_into_table(conn, table, create_input)
self.output = self.input.copy()
logger.debug("Create completed successfully. Rows written=%d", len(self.output))
except ValidationError as exc:
logger.error("Create validation failed: %s", exc.message)
raise CreateError(
message=exc.message,
status_code=exc.status_code,
details={**(exc.details or {}), "settings": self.settings.create.serialize()},
) from exc
except Exception as exc:
logger.error("Create failed: %s", exc)
raise CreateError(
message=f"Failed to write data to table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.create.serialize(),
},
) from exc
def read(self, **_kwargs: Any) -> None:
"""
Read rows from the configured table into `self.output`.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
ReadError: If reading data fails.
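Example (as in the module-level example):
>>> dataset.read()
>>> data = dataset.output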
"""
logger.debug("Starting read operation for %s.%s", self.settings.schema, self.settings.table)
stmt: Select[Any] | None = None
try:
self._validate_read_settings()
table = self._get_table()
stmt = self._build_select_columns(table)
stmt = self._build_filters(stmt, table)
stmt = self._build_order_by(stmt, table)
if self.settings.read.limit is not None:
stmt = stmt.limit(self.settings.read.limit)
logger.debug("Executing query: %s", stmt)
with self.linked_service.connection.connect() as conn:
rows = conn.execute(stmt).mappings().all()
self.output = pd.DataFrame.from_records(rows)
logger.debug("Read completed successfully. Rows read=%d", len(self.output))
except NoSuchTableError as exc:
logger.error(
"Table '%s' does not exist in schema '%s'.",
self.settings.table,
self.settings.schema,
)
raise ReadError(
message=f"Table '{self.settings.table}' does not exist in schema '{self.settings.schema}'.",
status_code=404,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.read.serialize(),
},
) from exc
except ValidationError as exc:
logger.error("Validation error: %s", exc)
details = {**(exc.details or {}), "settings": self.settings.read.serialize()}
raise ReadError(
message=exc.message,
status_code=exc.status_code,
details=details,
) from exc
except Exception as exc:
logger.error("Failed to read data from table: %s", exc)
raise ReadError(
message=f"Failed to read data from table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"query": str(stmt) if stmt is not None else None,
"settings": self.settings.read.serialize(),
},
) from exc
def delete(self, **_kwargs: Any) -> None:
"""
Delete rows matching configured identity columns.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
DeleteError: If deleting rows fails.
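Example (illustrative sketch; ``id`` is a placeholder identity column and
``input`` carries the identity values of the rows to remove):
>>> dataset.settings.delete = DeleteSettings(identity_columns=["id"])
>>> dataset.input = pd.DataFrame({"id": [1, 2]})
>>> dataset.delete()
>>> deleted = dataset.output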
"""
logger.debug("Starting delete operation for %s.%s", self.settings.schema, self.settings.table)
if self.input is None or self.input.empty:
logger.debug("Delete skipped because input is empty.")
self.output = self._output_from_empty_input()
return
if self.settings.delete is None:
logger.error("Delete settings are missing.")
raise DeleteError(
message="Missing delete settings. Configure settings.delete.identity_columns.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.delete,
},
)
try:
table = self._get_table()
validate_identity_columns(
table=table,
identity_columns=self.settings.delete.identity_columns,
content=self.input,
)
validate_duplicate_identity_rows(
content=self.input,
identity_columns=self.settings.delete.identity_columns,
)
logger.debug(
"Delete validated with identity_columns=%s and input_rows=%d",
list(self.settings.delete.identity_columns),
len(self.input),
)
deleted_rows: list[dict[str, Any]] = []
with self.linked_service.connection.begin() as conn:
for record in self.input.to_dict("records"):
where_clause = and_(*[table.c[col] == record[col] for col in self.settings.delete.identity_columns])
stmt = sa_delete(table).where(where_clause).returning(*table.c)
deleted_rows.extend(execute_returning_rows(conn, stmt))
self.output = output_from_rows(table, deleted_rows)
logger.debug("Delete completed successfully. Rows deleted=%d", len(self.output))
except Exception as exc:
logger.error("Delete failed: %s", exc)
raise DeleteError(
message=f"Failed to delete rows from table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"identity_columns": list(self.settings.delete.identity_columns),
},
) from exc
def update(self, **_kwargs: Any) -> None:
"""
Update rows matching configured identity columns.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
UpdateError: If updating rows fails.
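Example (illustrative sketch; rows are matched on ``id`` and the remaining
columns are written):
>>> dataset.settings.update = UpdateSettings(identity_columns=["id"])
>>> dataset.input = pd.DataFrame({"id": [1], "name": ["renamed"]})
>>> dataset.update()
>>> updated = dataset.output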
"""
logger.debug("Starting update operation for %s.%s", self.settings.schema, self.settings.table)
if self.input is None or self.input.empty:
logger.debug("Update skipped because input is empty.")
self.output = self._output_from_empty_input()
return
if self.settings.update is None:
logger.error("Update settings are missing.")
raise UpdateError(
message="Missing update settings. Configure settings.update.identity_columns.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.update,
},
)
try:
table = self._get_table()
validate_identity_columns(
table=table,
identity_columns=self.settings.update.identity_columns,
content=self.input,
)
validate_duplicate_identity_rows(
content=self.input,
identity_columns=self.settings.update.identity_columns,
)
logger.debug(
"Update validated with identity_columns=%s and input_rows=%d",
list(self.settings.update.identity_columns),
len(self.input),
)
update_columns = [col for col in self.input.columns if col not in self.settings.update.identity_columns]
if not update_columns:
logger.error("Update input has no non-identity columns.")
raise UpdateError(
message="No non-identity columns provided for update.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"identity_columns": list(self.settings.update.identity_columns),
},
)
updated_rows: list[dict[str, Any]] = []
with self.linked_service.connection.begin() as conn:
for record in self.input.to_dict("records"):
where_clause = and_(*[table.c[col] == record[col] for col in self.settings.update.identity_columns])
values = {col: record[col] for col in update_columns}
stmt = sa_update(table).where(where_clause).values(**values).returning(*table.c)
updated_rows.extend(execute_returning_rows(conn, stmt))
self.output = output_from_rows(table, updated_rows)
logger.debug("Update completed successfully. Rows updated=%d", len(self.output))
except Exception as exc:
logger.error("Update failed: %s", exc)
raise UpdateError(
message=f"Failed to update rows in table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"identity_columns": list(self.settings.update.identity_columns),
},
) from exc
def upsert(self, **_kwargs: Any) -> None:
"""
Insert or update rows using PostgreSQL ON CONFLICT semantics.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
UpsertError: If upserting rows fails.
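Example (illustrative sketch; ON CONFLICT requires a unique constraint or
index on the identity columns):
>>> dataset.settings.upsert = UpsertSettings(identity_columns=["id"])
>>> dataset.input = pd.DataFrame({"id": [1], "name": ["alice"]})
>>> dataset.upsert()
>>> upserted = dataset.output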
"""
logger.debug("Starting upsert operation for %s.%s", self.settings.schema, self.settings.table)
if self.input is None or self.input.empty:
logger.debug("Upsert skipped because input is empty.")
self.output = self._output_from_empty_input()
return
if self.settings.upsert is None:
logger.error("Upsert settings are missing.")
raise UpsertError(
message="Missing upsert settings. Configure settings.upsert.identity_columns.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.upsert,
},
)
try:
table = self._get_table()
validate_identity_columns(
table=table,
identity_columns=self.settings.upsert.identity_columns,
content=self.input,
)
validate_duplicate_identity_rows(
content=self.input,
identity_columns=self.settings.upsert.identity_columns,
)
logger.debug(
"Upsert validated with identity_columns=%s and input_rows=%d",
list(self.settings.upsert.identity_columns),
len(self.input),
)
rows = self.input.to_dict("records")
non_identity_columns = [col for col in self.input.columns if col not in self.settings.upsert.identity_columns]
logger.debug("Upsert non-identity columns=%s", non_identity_columns)
insert_stmt = pg_insert(table).values(rows)
if non_identity_columns:
upsert_stmt = insert_stmt.on_conflict_do_update(
index_elements=[table.c[col] for col in self.settings.upsert.identity_columns],
set_={col: insert_stmt.excluded[col] for col in non_identity_columns},
)
else:
upsert_stmt = insert_stmt.on_conflict_do_nothing(
index_elements=[table.c[col] for col in self.settings.upsert.identity_columns]
)
stmt = upsert_stmt.returning(*table.c)
with self.linked_service.connection.begin() as conn:
upserted_rows = execute_returning_rows(conn, stmt)
self.output = output_from_rows(table, upserted_rows)
logger.debug("Upsert completed successfully. Rows returned=%d", len(self.output))
except Exception as exc:
logger.error("Upsert failed: %s", exc)
raise UpsertError(
message=f"Failed to upsert rows in table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"identity_columns": list(self.settings.upsert.identity_columns),
},
) from exc
def purge(self, **_kwargs: Any) -> None:
"""
Purge table contents or drop the table.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
PurgeError: If purging table data fails.
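Example (illustrative sketch; drops the table instead of deleting its rows):
>>> dataset.settings.purge = PurgeSettings(drop_table=True, cascade=True)
>>> dataset.purge()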
"""
logger.debug("Starting purge operation for %s.%s", self.settings.schema, self.settings.table)
logger.debug(
"Purge settings: drop_table=%s, cascade=%s",
self.settings.purge.drop_table,
self.settings.purge.cascade,
)
try:
with self.linked_service.connection.begin() as conn:
if self.settings.purge.drop_table:
cascade = " CASCADE" if self.settings.purge.cascade else ""
logger.debug("Dropping table %s.%s", self.settings.schema, self.settings.table)
conn.execute(
text(
f"DROP TABLE IF EXISTS {quote_identifier(self.settings.schema)}."
f"{quote_identifier(self.settings.table)}{cascade}"
)
)
else:
logger.debug("Deleting all rows from %s.%s", self.settings.schema, self.settings.table)
inspector = inspect(conn)
if not inspector.has_table(self.settings.table, schema=self.settings.schema):
logger.debug(
"Purge skipped because table %s.%s does not exist.",
self.settings.schema,
self.settings.table,
)
return
table = self._get_table()
conn.execute(sa_delete(table))
logger.debug("Purge completed successfully.")
except Exception as exc:
logger.error("Purge failed: %s", exc)
raise PurgeError(
message=f"Failed to purge table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"drop_table": self.settings.purge.drop_table,
"cascade": self.settings.purge.cascade,
},
) from exc
def list(self, **_kwargs: Any) -> None:
"""
List operation is not supported for this provider.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
NotSupportedError: Always, as list is not supported.
"""
logger.error("List operation is not supported by PostgreSQLDataset.")
raise NotSupportedError(
message="Method 'list' is not supported by this provider.",
details={"method": "list", "provider": self.type.value},
)
def rename(self, **_kwargs: Any) -> None:
"""
Rename operation is not supported for this provider.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
NotSupportedError: Always, as rename is not supported.
"""
logger.error("Rename operation is not supported by PostgreSQLDataset.")
raise NotSupportedError(
message="Method 'rename' is not supported by this provider.",
details={"method": "rename", "provider": self.type.value},
)
def close(self) -> None:
"""
Close the dataset and underlying linked service.
Returns:
None
"""
logger.debug("Closing dataset linked service for %s.%s", self.settings.schema, self.settings.table)
self.linked_service.close()
def _output_from_empty_input(self) -> pd.DataFrame:
"""
Build a consistent empty-operation output while preserving input schema.
Returns:
pd.DataFrame: Empty dataframe or a schema-preserving input copy.
"""
input_value = cast("Any", self.input)
if input_value is None:
return pd.DataFrame()
return cast("pd.DataFrame", input_value.copy())
def _get_table(self) -> Table:
"""
Get the reflected SQLAlchemy table for configured schema and table.
Returns:
Table: Reflected table object.
"""
logger.debug("Reflecting table metadata for %s.%s", self.settings.schema, self.settings.table)
schema_name = quoted_name(self.settings.schema, quote=True)
table_name = quoted_name(self.settings.table, quote=True)
metadata = MetaData(schema=schema_name)
return Table(
table_name,
metadata,
schema=schema_name,
autoload_with=self.linked_service.connection,
)
def _build_table_from_input(
self,
content: pd.DataFrame,
) -> Table:
"""
Build a SQLAlchemy Table definition from input DataFrame dtypes.
Args:
content: Input DataFrame to build the table from.
Returns:
Table: SQLAlchemy Table definition.
"""
schema_name = quoted_name(self.settings.schema, quote=True)
table_name = quoted_name(self.settings.table, quote=True)
metadata = MetaData(schema=schema_name)
dtype_map = pandas_dtype_to_sqlalchemy(content.dtypes)
primary_key_columns = self._resolve_create_primary_key_columns(content)
primary_key_set = set(primary_key_columns or [])
logger.debug(
"Building table from input with columns=%s and primary_key_columns=%s",
list(content.columns),
list(primary_key_set),
)
columns = [
Column(
str(col_name),
cast("Any", dtype_map[str(col_name)]),
primary_key=str(col_name) in primary_key_set,
nullable=str(col_name) not in primary_key_set,
)
for col_name in content.columns
]
return Table(
table_name,
metadata,
*columns,
schema=schema_name,
)
def _resolve_create_primary_key_columns(
self,
content: pd.DataFrame,
) -> Sequence[str] | None:
"""
Resolve and validate create-time primary key columns.
Args:
content: Input DataFrame used for table creation.
Returns:
Sequence[str] | None: Primary key columns for new table creation.
Raises:
ValidationError: If `primary_key` is enabled but columns are invalid.
"""
if not self.settings.create.primary_key:
logger.debug("Create primary key disabled in settings.")
return None
if not self.settings.create.primary_key_columns:
logger.error("Create primary key is enabled but primary_key_columns is missing.")
raise ValidationError(
message="Missing primary key columns for create().",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"create_settings": self.settings.create.serialize(),
},
)
missing_columns = [col for col in self.settings.create.primary_key_columns if col not in content.columns]
if missing_columns:
logger.error("Create primary key columns missing from input: %s", missing_columns)
raise ValidationError(
message="Primary key columns do not exist in create input.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"missing_columns": missing_columns,
"primary_key_columns": list(self.settings.create.primary_key_columns),
},
)
logger.debug("Resolved create primary key columns: %s", list(self.settings.create.primary_key_columns))
return list(self.settings.create.primary_key_columns)
def _copy_into_table(self, conn: Any, table: Table, content: pd.DataFrame) -> None:
"""
Insert rows using PostgreSQL COPY.
"""
if content.empty:
return
table_name = f"{quote_identifier(str(table.schema or self.settings.schema))}.{quote_identifier(str(table.name))}"
column_names = ", ".join(quote_identifier(str(col)) for col in content.columns)
copy_sql = f"COPY {table_name} ({column_names}) FROM STDIN WITH (FORMAT CSV, HEADER FALSE, NULL '\\N')"
buffer = StringIO()
content.to_csv(buffer, index=False, header=False, na_rep="\\N")
buffer.seek(0)
dbapi_connection = conn.connection
with dbapi_connection.cursor() as cursor:
cursor.copy_expert(copy_sql, buffer)
def _validate_columns(self, table: Table, column_names: Sequence[str]) -> None:
"""
Validate that all requested columns exist in the reflected table.
Args:
table: Reflected SQLAlchemy table.
column_names: Column names to validate.
Returns:
None
Raises:
ValidationError: If one or more columns do not exist in the table.
"""
available_columns = list(table.c.keys())
missing_columns = list(dict.fromkeys(col for col in column_names if col not in table.c))
if missing_columns:
raise ValidationError(
message=f"Column(s) not found in table '{self.settings.table}'.",
details={
"table": self.settings.table,
"schema": self.settings.schema,
"missing_columns": missing_columns,
"available_columns": available_columns,
},
)
def _validate_read_settings(self) -> None:
"""
Validate read settings before query construction.
Returns:
None
Raises:
ValidationError: If limit or order direction is invalid.
"""
read_settings = self.settings.read
if read_settings.limit is not None and read_settings.limit <= 0:
raise ValidationError(
message="Read limit must be greater than 0.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"limit": read_settings.limit,
},
)
if not read_settings.order_by:
return
invalid_order_specs: list[dict[str, str]] = []
for order_spec in read_settings.order_by:
if not isinstance(order_spec, tuple):
continue
col_name, direction = order_spec
if direction.lower() not in {"asc", "desc"}:
invalid_order_specs.append(
{
"column": col_name,
"direction": direction,
}
)
if invalid_order_specs:
raise ValidationError(
message="Invalid order_by direction. Use 'asc' or 'desc'.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"invalid_order_by": invalid_order_specs,
},
)
def _build_select_columns(self, table: Table) -> Select[Any]:
"""
Build a SELECT statement for configured columns or all columns.
Args:
table: Reflected SQLAlchemy table.
Returns:
Select[Any]: SELECT statement with chosen columns.
Raises:
ValidationError: If any selected column does not exist.
"""
if self.settings.read.columns:
self._validate_columns(table, self.settings.read.columns)
selected_columns = [table.c[col_name] for col_name in self.settings.read.columns]
return select(*selected_columns)
return select(table)
def _build_filters(self, stmt: Select[Any], table: Table) -> Select[Any]:
"""
Apply equality filters from read settings to the SELECT statement.
Args:
stmt: Current SELECT statement.
table: Reflected SQLAlchemy table.
Returns:
Select[Any]: SELECT statement with WHERE conditions applied.
Raises:
ValidationError: If any filter column does not exist.
"""
if not self.settings.read.filters:
return stmt
self._validate_columns(table, list(self.settings.read.filters.keys()))
filter_conditions = [table.c[col_name] == value for col_name, value in self.settings.read.filters.items()]
return stmt.where(and_(*filter_conditions))
def _build_order_by(self, stmt: Select[Any], table: Table) -> Select[Any]:
"""
Apply ORDER BY clauses from read settings to the SELECT statement.
Args:
stmt: Current SELECT statement.
table: Reflected SQLAlchemy table.
Returns:
Select[Any]: SELECT statement with ORDER BY applied.
Raises:
ValidationError: If any order-by column does not exist.
"""
if not self.settings.read.order_by:
return stmt
order_columns = [
order_spec[0] if isinstance(order_spec, tuple) else order_spec
for order_spec in self.settings.read.order_by
]
self._validate_columns(table, order_columns)
order_clauses = []
for order_spec in self.settings.read.order_by:
if isinstance(order_spec, tuple):
col_name, direction = order_spec
col = table.c[col_name]
if direction.lower() == "desc":
order_clauses.append(desc(col))
else:
order_clauses.append(asc(col))
else:
order_clauses.append(asc(table.c[order_spec]))
return stmt.order_by(*order_clauses)