Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions benchmarks/src/tpcds/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ impl RunOpt {
self.enable_piecewise_merge_join;
config.options_mut().execution.hash_join_buffering_capacity =
self.hash_join_buffering_capacity;
if std::env::var("DISABLE_MATERIALIZED_CTES").is_ok() {
config.options_mut().execution.enable_materialized_ctes = false;
}
let rt = self.common.build_runtime()?;
let ctx = SessionContext::new_with_config_rt(config, rt);
// register tables
Expand Down
6 changes: 6 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -631,6 +631,12 @@ config_namespace! {
/// Should DataFusion support recursive CTEs
pub enable_recursive_ctes: bool, default = true

/// Should DataFusion materialize CTEs that are referenced multiple times.
/// When enabled, CTEs referenced more than once are generally computed
/// once and cached, except for cheap CTEs and CTEs consumed below a top-level
/// limit.
pub enable_materialized_ctes: bool, default = true

/// Attempt to eliminate sorts by packing & sorting files with non-overlapping
/// statistics into the same file groups.
/// Currently experimental
Expand Down
4 changes: 3 additions & 1 deletion datafusion/core/src/execution/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2313,7 +2313,9 @@ impl QueryPlanner for DefaultQueryPlanner {
logical_plan: &LogicalPlan,
session_state: &SessionState,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
let planner = DefaultPhysicalPlanner::default();
let planner = DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
crate::materialized_cte_planner::MaterializedCtePlanner::new(),
)]);
planner
.create_physical_plan(logical_plan, session_state)
.await
Expand Down
1 change: 1 addition & 0 deletions datafusion/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,7 @@ pub mod dataframe;
pub mod datasource;
pub mod error;
pub mod execution;
pub mod materialized_cte_planner;
pub mod physical_planner;
pub mod prelude;
pub mod scalar;
Expand Down
154 changes: 154 additions & 0 deletions datafusion/core/src/materialized_cte_planner.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Extension planner for materialized CTEs.
//!
//! This module provides [`MaterializedCtePlanner`] which connects the logical
//! plan nodes ([`MaterializedCteProducer`] and [`MaterializedCteReader`]) to
//! their physical execution counterparts.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use async_trait::async_trait;
use datafusion_common::Result;
use datafusion_expr::logical_plan::{MaterializedCteProducer, MaterializedCteReader};
use datafusion_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion_physical_plan::materialized_cte::{
MaterializedCteCache, MaterializedCteExec, MaterializedCteReaderExec,
materialized_cte_statistics, replace_materialized_cte_readers,
};
use datafusion_physical_plan::{ExecutionPlan, ExecutionPlanProperties};

use crate::execution::context::SessionState;
use crate::physical_planner::{ExtensionPlanner, PhysicalPlanner};

/// An extension planner that handles materialized CTE logical nodes.
///
/// It maintains a map of CTE name to shared cache, ensuring that
/// producers and readers for the same CTE share the same cache instance.
#[derive(Debug)]
pub struct MaterializedCtePlanner {
/// Map of CTE name to shared cache
caches: Mutex<HashMap<String, Arc<MaterializedCteCache>>>,
/// Map of CTE name to the number of partitions readers should expose
partition_counts: Mutex<HashMap<String, usize>>,
}

impl MaterializedCtePlanner {
/// Create a new `MaterializedCtePlanner`.
pub fn new() -> Self {
Self {
caches: Mutex::new(HashMap::new()),
partition_counts: Mutex::new(HashMap::new()),
}
}

/// Get or create a cache for the given CTE name.
fn get_or_create_cache(&self, name: &str) -> Arc<MaterializedCteCache> {
let mut caches = self.caches.lock().unwrap();
Arc::clone(
caches
.entry(name.to_string())
.or_insert_with(|| Arc::new(MaterializedCteCache::new(name.to_string()))),
)
}

fn create_cache(&self, name: &str) -> Arc<MaterializedCteCache> {
let cache = Arc::new(MaterializedCteCache::new(name.to_string()));
self.caches
.lock()
.unwrap()
.insert(name.to_string(), Arc::clone(&cache));
cache
}

fn set_partition_count(&self, name: &str, partition_count: usize) {
self.partition_counts
.lock()
.unwrap()
.insert(name.to_string(), partition_count);
}

fn partition_count(&self, name: &str) -> usize {
self.partition_counts
.lock()
.unwrap()
.get(name)
.copied()
.unwrap_or(1)
}
}

impl Default for MaterializedCtePlanner {
fn default() -> Self {
Self::new()
}
}

#[async_trait]
impl ExtensionPlanner for MaterializedCtePlanner {
async fn plan_extension(
&self,
_planner: &dyn PhysicalPlanner,
node: &dyn UserDefinedLogicalNode,
_logical_inputs: &[&LogicalPlan],
physical_inputs: &[Arc<dyn ExecutionPlan>],
_session_state: &SessionState,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
// Handle MaterializedCteProducer
if let Some(producer) = node.as_any().downcast_ref::<MaterializedCteProducer>() {
let cache = self.create_cache(&producer.name);
let cte_plan = Arc::clone(&physical_inputs[0]);
let partition_count = cte_plan.output_partitioning().partition_count();
let statistics = materialized_cte_statistics(cte_plan.as_ref())?;
self.set_partition_count(&producer.name, partition_count);
let continuation = replace_materialized_cte_readers(
Arc::clone(&physical_inputs[1]),
&producer.name,
&cache,
partition_count,
&statistics,
)?;
let exec = MaterializedCteExec::new(
producer.name.clone(),
cte_plan,
continuation,
cache,
);
return Ok(Some(Arc::new(exec)));
}

// Handle MaterializedCteReader
if let Some(reader) = node.as_any().downcast_ref::<MaterializedCteReader>() {
let cache = self.get_or_create_cache(&reader.name);
let schema = Arc::clone(reader.schema.inner());
let statistics =
Arc::new(datafusion_physical_plan::Statistics::new_unknown(&schema));
let exec = MaterializedCteReaderExec::new(
reader.name.clone(),
schema,
cache,
self.partition_count(&reader.name),
statistics,
);
return Ok(Some(Arc::new(exec)));
}

Ok(None)
}
}
Loading
Loading