
[SPARK-38946][PYTHON][PS] Generates a new dataframe instead of operating inplace in setitem #36353


Closed · wants to merge 2 commits
2 changes: 2 additions & 0 deletions python/docs/source/migration_guide/pyspark_3.3_to_3.4.rst
@@ -37,3 +37,5 @@ Upgrading from PySpark 3.3 to 3.4
* In Spark 3.4, the schema inference of ``groupby.apply`` in Pandas on Spark will first infer the pandas type to ensure the accuracy of the pandas ``dtype`` as much as possible.

* In Spark 3.4, the ``Series.concat`` sort parameter will be respected to follow pandas 1.4 behaviors.

* In Spark 3.4, ``DataFrame.__setitem__`` will make a copy and replace pre-existing arrays, which will NOT be over-written, following pandas 1.4 behaviors.
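A minimal sketch of the user-visible difference (assuming Spark 3.4 with the pandas API on Spark; the column name and values are only illustrative):

import pyspark.pandas as ps

psdf = ps.DataFrame({"a": [1, 2, 3]})
psser = psdf["a"]            # take a reference to the column before assigning

psdf["a"] = psdf["a"] + 10   # since Spark 3.4 this builds a new frame (copy-and-replace)

print(psdf["a"].tolist())    # [11, 12, 13]: the DataFrame sees the new values
print(psser.tolist())        # [1, 2, 3]: the previously-taken Series is NOT over-written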
26 changes: 19 additions & 7 deletions python/pyspark/pandas/frame.py
@@ -498,20 +498,30 @@ def _internal(self) -> InternalFrame:
return cast(InternalFrame, self._internal_frame) # type: ignore[has-type]

def _update_internal_frame(
self, internal: InternalFrame, requires_same_anchor: bool = True
self,
internal: InternalFrame,
requires_same_anchor: bool = True,
anchor_force_disconnect: bool = False,
) -> None:
"""
Update InternalFrame with the given one.

If the column_label is changed or the new InternalFrame is not the same `anchor`,
disconnect the link to the Series and create a new one.
If the column_label is changed, the new InternalFrame is not the same `anchor`, or the
`anchor_force_disconnect` flag is set to True, disconnect the original anchor and create
a new one.

If `requires_same_anchor` is `False`, the same-anchor check is skipped and the InternalFrame
is updated forcibly, e.g., when replacing the internal with the resolved_copy, or when
updating the underlying Spark DataFrame, which needs to combine with a different Spark DataFrame.

:param internal: the new InternalFrame
:param requires_same_anchor: whether checking the same anchor
Parameters
----------
internal : InternalFrame
    The new InternalFrame
requires_same_anchor : bool
    Whether to check that the anchor is the same
anchor_force_disconnect : bool
    Whether to force disconnecting the original anchor and creating a new one
"""
from pyspark.pandas.series import Series

@@ -527,7 +537,7 @@ def _update_internal_frame(
renamed = old_label != new_label
not_same_anchor = requires_same_anchor and not same_anchor(internal, psser)

if renamed or not_same_anchor:
if renamed or not_same_anchor or anchor_force_disconnect:
psdf: DataFrame = DataFrame(self._internal.select_column(old_label))
psser._update_anchor(psdf)
psser = None
@@ -12903,7 +12913,9 @@ def assign_columns(
# Same Series.
psdf = self._assign({key: value})

self._update_internal_frame(psdf._internal)
# Since Spark 3.4, df.__setitem__ generates a new dataframe instead of operating
# in-place to follow pandas v1.4 behavior, see also SPARK-38946.
self._update_internal_frame(psdf._internal, anchor_force_disconnect=True)

@staticmethod
def _index_normalized_label(level: int, labels: Union[Name, Sequence[Name]]) -> List[Label]:
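The effect of the new anchor_force_disconnect flag on a Series that was anchored to the frame can be sketched as follows (a minimal illustration; using same_anchor from pyspark.pandas.utils is an assumption based on the helper referenced in the diff, and the data values are made up):

import pyspark.pandas as ps
from pyspark.pandas.utils import same_anchor

psdf = ps.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
psser = psdf["a"]
# Before the assignment the Series shares its InternalFrame with the DataFrame.
assert same_anchor(psdf, psser)

psdf["a"] = psdf["a"] + 10
# __setitem__ now calls _update_internal_frame(..., anchor_force_disconnect=True),
# so the previously-taken Series is re-anchored to a copy and no longer follows psdf.
assert not same_anchor(psdf, psser)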
43 changes: 32 additions & 11 deletions python/pyspark/pandas/tests/test_dataframe.py
@@ -290,7 +290,14 @@ def test_inplace(self):
psdf["a"] = psdf["a"] + 10

self.assert_eq(psdf, pdf)
self.assert_eq(psser, pser)
# SPARK-38946: Since Spark 3.4, df.__setitem__ generates a new dataframe to follow
# pandas 1.4 behaviors
if LooseVersion(pd.__version__) >= LooseVersion("1.4.0"):
    self.assert_eq(psser, pser)
else:
    # Follow pandas latest behavior
    with self.assertRaisesRegex(AssertionError, "Series are different"):
        self.assert_eq(psser, pser)

Review comment (Member), on the assert under the LooseVersion check:
Seems like this is a common test with the old pandas?
Shall we move it out of if LooseVersion(pd.__version__) >= LooseVersion("1.4"):?

Reply (Yikun, Member Author, Jun 23, 2022):
We follow the latest behavior, so before 1.3 this will raise not-equal. Added (the else branch above).
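For reference, the pandas 1.4 behavior that the test above follows can be sketched against plain pandas (values are only illustrative):

import pandas as pd

pdf = pd.DataFrame({"a": [1, 2, 3]})
pser = pdf.a
pdf["a"] = pdf["a"] + 10

# pandas >= 1.4 replaces the underlying array instead of writing into it,
# so the previously-taken Series keeps the old values; older pandas updated it in place.
print(pser.tolist())   # [1, 2, 3] on pandas >= 1.4, [11, 12, 13] on older versions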

def test_assign_list(self):
pdf, psdf = self.df_pair
@@ -1493,6 +1500,15 @@ def test_fillna(self):
pdf.fillna({"x": -1, "y": -2, "z": -5}, inplace=True)
psdf.fillna({"x": -1, "y": -2, "z": -5}, inplace=True)
self.assert_eq(psdf, pdf)
# Skip due to pandas bug: https://github.com/pandas-dev/pandas/issues/47188
if not (LooseVersion("1.4.0") <= LooseVersion(pd.__version__) <= LooseVersion("1.4.2")):
    self.assert_eq(psser, pser)

pser = pdf.z
psser = psdf.z
pdf.fillna(0, inplace=True)
psdf.fillna(0, inplace=True)
self.assert_eq(psdf, pdf)
self.assert_eq(psser, pser)

Review comment (Yikun, Member Author), on pdf.fillna(0, inplace=True):
Added a test case which @ueshin mentioned before.

s_nan = pd.Series([-1, -2, -5], index=["x", "y", "z"], dtype=int)
@@ -1536,14 +1552,15 @@ def test_fillna(self):
self.assert_eq(pdf.fillna(method="bfill"), psdf.fillna(method="bfill"))
self.assert_eq(pdf.fillna(method="bfill", limit=2), psdf.fillna(method="bfill", limit=2))

self.assert_eq(psdf.fillna({"x": -1}), pdf.fillna({"x": -1}))

self.assert_eq(
psdf.fillna({"x": -1, ("x", "b"): -2}), pdf.fillna({"x": -1, ("x", "b"): -2})
)
self.assert_eq(
psdf.fillna({("x", "b"): -2, "x": -1}), pdf.fillna({("x", "b"): -2, "x": -1})
)
# See also: https://github.com/pandas-dev/pandas/issues/47649
if LooseVersion("1.4.3") != LooseVersion(pd.__version__):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test to make sure 1.4.3 pass. it doesn't casue by this patch, it is a regression since pandas 1.4.3.

self.assert_eq(psdf.fillna({"x": -1}), pdf.fillna({"x": -1}))
self.assert_eq(
psdf.fillna({"x": -1, ("x", "b"): -2}), pdf.fillna({"x": -1, ("x", "b"): -2})
)
self.assert_eq(
psdf.fillna({("x", "b"): -2, "x": -1}), pdf.fillna({("x", "b"): -2, "x": -1})
)

# check multi index
pdf = pdf.set_index([("x", "a"), ("x", "b")])
@@ -2972,7 +2989,9 @@ def get_data(left_columns=None, right_columns=None):
left_pdf.update(right_pdf)
left_psdf.update(right_psdf)
self.assert_eq(left_pdf.sort_values(by=["A", "B"]), left_psdf.sort_values(by=["A", "B"]))
self.assert_eq(psser.sort_index(), pser.sort_index())
# Skip due to pandas bug: https://github.com/pandas-dev/pandas/issues/47188
if not (LooseVersion("1.4.0") <= LooseVersion(pd.__version__) <= LooseVersion("1.4.2")):
    self.assert_eq(psser.sort_index(), pser.sort_index())

left_psdf, left_pdf, right_psdf, right_pdf = get_data()
left_pdf.update(right_pdf, overwrite=False)
@@ -5243,7 +5262,9 @@ def test_eval(self):
pdf.eval("A = B + C", inplace=True)
psdf.eval("A = B + C", inplace=True)
self.assert_eq(pdf, psdf)
self.assert_eq(pser, psser)
# Skip due to pandas bug: https://github.com/pandas-dev/pandas/issues/47449
if not (LooseVersion("1.4.0") <= LooseVersion(pd.__version__) <= LooseVersion("1.4.3")):
    self.assert_eq(pser, psser)

# doesn't support for multi-index columns
columns = pd.MultiIndex.from_tuples([("x", "a"), ("y", "b"), ("z", "c")])