15 changes: 10 additions & 5 deletions .github/workflows/build_and_test.yml
@@ -203,32 +203,36 @@ jobs:
run: |
# Convert to lowercase to meet Docker repo name requirement
REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]')
IMG_NAME="apache-spark-ci-image:${{ inputs.branch }}-${{ github.run_id }}"
IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME"
IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME"
echo "image_url=$IMG_URL" >> $GITHUB_OUTPUT
- name: Generate infra image URL (Documentation)
id: infra-image-docs-outputs
run: |
# Convert to lowercase to meet Docker repo name requirement
REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]')
IMG_NAME="apache-spark-ci-image-docs:${{ inputs.branch }}-${{ github.run_id }}"
IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME"
IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME"
echo "image_docs_url=$IMG_URL" >> $GITHUB_OUTPUT
- name: Generate infra image URL (Linter)
id: infra-image-lint-outputs
run: |
# Convert to lowercase to meet Docker repo name requirement
REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]')
IMG_NAME="apache-spark-ci-image-lint:${{ inputs.branch }}-${{ github.run_id }}"
IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME"
IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME"
echo "image_lint_url=$IMG_URL" >> $GITHUB_OUTPUT
- name: Generate infra image URL (SparkR)
id: infra-image-sparkr-outputs
run: |
# Convert to lowercase to meet Docker repo name requirement
REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]')
IMG_NAME="apache-spark-ci-image-sparkr:${{ inputs.branch }}-${{ github.run_id }}"
IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME"
IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME"
echo "image_sparkr_url=$IMG_URL" >> $GITHUB_OUTPUT
- name: Generate infra image URL (PySpark ${{ env.PYSPARK_IMAGE_TO_TEST }})
id: infra-image-pyspark-outputs
@@ -237,8 +241,9 @@ jobs:
run: |
# Convert to lowercase to meet Docker repo name requirement
REPO_OWNER=$(echo "${{ github.repository_owner }}" | tr '[:upper:]' '[:lower:]')
REPO_NAME=$(echo "${{ github.event.repository.name }}" | tr '[:upper:]' '[:lower:]')
IMG_NAME="apache-spark-ci-image-pyspark-${{ env.PYSPARK_IMAGE_TO_TEST }}:${{ inputs.branch }}-${{ github.run_id }}"
IMG_URL="ghcr.io/$REPO_OWNER/$IMG_NAME"
IMG_URL="ghcr.io/$REPO_OWNER/$REPO_NAME/$IMG_NAME"
echo "image_pyspark_url=$IMG_URL" >> $GITHUB_OUTPUT
- name: Link the docker images
id: infra-image-link
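Reviewer note (not part of the diff): the net effect of the hunks above is that CI images are now pushed to a GHCR path that includes the repository name, i.e. ghcr.io/<owner>/<repo>/<image> instead of ghcr.io/<owner>/<image>. A minimal Python sketch of that URL assembly follows; the owner, repo, branch, and run id values are placeholders, not taken from this PR.

# Illustrative only: mirrors the lowercasing and concatenation done in the workflow's shell steps.
owner = "Apache-Org".lower()   # github.repository_owner, lowercased to meet Docker repo name rules
repo = "Spark-Fork".lower()    # added by this PR: github.event.repository.name, also lowercased
img_name = "apache-spark-ci-image:master-1234567890"            # "<image>:<branch>-<run_id>"
old_url = "ghcr.io/" + owner + "/" + img_name                   # previous layout
new_url = "ghcr.io/" + owner + "/" + repo + "/" + img_name      # layout after this change
print(new_url)  # ghcr.io/apache-org/spark-fork/apache-spark-ci-image:master-1234567890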
134 changes: 128 additions & 6 deletions python/pyspark/pandas/frame.py
@@ -19,7 +19,7 @@
A wrapper class for Spark DataFrame to behave like pandas DataFrame.
"""

from collections import defaultdict, namedtuple
from collections import defaultdict, deque, namedtuple
from collections.abc import Mapping
import re
import warnings
@@ -35,6 +35,7 @@
Callable,
ClassVar,
Dict,
Deque,
Generic,
IO,
Iterable,
@@ -77,6 +78,7 @@
from pyspark.sql import Column as PySparkColumn, DataFrame as PySparkDataFrame, functions as F
from pyspark.sql.functions import pandas_udf
from pyspark.sql.internal import InternalFunction as SF
from pyspark.sql.utils import is_remote
from pyspark.sql.types import (
ArrayType,
BooleanType,
@@ -4848,10 +4850,11 @@ def diff(self, periods: int = 1, axis: Axis = 0) -> "DataFrame":
Calculates the difference of a DataFrame element compared with another element in the
DataFrame (default is the element in the same column of the previous row).

.. note:: the current implementation of diff uses Spark's Window without
specifying partition specification. This leads to moving all data into
a single partition in a single machine and could cause serious
performance degradation. Avoid this method with very large datasets.
.. versionchanged:: 4.1.0
The implementation was optimized to use partition-local computation
instead of an unpartitioned Spark Window. This avoids
moving all data into a single partition and enables scalable processing
of large datasets.

Parameters
----------
@@ -4913,7 +4916,126 @@ def diff(self, periods: int = 1, axis: Axis = 0) -> "DataFrame":
if axis != 0:
raise NotImplementedError('axis should be either 0 or "index" currently.')

return self._apply_series_op(lambda psser: psser._diff(periods), should_resolve=True)
if not isinstance(periods, int):
raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__)

if is_remote():
return self._apply_series_op(lambda psser: psser._diff(periods), should_resolve=True)

internal = self._internal.resolved_copy
sdf = internal.spark_frame

index_column_names = internal.index_spark_column_names
data_column_names = internal.data_spark_column_names
selected_column_names = index_column_names + data_column_names + [NATURAL_ORDER_COLUMN_NAME]
abs_periods = abs(periods)

output_schema = StructType(
[
*[
StructField(field.name, field.dataType, field.nullable, field.metadata)
for field in sdf.select(*index_column_names).schema.fields
],
*[
StructField(field.name, DoubleType(), True, field.metadata)
for field in sdf.select(*data_column_names).schema.fields
],
sdf.schema[NATURAL_ORDER_COLUMN_NAME],
]
)

sdf_ordered = (
sdf.select(*selected_column_names)
.repartitionByRange(F.col(NATURAL_ORDER_COLUMN_NAME))
.sortWithinPartitions(NATURAL_ORDER_COLUMN_NAME)
)

def to_pdf(rows: List[Dict[str, Any]]) -> pd.DataFrame:
return pd.DataFrame(rows, columns=selected_column_names)

def summarize_partition(
pid: int, iterator: Iterator[Row]
) -> Iterator[Tuple[int, List[Dict[str, Any]], List[Dict[str, Any]]]]:
if abs_periods == 0:
yield pid, [], []
else:
head: List[Dict[str, Any]] = []
tail: Deque[Dict[str, Any]] = deque(maxlen=abs_periods)
for row in iterator:
row_dict = row.asDict(recursive=False)
if len(head) < abs_periods:
head.append(row_dict)
tail.append(row_dict)
yield pid, head, list(tail)

summaries = {
pid: (head, tail)
for pid, head, tail in sdf_ordered.rdd.mapPartitionsWithIndex(
summarize_partition
).collect()
}
ordered_pids = sorted(summaries)

def take_previous(pid: int) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for previous_pid in reversed(
[candidate for candidate in ordered_pids if candidate < pid]
):
rows = summaries[previous_pid][1] + rows
if len(rows) >= abs_periods:
return rows[-abs_periods:]
return rows

def take_next(pid: int) -> List[Dict[str, Any]]:
rows: List[Dict[str, Any]] = []
for next_pid in [candidate for candidate in ordered_pids if candidate > pid]:
rows.extend(summaries[next_pid][0])
if len(rows) >= abs_periods:
return rows[:abs_periods]
return rows

contexts = {
pid: (take_previous(pid) if periods > 0 else take_next(pid)) for pid in ordered_pids
}
contexts_bc = default_session().sparkContext.broadcast(contexts)

def diff_partition(pid: int, iterator: Iterator[Row]) -> Iterator[Row]:
rows = [row.asDict(recursive=False) for row in iterator]
if len(rows) == 0:
return

context = contexts_bc.value.get(pid, [])
if periods > 0:
pdf = to_pdf(context + rows)
diffed = pdf[data_column_names].diff(periods=periods).iloc[len(context) :]
else:
pdf = to_pdf(rows + context)
diffed = pdf[data_column_names].diff(periods=periods).iloc[: len(rows)]
diffed = diffed.astype(object).where(pd.notna(diffed), None)

current = to_pdf(rows)
result = pd.concat(
[
current[index_column_names].reset_index(drop=True),
diffed.reset_index(drop=True),
current[[NATURAL_ORDER_COLUMN_NAME]].reset_index(drop=True),
],
axis=1,
)
for record in result.to_dict("records"):
yield Row(**record)

result_sdf = default_session().createDataFrame(
sdf_ordered.rdd.mapPartitionsWithIndex(diff_partition), schema=output_schema
)

new_internal = internal.copy(
spark_frame=result_sdf,
index_spark_columns=[scol_for(result_sdf, c) for c in index_column_names],
data_spark_columns=[scol_for(result_sdf, c) for c in data_column_names],
data_fields=None,
)
return DataFrame(new_internal)

def nunique(
self,
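Reviewer note (not part of the diff): the key idea in the new frame-level implementation above is that a globally correct diff only needs, for each physical partition, the trailing abs(periods) rows of the preceding partitions (or the leading rows of the following partitions when periods is negative) as extra context; those boundary rows are collected, broadcast, prepended or appended, diffed locally with pandas, and then dropped again. Below is a minimal sketch of that boundary-context idea on plain Python lists, assuming positive periods; it is illustrative and not the pyspark.pandas code itself.

from typing import List, Optional

def diff_with_context(partitions: List[List[float]], periods: int = 1) -> List[List[Optional[float]]]:
    # Each "partition" borrows the last `periods` rows seen so far as context,
    # computes the difference locally, then drops the context rows again.
    seen: List[float] = []
    out: List[List[Optional[float]]] = []
    for part in partitions:
        context = seen[-periods:]
        padded = context + part
        diffed = [
            padded[i] - padded[i - periods] if i - periods >= 0 else None
            for i in range(len(padded))
        ]
        out.append(diffed[len(context):])
        seen.extend(part)
    return out

# Matches a diff over the concatenated list: [[None, 1.0], [1.0, 1.0], [1.0]]
print(diff_with_context([[1.0, 2.0], [3.0, 4.0], [5.0]], periods=1))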
24 changes: 13 additions & 11 deletions python/pyspark/pandas/series.py
@@ -4376,10 +4376,9 @@ def diff(self, periods: int = 1) -> "Series":
Calculates the difference of a Series element compared with another element in the
DataFrame (default is the element in the same column of the previous row).

.. note:: the current implementation of diff uses Spark's Window without
specifying partition specification. This leads to moving all data into
a single partition in a single machine and could cause serious
performance degradation. Avoid this method with very large datasets.
.. versionchanged:: 4.1.0
The implementation was optimized to use partition-local computation
instead of an unpartitioned Spark Window.

Parameters
----------
@@ -4435,18 +4434,21 @@ def diff(self, periods: int = 1) -> "Series":
5 NaN
Name: c, dtype: float64
"""
return self._diff(periods).spark.analyzed
return first_series(self.to_frame().diff(periods=periods)).rename(self.name)

def _diff(self, periods: int, *, part_cols: Sequence["ColumnOrName"] = ()) -> "Series":
if not isinstance(periods, int):
raise TypeError("periods should be an int; however, got [%s]" % type(periods).__name__)
window = (
Window.partitionBy(*part_cols)
.orderBy(NATURAL_ORDER_COLUMN_NAME)
.rowsBetween(-periods, -periods)
window = Window.partitionBy(*part_cols).orderBy(NATURAL_ORDER_COLUMN_NAME)
scol = (self.spark.column - F.lag(self.spark.column, periods).over(window)).cast(
DoubleType()
)
return self._with_new_scol(
scol,
field=self._internal.data_fields[0].copy(
dtype=np.dtype("float64"), spark_type=DoubleType(), nullable=True
),
)
scol = self.spark.column - F.lag(self.spark.column, periods).over(window)
return self._with_new_scol(scol, field=self._internal.data_fields[0].copy(nullable=True))

def idxmax(self, skipna: bool = True) -> Union[Tuple, Any]:
"""
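Reviewer note (not part of the diff): the rewritten Series._diff above expresses the difference as the column minus F.lag(column, periods) over a window ordered by the natural-order column, rather than a rowsBetween frame. A minimal standalone PySpark sketch of that lag pattern follows; the local session and the toy column name "v" are assumptions for illustration, not part of the PR.

from pyspark.sql import SparkSession, Window, functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame([(0, 1.0), (1, 3.0), (2, 6.0), (3, 10.0)], "ord long, v double")

# NATURAL_ORDER_COLUMN_NAME plays the role of "ord" in the actual implementation.
w = Window.orderBy("ord")
diffed = df.select("ord", (F.col("v") - F.lag("v", 1).over(w)).alias("diff"))
diffed.show()  # diff column: null, 2.0, 3.0, 4.0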
49 changes: 48 additions & 1 deletion python/pyspark/pandas/tests/computation/test_compute.py
@@ -19,6 +19,7 @@
import pandas as pd

from pyspark.sql import functions as sf
from pyspark.sql.utils import is_remote
from pyspark import pandas as ps
from pyspark.testing.pandasutils import PandasOnSparkTestCase
from pyspark.testing.sqlutils import SQLTestUtils
@@ -194,7 +195,53 @@ def test_diff(self):

self.assert_eq(pdf.diff(), psdf.diff())
self.assert_eq(pdf.diff().diff(-1), psdf.diff().diff(-1))
self.assert_eq(pdf.diff().sum().astype(int), psdf.diff().sum())
self.assert_eq(pdf.diff().sum(), psdf.diff().sum())
if not is_remote():
self.assertNotIn(
"Window",
psdf.diff()._internal.spark_frame._jdf.queryExecution().analyzed().toString(),
)

# Test multi-index
pdf_multi = pdf.copy()
pdf_multi.index = pd.MultiIndex.from_tuples(
[("A", 1), ("A", 2), ("B", 1), ("B", 2), ("C", 1), ("C", 2)]
)
psdf_multi = ps.from_pandas(pdf_multi)
self.assert_eq(pdf_multi.diff(), psdf_multi.diff())

# Test empty
pdf_empty = pd.DataFrame({"a": []}, dtype=int)
psdf_empty = ps.from_pandas(pdf_empty)
self.assert_eq(pdf_empty.diff(), psdf_empty.diff())

# Test single partition
self.assert_eq(pdf.diff(), psdf.spark.repartition(1).diff())

# Test with nulls
pdf_nulls = pd.DataFrame({"a": [1, None, 3, 4, None, 6]}, index=np.random.rand(6))
psdf_nulls = ps.from_pandas(pdf_nulls)
self.assert_eq(pdf_nulls.diff(), psdf_nulls.diff())

# Test negative and large periods
self.assert_eq(pdf.diff(periods=0), psdf.diff(periods=0))
self.assert_eq(pdf.diff(periods=-2), psdf.diff(periods=-2))
self.assert_eq(pdf.diff(periods=10), psdf.diff(periods=10))

# Test cross-partition boundary rows after physical repartitioning.
pdf_boundary = pd.DataFrame(
{"a": list(range(20)), "b": [value * value for value in range(20)]},
index=np.random.rand(20),
)
psdf_boundary = ps.from_pandas(pdf_boundary).spark.repartition(4)
expected_boundary = psdf_boundary.to_pandas()
self.assert_eq(expected_boundary.diff(periods=4), psdf_boundary.diff(periods=4))
self.assert_eq(expected_boundary.diff(periods=-4), psdf_boundary.diff(periods=-4))

# Test Series.diff
self.assert_eq(pdf["a"].diff(), psdf["a"].diff())
self.assert_eq(pdf["a"].diff(-1), psdf["a"].diff(-1))
self.assert_eq(pdf["a"].diff(3), psdf["a"].diff(3))

msg = "should be an int"
with self.assertRaisesRegex(TypeError, msg):
@@ -156,11 +156,9 @@ def test_diff(self):
)
psdf = ps.from_pandas(pdf)

self.assert_eq(pdf.diff().loc[pdf["Col1"] == 20], psdf.diff().loc[psdf["Col1"] == 20])
self.assert_eq(
pdf.diff().loc[pdf["Col1"] == 20].astype(int), psdf.diff().loc[psdf["Col1"] == 20]
)
self.assert_eq(
pdf["Col2"].diff().loc[pdf["Col1"] == 20].astype(int),
pdf["Col2"].diff().loc[pdf["Col1"] == 20],
psdf["Col2"].diff().loc[psdf["Col1"] == 20],
)

@@ -57,7 +57,7 @@ def test_diff(self):
pdf.groupby(pkey)[["a"]].diff().sort_index(),
)

self.assert_eq(psdf.groupby(kkey).diff().sum(), pdf.groupby(pkey).diff().sum().astype(int))
self.assert_eq(psdf.groupby(kkey).diff().sum(), pdf.groupby(pkey).diff().sum())
self.assert_eq(psdf.groupby(kkey)["a"].diff().sum(), pdf.groupby(pkey)["a"].diff().sum())

