"""
**File:** ``mssql.py``
**Region:** ``ds_provider_microsoft_py_lib/dataset/mssql``
MSSQL Table Dataset
This module implements a dataset for Microsoft SQL Server tables.
Example:
>>> dataset = MsSqlTable(
... linked_service=MsSqlLinkedService(...),
... settings=MsSqlTableDatasetSettings(
... table="your_table_name",
... schema="your_schema_name",
... )
... )
>>> dataset.read()
"""
import re
from collections.abc import Hashable, Sequence
from dataclasses import dataclass, field
from typing import Any, Generic, TypeVar, cast
import pandas as pd
from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib import Serializable
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
DatasetStorageFormatType,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
CreateError,
DeleteError,
ListError,
PurgeError,
ReadError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError, ValidationError
from ds_resource_plugin_py_lib.common.serde.deserialize import PandasDeserializer
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer
from sqlalchemy import (
BigInteger,
Boolean,
Column,
Float,
Integer,
MetaData,
String,
Table,
and_,
asc,
desc,
insert,
quoted_name,
select,
text,
)
from sqlalchemy.dialects.mssql import DATETIME2
from sqlalchemy.exc import NoSuchTableError
from sqlalchemy.inspection import inspect
from sqlalchemy.sql import Select
from ..enums import ResourceType
from ..linked_service.mssql import MsSqlLinkedService
logger = Logger.get_logger(__name__, package=True)
@dataclass(kw_only=True)
class ReadSettings(Serializable):
"""
Settings specific to the read() operation.
These settings only apply when reading data from the database
and do not affect create(), delete(), update(), or rename() operations.
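Example (illustrative sketch; column names and values are hypothetical):
read = ReadSettings(
limit=100,
columns=["id", "status", "created_at"],
filters={"status": "active"},
order_by=[("created_at", "desc"), "id"],
)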
"""
limit: int | None = None
"""The limit of the data to read."""
columns: Sequence[str] | None = None
"""
Specific columns to select. If None, selects all columns (*).
Example:
columns=["id", "name", "created_at"]
"""
filters: dict[str, Any] | None = None
"""
Dictionary of column filters for WHERE clause. Uses equality comparison.
Example:
filters={"status": "active", "amount": 100}
Multiple filters are combined with AND.
"""
order_by: Sequence[str | tuple[str, str]] | None = None
"""
Columns to order by. Can be:
- List of column names (defaults to ascending)
- List of (column_name, 'asc'/'desc') tuples
Example:
order_by=["created_at"] # ascending
order_by=[("created_at", "desc"), "name"] # created_at desc, name asc
"""
@dataclass(kw_only=True)
class CreateSettings(Serializable):
"""
Settings specific to the create() operation.
These settings only apply when writing data to the database
and do not affect read(), delete(), update(), or rename() operations.
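Example (illustrative sketch; the primary key column is hypothetical):
create = CreateSettings(
index=False,
primary_key=True,
primary_key_columns=["id"],
)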
"""
index: bool = False
"""
Whether to write the DataFrame index as a column when creating/writing the table.
"""
primary_key: bool = False
"""Whether to create a primary key when creating a new table."""
primary_key_columns: Sequence[str] | None = None
"""Primary key columns to create when `primary_key` is enabled."""
@dataclass(kw_only=True)
class MsSqlTableDatasetSettings(DatasetSettings):
table: str
"""Table name for dataset operations."""
schema: str
"""Schema for dataset operations."""
read: ReadSettings = field(default_factory=ReadSettings)
"""Settings for read()."""
create: CreateSettings = field(default_factory=CreateSettings)
"""Settings for create()."""
MsSqlTableDatasetSettingsType = TypeVar(
"MsSqlTableDatasetSettingsType",
bound=MsSqlTableDatasetSettings,
)
MsSqlLinkedServiceType = TypeVar(
"MsSqlLinkedServiceType",
bound=MsSqlLinkedService[Any],
)
@dataclass(kw_only=True)
class MsSqlTable(
TabularDataset[
MsSqlLinkedServiceType,
MsSqlTableDatasetSettingsType,
PandasSerializer,
PandasDeserializer,
],
Generic[MsSqlLinkedServiceType, MsSqlTableDatasetSettingsType],
):
linked_service: MsSqlLinkedServiceType
settings: MsSqlTableDatasetSettingsType
serializer: PandasSerializer | None = field(
default_factory=lambda: PandasSerializer(format=DatasetStorageFormatType.JSON),
)
deserializer: PandasDeserializer | None = field(
default_factory=lambda: PandasDeserializer(format=DatasetStorageFormatType.JSON),
)
@property
def type(self) -> ResourceType:
"""
Get the type of the Dataset.
Returns:
ResourceType
"""
return ResourceType.MICROSOFT_SQL_DATASET
def create(self, **_kwargs: Any) -> None:
"""
Create/write data to the specified table.
Writes self.input (a pandas DataFrame) to the database table using the
configured create settings (index and primary key options).
Args:
_kwargs: Additional keyword arguments to pass to the request.
Raises:
ConnectionError: If the connection fails.
CreateError: If the create operation fails.
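Example (illustrative sketch; assumes `dataset` is a configured MsSqlTable):
dataset.input = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
dataset.create()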
"""
logger.debug("Starting create operation for %s.%s", self.settings.schema, self.settings.table)
if self.input is None or self.input.empty:
logger.debug("Create skipped because input is empty.")
self.output = self._output_from_empty_input()
return
try:
create_input = self.input.reset_index() if self.settings.create.index else self.input.copy()
logger.debug(
"Create input prepared with %d rows and columns=%s",
len(create_input),
list(create_input.columns),
)
with self.linked_service.connection.begin() as conn:
table_exists = bool(inspect(conn).has_table(self.settings.table, schema=self.settings.schema))
logger.debug("Table exists=%s for %s.%s", table_exists, self.settings.schema, self.settings.table)
if table_exists:
table = self._get_table()
else:
logger.debug("Table does not exist; creating new table for create operation.")
table = self._build_table_from_input(create_input)
table.create(bind=conn)
self._copy_into_table(conn, table, create_input)
self.output = self.input.copy()
logger.debug("Create completed successfully. Rows written=%d", len(self.output))
except ValidationError as exc:
logger.error("Create validation failed: %s", exc.message)
raise CreateError(
message=exc.message,
status_code=exc.status_code,
details={**(exc.details or {}), "settings": self.settings.create.serialize()},
) from exc
except Exception as exc:
logger.error("Create failed: %s", exc)
raise CreateError(
message=f"Failed to write data to table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.create.serialize(),
},
) from exc
def read(self, **_kwargs: Any) -> None:
"""
Read rows from the configured table into `self.output`.
Args:
_kwargs: Additional keyword arguments for interface compatibility.
Returns:
None
Raises:
ReadError: If reading data fails.
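Example (illustrative sketch; assumes `dataset` is a configured MsSqlTable):
dataset.settings.read.limit = 10
dataset.settings.read.filters = {"status": "active"}
dataset.read()
df = dataset.output  # pandas DataFrame with the selected rows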
"""
logger.debug("Starting read operation for %s.%s", self.settings.schema, self.settings.table)
stmt: Select[Any] | None = None
try:
self._validate_read_settings()
table = self._get_table()
stmt = self._build_select_columns(table)
stmt = self._build_filters(stmt, table)
stmt = self._build_order_by(stmt, table)
if self.settings.read.limit is not None:
stmt = stmt.limit(self.settings.read.limit)
logger.debug("Executing query: %s", stmt)
with self.linked_service.connection.connect() as conn:
rows = conn.execute(stmt).mappings().all()
self.output = pd.DataFrame.from_records(rows) # type: ignore[type-var]
logger.debug("Read completed successfully. Rows read=%d", len(self.output))
except NoSuchTableError as exc:
logger.error(
"Table '%s' does not exist in schema '%s'.",
self.settings.table,
self.settings.schema,
)
raise ReadError(
message=f"Table '{self.settings.table}' does not exist in schema '{self.settings.schema}'.",
status_code=404,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"settings": self.settings.read.serialize(),
},
) from exc
except ValidationError as exc:
logger.error("Validation error: %s", exc)
details = {**(exc.details or {}), "settings": self.settings.read.serialize()}
raise ReadError(
message=exc.message,
status_code=exc.status_code,
details=details,
) from exc
except Exception as exc:
logger.error("Failed to read data from table: %s", exc)
raise ReadError(
message=f"Failed to read data from table: {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"query": str(stmt) if stmt is not None else None,
"settings": self.settings.read.serialize(),
},
) from exc
def purge(self, **_kwargs: Any) -> None:
"""
Remove all content from the target table.
Drops the table entirely, so per contract the target holds no data
after purge() returns. This is idempotent -- purging a non-existent
(or already purged) table is a no-op.
Args:
_kwargs: Additional keyword arguments (ignored).
Raises:
ConnectionError: If the connection is not established.
PurgeError: If the purge operation fails.
"""
try:
with self.linked_service.connection.begin() as conn:
if not inspect(conn).has_table(self.settings.table, schema=self.settings.schema):
logger.debug(
"Table %s.%s does not exist; purge is a no-op.",
self.settings.schema,
self.settings.table,
)
return
query = f"DROP TABLE IF EXISTS [{self.settings.schema}].[{self.settings.table}];"
logger.debug(f"Dropping table: {self.settings.schema}.{self.settings.table}")
conn.execute(text(query))
logger.info(f"Successfully purged table: {self.settings.schema}.{self.settings.table}")
except Exception as exc:
logger.error(f"Failed to purge table: {exc}", exc_info=True)
raise PurgeError(
message=f"Failed to purge table '{self.settings.schema}.{self.settings.table}': {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
},
) from exc
def delete(self, **_kwargs: Any) -> None:
"""
Delete specific rows from the target table.
Removes only the rows present in self.input, using all input columns together as the match (identity) criteria.
Per contract: empty input is a no-op (returns immediately).
Deleting a row that does not exist is not an error.
Args:
_kwargs: Additional keyword arguments (ignored).
Raises:
ConnectionError: If the connection is not established.
DeleteError: If the delete operation fails.
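Example (illustrative sketch; assumes `dataset` is a configured MsSqlTable):
dataset.input = pd.DataFrame({"id": [1, 2]})  # rows are matched on all input columns
dataset.delete()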
"""
# Per contract: Empty input is not an error, return immediately
if self.input is None or self.input.empty:
logger.debug("Empty input provided to delete(); returning without action.")
self.output = self._output_from_empty_input()
return
try:
# Use all columns present in the input row as match criteria
key_columns = list(self.input.columns)
# Map potentially unsafe column names to safe SQLAlchemy bind parameter names
param_map = {col: f"p{idx}" for idx, col in enumerate(key_columns)}
where_clause = " AND ".join(f"{self._quote_identifier(col)} = :{param_map[col]}" for col in key_columns)
# Note: This is safe from SQL injection because:
# 1. Schema and table names are validated/quoted via _quote_identifier(), which rejects unsafe characters
# 2. Column names are validated through _quote_identifier() which rejects unsafe characters
# 3. Values are passed as parameters, not interpolated into the SQL
if getattr(self.settings, "schema", None):
safe_schema = self._quote_identifier(self.settings.schema)
safe_table = self._quote_identifier(self.settings.table)
table_identifier = f"{safe_schema}.{safe_table}"
else:
table_identifier = self._quote_identifier(self.settings.table)
delete_sql = text(f"DELETE FROM {table_identifier} WHERE {where_clause}") # nosec B608
# Build payloads using the safe parameter names
records = self.input.to_dict(orient="records")
payloads = [{param_map[col]: row[col] for col in key_columns} for row in records]
with self.linked_service.connection.begin() as conn:
conn.execute(delete_sql, payloads)
# Per contract: Populate output with the affected rows (copy of input)
self.output = self.input.copy()
logger.info(f"Successfully deleted {len(payloads)} rows from {self.settings.schema}.{self.settings.table}")
except Exception as exc:
logger.error(f"Failed to delete rows from table: {exc}", exc_info=True)
raise DeleteError(
message=f"Failed to delete rows from table '{self.settings.schema}.{self.settings.table}': {exc!s}",
status_code=500,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"row_count": len(self.input),
},
) from exc
def update(self, **_kwargs: Any) -> None:
"""
Update existing rows in the target table.
This operation is not supported for SQL Server datasets at this time.
Args:
_kwargs: Additional keyword arguments (ignored).
Raises:
NotSupportedError: Always -- update is not supported.
"""
raise NotSupportedError(
message="Update operation is not supported for SQL Server datasets.",
details={"table": self.settings.table, "schema": self.settings.schema},
)
def rename(self, **_kwargs: Any) -> None:
"""
Rename a resource (table) in the backend.
This operation is not supported for SQL Server datasets at this time.
Args:
_kwargs: Additional keyword arguments (ignored).
Raises:
NotSupportedError: Always -- rename is not supported.
"""
raise NotSupportedError(
message="Rename operation is not supported for SQL Server datasets.",
details={"table": self.settings.table, "schema": self.settings.schema},
)
def close(self) -> None:
"""
Clean up the connection to the backend.
Per contract: must be safe to call multiple times and never raise.
Returns:
None
"""
try:
self.linked_service.close()
except Exception:
logger.debug("Exception suppressed during close().", exc_info=True)
def list(self, **_kwargs: Any) -> None:
"""
Discover available resources (tables) in the schema.
Uses SQLAlchemy's Inspector to retrieve all tables and views in the
configured schema along with their type (BASE TABLE or VIEW).
Args:
_kwargs: Additional keyword arguments (ignored).
Raises:
ConnectionError: If the connection is not established.
ListError: If the list operation fails.
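Example (illustrative sketch of the populated output):
dataset.list()
dataset.output  # DataFrame with TABLE_SCHEMA, TABLE_NAME and TABLE_TYPE columns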
"""
try:
inspector = inspect(self.linked_service.connection)
# Get all tables and views in the schema
table_names = sorted(inspector.get_table_names(schema=self.settings.schema))
view_names = sorted(inspector.get_view_names(schema=self.settings.schema))
# Build resource info list with metadata
tables_info = []
for table_name in table_names:
tables_info.append(
{
"TABLE_SCHEMA": self.settings.schema,
"TABLE_NAME": table_name,
"TABLE_TYPE": "BASE TABLE",
}
)
for view_name in view_names:
tables_info.append(
{
"TABLE_SCHEMA": self.settings.schema,
"TABLE_NAME": view_name,
"TABLE_TYPE": "VIEW",
}
)
# Per contract: self.output must be populated with discovered resources
self.output = pd.DataFrame(tables_info)
logger.info(f"Successfully listed {len(self.output)} tables in schema: {self.settings.schema}")
except ListError:
# Re-raise our own exception type
raise
except Exception as exc:
logger.error(f"Failed to list tables in schema: {exc}", exc_info=True)
raise ListError(
message=f"Failed to list tables in schema '{self.settings.schema}': {exc!s}",
status_code=500,
details={"schema": self.settings.schema},
) from exc
def upsert(self, **_kwargs: Any) -> None:
"""
Insert or update rows in the target table.
This operation is not supported for SQL Server datasets at this time.
Args:
_kwargs: Additional keyword arguments (ignored).
Raises:
NotSupportedError: Always -- upsert is not supported.
"""
raise NotSupportedError(
message="Upsert operation is not supported for SQL Server datasets.",
details={"table": self.settings.table, "schema": self.settings.schema},
)
def _get_table(self) -> Table:
"""
Get the SQLAlchemy Table object for the configured schema and table.
Returns:
Table: The SQLAlchemy Table object.
"""
schema_name = quoted_name(self.settings.schema, quote=True)
table_name = quoted_name(self.settings.table, quote=True)
metadata = MetaData(schema=schema_name)
return Table(
table_name,
metadata,
schema=schema_name,
autoload_with=self.linked_service.connection,
)
@staticmethod
def _pandas_dtype_to_sqlalchemy(dtypes: pd.Series) -> dict[str, Any]:
"""
Convert pandas dtypes Series to a dict mapping column names to SQLAlchemy types.
Args:
dtypes: Pandas Series where index is column names and values are dtypes.
Returns:
dict[str, Any]: Dictionary mapping column names to SQLAlchemy types.
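Example (illustrative sketch; column names are hypothetical):
df = pd.DataFrame({"id": [1], "price": [1.5], "name": ["a"]})
MsSqlTable._pandas_dtype_to_sqlalchemy(df.dtypes)
# -> {"id": BigInteger(), "price": Float(), "name": String(length=255)}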
"""
dtype_map: dict[str, Any] = {}
for col_name, dtype in dtypes.items():
col_name_str = str(col_name)
if pd.api.types.is_integer_dtype(dtype):
if hasattr(dtype, "itemsize") and dtype.itemsize <= 2:
dtype_map[col_name_str] = Integer()
else:
dtype_map[col_name_str] = BigInteger()
elif pd.api.types.is_float_dtype(dtype):
dtype_map[col_name_str] = Float()
elif pd.api.types.is_bool_dtype(dtype):
dtype_map[col_name_str] = Boolean()
elif pd.api.types.is_datetime64_any_dtype(dtype):
dtype_map[col_name_str] = DATETIME2() # type: ignore
elif pd.api.types.is_string_dtype(dtype) or isinstance(dtype, pd.CategoricalDtype):
dtype_map[col_name_str] = String(length=255)
else:
dtype_map[col_name_str] = String(length=255)
return dtype_map
def _validate_column(self, table: Table, column_name: str) -> None:
"""
Validate that a column exists in the table.
Args:
table: The SQLAlchemy Table object.
column_name: The name of the column to validate.
Raises:
ValueError: If the column doesn't exist in the table.
"""
if column_name not in table.c:
available_columns = list(table.c.keys())
raise ValueError(
f"Column '{column_name}' not found in table '{self.settings.table}'. Available columns: {available_columns}"
)
def _validate_columns(self, table: Table, column_names: Sequence[str]) -> None:
"""
Validate that all requested columns exist in the reflected table.
Args:
table: Reflected SQLAlchemy table.
column_names: Column names to validate.
Returns:
None
Raises:
ValidationError: If one or more columns do not exist in the table.
"""
available_columns = list(table.c.keys())
missing_columns = list(dict.fromkeys(col for col in column_names if col not in table.c))
if missing_columns:
raise ValidationError(
message=f"Column(s) not found in table '{self.settings.table}'.",
details={
"table": self.settings.table,
"schema": self.settings.schema,
"missing_columns": missing_columns,
"available_columns": available_columns,
},
)
def _build_select_columns(self, table: Table) -> Select[Any]:
"""
Build a SELECT statement for configured columns or all columns.
Args:
table: Reflected SQLAlchemy table.
Returns:
Select[Any]: SELECT statement with chosen columns.
Raises:
ValidationError: If any selected column does not exist.
"""
if self.settings.read.columns:
self._validate_columns(table, self.settings.read.columns)
selected_columns = [table.c[col_name] for col_name in self.settings.read.columns]
return select(*selected_columns)
return select(table)
def _build_filters(self, stmt: Select[Any], table: Table) -> Select[Any]:
"""
Apply equality filters from read settings to the SELECT statement.
Args:
stmt: Current SELECT statement.
table: Reflected SQLAlchemy table.
Returns:
Select[Any]: SELECT statement with WHERE conditions applied.
Raises:
ValidationError: If any filter column does not exist.
"""
if not self.settings.read.filters:
return stmt
self._validate_columns(table, list(self.settings.read.filters.keys()))
filter_conditions = [table.c[col_name] == value for col_name, value in self.settings.read.filters.items()]
return stmt.where(and_(*filter_conditions))
def _build_order_by(self, stmt: Select[Any], table: Table) -> Select[Any]:
"""
Apply ORDER BY clauses from read settings to the SELECT statement.
Args:
stmt: Current SELECT statement.
table: Reflected SQLAlchemy table.
Returns:
Select[Any]: SELECT statement with ORDER BY applied.
Raises:
ValidationError: If any order-by column does not exist.
"""
if not self.settings.read.order_by:
return stmt
order_columns = [
order_spec[0] if isinstance(order_spec, tuple) else order_spec
for order_spec in self.settings.read.order_by
]
self._validate_columns(table, order_columns)
order_clauses = []
for order_spec in self.settings.read.order_by:
if isinstance(order_spec, tuple):
col_name, direction = order_spec
col = table.c[col_name]
if direction.lower() == "desc":
order_clauses.append(desc(col))
else:
order_clauses.append(asc(col))
else:
order_clauses.append(asc(table.c[order_spec]))
return stmt.order_by(*order_clauses)
def _quote_identifier(self, name: str) -> str:
"""
Quote identifiers safely for SQL Server using SQLAlchemy's identifier preparer.
Reject identifiers containing obvious injection primitives like quotes, semicolons,
or brackets before quoting.
Args:
name: The identifier name to quote.
Returns:
str: The safely quoted identifier.
Raises:
ValueError: If the identifier contains unsafe characters.
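Example (illustrative sketch; identifier values are hypothetical):
self._quote_identifier("order items")  # e.g. "[order items]" under the MSSQL dialect
self._quote_identifier("bad;name")     # raises ValueError (unsafe characters)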
"""
if re.search(r"[;\"'\[\]]", name):
raise ValueError(f"Unsafe identifier: {name!r}")
preparer = self.linked_service.connection.dialect.identifier_preparer
return preparer.quote(name)
def get_details(self) -> dict[str, Any]:
"""
Get details about the dataset.
Constructs and returns a dictionary containing metadata about the current
dataset configuration, including table name, schema name, and optional
read filters and delete settings.
Returns:
dict[str, Any]: A dictionary containing:
- table_name (str): The name of the target table
- schema_name (str): The schema containing the table
- filters (dict, optional): Read filter criteria if specified
- delete_table (str, optional): Delete table setting if specified
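Example (illustrative sketch; values are hypothetical):
dataset.get_details()
# -> {"table_name": "orders", "schema_name": "dbo", "filters": {"status": "active"}}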
"""
details: dict[str, Any] = {
"table_name": self.settings.table,
"schema_name": self.settings.schema,
}
read_settings = getattr(self.settings, "read", None)
if read_settings is not None and read_settings.filters is not None:
details["filters"] = read_settings.filters
delete_settings = getattr(self.settings, "delete", None)
if delete_settings is not None:
details["delete_table"] = str(delete_settings.delete_table)
return details
@staticmethod
def _is_na_scalar(v: Any) -> bool:
"""
Check whether *v* is a scalar NA value (NaN, NaT, None, pd.NA).
``pd.isna()`` returns an array-like result for non-scalar inputs
(list, tuple, dict, ndarray), which makes a bare ``if pd.isna(v)``
raise ``ValueError: The truth value of an array is ambiguous``.
This helper guards against that by only calling ``pd.isna`` on
values that are known to be scalar.
Args:
v: Any value from a record dict.
Returns:
bool: ``True`` when *v* is a scalar NA-like value.
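Example (illustrative sketch):
MsSqlTable._is_na_scalar(float("nan"))  # True
MsSqlTable._is_na_scalar(None)          # True
MsSqlTable._is_na_scalar([1, None])     # False (non-scalar values are left alone)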
"""
if isinstance(v, (list, tuple, dict)):
return False
try:
return bool(pd.isna(v))
except (ValueError, TypeError):
return False
@staticmethod
def _sanitize_records(records: Sequence[dict[Hashable, Any]]) -> Sequence[dict[Hashable, Any]]:
"""
Replace NaN and NaT values with None in record dicts.
SQL Server rejects ``float('nan')`` over the TDS/ODBC protocol with
*"The supplied value is not a valid instance of data type float"*.
Converting these sentinel values to ``None`` causes SQLAlchemy to emit
proper SQL ``NULL`` parameters instead.
Non-scalar values (lists, tuples, dicts, ndarrays) are left as-is
because ``pd.isna()`` returns an array-like result for them, which
cannot be evaluated as a boolean.
Args:
records: Row dicts produced by ``DataFrame.to_dict(orient="records")``.
Returns:
Sequence[dict[Hashable, Any]]: The same rows with NaN/NaT replaced by None.
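Example (illustrative sketch; column names are hypothetical):
MsSqlTable._sanitize_records([{"id": 1, "amount": float("nan")}])
# -> [{"id": 1, "amount": None}]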
"""
return [{k: (None if MsSqlTable._is_na_scalar(v) else v) for k, v in row.items()} for row in records]
@staticmethod
def _get_identity_columns(table: Table) -> Sequence[str]:
"""
Return the names of identity (auto-increment) columns on *table*.
Args:
table: A reflected or constructed SQLAlchemy Table.
Returns:
Sequence[str]: Column names that have an identity property.
"""
return [col.name for col in table.columns if hasattr(col, "identity") and col.identity]
def _set_identity_insert(self, conn: Any, *, enabled: bool) -> None:
"""
Toggle ``IDENTITY_INSERT`` for the configured table.
Args:
conn: Active SQLAlchemy connection.
enabled: ``True`` to turn identity insert ON, ``False`` for OFF.
"""
state = "ON" if enabled else "OFF"
table_ref = f"[{self.settings.schema}].[{self.settings.table}]"
logger.debug(f"Setting IDENTITY_INSERT {state} for {self.settings.schema}.{self.settings.table}")
conn.execute(text(f"SET IDENTITY_INSERT {table_ref} {state}"))
def _copy_into_table(self, conn: Any, table: Table, content: pd.DataFrame) -> None:
"""
Insert rows from a DataFrame into a SQL Server table.
Handles identity-column awareness (toggling ``IDENTITY_INSERT``) and
sanitizes NaN / NaT values so that SQL Server receives valid parameters.
Args:
conn: SQLAlchemy connection inside an active transaction.
table: SQLAlchemy Table object (metadata only).
content: DataFrame containing rows to insert.
"""
if content.empty:
return
logger.debug(f"Inserting {len(content)} rows into {self.settings.schema}.{self.settings.table}")
identity_columns = self._get_identity_columns(table)
needs_identity_insert = bool(identity_columns) and any(col in content.columns for col in identity_columns)
try:
if needs_identity_insert:
self._set_identity_insert(conn, enabled=True)
records = self._sanitize_records(content.to_dict(orient="records"))
conn.execute(insert(table), records)
finally:
if needs_identity_insert:
self._set_identity_insert(conn, enabled=False)
def _resolve_create_primary_key_columns(
self,
content: pd.DataFrame,
) -> Sequence[str] | None:
"""
Resolve and validate create-time primary key columns.
Args:
content: Input DataFrame used for table creation.
Returns:
Sequence[str] | None: Primary key columns for new table creation.
Raises:
ValidationError: If `primary_key` is enabled but columns are invalid.
"""
if not self.settings.create.primary_key:
logger.debug("Create primary key disabled in settings.")
return None
if not self.settings.create.primary_key_columns:
logger.error("Create primary key is enabled but primary_key_columns is missing.")
raise ValidationError(
message="Missing primary key columns for create().",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"create_settings": self.settings.create.serialize(),
},
)
missing_columns = [col for col in self.settings.create.primary_key_columns if col not in content.columns]
if missing_columns:
logger.error("Create primary key columns missing from input: %s", missing_columns)
raise ValidationError(
message="Primary key columns do not exist in create input.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"missing_columns": missing_columns,
"primary_key_columns": list(self.settings.create.primary_key_columns),
},
)
logger.debug("Resolved create primary key columns: %s", list(self.settings.create.primary_key_columns))
return list(self.settings.create.primary_key_columns)
def _validate_read_settings(self) -> None:
"""
Validate read settings before query construction.
Returns:
None
Raises:
ValidationError: If limit or order direction is invalid.
"""
read_settings = self.settings.read
if read_settings.limit is not None and read_settings.limit <= 0:
raise ValidationError(
message="Read limit must be greater than 0.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"limit": read_settings.limit,
},
)
if not read_settings.order_by:
return
invalid_order_specs: list[dict[str, str]] = []
for order_spec in read_settings.order_by:
if not isinstance(order_spec, tuple):
continue
col_name, direction = order_spec
if direction.lower() not in {"asc", "desc"}:
invalid_order_specs.append(
{
"column": col_name,
"direction": direction,
}
)
if invalid_order_specs:
raise ValidationError(
message="Invalid order_by direction. Use 'asc' or 'desc'.",
status_code=400,
details={
"table": self.settings.table,
"schema": self.settings.schema,
"invalid_order_by": invalid_order_specs,
},
)