From 8e7eb822fed97d68774e363731320296245ef502 Mon Sep 17 00:00:00 2001
From: araika
Date: Wed, 29 Apr 2026 20:45:03 +0530
Subject: [PATCH] HIVE-29613: Cross-product join falls back to single-reducer
 shuffle merge when small-side row estimate marginally exceeds
 hive.xprod.mapjoin.small.table.rows

---
 .../hive/ql/optimizer/ConvertJoinMapJoin.java | 45 ++++++++++++--
 .../ql/optimizer/TestConvertJoinMapJoin.java  | 60 +++++++++++++++++++
 2 files changed, 100 insertions(+), 5 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
index 0d94dff357c0..89a914648fa7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ConvertJoinMapJoin.java
@@ -1079,6 +1079,34 @@ private boolean isCrossProduct(JoinOperator joinOp) {
     return true;
   }
 
+  /**
+   * Called after the row estimate exceeded {@link ConfVars#XPROD_SMALL_TABLE_ROWS_THRESHOLD}: returns true only if
+   * {@link #computeOnlineDataSize} is positive and within the unconditional map-join byte budget
+   * ({@link ConfVars#HIVE_CONVERT_JOIN_NOCONDITIONAL_TASK_THRESHOLD}); otherwise false. NDV-based cardinality
+   * can overshoot row counts on tiny tables while bytes remain broadcast-safe.
+   */
+  @VisibleForTesting
+  boolean crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(Statistics parentStats,
+      long xprodRowThreshold, long noconditionalMaxBytes) {
+    long onlineBytes = computeOnlineDataSize(parentStats);
+    if (onlineBytes <= 0) {
+      LOG.debug(
+          "Cross-product map join: row estimate {} exceeds {} but online size unavailable; not applying byte fallback",
+          parentStats.getNumRows(), xprodRowThreshold);
+      return false;
+    }
+    if (onlineBytes <= noconditionalMaxBytes) {
+      LOG.info(
+          "Cross-product map join: row estimate {} exceeds {} but online size {} within broadcast budget {}; allowing map join",
+          parentStats.getNumRows(), xprodRowThreshold, onlineBytes, noconditionalMaxBytes);
+      return true;
+    }
+    LOG.debug(
+        "Cross-product map join: row estimate {} exceeds {} and online size {} exceeds budget {}",
+        parentStats.getNumRows(), xprodRowThreshold, onlineBytes, noconditionalMaxBytes);
+    return false;
+  }
+
   /**
    * Return result for getMapJoinConversion method.
    */
