From e0c6be38c8cafd98468defe1188bf2bc4672a4d2 Mon Sep 17 00:00:00 2001
From: Jayant Shrivastava
Date: Tue, 17 Feb 2026 21:48:32 +0000
Subject: [PATCH 1/5] proto: serialize and dedupe dynamic filters

Informs: https://github.com/datafusion-contrib/datafusion-distributed/issues/180
Closes: https://github.com/apache/datafusion/issues/20418

Consider this scenario:

1. You have a plan with a `HashJoinExec` and `DataSourceExec`
2. You run the physical optimizer and the `DataSourceExec` accepts `DynamicFilterPhysicalExpr` pushdown from the `HashJoinExec`
3. You serialize the plan, deserialize it, and execute it

What should happen is that the dynamic filter should "work", meaning:

1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr`
2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec`, and the `DataSourceExec` should filter out rows

This does not happen today for a few reasons, two of which this PR aims to address:

1. `DynamicFilterPhysicalExpr` does not survive round-tripping. The internal exprs get inlined (e.g. it may be serialized as a `Literal`)
2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, it is often rewritten during pushdown. In this case, you have two `DynamicFilterPhysicalExpr`s which are different `Arc`s but share the same `Inner` dynamic filter state. The current `DeduplicatingProtoConverter` does not handle this specific form of deduping.

This PR fixes those problems by adding serde for `DynamicFilterPhysicalExpr` and deduping logic for the inner state of dynamic filters. It does not yet add a test for the `HashJoinExec` and `DataSourceExec` filter pushdown case, but that is relevant follow-up work; I tried to keep the PR small for reviewers.

These changes are tested via unit tests. `DynamicFilterPhysicalExpr` is now serialized by the default codec.
---
 .../src/expressions/dynamic_filters.rs        |   8 +
 datafusion/proto/proto/datafusion.proto       |  13 ++
 datafusion/proto/src/generated/pbjson.rs      | 145 ++++++++++++
 datafusion/proto/src/generated/prost.rs       |  19 +-
 .../proto/src/physical_plan/from_proto.rs     |  22 ++
 datafusion/proto/src/physical_plan/mod.rs     | 101 +++++++--
 .../proto/src/physical_plan/to_proto.rs       |  46 +++-
 .../tests/cases/roundtrip_physical_plan.rs    | 209 ++++++++++++++++++
 8 files changed, 545 insertions(+), 18 deletions(-)

diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
index d285f8b377eca..3fc865a828ef8 100644
--- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs
+++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs
@@ -327,6 +327,14 @@ impl DynamicFilterPhysicalExpr {
         Arc::strong_count(self) > 1 || Arc::strong_count(&self.inner) > 1
     }
 
+    /// Returns a unique identifier for the inner shared state.
+    ///
+    /// Useful for checking if two [Arc] with the same
+    /// underlying [DynamicFilterPhysicalExpr] are the same.
+    pub fn inner_id(&self) -> u64 {
+        Arc::as_ptr(&self.inner) as *const () as u64
+    }
+
     fn render(
         &self,
         f: &mut std::fmt::Formatter<'_>,
diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto
index 7c0268867691e..9e8f6f8658bc5 100644
--- a/datafusion/proto/proto/datafusion.proto
+++ b/datafusion/proto/proto/datafusion.proto
@@ -860,6 +860,12 @@ message PhysicalExprNode {
   // across serde roundtrips.
optional uint64 expr_id = 30; + // For DynamicFilterPhysicalExpr, this identifies the shared inner state. + // Multiple expressions may have different expr_id values (different outer Arc wrappers) + // but the same dynamic_filter_inner_id (shared inner state). + // Used to reconstruct shared inner state during deserialization. + optional uint64 dynamic_filter_inner_id = 31; + oneof ExprType { // column references PhysicalColumn column = 1; @@ -897,9 +903,16 @@ message PhysicalExprNode { UnknownColumn unknown_column = 20; PhysicalHashExprNode hash_expr = 21; + + PhysicalDynamicFilterNode dynamic_filter = 22; } } +message PhysicalDynamicFilterNode { + repeated PhysicalExprNode children = 1; + PhysicalExprNode initial_expr = 2; +} + message PhysicalScalarUdfNode { string name = 1; repeated PhysicalExprNode args = 2; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index 5b2b9133ce13a..c2019693b8084 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16225,6 +16225,115 @@ impl<'de> serde::Deserialize<'de> for PhysicalDateTimeIntervalExprNode { deserializer.deserialize_struct("datafusion.PhysicalDateTimeIntervalExprNode", FIELDS, GeneratedVisitor) } } +impl serde::Serialize for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn serialize(&self, serializer: S) -> std::result::Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut len = 0; + if !self.children.is_empty() { + len += 1; + } + if self.initial_expr.is_some() { + len += 1; + } + let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; + if !self.children.is_empty() { + struct_ser.serialize_field("children", &self.children)?; + } + if let Some(v) = self.initial_expr.as_ref() { + struct_ser.serialize_field("initialExpr", v)?; + } + struct_ser.end() + } +} +impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { + #[allow(deprecated)] + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + const FIELDS: &[&str] = &[ + "children", + "initial_expr", + "initialExpr", + ]; + + #[allow(clippy::enum_variant_names)] + enum GeneratedField { + Children, + InitialExpr, + } + impl<'de> serde::Deserialize<'de> for GeneratedField { + fn deserialize(deserializer: D) -> std::result::Result + where + D: serde::Deserializer<'de>, + { + struct GeneratedVisitor; + + impl serde::de::Visitor<'_> for GeneratedVisitor { + type Value = GeneratedField; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(formatter, "expected one of: {:?}", &FIELDS) + } + + #[allow(unused_variables)] + fn visit_str(self, value: &str) -> std::result::Result + where + E: serde::de::Error, + { + match value { + "children" => Ok(GeneratedField::Children), + "initialExpr" | "initial_expr" => Ok(GeneratedField::InitialExpr), + _ => Err(serde::de::Error::unknown_field(value, FIELDS)), + } + } + } + deserializer.deserialize_identifier(GeneratedVisitor) + } + } + struct GeneratedVisitor; + impl<'de> serde::de::Visitor<'de> for GeneratedVisitor { + type Value = PhysicalDynamicFilterNode; + + fn expecting(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + formatter.write_str("struct datafusion.PhysicalDynamicFilterNode") + } + + fn visit_map(self, mut map_: V) -> std::result::Result + where + V: serde::de::MapAccess<'de>, + { + let mut children__ = None; + let mut initial_expr__ = None; + while let 
Some(k) = map_.next_key()? { + match k { + GeneratedField::Children => { + if children__.is_some() { + return Err(serde::de::Error::duplicate_field("children")); + } + children__ = Some(map_.next_value()?); + } + GeneratedField::InitialExpr => { + if initial_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("initialExpr")); + } + initial_expr__ = map_.next_value()?; + } + } + } + Ok(PhysicalDynamicFilterNode { + children: children__.unwrap_or_default(), + initial_expr: initial_expr__, + }) + } + } + deserializer.deserialize_struct("datafusion.PhysicalDynamicFilterNode", FIELDS, GeneratedVisitor) + } +} impl serde::Serialize for PhysicalExprNode { #[allow(deprecated)] fn serialize(&self, serializer: S) -> std::result::Result @@ -16236,6 +16345,9 @@ impl serde::Serialize for PhysicalExprNode { if self.expr_id.is_some() { len += 1; } + if self.dynamic_filter_inner_id.is_some() { + len += 1; + } if self.expr_type.is_some() { len += 1; } @@ -16245,6 +16357,11 @@ impl serde::Serialize for PhysicalExprNode { #[allow(clippy::needless_borrows_for_generic_args)] struct_ser.serialize_field("exprId", ToString::to_string(&v).as_str())?; } + if let Some(v) = self.dynamic_filter_inner_id.as_ref() { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("dynamicFilterInnerId", ToString::to_string(&v).as_str())?; + } if let Some(v) = self.expr_type.as_ref() { match v { physical_expr_node::ExprType::Column(v) => { @@ -16304,6 +16421,9 @@ impl serde::Serialize for PhysicalExprNode { physical_expr_node::ExprType::HashExpr(v) => { struct_ser.serialize_field("hashExpr", v)?; } + physical_expr_node::ExprType::DynamicFilter(v) => { + struct_ser.serialize_field("dynamicFilter", v)?; + } } } struct_ser.end() @@ -16318,6 +16438,8 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { const FIELDS: &[&str] = &[ "expr_id", "exprId", + "dynamic_filter_inner_id", + "dynamicFilterInnerId", "column", "literal", "binary_expr", @@ -16350,11 +16472,14 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "unknownColumn", "hash_expr", "hashExpr", + "dynamic_filter", + "dynamicFilter", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { ExprId, + DynamicFilterInnerId, Column, Literal, BinaryExpr, @@ -16374,6 +16499,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { Extension, UnknownColumn, HashExpr, + DynamicFilter, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16396,6 +16522,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { { match value { "exprId" | "expr_id" => Ok(GeneratedField::ExprId), + "dynamicFilterInnerId" | "dynamic_filter_inner_id" => Ok(GeneratedField::DynamicFilterInnerId), "column" => Ok(GeneratedField::Column), "literal" => Ok(GeneratedField::Literal), "binaryExpr" | "binary_expr" => Ok(GeneratedField::BinaryExpr), @@ -16415,6 +16542,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { "extension" => Ok(GeneratedField::Extension), "unknownColumn" | "unknown_column" => Ok(GeneratedField::UnknownColumn), "hashExpr" | "hash_expr" => Ok(GeneratedField::HashExpr), + "dynamicFilter" | "dynamic_filter" => Ok(GeneratedField::DynamicFilter), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16435,6 +16563,7 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { V: serde::de::MapAccess<'de>, { let mut expr_id__ = None; + let mut dynamic_filter_inner_id__ = None; let mut expr_type__ = None; while let 
Some(k) = map_.next_key()? { match k { @@ -16446,6 +16575,14 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) ; } + GeneratedField::DynamicFilterInnerId => { + if dynamic_filter_inner_id__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilterInnerId")); + } + dynamic_filter_inner_id__ = + map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| x.0) + ; + } GeneratedField::Column => { if expr_type__.is_some() { return Err(serde::de::Error::duplicate_field("column")); @@ -16577,12 +16714,20 @@ impl<'de> serde::Deserialize<'de> for PhysicalExprNode { return Err(serde::de::Error::duplicate_field("hashExpr")); } expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::HashExpr) +; + } + GeneratedField::DynamicFilter => { + if expr_type__.is_some() { + return Err(serde::de::Error::duplicate_field("dynamicFilter")); + } + expr_type__ = map_.next_value::<::std::option::Option<_>>()?.map(physical_expr_node::ExprType::DynamicFilter) ; } } } Ok(PhysicalExprNode { expr_id: expr_id__, + dynamic_filter_inner_id: dynamic_filter_inner_id__, expr_type: expr_type__, }) } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index d9602665c284a..adfef2cd61999 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1292,9 +1292,15 @@ pub struct PhysicalExprNode { /// across serde roundtrips. #[prost(uint64, optional, tag = "30")] pub expr_id: ::core::option::Option, + /// For DynamicFilterPhysicalExpr, this identifies the shared inner state. + /// Multiple expressions may have different expr_id values (different outer Arc wrappers) + /// but the same dynamic_filter_inner_id (shared inner state). + /// Used to reconstruct shared inner state during deserialization. 
+ #[prost(uint64, optional, tag = "31")] + pub dynamic_filter_inner_id: ::core::option::Option, #[prost( oneof = "physical_expr_node::ExprType", - tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21" + tags = "1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 14, 15, 16, 18, 19, 20, 21, 22" )] pub expr_type: ::core::option::Option, } @@ -1347,9 +1353,20 @@ pub mod physical_expr_node { UnknownColumn(super::UnknownColumn), #[prost(message, tag = "21")] HashExpr(super::PhysicalHashExprNode), + #[prost(message, tag = "22")] + DynamicFilter(::prost::alloc::boxed::Box), } } #[derive(Clone, PartialEq, ::prost::Message)] +pub struct PhysicalDynamicFilterNode { + #[prost(message, repeated, tag = "1")] + pub children: ::prost::alloc::vec::Vec, + #[prost(message, optional, boxed, tag = "2")] + pub initial_expr: ::core::option::Option< + ::prost::alloc::boxed::Box, + >, +} +#[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { #[prost(string, tag = "1")] pub name: ::prost::alloc::string::String, diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index e424be162648b..6f63012f97c51 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -58,6 +58,7 @@ use super::{ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -495,6 +496,27 @@ pub fn parse_physical_expr_with_converter( hash_expr.description.clone(), )) } + ExprType::DynamicFilter(dynamic_filter) => { + let children = parse_physical_exprs( + &dynamic_filter.children, + ctx, + input_schema, + codec, + proto_converter, + )?; + + let initial_expr = parse_required_physical_expr( + dynamic_filter.initial_expr.as_deref(), + ctx, + "initial_expr", + input_schema, + codec, + proto_converter, + )?; + + // Constructor signature is: new(children, inner) + Arc::new(DynamicFilterPhysicalExpr::new(children, initial_expr)) + } ExprType::Extension(extension) => { let inputs: Vec> = extension .inputs diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index bfba715b91249..3a196808e228d 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -52,6 +52,7 @@ use datafusion_functions_table::generate_series::{ }; use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr}; use datafusion_physical_expr::async_scalar_function::AsyncFuncExpr; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; use datafusion_physical_expr::{LexOrdering, LexRequirement, PhysicalExprRef}; use datafusion_physical_plan::aggregates::{ AggregateExec, AggregateMode, LimitOptions, PhysicalGroupBy, @@ -3064,6 +3065,7 @@ impl protobuf::PhysicalPlanNode { }); Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3150,6 +3152,7 @@ impl protobuf::PhysicalPlanNode { }); Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(ExprType::Sort(sort_expr)), }) }) @@ -3818,6 +3821,18 @@ impl DeduplicatingSerializer { session_id: rand::random(), } } + + fn hash(&self, ptr: u64) -> u64 { + // Hash session_id, pointer address, and process ID together 
to create expr_id. + // - session_id: random per serializer, prevents collisions when merging serializations + // - ptr: unique address per Arc within a process + // - pid: prevents collisions if serializer is shared across processes + let mut hasher = DefaultHasher::new(); + self.session_id.hash(&mut hasher); + ptr.hash(&mut hasher); + std::process::id().hash(&mut hasher); + hasher.finish() + } } impl PhysicalProtoConverterExtension for DeduplicatingSerializer { @@ -3864,16 +3879,14 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { codec: &dyn PhysicalExtensionCodec, ) -> Result { let mut proto = serialize_physical_expr_with_converter(expr, codec, self)?; - - // Hash session_id, pointer address, and process ID together to create expr_id. - // - session_id: random per serializer, prevents collisions when merging serializations - // - ptr: unique address per Arc within a process - // - pid: prevents collisions if serializer is shared across processes - let mut hasher = DefaultHasher::new(); - self.session_id.hash(&mut hasher); - (Arc::as_ptr(expr) as *const () as u64).hash(&mut hasher); - std::process::id().hash(&mut hasher); - proto.expr_id = Some(hasher.finish()); + // Special case for dynamic filters. Two expressions may live in separate Arcs but + // point to the same inner dynamic filter state. This inner state must be deduplicated. + if let Some(dynamic_filter) = + expr.as_any().downcast_ref::() + { + proto.dynamic_filter_inner_id = Some(self.hash(dynamic_filter.inner_id())) + } + proto.expr_id = Some(self.hash(Arc::as_ptr(expr) as *const () as u64)); Ok(proto) } @@ -3885,6 +3898,10 @@ impl PhysicalProtoConverterExtension for DeduplicatingSerializer { struct DeduplicatingDeserializer { /// Cache mapping expr_id to deserialized expressions. cache: RefCell>>, + /// Cache mapping dynamic_filter_inner_id to the first deserialized DynamicFilterPhysicalExpr. + /// This ensures that multiple dynamic filters with the same dynamic_filter_inner_id + /// can share the same inner state after deserialization. 
+ dynamic_filter_cache: RefCell>>, } impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { @@ -3918,12 +3935,52 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { + // First check the regular cache by expr_id (same outer Arc) if let Some(expr_id) = proto.expr_id { - // Check cache first if let Some(cached) = self.cache.borrow().get(&expr_id) { return Ok(Arc::clone(cached)); } - // Deserialize and cache + } + + // Check if we need to share inner state with a cached dynamic filter + if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id { + if let Some(cached_filter) = + self.dynamic_filter_cache.borrow().get(&dynamic_filter_id) + { + // We have a cached filter with the same dynamic_filter_inner_id + // Deserialize to get the new children, then create a new Arc with shared inner state + let expr = parse_physical_expr_with_converter( + proto, + ctx, + input_schema, + codec, + self, + )?; + + // Get the children from the newly deserialized expression + if let Some(new_df) = + expr.as_any().downcast_ref::() + { + let new_children: Vec> = + new_df.children().into_iter().cloned().collect(); + // Create a new Arc with the cached filter's inner state but new children + let expr_with_shared_inner = + Arc::clone(cached_filter).with_new_children(new_children)?; + + // Cache by expr_id if present + if let Some(expr_id) = proto.expr_id { + self.cache + .borrow_mut() + .insert(expr_id, Arc::clone(&expr_with_shared_inner)); + } + + return Ok(expr_with_shared_inner); + } + } + } + + // Normal deserialization path + let expr = if let Some(expr_id) = proto.expr_id { let expr = parse_physical_expr_with_converter( proto, ctx, @@ -3932,10 +3989,24 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { self, )?; self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - Ok(expr) + expr } else { - parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self) - } + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)? 
+ }; + + // If this is a dynamic filter, cache it by dynamic_filter_inner_id + if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id { + if expr + .as_any() + .downcast_ref::() + .is_some() + { + self.dynamic_filter_cache + .borrow_mut() + .insert(dynamic_filter_id, Arc::clone(&expr)); + } + }; + Ok(expr) } fn physical_expr_to_proto( diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index a38e59acdab26..9c5e335bceff3 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -36,8 +36,9 @@ use datafusion_physical_expr::window::{SlidingAggregateWindowExpr, StandardWindo use datafusion_physical_expr_common::physical_expr::snapshot_physical_expr; use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr; use datafusion_physical_plan::expressions::{ - BinaryExpr, CaseExpr, CastExpr, Column, InListExpr, IsNotNullExpr, IsNullExpr, - LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, UnKnownColumn, + BinaryExpr, CaseExpr, CastExpr, Column, DynamicFilterPhysicalExpr, InListExpr, + IsNotNullExpr, IsNullExpr, LikeExpr, Literal, NegativeExpr, NotExpr, TryCastExpr, + UnKnownColumn, }; use datafusion_physical_plan::joins::{HashExpr, HashTableLookupExpr}; use datafusion_physical_plan::udaf::AggregateFunctionExpr; @@ -72,6 +73,7 @@ pub fn serialize_physical_aggr_expr( codec.try_encode_udaf(aggr_expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::AggregateExpr( protobuf::PhysicalAggregateExprNode { aggregate_function: Some(physical_aggregate_expr_node::AggregateFunction::UserDefinedAggrFunction(name)), @@ -256,6 +258,29 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { + // Check for DynamicFilterPhysicalExpr before snapshotting + if let Some(df) = value.as_any().downcast_ref::() { + let children = df + .children() + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + + let current_expr = + Box::new(proto_converter.physical_expr_to_proto(&df.current()?, codec)?); + + return Ok(protobuf::PhysicalExprNode { + expr_id: None, + dynamic_filter_inner_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + initial_expr: Some(current_expr), + }), + )), + }); + } + // Snapshot the expr in case it has dynamic predicate state so // it can be serialized let value = snapshot_physical_expr(Arc::clone(value))?; @@ -282,6 +307,7 @@ pub fn serialize_physical_expr_with_converter( }; return Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal(value)), }); } @@ -289,6 +315,7 @@ pub fn serialize_physical_expr_with_converter( if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Column( protobuf::PhysicalColumn { name: expr.name().to_string(), @@ -299,6 +326,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::UnknownColumn( protobuf::UnknownColumn { name: 
expr.name().to_string(), @@ -318,6 +346,7 @@ pub fn serialize_physical_expr_with_converter( Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::BinaryExpr( binary_expr, )), @@ -325,6 +354,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some( protobuf::physical_expr_node::ExprType::Case( Box::new( @@ -368,6 +398,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::NotExpr(Box::new( protobuf::PhysicalNot { expr: Some(Box::new( @@ -379,6 +410,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNullExpr( Box::new(protobuf::PhysicalIsNull { expr: Some(Box::new( @@ -390,6 +422,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::IsNotNullExpr( Box::new(protobuf::PhysicalIsNotNull { expr: Some(Box::new( @@ -401,6 +434,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::InList(Box::new( protobuf::PhysicalInListNode { expr: Some(Box::new( @@ -414,6 +448,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Negative(Box::new( protobuf::PhysicalNegativeNode { expr: Some(Box::new( @@ -425,6 +460,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(lit) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Literal( lit.value().try_into()?, )), @@ -432,6 +468,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Cast(Box::new( protobuf::PhysicalCastNode { expr: Some(Box::new( @@ -444,6 +481,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(cast) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::TryCast(Box::new( protobuf::PhysicalTryCastNode { expr: Some(Box::new( @@ -458,6 +496,7 @@ pub fn serialize_physical_expr_with_converter( codec.try_encode_udf(expr.fun(), &mut buf)?; Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::ScalarUdf( protobuf::PhysicalScalarUdfNode { name: expr.name().to_string(), @@ -475,6 +514,7 @@ pub fn serialize_physical_expr_with_converter( } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode 
{ expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::LikeExpr(Box::new( protobuf::PhysicalLikeExprNode { negated: expr.negated(), @@ -492,6 +532,7 @@ pub fn serialize_physical_expr_with_converter( let (s0, s1, s2, s3) = expr.seeds(); Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::HashExpr( protobuf::PhysicalHashExprNode { on_columns: serialize_physical_exprs( @@ -518,6 +559,7 @@ pub fn serialize_physical_expr_with_converter( .collect::>()?; Ok(protobuf::PhysicalExprNode { expr_id: None, + dynamic_filter_inner_id: None, expr_type: Some(protobuf::physical_expr_node::ExprType::Extension( protobuf::PhysicalExtensionExprNode { expr: buf, inputs }, )), diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index bc310150d8982..fcc5cfb55185a 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -121,6 +121,7 @@ use datafusion_proto::physical_plan::{ PhysicalProtoConverterExtension, }; use datafusion_proto::protobuf; +use datafusion_proto::protobuf::physical_plan_node::PhysicalPlanType; use datafusion_proto::protobuf::{PhysicalExprNode, PhysicalPlanNode}; use prost::Message; @@ -129,6 +130,9 @@ use crate::cases::{ MyRegexUdfNode, }; +use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::utils::reassign_expr_columns; + /// Perform a serde roundtrip and assert that the string representation of the before and after plans /// are identical. Note that this often isn't sufficient to guarantee that no information is /// lost during serde because the string representation of a plan often only shows a subset of state. @@ -2751,6 +2755,7 @@ fn test_backward_compatibility_no_expr_id() -> Result<()> { // Manually create a proto without expr_id set let proto = PhysicalExprNode { expr_id: None, // Simulating old proto without this field + dynamic_filter_inner_id: None, expr_type: Some( datafusion_proto::protobuf::physical_expr_node::ExprType::Column( datafusion_proto::protobuf::PhysicalColumn { @@ -2949,6 +2954,210 @@ fn test_deduplication_within_expr_deserialization() -> Result<()> { Ok(()) } +#[test] +fn test_dynamic_filters_different_filter_same_inner_state() { + let filter_expr_1 = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc; + + // Column "a" is now at index 1, which creates a new filter. 
+ let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int64, false), + Field::new("a", DataType::Int64, false), + ])); + let filter_expr_2 = + reassign_expr_columns(Arc::clone(&filter_expr_1), &schema).unwrap(); + + // Meta-assertion: ensure this test is testing the case where the inner state is the same but + // the exprs are different + let (outer_equal, inner_equal) = + dynamic_filter_outer_inner_equal(&filter_expr_1, &filter_expr_2); + assert!(!outer_equal); + assert!(inner_equal); + test_deduplication_of_dynamic_filter_expression(filter_expr_1, filter_expr_2, schema) + .unwrap(); +} + +#[test] +fn test_dynamic_filters_same_filter() { + let filter_expr_1 = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + + let filter_expr_2 = Arc::clone(&filter_expr_1); + + // Ensure this test is testing the case where the inner state is the same and the exprs are the same + let (outer_equal, inner_equal) = + dynamic_filter_outer_inner_equal(&filter_expr_1, &filter_expr_2); + assert!(outer_equal); + assert!(inner_equal); + test_deduplication_of_dynamic_filter_expression(filter_expr_1, filter_expr_2, schema) + .unwrap(); +} + +#[test] +fn test_dynamic_filters_different_filter() { + let filter_expr_1 = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc; + + let filter_expr_2 = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc; + + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, false)])); + + // Ensure this test is testing the case where the inner state is the different and the outer exprs are different + let (outer_equal, inner_equal) = + dynamic_filter_outer_inner_equal(&filter_expr_1, &filter_expr_2); + assert!(!outer_equal); + assert!(!inner_equal); + test_deduplication_of_dynamic_filter_expression(filter_expr_1, filter_expr_2, schema) + .unwrap(); +} + +/// Returns (outer_equal, inner_equal) +/// +/// outer_equal is true if the two arcs point to the same data. +/// inner_equal is true if the two dynamic filters have the same inner state +fn dynamic_filter_outer_inner_equal( + filter_expr_1: &Arc, + filter_expr_2: &Arc, +) -> (bool, bool) { + ( + std::ptr::addr_eq(Arc::as_ptr(filter_expr_1), Arc::as_ptr(filter_expr_2)), + filter_expr_1 + .as_any() + .downcast_ref::() + .unwrap() + .inner_id() + == filter_expr_2 + .as_any() + .downcast_ref::() + .unwrap() + .inner_id(), + ) +} + +fn test_deduplication_of_dynamic_filter_expression( + filter_expr_1: Arc, + filter_expr_2: Arc, + schema: Arc, +) -> Result<()> { + let (outer_equal, inner_equal) = + dynamic_filter_outer_inner_equal(&filter_expr_1, &filter_expr_2); + + // Create execution plan: FilterExec(filter2) -> FilterExec(filter1) -> EmptyExec + let empty_exec = Arc::new(EmptyExec::new(schema)) as Arc; + let filter_exec1 = Arc::new(FilterExec::try_new(filter_expr_1, empty_exec)?) + as Arc; + let filter_exec2 = Arc::new(FilterExec::try_new(filter_expr_2, filter_exec1)?) 
+ as Arc; + + // Serialize the plan + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.execution_plan_to_proto(&filter_exec2, &codec)?; + + let outer_filter = match &proto.physical_plan_type { + Some(PhysicalPlanType::Filter(outer_filter)) => outer_filter, + _ => panic!("Expected PhysicalPlanType::Filter"), + }; + + let inner_filter = match &outer_filter.input { + Some(inner_input) => match &inner_input.physical_plan_type { + Some(PhysicalPlanType::Filter(inner_filter)) => inner_filter, + _ => panic!("Expected PhysicalPlanType::Filter"), + }, + _ => panic!("Expected inner input"), + }; + + let filter1_proto = inner_filter + .expr + .as_ref() + .expect("Should have filter expression"); + + let filter2_proto = outer_filter + .expr + .as_ref() + .expect("Should have filter expression"); + + // Both should have dynamic_filter_inner_id set + let filter1_dynamic_id = filter1_proto + .dynamic_filter_inner_id + .expect("Filter1 should have dynamic_filter_inner_id"); + let filter2_dynamic_id = filter2_proto + .dynamic_filter_inner_id + .expect("Filter2 should have dynamic_filter_inner_id"); + + assert_eq!( + inner_equal, + filter1_dynamic_id == filter2_dynamic_id, + "Dynamic filters sharing the same inner state should have the same dynamic_filter_inner_id" + ); + + let filter1_expr_id = filter1_proto.expr_id.expect("Should have expr_id"); + let filter2_expr_id = filter2_proto.expr_id.expect("Should have expr_id"); + assert_eq!( + outer_equal, + filter1_expr_id == filter2_expr_id, + "Different filters have different expr ids" + ); + + // Test deserialization - verify that filters with same dynamic_filter_inner_id share state + let ctx = SessionContext::new(); + let deserialized_plan = + converter.proto_to_execution_plan(ctx.task_ctx().as_ref(), &codec, &proto)?; + + // Extract the two filter expressions from the deserialized plan + let outer_filter = deserialized_plan + .as_any() + .downcast_ref::() + .expect("Should be FilterExec"); + let filter2_deserialized = outer_filter.predicate(); + + let inner_filter = outer_filter.children()[0] + .as_any() + .downcast_ref::() + .expect("Inner should be FilterExec"); + let filter1_deserialized = inner_filter.predicate(); + + // The Arcs should be different (different outer wrappers) + assert_eq!( + outer_equal, + Arc::ptr_eq(filter1_deserialized, filter2_deserialized), + "Deserialized filters should be different Arcs" + ); + + // Check if they're DynamicFilterPhysicalExpr (they might be snapshotted to Literal) + let (df1, df2) = match ( + filter1_deserialized + .as_any() + .downcast_ref::(), + filter2_deserialized + .as_any() + .downcast_ref::(), + ) { + (Some(df1), Some(df2)) => (df1, df2), + _ => panic!("Should be DynamicFilterPhysicalExpr"), + }; + + // But they should have the same inner_id (shared inner state) + assert_eq!( + inner_equal, + df1.inner_id() == df2.inner_id(), + "Deserialized filters should share inner state" + ); + + Ok(()) +} + /// Test that session_id rotates between top-level serialization operations. /// This verifies that each top-level serialization gets a fresh session_id, /// which prevents cross-process collisions when serialized plans are merged. 
From 3191b1c0c5a71ef30bbb645265b4fd669dbbdefa Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 18 Feb 2026 20:34:57 +0000 Subject: [PATCH 2/5] wip --- .../src/expressions/dynamic_filters.rs | 126 ++++++++++++++---- datafusion/proto/proto/datafusion.proto | 5 +- datafusion/proto/src/generated/pbjson.rs | 83 ++++++++++-- datafusion/proto/src/generated/prost.rs | 12 +- .../proto/src/physical_plan/from_proto.rs | 30 ++++- datafusion/proto/src/physical_plan/mod.rs | 88 +++++------- .../proto/src/physical_plan/to_proto.rs | 57 ++++---- .../tests/cases/roundtrip_physical_plan.rs | 86 +++++++++++- 8 files changed, 359 insertions(+), 128 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 3fc865a828ef8..080ec04e5818b 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -99,6 +99,18 @@ impl Inner { } } + fn new_from_state( + generation: u64, + expr: Arc, + is_complete: bool, + ) -> Self { + Self { + generation, + expr, + is_complete, + } + } + /// Clone the inner expression. fn expr(&self) -> &Arc { &self.expr @@ -180,18 +192,73 @@ impl DynamicFilterPhysicalExpr { } } - fn remap_children( - children: &[Arc], - remapped_children: Option<&Vec>>, - expr: Arc, - ) -> Result> { - if let Some(remapped_children) = remapped_children { + /// Reconstructs a [`DynamicFilterPhysicalExpr`] from a snapshot. + /// + /// This is used during deserialization to recreate filters with their serialized state. + #[doc(hidden)] + pub fn new_from_snapshot( + children: Vec>, + remapped_children: Option>>, + generation: u64, + inner_expr: Arc, + is_complete: bool, + ) -> Self { + let state = if is_complete { + FilterState::Complete { generation } + } else { + FilterState::InProgress { generation } + }; + let (state_watch, _) = watch::channel(state); + + Self { + children, + remapped_children, + inner: Arc::new(RwLock::new(Inner::new_from_state( + generation, + inner_expr, + is_complete, + ))), + state_watch, + data_type: Arc::new(RwLock::new(None)), + nullable: Arc::new(RwLock::new(None)), + } + } + + /// Create a new [`DynamicFilterPhysicalExpr`] sharing inner state from a source filter. + /// + /// This is an internal API used during deserialization to reconstruct filters that share + /// the same inner state but have different outer wrappers (e.g., after column remapping). + /// + /// # Arguments + /// * `children` - The base children for this filter + /// * `remapped_children` - Optional remapped children (if different from base) + /// * `source` - The source filter to share inner state with + /// + /// # Warning + /// This is a low-level API intended for use by the proto deserialization layer. + /// Most users should use [`Self::new`] instead. + pub fn new_from_source( + &self, + source: &DynamicFilterPhysicalExpr, + ) -> Self { + Self { + children: self.children.clone(), + remapped_children: self.remapped_children.clone(), + inner: Arc::clone(&source.inner), + state_watch: self.state_watch.clone(), + data_type: Arc::clone(&self.data_type), + nullable: Arc::clone(&self.nullable), + } + } + + fn remap_children(&self, expr: Arc) -> Result> { + if let Some(remapped_children) = &self.remapped_children { // Remap the children to the new children // of the expression. 
expr.transform_up(|child| { // Check if this is any of our original children if let Some(pos) = - children.iter().position(|c| c.as_ref() == child.as_ref()) + self.children.iter().position(|c| c.as_ref() == child.as_ref()) { // If so, remap it to the current children // of the expression. @@ -219,7 +286,34 @@ impl DynamicFilterPhysicalExpr { /// remapped to match calls to [`PhysicalExpr::with_new_children`]. pub fn current(&self) -> Result> { let expr = Arc::clone(self.inner.read().expr()); - Self::remap_children(&self.children, self.remapped_children.as_ref(), expr) + self.remap_children(expr) + } + + /// Captures all state needed for serialization atomically. + /// + /// Returns (children, remapped_children, generation, inner_expr, is_complete). + #[doc(hidden)] + #[allow(clippy::type_complexity)] + pub fn current_snapshot( + &self, + ) -> Result<( + Vec>, + Option>>, + u64, + Arc, + bool, + )> { + let (generation, expr, is_complete) = { + let inner = self.inner.read(); + (inner.generation, Arc::clone(&inner.expr), inner.is_complete) + }; + Ok(( + self.children.clone(), + self.remapped_children.clone(), + generation, + self.remap_children(expr)?, + is_complete, + )) } /// Update the current expression and notify all waiters. @@ -233,11 +327,7 @@ impl DynamicFilterPhysicalExpr { // We still do this again in `current()` but doing it preventively here // reduces the work needed in some cases if `current()` is called multiple times // and the same externally facing `PhysicalExpr` is used for both `with_new_children` and `update()`.` - let new_expr = Self::remap_children( - &self.children, - self.remapped_children.as_ref(), - new_expr, - )?; + let new_expr = self.remap_children(new_expr)?; // Load the current inner, increment generation, and store the new one let mut current = self.inner.write(); @@ -446,16 +536,6 @@ impl PhysicalExpr for DynamicFilterPhysicalExpr { fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { self.render(f, |expr, f| expr.fmt_sql(f)) } - - fn snapshot(&self) -> Result>> { - // Return the current expression as a snapshot. - Ok(Some(self.current()?)) - } - - fn snapshot_generation(&self) -> u64 { - // Return the current generation of the expression. 
- self.inner.read().generation - } } #[cfg(test)] diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 9e8f6f8658bc5..1fbd986de7994 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -910,7 +910,10 @@ message PhysicalExprNode { message PhysicalDynamicFilterNode { repeated PhysicalExprNode children = 1; - PhysicalExprNode initial_expr = 2; + repeated PhysicalExprNode remapped_children = 2; + uint64 generation = 3; + PhysicalExprNode inner_expr = 4; + bool is_complete = 5; } message PhysicalScalarUdfNode { diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index c2019693b8084..41cf971c9293e 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -16236,15 +16236,35 @@ impl serde::Serialize for PhysicalDynamicFilterNode { if !self.children.is_empty() { len += 1; } - if self.initial_expr.is_some() { + if !self.remapped_children.is_empty() { + len += 1; + } + if self.generation != 0 { + len += 1; + } + if self.inner_expr.is_some() { + len += 1; + } + if self.is_complete { len += 1; } let mut struct_ser = serializer.serialize_struct("datafusion.PhysicalDynamicFilterNode", len)?; if !self.children.is_empty() { struct_ser.serialize_field("children", &self.children)?; } - if let Some(v) = self.initial_expr.as_ref() { - struct_ser.serialize_field("initialExpr", v)?; + if !self.remapped_children.is_empty() { + struct_ser.serialize_field("remappedChildren", &self.remapped_children)?; + } + if self.generation != 0 { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("generation", ToString::to_string(&self.generation).as_str())?; + } + if let Some(v) = self.inner_expr.as_ref() { + struct_ser.serialize_field("innerExpr", v)?; + } + if self.is_complete { + struct_ser.serialize_field("isComplete", &self.is_complete)?; } struct_ser.end() } @@ -16257,14 +16277,22 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { { const FIELDS: &[&str] = &[ "children", - "initial_expr", - "initialExpr", + "remapped_children", + "remappedChildren", + "generation", + "inner_expr", + "innerExpr", + "is_complete", + "isComplete", ]; #[allow(clippy::enum_variant_names)] enum GeneratedField { Children, - InitialExpr, + RemappedChildren, + Generation, + InnerExpr, + IsComplete, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -16287,7 +16315,10 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { { match value { "children" => Ok(GeneratedField::Children), - "initialExpr" | "initial_expr" => Ok(GeneratedField::InitialExpr), + "remappedChildren" | "remapped_children" => Ok(GeneratedField::RemappedChildren), + "generation" => Ok(GeneratedField::Generation), + "innerExpr" | "inner_expr" => Ok(GeneratedField::InnerExpr), + "isComplete" | "is_complete" => Ok(GeneratedField::IsComplete), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -16308,7 +16339,10 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { V: serde::de::MapAccess<'de>, { let mut children__ = None; - let mut initial_expr__ = None; + let mut remapped_children__ = None; + let mut generation__ = None; + let mut inner_expr__ = None; + let mut is_complete__ = None; while let Some(k) = map_.next_key()? 
{ match k { GeneratedField::Children => { @@ -16317,17 +16351,40 @@ impl<'de> serde::Deserialize<'de> for PhysicalDynamicFilterNode { } children__ = Some(map_.next_value()?); } - GeneratedField::InitialExpr => { - if initial_expr__.is_some() { - return Err(serde::de::Error::duplicate_field("initialExpr")); + GeneratedField::RemappedChildren => { + if remapped_children__.is_some() { + return Err(serde::de::Error::duplicate_field("remappedChildren")); + } + remapped_children__ = Some(map_.next_value()?); + } + GeneratedField::Generation => { + if generation__.is_some() { + return Err(serde::de::Error::duplicate_field("generation")); + } + generation__ = + Some(map_.next_value::<::pbjson::private::NumberDeserialize<_>>()?.0) + ; + } + GeneratedField::InnerExpr => { + if inner_expr__.is_some() { + return Err(serde::de::Error::duplicate_field("innerExpr")); + } + inner_expr__ = map_.next_value()?; + } + GeneratedField::IsComplete => { + if is_complete__.is_some() { + return Err(serde::de::Error::duplicate_field("isComplete")); } - initial_expr__ = map_.next_value()?; + is_complete__ = Some(map_.next_value()?); } } } Ok(PhysicalDynamicFilterNode { children: children__.unwrap_or_default(), - initial_expr: initial_expr__, + remapped_children: remapped_children__.unwrap_or_default(), + generation: generation__.unwrap_or_default(), + inner_expr: inner_expr__, + is_complete: is_complete__.unwrap_or_default(), }) } } diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index adfef2cd61999..71bfcde2b57e7 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -1361,10 +1361,14 @@ pub mod physical_expr_node { pub struct PhysicalDynamicFilterNode { #[prost(message, repeated, tag = "1")] pub children: ::prost::alloc::vec::Vec, - #[prost(message, optional, boxed, tag = "2")] - pub initial_expr: ::core::option::Option< - ::prost::alloc::boxed::Box, - >, + #[prost(message, repeated, tag = "2")] + pub remapped_children: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub generation: u64, + #[prost(message, optional, boxed, tag = "4")] + pub inner_expr: ::core::option::Option<::prost::alloc::boxed::Box>, + #[prost(bool, tag = "5")] + pub is_complete: bool, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct PhysicalScalarUdfNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index 6f63012f97c51..f144ab0614772 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -505,17 +505,37 @@ pub fn parse_physical_expr_with_converter( proto_converter, )?; - let initial_expr = parse_required_physical_expr( - dynamic_filter.initial_expr.as_deref(), + let remapped_children = if !dynamic_filter.remapped_children.is_empty() { + Some(parse_physical_exprs( + &dynamic_filter.remapped_children, + ctx, + input_schema, + codec, + proto_converter, + )?) 
+ } else { + None + }; + + let inner_expr = parse_required_physical_expr( + dynamic_filter.inner_expr.as_deref(), ctx, - "initial_expr", + "inner_expr", input_schema, codec, proto_converter, )?; - // Constructor signature is: new(children, inner) - Arc::new(DynamicFilterPhysicalExpr::new(children, initial_expr)) + // Recreate filter from snapshot + let base_filter = Arc::new(DynamicFilterPhysicalExpr::new_from_snapshot( + children, + remapped_children, + dynamic_filter.generation, + inner_expr, + dynamic_filter.is_complete, + )); + + base_filter as Arc } ExprType::Extension(extension) => { let inputs: Vec> = extension diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 3a196808e228d..bcefda046cf1f 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -92,7 +92,7 @@ use self::from_proto::parse_protobuf_partitioning; use self::to_proto::serialize_partitioning; use crate::common::{byte_to_string, str_to_byte}; use crate::physical_plan::from_proto::{ - parse_physical_expr_with_converter, parse_physical_sort_expr, + parse_physical_expr_with_converter, parse_physical_exprs, parse_physical_sort_expr, parse_physical_sort_exprs, parse_physical_window_expr, parse_protobuf_file_scan_config, parse_record_batches, parse_table_schema_from_proto, }; @@ -3935,77 +3935,55 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { where Self: Sized, { - // First check the regular cache by expr_id (same outer Arc) + // The entire expr is cached, so re-use it. if let Some(expr_id) = proto.expr_id { if let Some(cached) = self.cache.borrow().get(&expr_id) { return Ok(Arc::clone(cached)); } } + // Cache miss, we must deserialize the expr. + let mut expr = + parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)?; + // Check if we need to share inner state with a cached dynamic filter if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id { if let Some(cached_filter) = self.dynamic_filter_cache.borrow().get(&dynamic_filter_id) { - // We have a cached filter with the same dynamic_filter_inner_id - // Deserialize to get the new children, then create a new Arc with shared inner state - let expr = parse_physical_expr_with_converter( - proto, - ctx, - input_schema, - codec, - self, - )?; - - // Get the children from the newly deserialized expression - if let Some(new_df) = + // Get the base filter's structure + let Some(cached_df) = cached_filter + .as_any() + .downcast_ref::() + else { + return internal_err!( + "dynamic filter cache returned an expression that is not a DynamicFilterPhysicalExpr" + ); + }; + + // Get the base filter's structure + let Some(dynamic_filter_expr) = expr.as_any().downcast_ref::() - { - let new_children: Vec> = - new_df.children().into_iter().cloned().collect(); - // Create a new Arc with the cached filter's inner state but new children - let expr_with_shared_inner = - Arc::clone(cached_filter).with_new_children(new_children)?; - - // Cache by expr_id if present - if let Some(expr_id) = proto.expr_id { - self.cache - .borrow_mut() - .insert(expr_id, Arc::clone(&expr_with_shared_inner)); - } - - return Ok(expr_with_shared_inner); - } - } - } - - // Normal deserialization path - let expr = if let Some(expr_id) = proto.expr_id { - let expr = parse_physical_expr_with_converter( - proto, - ctx, - input_schema, - codec, - self, - )?; - self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); - expr - } else { - 
parse_physical_expr_with_converter(proto, ctx, input_schema, codec, self)? - }; - - // If this is a dynamic filter, cache it by dynamic_filter_inner_id - if let Some(dynamic_filter_id) = proto.dynamic_filter_inner_id { - if expr - .as_any() - .downcast_ref::() - .is_some() - { + else { + return internal_err!( + "dynamic_filter_id present in proto, but the expression was not a DynamicFilterPhysicalExpr" + ); + }; + expr = Arc::new(dynamic_filter_expr.new_from_source(cached_df)) + as Arc; + } else { + // Cache it self.dynamic_filter_cache .borrow_mut() .insert(dynamic_filter_id, Arc::clone(&expr)); } }; + + // Cache it if the cache key is available. + if let Some(expr_id) = proto.expr_id { + self.cache.borrow_mut().insert(expr_id, Arc::clone(&expr)); + }; + Ok(expr) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 9c5e335bceff3..dea4f0a935169 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -258,29 +258,6 @@ pub fn serialize_physical_expr_with_converter( codec: &dyn PhysicalExtensionCodec, proto_converter: &dyn PhysicalProtoConverterExtension, ) -> Result { - // Check for DynamicFilterPhysicalExpr before snapshotting - if let Some(df) = value.as_any().downcast_ref::() { - let children = df - .children() - .iter() - .map(|child| proto_converter.physical_expr_to_proto(child, codec)) - .collect::>>()?; - - let current_expr = - Box::new(proto_converter.physical_expr_to_proto(&df.current()?, codec)?); - - return Ok(protobuf::PhysicalExprNode { - expr_id: None, - dynamic_filter_inner_id: None, - expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( - Box::new(protobuf::PhysicalDynamicFilterNode { - children, - initial_expr: Some(current_expr), - }), - )), - }); - } - // Snapshot the expr in case it has dynamic predicate state so // it can be serialized let value = snapshot_physical_expr(Arc::clone(value))?; @@ -351,6 +328,40 @@ pub fn serialize_physical_expr_with_converter( binary_expr, )), }) + } else if let Some(df) = expr.downcast_ref::() { + // Capture all state atomically + let (base_children, remapped, generation, inner_expr_val, is_complete) = + df.current_snapshot()?; + + let children = base_children + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()?; + + let remapped_children = if let Some(remapped) = remapped { + remapped + .iter() + .map(|child| proto_converter.physical_expr_to_proto(child, codec)) + .collect::>>()? 
+ } else { + vec![] + }; + + let inner_expr = Box::new(proto_converter.physical_expr_to_proto(&inner_expr_val, codec)?); + + Ok(protobuf::PhysicalExprNode { + expr_id: None, + dynamic_filter_inner_id: None, + expr_type: Some(protobuf::physical_expr_node::ExprType::DynamicFilter( + Box::new(protobuf::PhysicalDynamicFilterNode { + children, + remapped_children, + generation, + inner_expr: Some(inner_expr), + is_complete, + }), + )), + }) } else if let Some(expr) = expr.downcast_ref::() { Ok(protobuf::PhysicalExprNode { expr_id: None, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index fcc5cfb55185a..1574ce495222c 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3022,10 +3022,82 @@ fn test_dynamic_filters_different_filter() { .unwrap(); } +#[test] +fn test_dynamic_filter_roundtrip() -> Result<()> { + // Create a dynamic filter with base children + let filter_expr = Arc::new(DynamicFilterPhysicalExpr::new( + vec![Arc::new(Column::new("a", 0)) as Arc], + lit(true), + )) as Arc; + + // Add remapped columns by reassigning to a schema where "a" is at index 1 + let schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Int64, false), + Field::new("a", DataType::Int64, false), + ])); + let filter_expr = reassign_expr_columns(filter_expr, &schema).unwrap(); + + // Update its internal state + let df = filter_expr + .as_any() + .downcast_ref::() + .unwrap(); + df.update(lit(42))?; + df.update(lit(100))?; + df.mark_complete(); + + // Serialize + let codec = DefaultPhysicalExtensionCodec {}; + let converter = DeduplicatingProtoConverter {}; + let proto = converter.physical_expr_to_proto(&filter_expr, &codec)?; + + // Deserialize + let ctx = SessionContext::new(); + let deserialized_filter = + converter.proto_to_physical_expr(&proto, ctx.task_ctx().as_ref(), &schema, &codec)?; + + let deserialized_df = deserialized_filter + .as_any() + .downcast_ref::() + .expect("Should be DynamicFilterPhysicalExpr"); + + assert_dynamic_filter_fields_equal(df, deserialized_df); + + Ok(()) +} + +/// Asserts that dynamic filters have the same fields without asserting any pointer values. +/// Useful for testing filter equality after roundtrip. +fn assert_dynamic_filter_fields_equal( + df1: &DynamicFilterPhysicalExpr, + df2: &DynamicFilterPhysicalExpr, +) { + let (children1, remapped1, gen1, expr1, complete1) = df1.current_snapshot().unwrap(); + let (children2, remapped2, gen2, expr2, complete2) = df2.current_snapshot().unwrap(); + + assert_eq!(gen1, gen2, "Generation should be preserved"); + assert_eq!(complete1, complete2, "Completion status should be preserved"); + assert_eq!( + format!("{:?}", expr1), + format!("{:?}", expr2), + "Inner expression should be preserved" + ); + assert_eq!( + format!("{:?}", children1), + format!("{:?}", children2), + "Base children should be preserved" + ); + assert_eq!( + format!("{:?}", remapped1), + format!("{:?}", remapped2), + "Remapped children should be preserved" + ); +} + /// Returns (outer_equal, inner_equal) /// /// outer_equal is true if the two arcs point to the same data. -/// inner_equal is true if the two dynamic filters have the same inner state +/// inner_equal is true if the two dynamic filters have the same inner arc. 
fn dynamic_filter_outer_inner_equal( filter_expr_1: &Arc, filter_expr_2: &Arc, @@ -3052,12 +3124,12 @@ fn test_deduplication_of_dynamic_filter_expression( ) -> Result<()> { let (outer_equal, inner_equal) = dynamic_filter_outer_inner_equal(&filter_expr_1, &filter_expr_2); - + // Create execution plan: FilterExec(filter2) -> FilterExec(filter1) -> EmptyExec let empty_exec = Arc::new(EmptyExec::new(schema)) as Arc; - let filter_exec1 = Arc::new(FilterExec::try_new(filter_expr_1, empty_exec)?) + let filter_exec1 = Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_1), empty_exec)?) as Arc; - let filter_exec2 = Arc::new(FilterExec::try_new(filter_expr_2, filter_exec1)?) + let filter_exec2 = Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_2), filter_exec1)?) as Arc; // Serialize the plan @@ -3155,6 +3227,12 @@ fn test_deduplication_of_dynamic_filter_expression( "Deserialized filters should share inner state" ); + // Ensure the children and remapped children are equal after the roundtrip + let filter_1_before_roundtrip = filter_expr_1.as_any().downcast_ref::().unwrap(); + let filter_2_before_roundtrip = filter_expr_2.as_any().downcast_ref::().unwrap(); + assert_dynamic_filter_fields_equal(filter_1_before_roundtrip, df1); + assert_dynamic_filter_fields_equal(filter_2_before_roundtrip, df2); + Ok(()) } From 131abc224b9880fd9d0204ae06e33e19f55f6baf Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 18 Feb 2026 20:59:40 +0000 Subject: [PATCH 3/5] wip --- .../src/expressions/dynamic_filters.rs | 22 ++++++------ datafusion/proto/src/physical_plan/mod.rs | 12 +++---- .../proto/src/physical_plan/to_proto.rs | 3 +- .../tests/cases/roundtrip_physical_plan.rs | 36 +++++++++++++------ 4 files changed, 46 insertions(+), 27 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 080ec04e5818b..f3400f69acef0 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -237,10 +237,7 @@ impl DynamicFilterPhysicalExpr { /// # Warning /// This is a low-level API intended for use by the proto deserialization layer. /// Most users should use [`Self::new`] instead. - pub fn new_from_source( - &self, - source: &DynamicFilterPhysicalExpr, - ) -> Self { + pub fn new_from_source(&self, source: &DynamicFilterPhysicalExpr) -> Self { Self { children: self.children.clone(), remapped_children: self.remapped_children.clone(), @@ -251,14 +248,19 @@ impl DynamicFilterPhysicalExpr { } } - fn remap_children(&self, expr: Arc) -> Result> { + fn remap_children( + &self, + expr: Arc, + ) -> Result> { if let Some(remapped_children) = &self.remapped_children { // Remap the children to the new children // of the expression. expr.transform_up(|child| { // Check if this is any of our original children - if let Some(pos) = - self.children.iter().position(|c| c.as_ref() == child.as_ref()) + if let Some(pos) = self + .children + .iter() + .position(|c| c.as_ref() == child.as_ref()) { // If so, remap it to the current children // of the expression. @@ -293,7 +295,7 @@ impl DynamicFilterPhysicalExpr { /// /// Returns (children, remapped_children, generation, inner_expr, is_complete). #[doc(hidden)] - #[allow(clippy::type_complexity)] + #[expect(clippy::type_complexity)] pub fn current_snapshot( &self, ) -> Result<( @@ -419,8 +421,8 @@ impl DynamicFilterPhysicalExpr { /// Returns a unique identifier for the inner shared state. 
/// - /// Useful for checking if two [Arc] with the same - /// underlying [DynamicFilterPhysicalExpr] are the same. + /// Useful for checking if two [`Arc`] with the same + /// underlying [`DynamicFilterPhysicalExpr`] are the same. pub fn inner_id(&self) -> u64 { Arc::as_ptr(&self.inner) as *const () as u64 } diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index bcefda046cf1f..2662b5e780a94 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -92,7 +92,7 @@ use self::from_proto::parse_protobuf_partitioning; use self::to_proto::serialize_partitioning; use crate::common::{byte_to_string, str_to_byte}; use crate::physical_plan::from_proto::{ - parse_physical_expr_with_converter, parse_physical_exprs, parse_physical_sort_expr, + parse_physical_expr_with_converter, parse_physical_sort_expr, parse_physical_sort_exprs, parse_physical_window_expr, parse_protobuf_file_scan_config, parse_record_batches, parse_table_schema_from_proto, }; @@ -3936,10 +3936,10 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { Self: Sized, { // The entire expr is cached, so re-use it. - if let Some(expr_id) = proto.expr_id { - if let Some(cached) = self.cache.borrow().get(&expr_id) { - return Ok(Arc::clone(cached)); - } + if let Some(expr_id) = proto.expr_id + && let Some(cached) = self.cache.borrow().get(&expr_id) + { + return Ok(Arc::clone(cached)); } // Cache miss, we must deserialize the expr. @@ -3972,7 +3972,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { expr = Arc::new(dynamic_filter_expr.new_from_source(cached_df)) as Arc; } else { - // Cache it + // Cache it self.dynamic_filter_cache .borrow_mut() .insert(dynamic_filter_id, Arc::clone(&expr)); diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index dea4f0a935169..371229c63bed8 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -347,7 +347,8 @@ pub fn serialize_physical_expr_with_converter( vec![] }; - let inner_expr = Box::new(proto_converter.physical_expr_to_proto(&inner_expr_val, codec)?); + let inner_expr = + Box::new(proto_converter.physical_expr_to_proto(&inner_expr_val, codec)?); Ok(protobuf::PhysicalExprNode { expr_id: None, diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 1574ce495222c..9741495823dec 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -3053,8 +3053,12 @@ fn test_dynamic_filter_roundtrip() -> Result<()> { // Deserialize let ctx = SessionContext::new(); - let deserialized_filter = - converter.proto_to_physical_expr(&proto, ctx.task_ctx().as_ref(), &schema, &codec)?; + let deserialized_filter = converter.proto_to_physical_expr( + &proto, + ctx.task_ctx().as_ref(), + &schema, + &codec, + )?; let deserialized_df = deserialized_filter .as_any() @@ -3076,7 +3080,10 @@ fn assert_dynamic_filter_fields_equal( let (children2, remapped2, gen2, expr2, complete2) = df2.current_snapshot().unwrap(); assert_eq!(gen1, gen2, "Generation should be preserved"); - assert_eq!(complete1, complete2, "Completion status should be preserved"); + assert_eq!( + complete1, complete2, + "Completion status should be preserved" + ); assert_eq!( format!("{:?}", expr1), format!("{:?}", expr2), @@ -3124,13 +3131,16 @@ fn 
test_deduplication_of_dynamic_filter_expression( ) -> Result<()> { let (outer_equal, inner_equal) = dynamic_filter_outer_inner_equal(&filter_expr_1, &filter_expr_2); - + // Create execution plan: FilterExec(filter2) -> FilterExec(filter1) -> EmptyExec let empty_exec = Arc::new(EmptyExec::new(schema)) as Arc; - let filter_exec1 = Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_1), empty_exec)?) - as Arc; - let filter_exec2 = Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_2), filter_exec1)?) - as Arc; + let filter_exec1 = + Arc::new(FilterExec::try_new(Arc::clone(&filter_expr_1), empty_exec)?) + as Arc; + let filter_exec2 = Arc::new(FilterExec::try_new( + Arc::clone(&filter_expr_2), + filter_exec1, + )?) as Arc; // Serialize the plan let codec = DefaultPhysicalExtensionCodec {}; @@ -3228,8 +3238,14 @@ fn test_deduplication_of_dynamic_filter_expression( ); // Ensure the children and remapped children are equal after the roundtrip - let filter_1_before_roundtrip = filter_expr_1.as_any().downcast_ref::().unwrap(); - let filter_2_before_roundtrip = filter_expr_2.as_any().downcast_ref::().unwrap(); + let filter_1_before_roundtrip = filter_expr_1 + .as_any() + .downcast_ref::() + .unwrap(); + let filter_2_before_roundtrip = filter_expr_2 + .as_any() + .downcast_ref::() + .unwrap(); assert_dynamic_filter_fields_equal(filter_1_before_roundtrip, df1); assert_dynamic_filter_fields_equal(filter_2_before_roundtrip, df2); From b99ae87f6c06abb47ebdc14e0420c4d0f4e5dedc Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 18 Feb 2026 22:10:28 +0000 Subject: [PATCH 4/5] wip --- .../src/expressions/dynamic_filters.rs | 174 +++++++++++------- .../physical-expr/src/expressions/mod.rs | 2 +- .../proto/src/physical_plan/from_proto.rs | 17 +- datafusion/proto/src/physical_plan/mod.rs | 2 +- .../proto/src/physical_plan/to_proto.rs | 17 +- .../tests/cases/roundtrip_physical_plan.rs | 48 ++--- 6 files changed, 145 insertions(+), 115 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index f3400f69acef0..91003a196eccd 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -22,7 +22,7 @@ use tokio::sync::watch; use crate::PhysicalExpr; use arrow::datatypes::{DataType, Schema}; use datafusion_common::{ - Result, + Result, internal_err, tree_node::{Transformed, TransformedResult, TreeNode}, }; use datafusion_expr::ColumnarValue; @@ -88,6 +88,69 @@ struct Inner { is_complete: bool, } +/// An atomic snapshot of a [`DynamicFilterPhysicalExpr`] used to reconstruct the expression during +/// serialization / deserialization. +pub struct DynamicFilterSnapshot { + children: Vec>, + remapped_children: Option>>, + // Inner state. 
+ generation: u64, + inner_expr: Arc, + is_complete: bool, +} + +impl DynamicFilterSnapshot { + pub fn new( + children: Vec>, + remapped_children: Option>>, + generation: u64, + inner_expr: Arc, + is_complete: bool, + ) -> Self { + Self { + children, + remapped_children, + generation, + inner_expr, + is_complete, + } + } + + pub fn children(&self) -> &[Arc] { + &self.children + } + + pub fn remapped_children(&self) -> Option<&[Arc]> { + self.remapped_children.as_deref() + } + + pub fn generation(&self) -> u64 { + self.generation + } + + pub fn inner_expr(&self) -> &Arc { + &self.inner_expr + } + + pub fn is_complete(&self) -> bool { + self.is_complete + } +} + +impl Display for DynamicFilterSnapshot { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "DynamicFilterSnapshot {{ children: {:?}, remapped_children: {:?}, generation: {}, inner_expr: {:?}, is_complete: {} }}", + self.children, + self.remapped_children, + self.generation, + self.inner_expr, + self.is_complete + ) + } +} + impl Inner { fn new(expr: Arc) -> Self { Self { @@ -99,18 +162,6 @@ impl Inner { } } - fn new_from_state( - generation: u64, - expr: Arc, - is_complete: bool, - ) -> Self { - Self { - generation, - expr, - is_complete, - } - } - /// Clone the inner expression. fn expr(&self) -> &Arc { &self.expr @@ -194,15 +245,17 @@ impl DynamicFilterPhysicalExpr { /// Reconstructs a [`DynamicFilterPhysicalExpr`] from a snapshot. /// - /// This is used during deserialization to recreate filters with their serialized state. + /// This is a low-level API intended for use by the proto deserialization layer. #[doc(hidden)] - pub fn new_from_snapshot( - children: Vec>, - remapped_children: Option>>, - generation: u64, - inner_expr: Arc, - is_complete: bool, - ) -> Self { + pub fn new_from_snapshot(snapshot: DynamicFilterSnapshot) -> Self { + let DynamicFilterSnapshot { + children, + remapped_children, + generation, + inner_expr, + is_complete, + } = snapshot; + let state = if is_complete { FilterState::Complete { generation } } else { @@ -213,39 +266,59 @@ impl DynamicFilterPhysicalExpr { Self { children, remapped_children, - inner: Arc::new(RwLock::new(Inner::new_from_state( + inner: Arc::new(RwLock::new(Inner { generation, - inner_expr, + expr: inner_expr, is_complete, - ))), + })), state_watch, data_type: Arc::new(RwLock::new(None)), nullable: Arc::new(RwLock::new(None)), } } - /// Create a new [`DynamicFilterPhysicalExpr`] sharing inner state from a source filter. - /// - /// This is an internal API used during deserialization to reconstruct filters that share - /// the same inner state but have different outer wrappers (e.g., after column remapping). + /// Atomically captures all state needed for serialization into a [`DynamicFilterSnapshot`]. /// - /// # Arguments - /// * `children` - The base children for this filter - /// * `remapped_children` - Optional remapped children (if different from base) - /// * `source` - The source filter to share inner state with + /// This is a low-level API intended for use by the proto deserialization layer. 
+ #[doc(hidden)] + pub fn current_snapshot(&self) -> DynamicFilterSnapshot { + let (generation, inner_expr, is_complete) = { + let inner = self.inner.read(); + (inner.generation, Arc::clone(&inner.expr), inner.is_complete) + }; + DynamicFilterSnapshot { + children: self.children.clone(), + remapped_children: self.remapped_children.clone(), + generation, + inner_expr, + is_complete, + } + } + + /// Create a new [`DynamicFilterPhysicalExpr`] from [`self`], except, it overwrites the + /// internal state the source filter. /// - /// # Warning /// This is a low-level API intended for use by the proto deserialization layer. - /// Most users should use [`Self::new`] instead. - pub fn new_from_source(&self, source: &DynamicFilterPhysicalExpr) -> Self { - Self { + /// + /// Safety: The dynamic filter should not be in use when calling this method, otherwise there + /// may be undefined behavior. This method may do the following or worse: + /// - transition the state to [`FilterState::Complete`] without notifying the watch + /// - cause a generation number to be emitted which is out of order + pub fn new_from_source(&self, source: &DynamicFilterPhysicalExpr) -> Result { + // Best effort check that no one is subscribed. + if self.state_watch.receiver_count() > 0 { + return internal_err!( + "Cannot replace the inner state of a DynamicFilterPhysicalExpr that has subscribers" + ); + }; + Ok(Self { children: self.children.clone(), remapped_children: self.remapped_children.clone(), inner: Arc::clone(&source.inner), state_watch: self.state_watch.clone(), data_type: Arc::clone(&self.data_type), nullable: Arc::clone(&self.nullable), - } + }) } fn remap_children( @@ -291,33 +364,6 @@ impl DynamicFilterPhysicalExpr { self.remap_children(expr) } - /// Captures all state needed for serialization atomically. - /// - /// Returns (children, remapped_children, generation, inner_expr, is_complete). - #[doc(hidden)] - #[expect(clippy::type_complexity)] - pub fn current_snapshot( - &self, - ) -> Result<( - Vec>, - Option>>, - u64, - Arc, - bool, - )> { - let (generation, expr, is_complete) = { - let inner = self.inner.read(); - (inner.generation, Arc::clone(&inner.expr), inner.is_complete) - }; - Ok(( - self.children.clone(), - self.remapped_children.clone(), - generation, - self.remap_children(expr)?, - is_complete, - )) - } - /// Update the current expression and notify all waiters. /// Any children of this expression must be a subset of the original children /// passed to the constructor. 
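Taken together, `current_snapshot`, `DynamicFilterSnapshot`, and `new_from_snapshot` are what let a dynamic filter's observable state survive a serde round trip instead of being inlined into a plain `Literal`. Below is a minimal sketch of that flow with the protobuf encoding elided; the helper name `roundtrip_via_snapshot`, the `main` wrapper, and the exact import paths are assumptions made for illustration and are not part of this change.

use std::sync::Arc;

use datafusion_common::{Result, ScalarValue};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::{
    DynamicFilterPhysicalExpr, DynamicFilterSnapshot, Literal,
};

fn roundtrip_via_snapshot(filter: &DynamicFilterPhysicalExpr) -> DynamicFilterPhysicalExpr {
    // Serializing side: capture children, remapped children, generation, the current
    // inner expression, and the completion flag in one read of the inner lock.
    let snapshot: DynamicFilterSnapshot = filter.current_snapshot();
    // Deserializing side: rebuild a filter that starts from exactly that state. In the
    // real proto path the snapshot fields travel inside a PhysicalDynamicFilterNode.
    DynamicFilterPhysicalExpr::new_from_snapshot(snapshot)
}

fn main() -> Result<()> {
    let initial: Arc<dyn PhysicalExpr> =
        Arc::new(Literal::new(ScalarValue::Boolean(Some(true))));
    let filter = DynamicFilterPhysicalExpr::new(vec![], initial);
    filter.update(Arc::new(Literal::new(ScalarValue::Int32(Some(42)))))?;

    let rebuilt = roundtrip_via_snapshot(&filter);

    // Same observable state: Display on DynamicFilterSnapshot prints every field.
    assert_eq!(
        filter.current_snapshot().to_string(),
        rebuilt.current_snapshot().to_string()
    );
    // ...but a distinct inner Arc. Restoring the sharing is the job of new_from_source
    // plus the dynamic_filter_inner_id dedup key handled by the deserializer.
    assert_ne!(filter.inner_id(), rebuilt.inner_id());
    Ok(())
}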
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs index c9e02708d6c28..3924fe329f6b7 100644 --- a/datafusion/physical-expr/src/expressions/mod.rs +++ b/datafusion/physical-expr/src/expressions/mod.rs @@ -45,7 +45,7 @@ pub use cast::{CastExpr, cast}; pub use cast_column::CastColumnExpr; pub use column::{Column, col, with_new_schema}; pub use datafusion_expr::utils::format_state_name; -pub use dynamic_filters::DynamicFilterPhysicalExpr; +pub use dynamic_filters::{DynamicFilterPhysicalExpr, DynamicFilterSnapshot}; pub use in_list::{InListExpr, in_list}; pub use is_not_null::{IsNotNullExpr, is_not_null}; pub use is_null::{IsNullExpr, is_null}; diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index f144ab0614772..40ed39772fb96 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -58,7 +58,9 @@ use super::{ use crate::logical_plan::{self}; use crate::protobuf::physical_expr_node::ExprType; use crate::{convert_required, protobuf}; -use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr; +use datafusion_physical_expr::expressions::{ + DynamicFilterPhysicalExpr, DynamicFilterSnapshot, +}; impl From<&protobuf::PhysicalColumn> for Column { fn from(c: &protobuf::PhysicalColumn) -> Column { @@ -528,13 +530,14 @@ pub fn parse_physical_expr_with_converter( // Recreate filter from snapshot let base_filter = Arc::new(DynamicFilterPhysicalExpr::new_from_snapshot( - children, - remapped_children, - dynamic_filter.generation, - inner_expr, - dynamic_filter.is_complete, + DynamicFilterSnapshot::new( + children, + remapped_children, + dynamic_filter.generation, + inner_expr, + dynamic_filter.is_complete, + ), )); - base_filter as Arc } ExprType::Extension(extension) => { diff --git a/datafusion/proto/src/physical_plan/mod.rs b/datafusion/proto/src/physical_plan/mod.rs index 2662b5e780a94..d585606d4f341 100644 --- a/datafusion/proto/src/physical_plan/mod.rs +++ b/datafusion/proto/src/physical_plan/mod.rs @@ -3969,7 +3969,7 @@ impl PhysicalProtoConverterExtension for DeduplicatingDeserializer { "dynamic_filter_id present in proto, but the expression was not a DynamicFilterPhysicalExpr" ); }; - expr = Arc::new(dynamic_filter_expr.new_from_source(cached_df)) + expr = Arc::new(dynamic_filter_expr.new_from_source(cached_df)?) 
                    as Arc<dyn PhysicalExpr>;
            } else {
                // Cache it
diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs
index 371229c63bed8..609c16b2d3e30 100644
--- a/datafusion/proto/src/physical_plan/to_proto.rs
+++ b/datafusion/proto/src/physical_plan/to_proto.rs
@@ -330,15 +330,15 @@ pub fn serialize_physical_expr_with_converter(
         })
     } else if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
         // Capture all state atomically
-        let (base_children, remapped, generation, inner_expr_val, is_complete) =
-            df.current_snapshot()?;
+        let snapshot = df.current_snapshot();
 
-        let children = base_children
+        let children = snapshot
+            .children()
             .iter()
             .map(|child| proto_converter.physical_expr_to_proto(child, codec))
             .collect::<Result<Vec<_>>>()?;
 
-        let remapped_children = if let Some(remapped) = remapped {
+        let remapped_children = if let Some(remapped) = snapshot.remapped_children() {
             remapped
                 .iter()
                 .map(|child| proto_converter.physical_expr_to_proto(child, codec))
@@ -347,8 +347,9 @@ pub fn serialize_physical_expr_with_converter(
             vec![]
         };
 
-        let inner_expr =
-            Box::new(proto_converter.physical_expr_to_proto(&inner_expr_val, codec)?);
+        let inner_expr = Box::new(
+            proto_converter.physical_expr_to_proto(snapshot.inner_expr(), codec)?,
+        );
 
         Ok(protobuf::PhysicalExprNode {
             expr_id: None,
@@ -357,9 +358,9 @@ pub fn serialize_physical_expr_with_converter(
                 Box::new(protobuf::PhysicalDynamicFilterNode {
                     children,
                     remapped_children,
-                    generation,
+                    generation: snapshot.generation(),
                     inner_expr: Some(inner_expr),
-                    is_complete,
+                    is_complete: snapshot.is_complete(),
                 }),
             )),
         })
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 1574ce495222c..44cae87383d96 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -3065,40 +3065,13 @@ fn test_dynamic_filter_roundtrip() -> Result<()> {
         .downcast_ref::<DynamicFilterPhysicalExpr>()
         .expect("Should be DynamicFilterPhysicalExpr");
 
-    assert_dynamic_filter_fields_equal(df, deserialized_df);
-
-    Ok(())
-}
-
-/// Asserts that dynamic filters have the same fields without asserting any pointer values.
-/// Useful for testing filter equality after roundtrip.
-fn assert_dynamic_filter_fields_equal( - df1: &DynamicFilterPhysicalExpr, - df2: &DynamicFilterPhysicalExpr, -) { - let (children1, remapped1, gen1, expr1, complete1) = df1.current_snapshot().unwrap(); - let (children2, remapped2, gen2, expr2, complete2) = df2.current_snapshot().unwrap(); - - assert_eq!(gen1, gen2, "Generation should be preserved"); - assert_eq!( - complete1, complete2, - "Completion status should be preserved" - ); - assert_eq!( - format!("{:?}", expr1), - format!("{:?}", expr2), - "Inner expression should be preserved" - ); assert_eq!( - format!("{:?}", children1), - format!("{:?}", children2), - "Base children should be preserved" - ); - assert_eq!( - format!("{:?}", remapped1), - format!("{:?}", remapped2), - "Remapped children should be preserved" + df.current_snapshot().to_string(), + deserialized_df.current_snapshot().to_string(), + "Snapshots should be equal" ); + + Ok(()) } /// Returns (outer_equal, inner_equal) @@ -3246,8 +3219,15 @@ fn test_deduplication_of_dynamic_filter_expression( .as_any() .downcast_ref::() .unwrap(); - assert_dynamic_filter_fields_equal(filter_1_before_roundtrip, df1); - assert_dynamic_filter_fields_equal(filter_2_before_roundtrip, df2); + + assert_eq!( + filter_1_before_roundtrip.current_snapshot().to_string(), + df1.current_snapshot().to_string() + ); + assert_eq!( + filter_2_before_roundtrip.current_snapshot().to_string(), + df2.current_snapshot().to_string() + ); Ok(()) } From 47784763bb35eb749dcb058074d4faf6ed2288ab Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Wed, 18 Feb 2026 22:43:56 +0000 Subject: [PATCH 5/5] wip --- .../src/expressions/dynamic_filters.rs | 127 ++++++++++++++---- 1 file changed, 102 insertions(+), 25 deletions(-) diff --git a/datafusion/physical-expr/src/expressions/dynamic_filters.rs b/datafusion/physical-expr/src/expressions/dynamic_filters.rs index 91003a196eccd..44154275c7d89 100644 --- a/datafusion/physical-expr/src/expressions/dynamic_filters.rs +++ b/datafusion/physical-expr/src/expressions/dynamic_filters.rs @@ -295,14 +295,16 @@ impl DynamicFilterPhysicalExpr { } } - /// Create a new [`DynamicFilterPhysicalExpr`] from [`self`], except, it overwrites the - /// internal state the source filter. + /// Create a new [`DynamicFilterPhysicalExpr`] from `self`, except it overwrites the + /// internal state with the source filter's state. /// /// This is a low-level API intended for use by the proto deserialization layer. /// - /// Safety: The dynamic filter should not be in use when calling this method, otherwise there + /// # Safety + /// + /// The dynamic filter should not be in use when calling this method, otherwise there /// may be undefined behavior. This method may do the following or worse: - /// - transition the state to [`FilterState::Complete`] without notifying the watch + /// - transition the state to complete without notifying the watch /// - cause a generation number to be emitted which is out of order pub fn new_from_source(&self, source: &DynamicFilterPhysicalExpr) -> Result { // Best effort check that no one is subscribed. 
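The subscriber check above is what makes `new_from_source` reasonable to call from the deduplicating deserializer: a filter that was just rebuilt from a `PhysicalDynamicFilterNode` should not have any watchers yet. A simplified, standalone sketch of the dedup step follows; the real logic lives in `DeduplicatingDeserializer` in datafusion/proto/src/physical_plan/mod.rs, and the `DynamicFilterDedup` type and `resolve` method here are illustrative names only, not part of this change.

use std::collections::HashMap;
use std::sync::Arc;

use datafusion_common::{internal_err, Result};
use datafusion_physical_expr::PhysicalExpr;
use datafusion_physical_expr::expressions::DynamicFilterPhysicalExpr;

#[derive(Default)]
struct DynamicFilterDedup {
    // Keyed by the wire-level dynamic_filter_inner_id carried on PhysicalExprNode.
    cache: HashMap<u64, Arc<dyn PhysicalExpr>>,
}

impl DynamicFilterDedup {
    /// `fresh` is a filter that was just rebuilt via `new_from_snapshot`. The first filter
    /// seen for a given id is cached; later ones keep their own (possibly remapped)
    /// children but are relinked to the cached filter's shared inner state.
    fn resolve(
        &mut self,
        wire_id: u64,
        fresh: Arc<dyn PhysicalExpr>,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        if let Some(cached) = self.cache.get(&wire_id) {
            let (Some(cached_df), Some(fresh_df)) = (
                cached.as_any().downcast_ref::<DynamicFilterPhysicalExpr>(),
                fresh.as_any().downcast_ref::<DynamicFilterPhysicalExpr>(),
            ) else {
                return internal_err!(
                    "dynamic_filter_inner_id was set on a non-dynamic-filter expression"
                );
            };
            // Safe to relink here: a freshly deserialized filter has no subscribers yet.
            return Ok(Arc::new(fresh_df.new_from_source(cached_df)?)
                as Arc<dyn PhysicalExpr>);
        }
        self.cache.insert(wire_id, Arc::clone(&fresh));
        Ok(fresh)
    }
}

Keying on the inner id rather than on `expr_id` alone is what collapses two different outer wrappers, for example a filter and its column-remapped copy produced during pushdown, back onto one shared inner state after deserialization.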
@@ -633,15 +635,23 @@ mod test { &filter_schema_1, ) .unwrap(); - let snap = dynamic_filter_1.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#); + let df1 = dynamic_filter_1 + .as_any() + .downcast_ref::() + .unwrap(); + let current1 = df1.current().unwrap(); + insta::assert_snapshot!(format!("{current1:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 0 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#); let dynamic_filter_2 = reassign_expr_columns( Arc::clone(&dynamic_filter) as Arc, &filter_schema_2, ) .unwrap(); - let snap = dynamic_filter_2.snapshot().unwrap().unwrap(); - insta::assert_snapshot!(format!("{snap:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#); + let df2 = dynamic_filter_2 + .as_any() + .downcast_ref::() + .unwrap(); + let current2 = df2.current().unwrap(); + insta::assert_snapshot!(format!("{current2:?}"), @r#"BinaryExpr { left: Column { name: "a", index: 1 }, op: Eq, right: Literal { value: Int32(42), field: Field { name: "lit", data_type: Int32 } }, fail_on_overflow: false }"#); // Both filters allow evaluating the same expression let batch_1 = RecordBatch::try_new( Arc::clone(&filter_schema_1), @@ -705,23 +715,6 @@ mod test { assert!(arr_1.eq(&expected)); } - #[test] - fn test_snapshot() { - let expr = lit(42) as Arc; - let dynamic_filter = DynamicFilterPhysicalExpr::new(vec![], Arc::clone(&expr)); - - // Take a snapshot of the current expression - let snapshot = dynamic_filter.snapshot().unwrap(); - assert_eq!(snapshot, Some(expr)); - - // Update the current expression - let new_expr = lit(100) as Arc; - dynamic_filter.update(Arc::clone(&new_expr)).unwrap(); - // Take another snapshot - let snapshot = dynamic_filter.snapshot().unwrap(); - assert_eq!(snapshot, Some(new_expr)); - } - #[test] fn test_dynamic_filter_physical_expr_misbehaves_data_type_nullable() { let dynamic_filter = @@ -1003,4 +996,88 @@ mod test { "Hash should be stable after update (identity-based)" ); } + + #[test] + fn test_current_snapshot_roundtrip() { + let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)])); + let col_a = col("a", &schema).unwrap(); + + // Create a dynamic filter with children + let expr = Arc::new(BinaryExpr::new( + Arc::clone(&col_a), + datafusion_expr::Operator::Gt, + lit(10) as Arc, + )); + let filter = DynamicFilterPhysicalExpr::new( + vec![Arc::clone(&col_a)], + expr as Arc, + ); + + // Update expression and mark complete + filter + .update(lit(42) as Arc) + .expect("Update should succeed"); + filter.mark_complete(); + + // Take a snapshot and reconstruct + let snapshot = filter.current_snapshot(); + let reconstructed = DynamicFilterPhysicalExpr::new_from_snapshot(snapshot); + + // String representations should be equal + assert_eq!( + filter.current_snapshot().to_string(), + reconstructed.current_snapshot().to_string(), + ); + } + + #[test] + fn test_new_from_source() { + // Create a source filter + let source = Arc::new(DynamicFilterPhysicalExpr::new( + vec![], + lit(42) as Arc, + )); + + // Update and mark complete + source.update(lit(100) as Arc).unwrap(); + source.mark_complete(); + + // Create a target filter with different 
children
+        let schema = Arc::new(Schema::new(vec![Field::new("x", DataType::Int32, false)]));
+        let col_x = col("x", &schema).unwrap();
+        let target = DynamicFilterPhysicalExpr::new(
+            vec![Arc::clone(&col_x)],
+            lit(0) as Arc<dyn PhysicalExpr>,
+        );
+
+        // Create new filter from source's inner state
+        let combined = target.new_from_source(&source).unwrap();
+
+        // Verify inner state is shared (same inner_id)
+        assert_eq!(
+            combined.inner_id(),
+            source.inner_id(),
+            "new_from_source should share inner state with source"
+        );
+
+        // Verify children are from target, not source
+        let combined_snapshot = combined.current_snapshot();
+        assert_eq!(
+            combined_snapshot.children().len(),
+            1,
+            "Combined filter should have target's children"
+        );
+        assert_eq!(
+            format!("{:?}", combined_snapshot.children()[0]),
+            format!("{:?}", col_x),
+            "Combined filter's child should match the target's child"
+        );
+
+        // Verify inner expression comes from source
+        assert_eq!(
+            format!("{:?}", combined_snapshot.inner_expr()),
+            format!("{:?}", lit(100)),
+            "Combined filter should have source's inner expression"
+        );
+    }
 }
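One property the tests above do not exercise directly is the reason the inner state is shared at all: a filter relinked via `new_from_source` should keep observing updates made later through the source. Below is a hedged sketch of such a follow-up test, not part of this patch; the test name is illustrative, and it assumes the same imports as the surrounding test module in dynamic_filters.rs.

    #[test]
    fn test_new_from_source_sees_later_updates() -> Result<()> {
        // Source filter whose inner state will be shared.
        let source = Arc::new(DynamicFilterPhysicalExpr::new(
            vec![],
            lit(1) as Arc<dyn PhysicalExpr>,
        ));
        // Target filter (e.g. a freshly deserialized one) relinked to the source.
        let target =
            DynamicFilterPhysicalExpr::new(vec![], lit(0) as Arc<dyn PhysicalExpr>);
        let combined = target.new_from_source(&source)?;

        // An update made through the source is observable through the relinked filter.
        source.update(lit(7) as Arc<dyn PhysicalExpr>)?;
        assert_eq!(
            format!("{:?}", combined.current()?),
            format!("{:?}", lit(7)),
            "updates via the source should be visible through the relinked filter"
        );
        Ok(())
    }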