-
-
Notifications
You must be signed in to change notification settings - Fork 1.1k
lazily load dask arrays to dask data frames by calling to_dask_dataframe #1489
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
66452cc
f4b564e
aac672d
67a71b6
55417aa
bf92c4c
84fe8e4
157613b
414be29
bf9ec78
c64db76
6703a41
138a237
17d7819
41a8e0d
47833f4
3c6dcb6
27de6b3
2ef7983
1cf80a7
024a1aa
67dbbe5
dd3c9c5
2948c86
d4247a9
5fd1fc7
4705fde
c73f5b4
64458e2
6f6b48d
9dcdbca
a422965
ab8180b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2416,6 +2416,66 @@ def from_dataframe(cls, dataframe): | |
obj[name] = (dims, data) | ||
return obj | ||
|
||
def to_dask_dataframe(self, set_index=False):
    """
    Convert this dataset into a dask.dataframe.DataFrame.

    Both the coordinate and data variables in this dataset form
    the columns of the DataFrame.

    Parameters
    ----------
    set_index : bool, optional
        If True, index the returned DataFrame by this dataset's
        coordinate. Because dask DataFrames do not support
        multi-indexes, this only works for datasets with exactly one
        dimension.

    Returns
    -------
    dask.dataframe.DataFrame

    Raises
    ------
    ValueError
        If ``set_index=True`` and the dataset has more than one
        dimension.
    """
    import dask.dataframe as dd

    ordered_dims = self.dims
    chunks = self.chunks

    # order columns so that coordinates appear before data
    columns = list(self.coords) + list(self.data_vars)

    series_list = []
    for name in columns:
        var = self._variables[name]

        # IndexVariable objects are backed by a pandas.Index and do not
        # chunk lazily; convert to a base Variable first so that the
        # chunk() call below yields a dask array.
        if isinstance(var, xr.IndexVariable):
            var = var.to_base_variable()

        # ensure all variables span the same dimensions
        var = var.set_dims(ordered_dims)

        # ensure all variables have the same chunking structure, so
        # the per-column dask frames align when concatenated
        if var.chunks != chunks:
            var = var.chunk(chunks)

        # flatten to 1d, since each DataFrame column holds one value
        # per row
        flat = var.data.reshape(-1)

        series_list.append(dd.from_array(flat, columns=[name]))

    df = dd.concat(series_list, axis=1)

    if set_index:
        if len(ordered_dims) != 1:
            # report the offending dimensions so the caller can see
            # why indexing is not possible
            raise ValueError(
                'set_index=True is only valid for one-dimensional '
                'datasets; got dimensions %r' % list(ordered_dims))

        # extract out first (and only) coordinate variable
        (coord_dim,) = ordered_dims

        if coord_dim in df.columns:
            df = df.set_index(coord_dim)

    return df