@@ -1299,14 +1327,21 @@ && checkShuffleSizeForLargeTable(joinOp, bigTablePosition, context)) {
     boolean cartesianProductEdgeEnabled =
       HiveConf.getBoolVar(context.conf, HiveConf.ConfVars.TEZ_CARTESIAN_PRODUCT_EDGE_ENABLED);
     if (cartesianProductEdgeEnabled && !hasOuterJoin(joinOp) && isCrossProduct(joinOp)) {
+      final long xprodRowThreshold =
+          HiveConf.getIntVar(context.conf, HiveConf.ConfVars.XPROD_SMALL_TABLE_ROWS_THRESHOLD);
+      final long noconditionalBroadcastBudget =
+          HiveConf.getLongVar(context.conf, HiveConf.ConfVars.HIVE_CONVERT_JOIN_NOCONDITIONAL_TASK_THRESHOLD);
       for (int i = 0 ; i < joinOp.getParentOperators().size(); i ++) {
         if (i != bigTablePosition) {
           Statistics parentStats = joinOp.getParentOperators().get(i).getStatistics();
-          if (parentStats.getNumRows() >
-            HiveConf.getIntVar(context.conf, HiveConf.ConfVars.XPROD_SMALL_TABLE_ROWS_THRESHOLD)) {
-            // if any of smaller side is estimated to generate more than
-            // threshold rows we would disable mapjoin
-            return null;
+          if (parentStats.getNumRows() > xprodRowThreshold) {
+            // NDV-based filters often estimate a few rows on tiny lookups (e.g. 2 vs 1), so the row count
+            // alone can reject a safe broadcast. Keep the map join when this side's estimated online size
+            // still fits the noconditionaltask byte budget; otherwise disable it by returning null.
+            if (!crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(parentStats, xprodRowThreshold,
+                noconditionalBroadcastBudget)) {
+              return null;
+            }
           }
         }
       }
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestConvertJoinMapJoin.java b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestConvertJoinMapJoin.java
index 82005de35a2a..d5c51e0dce54 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestConvertJoinMapJoin.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/optimizer/TestConvertJoinMapJoin.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.optimizer;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
@@ -37,6 +38,65 @@
 
 class TestConvertJoinMapJoin {
 
+  @Test
+  void crossProductByteFallback_allowsWhenOnlineSizeWithinBudget() {
+    ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
+    converter.hashTableLoadFactor = 0.75f;
+    Statistics stats = new Statistics(2L, 500L, 0L, 0L);
+    assertTrue(
+        converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(stats, 1L, 10_000_000L));
+  }
+
+  @Test
+  void crossProductByteFallback_rejectsWhenOnlineSizeExceedsBudget() {
+    ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
+    converter.hashTableLoadFactor = 0.75f;
+    Statistics stats = new Statistics(50_000L, 50_000_000L, 0L, 0L);
+    assertFalse(
+        converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(stats, 1L, 10_000_000L));
+  }
+
+  @Test
+  void crossProductByteFallback_rejectsWhenBudgetTooSmallForEstimatedSize() {
+    ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
+    converter.hashTableLoadFactor = 0.75f;
+    Statistics stats = new Statistics(2L, 500L, 0L, 0L);
+    assertFalse(converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(stats, 1L, 1L));
+  }
+
+  /**
+   * NDV-driven filter selectivity can estimate a tiny lookup at ~2 rows / a few hundred bytes
+   * onlineDataSize when {@code hive.xprod.mapjoin.small.table.rows=1}. The row-only gate would
+   * reject the broadcast even though the build side is well below the noconditionaltask byte
+   * budget. The byte fallback must still admit map-join in that shape.
+   */
+  @Test
+  void crossProductByteFallback_twoRowsTinyOnlineSize() {
+    ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
+    converter.hashTableLoadFactor = 0.75f;
+    final long ndvDrivenRowEstimate = 2L;
+    final long tinyDataSizeBytes = 296L;
+    Statistics stats = new Statistics(ndvDrivenRowEstimate, tinyDataSizeBytes, 0L, 0L);
+    final long xprodRowThreshold = 1L;
+    final long noconditionalBudgetBytes = 10_000_000L;
+    assertTrue(converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(
+        stats, xprodRowThreshold, noconditionalBudgetBytes));
+  }
+
+  /**
+   * Same small row estimate (2) as {@code crossProductByteFallback_twoRowsTinyOnlineSize}, but with a
+   * 50,000,000-byte data size against a 10,000,000-byte budget: the byte fallback must reject, so
+   * the row-only cap still applies.
+   */
+  @Test
+  void crossProductByteFallback_rejectsTwoRowsWhenEstimatedPayloadExceedsBudget() {
+    ConvertJoinMapJoin converter = new ConvertJoinMapJoin();
+    converter.hashTableLoadFactor = 0.75f;
+    Statistics stats = new Statistics(2L, 50_000_000L, 0L, 0L);
+    assertFalse(converter.crossProductBuildSideWithinBroadcastBudgetAfterRowCheck(
+        stats, 1L, 10_000_000L));
+  }
+
   @Test
   void testComputeOnlineDataSizeGenericLargeDataSize() {
     ConvertJoinMapJoin converter = new ConvertJoinMapJoin();