IO: Fix parquet read from s3 directory #33632
Changes from 19 commits
@@ -585,6 +585,7 @@ I/O
   unsupported HDF file (:issue:`9539`)
 - Bug in :meth:`~DataFrame.to_parquet` was not raising ``PermissionError`` when writing to a private s3 bucket with invalid creds. (:issue:`27679`)
 - Bug in :meth:`~DataFrame.to_csv` was silently failing when writing to an invalid s3 bucket. (:issue:`32486`)
+- :func:`read_parquet` now supports an s3 directory (:issue:`26388`)

 Plotting
 ^^^^^^^^

Reviewer (on the added whatsnew line): can you review the doc-strings to see if they need updating (e.g. may need a versionadded tag)

Author: Parquet docstrings indicate we already supported this, I think? I updated the whatsnew and added an example in the docstrings.
@@ -150,6 +150,23 @@ def urlopen(*args, **kwargs):
     return urllib.request.urlopen(*args, **kwargs)


+def get_fs_for_path(filepath):
+    """
+    Get appropriate filesystem given a filepath.
+    Support s3fs, gcs and local disk fs
+    """
+    if is_s3_url(filepath):
+        from pandas.io import s3
+
+        return s3.get_fs()
+    elif is_gcs_url(filepath):
+        from pandas.io import gcs
+
+        return gcs.get_fs()
+    else:
+        return None
+
+
 def get_filepath_or_buffer(
     filepath_or_buffer: FilePathOrBuffer,
     encoding: Optional[str] = None,

Reviewer (on ``def get_fs_for_path``): can you type this (and the return annotation)

Author: Left the return type off for now since it includes optional dependencies. Can add imports to the TYPE_CHECKING block at the top if that's appropriate?

Reviewer (on the docstring): can you make this a full doc-string (Parameters / Returns)

Author: Sure, done :)
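The dispatch in ``get_fs_for_path`` hinges entirely on the URL scheme of the path. A dependency-free sketch of that routing (``classify_path`` is a hypothetical stand-in, returning a label instead of the s3fs/gcsfs filesystem objects the real helper returns) looks like this:

```python
# Simplified sketch of the scheme-based routing that get_fs_for_path does.
# The real helper returns s3fs / gcsfs filesystem objects; here we only
# classify the path, mirroring pandas' is_s3_url / is_gcs_url checks.
from urllib.parse import urlparse


def classify_path(filepath) -> str:
    """Return which filesystem a path would be routed to (hypothetical helper)."""
    scheme = urlparse(str(filepath)).scheme
    if scheme in ("s3", "s3n", "s3a"):
        return "s3fs"
    elif scheme in ("gcs", "gs"):
        return "gcsfs"
    else:
        # Local disk (or a buffer): no remote filesystem object is needed.
        return "local"


print(classify_path("s3://bucket/data/"))         # s3fs
print(classify_path("gs://bucket/data.parquet"))  # gcsfs
print(classify_path("/tmp/data.parquet"))         # local
```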
@@ -8,7 +8,12 @@

 from pandas import DataFrame, get_option

-from pandas.io.common import get_filepath_or_buffer, is_gcs_url, is_s3_url
+from pandas.io.common import (
+    get_filepath_or_buffer,
+    get_fs_for_path,
+    is_gcs_url,
+    is_s3_url,
+)


 def get_engine(engine: str) -> "BaseImpl":
@@ -92,13 +97,15 @@ def write(
         **kwargs,
     ):
         self.validate_dataframe(df)
-        path, _, _, should_close = get_filepath_or_buffer(path, mode="wb")
+        file_obj, _, _, should_close = get_filepath_or_buffer(path, mode="wb")

         from_pandas_kwargs: Dict[str, Any] = {"schema": kwargs.pop("schema", None)}
         if index is not None:
             from_pandas_kwargs["preserve_index"] = index

         table = self.api.Table.from_pandas(df, **from_pandas_kwargs)
+        # write_to_dataset does not support a file-like object when
+        # a directory path is used, so just pass the path string.
         if partition_cols is not None:
             self.api.parquet.write_to_dataset(
                 table,

Reviewer (on the ``file_obj`` line): you didn't change …

Author: That was indeed on purpose, …

Reviewer: OK, can you add some clarifying comments for it?

Author: Sure, done!

Reviewer: is this always a file_obj, never a path? e.g. should rename to filepath_or_buffer?

Author: Good point, I've renamed to …
@@ -108,20 +115,18 @@ def write(
                 **kwargs,
             )
         else:
-            self.api.parquet.write_table(table, path, compression=compression, **kwargs)
+            self.api.parquet.write_table(
+                table, file_obj, compression=compression, **kwargs
+            )
         if should_close:
-            path.close()
+            file_obj.close()

     def read(self, path, columns=None, **kwargs):
-        path, _, _, should_close = get_filepath_or_buffer(path)
-
         kwargs["use_pandas_metadata"] = True
-        result = self.api.parquet.read_table(
-            path, columns=columns, **kwargs
-        ).to_pandas()
-        if should_close:
-            path.close()
+        parquet_ds = self.api.parquet.ParquetDataset(
+            path, filesystem=get_fs_for_path(path), **kwargs
+        )
+        kwargs["columns"] = columns
+        result = parquet_ds.read_pandas(**kwargs).to_pandas()
         return result

Reviewer (on the ``ParquetDataset`` call): This change breaks clients that pass a file-like object for ``path``. Here's the call stack I'm seeing:

    .tox/test/lib/python3.7/site-packages/pandas/io/parquet.py:315: in read_parquet
        return impl.read(path, columns=columns, **kwargs)
    .tox/test/lib/python3.7/site-packages/pandas/io/parquet.py:131: in read
        path, filesystem=get_fs_for_path(path), **kwargs
    .tox/test/lib/python3.7/site-packages/pyarrow/parquet.py:1162: in __init__
        self.paths = _parse_uri(path_or_paths)
    .tox/test/lib/python3.7/site-packages/pyarrow/parquet.py:47: in _parse_uri
        path = _stringify_path(path)

I filed bug report #34467.

Reviewer (on ``filesystem=get_fs_for_path(path)``): Is this …

Author: pyarrow seems to only allow a file path as opposed to a dir path. Removing the filesystem arg here throws: … To repro, see the test case …

Reviewer: Ah, OK. I see now in pyarrow that apparently string URIs with "s3://..." are not supported (while "hdfs://" is supported). That's something we should fix on the pyarrow side as well. But of course until then this is fine.