[SPARK-54805][SS][PYTHON] Implement TwsTester in PySpark#53758
[SPARK-54805][SS][PYTHON] Implement TwsTester in PySpark#53758fedimser wants to merge 11 commits intoapache:masterfrom
Conversation
JIRA Issue Information=== New Feature SPARK-54805 === This comment was automatically generated by GitHub Actions |
HeartSaVioR
left a comment
There was a problem hiding this comment.
Looks good in overall - not much thing to comment since the code change (in both code and test) mostly follows the Scala impl. I just commented about missing test cases and arguably redundant test cases probably being added by LLM.
HeartSaVioR
left a comment
There was a problem hiding this comment.
+1 pending CI
@fedimser I see the CI failure - looks like it's more about infra? Could you please try rebasing with master again?
|
@HeartSaVioR , the error is failure to import pandas. It's not allowed to import pandas globally in PySpark code. So instead I will:
|
6a43bf8 to
4b799b5
Compare
|
Awesome, thanks for doing that. I roughly remember we tried to make pandas be optional in PySpark code, so if we can do that as well, we should do that. |
HeartSaVioR
left a comment
There was a problem hiding this comment.
+1 again, with pending CI
4b799b5 to
1c6bb4b
Compare
|
Tests that were failing now pass. There is only pyspark-connect test failing, timing out (twice). I think it is unrelated to this change, but let's try to get the green check. I rebased again to re-trigger all tests. |
|
@HeartSaVioR , in previous CI run all jobs passed except pyspark-connect. And in latest run (after rebase), pyspark-connect passed but other tests failed. Is this sufficient signal that this PR doesn't break any tests? |
|
Probably yes if you don't change the code but to retrigger the CI. |
|
https://github.com/fedimser/spark/actions/runs/21688225187/job/62553376983 It seems like the failure of modules are different which backs up the claim. Since we gave two times of trials, I'd love to avoid asking to trigger more. |
|
Thanks! Merging to master. |
### What changes were proposed in this pull request? * Added TwsTester, a test helper for writing unit tests for StatefulProcessor implementations that will be used in TrwansformWithState operator in streaming queries. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query. * Supported functionality: * Processing input rows and producing output rows via `test()`. * Initial state setup via constructor parameter. * Direct state manipulation via `setValueState`, `setListState`, `setMapState`. * Direct state inspection via `peekValueState`, `peekListState`, `peekMapState`. * Timers (both ProcessingTime and EventTime modes). * This is Python version of Scala TwsTester in apache#53159. ### Why are the changes needed? Some users requested unit testing functionality for TWS. ### Does this PR introduce _any_ user-facing change? Yes, it adds new public API to PySpark: * pyspark.sql.streaming.TwsTester ### How was this patch tested? Added unit and end-to-end tests in this PR. End-to-end tests compare TwsTester output with results of a real streaming query. ### Was this patch authored or co-authored using generative AI tooling? Yes. Cursor with claude-4.5-opus-high was used to assist with coding and generate some of documentation and tests. Generated-by: claude-4.5-opus-high Closes apache#53758 from fedimser/tws-tester-python-2. Lead-authored-by: Dmytro Fedoriaka <dmytro.fedoriaka@databricks.com> Co-authored-by: Dmytro Fedoriaka <fedimser@users.noreply.github.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
What changes were proposed in this pull request?
test().setValueState,setListState,setMapState.peekValueState,peekListState,peekMapState.Why are the changes needed?
Some users requested unit testing functionality for TWS.
Does this PR introduce any user-facing change?
Yes, it adds new public API to PySpark:
How was this patch tested?
Added unit and end-to-end tests in this PR. End-to-end tests compare TwsTester output with results of a real streaming query.
Was this patch authored or co-authored using generative AI tooling?
Yes. Cursor with claude-4.5-opus-high was used to assist with coding and generate some of documentation and tests.
Generated-by: claude-4.5-opus-high