Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 118 additions & 1 deletion docs/source/feature_operational_metadata.rst
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,121 @@ The below example illustrates the default configuration for a generic bronze and
}
}
]
}
}

Disabling Operational Metadata in a Dataflow Spec
-------------------------------------------------
You can disable operational metadata at the dataflow spec level or for specific target tables — these are two different scopes, not two separate settings.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to make it super clear would it be better to update this to "You can disable operational metadata at a dataflow spec level or for specific target tables" or something like that to make it clear it's two different scopes not two separate items


Disabling at Dataflow Spec Level
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use the ``features`` object to disable operational metadata for a dataflow spec.

.. tabs::

.. tab:: JSON

.. code-block:: json
:emphasize-lines: 5,6,7

{
"dataFlowId": "feature_materialized_views",
"dataFlowGroup": "feature_samples",
"dataFlowType": "materialized_view",
"features": {
"operationalMetadataEnabled": false
}
}

.. tab:: YAML

.. code-block:: yaml
:emphasize-lines: 4,5

dataFlowId: feature_materialized_views
dataFlowGroup: feature_samples
dataFlowType: materialized_view
features:
operationalMetadataEnabled: false

Disabling at Target Table Level
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

Use the ``configFlags`` array to disable operational metadata for a target table.

.. tabs::

.. tab:: JSON

.. code-block:: json
:emphasize-lines: 24

{
"dataFlowId": "crm_1",
"dataFlowGroup": "crm",
"dataFlowType": "standard",
"sourceType": "delta",
"sourceSystem": "crm",
"sourceViewName": "v_customer_address",
"sourceDetails": {
"database": "source_db",
"table": "customer_address",
"cdfEnabled": true,
"schemaPath": "schemas/customer_address.json"
},
"mode": "stream",
"targetFormat": "delta",
"targetDetails": {
"table": "customer_address",
"tableProperties": {
"delta.autoOptimize.optimizeWrite": "true",
"delta.autoOptimize.autoCompact": "true"
},
"partitionColumns": ["country_code"],
"schemaPath": "schemas/customer_address.json",
"configFlags": ["disableOperationalMetadata"]
},
"dataQualityExpectationsEnabled": true,
"quarantineMode": "table",
"quarantineTargetDetails": {
"targetFormat": "delta",
"table": "customer_address_quarantine",
"tableProperties": {}
}
}

.. tab:: YAML

.. code-block:: yaml
:emphasize-lines: 22,23

dataFlowId: crm_1
dataFlowGroup: crm
dataFlowType: standard
sourceType: delta
sourceSystem: crm
sourceViewName: v_customer_address
sourceDetails:
database: source_db
table: customer_address
cdfEnabled: true
schemaPath: schemas/customer_address.json
mode: stream
targetFormat: delta
targetDetails:
table: customer_address
tableProperties:
delta.autoOptimize.optimizeWrite: 'true'
delta.autoOptimize.autoCompact: 'true'
partitionColumns:
- country_code
schemaPath: schemas/customer_address.json
configFlags:
- disableOperationalMetadata
dataQualityExpectationsEnabled: true
quarantineMode: table
quarantineTargetDetails:
targetFormat: delta
table: customer_address_quarantine
tableProperties: {}
9 changes: 1 addition & 8 deletions src/dataflow/dataflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,13 +110,6 @@ def _init_target_details(self):
self.logger.info(f"Initializing DataFlow for target schema: {self.target_database}, {log_target}")
self.logger.debug(f"Target Details: {self.target_details.__dict__}")

# Add operational metadata columns to the schema
if not hasattr(self.target_details, 'schema') or not self.target_details.schema:
return

if not self.features.operationalMetadataEnabled:
return

def _init_cdc_settings(self):
"""init CDC settings."""

Expand Down Expand Up @@ -247,7 +240,7 @@ def create_dataflow(self):
self._create_flow_groups()

# create materialized view
self.target_details.create_table(expectations)
self.target_details.create_table(expectations, features=self.features)

elif self.dataflow_spec.targetFormat in SinkType.__dict__.values():

Expand Down
9 changes: 6 additions & 3 deletions src/dataflow/targets/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import utility

from ..enums import TableType, TargetConfigFlags
from ..features import Features

Self = TypeVar("Self", bound="BaseTargetDelta")

Expand Down Expand Up @@ -250,7 +251,8 @@ def add_table_properties(self, table_properties: Dict) -> Self:

