
[SPARK-55726][PYTHON][TEST] Add ASV microbenchmark for grouped map pandas UDF #54533

Open
Yicong-Huang wants to merge 4 commits into apache:master from Yicong-Huang:SPARK-55726/benchmark/grouped-map-pandas-udf

Conversation

Contributor

@Yicong-Huang Yicong-Huang commented Feb 27, 2026

What changes were proposed in this pull request?

Add an ASV microbenchmark for SQL_GROUPED_MAP_PANDAS_UDF in python/benchmarks/bench_eval_type.py.

The benchmark simulates the full worker.py pipeline by constructing the complete binary protocol that main(infile, outfile) expects.

Large groups (100k rows/group) are split into Arrow sub-batches of 10k rows via spark.sql.execution.arrow.maxRecordsPerBatch (default), passed through RunnerConf, mirroring the JVM-side behaviour.
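For context, here is a minimal sketch of the user-level shape of a grouped map pandas UDF, the kind of workload whose worker-side serialization path this benchmark exercises. The column names `id`/`v` and the demeaning transform are illustrative assumptions, not the benchmark's actual UDFs, and the per-group application is mimicked with plain pandas rather than a running Spark session:

```python
# Illustrative sketch only: the real benchmark drives worker.main(infile, outfile)
# directly with the binary protocol; this shows what a grouped map pandas UDF
# looks like at the user level. Column names and the transform are made up.
import pandas as pd


def demean(pdf: pd.DataFrame) -> pd.DataFrame:
    # Per-group transform; the worker applies this once per group.
    out = pdf.copy()
    out["v"] = out["v"] - out["v"].mean()
    return out


# In PySpark this would run as:
#   df.groupBy("id").applyInPandas(demean, schema="id long, v double")
# Here we mimic the per-group application with plain pandas:
df = pd.DataFrame({"id": [1, 1, 2], "v": [1.0, 3.0, 5.0]})
result = pd.concat(demean(g) for _, g in df.groupby("id"))
```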

Why are the changes needed?

This is part of SPARK-55724 to add per-eval-type microbenchmarks for PySpark UDF worker pipelines. These benchmarks help catch performance regressions in the Python-side serialization/deserialization path (e.g., SPARK-55459 fixed a 3x regression in applyInPandas).

Does this PR introduce any user-facing change?

No.

How was this patch tested?

asv run --python=same -a repeat='(5,10,10.0)' on Apple M4 Max:

GroupedMapPandasUDFBench:
  time_small_groups_few_cols     (1k rows/group,    5 cols, 1500 groups):  884±3ms
  peakmem_small_groups_few_cols                                         :  1.96 G
  time_small_groups_many_cols    (1k rows/group,   50 cols,  200 groups):  741±5ms
  peakmem_small_groups_many_cols                                        :  1.99 G
  time_large_groups_few_cols     (100k rows/group,  5 cols,  350 groups):  786±60ms
  peakmem_large_groups_few_cols                                         :  3.19 G
  time_large_groups_many_cols    (100k rows/group, 50 cols,   40 groups):  681±50ms
  peakmem_large_groups_many_cols                                        :  3.74 G
  time_mixed_types               (mixed cols, 1-arg UDF,   1300 groups) :  884±3ms
  peakmem_mixed_types                                                   :  1.90 G
  time_mixed_types_two_args      (mixed cols, 2-arg UDF,   1600 groups) :  822±4ms
  peakmem_mixed_types_two_args                                          :  1.91 G

Was this patch authored or co-authored using generative AI tooling?

No.

@Yicong-Huang Yicong-Huang marked this pull request as draft February 27, 2026 05:21
@Yicong-Huang Yicong-Huang force-pushed the SPARK-55726/benchmark/grouped-map-pandas-udf branch from c08abf5 to 6c7997f Compare February 27, 2026 07:53
import os
import sys

sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))
Contributor

what is this line for?

Contributor Author

@Yicong-Huang Feb 27, 2026

The asv framework cannot find pyspark on its own; this line adds the pyspark sources to sys.path via the relative path from the python/benchmarks folder. I am not sure if there is a better way to do this. The existing ASV benchmarks don't import PySpark; they only exercise pyarrow and pandas (both installed in the virtualenv), so they haven't run into this issue.

Contributor

can we change the path in bench_eval_type.py?

Contributor Author

@Yicong-Huang Feb 27, 2026

I think we will add more benchmarks against pyspark code in the future, so leaving it in __init__ lets it be shared?

Contributor Author

Or, we could move things under python/benchmarks/pyspark/ so that other pyarrow/pandas benchmarks won't be affected.

Contributor Author

@fangchenli any suggested way to config it?

Contributor

@fangchenli any suggested way to config it?

If --python=same is used and pyspark is installed in the current env, you don't need to modify the path here. Another way is to remove --python=same; asv would then create a new venv and install pyspark there.

Contributor Author

I think the problem is that I want asv to benchmark the current source code instead of the pyspark installed from pip.

Contributor

@fangchenli Feb 28, 2026

I mean installing the source code in the current env.

First, install the current source code in editable mode via pip install -e python/packaging/classic.
If you then do pip list or conda list, you should see something like
pyspark 4.2.0.dev0 ...../spark/python/packaging/classic

Then, when you set --python=same, pyspark is discoverable to asv.

Without --python=same, asv would create a new venv, build a pyspark wheel from the current source code, then install the wheel in the new venv and run the benchmarks. The build and install commands are here:

"build_command": [
    "python -m pip wheel --no-deps -w {build_cache_dir} {conf_dir}/../packaging/classic"
],
"install_command": [
    "python -m pip install --find-links {build_cache_dir} pyspark"
],

Contributor Author

Thanks! I've tested both the editable mode and running without --python=same; both methods work. I've removed the hack in __init__.py.

@Yicong-Huang Yicong-Huang marked this pull request as ready for review February 27, 2026 08:02
eval_type = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF

# ---- varying group size (float data, identity UDF) ----
for name, (rows_per_group, n_cols, num_groups) in {
Contributor

does it also split big groups? (following spark.sql.execution.arrow.maxRecordsPerBatch, which defaults to 10000)

Contributor Author

ah, good point. Since I am simulating the input, the simulation currently does not respect these configs. I can update it to split so that we can benchmark the concatenation logic inside the eval type as well. Are there any other configs I should consider in such a simulation?

Contributor Author

@Yicong-Huang Feb 27, 2026

Simulated the split with the default of 10000; benchmark results updated. Large groups take a bit longer now.

Contributor

I am afraid spark.sql.execution.arrow.maxRecordsPerBatch doesn't take effect on the Python side; the batching should happen on the JVM side.

def setup(self):
eval_type = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
runner_conf = {
"spark.sql.execution.arrow.maxRecordsPerBatch": self._MAX_RECORDS_PER_BATCH
Contributor

I suspect that on the Python side, SQL_GROUPED_MAP_PANDAS_UDF doesn't have to respect maxRecordsPerBatch, or that maxRecordsPerBatch doesn't take effect, because the batching actually happens on the JVM side.

Contributor Author

Understood. I see this only affects the TransformWithState-related serializer. I will remove it from runner_conf but still simulate the split.

Contributor

TBH, I am still not 100% sure whether we need to simulate the split

Contributor Author

I think simulating it is needed. During my refactoring, a big performance difference came from concatenating batches or iterating over them, so adding the split makes the benchmark more realistic.

@Yicong-Huang Yicong-Huang force-pushed the SPARK-55726/benchmark/grouped-map-pandas-udf branch from 9677aae to 60054db Compare February 28, 2026 07:07