ds_provider_postgresql_py_lib.dataset

File: __init__.py Region: ds_provider_postgresql_py_lib/dataset

PostgreSQL Dataset

This module implements a dataset for PostgreSQL databases.

Example

>>> dataset = PostgreSQLDataset(
...     deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
...     serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
...     settings=PostgreSQLDatasetSettings(
...         table="users",
...         read=ReadSettings(
...             columns=["id", "name"],
...             filters={"status": "active"},
...             order_by=["created_at"],
...         ),
...     ),
...     linked_service=PostgreSQLLinkedService(
...         settings=PostgreSQLLinkedServiceSettings(
...             uri="postgresql://user:password@localhost:5432/mydb",
...         ),
...     ),
... )
>>> dataset.read()
>>> data = dataset.output

Classes

PostgreSQLDataset

Tabular dataset object which identifies data within a data store.

PostgreSQLDatasetSettings

Settings for PostgreSQL dataset operations.

Package Contents

class ds_provider_postgresql_py_lib.dataset.PostgreSQLDataset[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.TabularDataset[PostgreSQLLinkedServiceType, PostgreSQLDatasetSettingsType, ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer, ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer], Generic[PostgreSQLLinkedServiceType, PostgreSQLDatasetSettingsType]

Tabular dataset object which identifies data within a data store, such as a table, CSV, JSON, Parquet file, Parquet dataset, or other documents.

Both the input and the output of the dataset are pandas DataFrames.

linked_service: PostgreSQLLinkedServiceType
settings: PostgreSQLDatasetSettingsType
serializer: ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer | None
deserializer: ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer | None
property type: ds_provider_postgresql_py_lib.enums.ResourceType

Get the type of the dataset.

Returns:

The dataset resource type.

Return type:

ResourceType

create(**_kwargs: Any) → None[source]

Create/write data to the configured table.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

CreateError – If writing data fails.

read(**_kwargs: Any) → None[source]

Read rows from the configured table into self.output.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

ReadError – If reading data fails.

delete(**_kwargs: Any) → None[source]

Delete rows matching configured identity columns.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

DeleteError – If deleting rows fails.

update(**_kwargs: Any) → None[source]

Update rows matching configured identity columns.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

UpdateError – If updating rows fails.

upsert(**_kwargs: Any) → None[source]

Insert or update rows using PostgreSQL ON CONFLICT semantics.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

UpsertError – If upserting rows fails.
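
The ON CONFLICT semantics mentioned above can be sketched with SQLAlchemy's PostgreSQL dialect. This is an illustrative example, not the provider's actual implementation; the table and column names are taken from the module-level example:

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

metadata = sa.MetaData()
users = sa.Table(
    "users",
    metadata,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("name", sa.String),
)

# INSERT ... ON CONFLICT (id) DO UPDATE: new rows are inserted, and rows
# whose "id" already exists have "name" overwritten with the incoming value.
stmt = postgresql.insert(users).values(id=1, name="alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["id"],
    set_={"name": stmt.excluded.name},
)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

`stmt.excluded` refers to the row proposed for insertion, which is what lets the conflicting row be updated from the incoming values.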

purge(**_kwargs: Any) → None[source]

Purge table contents or drop the table.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

PurgeError – If purging table data fails.

list(**_kwargs: Any) → None[source]

List operation is not supported for this provider.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

NotSupportedError – Always, as list is not supported.

rename(**_kwargs: Any) → None[source]

Rename operation is not supported for this provider.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

NotSupportedError – Always, as rename is not supported.

close() → None[source]

Close the dataset and underlying linked service.

Returns:

None

_output_from_empty_input() → pandas.DataFrame[source]

Build a consistent empty-operation output while preserving input schema.

Returns:

Empty dataframe or a schema-preserving input copy.

Return type:

pd.DataFrame

_get_table() → sqlalchemy.Table[source]

Get the reflected SQLAlchemy table for configured schema and table.

Returns:

Reflected table object.

Return type:

Table
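
Reflection loads an existing table's columns and types from the database into a SQLAlchemy Table object. The sketch below uses an in-memory SQLite database for a self-contained demonstration; against PostgreSQL, the configured schema would also be passed (e.g. Table(..., schema="public")):

```python
import sqlalchemy as sa

engine = sa.create_engine("sqlite:///:memory:")
with engine.begin() as conn:
    conn.execute(sa.text("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)"))

# Reflect the existing table: column names and types are read from the
# database rather than declared in code.
metadata = sa.MetaData()
users = sa.Table("users", metadata, autoload_with=engine)
print([c.name for c in users.columns])
```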

_build_table_from_input(content: pandas.DataFrame) → sqlalchemy.Table[source]

Build a SQLAlchemy Table definition from input DataFrame dtypes.

Parameters:

content – Input DataFrame to build the table from.

Returns:

SQLAlchemy Table definition.

Return type:

Table
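
Deriving a Table definition from DataFrame dtypes amounts to mapping each pandas dtype to a SQLAlchemy column type. The mapping below is a plausible sketch; the provider's actual mapping may differ:

```python
import pandas as pd
import sqlalchemy as sa

# Hypothetical dtype -> SQLAlchemy type mapping, for illustration only.
_DTYPE_MAP = {
    "int64": sa.BigInteger,
    "float64": sa.Float,
    "bool": sa.Boolean,
    "datetime64[ns]": sa.DateTime,
    "object": sa.Text,
}

def build_table_from_input(name: str, content: pd.DataFrame) -> sa.Table:
    """Derive a Table definition from the DataFrame's column dtypes."""
    metadata = sa.MetaData()
    columns = [
        sa.Column(col, _DTYPE_MAP.get(str(dtype), sa.Text)())
        for col, dtype in content.dtypes.items()
    ]
    return sa.Table(name, metadata, *columns)

df = pd.DataFrame({"id": [1], "score": [0.5], "name": ["a"]})
table = build_table_from_input("users", df)
print({c.name: type(c.type).__name__ for c in table.columns})
```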

_resolve_create_primary_key_columns(content: pandas.DataFrame) → collections.abc.Sequence[str] | None[source]

Resolve and validate create-time primary key columns.

Parameters:

content – Input DataFrame used for table creation.

Returns:

Primary key columns for new table creation.

Return type:

Sequence[str] | None

Raises:

ValidationError – If primary_key is enabled but columns are invalid.

_copy_into_table(conn: Any, table: sqlalchemy.Table, content: pandas.DataFrame) → None[source]

Insert rows using PostgreSQL COPY.
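
A common pattern for COPY-based bulk loading is to stream the DataFrame as CSV into the database via a raw cursor's copy_expert. The sketch below shows that pattern with hypothetical helper names; it is not the provider's implementation:

```python
import io
import pandas as pd

def dataframe_to_csv_buffer(content: pd.DataFrame) -> io.StringIO:
    """Serialize the DataFrame to an in-memory CSV stream suitable for COPY."""
    buf = io.StringIO()
    content.to_csv(buf, index=False, header=False)
    buf.seek(0)
    return buf

def copy_sql(schema: str, table: str, columns: list[str]) -> str:
    """Build a COPY ... FROM STDIN statement for the given columns."""
    cols = ", ".join(columns)
    return f'COPY "{schema}"."{table}" ({cols}) FROM STDIN WITH (FORMAT csv)'

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
buf = dataframe_to_csv_buffer(df)
sql = copy_sql("public", "users", list(df.columns))
# With a raw psycopg2 cursor the stream would then be loaded via:
#   cursor.copy_expert(sql, buf)
print(sql)
```

COPY avoids per-row INSERT round trips, which is why it is the usual choice for bulk loads into PostgreSQL.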

_validate_columns(table: sqlalchemy.Table, column_names: collections.abc.Sequence[str]) → None[source]

Validate that all requested columns exist in the reflected table.

Parameters:
  • table – Reflected SQLAlchemy table.

  • column_names – Column names to validate.

Returns:

None

Raises:

ValidationError – If one or more columns do not exist in the table.

_validate_read_settings() → None[source]

Validate read settings before query construction.

Returns:

None

Raises:

ValidationError – If limit or order direction is invalid.

_build_select_columns(table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Build a SELECT statement for configured columns or all columns.

Parameters:

table – Reflected SQLAlchemy table.

Returns:

SELECT statement with chosen columns.

Return type:

Select[Any]

Raises:

ValidationError – If any selected column does not exist.

_build_filters(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Apply equality filters from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with WHERE conditions applied.

Return type:

Select[Any]

Raises:

ValidationError – If any filter column does not exist.

_build_order_by(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) → sqlalchemy.sql.Select[Any][source]

Apply ORDER BY clauses from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with ORDER BY applied.

Return type:

Select[Any]

Raises:

ValidationError – If any order-by column does not exist.
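
The three query-building steps above (column selection, equality filters, ORDER BY) can be sketched with plain SQLAlchemy, using the read settings from the module-level example. This is an illustration of the technique, not the provider's code:

```python
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

metadata = sa.MetaData()
users = sa.Table(
    "users",
    metadata,
    sa.Column("id", sa.Integer),
    sa.Column("name", sa.String),
    sa.Column("status", sa.String),
    sa.Column("created_at", sa.DateTime),
)

# Start from the configured column list, then layer on equality filters
# and ORDER BY clauses, mirroring columns=["id", "name"],
# filters={"status": "active"}, order_by=["created_at"].
stmt = sa.select(users.c.id, users.c.name)
stmt = stmt.where(users.c.status == "active")
stmt = stmt.order_by(users.c.created_at)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

Building the statement incrementally is what lets each settings field remain optional: an unset filter or order_by simply leaves the statement unchanged.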

class ds_provider_postgresql_py_lib.dataset.PostgreSQLDatasetSettings[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.DatasetSettings

Settings for PostgreSQL dataset operations.

The read settings contain read-specific configuration that applies only to the read() operation, not to create(), delete(), update(), or the other operations.

schema: str = 'public'

Schema for dataset operations.

table: str

Table for dataset operations.

read: ReadSettings

Settings for read().

create: CreateSettings

Settings for create().

update: UpdateSettings | None = None

Settings for update().

upsert: UpsertSettings | None = None

Settings for upsert().

delete: DeleteSettings | None = None

Settings for delete().

purge: PurgeSettings

Settings for purge().