proto: serialize and dedupe dynamic filters#20416
Draft
jayshrivastava wants to merge 1 commit intoapache:mainfrom
Draft
proto: serialize and dedupe dynamic filters#20416jayshrivastava wants to merge 1 commit intoapache:mainfrom
jayshrivastava wants to merge 1 commit intoapache:mainfrom
Conversation
Informs: datafusion-contrib/datafusion-distributed#180 Closes: apache#20418 Consider this scenario 1. You have a plan with a `HashJoinExec` and `DataSourceExec` 2. You run the physical optimizer and the `DataSourceExec` accepts `DynamicFilterPhysicalExpr` pushdown from the `HashJoinExec` 3. You serialize the plan, deserialize it, and execute it What should happen is that the dynamic filter should "work", meaning 1. When you deserialize the plan, both the `HashJoinExec` and `DataSourceExec` should have pointers to the same `DynamicFilterPhysicalExpr` 2. The `DynamicFilterPhysicalExpr` should be updated during execution by the `HashJoinExec` and the `DataSourceExec` should filter out rows This does not happen today for a few reasons, a couple of which this PR aims to address 1. `DynamicFilterPhysicalExpr` is not survive round-tripping. The internal exprs get inlined (ex. it may be serialized as `Literal`) 2. Even if `DynamicFilterPhysicalExpr` survives round-tripping, during pushdown, it's often the case that the `DynamicFilterPhysicalExpr` is rewritten. In this case, you have two `DynamicFilterPhysicalExpr` which are different `Arc`s but share the same `Inner` dynamic filter state. The current `DeduplicatingProtoConverter` does not handle this specific form of deduping. This PR aims to fix those problems by adding serde for `DynamicFilterPhysicalExpr` and deduping logic for the inner state of dynamic filters. It does not yet add a test for the `HashJoinExec` and `DataSourceExec` filter pushdown case, but this is relevant follow up work. I tried to keep the PR small for reviewers. Yes, via unit tests. `DynamicFilterPhysicalExpr` are now serialized by the default codec
158f2cf to
e0c6be3
Compare
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Which issue does this PR close?
Informs: datafusion-contrib/datafusion-distributed#180
Closes: #20418
Rationale for this change
Consider this scenario
HashJoinExecandDataSourceExecDataSourceExecacceptsDynamicFilterPhysicalExprpushdown from theHashJoinExecWhat should happen is that the dynamic filter should "work", meaning
HashJoinExecandDataSourceExecshould have pointers to the sameDynamicFilterPhysicalExprDynamicFilterPhysicalExprshould be updated during execution by theHashJoinExecand theDataSourceExecshould filter out rowsThis does not happen today for a few reasons, a couple of which this PR aims to address
DynamicFilterPhysicalExpris not survive round-tripping. The internal exprs get inlined (ex. it may be serialized asLiteral)DynamicFilterPhysicalExprsurvives round-tripping, during pushdown, it's often the case that theDynamicFilterPhysicalExpris rewritten. In this case, you have twoDynamicFilterPhysicalExprwhich are differentArcs but share the sameInnerdynamic filter state. The currentDeduplicatingProtoConverterdoes not handle this specific form of deduping.This PR aims to fix those problems by adding serde for
DynamicFilterPhysicalExprand deduping logic for the inner state of dynamic filters.It does not yet add a test for the
HashJoinExecandDataSourceExecfilter pushdown case, but this is relevant follow up work. I tried to keep the PR small for reviewers.Are these changes tested?
Yes, via unit tests.
Are there any user-facing changes?
DynamicFilterPhysicalExprare now serialized by the default codec