diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala index 03aaf284f070f..e80d43b115547 100644 --- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala +++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala @@ -166,6 +166,52 @@ case class Scd1BatchProcessor( ) } + /** + * Left anti-join the microbatch with the auxiliary table on tombstones that match against and + * effectively delete late-arriving upserts (or stale deletes). + * + * @param microbatchDf The incoming microbatch dataframe with at minimum all of the key + * columns + CDC metadata column. + * @param auxiliaryTableDf Dataframe representing the auxiliary table, with at minimum the key + * columns + CDC metadata column. + * + * The returned filtered dataframe has the same schema as the input microbatch, but with only + * the rows that remain unaffected by any known tombstones. + */ + def applyTombstonesToMicrobatch( + microbatchDf: DataFrame, + auxiliaryTableDf: DataFrame): DataFrame = { + val aliasedMicrobatchDf = microbatchDf.alias("microbatch") + val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable") + + val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName + + val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata") + val effectiveSeq = F.greatest( + Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata), + Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata) + ) + val tombstoneDeleteSeq = + Scd1BatchProcessor.deleteSequenceOf(F.col(s"auxiliaryTable.$cdcMetadata")) + + val keysMatch = changeArgs.keys + .map { k => + F.col(s"microbatch.${k.quoted}") === F.col(s"auxiliaryTable.${k.quoted}") + } + .reduce(_ && _) + + // A microbatch row is considered late-arriving (and therefore deleted by the tombstone) when + // the auxiliary table holds a tombstone for the same key with a strictly larger delete + // sequence. Both late-arriving upserts and deletes are dropped. + val microbatchRowDeletedByTombstone = effectiveSeq < tombstoneDeleteSeq + + aliasedMicrobatchDf.join( + right = aliasedAuxiliaryTableDf, + joinExprs = keysMatch && microbatchRowDeletedByTombstone, + joinType = "left_anti" + ) + } + private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit = { val microbatchSqlConf = microbatch.sparkSession.sessionState.conf val resolver = microbatchSqlConf.resolver @@ -194,6 +240,14 @@ object Scd1BatchProcessor { private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence" private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence" + /** Project the delete sequence out of the CDC metadata column. */ + private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column = + cdcMetadataCol.getField(cdcDeleteSequenceFieldName) + + /** Project the upsert sequence out of the CDC metadata column. */ + private[autocdc] def upsertSequenceOf(cdcMetadataCol: Column): Column = + cdcMetadataCol.getField(cdcUpsertSequenceFieldName) + /** * Schema of the CDC metadata struct column for SCD1. */ diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala index a49c89e357555..c78dc123621b3 100644 --- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala +++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala @@ -41,6 +41,18 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) ) + /** DataType for the CDC metadata column, where sequencing type is Long. */ + private val cdcMetadataColSchemaType: DataType = new StructType() + .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) + .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType) + + /** + * Helper to construct a CDC metadata column row, following [[cdcMetadataColSchemaType]]. + */ + private def cdcMetadataRow(deleteSeq: Option[Long], upsertSeq: Option[Long]): Row = + Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null)) + + /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame = spark.createDataFrame(spark.sparkContext.parallelize(rows), schema) @@ -53,6 +65,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] = schema.fields.map(f => (f.name, f.dataType)).toSeq + // =============== deduplicateMicrobatch tests =============== + test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") { val schema = new StructType() .add("id", IntegerType) @@ -463,6 +477,8 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema)) } + // =============== extendMicrobatchRowsWithCdcMetadata tests =============== + test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or an upsert " + "per deleteCondition") { val schema = new StructType() @@ -861,11 +877,7 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { // Even if a column is created with backticks via DDL, those backticks are consumed by Spark // before resolving the schema; they won't show up in the schema field. .add("user.id", StringType) - .add( - Scd1BatchProcessor.cdcMetadataColName, - new StructType() - .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType) - .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)) + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) val batch = microbatchOf(schema)( Row(1, "u-100", Row(null, 10L)) @@ -932,4 +944,333 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession { ) } } + + // =============== applyTombstonesToMicrobatch tests =============== + + /** + * Schema for the microbatch input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] + * tests. + */ + private val applyTombstonesToMicrobatchTestMicrobatchSchema: StructType = new StructType() + // Key column. + .add("id", IntegerType) + // Data column. + .add("value", StringType) + // CDC metadata column. + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + + /** + * Schema for the auxiliary input to [[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests. + * + * In practice for SCD1 the auxiliary table only carries key columns and the CDC metadata + * column -- never user data columns -- so we mirror that production-side asymmetry here, + * even though the function's API contract would allow a single shared schema. + */ + private val applyTombstonesToMicrobatchTestAuxiliarySchema: StructType = new StructType() + // Key column. + .add("id", IntegerType) + // CDC metadata column. + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + + test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts when a matching " + + "tombstone exists for the same key") { + // Both microbatch events have an effective sequence strictly less than the tombstone's + // delete sequence, so they must be anti-joined out of the microbatch regardless of whether + // they are deletes or upserts. + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))), + Row(1, "stale-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = None)) + ) + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary) + assert(result.collect().isEmpty) + } + + test("applyTombstonesToMicrobatch keeps a microbatch row whose effective sequence ties the " + + "tombstone's delete sequence") { + // The join uses strict `<`, so a microbatch row with the same effective sequence as the + // tombstone is kept. This is an internal tie-breaking convention for SCD1 only, and is + // *not* a publicly documented contract: if external callers ever start relying on it, both + // this test and the join condition in applyTombstonesToMicrobatch should move together. + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "tied-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10))) + ) + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary), + expectedAnswer = Row(1, "tied-upsert", Row(null, 10L)) + ) + } + + test("applyTombstonesToMicrobatch keeps microbatch rows whose effective sequence exceeds the " + + "tombstone's delete sequence") { + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "fresher-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15))), + Row(1, "fresher-delete", cdcMetadataRow(deleteSeq = Some(20), upsertSeq = None)) + ) + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary), + expectedAnswer = Seq( + Row(1, "fresher-upsert", Row(null, 15L)), + Row(1, "fresher-delete", Row(20L, null)) + ) + ) + } + + test("applyTombstonesToMicrobatch leaves microbatch rows untouched when the tombstone targets " + + "a different key") { + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "stays", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) + ) + // Tombstone on a different key with a much larger sequence; the key match must guard + // against cross-key application no matter how stale the microbatch row's sequence is. + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(2, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary), + expectedAnswer = Row(1, "stays", Row(null, 5L)) + ) + } + + test("applyTombstonesToMicrobatch with composite keys requires every key column to match") { + val schema = new StructType() + .add("region", StringType) + .add("customer_id", IntegerType) + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + + val microbatch = microbatchOf(schema)( + Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))), + Row("US", 2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) + ) + // Tombstone matches on `region` only; `customer_id` differs from every microbatch row. + // The join condition is the AND of all key column equalities, so neither microbatch row + // should be dropped. + val auxiliary = microbatchOf(schema)( + Row("US", 99, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary), + expectedAnswer = Seq( + Row("US", 1, Row(null, 5L)), + Row("US", 2, Row(null, 5L)) + ) + ) + } + + test("applyTombstonesToMicrobatch supports backticked key names containing a literal dot") { + val schema = new StructType() + .add("user.id", IntegerType) + .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType) + + val microbatch = microbatchOf(schema)( + Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) + ) + val auxiliary = microbatchOf(schema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("`user.id`")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary) + assert(result.collect().isEmpty) + } + + test("applyTombstonesToMicrobatch is a no-op when the auxiliary table is empty") { + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "kept-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))), + Row(2, "kept-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = None)) + ) + + // Empty auxiliary: no rows means the left-anti join cannot match any microbatch row, so the + // microbatch passes through untouched regardless of its contents. + + // Conceptually, this means there are no tombstones that could potentially have delete-matched + // against incoming rows in the microbatch. + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)() + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary), + expectedAnswer = Seq( + Row(1, "kept-upsert", Row(null, 5L)), + Row(2, "kept-delete", Row(7L, null)) + ) + ) + } + + test("applyTombstonesToMicrobatch keeps microbatch rows when the matching aux row has a " + + "null deleteSequence") { + // SCD1's tombstone-merge invariant guarantees aux rows always have a non-null + // deleteSequence, but if a corrupt aux row ever does carry a null deleteSequence, the + // join's `<` predicate evaluates to null (SQL 3-valued logic) and the microbatch row is + // retained -- a safe fallback that never silently drops data. + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "kept-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) + ) + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + checkAnswer( + df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary), + expectedAnswer = Row(1, "kept-upsert", Row(null, 5L)) + ) + } + + test("applyTombstonesToMicrobatch is unaffected by stale tombstones in auxiliary table") { + // SCD1's tombstone-merge invariant guarantees at most one tombstone per key in the + // auxiliary, but if multiple ever coexist for the same key, the left-anti semantics drop + // the microbatch row whenever *any* matching tombstone has a strictly greater + // deleteSequence. + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(8))) + ) + // Two tombstones on key=1: one stale (deleteSeq=5, doesn't dominate the microbatch row's + // effective seq of 8), one fresh (deleteSeq=10, dominates). The fresh one alone is enough + // to drop the microbatch row. + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None)), + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None)) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary) + assert(result.collect().isEmpty) + } + + test("applyTombstonesToMicrobatch ignores the aux row's upsertSequence even when it is set") { + // SCD1's tombstone-merge invariant guarantees aux rows always have a null upsertSequence + // (by definition, an aux row is an unswallowed tombstone). But if a corrupt aux row ever + // has both fields set, only its deleteSequence is read by the join condition; the + // upsertSequence is never inspected, so the row continues to behave as a pure tombstone. + val microbatch = microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)( + Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))) + ) + // Aux row with both fields populated; only deleteSeq=10 drives the tombstone-drop decision. + val auxiliary = microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)( + Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = Some(20))) + ) + + val processor = Scd1BatchProcessor( + changeArgs = ChangeArgs( + keys = Seq(UnqualifiedColumnName("id")), + // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is already encoded + // into the CDC metadata column. + sequencing = F.lit(0L), + storedAsScdType = ScdType.Type1 + ), + resolvedSequencingType = LongType + ) + + val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary) + assert(result.collect().isEmpty) + } }