ds_provider_microsoft_py_lib.dataset.mssql

File: mssql.py Region: ds_provider_microsoft_py_lib/dataset/mssql

MSSQL Table Dataset

This module implements a dataset for Microsoft SQL Server tables.

Example:

>>> dataset = MsSqlTable(
...     linked_service=MsSqlLinkedService(...),
...     settings=MsSqlTableDatasetSettings(
...         table="your_table_name",
...         schema="your_schema_name",
...     ),
... )
>>> dataset.read()

Attributes

logger

MsSqlTableDatasetSettingsType

MsSqlLinkedServiceType

Classes

ReadSettings

Settings specific to the read() operation.

CreateSettings

Settings specific to the create() operation.

MsSqlTableDatasetSettings

The object containing the settings of the dataset.

MsSqlTable

Tabular dataset object which identifies data within a data store.

Module Contents

ds_provider_microsoft_py_lib.dataset.mssql.logger
class ds_provider_microsoft_py_lib.dataset.mssql.ReadSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the read() operation.

These settings only apply when reading data from the database and do not affect create(), delete(), update(), or rename() operations.

limit: int | None = None

The maximum number of rows to read.

columns: collections.abc.Sequence[str] | None = None

Specific columns to select. If None, selects all columns (*).

Example

columns=["id", "name", "created_at"]

filters: dict[str, Any] | None = None

Dictionary of column filters for WHERE clause. Uses equality comparison.

Example

filters={"status": "active", "amount": 100}

Multiple filters are combined with AND.

order_by: collections.abc.Sequence[str | tuple[str, str]] | None = None

Columns to order by. Can be: - List of column names (defaults to ascending) - List of (column_name, ‘asc’/’desc’) tuples

Example

order_by=["created_at"]  # ascending
order_by=[("created_at", "desc"), "name"]  # created_at desc, name asc
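How columns, filters, and order_by could combine into a query can be sketched as plain SQL-string construction. ReadSettingsSketch and render_select below are illustrative stand-ins, not the library's API; the real implementation builds a SQLAlchemy Select with bind parameters and validated column names.

```python
from dataclasses import dataclass
from typing import Optional, Sequence, Tuple, Union


@dataclass
class ReadSettingsSketch:
    """Mirror of the documented read settings, for illustration only."""
    limit: Optional[int] = None
    columns: Optional[Sequence[str]] = None
    filters: Optional[dict] = None
    order_by: Optional[Sequence[Union[str, Tuple[str, str]]]] = None


def render_select(table: str, s: ReadSettingsSketch) -> str:
    # SQL Server expresses a row limit with TOP.
    top = f"TOP {s.limit} " if s.limit is not None else ""
    cols = ", ".join(s.columns) if s.columns else "*"
    sql = f"SELECT {top}{cols} FROM {table}"
    if s.filters:
        # Equality filters combined with AND; values become bind parameters.
        sql += " WHERE " + " AND ".join(f"{c} = :{c}" for c in s.filters)
    if s.order_by:
        parts = []
        for item in s.order_by:
            if isinstance(item, tuple):
                col, direction = item
                parts.append(f"{col} {direction.upper()}")
            else:
                parts.append(f"{item} ASC")  # bare names default to ascending
        sql += " ORDER BY " + ", ".join(parts)
    return sql


print(render_select("your_schema.your_table", ReadSettingsSketch(
    columns=["id", "name"],
    filters={"status": "active"},
    order_by=[("created_at", "desc"), "name"],
)))
```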

class ds_provider_microsoft_py_lib.dataset.mssql.CreateSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the create() operation.

These settings only apply when writing data to the database and do not affect read(), delete(), update(), or rename() operations.

index: bool = False

Whether to include the DataFrame index in the written table.

primary_key: bool = False

Whether to create a primary key when creating a new table.

primary_key_columns: collections.abc.Sequence[str] | None = None

Primary key columns to create when primary_key is enabled.

class ds_provider_microsoft_py_lib.dataset.mssql.MsSqlTableDatasetSettings[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.DatasetSettings

The object containing the settings of the dataset.

table: str

Table name for dataset operations.

schema: str

Schema for dataset operations.

read: ReadSettings

Settings for read().

create: CreateSettings

Settings for create().

ds_provider_microsoft_py_lib.dataset.mssql.MsSqlTableDatasetSettingsType
ds_provider_microsoft_py_lib.dataset.mssql.MsSqlLinkedServiceType
class ds_provider_microsoft_py_lib.dataset.mssql.MsSqlTable[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.TabularDataset[MsSqlLinkedServiceType, MsSqlTableDatasetSettingsType, ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer, ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer], Generic[MsSqlLinkedServiceType, MsSqlTableDatasetSettingsType]

Tabular dataset object which identifies data within a data store, such as a table, CSV, JSON, Parquet file, Parquet dataset, or other documents.

Both the input and the output of the dataset are pandas DataFrames.

linked_service: MsSqlLinkedServiceType
settings: MsSqlTableDatasetSettingsType
serializer: ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer | None
deserializer: ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer | None
property type: ds_provider_microsoft_py_lib.enums.ResourceType

Get the type of the Dataset.

Returns:

ResourceType

create(**_kwargs: Any) → None[source]

Create/write data to the specified table.

Writes self.input (a pandas DataFrame) to the database table using the configured create settings (index, primary key, etc.).

Parameters:

_kwargs – Additional keyword arguments to pass to the request.

Raises:
  • ConnectionError – If the connection fails.

  • CreateError – If the create operation fails.

read(**_kwargs: Any) → None[source]

Read rows from the configured table into self.output.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

ReadError – If reading data fails.

purge(**_kwargs: Any) → None[source]

Remove all content from the target table.

Drops the entire table; per contract, the target is empty after purge() returns. The operation is idempotent – purging an already-empty (or non-existent) table is a no-op.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:
  • ConnectionError – If the connection is not established.

  • PurgeError – If the purge operation fails.

delete(**_kwargs: Any) → None[source]

Delete specific rows from the target table.

Removes only the rows in self.input, matched by all columns as identity. Per contract: empty input is a no-op (returns immediately). Deleting a row that does not exist is not an error.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:
  • ConnectionError – If the connection is not established.

  • DeleteError – If the delete operation fails.
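Since rows are matched on every column as identity, each input row could translate into a DELETE with an AND condition per column. The build_delete_stmt helper below is a hypothetical sketch, not the library's implementation, which presumably uses SQLAlchemy constructs and bind parameters.

```python
def build_delete_stmt(table: str, row: dict) -> str:
    """Build a DELETE matching a single input row on every column.

    One statement per input row; an empty input produces no statements,
    which matches the documented no-op contract. NULL-safe matching is
    omitted for brevity.
    """
    conditions = " AND ".join(f"{col} = :{col}" for col in row)
    return f"DELETE FROM {table} WHERE {conditions}"


print(build_delete_stmt("dbo.orders", {"id": 7, "status": "open"}))
```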

update(**_kwargs: Any) → None[source]

Update existing rows in the target table.

This operation is not supported for SQL Server datasets at this time.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:

NotSupportedError – Always raised; update is not supported.

rename(**_kwargs: Any) → None[source]

Rename a resource (table) in the backend.

This operation is not supported for SQL Server datasets at this time.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:

NotSupportedError – Always raised; rename is not supported.

close() → None[source]

Clean up the connection to the backend.

Per contract: must be safe to call multiple times and never raise.

Returns:

None

list(**_kwargs: Any) → None[source]

Discover available resources (tables) in the schema.

Uses SQLAlchemy’s Inspector to reflect and retrieve all tables in the configured schema with their metadata (type: table or view).

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:
  • ConnectionError – If the connection is not established.

  • ListError – If the list operation fails.

upsert(**_kwargs: Any) → None[source]

Insert or update rows in the target table.

This operation is not supported for SQL Server datasets at this time.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:

NotSupportedError – Always raised; upsert is not supported.

_get_table() → sqlalchemy.Table[source]

Get the SQLAlchemy Table object for the configured schema and table.

Returns:

The SQLAlchemy Table object.

Return type:

Table

static _pandas_dtype_to_sqlalchemy(dtypes: pandas.Series) → dict[str, Any][source]

Convert pandas dtypes Series to a dict mapping column names to SQLAlchemy types.

Parameters:

dtypes – Pandas Series where index is column names and values are dtypes.

Returns:

Dictionary mapping column names to SQLAlchemy types.

Return type:

dict[str, Any]
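A pandas-free sketch of this kind of dtype mapping is below. The DTYPE_TO_MSSQL table and the string-valued results are illustrative assumptions; the real method receives a pandas Series of dtypes and returns SQLAlchemy type objects, and may cover more dtypes than shown here.

```python
# Hypothetical dtype-name → SQL Server type mapping (illustrative, not the library's).
DTYPE_TO_MSSQL = {
    "int64": "BIGINT",
    "int32": "INTEGER",
    "float64": "FLOAT",
    "bool": "BIT",
    "datetime64[ns]": "DATETIME2",
    "object": "NVARCHAR(max)",
}


def map_dtypes(dtypes):
    """Map {column: dtype-name} to {column: SQL type}, defaulting to NVARCHAR."""
    return {col: DTYPE_TO_MSSQL.get(str(dt), "NVARCHAR(max)") for col, dt in dtypes.items()}


print(map_dtypes({"id": "int64", "amount": "float64", "name": "object"}))
```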

_validate_column(table: sqlalchemy.Table, column_name: str) → None[source]

Validate that a column exists in the table.

Parameters:
  • table – The SQLAlchemy Table object.

  • column_name – The name of the column to validate.

Raises:

ValueError – If the column doesn’t exist in the table.

_validate_columns(table: sqlalchemy.Table, column_names: collections.abc.Sequence[str]) → None[source]

Validate that all requested columns exist in the reflected table.

Parameters:
  • table – Reflected SQLAlchemy table.

  • column_names – Column names to validate.

Returns:

None

Raises:

ValidationError – If one or more columns do not exist in the table.

_build_select_columns(table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Build a SELECT statement for configured columns or all columns.

Parameters:

table – Reflected SQLAlchemy table.

Returns:

SELECT statement with chosen columns.

Return type:

Select[Any]

Raises:

ValidationError – If any selected column does not exist.

_build_filters(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Apply equality filters from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with WHERE conditions applied.

Return type:

Select[Any]

Raises:

ValidationError – If any filter column does not exist.

_build_order_by(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Apply ORDER BY clauses from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with ORDER BY applied.

Return type:

Select[Any]

Raises:

ValidationError – If any order-by column does not exist.

_quote_identifier(name: str) → str[source]

Quote identifiers safely for SQL Server using SQLAlchemy’s identifier preparer.

Rejects identifiers containing obvious injection primitives (quotes, semicolons, or brackets) before quoting.

Parameters:

name – The identifier name to quote.

Returns:

The safely quoted identifier.

Return type:

str

Raises:

ValueError – If the identifier contains unsafe characters.
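The reject-then-quote pattern can be sketched as follows. This is a minimal illustration, assuming SQL Server's square-bracket delimiting; the real method delegates quoting to SQLAlchemy's identifier preparer rather than formatting brackets by hand.

```python
import re


def quote_identifier(name: str) -> str:
    """Quote an identifier for SQL Server after rejecting unsafe characters."""
    # Reject obvious injection primitives before quoting: quotes, semicolons, brackets.
    if re.search(r"[\[\]'\";]", name):
        raise ValueError(f"Unsafe identifier: {name!r}")
    # SQL Server delimits identifiers with square brackets.
    return f"[{name}]"


print(quote_identifier("created_at"))
```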

get_details() → dict[str, Any][source]

Get details about the dataset.

Constructs and returns a dictionary containing metadata about the current dataset configuration, including table name, schema name, and optional query filters and delete settings.

Returns:

A dictionary containing:
  • table_name (str): The name of the target table

  • schema_name (str): The schema containing the table

  • query_filter (Any, optional): Filter criteria if specified

  • delete_table (str, optional): Delete table setting if specified

Return type:

dict[str, Any]

static _is_na_scalar(v: Any) → bool[source]

Check whether v is a scalar NA value (NaN, NaT, None, pd.NA).

pd.isna() returns an array-like result for non-scalar inputs (list, tuple, dict, ndarray), which makes a bare if pd.isna(v) check raise ValueError: The truth value of an array is ambiguous. This helper guards against that by only calling pd.isna() on values that are known to be scalar.

Parameters:

v – Any value from a record dict.

Returns:

True when v is a scalar NA-like value.

Return type:

bool

static _sanitize_records(records: collections.abc.Sequence[dict[collections.abc.Hashable, Any]]) → collections.abc.Sequence[dict[collections.abc.Hashable, Any]][source]

Replace NaN and NaT values with None in record dicts.

SQL Server rejects float('nan') over the TDS/ODBC protocol with “The supplied value is not a valid instance of data type float”. Converting these sentinel values to None causes SQLAlchemy to emit proper SQL NULL parameters instead.

Non-scalar values (lists, tuples, dicts, ndarrays) are left as-is because pd.isna() returns an array-like result for them, which cannot be evaluated as a boolean.

Parameters:

records – Row dicts produced by DataFrame.to_dict(orient="records").

Returns:

The same rows with NaN/NaT replaced by None.

Return type:

Sequence[dict[Hashable, Any]]
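The pair of helpers above could be approximated without pandas as shown below. This is a pandas-free sketch: the real implementation uses pd.isna() and also treats pd.NA and NaT as NA, which math.isnan cannot detect.

```python
import math
from typing import Any


def is_na_scalar(v: Any) -> bool:
    """Approximate scalar-NA check: None and float NaN only.

    Non-scalars (list, tuple, dict) are skipped up front, mirroring the
    guard that keeps pd.isna() from returning an ambiguous array result.
    """
    if isinstance(v, (list, tuple, dict)):
        return False
    if v is None:
        return True
    return isinstance(v, float) and math.isnan(v)


def sanitize_records(records):
    """Replace scalar NA-like values with None so the driver emits SQL NULL."""
    return [{k: (None if is_na_scalar(v) else v) for k, v in row.items()} for row in records]


print(sanitize_records([{"id": 1, "amount": float("nan"), "tags": [1, 2]}]))
```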

static _get_identity_columns(table: sqlalchemy.Table) → collections.abc.Sequence[str][source]

Return the names of identity (auto-increment) columns on table.

Parameters:

table – A reflected or constructed SQLAlchemy Table.

Returns:

Column names that have an identity property.

Return type:

Sequence[str]

_set_identity_insert(conn: Any, *, enabled: bool) → None[source]

Toggle IDENTITY_INSERT for the configured table.

Parameters:
  • conn – Active SQLAlchemy connection.

  • enabled – True to turn identity insert ON, False for OFF.
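The T-SQL this toggle emits can be sketched as a statement builder. The identity_insert_stmt helper is hypothetical; the actual method executes the statement on the given connection rather than returning a string.

```python
def identity_insert_stmt(schema: str, table: str, *, enabled: bool) -> str:
    """Build the T-SQL that toggles IDENTITY_INSERT for one table.

    IDENTITY_INSERT is a per-session, per-table switch in SQL Server;
    only one table per session may have it ON at a time.
    """
    state = "ON" if enabled else "OFF"
    return f"SET IDENTITY_INSERT [{schema}].[{table}] {state}"


print(identity_insert_stmt("dbo", "orders", enabled=True))
```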

_copy_into_table(conn: Any, table: sqlalchemy.Table, content: pandas.DataFrame) → None[source]

Insert rows from a DataFrame into a SQL Server table.

Handles identity-column awareness (toggling IDENTITY_INSERT) and sanitizes NaN / NaT values so that SQL Server receives valid parameters.

Parameters:
  • conn – SQLAlchemy connection inside an active transaction.

  • table – SQLAlchemy Table object (metadata only).

  • content – DataFrame containing rows to insert.

_resolve_create_primary_key_columns(content: pandas.DataFrame) → collections.abc.Sequence[str] | None[source]

Resolve and validate create-time primary key columns.

Parameters:

content – Input DataFrame used for table creation.

Returns:

Primary key columns for new table creation.

Return type:

Sequence[str] | None

Raises:

ValidationError – If primary_key is enabled but columns are invalid.
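The documented contract – primary key columns only apply when primary_key is enabled, and every requested column must exist in the input – could look like the sketch below. resolve_pk_columns is a hypothetical stand-in, and ValueError stands in for the library's ValidationError.

```python
def resolve_pk_columns(df_columns, primary_key: bool, pk_columns):
    """Resolve create-time primary key columns against the input's columns."""
    if not primary_key:
        # Primary key disabled: nothing to resolve.
        return None
    if not pk_columns:
        raise ValueError("primary_key is enabled but no primary_key_columns were given")
    missing = [c for c in pk_columns if c not in df_columns]
    if missing:
        raise ValueError(f"Primary key columns not in input: {missing}")
    return list(pk_columns)


print(resolve_pk_columns(["id", "name"], True, ["id"]))
```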

_build_table_from_input(content: pandas.DataFrame) → sqlalchemy.Table[source]

Build a SQLAlchemy Table definition from input DataFrame dtypes.

Parameters:

content – Input DataFrame to build the table from.

Returns:

SQLAlchemy Table definition.

Return type:

Table

_output_from_empty_input() → pandas.DataFrame[source]

Build a consistent empty-operation output while preserving input schema.

Returns:

Empty dataframe or a schema-preserving input copy.

Return type:

pd.DataFrame

_validate_read_settings() → None[source]

Validate read settings before query construction.

Returns:

None

Raises:

ValidationError – If limit or order direction is invalid.
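Checks of this shape could be sketched as below. validate_read_settings is an illustrative stand-in taking the settings as plain arguments, and ValueError stands in for the library's ValidationError; the real method reads the values from self.settings.read.

```python
def validate_read_settings(limit, order_by) -> None:
    """Validate a read limit and order-by directions before building a query."""
    if limit is not None and (not isinstance(limit, int) or limit < 0):
        raise ValueError(f"limit must be a non-negative integer, got {limit!r}")
    for item in order_by or []:
        if isinstance(item, tuple):
            _, direction = item
            # Only 'asc' and 'desc' are valid order directions.
            if direction.lower() not in ("asc", "desc"):
                raise ValueError(f"Invalid order direction: {direction!r}")


validate_read_settings(10, [("created_at", "desc"), "name"])  # passes silently
```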