Source code for ds_protocol_sftp_py_lib.linked_service.sftp

"""
**File**: `sftp.py`
**Region**: `src/ds_protocol_sftp_py_lib/linked_service/sftp`

SFTP Linked Service implementation.

This module defines the `SftpLinkedService` class, which implements a linked service for SFTP connections,
including connection management, error handling, and integration with the SFTP client.

Example:
    >>> import uuid
    >>> from ds_protocol_sftp_py_lib.linked_service import SftpLinkedService, SftpLinkedServiceSettings
    >>> linked_service = SftpLinkedService(
    ...     id=uuid.uuid4(),
    ...     name="example::linked_service",
    ...     version="1.0.0",
    ...     settings=SftpLinkedServiceSettings(
    ...         host="sftp.example.com",
    ...         username="user",
    ...         password="password123",
    ...         private_key=None,
    ...         passphrase=None,
    ...         timeout=30.0,
    ...         host_key_fingerprint="AbCdEfGhIjKlMnOpQrStUvWxYz0123456789abcdEf==",
    ...         host_key_validation=True,
    ...         port=22,
    ...     ),
    ... )
    >>> linked_service.connect()
"""

from dataclasses import dataclass, field
from typing import Generic, TypeVar

from ds_resource_plugin_py_lib.common.resource.linked_service import LinkedService, LinkedServiceSettings
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
    ConnectionError,
)

from ..enums import ResourceType
from ..utils.sftp.provider import Sftp


[docs] @dataclass(kw_only=True) class SftpLinkedServiceSettings(LinkedServiceSettings): """Settings for SFTP Linked Service connections. Attributes: host (str): SFTP server hostname. username (str): Username for authentication. password (str | None): Password for authentication. private_key (str | None): Private key for authentication. passphrase (str | None): Passphrase for private key. timeout (float | None): Connection timeout in seconds. host_key_fingerprint (str | None): Expected host key fingerprint. host_key_validation (bool): Whether to validate host key. port (int): SFTP server port. """ host: str """Hostname or IP address of the SFTP server.""" username: str """Username for authentication.""" password: str | None = field(default=None, metadata={"mask": True}) """Password for authentication.""" private_key: str | None = field(default=None, metadata={"mask": True}) """Private key for authentication.""" passphrase: str | None = field(default=None, metadata={"mask": True}) """Passphrase for private key.""" timeout: float | None = None """Connection timeout in seconds.""" host_key_fingerprint: str | None = None """Expected host key fingerprint (base64-encoded MD5, as produced by Paramiko's get_fingerprint(); e.g., 'AbCdEfGhIjKlMnOpQrStUvWxYz0123456789abcdEf==').""" host_key_validation: bool = True """Whether to validate host key.""" port: int = 22 """SFTP server port."""
SftpLinkedServiceSettingsType = TypeVar("SftpLinkedServiceSettingsType", bound=SftpLinkedServiceSettings)
[docs] @dataclass(kw_only=True) class SftpLinkedService( LinkedService[SftpLinkedServiceSettingsType], Generic[SftpLinkedServiceSettingsType], ): """SFTP Linked Service implementation. Attributes: settings (SftpLinkedServiceSettingsType): Linked service settings. _connection (SFTPClient | None): Underlying SFTP client connection. _sftp (Sftp | None): Sftp provider instance. """ settings: SftpLinkedServiceSettingsType _sftp: Sftp | None = field(default=None, init=False, repr=False, metadata={"serialize": False}) @property def type(self) -> ResourceType: """Get the type of linked service. Returns: ResourceType: The type of the linked service. """ return ResourceType.LINKED_SERVICE @property def connection(self) -> Sftp: """Get the SFTP client connection. Returns: Sftp: The active SFTP client connection. Raises: ConnectionError: If the connection is not initialized. """ if self._sftp is None: raise ConnectionError( message="Connection is not initialized", details={ "host": self.settings.host, "username": self.settings.username, "port": self.settings.port, "type": self.type.value, }, ) return self._sftp
[docs] def _init_sftp(self) -> Sftp: """Initialize the Sftp client. Returns: Sftp: An initialized Sftp provider instance. """ return Sftp()
[docs] def connect(self) -> None: """Initialize the Sftp client instance if not already initialized. Raises: ConnectionError: If connection fails. AuthenticationError: If authentication fails. """ if self._sftp is None: self._sftp = self._init_sftp() self._sftp.connect( host=self.settings.host, port=self.settings.port, username=self.settings.username, password=self.settings.password, passphrase=self.settings.passphrase, host_key_fingerprint=self.settings.host_key_fingerprint, pkey=self.settings.private_key, host_key_validation=self.settings.host_key_validation, timeout=self.settings.timeout, )
[docs] def test_connection(self) -> tuple[bool, str]: """Perform a lightweight health check against the SFTP backend. Uses the SFTP client's listdir method to check connectivity and authentication. Returns: tuple[bool, str]: - (True, message) if successful. - (False, error message) otherwise. """ try: self.connect() if self._sftp is None: return False, "SFTP connection is not initialized after connect()" directory = self._sftp.client.listdir(".") if directory is not None: return True, "Connection successfully tested" except Exception as exc: return False, f"Failed to connect to SFTP server, error: {exc}"
[docs] def close(self) -> None: """Close the linked service. Sets the _sftp attribute to None to indicate the connection is closed. Raises: ConnectionError: If closing the SFTP connection fails. """ if self._sftp: self._sftp.close() self._sftp = None