def create_table(
self,
expectations: Dict = None
expectations: Dict = None,
features: Features = None
) -> None:
"""
Create the target table for the data flow.
Expand All @@ -277,13 +279,14 @@ def create_table(
logger.debug(f"Expectations: {self.table}, {expectations}")
logger.debug(f"Config Flags: {self.configFlags}")

self._create_table(schema, expectations)
self._create_table(schema, expectations, features)

@abstractmethod
def _create_table(
self,
schema: T.StructType | str,
expectations: Dict = None
expectations: Dict = None,
features: Features = None
) -> None:
"""Abstract implementation for target specific table creation logic."""
pass
Expand Down
7 changes: 5 additions & 2 deletions src/dataflow/targets/delta_materialized_view.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from pyspark import pipelines as dp
from pyspark.sql import types as T

from ..features import Features
from ..operational_metadata import OperationalMetadataMixin
from ..sql import SqlMixin

Expand Down Expand Up @@ -50,7 +51,8 @@ class TargetDeltaMaterializedView(BaseTargetDelta, SqlMixin, OperationalMetadata
def _create_table(
self,
schema: T.StructType | str,
expectations: Dict = None
expectations: Dict = None,
features: Features = None
) -> None:
"""Create the target table for the data flow."""
spark = self.spark
Expand Down Expand Up @@ -98,7 +100,8 @@ def mv_query():
df = spark.sql(sql)

# Add operational metadata if needed
if operational_metadata_schema:
operational_metadata_enabled = features.operationalMetadataEnabled if features else True
if operational_metadata_schema and operational_metadata_enabled:
df = self._add_operational_metadata(
spark,
df,
Expand Down
4 changes: 3 additions & 1 deletion src/dataflow/targets/delta_streaming_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from pyspark.sql import types as T

from .base import BaseTargetDelta
from ..features import Features


@dataclass(kw_only=True)
Expand Down Expand Up @@ -40,7 +41,8 @@ class TargetDeltaStreamingTable(BaseTargetDelta):
def _create_table(
self,
schema: T.StructType | str,
expectations: Dict = None
expectations: Dict = None,
features: Features = None
) -> None:
"""Create the target table for the data flow."""
dp.create_streaming_table(
Expand Down
1 change: 0 additions & 1 deletion src/dataflow_spec_builder/dataflow_spec_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -514,7 +514,6 @@ def _process_spec_data(self, base_path: str, spec_data: Dict) -> None:

# Substitute secrets in the dataflow spec with SecretValue objects
spec_data = self.secrets_manager.substitute_secrets(spec_data)

self.logger.info(f"Adding Dataflow Spec: {spec_data.get(self.Keys.DATA_FLOW_ID)}.")
self.processed_specs.append(DataflowSpec(**spec_data))

Expand Down
7 changes: 3 additions & 4 deletions src/dataflow_spec_builder/transformer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,19 @@ def _process_spec(self, spec_data: Dict) -> Union[Dict, List[Dict]]:

def transform(self, spec_data: Dict) -> Union[Dict, List[Dict]]:
    """Normalize common features/limitations, then run transformer-specific processing.

    Returns either a single spec Dict or a List[Dict] of specs.
    """
    return self._process_spec(self._apply_features_and_limitations(spec_data))

def _apply_features_and_limitations(self, dataflow_spec: Dict) -> Dict:
"""Apply common features and limitations transformations."""
# Operational Metadata
features = dataflow_spec.get("features", {})

if not features:
dataflow_spec["features"] = {}

# FEATURE: Operational Metadata
operational_metadata_enabled = features.get("operationalMetadataEnabled", None)
if not operational_metadata_enabled:
dataflow_spec["features"]["operationalMetadataEnabled"] = True
operational_metadata_enabled = features.get("operationalMetadataEnabled", True)
dataflow_spec["features"]["operationalMetadataEnabled"] = operational_metadata_enabled

# LIMITATIONS: CDC SNAPSHOT
if dataflow_spec.get("cdcSnapshotSettings"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def _build_base_flow_spec(self, spec_data: Dict, mv_config: Dict, target_details
"dataFlowId": spec_data.get("dataFlowId"),
"dataFlowGroup": spec_data.get("dataFlowGroup"),
"dataFlowType": spec_data.get("dataFlowType"),
"features": spec_data.get("features", {}),
"targetFormat": TargetType.DELTA,
"targetDetails": target_details,
"quarantineMode": mv_config.get("quarantineMode"),
Expand Down
1 change: 1 addition & 0 deletions src/dataflow_spec_builder/transformer/standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def _build_base_flow_spec(self, spec_data: Dict) -> Dict:
"dataFlowId": spec_data.get("dataFlowId"),
"dataFlowGroup": spec_data.get("dataFlowGroup"),
"dataFlowType": spec_data.get("dataFlowType"),
"features": spec_data.get("features", {}),
"targetFormat": spec_data.get("targetFormat"),
"targetDetails": spec_data.get("targetDetails"),
"quarantineMode": spec_data.get("quarantineMode"),
Expand Down
10 changes: 9 additions & 1 deletion src/schemas/spec_materialized_views.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,15 @@
"comment": {"type": "string"},
"sparkConf": {"type": "object"},
"private": {"type": "boolean"},
"rowFilter": {"type": "string"}
"rowFilter": {"type": "string"},
"configFlags": {
"type": "array",
"items": {
"type": "string",
"enum": ["disableOperationalMetadata"]
},
"default": []
}
},
"additionalProperties": false
},
Expand Down