Skip to content

[SPARK-54805][SS][PYTHON] Implement TwsTester in PySpark#53758

Closed
fedimser wants to merge 11 commits intoapache:masterfrom
fedimser:tws-tester-python-2
Closed

[SPARK-54805][SS][PYTHON] Implement TwsTester in PySpark#53758
fedimser wants to merge 11 commits intoapache:masterfrom
fedimser:tws-tester-python-2

Conversation

@fedimser
Copy link
Copy Markdown
Contributor

@fedimser fedimser commented Jan 10, 2026

What changes were proposed in this pull request?

  • Added TwsTester, a test helper for writing unit tests for StatefulProcessor implementations that will be used in TrwansformWithState operator in streaming queries. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.
  • Supported functionality:
    • Processing input rows and producing output rows via test().
    • Initial state setup via constructor parameter.
    • Direct state manipulation via setValueState, setListState, setMapState.
    • Direct state inspection via peekValueState, peekListState, peekMapState.
    • Timers (both ProcessingTime and EventTime modes).
  • This is Python version of Scala TwsTester in [SPARK-54122][SS] Implement TwsTester in Scala #53159.

Why are the changes needed?

Some users requested unit testing functionality for TWS.

Does this PR introduce any user-facing change?

Yes, it adds new public API to PySpark:

  • pyspark.sql.streaming.TwsTester

How was this patch tested?

Added unit and end-to-end tests in this PR. End-to-end tests compare TwsTester output with results of a real streaming query.

Was this patch authored or co-authored using generative AI tooling?

Yes. Cursor with claude-4.5-opus-high was used to assist with coding and generate some of documentation and tests.
Generated-by: claude-4.5-opus-high

@github-actions
Copy link
Copy Markdown

JIRA Issue Information

=== New Feature SPARK-54805 ===
Summary: Improve the testing experience for TransformWithState in PySpark
Assignee: None
Status: Open
Affected: ["4.2.0"]


This comment was automatically generated by GitHub Actions

@fedimser fedimser marked this pull request as ready for review January 10, 2026 18:19
@HeartSaVioR HeartSaVioR changed the title [SPARK-54805] Implement TwsTester in PySpark [SPARK-54805][SS][PYTHON] Implement TwsTester in PySpark Jan 30, 2026
Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good in overall - not much thing to comment since the code change (in both code and test) mostly follows the Scala impl. I just commented about missing test cases and arguably redundant test cases probably being added by LLM.

Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py Outdated
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py Outdated
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py Outdated
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py Outdated
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py
Comment thread python/pyspark/sql/tests/pandas/streaming/test_tws_tester.py
@fedimser fedimser requested a review from HeartSaVioR February 2, 2026 21:53
Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pending CI

@fedimser I see the CI failure - looks like it's more about infra? Could you please try rebasing with master again?

@fedimser
Copy link
Copy Markdown
Contributor Author

fedimser commented Feb 4, 2026

@HeartSaVioR , the error is failure to import pandas.

It's not allowed to import pandas globally in PySpark code. So instead I will:

  1. For type annotations - use pyspark.sql.pandas._typing.DataFrameLike (exactly how it's done in stateful_processor.py).
  2. There is one place where we need pandas - where we create Dataframe from a list of DataFrames. I moved it to a separate function _asPandasDataFrame and imported pandas locally.

@fedimser fedimser force-pushed the tws-tester-python-2 branch from 6a43bf8 to 4b799b5 Compare February 4, 2026 20:02
@HeartSaVioR
Copy link
Copy Markdown
Contributor

Awesome, thanks for doing that. I roughly remember we tried to make pandas be optional in PySpark code, so if we can do that as well, we should do that.

Copy link
Copy Markdown
Contributor

@HeartSaVioR HeartSaVioR left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 again, with pending CI

@fedimser fedimser force-pushed the tws-tester-python-2 branch from 4b799b5 to 1c6bb4b Compare February 4, 2026 21:01
@fedimser
Copy link
Copy Markdown
Contributor Author

fedimser commented Feb 4, 2026

Tests that were failing now pass. There is only pyspark-connect test failing, timing out (twice). I think it is unrelated to this change, but let's try to get the green check. I rebased again to re-trigger all tests.

@fedimser
Copy link
Copy Markdown
Contributor Author

fedimser commented Feb 5, 2026

@HeartSaVioR , in previous CI run all jobs passed except pyspark-connect. And in latest run (after rebase), pyspark-connect passed but other tests failed. Is this sufficient signal that this PR doesn't break any tests?

@HeartSaVioR
Copy link
Copy Markdown
Contributor

Probably yes if you don't change the code but to retrigger the CI.

@HeartSaVioR
Copy link
Copy Markdown
Contributor

https://github.com/fedimser/spark/actions/runs/21688225187/job/62553376983
https://github.com/fedimser/spark/actions/runs/21692971251/job/62556992226

It seems like the failure of modules are different which backs up the claim. Since we gave two times of trials, I'd love to avoid asking to trigger more.

@HeartSaVioR
Copy link
Copy Markdown
Contributor

Thanks! Merging to master.

rpnkv pushed a commit to rpnkv/spark that referenced this pull request Feb 18, 2026
### What changes were proposed in this pull request?

* Added TwsTester, a test helper for writing unit tests for StatefulProcessor implementations that will be used in TrwansformWithState operator in streaming queries. It processes input rows and returns output rows equivalent to those that would be produced by the processor in an actual Spark streaming query.
* Supported functionality:
   * Processing input rows and producing output rows via `test()`.
   * Initial state setup via constructor parameter.
   * Direct state manipulation via `setValueState`, `setListState`, `setMapState`.
   * Direct state inspection via `peekValueState`, `peekListState`, `peekMapState`.
   * Timers (both ProcessingTime and EventTime modes).
* This is Python version of Scala TwsTester in apache#53159.

### Why are the changes needed?

Some users requested unit testing functionality for TWS.

### Does this PR introduce _any_ user-facing change?

Yes, it adds new public API to PySpark:
* pyspark.sql.streaming.TwsTester

### How was this patch tested?

Added unit and end-to-end tests in this PR. End-to-end tests compare TwsTester output with results of a real streaming query.

### Was this patch authored or co-authored using generative AI tooling?

Yes. Cursor with claude-4.5-opus-high was used to assist with coding and generate some of documentation and tests.
Generated-by: claude-4.5-opus-high

Closes apache#53758 from fedimser/tws-tester-python-2.

Lead-authored-by: Dmytro Fedoriaka <dmytro.fedoriaka@databricks.com>
Co-authored-by: Dmytro Fedoriaka <fedimser@users.noreply.github.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants