Dask's Arrow serialization slow & memory intensive #2521


Closed

randerzander opened this issue Feb 11, 2019 · 16 comments

@randerzander

I'm creating a dummy 80MB single-partition Dask distributed DataFrame, and attempting to convert it to a PyArrow Table.

Doing so causes the notebook to emit GC warnings and consistently takes over 20 seconds.

Versions:
PyArrow: 0.12.0
Dask: 1.1.1

Repro:

from dask.distributed import Client, wait, LocalCluster
import pyarrow as pa

# start a local cluster and connect a client to it
ip = '0.0.0.0'
cluster = LocalCluster(ip=ip)
client = Client(cluster)

import dask.array as da
import dask.dataframe as dd

n_rows = 5000000
n_keys = 5000000

# build a single-partition dataframe: a random float column 'x' and an integer column 'id'
ddf = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()

def get_arrow(df):
    # convert one pandas partition to a PyArrow Table
    return pa.Table.from_pandas(df)

%time arrow_tables = ddf.map_partitions(get_arrow).compute()

Result:

distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 24% CPU time recently (threshold: 10%)
distributed.utils_perf - WARNING - full garbage collections took 26% CPU time recently (threshold: 10%)
CPU times: user 20.6 s, sys: 1.17 s, total: 21.7 s
Wall time: 22.5 s
@TomAugspurger
Member

map_partitions is generally expected to return a dask DataFrame. The meta on your ddf.map_partitions(get_arrow) isn't accurate.

The easiest solution is probably to delay pa.Table.from_pandas, giving you a list of delayed Table objects.

tables = [dask.delayed(pa.Table.from_pandas)(x) for x in ddf.to_delayed()]

and you can continue processing from there as needed.
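
For reference, a minimal sketch of carrying that through (names other than ddf are illustrative; dask.compute materializes the delayed objects on the client):

import dask
import pyarrow as pa

# one delayed pa.Table per partition, never wrapped in a pandas container
tables = [dask.delayed(pa.Table.from_pandas)(part) for part in ddf.to_delayed()]

# materialize them as a tuple of concrete pa.Table objects
arrow_tables = dask.compute(*tables)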

@mrocklin
Member

@TomAugspurger does this explain the long delay though? When I run this with prun/snakeviz I find that it's spending a bunch of time in pandas' construct_1d_object_array_from_listlike (under pandas/core/dtypes/).

@mrocklin
Member

Specifying meta=object also doesn't help here.

@mrocklin
Member

Also, here is a more minimal example that doesn't engage the distributed scheduler, but still takes a long time.

import pyarrow as pa

import dask.array as da
import dask.dataframe as dd

n_rows = 5000000
n_keys = 5000000

ddf = dd.concat([
    da.random.random(n_rows).to_dask_dataframe(columns='x'),
    da.random.randint(0, n_keys, size=n_rows).to_dask_dataframe(columns='id'),
], axis=1).persist()

def get_arrow(df):
    return pa.Table.from_pandas(df)

arrow_tables = ddf.map_partitions(get_arrow, meta=object).compute(scheduler='single-threaded')

@TomAugspurger
Member

The type of arrow_tables is still a Series[object] containing a single pa.Table.

If I had to guess, dask.dataframe is doing the equivalent of

pd.Series(pa.Table.from_pandas(ddf.compute()))

In theory, that shouldn't take long for pandas to do, but this is a really thorny area. We (pandas) essentially end up doing

arr = np.empty((1,), dtype=object)
arr[:] = [table]

where table is pa.Table.from_pandas(ddf.compute()), which takes forever.
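
A quick, illustrative way to check that the object-array assignment itself is the slow step (timings will vary; the exact mechanism is a guess, but numpy appears to inspect the Table's contents during the slice assignment):

import numpy as np, pandas as pd, pyarrow as pa

df = pd.DataFrame({'x': np.arange(1000000)})
table = pa.Table.from_pandas(df)

arr = np.empty((1,), dtype=object)
%time arr[:] = [table]   # the path pandas takes internally; very slow
%time arr[0] = table     # plain reference store; fast by comparison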

@mrocklin
Member

Why pa.Table.from_pandas(ddf.compute()) and not pa.Table.from_pandas(a_partition)?

@TomAugspurger
Member

Right; anywhere in #2521 (comment) where I have ddf.compute, it'll be a partition (which just happens to be the entire dask dataframe in this case, since there's a single partition). So you'd get a Series with one row per original partition, where each row is a Table.

@mrocklin
Member

A more minimal example, this time without Dask:

In [1]: import numpy as np, pandas as pd, pyarrow as pa

In [2]: df = pd.DataFrame({'x': np.arange(1000000)})

In [3]: %time t = pa.Table.from_pandas(df)
CPU times: user 5.36 ms, sys: 3.22 ms, total: 8.58 ms
Wall time: 7.47 ms

In [4]: %time s = pd.Series([t], dtype=object)
CPU times: user 2.7 s, sys: 114 ms, total: 2.82 s
Wall time: 2.81 s

@mrocklin
Member

@TomAugspurger should I raise this upstream at Pandas or leave it here?

@TomAugspurger
Member

We have issues for it :)

Rewriting the constructors is on my medium-term TODO: pandas-dev/pandas#24387 started the work for DataFrame.

@randerzander
Author

It's worth noting that I used from_pandas in the repro snippet to keep out the thornier example, which involves cudf: rapidsai/cudf#899

While there may be an issue with Pandas, I think there's also an issue with Dask.

@mrocklin
Member

> While there may be an issue with Pandas, I think there's also an issue with Dask.

@randerzander can I ask you to expand on this?

@randerzander
Author

cudf.DataFrame.to_arrow is a similar operation to pyarrow.Table.from_pandas.

cudf.DataFrame.to_arrow (for the same dataset) returns in milliseconds. When I pull it from map_partitions with Dask, I get the GC warnings and 20s+ slowdown. I know the worker has to send the data back to the client, but 20s seems excessive for a small, local transfer.

@mrocklin
Member

Yup, totally agree. My example above shows that this might be because we're putting these things into Pandas Series, and apparently putting an Arrow Table into a Pandas Series takes several seconds. This probably isn't a problem with serialization or communication on the Dask end; rather, by using Dask DataFrame you're trying to keep these objects around in Pandas, which is currently choking on them (at least until @TomAugspurger refactors the Pandas constructor logic).

Short term, the solution here is probably to avoid calling map_partitions(pa.Table.from_pandas), which will try to form another dataframe object, and instead use Dask Delayed as @TomAugspurger suggests above:

tables = [dask.delayed(pa.Table.from_pandas)(x) for x in ddf.to_delayed()]

This will avoid trying to put PyArrow table objects in Pandas series, which seems to be the fundamental bug here.
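
If a single Table spanning the whole dataset is ultimately what's wanted, one possible follow-on (assuming every partition produces the same schema) is to compute the delayed objects and concatenate them on the client:

import dask
import pyarrow as pa

# 'tables' is the list of delayed objects from the snippet above
computed = dask.compute(*tables)
combined = pa.concat_tables(computed)  # one Table covering all partitions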

@mrocklin
Member

Closing here as some combination of "not a bug" and "an upstream pandas problem".

I've re-raised this upstream at pandas-dev/pandas#25389

@d6tdev

d6tdev commented Oct 20, 2019

I hit this when calling .to_parquet() without fastparquet installed, so pyarrow was used to write and Dask then had trouble reading the file back. Installing fastparquet fixed the problem.
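
For anyone running into the same thing, a small sketch of pinning the parquet engine explicitly rather than relying on auto-detection (the path is a placeholder; assumes the chosen engine is installed):

import dask.dataframe as dd

# write and read back with the same, explicitly chosen engine
ddf.to_parquet('out_parquet/', engine='pyarrow')          # or engine='fastparquet'
ddf2 = dd.read_parquet('out_parquet/', engine='pyarrow')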
