# Source code for ds_provider_postgresql_py_lib.utils.dataset_identity
"""
**File:** ``dataset_identity.py``
**Region:** ``ds_provider_postgresql_py_lib/utils/dataset_identity``
Dataset Identity Helpers
This module contains identity-column validation helpers used by dataset
mutation operations.
"""
from __future__ import annotations
from typing import TYPE_CHECKING
from ds_resource_plugin_py_lib.common.resource.errors import ValidationError
if TYPE_CHECKING:
from collections.abc import Sequence
import pandas as pd
from sqlalchemy import Table
def validate_identity_columns(
    table: Table,
    identity_columns: Sequence[str],
    content: pd.DataFrame,
) -> None:
    """
    Verify that every identity column is present both in the input
    DataFrame and in the reflected target table.

    Args:
        table: Reflected SQLAlchemy table.
        identity_columns: Identity columns used for matching.
        content: Input DataFrame.

    Returns:
        None

    Raises:
        ValidationError: If identity configuration is invalid.
    """
    # Check the input frame first so callers get the most actionable error.
    input_columns = set(content.columns)
    absent_from_input = [name for name in identity_columns if name not in input_columns]
    if absent_from_input:
        raise ValidationError(
            message="Missing identity columns in input.",
            details={"missing_columns": absent_from_input},
        )

    # Table.c supports name-based membership tests for reflected columns.
    absent_from_table = [name for name in identity_columns if name not in table.c]
    if absent_from_table:
        raise ValidationError(
            message="Identity columns do not exist in target table.",
            details={"missing_columns": absent_from_table},
        )
def validate_duplicate_identity_rows(
    content: pd.DataFrame,
    identity_columns: Sequence[str],
) -> None:
    """
    Verify that the input DataFrame has no rows sharing identity values.

    Args:
        content: Input DataFrame.
        identity_columns: Identity columns used for matching.

    Returns:
        None

    Raises:
        ValidationError: If duplicate identity rows are found.
    """
    key_columns = list(identity_columns)
    # keep=False marks every member of a duplicate group, not just the extras.
    duplicated = content.duplicated(subset=key_columns, keep=False)
    if not duplicated.any():
        return
    raise ValidationError(
        message="Duplicate identity rows found.",
        details={
            "identity_columns": key_columns,
            "duplicate_count": int(duplicated.sum()),
        },
    )