diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index b55d3c32cb56..50eac566d90e 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -115,6 +115,30 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { } } + /// Returns the plan that provides this plan's public + /// [`ExecutionPlan`] downcast identity. + /// + /// This hook is for wrapper nodes that delegate their public downcast + /// identity to another plan while adding cross-cutting behavior such as + /// instrumentation. The default implementation returns `None`, meaning this + /// plan's concrete type is used for type introspection. + /// + /// Most `ExecutionPlan` implementations should use the default `None`; + /// override this only for wrapper plans that intentionally delegate their + /// public downcast identity to another plan. + /// + /// The `is` and `downcast_ref` helpers follow the returned delegate instead + /// of checking the current concrete type, making intermediate delegating + /// wrappers invisible to normal downcast-based inspection. + /// + /// Implementations that opt in should return the delegate plan, not `self`. + /// + /// This is independent from [`Self::children`] and should not be used for + /// plan traversal or optimizer rewrites. + fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> { + None + } + /// Get the schema for this execution plan fn schema(&self) -> SchemaRef { Arc::clone(self.properties().schema()) @@ -718,20 +742,32 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { impl dyn ExecutionPlan { /// Returns `true` if the plan is of type `T`. /// + /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates + /// to it. + /// /// Prefer this over `downcast_ref::().is_some()`. Works correctly when /// called on `Arc` via auto-deref. pub fn is(&self) -> bool { - (self as &dyn Any).is::() + match self.downcast_delegate() { + Some(delegate) => delegate.is::(), + None => (self as &dyn Any).is::(), + } } /// Attempts to downcast this plan to a concrete type `T`, returning `None` /// if the plan is not of that type. /// + /// If this plan provides a [`ExecutionPlan::downcast_delegate`], delegates + /// to it. + /// /// Works correctly when called on `Arc` via auto-deref, /// unlike `(&arc as &dyn Any).downcast_ref::()` which would attempt to /// downcast the `Arc` itself. pub fn downcast_ref(&self) -> Option<&T> { - (self as &dyn Any).downcast_ref() + match self.downcast_delegate() { + Some(delegate) => delegate.downcast_ref::(), + None => (self as &dyn Any).downcast_ref(), + } } } @@ -1642,6 +1678,58 @@ mod tests { } } + #[derive(Debug)] + struct DowncastDelegatingExec(Arc); + + impl DisplayAs for DowncastDelegatingExec { + fn fmt_as( + &self, + _t: DisplayFormatType, + _f: &mut std::fmt::Formatter, + ) -> std::fmt::Result { + unimplemented!() + } + } + + impl ExecutionPlan for DowncastDelegatingExec { + fn name(&self) -> &'static str { + Self::static_name() + } + + fn properties(&self) -> &Arc { + unimplemented!() + } + + fn children(&self) -> Vec<&Arc> { + vec![] + } + + fn with_new_children( + self: Arc, + _: Vec>, + ) -> Result> { + unimplemented!() + } + + fn downcast_delegate(&self) -> Option<&dyn ExecutionPlan> { + Some(self.0.as_ref()) + } + + fn execute( + &self, + _partition: usize, + _context: Arc, + ) -> Result { + unimplemented!() + } + + fn partition_statistics( + &self, + _partition: Option, + ) -> Result> { + unimplemented!() + } + } #[test] fn test_execution_plan_name() { let schema1 = Arc::new(Schema::empty()); @@ -1654,6 +1742,24 @@ mod tests { assert_eq!(RenamedEmptyExec::static_name(), "MyRenamedEmptyExec"); } + #[test] + fn test_execution_plan_downcast_delegates_to_downcast_delegate() { + let schema = Arc::new(Schema::empty()); + let inner: Arc = Arc::new(EmptyExec::new(schema)); + let wrapped: Arc = Arc::new(DowncastDelegatingExec(inner)); + let nested: Arc = + Arc::new(DowncastDelegatingExec(Arc::clone(&wrapped))); + + for plan in [wrapped.as_ref(), nested.as_ref()] { + assert!(!plan.is::()); + assert!(plan.downcast_ref::().is_none()); + assert!(plan.is::()); + assert!(plan.downcast_ref::().is_some()); + assert!(!plan.is::()); + assert!(plan.downcast_ref::().is_none()); + } + } + /// A compilation test to ensure that the `ExecutionPlan::name()` method can /// be called from a trait object. /// Related ticket: https://github.com/apache/datafusion/pull/11047