ds_provider_postgresql_py_lib.dataset.postgresql ================================================ .. py:module:: ds_provider_postgresql_py_lib.dataset.postgresql .. autoapi-nested-parse:: **File:** ``postgresql.py`` **Region:** ``ds_provider_postgresql_py_lib/dataset/postgresql`` PostgreSQL Dataset This module implements a dataset for PostgreSQL databases. .. rubric:: Example >>> dataset = PostgreSQLDataset( ... deserializer=PandasDeserializer(format=DatasetStorageFormatType.JSON), ... serializer=PandasSerializer(format=DatasetStorageFormatType.JSON), ... settings=PostgreSQLDatasetSettings( ... table="users", ... read=ReadSettings( ... columns=["id", "name"], ... filters={"status": "active"}, ... order_by=["created_at"], ... limit=100, ... ) ... ), ... linked_service=PostgreSQLLinkedService( ... settings=PostgreSQLLinkedServiceSettings( ... uri="postgresql://user:password@localhost:5432/mydb", ... ), ... ), ... ) >>> dataset.read() >>> data = dataset.output Attributes ---------- .. autoapisummary:: ds_provider_postgresql_py_lib.dataset.postgresql.logger ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLDatasetSettingsType ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLLinkedServiceType Classes ------- .. autoapisummary:: ds_provider_postgresql_py_lib.dataset.postgresql.CreateSettings ds_provider_postgresql_py_lib.dataset.postgresql.UpdateSettings ds_provider_postgresql_py_lib.dataset.postgresql.UpsertSettings ds_provider_postgresql_py_lib.dataset.postgresql.DeleteSettings ds_provider_postgresql_py_lib.dataset.postgresql.ReadSettings ds_provider_postgresql_py_lib.dataset.postgresql.PurgeSettings ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLDatasetSettings ds_provider_postgresql_py_lib.dataset.postgresql.PostgreSQLDataset Module Contents --------------- .. py:data:: logger .. py:class:: CreateSettings Bases: :py:obj:`ds_common_serde_py_lib.Serializable` Settings specific to the create() operation. These settings only apply when writing data to the database and do not affect read(), delete(), update(), or rename() operations. .. py:attribute:: index :type: bool :value: False Whether to include the DataFrame index as columns during create() writes. .. py:attribute:: primary_key :type: bool :value: False Whether to create a primary key when creating a new table. .. py:attribute:: primary_key_columns :type: collections.abc.Sequence[str] | None :value: None Primary key columns to create when `primary_key` is enabled. .. py:class:: UpdateSettings Bases: :py:obj:`ds_common_serde_py_lib.Serializable` Settings specific to the update() operation. .. py:attribute:: identity_columns :type: collections.abc.Sequence[str] Columns that uniquely identify each row. .. py:class:: UpsertSettings Bases: :py:obj:`ds_common_serde_py_lib.Serializable` Settings specific to the upsert() operation. .. py:attribute:: identity_columns :type: collections.abc.Sequence[str] Columns that uniquely identify each row. .. py:class:: DeleteSettings Bases: :py:obj:`ds_common_serde_py_lib.Serializable` Settings specific to the delete() operation. .. py:attribute:: identity_columns :type: collections.abc.Sequence[str] Columns that uniquely identify each row. .. py:class:: ReadSettings Bases: :py:obj:`ds_common_serde_py_lib.Serializable` Settings specific to the read() operation. These settings only apply when reading data from the database and do not affect create(), delete(), update(), or rename() operations. .. py:attribute:: limit :type: int | None :value: None The limit of the data to read. .. py:attribute:: columns :type: collections.abc.Sequence[str] | None :value: None Specific columns to select. If None, selects all columns (*). .. rubric:: Example columns=["id", "name", "created_at"] .. py:attribute:: filters :type: dict[str, Any] | None :value: None Dictionary of column filters for WHERE clause. Uses equality comparison. .. rubric:: Example filters={"status": "active", "amount": 100} Multiple filters are combined with AND. .. py:attribute:: order_by :type: collections.abc.Sequence[str | tuple[str, str]] | None :value: None Columns to order by. Can be: - List of column names (defaults to ascending) - List of (column_name, 'asc'/'desc') tuples .. rubric:: Example order_by=["created_at"] # ascending order_by=[("created_at", "desc"), "name"] # created_at desc, name asc .. py:class:: PurgeSettings Bases: :py:obj:`ds_common_serde_py_lib.Serializable` Settings specific to the purge() operation. .. py:attribute:: drop_table :type: bool :value: False Drop the table object instead of deleting rows. .. py:attribute:: cascade :type: bool :value: False Apply CASCADE when dropping the table. .. py:class:: PostgreSQLDatasetSettings Bases: :py:obj:`ds_resource_plugin_py_lib.common.resource.dataset.DatasetSettings` Settings for PostgreSQL dataset operations. The `read` settings contains read-specific configuration that only applies to the read() operation, not to create(), delete(), update(), etc. .. py:attribute:: schema :type: str :value: 'public' Schema for dataset operations. .. py:attribute:: table :type: str Table for dataset operations. .. py:attribute:: read :type: ReadSettings Settings for read(). .. py:attribute:: create :type: CreateSettings Settings for create(). .. py:attribute:: update :type: UpdateSettings | None :value: None Settings for update(). .. py:attribute:: upsert :type: UpsertSettings | None :value: None Settings for upsert(). .. py:attribute:: delete :type: DeleteSettings | None :value: None Settings for delete(). .. py:attribute:: purge :type: PurgeSettings Settings for purge(). .. py:data:: PostgreSQLDatasetSettingsType .. py:data:: PostgreSQLLinkedServiceType .. py:class:: PostgreSQLDataset Bases: :py:obj:`ds_resource_plugin_py_lib.common.resource.dataset.TabularDataset`\ [\ :py:obj:`PostgreSQLLinkedServiceType`\ , :py:obj:`PostgreSQLDatasetSettingsType`\ , :py:obj:`ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer`\ , :py:obj:`ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer`\ ], :py:obj:`Generic`\ [\ :py:obj:`PostgreSQLLinkedServiceType`\ , :py:obj:`PostgreSQLDatasetSettingsType`\ ] Tabular dataset object which identifies data within a data store, such as table/csv/json/parquet/parquetdataset/ and other documents. The input of the dataset is a pandas DataFrame. The output of the dataset is a pandas DataFrame. .. py:attribute:: linked_service :type: PostgreSQLLinkedServiceType .. py:attribute:: settings :type: PostgreSQLDatasetSettingsType .. py:attribute:: serializer :type: ds_resource_plugin_py_lib.common.serde.serialize.PandasSerializer | None .. py:attribute:: deserializer :type: ds_resource_plugin_py_lib.common.serde.deserialize.PandasDeserializer | None .. py:property:: type :type: ds_provider_postgresql_py_lib.enums.ResourceType Get the type of the dataset. :returns: The dataset resource type. :rtype: ResourceType .. py:method:: create(**_kwargs: Any) -> None Create/write data to the configured table. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises CreateError: If writing data fails. .. py:method:: read(**_kwargs: Any) -> None Read rows from the configured table into `self.output`. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises ReadError: If reading data fails. .. py:method:: delete(**_kwargs: Any) -> None Delete rows matching configured identity columns. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises DeleteError: If deleting rows fails. .. py:method:: update(**_kwargs: Any) -> None Update rows matching configured identity columns. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises UpdateError: If updating rows fails. .. py:method:: upsert(**_kwargs: Any) -> None Insert or update rows using PostgreSQL ON CONFLICT semantics. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises UpsertError: If upserting rows fails. .. py:method:: purge(**_kwargs: Any) -> None Purge table contents or drop the table. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises PurgeError: If purging table data fails. .. py:method:: list(**_kwargs: Any) -> None List operation is not supported for this provider. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises NotSupportedError: Always, as list is not supported. .. py:method:: rename(**_kwargs: Any) -> None Rename operation is not supported for this provider. :param _kwargs: Additional keyword arguments for interface compatibility. :returns: None :raises NotSupportedError: Always, as rename is not supported. .. py:method:: close() -> None Close the dataset and underlying linked service. :returns: None .. py:method:: _output_from_empty_input() -> pandas.DataFrame Build a consistent empty-operation output while preserving input schema. :returns: Empty dataframe or a schema-preserving input copy. :rtype: pd.DataFrame .. py:method:: _get_table() -> sqlalchemy.Table Get the reflected SQLAlchemy table for configured schema and table. :returns: Reflected table object. :rtype: Table .. py:method:: _build_table_from_input(content: pandas.DataFrame) -> sqlalchemy.Table Build a SQLAlchemy Table definition from input DataFrame dtypes. :param content: Input DataFrame to build the table from. :returns: SQLAlchemy Table definition. :rtype: Table .. py:method:: _resolve_create_primary_key_columns(content: pandas.DataFrame) -> collections.abc.Sequence[str] | None Resolve and validate create-time primary key columns. :param content: Input DataFrame used for table creation. :returns: Primary key columns for new table creation. :rtype: Sequence[str] | None :raises ValidationError: If `primary_key` is enabled but columns are invalid. .. py:method:: _copy_into_table(conn: Any, table: sqlalchemy.Table, content: pandas.DataFrame) -> None Insert rows using PostgreSQL COPY. .. py:method:: _validate_columns(table: sqlalchemy.Table, column_names: collections.abc.Sequence[str]) -> None Validate that all requested columns exist in the reflected table. :param table: Reflected SQLAlchemy table. :param column_names: Column names to validate. :returns: None :raises ValidationError: If one or more columns do not exist in the table. .. py:method:: _validate_read_settings() -> None Validate read settings before query construction. :returns: None :raises ValidationError: If limit or order direction is invalid. .. py:method:: _build_select_columns(table: sqlalchemy.Table) -> sqlalchemy.sql.Select[Any] Build a SELECT statement for configured columns or all columns. :param table: Reflected SQLAlchemy table. :returns: SELECT statement with chosen columns. :rtype: Select[Any] :raises ValidationError: If any selected column does not exist. .. py:method:: _build_filters(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) -> sqlalchemy.sql.Select[Any] Apply equality filters from read settings to the SELECT statement. :param stmt: Current SELECT statement. :param table: Reflected SQLAlchemy table. :returns: SELECT statement with WHERE conditions applied. :rtype: Select[Any] :raises ValidationError: If any filter column does not exist. .. py:method:: _build_order_by(stmt: sqlalchemy.sql.Select[Any], table: sqlalchemy.Table) -> sqlalchemy.sql.Select[Any] Apply ORDER BY clauses from read settings to the SELECT statement. :param stmt: Current SELECT statement. :param table: Reflected SQLAlchemy table. :returns: SELECT statement with ORDER BY applied. :rtype: Select[Any] :raises ValidationError: If any order-by column does not exist.