From 20a8f83e3a77aaa0214c504054d8d32e8edf2ecc Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 10 Apr 2026 13:49:29 +0000 Subject: [PATCH 1/2] physical_optimizer: preserve_file_partitions when num file groups < target_partitions `datafusion.optimizer.preserve_file_partitions` would not actually preserve the file partitions when the number of file groups is less than the target_partitions. This is unexpected behavior. If a user wants to preserve file partitions, it is because they want to avoid repartitions. This change fixes that by updating 1 line in the `enforce_distribution` optimizer rule. It also adds a regression test. Before ``` ProjectionExec AggregateExec: mode=FinalPartitioned, gby=[f_dkey, timestamp] RepartitionExec: partitioning=Hash([f_dkey, timestamp], 4), input_partitions=3 AggregateExec: mode=Partial, gby=[f_dkey, timestamp] DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey] ``` After ``` ProjectionExec AggregateExec: mode=SinglePartitioned, gby=[f_dkey, timestamp] DataSourceExec: file_groups=3, projection=[timestamp, value, f_dkey] ``` --- .../src/enforce_distribution.rs | 6 +- .../test_files/preserve_file_partitioning.slt | 64 +++++++++++++++++-- 2 files changed, 63 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index ea47c12030393..15c770c30eb6d 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1328,8 +1328,12 @@ pub fn ensure_distribution( }) ); - let allow_subset_satisfy_partitioning = current_partitions + let allow_subset_satisfy_partitioning = (current_partitions >= subset_satisfaction_threshold + // `preserve_file_partitions` exposes existing file-group + // partitioning to the optimizer. Respect it even when the + // file-group count is below `target_partitions`. + || config.optimizer.preserve_file_partitions > 0) && !is_partitioned_join && !requires_grouping_id; diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 48d99625eabd1..ef7f1a0112c4a 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -681,12 +681,10 @@ logical_plan 06)------SubqueryAlias: d 07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey] physical_plan -01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)] -02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3 -03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] -04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0] -05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet -06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +01)AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] +02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0] +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet +04)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet query TTR rowsort SELECT f.f_dkey, d.env, sum(f.value) @@ -698,6 +696,60 @@ A dev 772.4 B prod 614.4 C prod 2017.6 +########## +# TEST 13: Partitioned Join where Number of File Groups is less than target_partitions +# With preserve_file_partitions enabled, we should still avoid repartitioning +########## + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +set datafusion.optimizer.preserve_file_partitions = 1; + +query TT +EXPLAIN SELECT f_dkey, timestamp, + COUNT(*), AVG(value) +FROM fact_table +GROUP BY f_dkey, timestamp; +---- +logical_plan +01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value) +02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]] +03)----TableScan: fact_table projection=[timestamp, value, f_dkey] +physical_plan +01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)] +02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)] +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet + +query TPIR rowsort +SELECT f_dkey, timestamp, + COUNT(*), AVG(value) +FROM fact_table +GROUP BY f_dkey, timestamp; +---- +A 2023-01-01T09:00:00 1 95.5 +A 2023-01-01T09:00:10 1 102.3 +A 2023-01-01T09:00:20 1 98.7 +A 2023-01-01T09:12:20 1 105.1 +A 2023-01-01T09:12:30 1 100 +A 2023-01-01T09:12:40 1 150 +A 2023-01-01T09:12:50 1 120.8 +B 2023-01-01T09:00:00 1 75.2 +B 2023-01-01T09:00:10 1 82.4 +B 2023-01-01T09:00:20 1 78.9 +B 2023-01-01T09:00:30 1 85.6 +B 2023-01-01T09:12:30 1 80 +B 2023-01-01T09:12:40 1 120 +B 2023-01-01T09:12:50 1 92.3 +C 2023-01-01T09:00:00 1 300.5 +C 2023-01-01T09:00:10 1 285.7 +C 2023-01-01T09:00:20 1 310.2 +C 2023-01-01T09:00:30 1 295.8 +C 2023-01-01T09:00:40 1 300 +C 2023-01-01T09:12:40 1 250 +C 2023-01-01T09:12:50 1 275.4 + ########## # CLEANUP ########## From bb11cd3fb7dcbd9f9873a2edece7b5d5d1880c17 Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Fri, 10 Apr 2026 14:17:30 +0000 Subject: [PATCH 2/2] fix --- .../physical-optimizer/src/enforce_distribution.rs | 8 +++++--- .../test_files/preserve_file_partitioning.slt | 10 ++++++---- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index 15c770c30eb6d..87d0943610971 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1331,9 +1331,11 @@ pub fn ensure_distribution( let allow_subset_satisfy_partitioning = (current_partitions >= subset_satisfaction_threshold // `preserve_file_partitions` exposes existing file-group - // partitioning to the optimizer. Respect it even when the - // file-group count is below `target_partitions`. - || config.optimizer.preserve_file_partitions > 0) + // partitioning to the optimizer. Respect it when the only + // reason to repartition would be to increase partition count + // beyond the preserved file-group count. + || (config.optimizer.preserve_file_partitions > 0 + && current_partitions < target_partitions)) && !is_partitioned_join && !requires_grouping_id; diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index ef7f1a0112c4a..175d7d90cd8ed 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -681,10 +681,12 @@ logical_plan 06)------SubqueryAlias: d 07)--------TableScan: dimension_table_partitioned projection=[env, d_dkey] physical_plan -01)AggregateExec: mode=SinglePartitioned, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] -02)--HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0] -03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet -04)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet +01)AggregateExec: mode=FinalPartitioned, gby=[f_dkey@0 as f_dkey, env@1 as env], aggr=[sum(f.value)] +02)--RepartitionExec: partitioning=Hash([f_dkey@0, env@1], 3), input_partitions=3 +03)----AggregateExec: mode=Partial, gby=[f_dkey@1 as f_dkey, env@2 as env], aggr=[sum(f.value)] +04)------HashJoinExec: mode=Partitioned, join_type=Inner, on=[(d_dkey@1, f_dkey@1)], projection=[value@2, f_dkey@3, env@0] +05)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/dimension_partitioned/d_dkey=C/data.parquet]]}, projection=[env, d_dkey], file_type=parquet +06)--------DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[value, f_dkey], file_type=parquet query TTR rowsort SELECT f.f_dkey, d.env, sum(f.value)