GH-46572: [Python] expose filter option to python for join #46566

Open · wants to merge 6 commits into main

Conversation

xingyu-long

Rationale for this change

The C++ implementation supports a filter while performing a join, but it isn't exposed to Python. I think it's good to have this, so users can avoid an explicit extra filter op on their side.

What changes are included in this PR?

Support a filter expression in the Python bindings.

Are these changes tested?

Yes, added a new test, test_hash_join_with_filter.

Are there any user-facing changes?

It exposes one more argument, filter_expression, for Table.join and Dataset.join.

Note: I added [draft] to this change since I'd like to get feedback from reviewers first; then we can change the frontend calls, i.e., the Table and Dataset pxi files.


Thanks for opening a pull request!

If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose

Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project.

Then could you also rename the pull request title in the following format?

GH-${GITHUB_ISSUE_ID}: [${COMPONENT}] ${SUMMARY}

or

MINOR: [${COMPONENT}] ${SUMMARY}


@xingyu-long
Author

cc @richardliaw, since I discussed this with Richard and he suggested I give it a try. It may be helpful for the Ray project too. Thanks!

@AlenkaF
Member

AlenkaF commented May 23, 2025

Hi @xingyu-long, thank you for opening a PR!
Could you first open an issue to track the changes, and check the failing CI builds? Some of the failing tests are connected.

@xingyu-long
Author

@AlenkaF Thanks for taking a look!

I just opened the issue to track this (#46572). The failing tests are probably related to the corresponding Python callers / function definitions, but could you take a look first? Since the main part is enabling the join option in _acero.pyx, I'd like to get feedback from the community on that part and see if it makes sense. Thanks!

@xingyu-long xingyu-long changed the title [draft][Python] expose filter option to python for join [draft] GH-46572: [python] expose filter option to python for join May 23, 2025
@AlenkaF AlenkaF marked this pull request as draft May 26, 2025 07:08
@AlenkaF AlenkaF changed the title [draft] GH-46572: [python] expose filter option to python for join GH-46572: [python] expose filter option to python for join May 26, 2025
@AlenkaF AlenkaF changed the title GH-46572: [python] expose filter option to python for join GH-46572: [Python] expose filter option to python for join May 26, 2025
@AlenkaF
Member

AlenkaF commented May 26, 2025

Thanks for opening this issue!

I've marked the PR as a draft and updated the title.

Regarding the call in Table.join: I would suggest placing the new keyword argument at the end of the list — this helps preserve consistency and avoids breaking any assumptions about argument order.

Also, please make sure to connect it with acero.py and _acero.pyx.

CC: @raulcd for any additional thoughts.

Member

@raulcd raulcd left a comment


Thanks for the PR. In principle it looks good to me. I would just make it the last argument of the function signature. As we are not using keyword-only arguments, this change alters the function signature, provoking an unnecessary breaking change for users.

@github-actions github-actions bot added awaiting changes Awaiting changes and removed awaiting review Awaiting review labels May 26, 2025
@github-actions github-actions bot added awaiting change review Awaiting change review and removed awaiting changes Awaiting changes labels May 28, 2025
@xingyu-long
Author

xingyu-long commented May 28, 2025

Thanks for the suggestion @AlenkaF @raulcd I just updated the code.

btw, I observed two things while writing tests for this:

  1. It seems the filter cannot reference a field present on both sides, i.e., the same field in both tables/schemas. This is implemented on the C++ side:

    if (in_left && in_right) {
      return Status::Invalid("FieldRef", ref.ToString(),
                             "was found in both left and right schemas");
    }

Is this intended behavior?

For example, assume we have two tables with some common fields (id and name), and we'd like to join them by id and then filter name with a certain pattern. Without exposing this API to Python, we probably need to materialize a big intermediate join result and then apply the filter on top of it.

But if we could apply the filter to both tables before joining them, it would be more efficient, right? That's why I'd like to confirm the expected behavior of this filter in the C++ implementation.

  2. I tried exercising the filter with different join types and saw the following surprise (assuming we use the filter only on one side):
In [54]: import pandas as pd
    ...: import pyarrow as pa
    ...: import pyarrow.compute as pc
    ...: df1 = pd.DataFrame({'id': [1, 2, 3],
    ...:                     'year': [2020, 2022, 2019]})
    ...: df2 = pd.DataFrame({'id': [3, 4],
    ...:                     'n_legs': [5, 100],
    ...:                     'animal': ["Brittle stars", "Centipede"]})
    ...: t1 = pa.Table.from_pandas(df1)
    ...: t2 = pa.Table.from_pandas(df2)

In [55]: t1.join(t2, 'id', join_type="right outer").combine_chunks()
Out[55]:
pyarrow.Table
year: int64
id: int64
n_legs: int64
animal: string
----
year: [[2019,null]]
id: [[3,4]]
n_legs: [[5,100]]
animal: [["Brittle stars","Centipede"]]

# and then we apply a filter expression with an intended mismatch here
In [56]: t1.join(t2, 'id', join_type="right outer", filter_expression=pc.equal(pc.field("n_legs"), 200)).combine_chunks()
Out[56]:
pyarrow.Table
year: int64
id: int64
n_legs: int64
animal: string
----
year: [[null,null]]
id: [[3,4]]
n_legs: [[5,100]]
animal: [["Brittle stars","Centipede"]] 

It seems we don't get an empty result; instead we get the right-outer rows. Does the join type take higher priority than the filter operation in the final result?

btw, it seems fine with the inner join type.

In [57]: t1.join(t2, 'id', join_type="inner", filter_expression=pc.equal(pc.field("n_legs"), 200)).combine_chunks()
Out[57]:
pyarrow.Table
id: int64
year: int64
n_legs: int64
animal: string
----
id: []
year: []
n_legs: []
animal: []

This one looks like a bug to me, but I am not sure. @AlenkaF @raulcd, could you provide some feedback on these two questions? Thanks!

@xingyu-long xingyu-long marked this pull request as ready for review May 28, 2025 06:36
@raulcd
Member

raulcd commented May 29, 2025

I am no expert in this area.
I agree with you that the test you shared seems to return unexpected behaviour; I would expect the filter to be correctly applied.
Having said that, I don't think the issue comes from the code you linked in acero/hash_join_node.cc::CollectFilterColumns; from my understanding, that is the expected behavior. It isn't checking whether there are repeated fields in both schemas; it is checking whether the filter field is in both schemas, in order to avoid ambiguous filter expressions. cc @zanmato1984 @pitrou, who have more knowledge around this, can help understand it better, and can validate whether the test points to a possible bug in right outer join.

@zanmato1984
Copy link
Contributor

zanmato1984 commented May 29, 2025

Thank you @xingyu-long for contributing this!

I'd first address your concern of:

It seems we don't get an empty result; instead we get the right-outer rows. Does the join type take higher priority than the filter operation in the final result?

btw, it seems fine with the inner join type.

Yes, this is expected per SQL semantics. It is also the difference between putting an expression in the ON condition of a JOIN and putting it in the WHERE clause, e.g.,

    FROM t1 LEFT JOIN t2 ON t1.value = x AND t2.value = y

does not equal

    FROM t1 LEFT JOIN t2 ON true WHERE t1.value = x AND t2.value = y

(They are equivalent ONLY for inner joins.)
This is quite understandable, because otherwise you wouldn't need most join types other than inner :)

Conceptually, all subexpressions in the ON condition contribute equally to determining whether two rows, one from each side, are a "match" (the whole expression evaluates to true) or a "non-match" (the whole expression evaluates to null or false). In practice, most query engines do a hash join, which requires at least one equality condition on columns from both sides; the columns in such conditions are used as the join "keys" (in your case the join key is implicitly specified by the columns with a common name). The rest of the expression is normally treated as a so-called "residual filter" (this is what your PR adds). Now, back to "conceptually": depending on the join type (inner/left outer/right outer/etc.), rows are then processed differently. Take inner and left outer as two examples:

  • An inner join keeps all columns from both sides for a match, and discards the entire row for a non-match; this is the same as filtering at the table scan first and then applying the join.
  • A left outer join always keeps the left-side columns; it keeps the right-side columns as well for a match, or discards the right-side columns (by filling nulls) for a non-match (but the row is still emitted in the join result).
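This match/non-match behavior can be sketched in plain Python. A toy model, not the Arrow implementation: rows are dicts, the residual filter is a Python predicate, and only the inner and right outer cases from this thread are modeled. It reproduces the results from the In[54]-In[57] session above.

```python
# Toy model of hash-join semantics with a residual filter (NOT Arrow code).
# The residual predicate decides match vs. non-match for each key-matched
# pair; the join type decides what happens to non-matching rows.

def toy_join(left, right, key, join_type, residual=lambda l, r: True):
    out = []
    left_cols = [c for c in (left[0] if left else {}) if c != key]
    for r in right:
        matches = [l for l in left if l[key] == r[key] and residual(l, r)]
        if matches:
            out.extend({**l, **r} for l in matches)
        elif join_type == "right outer":
            # Non-match under right outer: the right row is still emitted,
            # with the left-side columns null-filled.
            out.append({**{c: None for c in left_cols}, **r})
        # Under "inner", a non-match is simply discarded.
    return out

t1 = [{"id": 1, "year": 2020}, {"id": 2, "year": 2022}, {"id": 3, "year": 2019}]
t2 = [{"id": 3, "n_legs": 5, "animal": "Brittle stars"},
      {"id": 4, "n_legs": 100, "animal": "Centipede"}]
never = lambda l, r: r["n_legs"] == 200  # intentionally never true here

print(toy_join(t1, t2, "id", "inner", never))        # []
print(toy_join(t1, t2, "id", "right outer", never))  # both right rows, year=None
```

With no residual filter, the right outer join reproduces the Out[55] result (year null-filled only for the unmatched id=4 row); with the never-true residual, it reproduces Out[56], where every right row survives but the left side is null-filled.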

for example, let's assume that we have two tables which have some common fields (id and name), and we'd like to join them by id and then filter name with certain pattern. so without exposing this API to python, we probably need to maintain a big intermediate state of temp join and then apply the filter on top of it.

Yes, this is necessary for preserving the SQL-like join semantics, as long as you write the filter in the ON condition. Again, the filter support you are adding is the "residual filter" (the subexpressions other than the join keys in the ON condition), not a regular "filter".

but if we can apply the filter on both tables first before we joining two tables, it would be more efficient? that's why I'd like to confirm what's the expected behavior for this filter in c++ implementation.

In this case you can just apply the filter ahead of the join, e.g.,

t2_filtered = t2.filter(pc.equal(pc.field("n_legs"), 200))
t1.join(t2_filtered, 'id', join_type="right outer")

As long as that is what you need.
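The equivalence that makes this rewrite safe only for inner joins can be checked with plain-Python stand-ins (a toy sketch, not Arrow code): for an inner join, filtering a side before the join and filtering the joined result coincide, which is why only the inner-join result in the session above looked "right".

```python
# Toy check (plain Python, NOT Arrow): for INNER joins, filter-then-join
# equals join-then-filter. For outer joins the two differ, as the thread
# discusses.
t1 = [{"id": 1, "year": 2020}, {"id": 2, "year": 2022}, {"id": 3, "year": 2019}]
t2 = [{"id": 3, "n_legs": 5}, {"id": 4, "n_legs": 100}]
pred = lambda r: r["n_legs"] == 200  # matches nothing in t2

def inner_join(left, right, key):
    # naive nested-loop inner join on a single key column
    return [{**l, **r} for l in left for r in right if l[key] == r[key]]

pre = inner_join(t1, [r for r in t2 if pred(r)], "id")          # filter, then join
post = [row for row in inner_join(t1, t2, "id") if pred(row)]   # join, then filter
print(pre == post)  # True: both are empty here
```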

> 1. it seems filter cannot apply for both side, i.e same field for both table/schema

This is an independent problem. Because a join concatenates columns from both sides, it is possible that the result table contains columns with the same name. If so, you won't be able to further reference such a column without ambiguity. You can specify output_suffix_for_left/right to append unique identifiers to the column names, so that you can disambiguate them.

@zanmato1984
Contributor

If my comment above addresses your concern, I'll review the code in turn. Thank you @xingyu-long.

@xingyu-long
Author

xingyu-long commented May 30, 2025

If my comment above addresses your concern, I'll review the code in turn. Thank you @xingyu-long.

Thanks @zanmato1984 for the explanation; it makes sense. I should probably mention more details about this usage in the function docstring. In the meantime, feel free to review the changes, since they just expose to Python what the C++ side does.

Contributor

@zanmato1984 zanmato1984 left a comment


Some nits.

@xingyu-long
Author

This is an independent problem. Because a join concatenates columns from both sides, it is possible that the result table contains columns with the same name. If so, you won't be able to further reference such a column without ambiguity. You can specify output_suffix_for_left/right to append unique identifiers to the column names, so that you can disambiguate them.

I see. So if I understand correctly, ideally we should assign distinct keys to the two columns before using the filter expression, since output_suffix_for_left only applies to the output at the end of the workflow, right? (Sorry if this is a dumb question...) I.e., something like this won't work:

    join_opts = HashJoinNodeOptions(
        "inner", left_keys="key", right_keys="key",
        output_suffix_for_left="_left", output_suffix_for_right="_right",
        filter=pc.equal(pc.field('key_left'), 2))     # <------------ will hit key not found in both schemas.
    joined = Declaration(
        "hashjoin", options=join_opts, inputs=[left_source, right_source])
    result = joined.to_table()

If we don't use the filter at all, same-named columns are fine, and we can use output_suffix_for_left to help with the output only. @zanmato1984

@zanmato1984
Contributor

I see. So if I understand correctly, ideally we should assign distinct keys to the two columns before using the filter expression, since output_suffix_for_left only applies to the output at the end of the workflow, right? (Sorry if this is a dumb question...) I.e., something like this won't work:

    join_opts = HashJoinNodeOptions(
        "inner", left_keys="key", right_keys="key",
        output_suffix_for_left="_left",output_suffix_for_right="_right",
        filter=pc.equal(pc.field('key_left'), 2))     # <------------ will hit key not found in both schemas.
    joined = Declaration(
        "hashjoin", options=join_opts, inputs=[left_source, right_source])
    result = joined.to_table()

Sorry, I made a mistake. You are right about this. Thanks for clarifying.

If you want to write a similar test case, let's just work around the constraint and use unique column names.

@xingyu-long
Author

If you want to write a similar test case, let's just work around the constraint and use unique column names.

Thanks for confirming! The always-true and always-false filter tests I added should cover this.

Contributor

@zanmato1984 zanmato1984 left a comment


LGTM.

(I pushed a commit merely changing some line orders.)

@zanmato1984
Contributor

I've approved the PR in terms of its functionality. I think we need another +1 from @AlenkaF @raulcd @pitrou in terms of Python (or functionality, of course), since I'm no Python expert.

@xingyu-long
Author

I've approved the PR in terms of its functionality. I think we need another +1 from @AlenkaF @raulcd @pitrou in terms of Python (or functionality, of course), since I'm no Python expert.

Thanks @zanmato1984, really appreciate it!

I will wait for other approvals.
