Source code for ds_provider_postgresql_py_lib.linked_service.postgresql

"""
**File:** ``postgresql.py``
**Region:** ``ds_provider_postgresql_py_lib/linked_service/postgresql``

PostgreSQL Linked Service

This module implements a linked service for PostgreSQL databases.

Example:
    >>> linked_service = PostgreSQLLinkedService(
    ...     settings=PostgreSQLLinkedServiceSettings(
    ...         uri="postgresql://user:password@localhost:5432/mydb",
    ...     ),
    ... )
    >>> linked_service.connect()
"""

from dataclasses import dataclass, field
from typing import Generic, TypeVar

from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.linked_service import (
    LinkedService,
    LinkedServiceSettings,
)
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
    AuthenticationError,
    ConnectionError,
)
from sqlalchemy import Engine, create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.pool import Pool

from ..enums import ResourceType

logger = Logger.get_logger(__name__, package=True)


[docs] @dataclass(kw_only=True) class PostgreSQLLinkedServiceSettings(LinkedServiceSettings): """ The object containing the PostgreSQL linked service settings. """ uri: str = field(metadata={"mask": True}) """ PostgreSQL connection URI. Format: postgresql://[user[:password]@][host][:port][/database][?param1=value1&param2=value2] Examples: postgresql://user:password@localhost:5432/mydb postgresql://user:password@localhost:5432/mydb?sslmode=require postgresql://user@localhost/mydb postgresql://localhost/mydb """ pool_size: int = 5 """The size of the connection pool. Defaults to 5.""" max_overflow: int = 10 """The maximum overflow connections allowed. Defaults to 10.""" pool_timeout: int = 30 """The timeout in seconds for getting a connection from the pool. Defaults to 30.""" pool_recycle: int = 3600 """The time in seconds after which a connection is recycled. Defaults to 3600."""
PostgreSQLLinkedServiceSettingsType = TypeVar( "PostgreSQLLinkedServiceSettingsType", bound=PostgreSQLLinkedServiceSettings, )
[docs] @dataclass(kw_only=True) class PostgreSQLLinkedService( LinkedService[PostgreSQLLinkedServiceSettingsType], Generic[PostgreSQLLinkedServiceSettingsType], ): """ The class is used to connect with PostgreSQL database. """ settings: PostgreSQLLinkedServiceSettingsType _engine: Engine | None = field(default=None, init=False, repr=False, metadata={"serialize": False}) """The SQLAlchemy engine instance with connection pool.""" @property def type(self) -> ResourceType: """ Get the type of the linked service. Returns: ResourceType """ return ResourceType.LINKED_SERVICE @property def engine(self) -> Engine | None: """ Get the SQLAlchemy engine instance. Returns: Engine | None: The engine if initialized, None otherwise. """ return self._engine @property def connection(self) -> Engine: """ Get the established backend connection object. Returns: Engine: The initialized SQLAlchemy engine. Raises: ConnectionError: If connect() has not been called successfully. """ if self._engine is None: raise ConnectionError( message="Connection has not been established. Call connect() first.", details={"provider": self.type.value}, ) return self._engine @property def pool(self) -> Pool | None: """ Get the connection pool from the engine. The pool is automatically created by SQLAlchemy when create_engine() is called with pool parameters. All connections (via engine.connect(), engine.begin(), etc.) automatically use this pool. Returns: Pool | None: The connection pool if the engine is initialized, None otherwise. """ return self._engine.pool if self._engine else None
[docs] def connect(self) -> None: """ Connect to the PostgreSQL database and create a connection pool. Returns: None """ if self._engine is not None: return try: self._engine = create_engine( url=self.settings.uri, pool_size=self.settings.pool_size, max_overflow=self.settings.max_overflow, pool_timeout=self.settings.pool_timeout, pool_recycle=self.settings.pool_recycle, ) with self._engine.connect(): pass logger.debug("Connection pool created successfully.") except SQLAlchemyError as exc: self._engine = None if self._is_authentication_error(exc): raise AuthenticationError( message=f"Failed to authenticate with PostgreSQL: {exc!s}", details={"provider": self.type.value}, ) from exc raise ConnectionError( message=f"Failed to establish PostgreSQL connection: {exc!s}", details={"provider": self.type.value}, ) from exc
[docs] def test_connection(self) -> tuple[bool, str]: """ Test the connection to the PostgreSQL database. Returns: tuple[bool, str]: A tuple containing a boolean indicating success and a string message. """ try: if self._engine is None: self.connect() if self._engine is None: return False, "Failed to create engine" with self._engine.begin() as conn: result = conn.execute(text("SELECT 1")) result.fetchone() return True, "Connection successfully tested" except Exception as exc: return False, f"Connection test failed: {exc!s}"
[docs] def _is_authentication_error(self, error: Exception) -> bool: """ Detect authentication failures from SQLAlchemy/driver exceptions. Args: error: The exception to check. Returns: bool: True if the error is an authentication error, False otherwise. """ sql_state = getattr(getattr(error, "orig", None), "pgcode", None) if sql_state in {"28P01", "28000"}: return True message = str(error).lower() auth_markers = ( "password authentication failed", "authentication failed", "invalid password", "role does not exist", ) return any(marker in message for marker in auth_markers)
[docs] def close(self) -> None: """ Close the linked service. """ if self._engine is not None: self._engine.dispose() self._engine = None logger.debug("Linked service closed successfully.")