ds_provider_postgresql_py_lib.dataset.postgresql

File: postgresql.py Region: ds_provider_postgresql_py_lib/dataset/postgresql

PostgreSQL Dataset

This module implements a dataset for PostgreSQL databases.

Example

>>> dataset = PostgreSQLDataset(
...     deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON),
...     serializer=PandasSerializer(format=DatasetStorageFormatType.JSON),
...     settings=PostgreSQLDatasetSettings(
...         table="users",
...         read=ReadSettings(
...             columns=["id", "name"],
...             filters={"status": "active"},
...             order_by=["created_at"],
...             limit=100,
...         )
...     ),
...     linked_service=PostgreSQLLinkedService(
...         settings=PostgreSQLLinkedServiceSettings(
...             uri="postgresql://user:password@localhost:5432/mydb",
...         ),
...     ),
... )
>>> dataset.read()
>>> data = dataset.output

Attributes

logger

PostgreSQLDatasetSettingsType

PostgreSQLLinkedServiceType

Classes

CreateSettings

Settings specific to the create() operation.

UpdateSettings

Settings specific to the update() operation.

UpsertSettings

Settings specific to the upsert() operation.

DeleteSettings

Settings specific to the delete() operation.

ReadSettings

Settings specific to the read() operation.

PurgeSettings

Settings specific to the purge() operation.

PostgreSQLDatasetSettings

Settings for PostgreSQL dataset operations.

PostgreSQLDataset

Tabular dataset object which identifies data within a data store.

Module Contents

ds_provider_postgresql_py_lib.dataset.postgresql.logger
class ds_provider_postgresql_py_lib.dataset.postgresql.CreateSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the create() operation.

These settings only apply when writing data to the database and do not affect read(), delete(), update(), or rename() operations.

index: bool = False

Whether to include the DataFrame index as columns during create() writes.

primary_key: bool = False

Whether to create a primary key when creating a new table.

primary_key_columns: collections.abc.Sequence[str] | None = None

Primary key columns to create when primary_key is enabled.

class ds_provider_postgresql_py_lib.dataset.postgresql.UpdateSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the update() operation.

identity_columns: collections.abc.Sequence[str]

Columns that uniquely identify each row.

class ds_provider_postgresql_py_lib.dataset.postgresql.UpsertSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the upsert() operation.

identity_columns: collections.abc.Sequence[str]

Columns that uniquely identify each row.

class ds_provider_postgresql_py_lib.dataset.postgresql.DeleteSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the delete() operation.

identity_columns: collections.abc.Sequence[str]

Columns that uniquely identify each row.

class ds_provider_postgresql_py_lib.dataset.postgresql.ReadSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the read() operation.

These settings only apply when reading data from the database and do not affect create(), delete(), update(), or rename() operations.

limit: int | None = None

Maximum number of rows to read. If None, no limit is applied.

columns: collections.abc.Sequence[str] | None = None

Specific columns to select. If None, selects all columns (*).

Example

columns=["id", "name", "created_at"]

filters: dict[str, Any] | None = None

Dictionary of column filters for WHERE clause. Uses equality comparison.

Example

filters={"status": "active", "amount": 100}

Multiple filters are combined with AND.
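
The equality-and-AND semantics can be sketched with a hypothetical raw-SQL helper. This is illustrative only: the actual implementation builds SQLAlchemy expressions (see _build_filters below) rather than SQL strings.

```python
from typing import Any

def build_where_clause(filters: dict[str, Any]) -> tuple[str, list[Any]]:
    if not filters:
        return "", []
    # Each filter becomes an equality comparison against a bound parameter.
    conditions = [f"{column} = %s" for column in filters]
    # Multiple filters are combined with AND.
    return "WHERE " + " AND ".join(conditions), list(filters.values())
```

For example, filters={"status": "active", "amount": 100} corresponds to WHERE status = %s AND amount = %s with parameters ["active", 100].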

order_by: collections.abc.Sequence[str | tuple[str, str]] | None = None

Columns to order by. Can be:
  • A list of column names (defaults to ascending)

  • A list of (column_name, 'asc'/'desc') tuples

Example

order_by=["created_at"]  # ascending
order_by=[("created_at", "desc"), "name"]  # created_at desc, name asc

class ds_provider_postgresql_py_lib.dataset.postgresql.PurgeSettings[source]

Bases: ds_common_serde_py_lib.Serializable

Settings specific to the purge() operation.

drop_table: bool = False

Drop the table object instead of deleting rows.

cascade: bool = False

Apply CASCADE when dropping the table.
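
The SQL shape implied by these two flags can be sketched as follows. This is a hypothetical helper, not the provider's code; in particular, whether row deletion uses DELETE or TRUNCATE under the hood is an assumption here.

```python
def build_purge_statement(
    schema: str, table: str, drop_table: bool, cascade: bool
) -> str:
    qualified = f'"{schema}"."{table}"'
    if drop_table:
        # Drop the table object itself, optionally cascading to dependents.
        if cascade:
            return f"DROP TABLE {qualified} CASCADE"
        return f"DROP TABLE {qualified}"
    # Default: remove rows but keep the table definition.
    return f"DELETE FROM {qualified}"
```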

class ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLDatasetSettings[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.DatasetSettings

Settings for PostgreSQL dataset operations.

The read settings contain read-specific configuration that applies only to the read() operation, not to create(), delete(), update(), etc.

schema: str = 'public'

Schema for dataset operations.

table: str

Table for dataset operations.

read: ReadSettings

Settings for read().

create: CreateSettings

Settings for create().

update: UpdateSettings | None = None

Settings for update().

upsert: UpsertSettings | None = None

Settings for upsert().

delete: DeleteSettings | None = None

Settings for delete().

purge: PurgeSettings

Settings for purge().

ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLDatasetSettingsType
ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLLinkedServiceType
class ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLDataset[source]

Bases: ds_resource_plugin_py_lib.common.resource.dataset.TabularDataset[PostgreSQLLinkedServiceType, PostgreSQLDatasetSettingsType, ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer, ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer], Generic[PostgreSQLLinkedServiceType, PostgreSQLDatasetSettingsType]

Tabular dataset object which identifies data within a data store, such as a table, CSV, JSON, Parquet file, Parquet dataset, or other documents.

The input of the dataset is a pandas DataFrame. The output of the dataset is a pandas DataFrame.

linked_service: PostgreSQLLinkedServiceType
settings: PostgreSQLDatasetSettingsType
serializer: ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer | None
deserializer: ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer | None
property type: ds_provider_postgresql_py_lib.enums.ResourceType

Get the type of the dataset.

Returns:

The dataset resource type.

Return type:

ResourceType

create(**_kwargs: Any) None[source]

Create/write data to the configured table.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

CreateError – If writing data fails.

read(**_kwargs: Any) None[source]

Read rows from the configured table into self.output.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

ReadError – If reading data fails.

delete(**_kwargs: Any) None[source]

Delete rows matching configured identity columns.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

DeleteError – If deleting rows fails.

update(**_kwargs: Any) None[source]

Update rows matching configured identity columns.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

UpdateError – If updating rows fails.

upsert(**_kwargs: Any) None[source]

Insert or update rows using PostgreSQL ON CONFLICT semantics.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

UpsertError – If upserting rows fails.
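
The ON CONFLICT statement shape driven by UpsertSettings.identity_columns can be sketched like this. The helper is hypothetical and renders raw SQL for illustration; the provider builds the statement through SQLAlchemy rather than string formatting.

```python
from collections.abc import Sequence

def build_upsert_statement(
    table: str,
    columns: Sequence[str],
    identity_columns: Sequence[str],
) -> str:
    col_list = ", ".join(columns)
    placeholders = ", ".join(f"%({c})s" for c in columns)
    # Identity columns form the conflict target (they must carry a
    # unique constraint for ON CONFLICT to apply).
    conflict = ", ".join(identity_columns)
    # Non-identity columns are overwritten from the incoming row.
    updates = ", ".join(
        f"{c} = EXCLUDED.{c}" for c in columns if c not in identity_columns
    )
    if updates:
        action = f"DO UPDATE SET {updates}"
    else:
        # Nothing left to update when every column is part of the key.
        action = "DO NOTHING"
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT ({conflict}) {action}"
    )
```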

purge(**_kwargs: Any) None[source]

Purge table contents or drop the table.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

PurgeError – If purging table data fails.

list(**_kwargs: Any) None[source]

List operation is not supported for this provider.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

NotSupportedError – Always, as list is not supported.

rename(**_kwargs: Any) None[source]

Rename operation is not supported for this provider.

Parameters:

_kwargs – Additional keyword arguments for interface compatibility.

Returns:

None

Raises:

NotSupportedError – Always, as rename is not supported.

close() None[source]

Close the dataset and underlying linked service.

Returns:

None

_output_from_empty_input() pandas.DataFrame[source]

Build a consistent empty-operation output while preserving input schema.

Returns:

Empty dataframe or a schema-preserving input copy.

Return type:

pd.DataFrame

_get_table() sqlalchemy.Table[source]

Get the reflected SQLAlchemy table for configured schema and table.

Returns:

Reflected table object.

Return type:

Table

_build_table_from_input(content: pandas.DataFrame) sqlalchemy.Table[source]

Build a SQLAlchemy Table definition from input DataFrame dtypes.

Parameters:

content – Input DataFrame to build the table from.

Returns:

SQLAlchemy Table definition.

Return type:

Table

_resolve_create_primary_key_columns(content: pandas.DataFrame) collections.abc.Sequence[str] | None[source]

Resolve and validate create-time primary key columns.

Parameters:

content – Input DataFrame used for table creation.

Returns:

Primary key columns for new table creation.

Return type:

Sequence[str] | None

Raises:

ValidationError – If primary_key is enabled but columns are invalid.

_copy_into_table(conn: Any, table: sqlalchemy.Table, content: pandas.DataFrame) None[source]

Insert rows using PostgreSQL COPY.

_validate_columns(table: sqlalchemy.Table, column_names: collections.abc.Sequence[str]) None[source]

Validate that all requested columns exist in the reflected table.

Parameters:
  • table – Reflected SQLAlchemy table.

  • column_names – Column names to validate.

Returns:

None

Raises:

ValidationError – If one or more columns do not exist in the table.

_validate_read_settings() None[source]

Validate read settings before query construction.

Returns:

None

Raises:

ValidationError – If limit or order direction is invalid.

_build_select_columns(table: sqlalchemy.Table) sqlalchemy.sql.Select[Any][source]

Build a SELECT statement for configured columns or all columns.

Parameters:

table – Reflected SQLAlchemy table.

Returns:

SELECT statement with chosen columns.

Return type:

Select[Any]

Raises:

ValidationError – If any selected column does not exist.

_build_filters(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) sqlalchemy.sql.Select[Any][source]

Apply equality filters from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with WHERE conditions applied.

Return type:

Select[Any]

Raises:

ValidationError – If any filter column does not exist.

_build_order_by(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) sqlalchemy.sql.Select[Any][source]

Apply ORDER BY clauses from read settings to the SELECT statement.

Parameters:
  • stmt – Current SELECT statement.

  • table – Reflected SQLAlchemy table.

Returns:

SELECT statement with ORDER BY applied.

Return type:

Select[Any]

Raises:

ValidationError – If any order-by column does not exist.