diff --git a/datafusion/physical-optimizer/src/enforce_distribution.rs b/datafusion/physical-optimizer/src/enforce_distribution.rs index ea47c12030393..87d0943610971 100644 --- a/datafusion/physical-optimizer/src/enforce_distribution.rs +++ b/datafusion/physical-optimizer/src/enforce_distribution.rs @@ -1328,8 +1328,14 @@ pub fn ensure_distribution( }) ); - let allow_subset_satisfy_partitioning = current_partitions + let allow_subset_satisfy_partitioning = (current_partitions >= subset_satisfaction_threshold + // `preserve_file_partitions` exposes existing file-group + // partitioning to the optimizer. Respect it when the only + // reason to repartition would be to increase partition count + // beyond the preserved file-group count. + || (config.optimizer.preserve_file_partitions > 0 + && current_partitions < target_partitions)) && !is_partitioned_join && !requires_grouping_id; diff --git a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt index 48d99625eabd1..175d7d90cd8ed 100644 --- a/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt +++ b/datafusion/sqllogictest/test_files/preserve_file_partitioning.slt @@ -698,6 +698,60 @@ A dev 772.4 B prod 614.4 C prod 2017.6 +########## +# TEST 13: Partitioned Join where Number of File Groups is less than target_partitions +# With preserve_file_partitions enabled, we should still avoid repartitioning +########## + +statement ok +set datafusion.execution.target_partitions = 4; + +statement ok +set datafusion.optimizer.preserve_file_partitions = 1; + +query TT +EXPLAIN SELECT f_dkey, timestamp, + COUNT(*), AVG(value) +FROM fact_table +GROUP BY f_dkey, timestamp; +---- +logical_plan +01)Projection: fact_table.f_dkey, fact_table.timestamp, count(Int64(1)) AS count(*), avg(fact_table.value) +02)--Aggregate: groupBy=[[fact_table.f_dkey, fact_table.timestamp]], aggr=[[count(Int64(1)), avg(fact_table.value)]] +03)----TableScan: fact_table projection=[timestamp, value, f_dkey] +physical_plan +01)ProjectionExec: expr=[f_dkey@0 as f_dkey, timestamp@1 as timestamp, count(Int64(1))@2 as count(*), avg(fact_table.value)@3 as avg(fact_table.value)] +02)--AggregateExec: mode=SinglePartitioned, gby=[f_dkey@2 as f_dkey, timestamp@0 as timestamp], aggr=[count(Int64(1)), avg(fact_table.value)] +03)----DataSourceExec: file_groups={3 groups: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=A/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=B/data.parquet], [WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/preserve_file_partitioning/fact/f_dkey=C/data.parquet]]}, projection=[timestamp, value, f_dkey], file_type=parquet + +query TPIR rowsort +SELECT f_dkey, timestamp, + COUNT(*), AVG(value) +FROM fact_table +GROUP BY f_dkey, timestamp; +---- +A 2023-01-01T09:00:00 1 95.5 +A 2023-01-01T09:00:10 1 102.3 +A 2023-01-01T09:00:20 1 98.7 +A 2023-01-01T09:12:20 1 105.1 +A 2023-01-01T09:12:30 1 100 +A 2023-01-01T09:12:40 1 150 +A 2023-01-01T09:12:50 1 120.8 +B 2023-01-01T09:00:00 1 75.2 +B 2023-01-01T09:00:10 1 82.4 +B 2023-01-01T09:00:20 1 78.9 +B 2023-01-01T09:00:30 1 85.6 +B 2023-01-01T09:12:30 1 80 +B 2023-01-01T09:12:40 1 120 +B 2023-01-01T09:12:50 1 92.3 +C 2023-01-01T09:00:00 1 300.5 +C 2023-01-01T09:00:10 1 285.7 +C 2023-01-01T09:00:20 1 310.2 +C 2023-01-01T09:00:30 1 295.8 +C 2023-01-01T09:00:40 1 300 +C 2023-01-01T09:12:40 1 250 +C 2023-01-01T09:12:50 1 275.4 + ########## # CLEANUP ##########