Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 104 additions & 2 deletions datafusion/physical-plan/src/execution_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,26 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
}
}

/// Returns the plan that provides this plan's public
/// [`ExecutionPlan`] downcast identity.
///
/// This hook is for wrapper nodes that delegate their public downcast
/// identity to another plan while adding cross-cutting behavior such as
/// instrumentation. The default implementation returns `None`, meaning this
/// plan's concrete type is used for type introspection.
///
/// The `is` and `downcast_ref` helpers follow the returned delegate instead
/// of checking the current concrete type, making intermediate delegating
/// wrappers invisible to normal downcast-based inspection.
///
/// Implementations that opt in should return the delegate plan, not `self`.
///
/// This is independent from [`Self::children`] and should not be used for
/// plan traversal or optimizer rewrites.
fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
None
}

/// Get the schema for this execution plan
fn schema(&self) -> SchemaRef {
Arc::clone(self.properties().schema())
Expand Down Expand Up @@ -718,20 +738,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
impl dyn ExecutionPlan {
/// Returns `true` if the plan is of type `T`.
///
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
/// to it.
///
/// Prefer this over `downcast_ref::<T>().is_some()`. Works correctly when
/// called on `Arc<dyn ExecutionPlan>` via auto-deref.
pub fn is<T: ExecutionPlan>(&self) -> bool {
(self as &dyn Any).is::<T>()
match self.downcast_delegate() {
Some(delegate) => delegate.is::<T>(),
None => (self as &dyn Any).is::<T>(),
}
Comment on lines +747 to +750
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this prevent you from downcasting to the wrapper type?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, and this is intentional to mimic the old as_any behavior. I've updated the PR description to be clear about this choice.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like the earlier PR would still work for all use cases, right? Because you're always choosing to downcast to a specific type, it seems like it would enable downcasting to either (or many, depending on levels) type.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The previous PR would stop on the first match, rather than go all the way to the leaf child. I agree this is being pedantic, as "real world" cases would probably never have a case where you'd need to match the leaf child but get interrupted by an intermediate node in a "wrapper chain". However, I still find the current implementation clearer and easier to reason about: the leaf node is what is returned.

}

/// Attempts to downcast this plan to a concrete type `T`, returning `None`
/// if the plan is not of that type.
///
/// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates
/// to it.
///
/// Works correctly when called on `Arc<dyn ExecutionPlan>` via auto-deref,
/// unlike `(&arc as &dyn Any).downcast_ref::<T>()` which would attempt to
/// downcast the `Arc` itself.
pub fn downcast_ref<T: ExecutionPlan>(&self) -> Option<&T> {
(self as &dyn Any).downcast_ref()
match self.downcast_delegate() {
Some(delegate) => delegate.downcast_ref::<T>(),
None => (self as &dyn Any).downcast_ref(),
}
}
}

Expand Down Expand Up @@ -1642,6 +1674,58 @@ mod tests {
}
}

#[derive(Debug)]
struct DowncastDelegatingExec(Arc<dyn ExecutionPlan>);

impl DisplayAs for DowncastDelegatingExec {
fn fmt_as(
&self,
_t: DisplayFormatType,
_f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
unimplemented!()
}
}

impl ExecutionPlan for DowncastDelegatingExec {
fn name(&self) -> &'static str {
Self::static_name()
}

fn properties(&self) -> &Arc<PlanProperties> {
unimplemented!()
}

fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
vec![]
}

fn with_new_children(
self: Arc<Self>,
_: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
unimplemented!()
}

fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> {
Some(self.0.as_ref())
}

fn execute(
&self,
_partition: usize,
_context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
unimplemented!()
}

fn partition_statistics(
&self,
_partition: Option<usize>,
) -> Result<Arc<Statistics>> {
unimplemented!()
}
}
#[test]
fn test_execution_plan_name() {
let schema1 = Arc::new(Schema::empty());
Expand All @@ -1654,6 +1738,24 @@ mod tests {
assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec");
}

#[test]
fn test_execution_plan_downcast_delegates_to_downcast_delegate() {
let schema = Arc::new(Schema::empty());
let inner: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));
let wrapped: Arc<dyn ExecutionPlan> = Arc::new(DowncastDelegatingExec(inner));
let nested: Arc<dyn ExecutionPlan> =
Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped)));

for plan in [wrapped.as_ref(), nested.as_ref()] {
assert!(!plan.is::<DowncastDelegatingExec>());
assert!(plan.downcast_ref::<DowncastDelegatingExec>().is_none());
assert!(plan.is::<EmptyExec>());
assert!(plan.downcast_ref::<EmptyExec>().is_some());
assert!(!plan.is::<RenamedEmptyExec>());
assert!(plan.downcast_ref::<RenamedEmptyExec>().is_none());
}
}

/// A compilation test to ensure that the `ExecutionPlan::name()` method can
/// be called from a trait object.
/// Related ticket: https://github.com/apache/datafusion/pull/11047
Expand Down
Loading