"""
**File:** ``graphql.py``
**Region:** ``ds_protocol_graphql_py_lib/dataset``

GraphQL dataset implementation for reading, creating, deleting, and listing
resources through a GraphQL API (update and upsert are not supported).

Example:
    >>> linked_service = HttpLinkedService(
    ...     settings=HttpLinkedServiceSettings(
    ...         host="https://api.example.graphql/graphql",
    ...         auth_type=AuthType.NO_AUTH,
    ...     ),
    ...     id="service-id",
    ...     name="graphql_service",
    ...     version="1.0.0",
    ... )
    >>> dataset = GraphqlDataset(
    ...     linked_service=linked_service,
    ...     settings=GraphqlDatasetSettings(
    ...         url="https://api.example.graphql/graphql",
    ...         read=GraphqlReadSettings(
    ...             query="{ users { id name email } }"
    ...         ),
    ...     ),
    ...     id="dataset-id",
    ...     name="graphql_dataset",
    ...     version="1.0.0",
    ... )
    >>> dataset.read()
"""
from dataclasses import dataclass, field
from typing import Any, Generic, NoReturn, TypeVar
import pandas as pd # type: ignore[import-untyped]
from ds_common_logger_py_lib import Logger
from ds_common_serde_py_lib import Serializable
from ds_protocol_http_py_lib.dataset.http import HttpLinkedServiceType
from ds_resource_plugin_py_lib.common.resource.dataset import (
DatasetSettings,
DatasetStorageFormatType,
TabularDataset,
)
from ds_resource_plugin_py_lib.common.resource.dataset.errors import (
CreateError,
DeleteError,
ListError,
ReadError,
)
from ds_resource_plugin_py_lib.common.resource.errors import NotSupportedError
from ds_resource_plugin_py_lib.common.resource.linked_service.errors import (
ConnectionError,
)
from ds_resource_plugin_py_lib.common.serde.serialize import PandasSerializer
from ..enums import ResourceType
from ..serde.deserializer import GraphqlDeserializer
# Module-level logger obtained from the shared ``Logger`` factory.
# NOTE(review): not referenced by any code in this file as shown — confirm before removing.
logger = Logger.get_logger(__name__, package=True)
[docs]
@dataclass(kw_only=True)
class GraphqlReadSettings(Serializable):
    """
    Configuration for fetching rows from a GraphQL endpoint with a query.

    Consumed by ``GraphqlDataset.read``.
    """

    query: str
    """
    GraphQL query document sent as the request's ``query`` entry.

    It must be executable by the endpoint and return the rows to load,
    for example ``"{ users { id name email } }"``.
    """

    variables: dict[str, Any] | None = None
    """Optional variable values sent alongside the query."""

    operation_name: str | None = None
    """Optional name of the operation to execute when the document defines several operations."""
[docs]
@dataclass(kw_only=True)
class GraphqlDeleteSettings(Serializable):
    """
    Configuration for removing rows through a GraphQL mutation.

    Consumed by ``GraphqlDataset.delete``.
    """

    mutation: str
    """GraphQL mutation document that performs the deletion."""

    identity_columns: list[str]
    """Names of the input DataFrame columns whose values uniquely identify each row to delete."""

    variables: dict[str, Any] | None = None
    """Optional extra variable values merged into the mutation's variables."""

    operation_name: str | None = None
    """Optional name of the operation to execute when the document defines several operations."""
[docs]
@dataclass(kw_only=True)
class GraphqlCreateSettings(Serializable):
    """
    Configuration for inserting rows through a GraphQL mutation.

    Consumed by ``GraphqlDataset.create``.
    """

    mutation: str
    """
    GraphQL mutation document that creates records from the input rows.

    For example:
    ``"mutation CreateUser($input: CreateUserInput!) { createUser(input: $input) { id name email } }"``
    """

    input_field: str
    """
    Name of the mutation variable (e.g. ``"input"``) that receives the input rows.

    A single row is sent as one object; several rows are sent as a list of objects.
    """

    operation_name: str | None = None
    """Optional name of the operation to execute when the document defines several operations."""
