diff --git a/benchmarks/src/tpcds/run.rs b/benchmarks/src/tpcds/run.rs index 58821340034da..ae0c31f48cdf2 100644 --- a/benchmarks/src/tpcds/run.rs +++ b/benchmarks/src/tpcds/run.rs @@ -168,6 +168,9 @@ impl RunOpt { self.enable_piecewise_merge_join; config.options_mut().execution.hash_join_buffering_capacity = self.hash_join_buffering_capacity; + if std::env::var("DISABLE_MATERIALIZED_CTES").is_ok() { + config.options_mut().execution.enable_materialized_ctes = false; + } let rt = self.common.build_runtime()?; let ctx = SessionContext::new_with_config_rt(config, rt); // register tables diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index e6d1ebbbbe746..bc8d90aa81dbb 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -631,6 +631,12 @@ config_namespace! { /// Should DataFusion support recursive CTEs pub enable_recursive_ctes: bool, default = true + /// Should DataFusion materialize CTEs that are referenced multiple times. + /// When enabled, CTEs referenced more than once are generally computed + /// once and cached, except for cheap CTEs and CTEs consumed below a top-level + /// limit. + pub enable_materialized_ctes: bool, default = true + /// Attempt to eliminate sorts by packing & sorting files with non-overlapping /// statistics into the same file groups. /// Currently experimental diff --git a/datafusion/core/src/execution/session_state.rs b/datafusion/core/src/execution/session_state.rs index de5e6b97c1af9..127ece8e87df0 100644 --- a/datafusion/core/src/execution/session_state.rs +++ b/datafusion/core/src/execution/session_state.rs @@ -2313,7 +2313,9 @@ impl QueryPlanner for DefaultQueryPlanner { logical_plan: &LogicalPlan, session_state: &SessionState, ) -> datafusion_common::Result> { - let planner = DefaultPhysicalPlanner::default(); + let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new( + crate::materialized_cte_planner::MaterializedCtePlanner::new(), + )]); planner .create_physical_plan(logical_plan, session_state) .await diff --git a/datafusion/core/src/lib.rs b/datafusion/core/src/lib.rs index 3170f4be7f683..3998f8a5e893d 100644 --- a/datafusion/core/src/lib.rs +++ b/datafusion/core/src/lib.rs @@ -773,6 +773,7 @@ pub mod dataframe; pub mod datasource; pub mod error; pub mod execution; +pub mod materialized_cte_planner; pub mod physical_planner; pub mod prelude; pub mod scalar; diff --git a/datafusion/core/src/materialized_cte_planner.rs b/datafusion/core/src/materialized_cte_planner.rs new file mode 100644 index 0000000000000..88839ae371b22 --- /dev/null +++ b/datafusion/core/src/materialized_cte_planner.rs @@ -0,0 +1,154 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Extension planner for materialized CTEs. +//! +//! This module provides [`MaterializedCtePlanner`] which connects the logical +//! plan nodes ([`MaterializedCteProducer`] and [`MaterializedCteReader`]) to +//! their physical execution counterparts. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +use async_trait::async_trait; +use datafusion_common::Result; +use datafusion_expr::logical_plan::{MaterializedCteProducer, MaterializedCteReader}; +use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode}; +use datafusion_physical_plan::materialized_cte::{ + MaterializedCteCache, MaterializedCteExec, MaterializedCteReaderExec, + materialized_cte_statistics, replace_materialized_cte_readers, +}; +use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties}; + +use crate::execution::context::SessionState; +use crate::physical_planner::{ExtensionPlanner, PhysicalPlanner}; + +/// An extension planner that handles materialized CTE logical nodes. +/// +/// It maintains a map of CTE name to shared cache, ensuring that +/// producers and readers for the same CTE share the same cache instance. +#[derive(Debug)] +pub struct MaterializedCtePlanner { + /// Map of CTE name to shared cache + caches: Mutex>>, + /// Map of CTE name to the number of partitions readers should expose + partition_counts: Mutex>, +} + +impl MaterializedCtePlanner { + /// Create a new `MaterializedCtePlanner`. + pub fn new() -> Self { + Self { + caches: Mutex::new(HashMap::new()), + partition_counts: Mutex::new(HashMap::new()), + } + } + + /// Get or create a cache for the given CTE name. + fn get_or_create_cache(&self, name: &str) -> Arc { + let mut caches = self.caches.lock().unwrap(); + Arc::clone( + caches + .entry(name.to_string()) + .or_insert_with(|| Arc::new(MaterializedCteCache::new(name.to_string()))), + ) + } + + fn create_cache(&self, name: &str) -> Arc { + let cache = Arc::new(MaterializedCteCache::new(name.to_string())); + self.caches + .lock() + .unwrap() + .insert(name.to_string(), Arc::clone(&cache)); + cache + } + + fn set_partition_count(&self, name: &str, partition_count: usize) { + self.partition_counts + .lock() + .unwrap() + .insert(name.to_string(), partition_count); + } + + fn partition_count(&self, name: &str) -> usize { + self.partition_counts + .lock() + .unwrap() + .get(name) + .copied() + .unwrap_or(1) + } +} + +impl Default for MaterializedCtePlanner { + fn default() -> Self { + Self::new() + } +} + +#[async_trait] +impl ExtensionPlanner for MaterializedCtePlanner { + async fn plan_extension( + &self, + _planner: &dyn PhysicalPlanner, + node: &dyn UserDefinedLogicalNode, + _logical_inputs: &[&LogicalPlan], + physical_inputs: &[Arc], + _session_state: &SessionState, + ) -> Result>> { + // Handle MaterializedCteProducer + if let Some(producer) = node.as_any().downcast_ref::() { + let cache = self.create_cache(&producer.name); + let cte_plan = Arc::clone(&physical_inputs[0]); + let partition_count = cte_plan.output_partitioning().partition_count(); + let statistics = materialized_cte_statistics(cte_plan.as_ref())?; + self.set_partition_count(&producer.name, partition_count); + let continuation = replace_materialized_cte_readers( + Arc::clone(&physical_inputs[1]), + &producer.name, + &cache, + partition_count, + &statistics, + )?; + let exec = MaterializedCteExec::new( + producer.name.clone(), + cte_plan, + continuation, + cache, + ); + return Ok(Some(Arc::new(exec))); + } + + // Handle MaterializedCteReader + if let Some(reader) = node.as_any().downcast_ref::() { + let cache = self.get_or_create_cache(&reader.name); + let schema = Arc::clone(reader.schema.inner()); + let statistics = + Arc::new(datafusion_physical_plan::Statistics::new_unknown(&schema)); + let exec = MaterializedCteReaderExec::new( + reader.name.clone(), + schema, + cache, + self.partition_count(&reader.name), + statistics, + ); + return Ok(Some(Arc::new(exec))); + } + + Ok(None) + } +} diff --git a/datafusion/core/tests/sql/cte.rs b/datafusion/core/tests/sql/cte.rs new file mode 100644 index 0000000000000..24084536a5a0a --- /dev/null +++ b/datafusion/core/tests/sql/cte.rs @@ -0,0 +1,351 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use super::*; +use arrow::array::StringArray; +use datafusion::catalog::MemTable; +use datafusion::physical_plan::ExecutionPlanProperties; +use datafusion::physical_plan::materialized_cte::{ + MaterializedCteExec, MaterializedCteReaderExec, +}; +use datafusion::physical_plan::{collect_partitioned, visit_execution_plan}; +use datafusion_common::assert_batches_eq; +use datafusion_common::stats::Precision; + +#[tokio::test] +async fn multi_reference_cte_materialization_heuristic() -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE cte_scan_source AS VALUES (1), (2)") + .await? + .collect() + .await?; + + let reused_scan = ctx + .sql( + "WITH t AS (SELECT column1 AS a FROM cte_scan_source) \ + SELECT count(*) FROM t l JOIN t r ON l.a = r.a", + ) + .await?; + let physical_plan = reused_scan.create_physical_plan().await?; + let plan = displayable(physical_plan.as_ref()).indent(true).to_string(); + assert_contains!(&plan, "MaterializedCteExec"); + assert_contains!(&plan, "MaterializedCteReaderExec"); + + let cheap_literal = ctx + .sql( + "WITH t AS (SELECT 1 AS a) \ + SELECT count(*) FROM t l JOIN t r ON l.a = r.a", + ) + .await?; + let physical_plan = cheap_literal.create_physical_plan().await?; + let plan = displayable(physical_plan.as_ref()).indent(true).to_string(); + assert_not_contains!(&plan, "MaterializedCteExec"); + assert_not_contains!(&plan, "MaterializedCteReaderExec"); + + let limited_reuse = ctx + .sql( + "WITH t AS (SELECT column1 AS a FROM cte_scan_source) \ + SELECT * FROM t l JOIN t r ON l.a = r.a LIMIT 1", + ) + .await?; + let physical_plan = limited_reuse.create_physical_plan().await?; + let plan = displayable(physical_plan.as_ref()).indent(true).to_string(); + assert_not_contains!(&plan, "MaterializedCteExec"); + assert_not_contains!(&plan, "MaterializedCteReaderExec"); + + Ok(()) +} + +#[tokio::test] +async fn materialized_cte_reader_preserves_input_partitions() -> Result<()> { + let ctx = + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(4)); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int64, false)])); + let partitions = (0..4) + .map(|partition| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![partition]))], + ) + .map(|batch| vec![batch]) + }) + .collect::>>()?; + let provider = MemTable::try_new(Arc::clone(&schema), partitions)?; + ctx.register_table("cte_partition_source", Arc::new(provider))?; + + let df = ctx + .sql( + "WITH t AS (SELECT i FROM cte_partition_source) \ + SELECT count(*) FROM t l JOIN t r ON l.i = r.i", + ) + .await?; + let physical_plan = df.create_physical_plan().await?; + + struct PartitionVisitor { + producer_partitions: Vec, + reader_partitions: Vec, + } + + impl ExecutionPlanVisitor for PartitionVisitor { + type Error = std::convert::Infallible; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + if plan.is::() { + self.producer_partitions + .push(plan.output_partitioning().partition_count()); + } + if plan.is::() { + self.reader_partitions + .push(plan.output_partitioning().partition_count()); + } + Ok(true) + } + } + + let mut visitor = PartitionVisitor { + producer_partitions: vec![], + reader_partitions: vec![], + }; + visit_execution_plan(physical_plan.as_ref(), &mut visitor).unwrap(); + + assert_eq!(visitor.producer_partitions, vec![1]); + assert_eq!(visitor.reader_partitions, vec![4, 4]); + + let results = df.collect().await?; + let expected = [ + "+----------+", + "| count(*) |", + "+----------+", + "| 4 |", + "+----------+", + ]; + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn materialized_cte_partitioned_continuation_executes_partitions_once() -> Result<()> +{ + let ctx = + SessionContext::new_with_config(SessionConfig::new().with_target_partitions(4)); + let schema = Arc::new(Schema::new(vec![Field::new("i", DataType::Int64, false)])); + let partitions = (0..4) + .map(|partition| { + RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int64Array::from(vec![partition]))], + ) + .map(|batch| vec![batch]) + }) + .collect::>>()?; + let provider = MemTable::try_new(Arc::clone(&schema), partitions)?; + ctx.register_table("cte_repartition_source", Arc::new(provider))?; + + let df = ctx + .sql( + "WITH t AS (SELECT i FROM cte_repartition_source) \ + SELECT l.i FROM t l JOIN t r ON l.i = r.i", + ) + .await?; + let physical_plan = df.create_physical_plan().await?; + + assert_eq!(physical_plan.output_partitioning().partition_count(), 4); + let results = collect_partitioned(physical_plan, ctx.task_ctx()).await?; + assert_eq!( + results + .iter() + .flatten() + .map(|batch| batch.num_rows()) + .sum::(), + 4 + ); + + Ok(()) +} + +#[tokio::test] +async fn materialized_cte_cache_is_per_physical_plan() -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE cte_cache_source AS VALUES (1), (2)") + .await? + .collect() + .await?; + + let first = ctx + .sql( + "WITH t AS (SELECT column1 AS a FROM cte_cache_source WHERE column1 = 1) \ + SELECT l.a FROM t l JOIN t r ON l.a = r.a", + ) + .await?; + let physical_plan = first.create_physical_plan().await?; + let plan = displayable(physical_plan.as_ref()).indent(true).to_string(); + assert_contains!(&plan, "MaterializedCteExec"); + let results = first.collect().await?; + let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"]; + assert_batches_eq!(expected, &results); + + let second = ctx + .sql( + "WITH t AS (SELECT column1 AS a FROM cte_cache_source WHERE column1 = 2) \ + SELECT l.a FROM t l JOIN t r ON l.a = r.a", + ) + .await?; + let physical_plan = second.create_physical_plan().await?; + let plan = displayable(physical_plan.as_ref()).indent(true).to_string(); + assert_contains!(&plan, "MaterializedCteExec"); + let results = second.collect().await?; + let expected = ["+---+", "| a |", "+---+", "| 2 |", "+---+"]; + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn materialized_cte_reader_preserves_producer_statistics() -> Result<()> { + let ctx = SessionContext::new(); + ctx.sql("CREATE TABLE cte_cross_source AS VALUES (1), (2), (3), (4)") + .await? + .collect() + .await?; + + let df = ctx + .sql( + "WITH scalar_cte AS ( \ + SELECT max(column1) AS max_value FROM cte_cross_source \ + ) \ + SELECT l.max_value \ + FROM scalar_cte l JOIN scalar_cte r ON l.max_value = r.max_value", + ) + .await?; + let physical_plan = df.create_physical_plan().await?; + + struct StatisticsVisitor { + reader_rows: Vec>, + } + + impl ExecutionPlanVisitor for StatisticsVisitor { + type Error = datafusion::error::DataFusionError; + + fn pre_visit(&mut self, plan: &dyn ExecutionPlan) -> Result { + if plan.is::() { + self.reader_rows + .push(plan.partition_statistics(None)?.num_rows); + } + + Ok(true) + } + } + + let mut visitor = StatisticsVisitor { + reader_rows: vec![], + }; + visit_execution_plan(physical_plan.as_ref(), &mut visitor)?; + + assert_eq!( + visitor.reader_rows, + vec![Precision::Exact(1), Precision::Exact(1)] + ); + + let results = df.collect().await?; + let expected = [ + "+-----------+", + "| max_value |", + "+-----------+", + "| 4 |", + "+-----------+", + ]; + assert_batches_eq!(expected, &results); + + Ok(()) +} + +#[tokio::test] +async fn q39_filter_pushdown_regression() -> Result<()> { + // TPC-DS Q39 pattern: CTE aggregates over all months, + // but each reference filters on a different d_moy value. + // When inlined, predicate pushdown can push d_moy=4 / d_moy=5 into the scan. + // When materialized, ALL months are computed then filtered post-hoc. + + let mut config = SessionConfig::new(); + config.options_mut().execution.enable_materialized_ctes = true; + let ctx = SessionContext::new_with_config(config); + + ctx.sql("CREATE TABLE inventory (inv_item_sk INT, inv_warehouse_sk INT, inv_date_sk INT, inv_quantity_on_hand INT) AS VALUES (1,1,1,100),(1,1,2,200),(1,1,3,50)").await?.collect().await?; + ctx.sql("CREATE TABLE item (i_item_sk INT) AS VALUES (1)") + .await? + .collect() + .await?; + ctx.sql("CREATE TABLE warehouse (w_warehouse_name VARCHAR, w_warehouse_sk INT) AS VALUES ('wh1', 1)").await?.collect().await?; + ctx.sql("CREATE TABLE date_dim (d_date_sk INT, d_year INT, d_moy INT) AS VALUES (1, 1998, 4), (2, 1998, 5), (3, 1998, 6)").await?.collect().await?; + + let q39 = " + EXPLAIN with inv as + (select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy + ,stdev,mean, case mean when 0 then null else stdev/mean end cov + from(select w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy + ,stddev_samp(inv_quantity_on_hand) stdev,avg(inv_quantity_on_hand) mean + from inventory + ,item + ,warehouse + ,date_dim + where inv_item_sk = i_item_sk + and inv_warehouse_sk = w_warehouse_sk + and inv_date_sk = d_date_sk + and d_year = 1998 + group by w_warehouse_name,w_warehouse_sk,i_item_sk,d_moy) foo + where case mean when 0 then 0 else stdev/mean end > 1) + select inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean, inv1.cov + ,inv2.w_warehouse_sk,inv2.i_item_sk,inv2.d_moy,inv2.mean, inv2.cov + from inv inv1,inv inv2 + where inv1.i_item_sk = inv2.i_item_sk + and inv1.w_warehouse_sk = inv2.w_warehouse_sk + and inv1.d_moy=4 + and inv2.d_moy=4+1 + order by inv1.w_warehouse_sk,inv1.i_item_sk,inv1.d_moy,inv1.mean,inv1.cov + ,inv2.d_moy,inv2.mean, inv2.cov + "; + + let df = ctx.sql(q39).await?; + let results = df.collect().await?; + let plan_str = results + .iter() + .flat_map(|b| { + let col = b.column(1); + (0..col.len()).map(move |i| { + col.as_any() + .downcast_ref::() + .unwrap() + .value(i) + .to_string() + }) + }) + .collect::>() + .join("\n"); + + // With the disjoint group-key filter heuristic, Q39's CTE should NOT be + // materialized because each reference filters on a different d_moy value, + // allowing predicate pushdown to specialize each aggregate copy. + assert!( + !plan_str.contains("MaterializedCteExec") + && !plan_str.contains("MaterializedCteProducer"), + "Q39 CTE should NOT be materialized when consumers apply disjoint \ + filters on group-by keys (d_moy=4 vs d_moy=5)" + ); + + Ok(()) +} diff --git a/datafusion/core/tests/sql/mod.rs b/datafusion/core/tests/sql/mod.rs index 9a1dc5502ee60..7876ffdc2dcdf 100644 --- a/datafusion/core/tests/sql/mod.rs +++ b/datafusion/core/tests/sql/mod.rs @@ -64,6 +64,7 @@ macro_rules! assert_metrics { pub mod aggregates; pub mod create_drop; +mod cte; pub mod explain_analyze; pub mod joins; mod path_partition; diff --git a/datafusion/expr/src/logical_plan/materialized_cte.rs b/datafusion/expr/src/logical_plan/materialized_cte.rs new file mode 100644 index 0000000000000..a2aabb7df91e0 --- /dev/null +++ b/datafusion/expr/src/logical_plan/materialized_cte.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Logical plan nodes for materialized CTEs. + +use std::collections::HashSet; +use std::fmt; +use std::hash::{Hash, Hasher}; +use std::sync::Arc; + +use crate::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore}; +use datafusion_common::tree_node::{Transformed, TreeNode}; +use datafusion_common::{DFSchema, DFSchemaRef, Result}; + +fn get_all_columns_from_schema(schema: &DFSchema) -> HashSet { + schema.fields().iter().map(|f| f.name().clone()).collect() +} + +/// A logical plan node that materializes a CTE and makes it available +/// to a continuation plan. The CTE is executed once, its results cached, +/// and any `MaterializedCteReader` nodes in the continuation plan read +/// from that cache. +#[derive(Debug, Clone)] +pub struct MaterializedCteProducer { + /// Name of the CTE being materialized + pub name: String, + /// The plan that computes the CTE + pub cte_plan: Arc, + /// The plan that uses the materialized CTE (continuation) + pub continuation: Arc, + /// The output schema (same as continuation's schema) + pub schema: DFSchemaRef, +} + +impl PartialEq for MaterializedCteProducer { + fn eq(&self, other: &Self) -> bool { + self.name == other.name + && self.cte_plan == other.cte_plan + && self.continuation == other.continuation + } +} + +impl Eq for MaterializedCteProducer {} + +impl PartialOrd for MaterializedCteProducer { + fn partial_cmp(&self, other: &Self) -> Option { + self.name.partial_cmp(&other.name) + } +} + +impl Hash for MaterializedCteProducer { + fn hash(&self, state: &mut H) { + self.name.hash(state); + self.cte_plan.hash(state); + self.continuation.hash(state); + } +} + +impl UserDefinedLogicalNodeCore for MaterializedCteProducer { + fn name(&self) -> &str { + "MaterializedCteProducer" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![self.cte_plan.as_ref(), self.continuation.as_ref()] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn prevent_predicate_push_down_columns(&self) -> HashSet { + get_all_columns_from_schema(self.schema()) + } + + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MaterializedCteProducer: name={}", self.name) + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + inputs: Vec, + ) -> Result { + assert_eq!(inputs.len(), 2); + let cte_plan = inputs[0].clone(); + let cte_schema = Arc::clone(cte_plan.schema()); + let name = self.name.clone(); + let continuation = inputs[1] + .clone() + .transform_down(move |node| { + if let LogicalPlan::Extension(Extension { + node: extension_node, + }) = &node + && let Some(reader) = extension_node + .as_any() + .downcast_ref::() + && reader.name == name + { + let reader = MaterializedCteReader { + name: reader.name.clone(), + schema: Arc::clone(&cte_schema), + }; + return Ok(Transformed::yes(LogicalPlan::Extension(Extension { + node: Arc::new(reader), + }))); + } + Ok(Transformed::no(node)) + })? + .data; + Ok(Self { + name: self.name.clone(), + cte_plan: Arc::new(cte_plan), + schema: Arc::clone(continuation.schema()), + continuation: Arc::new(continuation), + }) + } +} + +/// A logical plan node that reads from a previously materialized CTE cache. +/// This is a leaf node (no inputs) that will be wired to the cache at +/// physical planning time. +#[derive(Debug, Clone)] +pub struct MaterializedCteReader { + /// Name of the CTE to read from + pub name: String, + /// The schema of the CTE output + pub schema: DFSchemaRef, +} + +impl PartialEq for MaterializedCteReader { + fn eq(&self, other: &Self) -> bool { + self.name == other.name && self.schema == other.schema + } +} + +impl Eq for MaterializedCteReader {} + +impl PartialOrd for MaterializedCteReader { + fn partial_cmp(&self, other: &Self) -> Option { + self.name.partial_cmp(&other.name) + } +} + +impl Hash for MaterializedCteReader { + fn hash(&self, state: &mut H) { + self.name.hash(state); + self.schema.hash(state); + } +} + +impl UserDefinedLogicalNodeCore for MaterializedCteReader { + fn name(&self) -> &str { + "MaterializedCteReader" + } + + fn inputs(&self) -> Vec<&LogicalPlan> { + vec![] + } + + fn schema(&self) -> &DFSchemaRef { + &self.schema + } + + fn expressions(&self) -> Vec { + vec![] + } + + fn prevent_predicate_push_down_columns(&self) -> HashSet { + get_all_columns_from_schema(self.schema()) + } + + fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "MaterializedCteReader: name={}", self.name) + } + + fn with_exprs_and_inputs( + &self, + _exprs: Vec, + _inputs: Vec, + ) -> Result { + Ok(Self { + name: self.name.clone(), + schema: Arc::clone(&self.schema), + }) + } +} diff --git a/datafusion/expr/src/logical_plan/mod.rs b/datafusion/expr/src/logical_plan/mod.rs index 5087b25178ab6..609b5f16dcb64 100644 --- a/datafusion/expr/src/logical_plan/mod.rs +++ b/datafusion/expr/src/logical_plan/mod.rs @@ -22,6 +22,7 @@ pub mod dml; mod extension; pub(crate) mod invariants; pub use invariants::{InvariantLevel, assert_expected_schema, check_subquery_expr}; +pub mod materialized_cte; mod plan; mod statement; pub mod tree_node; @@ -56,3 +57,4 @@ pub use datafusion_common::format::ExplainFormat; pub use display::display_schema; pub use extension::{UserDefinedLogicalNode, UserDefinedLogicalNodeCore}; +pub use materialized_cte::{MaterializedCteProducer, MaterializedCteReader}; diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index 3005e975424b4..a1ae99ccab1d1 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -78,6 +78,7 @@ pub mod filter; pub mod filter_pushdown; pub mod joins; pub mod limit; +pub mod materialized_cte; pub mod memory; pub mod metrics; pub mod operator_statistics; diff --git a/datafusion/physical-plan/src/materialized_cte.rs b/datafusion/physical-plan/src/materialized_cte.rs new file mode 100644 index 0000000000000..bb35688ebd9ab --- /dev/null +++ b/datafusion/physical-plan/src/materialized_cte.rs @@ -0,0 +1,573 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical plan nodes for materialized CTEs. + +use std::fmt; +use std::future::Future; +use std::sync::Arc; + +use crate::coop::cooperative; +use crate::execution_plan::{Boundedness, EmissionType, collect_partitioned}; +use crate::memory::MemoryStream; +use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; +use crate::operator_statistics::StatisticsRegistry; +use crate::stream::RecordBatchStreamAdapter; +use crate::{ + DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, + SendableRecordBatchStream, Statistics, +}; + +use arrow::datatypes::SchemaRef; +use arrow::record_batch::RecordBatch; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_common::{Result, internal_err}; +use datafusion_execution::TaskContext; +use datafusion_physical_expr::{EquivalenceProperties, Partitioning}; +use futures::TryStreamExt; +use tokio::sync::OnceCell; + +/// A shared cache that stores the materialized CTE results. +/// The cache uses a `OnceCell` to ensure the CTE is only computed once. +#[derive(Debug)] +pub struct MaterializedCteCache { + /// Name of the CTE (for debugging) + name: String, + /// The cached batches, populated once by the producer + batches: OnceCell>>, +} + +impl MaterializedCteCache { + /// Create a new empty cache for the given CTE name. + pub fn new(name: String) -> Self { + Self { + name, + batches: OnceCell::new(), + } + } + + /// Store batches into the cache. Returns error if already populated. + pub fn store(&self, batches: Vec>) -> Result<()> { + self.batches.set(batches).map_err(|_| { + datafusion_common::DataFusionError::Internal(format!( + "MaterializedCteCache '{}' was already populated", + self.name + )) + }) + } + + /// Get the cached batches. Returns None if not yet populated. + pub fn get(&self) -> Option<&Vec>> { + self.batches.get() + } + + /// Get the cached batches, computing and storing them once if needed. + pub async fn get_or_try_init(&self, f: F) -> Result<&Vec>> + where + F: FnOnce() -> Fut, + Fut: Future>>>, + { + self.batches.get_or_try_init(f).await + } +} + +/// Physical execution plan that materializes a CTE and then executes +/// a continuation plan. The CTE results are cached in a shared +/// `MaterializedCteCache` for use by `MaterializedCteReaderExec` nodes. +#[derive(Debug)] +pub struct MaterializedCteExec { + /// Name of the CTE + name: String, + /// The plan that computes the CTE + cte_plan: Arc, + /// The continuation plan that uses the materialized CTE + continuation: Arc, + /// Shared cache for the CTE results + cache: Arc, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + /// Cache holding plan properties + properties: Arc, +} + +impl MaterializedCteExec { + /// Create a new MaterializedCteExec. + pub fn new( + name: String, + cte_plan: Arc, + continuation: Arc, + cache: Arc, + ) -> Self { + let properties = Arc::clone(continuation.properties()); + Self { + name, + cte_plan, + continuation, + cache, + metrics: ExecutionPlanMetricsSet::new(), + properties, + } + } +} + +impl DisplayAs for MaterializedCteExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "MaterializedCteExec: name={}", self.name) + } + DisplayFormatType::TreeRender => { + write!(f, "name={}", self.name) + } + } + } +} + +impl ExecutionPlan for MaterializedCteExec { + fn name(&self) -> &'static str { + "MaterializedCteExec" + } + + fn properties(&self) -> &Arc { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![&self.cte_plan, &self.continuation] + } + + fn with_new_children( + self: Arc, + children: Vec>, + ) -> Result> { + if children.len() != 2 { + return internal_err!( + "MaterializedCteExec expected 2 children, got {}", + children.len() + ); + } + let cte_plan = Arc::clone(&children[0]); + let partition_count = cte_plan.output_partitioning().partition_count(); + let statistics = materialized_cte_statistics(cte_plan.as_ref())?; + let continuation = replace_materialized_cte_readers( + Arc::clone(&children[1]), + &self.name, + &self.cache, + partition_count, + &statistics, + )?; + Ok(Arc::new(Self::new( + self.name.clone(), + cte_plan, + continuation, + Arc::clone(&self.cache), + ))) + } + + fn execute( + &self, + partition: usize, + context: Arc, + ) -> Result { + let output_partitions = self.properties.output_partitioning().partition_count(); + if partition >= output_partitions { + return internal_err!( + "MaterializedCteExec got partition {partition}, expected less than {output_partitions}" + ); + } + + let cache = Arc::clone(&self.cache); + let cte_plan = Arc::clone(&self.cte_plan); + let continuation = Arc::clone(&self.continuation); + let name = self.name.clone(); + let ctx = Arc::clone(&context); + let schema = Arc::clone(&self.continuation.schema()); + + let fut = async move { + // Materialize the CTE if not already done + let materialize_ctx = Arc::clone(&ctx); + cache + .get_or_try_init(|| async move { + let partitions = + collect_partitioned(cte_plan, materialize_ctx).await?; + + let num_partitions = partitions.len(); + let num_batches: usize = partitions.iter().map(Vec::len).sum(); + let num_rows: usize = partitions + .iter() + .flatten() + .map(|b| b.num_rows()) + .sum(); + log::info!( + "Materializing CTE '{name}': {num_partitions} partitions, {num_batches} batches, {num_rows} rows" + ); + + Ok(partitions) + }) + .await?; + + continuation.execute(partition, ctx) + }; + + // Use futures::stream::once to create a stream from the future, + // then flatten it to get a stream of RecordBatches + let stream = futures::stream::once(fut).try_flatten(); + Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::new(Statistics::new_unknown( + &self.continuation.schema(), + ))) + } +} + +/// Physical execution plan that reads from a previously materialized CTE cache. +/// This is a leaf node that retrieves the cached batches from the shared +/// `MaterializedCteCache`. +#[derive(Debug)] +pub struct MaterializedCteReaderExec { + /// Name of the CTE + name: String, + /// The schema of the CTE output + schema: SchemaRef, + /// Shared cache to read from + cache: Arc, + /// Execution metrics + metrics: ExecutionPlanMetricsSet, + /// Statistics from the plan that produces the materialized CTE + statistics: Arc, + /// Cache holding plan properties + properties: Arc, +} + +impl MaterializedCteReaderExec { + /// Create a new MaterializedCteReaderExec. + pub fn new( + name: String, + schema: SchemaRef, + cache: Arc, + partition_count: usize, + statistics: Arc, + ) -> Self { + let partition_count = reader_partition_count(partition_count, &statistics); + let properties = Self::compute_properties(Arc::clone(&schema), partition_count); + Self { + name, + schema, + cache, + metrics: ExecutionPlanMetricsSet::new(), + statistics, + properties: Arc::new(properties), + } + } + + /// The CTE this reader reads from. + pub fn cte_name(&self) -> &str { + &self.name + } + + fn compute_properties(schema: SchemaRef, partition_count: usize) -> PlanProperties { + PlanProperties::new( + EquivalenceProperties::new(schema), + Partitioning::UnknownPartitioning(partition_count), + EmissionType::Incremental, + Boundedness::Bounded, + ) + } +} + +impl DisplayAs for MaterializedCteReaderExec { + fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result { + match t { + DisplayFormatType::Default | DisplayFormatType::Verbose => { + write!(f, "MaterializedCteReaderExec: name={}", self.name) + } + DisplayFormatType::TreeRender => { + write!(f, "name={}", self.name) + } + } + } +} + +impl ExecutionPlan for MaterializedCteReaderExec { + fn name(&self) -> &'static str { + "MaterializedCteReaderExec" + } + + fn properties(&self) -> &Arc { + &self.properties + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _children: Vec>, + ) -> Result> { + Ok(Arc::clone(&self) as Arc) + } + + fn execute( + &self, + partition: usize, + _context: Arc, + ) -> Result { + let output_partitions = self.properties.output_partitioning().partition_count(); + if partition >= output_partitions { + return internal_err!( + "MaterializedCteReaderExec got partition {partition}, expected less than {output_partitions}" + ); + } + + let batches = self.cache.get().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "MaterializedCteReaderExec: cache for CTE '{}' is not yet populated. \ + The producer must execute before the reader.", + self.name + )) + })?; + + let partition_batches = if output_partitions == 1 { + batches.iter().flatten().cloned().collect() + } else { + batches.get(partition).cloned().unwrap_or_default() + }; + + let stream = + MemoryStream::try_new(partition_batches, Arc::clone(&self.schema), None)?; + Ok(Box::pin(cooperative(stream))) + } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } + + fn partition_statistics(&self, _partition: Option) -> Result> { + Ok(Arc::clone(&self.statistics)) + } +} + +fn reader_partition_count(partition_count: usize, statistics: &Statistics) -> usize { + match statistics.num_rows.get_value() { + Some(rows) if *rows < partition_count => 1, + _ => partition_count, + } +} + +/// Estimate the statistics exposed by materialized CTE readers. +pub fn materialized_cte_statistics(plan: &dyn ExecutionPlan) -> Result> { + Ok(Arc::clone( + StatisticsRegistry::default_with_builtin_providers() + .compute(plan)? + .base_arc(), + )) +} + +/// Replace readers for a materialized CTE with readers that use the provided +/// cache and expose the provided partition count and statistics. +pub fn replace_materialized_cte_readers( + plan: Arc, + name: &str, + cache: &Arc, + partition_count: usize, + statistics: &Arc, +) -> Result> { + plan.transform_up(|plan| { + let Some(reader) = plan.downcast_ref::() else { + return Ok(Transformed::no(plan)); + }; + + if reader.cte_name() != name { + return Ok(Transformed::no(plan)); + } + + Ok(Transformed::yes(Arc::new(MaterializedCteReaderExec::new( + name.to_string(), + plan.schema(), + Arc::clone(cache), + partition_count, + Arc::clone(statistics), + )) as Arc)) + }) + .data() +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ArrayRef, Int32Array}; + use arrow::datatypes::{DataType, Field, Schema}; + use datafusion_common::assert_batches_eq; + use datafusion_common::stats::Precision; + use futures::TryStreamExt; + + fn test_schema() -> SchemaRef { + Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])) + } + + fn test_batch(schema: &SchemaRef) -> RecordBatch { + let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3])); + RecordBatch::try_new(Arc::clone(schema), vec![array]).unwrap() + } + + fn test_statistics(schema: &SchemaRef) -> Arc { + Arc::new(Statistics::new_unknown(schema)) + } + + fn test_statistics_with_rows(schema: &SchemaRef, rows: usize) -> Arc { + Arc::new(Statistics::new_unknown(schema).with_num_rows(Precision::Exact(rows))) + } + + #[test] + fn test_cache_store_and_get() { + let cache = MaterializedCteCache::new("test".into()); + assert!(cache.get().is_none()); + + let schema = test_schema(); + let batch = test_batch(&schema); + cache.store(vec![vec![batch.clone()]]).unwrap(); + + let cached = cache.get().unwrap(); + assert_eq!(cached.len(), 1); + assert_eq!(cached[0].len(), 1); + assert_eq!(cached[0][0].num_rows(), 3); + } + + #[test] + fn test_cache_double_store_fails() { + let cache = MaterializedCteCache::new("test".into()); + let schema = test_schema(); + let batch = test_batch(&schema); + + cache.store(vec![vec![batch.clone()]]).unwrap(); + assert!(cache.store(vec![vec![batch]]).is_err()); + } + + #[tokio::test] + async fn test_reader_exec_reads_from_cache() { + let schema = test_schema(); + let batch = test_batch(&schema); + let cache = Arc::new(MaterializedCteCache::new("test".into())); + cache.store(vec![vec![batch.clone()]]).unwrap(); + + let reader = MaterializedCteReaderExec::new( + "test".into(), + Arc::clone(&schema), + cache, + 1, + test_statistics(&schema), + ); + + let context = Arc::new(TaskContext::default()); + let stream = reader.execute(0, context).unwrap(); + let batches: Vec = stream.try_collect().await.unwrap(); + + let expected = [ + "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+", + ]; + assert_batches_eq!(expected, &batches); + } + + #[tokio::test] + async fn test_reader_exec_preserves_cache_partitions() { + let schema = test_schema(); + let batch = test_batch(&schema); + let cache = Arc::new(MaterializedCteCache::new("test".into())); + cache + .store(vec![vec![batch.clone()], vec![batch.clone()]]) + .unwrap(); + + let reader = MaterializedCteReaderExec::new( + "test".into(), + Arc::clone(&schema), + cache, + 2, + test_statistics(&schema), + ); + + assert_eq!( + reader.properties().output_partitioning().partition_count(), + 2 + ); + + let context = Arc::new(TaskContext::default()); + let stream = reader.execute(1, context).unwrap(); + let batches: Vec = stream.try_collect().await.unwrap(); + + let expected = [ + "+---+", "| a |", "+---+", "| 1 |", "| 2 |", "| 3 |", "+---+", + ]; + assert_batches_eq!(expected, &batches); + } + + #[tokio::test] + async fn test_reader_exec_coalesces_exact_scalar_cache() { + let schema = test_schema(); + let batch = RecordBatch::try_new( + Arc::clone(&schema), + vec![Arc::new(Int32Array::from(vec![1]))], + ) + .unwrap(); + let cache = Arc::new(MaterializedCteCache::new("test".into())); + cache.store(vec![vec![], vec![batch.clone()]]).unwrap(); + + let reader = MaterializedCteReaderExec::new( + "test".into(), + Arc::clone(&schema), + cache, + 2, + test_statistics_with_rows(&schema, 1), + ); + + assert_eq!( + reader.properties().output_partitioning().partition_count(), + 1 + ); + + let context = Arc::new(TaskContext::default()); + let stream = reader.execute(0, context).unwrap(); + let batches: Vec = stream.try_collect().await.unwrap(); + + let expected = ["+---+", "| a |", "+---+", "| 1 |", "+---+"]; + assert_batches_eq!(expected, &batches); + } + + #[tokio::test] + async fn test_reader_exec_fails_when_cache_empty() { + let schema = test_schema(); + let cache = Arc::new(MaterializedCteCache::new("test".into())); + + let reader = MaterializedCteReaderExec::new( + "test".into(), + Arc::clone(&schema), + cache, + 1, + test_statistics(&schema), + ); + + let context = Arc::new(TaskContext::default()); + let result = reader.execute(0, context); + assert!(result.is_err()); + } +} diff --git a/datafusion/physical-plan/src/operator_statistics/mod.rs b/datafusion/physical-plan/src/operator_statistics/mod.rs index 041ef4666658d..105f8a2b22a9d 100644 --- a/datafusion/physical-plan/src/operator_statistics/mod.rs +++ b/datafusion/physical-plan/src/operator_statistics/mod.rs @@ -683,7 +683,11 @@ impl StatisticsProvider for AggregateStatisticsProvider { return Ok(StatisticsResult::Delegate); } - if child_stats.is_empty() || agg.group_expr().expr().is_empty() { + if agg.group_expr().expr().is_empty() { + return computed_with_row_count(plan, Precision::Exact(1)); + } + + if child_stats.is_empty() { return Ok(StatisticsResult::Delegate); } @@ -1595,6 +1599,20 @@ mod tests { Ok(()) } + #[test] + fn test_aggregate_provider_global_aggregate() -> Result<()> { + let source = make_source_with_ndv(100, vec![Some(10)]); + let agg = make_aggregate(source, PhysicalGroupBy::default())?; + + let registry = StatisticsRegistry::with_providers(vec![ + Arc::new(AggregateStatisticsProvider), + Arc::new(DefaultStatisticsProvider), + ]); + let stats = registry.compute(agg.as_ref())?; + assert_eq!(stats.base.num_rows, Precision::Exact(1)); + Ok(()) + } + #[test] fn test_aggregate_provider_no_ndv_delegates() -> Result<()> { // No NDV on the GROUP BY column diff --git a/datafusion/sql/src/cte.rs b/datafusion/sql/src/cte.rs index 18766d7056355..88985d86e6539 100644 --- a/datafusion/sql/src/cte.rs +++ b/datafusion/sql/src/cte.rs @@ -24,7 +24,7 @@ use datafusion_common::{ tree_node::{TreeNode, TreeNodeRecursion}, }; use datafusion_expr::{LogicalPlan, LogicalPlanBuilder, TableSource}; -use sqlparser::ast::{Query, SetExpr, SetOperator, With}; +use sqlparser::ast::{CteAsMaterialized, Query, SetExpr, SetOperator, With}; impl SqlToRel<'_, S> { pub(super) fn plan_with_clause( @@ -43,8 +43,21 @@ impl SqlToRel<'_, S> { ); } + // Track MATERIALIZED / NOT MATERIALIZED hints + if let Some(ref materialized) = cte.materialized { + match materialized { + CteAsMaterialized::Materialized => { + planner_context.insert_materialized_cte(&cte_name); + } + CteAsMaterialized::NotMaterialized => { + planner_context.insert_not_materialized_cte(&cte_name); + } + } + } + // Create a logical plan for the CTE let cte_plan = if is_recursive { + planner_context.insert_recursive_cte(&cte_name); self.recursive_cte(&cte_name, *cte.query, planner_context)? } else { self.non_recursive_cte(*cte.query, planner_context)? diff --git a/datafusion/sql/src/planner.rs b/datafusion/sql/src/planner.rs index 01215ae3434cf..5e1ea46561638 100644 --- a/datafusion/sql/src/planner.rs +++ b/datafusion/sql/src/planner.rs @@ -16,7 +16,7 @@ // under the License. //! [`SqlToRel`]: SQL Query Planner (produces [`LogicalPlan`] from SQL AST) -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::str::FromStr; use std::sync::Arc; use std::vec; @@ -276,6 +276,14 @@ pub struct PlannerContext { set_expr_left_schema: Option, /// The parameters of all lambdas seen so far lambda_parameters: HashMap, + /// CTEs explicitly marked as MATERIALIZED + materialized_cte_names: HashSet, + /// CTEs explicitly marked as NOT MATERIALIZED + not_materialized_cte_names: HashSet, + /// CTEs that are recursive + recursive_cte_names: HashSet, + /// Reference counts for CTEs (how many times each CTE is referenced) + cte_ref_counts: HashMap, } impl Default for PlannerContext { @@ -295,6 +303,10 @@ impl PlannerContext { create_table_schema: None, set_expr_left_schema: None, lambda_parameters: HashMap::new(), + materialized_cte_names: HashSet::new(), + not_materialized_cte_names: HashSet::new(), + recursive_cte_names: HashSet::new(), + cte_ref_counts: HashMap::new(), } } @@ -430,6 +442,61 @@ impl PlannerContext { ) -> Option { std::mem::replace(&mut self.set_expr_left_schema, schema) } + + /// Mark a CTE as explicitly MATERIALIZED + pub fn insert_materialized_cte(&mut self, name: &str) { + self.materialized_cte_names.insert(name.to_string()); + } + + /// Mark a CTE as explicitly NOT MATERIALIZED + pub fn insert_not_materialized_cte(&mut self, name: &str) { + self.not_materialized_cte_names.insert(name.to_string()); + } + + /// Mark a CTE as recursive + pub fn insert_recursive_cte(&mut self, name: &str) { + self.recursive_cte_names.insert(name.to_string()); + } + + /// Check if a CTE is explicitly marked as MATERIALIZED + pub fn is_materialized_cte(&self, name: &str) -> bool { + self.materialized_cte_names.contains(name) + } + + /// Check if a CTE is explicitly marked as NOT MATERIALIZED + pub fn is_not_materialized_cte(&self, name: &str) -> bool { + self.not_materialized_cte_names.contains(name) + } + + /// Check if a CTE is recursive + pub fn is_recursive_cte(&self, name: &str) -> bool { + self.recursive_cte_names.contains(name) + } + + /// Increment the reference count for a CTE + pub fn increment_cte_ref_count(&mut self, name: &str) { + *self.cte_ref_counts.entry(name.to_string()).or_insert(0) += 1; + } + + /// Get the reference count for a CTE + pub fn get_cte_ref_count(&self, name: &str) -> usize { + self.cte_ref_counts.get(name).copied().unwrap_or(0) + } + + /// Get a reference to the materialized CTE names + pub fn materialized_cte_names(&self) -> &HashSet { + &self.materialized_cte_names + } + + /// Get a reference to the CTE reference counts + pub fn cte_ref_counts(&self) -> &HashMap { + &self.cte_ref_counts + } + + /// Returns an iterator over CTE names + pub fn cte_names(&self) -> impl Iterator { + self.ctes.keys() + } } /// SQL query planner and binder diff --git a/datafusion/sql/src/query.rs b/datafusion/sql/src/query.rs index 76124cbc7eb59..906812705749a 100644 --- a/datafusion/sql/src/query.rs +++ b/datafusion/sql/src/query.rs @@ -15,17 +15,23 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashSet; use std::sync::Arc; use crate::planner::{ContextProvider, PlannerContext, SqlToRel}; use crate::stack::StackGuard; -use datafusion_common::{Constraints, DFSchema, Result, not_impl_err}; +use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion}; +use datafusion_common::{Constraints, DFSchema, DFSchemaRef, Result, not_impl_err}; use datafusion_expr::expr::{Sort, WildcardOptions}; +use datafusion_expr::logical_plan::{ + Extension, MaterializedCteProducer, MaterializedCteReader, +}; use datafusion_expr::select_expr::SelectExpr; use datafusion_expr::{ CreateMemoryTable, DdlStatement, Distinct, Expr, LogicalPlan, LogicalPlanBuilder, + Operator, }; use sqlparser::ast::{ Expr as SQLExpr, ExprWithAliasAndOrderBy, Ident, LimitClause, Offset, OffsetRows, @@ -63,6 +69,7 @@ impl SqlToRel<'_, S> { return not_impl_err!("FETCH clause is not supported yet"); } + let has_with = with.is_some(); if let Some(with) = with { self.plan_with_clause(with, planner_context)?; } @@ -99,7 +106,116 @@ impl SqlToRel<'_, S> { } }?; - self.pipe_operators(plan, pipe_operators, planner_context) + let plan = self.pipe_operators(plan, pipe_operators, planner_context)?; + + // Apply CTE materialization if this query had a WITH clause + if has_with { + self.apply_cte_materialization(plan, planner_context) + } else { + Ok(plan) + } + } + + /// Apply CTE materialization to the plan. + /// + /// For each CTE that should be materialized, this replaces SubqueryAlias + /// references with MaterializedCteReader nodes and wraps the plan in + /// MaterializedCteProducer nodes. + fn apply_cte_materialization( + &self, + plan: LogicalPlan, + planner_context: &mut PlannerContext, + ) -> Result { + // Check if materialized CTEs are enabled + if !self + .context_provider + .options() + .execution + .enable_materialized_ctes + { + return Ok(plan); + } + + // Collect CTE names that should be materialized + let cte_names: Vec = planner_context.cte_names().cloned().collect(); + let mut ctes_to_materialize: Vec<(String, LogicalPlan)> = Vec::new(); + + for cte_name in &cte_names { + // Skip recursive CTEs (they have their own execution mechanism) + if planner_context.is_recursive_cte(cte_name) { + continue; + } + + // Skip CTEs explicitly marked NOT MATERIALIZED + if planner_context.is_not_materialized_cte(cte_name) { + continue; + } + + // Count references in the plan tree + let ref_count = count_cte_references(&plan, cte_name); + // Determine if we should materialize: + // 1. Explicitly marked MATERIALIZED, OR + // 2. CTEs referenced more than once. + let should_materialize = planner_context.is_materialized_cte(cte_name) + || (ref_count > 1 && { + let cte_plan = planner_context.get_cte(cte_name); + cte_plan.is_some_and(|cte_plan| { + should_materialize_multi_reference_cte( + cte_plan, cte_name, &plan, ref_count, + ) + }) + }); + + if should_materialize + && ref_count > 0 + && let Some(cte_plan) = planner_context.get_cte(cte_name) + { + ctes_to_materialize.push((cte_name.clone(), cte_plan.clone())); + } + } + + if ctes_to_materialize.is_empty() { + return Ok(plan); + } + + // Sort CTEs by dependency order: CTEs that depend on other CTEs + // should be processed first (wrapped innermost = executed last) + ctes_to_materialize.sort_by(|(name_a, _), (name_b, _)| { + let a_deps_on_b = planner_context + .get_cte(name_a) + .is_some_and(|p| plan_references_cte(p, name_b)); + let b_deps_on_a = planner_context + .get_cte(name_b) + .is_some_and(|p| plan_references_cte(p, name_a)); + if a_deps_on_b { + std::cmp::Ordering::Less + } else if b_deps_on_a { + std::cmp::Ordering::Greater + } else { + std::cmp::Ordering::Equal + } + }); + + // Apply materialization: replace references and wrap plan + let mut result_plan = plan; + for (cte_name, cte_plan) in ctes_to_materialize { + // Replace all SubqueryAlias references to this CTE with readers + result_plan = + replace_cte_with_reader(result_plan, &cte_name, cte_plan.schema())?; + + // Wrap the plan in a producer + let producer = MaterializedCteProducer { + name: cte_name.clone(), + cte_plan: Arc::new(cte_plan), + continuation: Arc::new(result_plan.clone()), + schema: Arc::clone(result_plan.schema()), + }; + result_plan = LogicalPlan::Extension(Extension { + node: Arc::new(producer), + }); + } + + Ok(result_plan) } /// Apply pipe operators to a plan @@ -381,6 +497,344 @@ impl SqlToRel<'_, S> { } } +/// Decide whether to materialize a CTE referenced more than once. +/// +/// Multi-reference CTEs stay materialized by default, but cheap CTEs and CTEs +/// consumed below a top-level limit are left inline. Aggregate/distinct/window +/// CTEs and complex CTEs with many base table references stay materialized. +fn should_materialize_multi_reference_cte( + cte_plan: &LogicalPlan, + cte_name: &str, + continuation_plan: &LogicalPlan, + ref_count: usize, +) -> bool { + if ref_count <= 1 || is_cheap_to_inline(cte_plan) { + return false; + } + + if ends_in_aggregate_distinct_or_window(cte_plan) { + if consumers_apply_disjoint_group_key_filters( + cte_name, + continuation_plan, + ref_count, + ) { + return false; + } + return true; + } + + let base_table_references = count_base_table_references(cte_plan); + if base_table_references > 2 && base_table_references * ref_count > 10 { + return true; + } + + !contains_limit_on_single_child_path(continuation_plan) +} + +fn ends_in_aggregate_distinct_or_window(plan: &LogicalPlan) -> bool { + match plan { + LogicalPlan::Aggregate(_) => true, + LogicalPlan::Distinct(_) => true, + LogicalPlan::Window(_) => true, + _ => { + let inputs = plan.inputs(); + inputs.len() == 1 && ends_in_aggregate_distinct_or_window(inputs[0]) + } + } +} + +/// Detects Q39-style patterns where each CTE reference is filtered on a different +/// literal value of a group-by key. In this case inlining is better because the +/// optimizer can push the filter through the aggregate, specializing each copy. +fn consumers_apply_disjoint_group_key_filters( + cte_name: &str, + continuation_plan: &LogicalPlan, + ref_count: usize, +) -> bool { + let per_ref_filters = collect_per_reference_filters(continuation_plan, cte_name); + if per_ref_filters.len() != ref_count || per_ref_filters.is_empty() { + return false; + } + + // Collect all column names that appear in any reference's filters. + let all_col_names: HashSet<&str> = per_ref_filters + .iter() + .flat_map(|filters| filters.iter().map(|(col, _)| col.as_str())) + .collect(); + + // For each column, check if every reference applies an equality filter on it + // with a distinct literal value per reference. + for col_name in all_col_names { + let mut seen_values: HashSet<&str> = HashSet::new(); + let mut all_have_filter = true; + for filters in &per_ref_filters { + let mut found = false; + for (filter_col, filter_val) in filters { + if filter_col == col_name { + seen_values.insert(filter_val.as_str()); + found = true; + break; + } + } + if !found { + all_have_filter = false; + break; + } + } + if all_have_filter && seen_values.len() == ref_count { + return true; + } + } + + false +} + +/// For each CTE reference in the continuation plan, collect equality filter +/// conditions (column_name, literal_value) that are attributed to that specific +/// reference. Uses column qualifiers to match filters to the correct reference. +fn collect_per_reference_filters( + plan: &LogicalPlan, + cte_name: &str, +) -> Vec> { + // Step 1: Find all CTE reference aliases and any filters on the path. + // A CTE reference is SubqueryAlias(cte_name) wrapped by an outer alias. + // Example: SubqueryAlias("inv1") → SubqueryAlias("inv") → [CTE body] + let mut ref_aliases: Vec = Vec::new(); + collect_cte_ref_aliases(plan, cte_name, &mut ref_aliases); + + if ref_aliases.is_empty() { + return Vec::new(); + } + + // Step 2: Collect all equality filters from the plan (before the join). + // These are qualified like "inv1.d_moy = 4" + let mut all_filters: Vec<(Option, String, String)> = Vec::new(); + collect_all_equality_filters(plan, cte_name, &mut all_filters); + + // Step 3: For each reference alias, find the filters that target it. + let mut results = Vec::new(); + for alias in &ref_aliases { + let mut ref_filters = Vec::new(); + for (qualifier, col_name, value) in &all_filters { + if qualifier.as_deref() == Some(alias.as_str()) { + ref_filters.push((col_name.clone(), value.clone())); + } + } + results.push(ref_filters); + } + + results +} + +/// Find the outer aliases wrapping each CTE reference. +/// For "FROM inv inv1, inv inv2", finds ["inv1", "inv2"] +fn collect_cte_ref_aliases( + plan: &LogicalPlan, + cte_name: &str, + aliases: &mut Vec, +) { + if let LogicalPlan::SubqueryAlias(outer_alias) = plan + && outer_alias.alias.table() != cte_name + && let LogicalPlan::SubqueryAlias(inner) = outer_alias.input.as_ref() + && inner.alias.table() == cte_name + { + aliases.push(outer_alias.alias.table().to_string()); + return; + } + for input in plan.inputs() { + collect_cte_ref_aliases(input, cte_name, aliases); + } +} + +/// Collect equality conditions from Filter nodes, extracting (qualifier, column_name, value). +/// Also handles simple constant arithmetic (like 4+1). +fn collect_all_equality_filters( + plan: &LogicalPlan, + cte_name: &str, + out: &mut Vec<(Option, String, String)>, +) { + if let LogicalPlan::SubqueryAlias(alias) = plan + && alias.alias.table() == cte_name + { + return; + } + + if let LogicalPlan::Filter(filter) = plan { + extract_qualified_equality_conditions(&filter.predicate, out); + } + + if let LogicalPlan::Join(join) = plan + && let Some(filter) = &join.filter + { + extract_qualified_equality_conditions(filter, out); + } + + for input in plan.inputs() { + collect_all_equality_filters(input, cte_name, out); + } +} + +fn extract_qualified_equality_conditions( + expr: &Expr, + out: &mut Vec<(Option, String, String)>, +) { + match expr { + Expr::BinaryExpr(binary) if binary.op == Operator::Eq => { + match (binary.left.as_ref(), binary.right.as_ref()) { + (Expr::Column(col), rhs) => { + if let Some(val) = try_eval_constant(rhs) { + out.push(( + col.relation.as_ref().map(|r| r.table().to_string()), + col.name().to_string(), + val, + )); + } + } + (lhs, Expr::Column(col)) => { + if let Some(val) = try_eval_constant(lhs) { + out.push(( + col.relation.as_ref().map(|r| r.table().to_string()), + col.name().to_string(), + val, + )); + } + } + _ => {} + } + } + Expr::BinaryExpr(binary) if binary.op == Operator::And => { + extract_qualified_equality_conditions(&binary.left, out); + extract_qualified_equality_conditions(&binary.right, out); + } + _ => {} + } +} + +/// Try to evaluate an expression as a constant value (literal or simple arithmetic). +fn try_eval_constant(expr: &Expr) -> Option { + match expr { + Expr::Literal(val, _) => Some(val.to_string()), + Expr::BinaryExpr(binary) => { + let left = try_eval_constant_i64(&binary.left)?; + let right = try_eval_constant_i64(&binary.right)?; + let result = match binary.op { + Operator::Plus => left.checked_add(right)?, + Operator::Minus => left.checked_sub(right)?, + Operator::Multiply => left.checked_mul(right)?, + _ => return None, + }; + Some(result.to_string()) + } + _ => None, + } +} + +fn try_eval_constant_i64(expr: &Expr) -> Option { + match expr { + Expr::Literal(val, _) => { + use datafusion_common::ScalarValue; + match val { + ScalarValue::Int8(Some(v)) => Some(*v as i64), + ScalarValue::Int16(Some(v)) => Some(*v as i64), + ScalarValue::Int32(Some(v)) => Some(*v as i64), + ScalarValue::Int64(Some(v)) => Some(*v), + ScalarValue::UInt8(Some(v)) => Some(*v as i64), + ScalarValue::UInt16(Some(v)) => Some(*v as i64), + ScalarValue::UInt32(Some(v)) => Some(*v as i64), + _ => None, + } + } + _ => None, + } +} + +fn is_cheap_to_inline(plan: &LogicalPlan) -> bool { + match plan { + LogicalPlan::EmptyRelation(_) => true, + _ => { + let inputs = plan.inputs(); + inputs.len() == 1 && is_cheap_to_inline(inputs[0]) + } + } +} + +fn count_base_table_references(plan: &LogicalPlan) -> usize { + let mut count = 0; + plan.apply(|node| { + if let LogicalPlan::TableScan(_) = node { + count += 1; + } + Ok(TreeNodeRecursion::Continue) + }) + .unwrap(); + count +} + +fn contains_limit_on_single_child_path(plan: &LogicalPlan) -> bool { + if matches!(plan, LogicalPlan::Limit(_)) { + return true; + } + + let inputs = plan.inputs(); + inputs.len() == 1 && contains_limit_on_single_child_path(inputs[0]) +} + +/// Check if a plan contains a SubqueryAlias reference to a given CTE name. +fn plan_references_cte(plan: &LogicalPlan, cte_name: &str) -> bool { + let mut found = false; + plan.apply(|node| { + if let LogicalPlan::SubqueryAlias(alias) = node + && alias.alias.table() == cte_name + { + found = true; + return Ok(TreeNodeRecursion::Jump); + } + Ok(TreeNodeRecursion::Continue) + }) + .unwrap(); + found +} + +/// Count how many times a CTE (by SubqueryAlias name) is referenced in the plan tree. +fn count_cte_references(plan: &LogicalPlan, cte_name: &str) -> usize { + let mut count = 0; + plan.apply(|node| { + if let LogicalPlan::SubqueryAlias(alias) = node + && alias.alias.table() == cte_name + { + count += 1; + return Ok(TreeNodeRecursion::Jump); + } + Ok(TreeNodeRecursion::Continue) + }) + .unwrap(); + count +} + +/// Replace SubqueryAlias nodes matching a CTE name with a MaterializedCteReader. +fn replace_cte_with_reader( + plan: LogicalPlan, + cte_name: &str, + cte_schema: &DFSchemaRef, +) -> Result { + plan.transform_down(|node| { + if let LogicalPlan::SubqueryAlias(ref alias) = node + && alias.alias.table() == cte_name + { + let reader = MaterializedCteReader { + name: cte_name.to_string(), + schema: Arc::clone(cte_schema), + }; + let extension = LogicalPlan::Extension(Extension { + node: Arc::new(reader), + }); + return Ok(datafusion_common::tree_node::Transformed::yes(extension)); + } + Ok(datafusion_common::tree_node::Transformed::no(node)) + }) + .map(|t| t.data) +} + /// Returns the order by expressions from the query. fn to_order_by_exprs(order_by: Option) -> Result> { to_order_by_exprs_with_select(order_by, None) diff --git a/datafusion/sql/src/relation/mod.rs b/datafusion/sql/src/relation/mod.rs index 08a292475fd72..8718437fa978b 100644 --- a/datafusion/sql/src/relation/mod.rs +++ b/datafusion/sql/src/relation/mod.rs @@ -187,13 +187,17 @@ impl SqlToRel<'_, S> { // Normalize name and alias let table_ref = self.object_name_to_table_reference(name)?; let table_name = table_ref.to_string(); - let cte = planner_context.get_cte(&table_name); + let cte_plan_cloned = planner_context.get_cte(&table_name).cloned(); + let is_cte = cte_plan_cloned.is_some(); + if is_cte { + planner_context.increment_cte_ref_count(&table_name); + } ( match ( - cte, + cte_plan_cloned, self.context_provider.get_table_source(table_ref.clone()), ) { - (Some(cte_plan), _) => Ok(cte_plan.clone()), + (Some(cte_plan), _) => Ok(cte_plan), (_, Ok(provider)) => LogicalPlanBuilder::scan( table_ref.clone(), provider, diff --git a/datafusion/sqllogictest/test_files/cte.slt b/datafusion/sqllogictest/test_files/cte.slt index d13e0d4f085e9..1dc0aa57e2dd8 100644 --- a/datafusion/sqllogictest/test_files/cte.slt +++ b/datafusion/sqllogictest/test_files/cte.slt @@ -1319,3 +1319,39 @@ RESET datafusion.execution.enable_recursive_ctes; statement ok RESET datafusion.sql_parser.enable_ident_normalization; + +# Materialized CTEs collect all input partitions before readers consume them. +query I +WITH t AS ( + SELECT 1 AS a + UNION ALL SELECT 2 AS a + UNION ALL SELECT 3 AS a + UNION ALL SELECT 4 AS a +) +SELECT sum(l.a + r.a) +FROM t l +JOIN t r ON l.a = r.a; +---- +20 + +# Materialized CTE readers can feed repartitioning join plans without +# re-entering a shared repartition output partition. +statement ok +set datafusion.optimizer.prefer_hash_join = false; + +query II rowsort +WITH t1 AS ( + SELECT 11 AS a, 12 AS b + UNION ALL + SELECT 11 AS a, 13 AS b +) +SELECT t2.* +FROM t1 +RIGHT SEMI JOIN t1 t2 +ON t1.a = t2.a AND t1.b = t2.b; +---- +11 12 +11 13 + +statement ok +RESET datafusion.optimizer.prefer_hash_join; diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index b0c7e3f8fe643..e879daa781532 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -218,6 +218,7 @@ datafusion.execution.batch_size 8192 datafusion.execution.coalesce_batches true datafusion.execution.collect_statistics true datafusion.execution.enable_ansi_mode false +datafusion.execution.enable_materialized_ctes true datafusion.execution.enable_recursive_ctes true datafusion.execution.enforce_batch_size_in_joins false datafusion.execution.hash_join_buffering_capacity 0 @@ -368,6 +369,7 @@ datafusion.execution.batch_size 8192 Default batch size while creating new batch datafusion.execution.coalesce_batches true When set to true, record batches will be examined between each operator and small batches will be coalesced into larger batches. This is helpful when there are highly selective filters or joins that could produce tiny output batches. The target batch size is determined by the configuration setting datafusion.execution.collect_statistics true Should DataFusion collect statistics when first creating a table. Has no effect after the table is created. Applies to the default `ListingTableProvider` in DataFusion. Defaults to true. datafusion.execution.enable_ansi_mode false Whether to enable ANSI SQL mode. The flag is experimental and relevant only for DataFusion Spark built-in functions When `enable_ansi_mode` is set to `true`, the query engine follows ANSI SQL semantics for expressions, casting, and error handling. This means: - **Strict type coercion rules:** implicit casts between incompatible types are disallowed. - **Standard SQL arithmetic behavior:** operations such as division by zero, numeric overflow, or invalid casts raise runtime errors rather than returning `NULL` or adjusted values. - **Consistent ANSI behavior** for string concatenation, comparisons, and `NULL` handling. When `enable_ansi_mode` is `false` (the default), the engine uses a more permissive, non-ANSI mode designed for user convenience and backward compatibility. In this mode: - Implicit casts between types are allowed (e.g., string to integer when possible). - Arithmetic operations are more lenient — for example, `abs()` on the minimum representable integer value returns the input value instead of raising overflow. - Division by zero or invalid casts may return `NULL` instead of failing. # Default `false` — ANSI SQL mode is disabled by default. +datafusion.execution.enable_materialized_ctes true Should DataFusion materialize CTEs that are referenced multiple times. When enabled, CTEs referenced more than once are generally computed once and cached, except for cheap CTEs and CTEs consumed below a top-level limit. datafusion.execution.enable_recursive_ctes true Should DataFusion support recursive CTEs datafusion.execution.enforce_batch_size_in_joins false Should DataFusion enforce batch size in joins or not. By default, DataFusion will not enforce batch size in joins. Enforcing batch size in joins can reduce memory usage when joining large tables with a highly-selective join filter, but is also slightly slower. datafusion.execution.hash_join_buffering_capacity 0 How many bytes to buffer in the probe side of hash joins while the build side is concurrently being built. Without this, hash joins will wait until the full materialization of the build side before polling the probe side. This is useful in scenarios where the query is not completely CPU bounded, allowing to do some early work concurrently and reducing the latency of the query. Note that when hash join buffering is enabled, the probe side will start eagerly polling data, not giving time for the producer side of dynamic filters to produce any meaningful predicate. Queries with dynamic filters might see performance degradation. Disabled by default, set to a number greater than 0 for enabling it. diff --git a/datafusion/sqllogictest/test_files/limit.slt b/datafusion/sqllogictest/test_files/limit.slt index fc62584dc3df1..b086f17b3a878 100644 --- a/datafusion/sqllogictest/test_files/limit.slt +++ b/datafusion/sqllogictest/test_files/limit.slt @@ -423,9 +423,9 @@ logical_plan 02)--TableScan: t1000 projection=[i] physical_plan 01)AggregateExec: mode=FinalPartitioned, gby=[i@0 as i], aggr=[] -02)--RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=1 +02)--RepartitionExec: partitioning=Hash([i@0], 4), input_partitions=4 03)----AggregateExec: mode=Partial, gby=[i@0 as i], aggr=[] -04)------DataSourceExec: partitions=1 +04)------DataSourceExec: partitions=4 statement ok set datafusion.explain.show_sizes = true; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 576137bda29d1..faf84c5fcc75d 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -128,6 +128,7 @@ The following configuration settings are available: | datafusion.execution.listing_table_ignore_subdirectory | true | Should sub directories be ignored when scanning directories for data files. Defaults to true (ignores subdirectories), consistent with Hive. Note that this setting does not affect reading partitioned tables (e.g. `/table/year=2021/month=01/data.parquet`). | | datafusion.execution.listing_table_factory_infer_partitions | true | Should a `ListingTable` created through the `ListingTableFactory` infer table partitions from Hive compliant directories. Defaults to true (partition columns are inferred and will be represented in the table schema). | | datafusion.execution.enable_recursive_ctes | true | Should DataFusion support recursive CTEs | +| datafusion.execution.enable_materialized_ctes | true | Should DataFusion materialize CTEs that are referenced multiple times. When enabled, CTEs referenced more than once are generally computed once and cached, except for cheap CTEs and CTEs consumed below a top-level limit. | | datafusion.execution.split_file_groups_by_statistics | false | Attempt to eliminate sorts by packing & sorting files with non-overlapping statistics into the same file groups. Currently experimental | | datafusion.execution.keep_partition_by_columns | false | Should DataFusion keep the columns used for partition_by in the output RecordBatches | | datafusion.execution.skip_partial_aggregation_probe_ratio_threshold | 0.8 | Aggregation ratio (number of distinct groups / number of input rows) threshold for skipping partial aggregation. If the value is greater then partial aggregation will skip aggregation for further input |