ds_provider_microsoft_py_lib.dataset

File: __init__.py Region: ds-provider-microsoft-py-lib/dataset

Dataset module for Microsoft provider.

Example:

>>> dataset = MsSqlTable(
...     linked_service=MsSqlLinkedService(...),
...     settings=MsSqlTableDatasetSettings(
...         table="your_table_name",
...         schema="your_schema_name",
...         read=ReadSettings(
...             limit=10,
...             columns=["id", "color", "score", "active"],
...             filters={"active": True},
...             order_by=[("score", "desc"), "id"],
...         ),
...     ),
... )
>>> dataset.read()

Submodules

Classes

MsSqlTable

Tabular dataset object which identifies data within a data store.

MsSqlTableDatasetSettings

The object containing the settings of the dataset.

Package Contents

class ds_provider_microsoft_py_lib.dataset.MsSqlTable[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.TabularDataset[MsSqlLinkedServiceType, MsSqlTableDatasetSettingsType, ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer, ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer], Generic[MsSqlLinkedServiceType, MsSqlTableDatasetSettingsType]

Tabular dataset object which identifies data within a data store, such as a table, CSV, JSON, Parquet file, Parquet dataset, or other document.

The input of the dataset is a pandas DataFrame. The output of the dataset is a pandas DataFrame.

linked_service: MsSqlLinkedServiceType
settings: MsSqlTableDatasetSettingsType
serializer: ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer | None
deserializer: ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer | None
property type: ds_provider_microsoft_py_lib.enums.ResourceType

Get the type of the Dataset.

Returns:

ResourceType

create(**_kwargs: Any) → None[source]

Create/write data to the specified table.

Writes self.input (pandas DataFrame) to the database table with the configured create settings (mode, etc.).

Parameters:

_kwargs – Additional keyword arguments to pass to the request.

Raises:
  • ConnectionError – If the connection fails.

  • CreateError – If the create operation fails.

read(**_kwargs: Any) → None[source]

Read rows from the configured table into self.output.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

ReadError – If reading data fails.

purge(**_kwargs: Any) → None[source]

Remove all content from the target table.

Deletes all rows from the table, leaving the table structure intact but empty. Per contract, the target is empty after purge() returns. This is idempotent – purging an already-empty (or non-existent) table is a no-op.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:
  • ConnectionError – If the connection is not established.

  • PurgeError – If the purge operation fails.
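The purge() contract above (empty target, idempotent, missing table is a no-op) can be sketched with the stdlib sqlite3 module standing in for the SQL Server backend. `purge_table` and the table names here are hypothetical; the real implementation goes through SQLAlchemy, but the guarantees look like this:

```python
import sqlite3

def purge_table(conn: sqlite3.Connection, table: str) -> None:
    """Remove all rows; a no-op on an already-empty or missing table."""
    exists = conn.execute(
        "SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?",
        (table,),
    ).fetchone()
    if exists is None:
        return  # non-existent table: contract says no-op, not an error
    conn.execute(f'DELETE FROM "{table}"')  # rows go, structure survives
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE colors (id INTEGER, name TEXT)")
conn.execute("INSERT INTO colors VALUES (1, 'red'), (2, 'blue')")
purge_table(conn, "colors")
purge_table(conn, "colors")        # second call: already empty, no-op
purge_table(conn, "no_such_table")  # missing table: no-op
remaining = conn.execute("SELECT COUNT(*) FROM colors").fetchone()[0]
```

After purge, the table still exists but holds zero rows.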

delete(**_kwargs: Any) → None[source]

Delete specific rows from the target table.

Removes only the rows in self.input, matched by all columns as identity. Per contract: empty input is a no-op (returns immediately). Deleting a row that does not exist is not an error.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:
  • ConnectionError – If the connection is not established.

  • DeleteError – If the delete operation fails.
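The all-column identity matching described above can be illustrated with sqlite3 as a stand-in backend. `delete_rows` is a hypothetical helper, not the provider's API; it shows that every column participates in the match, that empty input is a no-op, and that deleting an absent row is not an error:

```python
import sqlite3

def delete_rows(conn, table, rows):
    """Delete rows matched on every column (all-column identity)."""
    if not rows:
        return  # empty input: contract says return immediately
    for row in rows:
        where = " AND ".join(f'"{col}" = ?' for col in row)
        conn.execute(f'DELETE FROM "{table}" WHERE {where}', tuple(row.values()))
    conn.commit()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scores (id INTEGER, score REAL)")
conn.executemany("INSERT INTO scores VALUES (?, ?)", [(1, 9.5), (2, 7.0)])
# Second dict matches no existing row, which is not an error.
delete_rows(conn, "scores", [{"id": 2, "score": 7.0}, {"id": 99, "score": 0.0}])
left = conn.execute("SELECT id FROM scores").fetchall()
```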

update(**_kwargs: Any) → None[source]

Update existing rows in the target table.

This operation is not supported for SQL Server datasets at this time.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:

NotSupportedError – Always raised; update is not supported.

rename(**_kwargs: Any) → None[source]

Rename a resource (table) in the backend.

This operation is not supported for SQL Server datasets at this time.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:

NotSupportedError – Always raised; rename is not supported.

close() → None[source]

Clean up the connection to the backend.

Per contract: must be safe to call multiple times and never raise.

Returns:

None

list(**_kwargs: Any) → None[source]

Discover available resources (tables) in the schema.

Uses SQLAlchemy’s Inspector to reflect and retrieve all tables in the configured schema with their metadata (type: table or view).

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:
  • ConnectionError – If the connection is not established.

  • ListError – If the list operation fails.
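The discovery behaviour (reflecting tables and views with their type) can be sketched with sqlite3's catalog as a stand-in for SQLAlchemy's Inspector. `list_resources` is a hypothetical helper that mirrors the name-to-kind shape, not the provider's actual return format:

```python
import sqlite3

def list_resources(conn):
    """Return {name: kind} for tables and views in the database."""
    rows = conn.execute(
        "SELECT name, type FROM sqlite_master WHERE type IN ('table', 'view')"
    ).fetchall()
    return {name: kind for name, kind in rows}

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER)")
conn.execute("CREATE VIEW open_orders AS SELECT id FROM orders")
resources = list_resources(conn)
```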

upsert(**_kwargs: Any) → None[source]

Insert or update rows in the target table.

This operation is not supported for SQL Server datasets at this time.

Parameters:

_kwargs – Additional keyword arguments (ignored).

Raises:

NotSupportedError – Always raised; upsert is not supported.

_get_table() → sqlalchemy.Table[source]

Get the SQLAlchemy Table object for the configured schema and table.

Returns:

The SQLAlchemy Table object.

Return type:

Table

static _pandas_dtype_to_sqlalchemy(dtypes: pandas.Series) → dict[str, Any][source]

Convert pandas dtypes Series to a dict mapping column names to SQLAlchemy types.

Parameters:

dtypes – Pandas Series where index is column names and values are dtypes.

Returns:

Dictionary mapping column names to SQLAlchemy types.

Return type:

dict[str, Any]

_validate_column(table: sqlalchemy.Table, column_name: str) → None[source]

Validate that a column exists in the table.

Parameters:
  • table – The SQLAlchemy Table object.

  • column_name – The name of the column to validate.

Raises:

ValueError – If the column doesn’t exist in the table.

_validate_columns(table: sqlalchemy.Table, column_names: collections.abc.Sequence[str]) → None[source]

Validate that all requested columns exist in the reflected table.

Parameters:
  • table – Reflected SQLAlchemy table.

  • column_names – Column names to validate.

Returns:

None

Raises:

ValidationError – If one or more columns do not exist in the table.

_build_select_columns(table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Build a SELECT statement for configured columns or all columns.

Parameters:

table – Reflected SQLAlchemy table.

Returns:

SELECT statement with chosen columns.

Return type:

Select[Any]

Raises:

ValidationError – If any selected column does not exist.

_build_filters(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Apply equality filters from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with WHERE conditions applied.

Return type:

Select[Any]

Raises:

ValidationError – If any filter column does not exist.
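The equality-filter semantics can be illustrated with a small helper that turns a filters dict into a parameterized WHERE fragment. `build_where` is a hypothetical stand-in; the method itself chains conditions onto a SQLAlchemy Select rather than building SQL strings:

```python
def build_where(filters: dict) -> tuple[str, tuple]:
    """Turn an equality-filter dict into a parameterized WHERE fragment.

    Parameter placeholders keep filter values out of the SQL text itself.
    """
    if not filters:
        return "", ()
    clause = " AND ".join(f"{col} = ?" for col in filters)
    return f"WHERE {clause}", tuple(filters.values())

# Matches the module-level example: filters={"active": True}, plus one more.
sql, params = build_where({"active": True, "color": "red"})
```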

_build_order_by(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Apply ORDER BY clauses from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with ORDER BY applied.

Return type:

Select[Any]

Raises:

ValidationError – If any order-by column does not exist.
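The order_by setting in the module-level example mixes bare column names with (column, direction) tuples. A minimal sketch of normalizing such entries, assuming a bare name means ascending; `build_order_by` is hypothetical and the real method applies the result via SQLAlchemy rather than string concatenation:

```python
def build_order_by(order_by):
    """Normalize entries of the form 'col' or ('col', 'asc'|'desc')."""
    parts = []
    for entry in order_by:
        if isinstance(entry, str):
            col, direction = entry, "asc"  # bare name: assume ascending
        else:
            col, direction = entry
        if direction.lower() not in ("asc", "desc"):
            raise ValueError(f"invalid order direction: {direction!r}")
        parts.append(f"{col} {direction.upper()}")
    return "ORDER BY " + ", ".join(parts) if parts else ""

clause = build_order_by([("score", "desc"), "id"])
```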

_quote_identifier(name: str) → str[source]

Quote identifiers safely for SQL Server using SQLAlchemy’s identifier preparer.

Reject identifiers containing obvious injection primitives like quotes, semicolons, or brackets before quoting.

Parameters:

name – The identifier name to quote.

Returns:

The safely quoted identifier.

Return type:

str

Raises:

ValueError – If the identifier contains unsafe characters.
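The reject-then-quote approach can be sketched as follows. The real implementation delegates quoting to SQLAlchemy's identifier preparer; this hypothetical version hard-codes SQL Server's bracket convention to show the two-step shape:

```python
# Characters treated as obvious injection primitives before quoting.
UNSAFE = set("\"'`;[]")

def quote_identifier(name: str) -> str:
    """Reject unsafe characters, then bracket-quote for SQL Server."""
    if not name or any(ch in UNSAFE for ch in name):
        raise ValueError(f"unsafe identifier: {name!r}")
    return f"[{name}]"

quoted = quote_identifier("my_table")

# An identifier carrying injection primitives is rejected outright.
try:
    quote_identifier("x]; DROP TABLE y")
    rejected = False
except ValueError:
    rejected = True
```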

get_details() → dict[str, Any][source]

Get details about the dataset.

Constructs and returns a dictionary containing metadata about the current dataset configuration, including table name, schema name, and optional query filters and delete settings.

Returns:

A dictionary containing:
  • table_name (str): The name of the target table

  • schema_name (str): The schema containing the table

  • query_filter (Any, optional): Filter criteria if specified

  • delete_table (str, optional): Delete table setting if specified

Return type:

dict[str, Any]

static _is_na_scalar(v: Any) → bool[source]

Check whether v is a scalar NA value (NaN, NaT, None, pd.NA).

pd.isna() returns an array-like result for non-scalar inputs (list, tuple, dict, ndarray), which makes a bare if pd.isna(v) raise ValueError: The truth value of an array is ambiguous. This helper guards against that by only calling pd.isna on values that are known to be scalar.

Parameters:

v – Any value from a record dict.

Returns:

True when v is a scalar NA-like value.

Return type:

bool

static _sanitize_records(records: collections.abc.Sequence[dict[collections.abc.Hashable, Any]]) → collections.abc.Sequence[dict[collections.abc.Hashable, Any]][source]

Replace NaN and NaT values with None in record dicts.

SQL Server rejects float('nan') over the TDS/ODBC protocol with “The supplied value is not a valid instance of data type float”. Converting these sentinel values to None causes SQLAlchemy to emit proper SQL NULL parameters instead.

Non-scalar values (lists, tuples, dicts, ndarrays) are left as-is because pd.isna() returns an array-like result for them, which cannot be evaluated as a boolean.

Parameters:

records – Row dicts produced by DataFrame.to_dict(orient="records").

Returns:

The same rows with NaN/NaT replaced by None.

Return type:

Sequence[dict[Hashable, Any]]
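The sanitization described above can be sketched without pandas: the real code uses pd.isna behind the scalar guard, while this stand-in recognizes only None and float NaN, which is enough to show NA values becoming SQL NULL parameters while containers pass through untouched:

```python
import math

def is_na_scalar(v) -> bool:
    """True for scalar NA-like values; containers are never NA here."""
    if v is None:
        return True
    return isinstance(v, float) and math.isnan(v)

def sanitize_records(records):
    """Replace scalar NA values with None so the driver emits SQL NULL."""
    return [
        {col: (None if is_na_scalar(v) else v) for col, v in row.items()}
        for row in records
    ]

# The list value is non-scalar and must be left as-is.
clean = sanitize_records([{"id": 1, "score": float("nan"), "tags": [1, 2]}])
```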

static _get_identity_columns(table: sqlalchemy.Table) → collections.abc.Sequence[str][source]

Return the names of identity (auto-increment) columns on table.

Parameters:

table – A reflected or constructed SQLAlchemy Table.

Returns:

Column names that have an identity property.

Return type:

Sequence[str]

_set_identity_insert(conn: Any, *, enabled: bool) → None[source]

Toggle IDENTITY_INSERT for the configured table.

Parameters:
  • conn – Active SQLAlchemy connection.

  • enabled – True to turn identity insert ON, False for OFF.

_copy_into_table(conn: Any, table: sqlalchemy.Table, content: pandas.DataFrame) → None[source]

Insert rows from a DataFrame into a SQL Server table.

Handles identity-column awareness (toggling IDENTITY_INSERT) and sanitises NaN / NaT values so that SQL Server receives valid parameters.

Parameters:
  • conn – SQLAlchemy connection inside an active transaction.

  • table – SQLAlchemy Table object (metadata only).

  • content – DataFrame containing rows to insert.

_resolve_create_primary_key_columns(content: pandas.DataFrame) → collections.abc.Sequence[str] | None[source]

Resolve and validate create-time primary key columns.

Parameters:

content – Input DataFrame used for table creation.

Returns:

Primary key columns for new table creation.

Return type:

Sequence[str] | None

Raises:

ValidationError – If primary_key is enabled but columns are invalid.

_build_table_from_input(content: pandas.DataFrame) → sqlalchemy.Table[source]

Build a SQLAlchemy Table definition from input DataFrame dtypes.

Parameters:

content – Input DataFrame to build the table from.

Returns:

SQLAlchemy Table definition.

Return type:

Table

_output_from_empty_input() → pandas.DataFrame[source]

Build a consistent empty-operation output while preserving input schema.

Returns:

Empty dataframe or a schema-preserving input copy.

Return type:

pd.DataFrame

_validate_read_settings() → None[source]

Validate read settings before query construction.

Returns:

None

Raises:

ValidationError – If limit or order direction is invalid.

class ds_provider_microsoft_py_lib.dataset.MsSqlTableDatasetSettings[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.DatasetSettings

The object containing the settings of the dataset.

table: str

Table name for dataset operations.

schema: str

Schema for dataset operations.

read: ReadSettings

Settings for read().

create: CreateSettings

Settings for create().