Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 108 additions & 2 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
}
}

/// Returns the plan that provides this plan's public
/// [`ExecutionPlan`] downcast identity.
///
/// This hook is for wrapper nodes that delegate their public downcast
/// identity to another plan while adding cross-cutting behavior such as
/// instrumentation. The default implementation returns `None`, meaning this
/// plan's concrete type is used for type introspection.
///
/// Most `ExecutionPlan` implementations should use the default `None`;
/// override this only for wrapper plans that intentionally delegate their
/// public downcast identity to another plan.
///
/// The `is` and `downcast_ref` helpers follow the returned delegate instead
/// of checking the current concrete type, making intermediate delegating
/// wrappers invisible to normal downcast-based inspection.
///
/// Implementations that opt in should return the delegate plan, not `self`.
///
/// This is independent from [`Self::children`] and should not be used for
/// plan traversal or optimizer rewrites.
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
None
}

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
Arc::clone(self.properties().schema())
Expand Down Expand Up @@ -718,20 +742,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
impl dyn ExecutionPlan {
/// Returns `true` if the plan is of type `T`.
///
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
/// to it.
///
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
/// called on `Arc<dyn ExecutionPlan>` via auto-deref.
pub fn is<T: ExecutionPlan>(&self) -> bool {
(self as &dyn Any).is::<T>()
match self.downcast_delegate() {
Some(delegate) => delegate.is::<T>(),
None => (self as &dyn Any).is::<T>(),
}
}

/// Attempts to downcast this plan to a concrete type `T`, returning `None`
/// if the plan is not of that type.
///
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
/// to it.
///
/// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
/// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
/// downcast the `Arc` itself.
pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
(self as &dyn Any).downcast_ref()
match self.downcast_delegate() {
Some(delegate) => delegate.downcast_ref::<T>(),
None => (self as &dyn Any).downcast_ref(),
}
}
}

Expand Down Expand Up @@ -1642,6 +1678,58 @@ mod tests {
}
}

#[derive(Debug)]
struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);

impl DisplayAs for DowncastDelegatingExec {
fn fmt_as(
&self,
_t: DisplayFormatType,
_f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
unimplemented!()
}
}

impl ExecutionPlan for DowncastDelegatingExec {
fn name(&self) -> &'static str {
Self::static_name()
}

fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}

fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
Some(self.0.as_ref())
}

fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}

fn partition_statistics(
&self,
_partition: Option<usize>,
) -> Result<Arc<Statistics>> {
unimplemented!()
}
}
#[test]
fn test_execution_plan_name() {
let schema1 = Arc::new(Schema::empty());
Expand All @@ -1654,6 +1742,24 @@ mod tests {
assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
}

#[test]
fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
let schema = Arc::new(Schema::empty());
let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
let nested: Arc<dyn ExecutionPlan> =
Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));

for plan in [wrapped.as_ref(), nested.as_ref()] {
assert!(!plan.is::<DowncastDelegatingExec>());
assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
assert!(plan.is::<EmptyExec>());
assert!(plan.downcast_ref::<EmptyExec>().is_some());
assert!(!plan.is::<RenamedEmptyExec>());
assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
}
}

/// A compilation test to ensure that the `ExecutionPlan::name()` method can
/// be called from a trait object.
/// Related ticket: https://github.com/apache/datafusion/pull/11047
Expand Down
Loading