Source code for ds_provider_microsoft_py_lib.linked_service.mssql

"""
**File:** ``mssql.py``
**Region:** ``ds_provider_microsoft_py_lib/linked_service/mssql``

Microsoft SQL Linked Service

This module implements a linked service for Microsoft SQL, allowing users to connect to and interact with
SQL Server instance.

Example:
>>>linked_service = MsSqlLinkedService(
...        settings=MsSqlLinkedServiceSettings(
...            server="account name",
...            database="database",
...            username="username",
...            password="password",
...        ),
...        id=uuid.uuid4(),
...        name="testmssqlpackage",
...        version="0.0.1",
...        description="testmssqlpackage"
...    )
>>> linked_service.connect()
"""

from dataclasses import dataclass, field
from typing import Generic, TypeVar
from urllib.parse import quote_plus

from ds_common_logger_py_lib import Logger
from ds_resource_plugin_py_lib.common.resource.linked_service import LinkedService, LinkedServiceSettings
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
    AuthenticationError,
    ConnectionError,
)
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import ArgumentError, OperationalError

from ..enums import ResourceType

logger = Logger.get_logger(__name__, package=True)


[docs] @dataclass(kw_only=True) class MsSqlLinkedServiceSettings(LinkedServiceSettings): """ The object containing the Microsoft SQL Server linked service settings. """ server: str """The hostname or IP address of the SQL Server instance.""" database: str """The name of the database to connect to.""" username: str """The username for authentication.""" password: str = field(metadata={"mask": True}) """The password for authentication. This field is masked in logs and serialized output.""" port: int = 1433 """The port number for the SQL Server instance. Defaults to 1433, the standard port for SQL Server.""" driver: str = "ODBC Driver 18 for SQL Server" """The ODBC driver to use for the connection. Defaults to "ODBC Driver 18 for SQL Server" """ encrypt: bool = True """Whether to encrypt the connection. Defaults to True.""" trust_server_certificate: bool = False """Whether to trust the server certificate when encrypting. Defaults to False.""" connection_timeout: int = 30 """The connection timeout in seconds. Defaults to 30."""
MsSqlLinkedServiceSettingsType = TypeVar( "MsSqlLinkedServiceSettingsType", bound=MsSqlLinkedServiceSettings, )
[docs] @dataclass(kw_only=True) class MsSqlLinkedService(LinkedService[MsSqlLinkedServiceSettingsType], Generic[MsSqlLinkedServiceSettingsType]): """ Linked service for connecting to Microsoft SQL Server. This linked service manages connections to SQL Server databases. It handles authentication, connection lifecycle, and error handling according to the linked service contract. Example: >>> settings = MsSqlLinkedServiceSettings( ... server="localhost", ... database="mydb", ... username="user", ... password="pass" ... ) >>> service = MsSqlLinkedService( ... settings=settings, ... id=uuid.uuid4(), ... name="my_mssql", ... version="0.0.1" ... ) >>> service.connect() >>> with service as svc: ... data = svc.connection.execute(...) """ settings: MsSqlLinkedServiceSettingsType _connection: Engine | None = field(default=None, init=False, repr=False, metadata={"serialize": False}) """The SQLAlchemy Engine instance representing the connection to the SQL Server database."""
[docs] def check_settings_is_set(self) -> None: """ Check if settings are set correctly. Returns: None Raises: AttributeError: If settings are not set correctly. """ if not isinstance(self.settings, MsSqlLinkedServiceSettings): raise AttributeError("settings not set.")
@property def connection(self) -> Engine: """ Get the backend connection (SQLAlchemy Engine). Returns: Engine: The SQLAlchemy Engine instance. Raises: ConnectionError: If connect() has not been called. """ if self._connection is None: raise ConnectionError( message="Connection not established. Call connect() first.", details={"server": self.settings.server, "database": self.settings.database}, ) return self._connection @connection.setter def connection(self, value: Engine | None) -> None: """ Set the backend connection (for testing purposes). Args: value: The Engine instance or None. """ self._connection = value @property def type(self) -> ResourceType: """ Get the type of the linked service. Returns: ResourceType """ return ResourceType.MICROSOFT_SQL_LINKED_SERVICE
[docs] def _get_connection_string(self) -> str: """ Build the ODBC connection string. Returns: str: The ODBC connection string. """ conn_str = ( f"DRIVER={{{self.settings.driver}}};" f"SERVER={self.settings.server},{self.settings.port};" f"DATABASE={self.settings.database};" f"UID={self.settings.username};" f"PWD={self.settings.password};" f"Encrypt={'yes' if self.settings.encrypt else 'no'};" f"TrustServerCertificate={'yes' if self.settings.trust_server_certificate else 'no'};" f"Connection Timeout={self.settings.connection_timeout};" ) return conn_str
[docs] def _create_engine(self) -> Engine: """ Connect to SQL Server and return SQLAlchemy Engine. Returns: Engine: The SQLAlchemy Engine instance. Raises: ConnectionError: If the engine cannot be created. AuthenticationError: If credentials are invalid. """ logger.debug("Creating SQLAlchemy engine for SQL Server...") try: conn_str = self._get_connection_string() url = f"mssql+pyodbc:///?odbc_connect={quote_plus(conn_str)}" engine = create_engine(url, echo=False, fast_executemany=True) logger.debug("SQLAlchemy engine created successfully.") return engine except ArgumentError as exc: # This typically indicates connection string or configuration issues logger.error(f"Invalid connection string or configuration: {exc}", exc_info=True) raise ConnectionError( message=f"Failed to create database engine: {exc!s}", details={ "server": self.settings.server, "port": self.settings.port, "database": self.settings.database, }, ) from exc except Exception as exc: logger.error(f"Unexpected error creating engine: {exc}", exc_info=True) raise ConnectionError( message=f"Failed to create database engine: {exc!s}", details={ "server": self.settings.server, "port": self.settings.port, "database": self.settings.database, }, ) from exc
[docs] def connect(self) -> None: """ Establish a connection to Microsoft SQL Server. The result is stored internally and accessible via the `connection` property. Returns: None Raises: ConnectionError: If the connection cannot be established. AuthenticationError: If credentials are invalid. Rules: - Idempotent: Calling connect() on an already-connected service reuses the connection. - Must authenticate using credentials from self.settings. - Must fail loudly if connection cannot be established. """ # Idempotent: reuse existing connection if self._connection is not None: logger.debug("Connection already established, reusing.") return self.check_settings_is_set() try: # Create the engine engine = self._create_engine() # Test the connection before storing it logger.debug("Testing connection to SQL Server...") with engine.connect() as conn: conn.execute(text("SELECT 1")) self._connection = engine logger.info( f"Successfully connected to SQL Server: {self.settings.server}:{self.settings.port}/{self.settings.database}" ) except OperationalError as exc: # OperationalError typically indicates authentication or connection issues error_str = str(exc).lower() if "login failed" in error_str or "authentication" in error_str: logger.error(f"Authentication failed: {exc}", exc_info=True) raise AuthenticationError( message=f"Authentication failed for user '{self.settings.username}': {exc!s}", details={ "server": self.settings.server, "database": self.settings.database, "username": self.settings.username, }, ) from exc else: logger.error(f"Connection failed: {exc}", exc_info=True) raise ConnectionError( message=f"Failed to connect to SQL Server: {exc!s}", details={ "server": self.settings.server, "port": self.settings.port, "database": self.settings.database, }, ) from exc except (ConnectionError, AuthenticationError): # Re-raise our own exception types raise except Exception as exc: logger.error(f"Unexpected error during connection: {exc}", exc_info=True) raise ConnectionError( message=f"Failed to connect to SQL Server: {exc!s}", details={ "server": self.settings.server, "port": self.settings.port, "database": self.settings.database, }, ) from exc
[docs] def test_connection(self) -> tuple[bool, str]: """ Verify that the connection to Microsoft SQL Server is healthy. Performs a lightweight check against the backend (a simple SELECT 1 query). This method does not raise on connection failure -- instead returns (False, "error message"). Exceptions are reserved for unexpected internal errors. Returns: tuple[bool, str] -- On success: (True, ""). On failure: (False, "reason"). Rules: - Must not raise on connection failure. - Must not modify any data. - Should complete quickly. - Idempotent: Yes. """ try: if self._connection is not None: # Use the existing connection for the health check with self._connection.connect() as conn: result = conn.execute(text("SELECT 1")) result.fetchone() else: # Create a temporary engine to test without mutating state engine = self._create_engine() try: with engine.connect() as conn: result = conn.execute(text("SELECT 1")) result.fetchone() finally: engine.dispose() logger.debug("Connection test successful.") return True, "" except Exception as exc: logger.error(f"Failed to test connection: {exc}", exc_info=True) return False, f"Connection test failed: {exc!s}"
[docs] def close(self) -> None: """ Release connections, sessions, or handles held by the linked service. This method is safe to call multiple times and does not raise even if the connection is already closed. Called automatically by `__exit__` when using a context manager. Returns: None Rules: - Must release any open connections, sessions, or handles. - Must not raise if the connection is already closed. - Must be safe to call multiple times. - Idempotent: Yes. """ try: if self._connection is not None: self._connection.dispose() self._connection = None logger.debug("Connection to SQL Server closed.") except Exception as exc: logger.error(f"Error closing connection: {exc}", exc_info=True)