|
||
def to_dict(self): | ||
""" | ||
Convert this dataset to a dictionary following xarray naming | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,13 +13,14 @@ | |
import xarray as xr | ||
from xarray import Variable, DataArray, Dataset | ||
import xarray.ufuncs as xu | ||
from xarray.core.pycompat import suppress | ||
from . import TestCase | ||
from xarray.core.pycompat import suppress, OrderedDict | ||
from . import TestCase, assert_frame_equal | ||
|
||
from xarray.tests import mock | ||
|
||
dask = pytest.importorskip('dask') | ||
import dask.array as da | ||
import dask.dataframe as dd | ||
|
||
|
||
class DaskTestCase(TestCase): | ||
|
@@ -29,9 +30,9 @@ def assertLazyAnd(self, expected, actual, test): | |
if isinstance(actual, Dataset): | ||
for k, v in actual.variables.items(): | ||
if k in actual.dims: | ||
self.assertIsInstance(var.data, np.ndarray) | ||
self.assertIsInstance(v.data, np.ndarray) | ||
else: | ||
self.assertIsInstance(var.data, da.Array) | ||
self.assertIsInstance(v.data, da.Array) | ||
elif isinstance(actual, DataArray): | ||
self.assertIsInstance(actual.data, da.Array) | ||
for k, v in actual.coords.items(): | ||
|
@@ -546,6 +547,100 @@ def test_from_dask_variable(self): | |
coords={'x': range(4)}, name='foo') | ||
self.assertLazyAndIdentical(self.lazy_array, a) | ||
|
||
def test_to_dask_dataframe(self):
    # Round-trip a Dataset through to_dask_dataframe, both with and
    # without setting the coordinate as the index.
    a_vals = da.from_array(np.random.randn(10), chunks=4)
    b_vals = np.arange(10, dtype='uint8')
    labels = list('abcdefghij')

    ds = Dataset(OrderedDict([('a', ('t', a_vals)),
                              ('b', ('t', b_vals)),
                              ('t', ('t', labels))]))

    frame = pd.DataFrame({'a': a_vals,
                          'b': b_vals},
                         index=pd.Index(labels, name='t'))

    # with set_index=True the single dimension becomes the index
    expected = dd.from_pandas(frame, chunksize=4)
    actual = ds.to_dask_dataframe(set_index=True)
    # the result must still be lazy
    self.assertIsInstance(actual, dd.DataFrame)
    # compare realized frames via pandas equality
    assert_frame_equal(expected.compute(), actual.compute())

    # with set_index=False the coordinate remains a plain column
    expected = dd.from_pandas(frame.reset_index(drop=False),
                              chunksize=4)
    actual = ds.to_dask_dataframe(set_index=False)
    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected.compute(), actual.compute())
||
def test_to_dask_dataframe_2D(self):
    # A 2-D dataset is flattened into one row per (x, y) pair.
    data = da.from_array(np.random.randn(2, 3), chunks=(1, 2))
    ds = Dataset({'w': (('x', 'y'), data)})
    ds['x'] = ('x', np.array([0, 1], np.int64))
    ds['y'] = ('y', list('abc'))

    # dask dataframes do not (yet) support multiindex,
    # but when it does, this would be the expected index:
    multi_idx = pd.MultiIndex.from_arrays(
        [[0, 0, 0, 1, 1, 1], ['a', 'b', 'c', 'a', 'b', 'c']],
        names=['x', 'y'])
    expected = pd.DataFrame({'w': data.reshape(-1)}, index=multi_idx)
    # so for now, reset the index
    expected = expected.reset_index(drop=False)

    actual = ds.to_dask_dataframe(set_index=False)
    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected, actual.compute())
||
def test_to_dask_dataframe_coordinates(self):
    # A coordinate that is itself a dask array should survive
    # conversion and become the index.
    values = da.from_array(np.random.randn(10), chunks=4)
    times = da.from_array(np.arange(10)*2, chunks=4)

    ds = Dataset(OrderedDict([('a', ('t', values)),
                              ('t', ('t', times))]))

    frame = pd.DataFrame({'a': values},
                         index=pd.Index(times, name='t'))
    expected = dd.from_pandas(frame, chunksize=4)

    actual = ds.to_dask_dataframe(set_index=True)
    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected.compute(), actual.compute())
||
def test_to_dask_dataframe_not_daskarray(self):
    # Conversion must also work when the variables hold plain numpy
    # arrays rather than dask arrays.
    a_np = np.random.randn(10)
    b_np = np.arange(10, dtype='uint8')
    labels = list('abcdefghij')

    ds = Dataset(OrderedDict([('a', ('t', a_np)),
                              ('b', ('t', b_np)),
                              ('t', ('t', labels))]))

    expected = pd.DataFrame({'a': a_np, 'b': b_np},
                            index=pd.Index(labels, name='t'))

    actual = ds.to_dask_dataframe(set_index=True)
    self.assertIsInstance(actual, dd.DataFrame)
    assert_frame_equal(expected, actual.compute())
||
def test_to_dask_dataframe_no_coordinate(self):
    # A dimension with no coordinate variable falls back to the
    # default RangeIndex.
    values = da.from_array(np.random.randn(10), chunks=4)
    ds = Dataset({'x': ('dim_0', values)})

    expected = pd.DataFrame({'x': values.compute()})
    actual = ds.to_dask_dataframe(set_index=True)
    assert_frame_equal(expected, actual.compute())
||
|
||
@pytest.mark.parametrize("method", ['load', 'compute']) | ||
def test_dask_kwargs_variable(method): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
We should probably add that `dims_order` keyword argument. Then this becomes something like: