"""
**File:** ``postgresql.py``
**Region:** ``ds_provider_postgresql_py_lib/linked_service/postgresql``
PostgreSQL Linked Service
This module implements a linked service for PostgreSQL databases.
Example:
>>> linked_service = PostgreSQLLinkedService(
... settings=PostgreSQLLinkedServiceSettings(
... uri="postgresql://user:password@localhost:5432/mydb",
... ),
... )
>>> linked_service.connect()
"""
from dataclasses import dataclass, field
from typing import Generic, TypeVar
from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.linked_service import (
LinkedService,
LinkedServiceSettings,
)
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
AuthenticationError,
ConnectionError,
)
from sqlalchemy import Engine, create_engine, text
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.pool import Pool
from ..enums import ResourceType
logger = Logger.get_logger(__name__, package=True)
[docs]
@dataclass(kw_only=True)
class PostgreSQLLinkedServiceSettings(LinkedServiceSettings):
"""
The object containing the PostgreSQL linked service settings.
"""
uri: str = field(metadata={"mask": True})
"""
PostgreSQL connection URI.
Format: postgresql://[user[:password]@][host][:port][/database][?param1=value1¶m2=value2]
Examples:
postgresql://user:password@localhost:5432/mydb
postgresql://user:password@localhost:5432/mydb?sslmode=require
postgresql://user@localhost/mydb
postgresql://localhost/mydb
"""
pool_size: int = 5
"""The size of the connection pool. Defaults to 5."""
max_overflow: int = 10
"""The maximum overflow connections allowed. Defaults to 10."""
pool_timeout: int = 30
"""The timeout in seconds for getting a connection from the pool. Defaults to 30."""
pool_recycle: int = 3600
"""The time in seconds after which a connection is recycled. Defaults to 3600."""
PostgreSQLLinkedServiceSettingsType = TypeVar(
"PostgreSQLLinkedServiceSettingsType",
bound=PostgreSQLLinkedServiceSettings,
)
[docs]
@dataclass(kw_only=True)
class PostgreSQLLinkedService(
LinkedService[PostgreSQLLinkedServiceSettingsType],
Generic[PostgreSQLLinkedServiceSettingsType],
):
"""
The class is used to connect with PostgreSQL database.
"""
settings: PostgreSQLLinkedServiceSettingsType
_engine: Engine | None = field(default=None, init=False, repr=False, metadata={"serialize": False})
"""The SQLAlchemy engine instance with connection pool."""
@property
def type(self) -> ResourceType:
"""
Get the type of the linked service.
Returns:
ResourceType
"""
return ResourceType.LINKED_SERVICE
@property
def engine(self) -> Engine | None:
"""
Get the SQLAlchemy engine instance.
Returns:
Engine | None: The engine if initialized, None otherwise.
"""
return self._engine
@property
def connection(self) -> Engine:
"""
Get the established backend connection object.
Returns:
Engine: The initialized SQLAlchemy engine.
Raises:
ConnectionError: If connect() has not been called successfully.
"""
if self._engine is None:
raise ConnectionError(
message="Connection has not been established. Call connect() first.",
details={"provider": self.type.value},
)
return self._engine
@property
def pool(self) -> Pool | None:
"""
Get the connection pool from the engine.
The pool is automatically created by SQLAlchemy when create_engine() is called
with pool parameters. All connections (via engine.connect(), engine.begin(), etc.)
automatically use this pool.
Returns:
Pool | None: The connection pool if the engine is initialized, None otherwise.
"""
return self._engine.pool if self._engine else None
[docs]
def connect(self) -> None:
"""
Connect to the PostgreSQL database and create a connection pool.
Returns:
None
"""
if self._engine is not None:
return
try:
self._engine = create_engine(
url=self.settings.uri,
pool_size=self.settings.pool_size,
max_overflow=self.settings.max_overflow,
pool_timeout=self.settings.pool_timeout,
pool_recycle=self.settings.pool_recycle,
)
with self._engine.connect():
pass
logger.debug("Connection pool created successfully.")
except SQLAlchemyError as exc:
self._engine = None
if self._is_authentication_error(exc):
raise AuthenticationError(
message=f"Failed to authenticate with PostgreSQL: {exc!s}",
details={"provider": self.type.value},
) from exc
raise ConnectionError(
message=f"Failed to establish PostgreSQL connection: {exc!s}",
details={"provider": self.type.value},
) from exc
[docs]
def test_connection(self) -> tuple[bool, str]:
"""
Test the connection to the PostgreSQL database.
Returns:
tuple[bool, str]: A tuple containing a boolean indicating success and a string message.
"""
try:
if self._engine is None:
self.connect()
if self._engine is None:
return False, "Failed to create engine"
with self._engine.begin() as conn:
result = conn.execute(text("SELECT 1"))
result.fetchone()
return True, "Connection successfully tested"
except Exception as exc:
return False, f"Connection test failed: {exc!s}"
[docs]
def _is_authentication_error(self, error: Exception) -> bool:
"""
Detect authentication failures from SQLAlchemy/driver exceptions.
Args:
error: The exception to check.
Returns:
bool: True if the error is an authentication error, False otherwise.
"""
sql_state = getattr(getattr(error, "orig", None), "pgcode", None)
if sql_state in {"28P01", "28000"}:
return True
message = str(error).lower()
auth_markers = (
"password authentication failed",
"authentication failed",
"invalid password",
"role does not exist",
)
return any(marker in message for marker in auth_markers)
[docs]
def close(self) -> None:
"""
Close the linked service.
"""
if self._engine is not None:
self._engine.dispose()
self._engine = None
logger.debug("Linked service closed successfully.")