[docs]
@dataclass(kw_only=True)
class GraphqlDatasetSettings(DatasetSettings):
    """
    Settings for a GraphQL dataset.

    Holds the endpoint location, optional request headers, and the per-operation
    settings for read, delete, and create.
    """

    url: str
    """URL of the GraphQL endpoint; the dataset POSTs every request to this address."""

    primary_keys: list[str] | None = None
    """
    Optional column names that together uniquely identify a row in the dataset,
    for operations that need row identity.
    """

    headers: dict[str, str] | None = None
    """Optional HTTP headers attached to every request, e.g. authentication tokens or content type."""

    read: GraphqlReadSettings | None = None
    """Configuration used by read operations."""

    delete: GraphqlDeleteSettings | None = None
    """Configuration used by delete operations."""

    create: GraphqlCreateSettings | None = None
    """Configuration used by create operations."""
# Type parameter for GraphqlDataset's settings: GraphqlDatasetSettings or any subclass of it.
GraphqlDatasetSettingsType = TypeVar("GraphqlDatasetSettingsType", bound=GraphqlDatasetSettings)
[docs]
@dataclass(kw_only=True)
class GraphqlDataset(
    TabularDataset[
        HttpLinkedServiceType,
        GraphqlDatasetSettingsType,
        PandasSerializer,
        GraphqlDeserializer,
    ],
    Generic[HttpLinkedServiceType, GraphqlDatasetSettingsType],
):
    """
    Represent a GraphQL dataset.

    ``read``, ``create``, ``delete`` and ``list`` POST GraphQL documents to
    ``settings.url`` through the linked service's session and store their
    result in ``self.output`` as a DataFrame. ``update``, ``upsert``, ``purge``
    and ``rename`` raise ``NotSupportedError``.
    """

    settings: GraphqlDatasetSettingsType
    linked_service: HttpLinkedServiceType
    deserializer: GraphqlDeserializer | None = field(
        default_factory=lambda: GraphqlDeserializer(format=DatasetStorageFormatType.JSON),
    )

    @property
    def type(self) -> ResourceType:
        """Return the resource type of this object, ``ResourceType.DATASET``."""
        return ResourceType.DATASET

    @property
    def supports_checkpoint(self) -> bool:
        """
        Indicate whether this provider supports incremental loads via checkpointing.

        GraphQL provider does not yet support checkpoint-based incremental loads.
        All reads are full loads.

        Returns:
            False, indicating checkpointing is not supported.
        """
        return False

    # ------------------------------------------------------------------
    # Shared request/response helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_payload(
        query: str,
        variables: dict[str, Any] | None = None,
        operation_name: str | None = None,
    ) -> dict[str, Any]:
        """
        Build the JSON body of a GraphQL request.

        ``variables`` and ``operationName`` are only included when non-empty.
        """
        payload: dict[str, Any] = {"query": query}
        if variables:
            payload["variables"] = variables
        if operation_name:
            payload["operationName"] = operation_name
        return payload

    def _post_graphql(self, payload: dict[str, Any]) -> Any:
        """
        POST ``payload`` to ``settings.url`` and return the decoded JSON body.

        Args:
            payload: GraphQL request body, typically built by ``_build_payload``.

        Returns:
            The value returned by the response's ``json()`` method.

        Raises:
            ConnectionError: If the linked service connection is not initialized.
            Exception: Anything raised by the session's ``post``, by
                ``raise_for_status`` or by ``json``; public callers wrap these
                in their operation-specific error.
        """
        connection = self.linked_service.connection
        if connection is None:
            # Public methods check this up front; repeated so the helper is safe
            # on its own and ``connection`` is narrowed to non-None below.
            raise ConnectionError(message="Connection is not initialized.")
        response = connection.session.post(
            url=self.settings.url,
            json=payload,
            headers=self.settings.headers,
        )
        response.raise_for_status()
        return response.json()

    @staticmethod
    def _extract_graphql_errors(response_data: Any) -> Any:
        """
        Return the ``errors`` entry of a decoded GraphQL response, or ``None``.

        A missing, ``null`` or empty ``errors`` entry means "no errors": the GraphQL
        spec requires ``errors`` to be absent when nothing failed and non-empty when
        present. Bodies that are not JSON objects have no ``errors`` entry.
        """
        if not isinstance(response_data, dict):
            return None
        return response_data.get("errors") or None

    @staticmethod
    def _format_graphql_error(base_message: str, errors: Any) -> str:
        """
        Return ``base_message``, suffixed with the first error's ``message`` if any.

        Accepts the spec's list of error objects as well as a single error object.
        """
        first_error = errors[0] if isinstance(errors, list) and errors else errors
        if isinstance(first_error, dict):
            error_message = first_error.get("message")
            if isinstance(error_message, str) and error_message:
                return f"{base_message}: {error_message}"
        return base_message

    def _mutation_output(self, response_data: Any, submitted: pd.DataFrame) -> pd.DataFrame:
        """
        Turn a successful mutation response into the operation's output frame.

        Args:
            response_data: Decoded JSON body of the mutation response.
            submitted: The rows that were sent; used as the fallback output.

        Returns:
            The deserialized response when it has a ``data`` entry that deserializes
            to a non-empty frame, otherwise a copy of ``submitted``.
        """
        has_data = isinstance(response_data, dict) and "data" in response_data
        if has_data and self.deserializer is not None:
            df_result = self.deserializer.deserialize_graphql(response_data)
            if not df_result.empty:
                return df_result
        return submitted.copy()

    # ------------------------------------------------------------------
    # Operations
    # ------------------------------------------------------------------

    def read(self) -> None:
        """
        Read Graphql dataset.

        Sends a GraphQL query to the endpoint with the query, variables, and operation
        name specified in ``settings.read`` and populates ``self.output`` with the
        result as a DataFrame. Conversion of the response is delegated to
        ``GraphqlDeserializer.deserialize_graphql``, which is expected to handle
        patterns such as:

        - Direct arrays: {"data": {"users": [...]}}
        - Relay connections: {"data": {"users": {"edges": [{"node": {...}}]}}}
        - Single objects: {"data": {"user": {...}}}

        Returns:
            None. The result is stored in self.output as a DataFrame.

        Raises:
            ConnectionError: If the linked service connection is not initialized.
            ReadError: If read settings or the deserializer are missing, or if the
                request or the GraphQL query fails.
        """
        if self.linked_service.connection is None:
            raise ConnectionError(message="Connection is not initialized.") from None
        if not self.settings.read:
            raise ReadError("GraphQL read settings must be provided in settings.read")
        if not self.deserializer:
            raise ReadError("Deserializer is not configured for GraphQL dataset")

        read_settings = self.settings.read
        try:
            payload = self._build_payload(
                read_settings.query,
                read_settings.variables,
                read_settings.operation_name,
            )
            response_data = self._post_graphql(payload)
            self._check_for_graphql_read_error(response_data)
            self.output = self.deserializer.deserialize_graphql(response_data)
        except ReadError:
            raise
        except Exception as e:
            raise ReadError(
                message=f"Failed to read from GraphQL: {e!s}",
                details={"url": self.settings.url},
            ) from e

    @staticmethod
    def _check_for_graphql_read_error(response_data: dict[str, Any]) -> None:
        """
        Raise ``ReadError`` if a query response carries GraphQL errors.

        Args:
            response_data: Decoded JSON body of the GraphQL response.

        Raises:
            ReadError: If the response has a non-empty ``errors`` entry. The message
                includes the first error's ``message`` when available, and
                ``details["errors"]`` carries the raw ``errors`` value.
        """
        errors = GraphqlDataset._extract_graphql_errors(response_data)
        if errors is not None:
            raise ReadError(
                message=GraphqlDataset._format_graphql_error("GraphQL query failed", errors),
                details={"errors": errors},
            )

    def create(self) -> None:
        """
        Create new rows in the GraphQL endpoint using mutations.

        Sends all input rows in a single GraphQL mutation request (whether the server
        applies it atomically depends on the endpoint). One row is passed as a single
        object in the ``settings.create.input_field`` variable; several rows are
        passed as a list.

        Returns:
            None. ``self.output`` holds the deserialized mutation result, or a copy of
            the input when the response yields no rows. Empty input is a no-op.

        Raises:
            ConnectionError: If the linked service connection is not initialized.
            CreateError: If create settings or the deserializer are missing, or if the
                request or the GraphQL mutation fails.
        """
        # Per DATASET_CONTRACT: empty input is a no-op
        if self.input is None or len(self.input) == 0:
            self.output = self.input.copy() if self.input is not None else pd.DataFrame()
            return
        if self.settings.create is None:
            raise CreateError("Create settings must be provided in settings.create")
        if not self.deserializer:
            raise CreateError("Deserializer is not configured for GraphQL dataset")
        self._validate_create_settings()

        create_settings = self.settings.create
        try:
            rows_data = self.input.to_dict(orient="records")
            # A single row is sent as one object, several rows as a list of objects.
            input_value = rows_data[0] if len(rows_data) == 1 else rows_data
            payload = self._build_payload(
                create_settings.mutation,
                {create_settings.input_field: input_value},
                create_settings.operation_name,
            )
            response_data = self._post_graphql(payload)
            errors = self._extract_graphql_errors(response_data)
            if errors is not None:
                raise CreateError(
                    message=self._format_graphql_error("GraphQL create mutation failed", errors),
                    details={
                        "input_data": rows_data,
                        "errors": errors,
                    },
                )
            self.output = self._mutation_output(response_data, self.input)
        except CreateError:
            raise
        except Exception as e:
            raise CreateError(
                message=f"Failed to create rows via GraphQL: {e!s}",
                details={
                    "url": self.settings.url,
                    "input_field": create_settings.input_field,
                    "row_count": len(self.input),
                },
            ) from e

    def update(self) -> NoReturn:
        """
        Update entity using Graphql.

        Raises:
            NotSupportedError: Always; updates are not supported.
        """
        raise NotSupportedError("Update operation is not supported for Graphql dataset")

    def upsert(self) -> NoReturn:
        """
        Upsert entity using Graphql.

        Raises:
            NotSupportedError: Always; upserts are not supported.
        """
        raise NotSupportedError("Upsert operation is not supported for Graphql dataset")

    def delete(self) -> None:
        """
        Delete specific rows from the GraphQL endpoint using mutations.

        Sends all input rows in a single GraphQL mutation request (whether the server
        applies it atomically depends on the endpoint). For one row, each identity
        column becomes a scalar variable; for several rows, each identity column
        becomes a list of values in input order. ``settings.delete.variables`` are
        merged in and may not redefine an identity column.

        Returns:
            None. ``self.output`` holds the deserialized mutation result, or a copy of
            the input when the response yields no rows. Empty input is a no-op.

        Raises:
            ConnectionError: If the linked service connection is not initialized.
            DeleteError: If delete settings or the deserializer are missing, if no
                identity columns are configured, if an identity column is missing
                from the input, if static variables collide with identity columns,
                or if the request or the GraphQL mutation fails.
        """
        # Per DATASET_CONTRACT: empty input is a no-op
        if self.input is None or len(self.input) == 0:
            self.output = self.input.copy() if self.input is not None else pd.DataFrame()
            return
        if self.linked_service.connection is None:
            raise ConnectionError(message="Connection is not initialized.") from None
        if not self.settings.delete:
            raise DeleteError("GraphQL delete settings must be provided in settings.delete")
        if not self.deserializer:
            raise DeleteError("Deserializer is not configured for GraphQL dataset")

        delete_settings = self.settings.delete
        self._validate_delete_identity(delete_settings, self.input)
        try:
            rows_data = self.input.to_dict(orient="records")
            variables = self._build_delete_variables(rows_data, delete_settings)
            payload = self._build_payload(
                delete_settings.mutation,
                variables,
                delete_settings.operation_name,
            )
            response_data = self._post_graphql(payload)
            errors = self._extract_graphql_errors(response_data)
            if errors is not None:
                raise DeleteError(
                    message=self._format_graphql_error("GraphQL delete mutation failed", errors),
                    details={
                        "input_data": rows_data,
                        "errors": errors,
                    },
                )
            self.output = self._mutation_output(response_data, self.input)
        except DeleteError:
            raise
        except Exception as e:
            raise DeleteError(
                message=f"Failed to delete rows via GraphQL: {e!s}",
                details={
                    "url": self.settings.url,
                    "identity_columns": delete_settings.identity_columns,
                    "row_count": len(self.input),
                },
            ) from e

    @staticmethod
    def _validate_delete_identity(
        delete_settings: GraphqlDeleteSettings,
        frame: pd.DataFrame,
    ) -> None:
        """
        Check that the delete identity configuration identifies rows unambiguously.

        Raises:
            DeleteError: If no identity columns are configured, if an identity column
                is missing from ``frame``, or if ``delete_settings.variables`` defines
                a key that would overwrite a per-row identity value.
        """
        identity_columns = delete_settings.identity_columns
        if not identity_columns:
            raise DeleteError(
                "At least one identity column must be provided in settings.delete.identity_columns"
            )
        for col in identity_columns:
            if col not in frame.columns:
                raise DeleteError(
                    message=f"Identity column '{col}' not found in input",
                    details={"available_columns": list(frame.columns)},
                )
        # Static variables are merged after the identity values; a shared key would
        # silently replace the per-row identities and target the wrong records.
        conflicting = sorted(set(identity_columns) & set(delete_settings.variables or {}))
        if conflicting:
            raise DeleteError(
                message="settings.delete.variables must not redefine identity columns",
                details={"conflicting_variables": conflicting},
            )

    # NOTE: keep this helper above ``def list``: parameter annotations are evaluated
    # when the class body runs, and after ``def list`` the name ``list`` refers to
    # that method instead of the builtin.
    @staticmethod
    def _build_delete_variables(
        rows_data: list[dict[Any, Any]],
        delete_settings: GraphqlDeleteSettings,
    ) -> dict[str, Any]:
        """
        Build the mutation variables for deleting ``rows_data``.

        One row yields a scalar per identity column; several rows yield a list of
        values per identity column, in input order. ``delete_settings.variables`` is
        merged in afterwards (collisions are rejected by ``_validate_delete_identity``).
        """
        identity_columns = delete_settings.identity_columns
        if len(rows_data) == 1:
            row = rows_data[0]
            variables: dict[str, Any] = {col: row[col] for col in identity_columns}
        else:
            variables = {col: [row[col] for row in rows_data] for col in identity_columns}
        if delete_settings.variables:
            variables.update(delete_settings.variables)
        return variables

    def purge(self) -> NoReturn:
        """
        Purge entity using Graphql.

        Raises:
            NotSupportedError: Always; purging is not supported.
        """
        raise NotSupportedError("Purge operation is not supported for Graphql dataset")

    def rename(self) -> NoReturn:
        """
        Rename entity using Graphql.

        Raises:
            NotSupportedError: Always; renaming is not supported.
        """
        raise NotSupportedError("Rename operation is not supported for Graphql dataset")

    def list(self) -> None:
        """
        Discover available resources in the GraphQL schema via introspection.

        Executes a GraphQL introspection query to fetch the root query fields and
        their argument names. Populates ``self.output`` with a DataFrame whose
        columns are ``name``, ``type`` (always ``"query"``), ``description``,
        ``arguments`` (comma-separated names, or ``None``) and ``arg_count``.

        Returns:
            None. The result is stored in self.output as a DataFrame.

        Raises:
            ConnectionError: If the linked service connection is not initialized.
            ListError: If the request or the GraphQL introspection query fails.
        """
        if self.linked_service.connection is None:
            raise ConnectionError(message="Connection is not initialized.") from None

        # GraphQL introspection query to discover all root query fields.
        introspection_query = """
        query IntrospectionQuery {
          __schema {
            queryType {
              name
              fields {
                name
                description
                args {
                  name
                  type {
                    name
                    kind
                  }
                }
              }
            }
          }
        }
        """
        try:
            response_data = self._post_graphql(self._build_payload(introspection_query))
            errors = self._extract_graphql_errors(response_data)
            if errors is not None:
                raise ListError(
                    message=self._format_graphql_error("GraphQL introspection query failed", errors),
                    details={"errors": errors},
                )
            # ``or {}`` / ``or []`` also cover entries that are present but null.
            schema = (response_data.get("data") or {}).get("__schema") or {}
            query_type = schema.get("queryType") or {}
            resources = []
            for field_def in query_type.get("fields") or []:
                arg_names = [arg.get("name") for arg in field_def.get("args") or []]
                resources.append(
                    {
                        "name": field_def.get("name"),
                        "type": "query",
                        "description": field_def.get("description") or "",
                        "arguments": ", ".join(arg_names) if arg_names else None,
                        "arg_count": len(arg_names),
                    }
                )
            # Explicit columns keep the schema stable even when no fields are returned.
            self.output = pd.DataFrame(
                resources,
                columns=["name", "type", "description", "arguments", "arg_count"],
            )
        except ListError:
            raise
        except Exception as e:
            raise ListError(
                message=f"Failed to list GraphQL schema resources: {e!s}",
                details={"url": self.settings.url},
            ) from e

    def close(self) -> None:
        """
        Satisfy the dataset contract; there is nothing to clean up.

        This dataset opens no resources of its own: requests go through the linked
        service's session, which this method does not close.

        Returns:
            None
        """

    def _validate_create_settings(self) -> None:
        """
        Validate that create settings are properly configured.

        Returns:
            None if the settings are valid.

        Raises:
            ConnectionError: If the linked service connection is not initialized.
            CreateError: If the create settings, the mutation, or the input field
                name are missing.
        """
        if self.linked_service.connection is None:
            raise ConnectionError(message="Connection is not initialized.") from None
        if not self.settings.create:
            raise CreateError("GraphQL create settings must be provided in settings.create")
        if not self.settings.create.mutation:
            raise CreateError("GraphQL mutation must be provided in settings.create.mutation")
        if not self.settings.create.input_field:
            raise CreateError("Input field name must be provided in settings.create.input_field")