diff --git a/docs/source/feature_operational_metadata.rst b/docs/source/feature_operational_metadata.rst index 97b0f4f..d51069a 100644 --- a/docs/source/feature_operational_metadata.rst +++ b/docs/source/feature_operational_metadata.rst @@ -190,4 +190,121 @@ The below example illustrates the default configuration for a generic bronze and } } ] - } \ No newline at end of file + } + +Disabling Operational Metadata in a Dataflow Spec +------------------------------------------------- +You can disable operational metadata for a dataflow spec or for a target table. + +Disabling at Dataflow Spec Level +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use the ``features`` object to disable operational metadata for a dataflow spec. + +.. tabs:: + + .. tab:: JSON + + .. code-block:: json + :emphasize-lines: 5,6,7 + + { + "dataFlowId": "feature_materialized_views", + "dataFlowGroup": "feature_samples", + "dataFlowType": "materialized_view", + "features": { + "operationalMetadataEnabled": false + } + } + + .. tab:: YAML + + .. code-block:: yaml + :emphasize-lines: 4,5 + + dataFlowId: feature_materialized_views + dataFlowGroup: feature_samples + dataFlowType: materialized_view + features: + operationalMetadataEnabled: false + +Disabling at Target Table Level +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Use the ``configFlags`` array to disable operational metadata for a target table. + +.. tabs:: + + .. tab:: JSON + + .. code-block:: json + :emphasize-lines: 24 + + { + "dataFlowId": "crm_1", + "dataFlowGroup": "crm", + "dataFlowType": "standard", + "sourceType": "delta", + "sourceSystem": "crm", + "sourceViewName": "v_customer_address", + "sourceDetails": { + "database": "source_db", + "table": "customer_address", + "cdfEnabled": true, + "schemaPath": "schemas/customer_address.json" + }, + "mode": "stream", + "targetFormat": "delta", + "targetDetails": { + "table": "customer_address", + "tableProperties": { + "delta.autoOptimize.optimizeWrite": "true", + "delta.autoOptimize.autoCompact": "true" + }, + "partitionColumns": ["country_code"], + "schemaPath": "schemas/customer_address.json", + "configFlags": ["disableOperationalMetadata"] + }, + "dataQualityExpectationsEnabled": true, + "quarantineMode": "table", + "quarantineTargetDetails": { + "targetFormat": "delta", + "table": "customer_address_quarantine", + "tableProperties": {} + } + } + + .. tab:: YAML + + .. code-block:: yaml + :emphasize-lines: 22,23 + + dataFlowId: crm_1 + dataFlowGroup: crm + dataFlowType: standard + sourceType: delta + sourceSystem: crm + sourceViewName: v_customer_address + sourceDetails: + database: source_db + table: customer_address + cdfEnabled: true + schemaPath: schemas/customer_address.json + mode: stream + targetFormat: delta + targetDetails: + table: customer_address + tableProperties: + delta.autoOptimize.optimizeWrite: 'true' + delta.autoOptimize.autoCompact: 'true' + partitionColumns: + - country_code + schemaPath: schemas/customer_address.json + configFlags: + - disableOperationalMetadata + dataQualityExpectationsEnabled: true + quarantineMode: table + quarantineTargetDetails: + targetFormat: delta + table: customer_address_quarantine + tableProperties: {} diff --git a/src/dataflow/dataflow.py b/src/dataflow/dataflow.py index 93c4828..0717ff6 100644 --- a/src/dataflow/dataflow.py +++ b/src/dataflow/dataflow.py @@ -110,13 +110,6 @@ def _init_target_details(self): self.logger.info(f"Initializing DataFlow for target schema: {self.target_database}, {log_target}") self.logger.debug(f"Target Details: {self.target_details.__dict__}") - # Add operational metadata columns to the schema - if not hasattr(self.target_details, 'schema') or not self.target_details.schema: - return - - if not self.features.operationalMetadataEnabled: - return - def _init_cdc_settings(self): """init CDC settings.""" @@ -247,7 +240,7 @@ def create_dataflow(self): self._create_flow_groups() # create materialized view - self.target_details.create_table(expectations) + self.target_details.create_table(expectations, features=self.features) elif self.dataflow_spec.targetFormat in SinkType.__dict__.values(): diff --git a/src/dataflow/targets/base.py b/src/dataflow/targets/base.py index 8aeee0e..aa31f75 100644 --- a/src/dataflow/targets/base.py +++ b/src/dataflow/targets/base.py @@ -10,6 +10,7 @@ import utility from ..enums import TableType, TargetConfigFlags +from ..features import Features Self = TypeVar("Self", bound="BaseTargetDelta") @@ -250,7 +251,8 @@ def add_table_properties(self, table_properties: Dict) -> Self: def create_table( self, - expectations: Dict = None + expectations: Dict = None, + features: Features = None ) -> None: """ Create the target table for the data flow. @@ -277,13 +279,14 @@ def create_table( logger.debug(f"Expectations: {self.table}, {expectations}") logger.debug(f"Config Flags: {self.configFlags}") - self._create_table(schema, expectations) + self._create_table(schema, expectations, features) @abstractmethod def _create_table( self, schema: T.StructType | str, - expectations: Dict = None + expectations: Dict = None, + features: Features = None ) -> None: """Abstract implementation for target specific table creation logic.""" pass diff --git a/src/dataflow/targets/delta_materialized_view.py b/src/dataflow/targets/delta_materialized_view.py index fab50cf..dd24874 100644 --- a/src/dataflow/targets/delta_materialized_view.py +++ b/src/dataflow/targets/delta_materialized_view.py @@ -4,6 +4,7 @@ from pyspark import pipelines as dp from pyspark.sql import types as T +from ..features import Features from ..operational_metadata import OperationalMetadataMixin from ..sql import SqlMixin @@ -50,7 +51,8 @@ class TargetDeltaMaterializedView(BaseTargetDelta, SqlMixin, OperationalMetadata def _create_table( self, schema: T.StructType | str, - expectations: Dict = None + expectations: Dict = None, + features: Features = None ) -> None: """Create the target table for the data flow.""" spark = self.spark @@ -98,7 +100,8 @@ def mv_query(): df = spark.sql(sql) # Add operational metadata if needed - if operational_metadata_schema: + operational_metadata_enabled = features.operationalMetadataEnabled if features else True + if operational_metadata_schema and operational_metadata_enabled: df = self._add_operational_metadata( spark, df, diff --git a/src/dataflow/targets/delta_streaming_table.py b/src/dataflow/targets/delta_streaming_table.py index cf41994..3864f4b 100644 --- a/src/dataflow/targets/delta_streaming_table.py +++ b/src/dataflow/targets/delta_streaming_table.py @@ -5,6 +5,7 @@ from pyspark.sql import types as T from .base import BaseTargetDelta +from ..features import Features @dataclass(kw_only=True) @@ -40,7 +41,8 @@ class TargetDeltaStreamingTable(BaseTargetDelta): def _create_table( self, schema: T.StructType | str, - expectations: Dict = None + expectations: Dict = None, + features: Features = None ) -> None: """Create the target table for the data flow.""" dp.create_streaming_table( diff --git a/src/dataflow_spec_builder/dataflow_spec_builder.py b/src/dataflow_spec_builder/dataflow_spec_builder.py index 09b4bc1..bd07ff2 100644 --- a/src/dataflow_spec_builder/dataflow_spec_builder.py +++ b/src/dataflow_spec_builder/dataflow_spec_builder.py @@ -514,7 +514,6 @@ def _process_spec_data(self, base_path: str, spec_data: Dict) -> None: # Substitute secrets in the dataflow spec with SecretValue objects spec_data = self.secrets_manager.substitute_secrets(spec_data) - self.logger.info(f"Adding Dataflow Spec: {spec_data.get(self.Keys.DATA_FLOW_ID)}.") self.processed_specs.append(DataflowSpec(**spec_data)) diff --git a/src/dataflow_spec_builder/transformer/base.py b/src/dataflow_spec_builder/transformer/base.py index 3983efd..c250b2e 100644 --- a/src/dataflow_spec_builder/transformer/base.py +++ b/src/dataflow_spec_builder/transformer/base.py @@ -18,20 +18,19 @@ def _process_spec(self, spec_data: Dict) -> Union[Dict, List[Dict]]: def transform(self, spec_data: Dict) -> Union[Dict, List[Dict]]: """Transform the spec data. Returns either a single Dict or List[Dict].""" + spec_data = self._apply_features_and_limitations(spec_data) return self._process_spec(spec_data) def _apply_features_and_limitations(self, dataflow_spec: Dict) -> Dict: """Apply common features and limitations transformations.""" # Operational MetadataSnapshot features = dataflow_spec.get("features", {}) - if not features: dataflow_spec["features"] = {} # FEATURE: Operational Metadata - operational_metadata_enabled = features.get("operationalMetadataEnabled", None) - if not operational_metadata_enabled: - dataflow_spec["features"]["operationalMetadataEnabled"] = True + operational_metadata_enabled = features.get("operationalMetadataEnabled", True) + dataflow_spec["features"]["operationalMetadataEnabled"] = operational_metadata_enabled # LIMITATIONS: CDC SNAPSHOT if dataflow_spec.get("cdcSnapshotSettings"): diff --git a/src/dataflow_spec_builder/transformer/materialized_views.py b/src/dataflow_spec_builder/transformer/materialized_views.py index 1a3f11c..292403c 100644 --- a/src/dataflow_spec_builder/transformer/materialized_views.py +++ b/src/dataflow_spec_builder/transformer/materialized_views.py @@ -60,6 +60,7 @@ def _build_base_flow_spec(self, spec_data: Dict, mv_config: Dict, target_details "dataFlowId": spec_data.get("dataFlowId"), "dataFlowGroup": spec_data.get("dataFlowGroup"), "dataFlowType": spec_data.get("dataFlowType"), + "features": spec_data.get("features", {}), "targetFormat": TargetType.DELTA, "targetDetails": target_details, "quarantineMode": mv_config.get("quarantineMode"), diff --git a/src/dataflow_spec_builder/transformer/standard.py b/src/dataflow_spec_builder/transformer/standard.py index a592d41..612fd6b 100644 --- a/src/dataflow_spec_builder/transformer/standard.py +++ b/src/dataflow_spec_builder/transformer/standard.py @@ -42,6 +42,7 @@ def _build_base_flow_spec(self, spec_data: Dict) -> Dict: "dataFlowId": spec_data.get("dataFlowId"), "dataFlowGroup": spec_data.get("dataFlowGroup"), "dataFlowType": spec_data.get("dataFlowType"), + "features": spec_data.get("features", {}), "targetFormat": spec_data.get("targetFormat"), "targetDetails": spec_data.get("targetDetails"), "quarantineMode": spec_data.get("quarantineMode"), diff --git a/src/schemas/spec_materialized_views.json b/src/schemas/spec_materialized_views.json index d55b788..7dd2a7d 100644 --- a/src/schemas/spec_materialized_views.json +++ b/src/schemas/spec_materialized_views.json @@ -38,7 +38,15 @@ "comment": {"type": "string"}, "sparkConf": {"type": "object"}, "private": {"type": "boolean"}, - "rowFilter": {"type": "string"} + "rowFilter": {"type": "string"}, + "configFlags": { + "type": "array", + "items": { + "type": "string", + "enum": ["disableOperationalMetadata"] + }, + "default": [] + } }, "additionalProperties": false },