diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index d722444379acd..5ec00bb204ebe 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -203,32 +203,36 @@ jobs: run: | # Convert to lowercase to meet Docker repo name requirement REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]') + REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]') IMG_NAME="apache-spark-ci-image:${{ inputs.branch }}-${{ github.run_id }}" - IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME" + IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME" echo "image_url=$IMG_URL" >> $GITHUB_OUTPUT - name: Generate infra image URL (Documentation) id: infra-image-docs-outputs run: | # Convert to lowercase to meet Docker repo name requirement REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]') + REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]') IMG_NAME="apache-spark-ci-image-docs:${{ inputs.branch }}-${{ github.run_id }}" - IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME" + IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME" echo "image_docs_url=$IMG_URL" >> $GITHUB_OUTPUT - name: Generate infra image URL (Linter) id: infra-image-lint-outputs run: | # Convert to lowercase to meet Docker repo name requirement REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]') + REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]') IMG_NAME="apache-spark-ci-image-lint:${{ inputs.branch }}-${{ github.run_id }}" - IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME" + IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME" echo "image_lint_url=$IMG_URL" >> $GITHUB_OUTPUT - name: Generate infra image URL (SparkR) id: infra-image-sparkr-outputs run: | # Convert to lowercase to meet Docker repo name requirement REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]') + REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]') IMG_NAME="apache-spark-ci-image-sparkr:${{ inputs.branch }}-${{ github.run_id }}" - IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME" + IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME" echo "image_sparkr_url=$IMG_URL" >> $GITHUB_OUTPUT - name: Generate infra image URL (PySpark ${{ env.PYSPARK_IMAGE_TO_TEST }}) id: infra-image-pyspark-outputs @@ -237,8 +241,9 @@ jobs: run: | # Convert to lowercase to meet Docker repo name requirement REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]') + REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]') IMG_NAME="apache-spark-ci-image-pyspark-${{ env.PYSPARK_IMAGE_TO_TEST }}:${{ inputs.branch }}-${{ github.run_id }}" - IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME" + IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME" echo "image_pyspark_url=$IMG_URL" >> $GITHUB_OUTPUT - name: Link the docker images id: infra-image-link diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py index 812f66b882956..ea7feb3464899 100644 --- a/python/pyspark/pandas/frame.py +++ b/python/pyspark/pandas/frame.py @@ -19,7 +19,7 @@ A wrapper class for Spark DataFrame to behave like pandas DataFrame. 
""" -from collections import defaultdict, namedtuple +from collections import defaultdict, deque, namedtuple from collections.abc import Mapping import re import warnings @@ -35,6 +35,7 @@ Callable, ClassVar, Dict, + Deque, Generic, IO, Iterable, @@ -77,6 +78,7 @@ from pyspark.sql import Column as PySparkColumn, DataFrame as PySparkDataFrame, functions as F from pyspark.sql.functions import pandas_udf from pyspark.sql.internal import InternalFunction as SF +from pyspark.sql.utils import is_remote from pyspark.sql.types import ( ArrayType, BooleanType, @@ -4848,10 +4850,11 @@ def diff(self, periods: int = 1, axis: Axis = 0) -> "DataFrame": Calculates the difference of a DataFrame element compared with another element in the DataFrame (default is the element in the same column of the previous row). - .. note:: the current implementation of diff uses Spark's Window without - specifying partition specification. This leads to moving all data into - a single partition in a single machine and could cause serious - performance degradation. Avoid this method with very large datasets. + .. versionchanged:: 4.1.0 + The implementation was optimized to use partition-local computation + instead of an unpartitioned Spark Window. This avoids + moving all data into a single partition and enables scalable processing + of large datasets. Parameters ---------- @@ -4913,7 +4916,126 @@ def diff(self, periods: int = 1, axis: Axis = 0) -> "DataFrame": if axis != 0: raise NotImplementedError('axis should be either 0 or "index" currently.') - return self._apply_series_op(lambda psser: psser._diff(periods), should_resolve=True) + if not isinstance(periods, int): + raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__) + + if is_remote(): + return self._apply_series_op(lambda psser: psser._diff(periods), should_resolve=True) + + internal = self._internal.resolved_copy + sdf = internal.spark_frame + + index_column_names = internal.index_spark_column_names + data_column_names = internal.data_spark_column_names + selected_column_names = index_column_names + data_column_names + [NATURAL_ORDER_COLUMN_NAME] + abs_periods = abs(periods) + + output_schema = StructType( + [ + *[ + StructField(field.name, field.dataType, field.nullable, field.metadata) + for field in sdf.select(*index_column_names).schema.fields + ], + *[ + StructField(field.name, DoubleType(), True, field.metadata) + for field in sdf.select(*data_column_names).schema.fields + ], + sdf.schema[NATURAL_ORDER_COLUMN_NAME], + ] + ) + + sdf_ordered = ( + sdf.select(*selected_column_names) + .repartitionByRange(F.col(NATURAL_ORDER_COLUMN_NAME)) + .sortWithinPartitions(NATURAL_ORDER_COLUMN_NAME) + ) + + def to_pdf(rows: List[Dict[str, Any]]) -> pd.DataFrame: + return pd.DataFrame(rows, columns=selected_column_names) + + def summarize_partition( + pid: int, iterator: Iterator[Row] + ) -> Iterator[Tuple[int, List[Dict[str, Any]], List[Dict[str, Any]]]]: + if abs_periods == 0: + yield pid, [], [] + else: + head: List[Dict[str, Any]] = [] + tail: Deque[Dict[str, Any]] = deque(maxlen=abs_periods) + for row in iterator: + row_dict = row.asDict(recursive=False) + if len(head) < abs_periods: + head.append(row_dict) + tail.append(row_dict) + yield pid, head, list(tail) + + summaries = { + pid: (head, tail) + for pid, head, tail in sdf_ordered.rdd.mapPartitionsWithIndex( + summarize_partition + ).collect() + } + ordered_pids = sorted(summaries) + + def take_previous(pid: int) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for 
previous_pid in reversed( + [candidate for candidate in ordered_pids if candidate < pid] + ): + rows = summaries[previous_pid][1] + rows + if len(rows) >= abs_periods: + return rows[-abs_periods:] + return rows + + def take_next(pid: int) -> List[Dict[str, Any]]: + rows: List[Dict[str, Any]] = [] + for next_pid in [candidate for candidate in ordered_pids if candidate > pid]: + rows.extend(summaries[next_pid][0]) + if len(rows) >= abs_periods: + return rows[:abs_periods] + return rows + + contexts = { + pid: (take_previous(pid) if periods > 0 else take_next(pid)) for pid in ordered_pids + } + contexts_bc = default_session().sparkContext.broadcast(contexts) + + def diff_partition(pid: int, iterator: Iterator[Row]) -> Iterator[Row]: + rows = [row.asDict(recursive=False) for row in iterator] + if len(rows) == 0: + return + + context = contexts_bc.value.get(pid, []) + if periods > 0: + pdf = to_pdf(context + rows) + diffed = pdf[data_column_names].diff(periods=periods).iloc[len(context) :] + else: + pdf = to_pdf(rows + context) + diffed = pdf[data_column_names].diff(periods=periods).iloc[: len(rows)] + diffed = diffed.astype(object).where(pd.notna(diffed), None) + + current = to_pdf(rows) + result = pd.concat( + [ + current[index_column_names].reset_index(drop=True), + diffed.reset_index(drop=True), + current[[NATURAL_ORDER_COLUMN_NAME]].reset_index(drop=True), + ], + axis=1, + ) + for record in result.to_dict("records"): + yield Row(**record) + + result_sdf = default_session().createDataFrame( + sdf_ordered.rdd.mapPartitionsWithIndex(diff_partition), schema=output_schema + ) + + new_internal = internal.copy( + spark_frame=result_sdf, + index_spark_columns=[scol_for(result_sdf, c) for c in index_column_names], + data_spark_columns=[scol_for(result_sdf, c) for c in data_column_names], + data_fields=None, + ) + return DataFrame(new_internal) def nunique( self, diff --git a/python/pyspark/pandas/series.py b/python/pyspark/pandas/series.py index 6496137824de2..fd2c2b9aa3cbe 100644 --- a/python/pyspark/pandas/series.py +++ b/python/pyspark/pandas/series.py @@ -4376,10 +4376,9 @@ def diff(self, periods: int = 1) -> "Series": Calculates the difference of a Series element compared with another element in the DataFrame (default is the element in the same column of the previous row). - .. note:: the current implementation of diff uses Spark's Window without - specifying partition specification. This leads to moving all data into - a single partition in a single machine and could cause serious - performance degradation. Avoid this method with very large datasets. + .. versionchanged:: 4.1.0 + The implementation was optimized to use partition-local computation + instead of an unpartitioned Spark Window. 
Parameters ---------- @@ -4435,18 +4434,21 @@ def diff(self, periods: int = 1) -> "Series": 5 NaN Name: c, dtype: float64 """ - return self._diff(periods).spark.analyzed + return first_series(self.to_frame().diff(periods=periods)).rename(self.name) def _diff(self, periods: int, *, part_cols: Sequence["ColumnOrName"] = ()) -> "Series": if not isinstance(periods, int): raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__) - window = ( - Window.partitionBy(*part_cols) - .orderBy(NATURAL_ORDER_COLUMN_NAME) - .rowsBetween(-periods, -periods) + window = Window.partitionBy(*part_cols).orderBy(NATURAL_ORDER_COLUMN_NAME) + scol = (self.spark.column - F.lag(self.spark.column, periods).over(window)).cast( + DoubleType() + ) + return self._with_new_scol( + scol, + field=self._internal.data_fields[0].copy( + dtype=np.dtype("float64"), spark_type=DoubleType(), nullable=True + ), ) - scol = self.spark.column - F.lag(self.spark.column, periods).over(window) - return self._with_new_scol(scol, field=self._internal.data_fields[0].copy(nullable=True)) def idxmax(self, skipna: bool = True) -> Union[Tuple, Any]: """ diff --git a/python/pyspark/pandas/tests/computation/test_compute.py b/python/pyspark/pandas/tests/computation/test_compute.py index da217e6f51e50..4c0cee677b29e 100644 --- a/python/pyspark/pandas/tests/computation/test_compute.py +++ b/python/pyspark/pandas/tests/computation/test_compute.py @@ -19,6 +19,7 @@ import pandas as pd from pyspark.sql import functions as sf +from pyspark.sql.utils import is_remote from pyspark import pandas as ps from pyspark.testing.pandasutils import PandasOnSparkTestCase from pyspark.testing.sqlutils import SQLTestUtils @@ -194,7 +195,53 @@ def test_diff(self): self.assert_eq(pdf.diff(), psdf.diff()) self.assert_eq(pdf.diff().diff(-1), psdf.diff().diff(-1)) - self.assert_eq(pdf.diff().sum().astype(int), psdf.diff().sum()) + self.assert_eq(pdf.diff().sum(), psdf.diff().sum()) + if not is_remote(): + self.assertNotIn( + "Window", + psdf.diff()._internal.spark_frame._jdf.queryExecution().analyzed().toString(), + ) + + # Test multi-index + pdf_multi = pdf.copy() + pdf_multi.index = pd.MultiIndex.from_tuples( + [("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1), ("C", 2)] + ) + psdf_multi = ps.from_pandas(pdf_multi) + self.assert_eq(pdf_multi.diff(), psdf_multi.diff()) + + # Test empty + pdf_empty = pd.DataFrame({"a": []}, dtype=int) + psdf_empty = ps.from_pandas(pdf_empty) + self.assert_eq(pdf_empty.diff(), psdf_empty.diff()) + + # Test single partition + self.assert_eq(pdf.diff(), psdf.spark.repartition(1).diff()) + + # Test with nulls + pdf_nulls = pd.DataFrame({"a": [1, None, 3, 4, None, 6]}, index=np.random.rand(6)) + psdf_nulls = ps.from_pandas(pdf_nulls) + self.assert_eq(pdf_nulls.diff(), psdf_nulls.diff()) + + # Test negative and large periods + self.assert_eq(pdf.diff(periods=0), psdf.diff(periods=0)) + self.assert_eq(pdf.diff(periods=-2), psdf.diff(periods=-2)) + self.assert_eq(pdf.diff(periods=10), psdf.diff(periods=10)) + + # Test cross-partition boundary rows after physical repartitioning. 
+ pdf_boundary = pd.DataFrame( + {"a": list(range(20)), "b": [value * value for value in range(20)]}, + index=np.random.rand(20), + ) + psdf_boundary = ps.from_pandas(pdf_boundary).spark.repartition(4) + expected_boundary = psdf_boundary.to_pandas() + self.assert_eq(expected_boundary.diff(periods=4), psdf_boundary.diff(periods=4)) + self.assert_eq(expected_boundary.diff(periods=-4), psdf_boundary.diff(periods=-4)) + + # Test Series.diff + self.assert_eq(pdf["a"].diff(), psdf["a"].diff()) + self.assert_eq(pdf["a"].diff(-1), psdf["a"].diff(-1)) + self.assert_eq(pdf["a"].diff(3), psdf["a"].diff(3)) msg = "should be an int" with self.assertRaisesRegex(TypeError, msg): diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_basic_slow.py b/python/pyspark/pandas/tests/diff_frames_ops/test_basic_slow.py index 0e545f0d182d1..b23c4b120b707 100644 --- a/python/pyspark/pandas/tests/diff_frames_ops/test_basic_slow.py +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_basic_slow.py @@ -156,11 +156,9 @@ def test_diff(self): ) psdf = ps.from_pandas(pdf) + self.assert_eq(pdf.diff().loc[pdf["Col1"] == 20], psdf.diff().loc[psdf["Col1"] == 20]) self.assert_eq( - pdf.diff().loc[pdf["Col1"] == 20].astype(int), psdf.diff().loc[psdf["Col1"] == 20] - ) - self.assert_eq( - pdf["Col2"].diff().loc[pdf["Col1"] == 20].astype(int), + pdf["Col2"].diff().loc[pdf["Col1"] == 20], psdf["Col2"].diff().loc[psdf["Col1"] == 20], ) diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py index 18084d8176ffc..a8fa59383f396 100644 --- a/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py @@ -57,7 +57,7 @@ def test_diff(self): pdf.groupby(pkey)[["a"]].diff().sort_index(), ) - self.assert_eq(psdf.groupby(kkey).diff().sum(), pdf.groupby(pkey).diff().sum().astype(int)) + self.assert_eq(psdf.groupby(kkey).diff().sum(), pdf.groupby(pkey).diff().sum()) self.assert_eq(psdf.groupby(kkey)["a"].diff().sum(), pdf.groupby(pkey)["a"].diff().sum())
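
Reviewer notes (illustrative sketches, not part of the patch):

The workflow hunks only add a repository segment to the generated GHCR image URLs, moving them from ghcr.io/<owner>/<image> to ghcr.io/<owner>/<repo>/<image>, which scopes the CI images under the repository name. A tiny sketch of the before/after URL shape; the owner, repo, and tag values below are made up, not taken from the workflow:

# Illustrative only: before/after shape of the generated image URLs.
owner = "Apache".lower()   # the workflow lowercases via `tr '[:upper:]' '[:lower:]'`
repo = "Spark".lower()
image = "apache-spark-ci-image:branch-4.1-1234567890"
print(f"ghcr.io/{owner}/{image}")         # before: ghcr.io/apache/apache-spark-ci-image:...
print(f"ghcr.io/{owner}/{repo}/{image}")  # after:  ghcr.io/apache/spark/apache-spark-ci-image:...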
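
The new DataFrame.diff implementation replaces the unpartitioned Window with three steps: sort rows by NATURAL_ORDER_COLUMN_NAME within range partitions, collect each partition's leading/trailing abs(periods) rows with mapPartitionsWithIndex, then broadcast those per-partition summaries so each partition can prepend (or append) its neighbors' boundary rows and run pandas diff locally. A minimal standalone sketch of the same idea on a plain RDD of floats, assuming a local SparkSession and positive periods; the helper names (summarize, context_for, diff_partition) are illustrative, not the patch's API:

# Minimal sketch: per-partition tail summaries, driver-side stitching,
# broadcast contexts, then partition-local diffs (periods > 0 only).
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[4]").getOrCreate()
sc = spark.sparkContext

periods = 2
data = sc.parallelize([float(x * x) for x in range(20)], numSlices=4)

def summarize(pid, rows):
    # Keep only the last `periods` values of this partition.
    tail = []
    for value in rows:
        tail.append(value)
        if len(tail) > periods:
            tail.pop(0)
    yield pid, tail

summaries = dict(data.mapPartitionsWithIndex(summarize).collect())

def context_for(pid):
    # Walk earlier partitions right-to-left until `periods` rows are gathered.
    rows = []
    for prev in sorted((p for p in summaries if p < pid), reverse=True):
        rows = summaries[prev] + rows
        if len(rows) >= periods:
            return rows[-periods:]
    return rows

contexts = sc.broadcast({pid: context_for(pid) for pid in summaries})

def diff_partition(pid, rows):
    # Prepend the boundary context, then diff locally; rows with no
    # predecessor yield None, matching pandas' NaN.
    buf = contexts.value[pid] + list(rows)
    for i in range(len(contexts.value[pid]), len(buf)):
        yield (buf[i] - buf[i - periods]) if i - periods >= 0 else None

print(data.mapPartitionsWithIndex(diff_partition).collect())
# [None, None, 4.0, 8.0, 12.0, ...] since x*x - (x-2)*(x-2) == 4x - 4

As in the patch, no single-partition Window is ever involved: only the small head/tail summaries travel through the driver, while the full rows stay in their partitions.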
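
Series._diff now applies F.lag over a plain ordered window and casts the result to double, relying on lag's behavior with a negative offset (it acts as a lead) to cover negative periods, as the previous implementation already did. A quick hedged check, assuming a local SparkSession; the toy column below is illustrative, not from the patch:

# Illustrative check that F.lag with the raw `periods` offset matches
# pandas diff semantics for either sign of `periods`.
from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
sdf = spark.createDataFrame([(float(i),) for i in range(5)], "v double")
w = Window.orderBy("v")  # fine for a 5-row demo; the real code orders by NATURAL_ORDER_COLUMN_NAME
sdf.select(
    "v",
    (F.col("v") - F.lag("v", 1).over(w)).alias("diff_1"),    # pandas diff(periods=1)
    (F.col("v") - F.lag("v", -2).over(w)).alias("diff_m2"),  # pandas diff(periods=-2)
).show()
# diff_1:  [null, 1.0, 1.0, 1.0, 1.0]
# diff_m2: [-2.0, -2.0, -2.0, null, null]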