Commit 61a6c89

Avoid reading the entire file in ChunkedStore (#4525)
1 parent cedb05a commit 61a6c89

6 files changed: 269 additions, 142 deletions

datafusion/core/src/datasource/listing/url.rs

Lines changed: 8 additions & 4 deletions
@@ -99,10 +99,14 @@ impl ListingTableUrl {
         };

         let path = std::path::Path::new(prefix).canonicalize()?;
-        let url = match path.is_file() {
-            true => Url::from_file_path(path).unwrap(),
-            false => Url::from_directory_path(path).unwrap(),
-        };
+        let url = if path.is_dir() {
+            Url::from_directory_path(path)
+        } else {
+            Url::from_file_path(path)
+        }
+        .map_err(|_| DataFusionError::Internal(format!("Can not open path: {}", s)))?;
+        // TODO: Currently we do not have an IO-related error variant that accepts ()
+        // or a string. Once we have such a variant, change the error type above.

         Ok(Self::new(url, glob))
     }
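
This change handles directories explicitly and replaces the two `unwrap()` calls with a mapped error: both `Url::from_file_path` and `Url::from_directory_path` return a bare `Err(())` on failure, which carries no message of its own. A minimal standalone sketch of the pattern, assuming only the `url` crate (`path_to_url` is a hypothetical helper; the real code maps into `DataFusionError::Internal`):

use std::path::Path;

use url::Url;

fn path_to_url(path: &Path) -> Result<Url, String> {
    if path.is_dir() {
        Url::from_directory_path(path)
    } else {
        Url::from_file_path(path)
    }
    // Both constructors fail with a bare Err(()), so attach context here.
    .map_err(|_| format!("Can not open path: {}", path.display()))
}

fn main() {
    let url = path_to_url(Path::new("/tmp")).unwrap();
    assert_eq!(url.scheme(), "file");
}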

datafusion/core/src/physical_plan/file_format/avro.rs

Lines changed: 23 additions & 3 deletions
@@ -208,21 +208,43 @@ mod tests {
     use crate::datasource::file_format::{avro::AvroFormat, FileFormat};
     use crate::datasource::listing::PartitionedFile;
     use crate::datasource::object_store::ObjectStoreUrl;
+    use crate::physical_plan::file_format::chunked_store::ChunkedStore;
     use crate::physical_plan::file_format::partition_type_wrap;
     use crate::prelude::SessionContext;
     use crate::scalar::ScalarValue;
     use crate::test::object_store::local_unpartitioned_file;
     use arrow::datatypes::{DataType, Field, Schema};
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
+    use object_store::ObjectStore;
+    use rstest::*;

     use super::*;

     #[tokio::test]
     async fn avro_exec_without_partition() -> Result<()> {
+        test_with_stores(Arc::new(LocalFileSystem::new())).await
+    }
+
+    #[rstest]
+    #[tokio::test]
+    async fn test_chunked_avro(
+        #[values(10, 20, 30, 40)] chunk_size: usize,
+    ) -> Result<()> {
+        test_with_stores(Arc::new(ChunkedStore::new(
+            Arc::new(LocalFileSystem::new()),
+            chunk_size,
+        )))
+        .await
+    }
+
+    async fn test_with_stores(store: Arc<dyn ObjectStore>) -> Result<()> {
+        let ctx = SessionContext::new();
+        ctx.runtime_env()
+            .register_object_store("file", "", store.clone());
+
         let testdata = crate::test_util::arrow_test_data();
         let filename = format!("{}/avro/alltypes_plain.avro", testdata);
-        let store = Arc::new(LocalFileSystem::new()) as _;
         let meta = local_unpartitioned_file(filename);

         let file_schema = AvroFormat {}.infer_schema(&store, &[meta.clone()]).await?;
@@ -239,8 +261,6 @@ mod tests {
             output_ordering: None,
         });
         assert_eq!(avro_exec.output_partitioning().partition_count(), 1);
-
-        let ctx = SessionContext::new();
         let mut results = avro_exec
             .execute(0, ctx.task_ctx())
             .expect("plan execution failed");
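
This refactor also swaps a hand-rolled loop over chunk sizes for rstest's `#[values]` attribute, which expands one function into a separate test case per value; the CSV change below combines it with `case(...)` parameters to get the full cartesian product of compression types and chunk sizes. A small illustrative sketch (hypothetical test name and body):

use rstest::rstest;

// Expands into four independent test cases, so a failing chunk size is
// reported on its own instead of aborting a shared loop.
#[rstest]
fn chunk_size_demo(#[values(10, 20, 30, 40)] chunk_size: usize) {
    assert!((10..=40).contains(&chunk_size));
}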

datafusion/core/src/physical_plan/file_format/chunked_store.rs

Lines changed: 76 additions & 20 deletions
@@ -16,7 +16,7 @@
 // under the License.

 use async_trait::async_trait;
-use bytes::Bytes;
+use bytes::{BufMut, Bytes, BytesMut};
 use futures::stream::BoxStream;
 use futures::StreamExt;
 use object_store::path::Path;
@@ -25,7 +25,7 @@ use object_store::{MultipartId, Result};
 use std::fmt::{Debug, Display, Formatter};
 use std::ops::Range;
 use std::sync::Arc;
-use tokio::io::AsyncWrite;
+use tokio::io::{AsyncReadExt, AsyncWrite, BufReader};

 /// Wraps a [`ObjectStore`] and makes its get response return chunks
 ///
@@ -70,24 +70,78 @@ impl ObjectStore for ChunkedStore {
     }

     async fn get(&self, location: &Path) -> Result<GetResult> {
-        let bytes = self.inner.get(location).await?.bytes().await?;
-        let mut offset = 0;
-        let chunk_size = self.chunk_size;
-
-        Ok(GetResult::Stream(
-            futures::stream::iter(std::iter::from_fn(move || {
-                let remaining = bytes.len() - offset;
-                if remaining == 0 {
-                    return None;
-                }
-                let to_read = remaining.min(chunk_size);
-                let next_offset = offset + to_read;
-                let slice = bytes.slice(offset..next_offset);
-                offset = next_offset;
-                Some(Ok(slice))
-            }))
-            .boxed(),
-        ))
+        match self.inner.get(location).await? {
+            GetResult::File(std_file, ..) => {
+                let file = tokio::fs::File::from_std(std_file);
+                let reader = BufReader::new(file);
+                Ok(GetResult::Stream(
+                    futures::stream::unfold(
+                        (reader, self.chunk_size),
+                        |(mut reader, chunk_size)| async move {
+                            let mut buffer = BytesMut::zeroed(chunk_size);
+                            let size = reader.read(&mut buffer).await.map_err(|e| {
+                                object_store::Error::Generic {
+                                    store: "ChunkedStore",
+                                    source: Box::new(e),
+                                }
+                            });
+                            match size {
+                                Ok(0) => None,
+                                Ok(value) => Some((
+                                    Ok(buffer.split_to(value).freeze()),
+                                    (reader, chunk_size),
+                                )),
+                                Err(e) => Some((Err(e), (reader, chunk_size))),
+                            }
+                        },
+                    )
+                    .boxed(),
+                ))
+            }
+            GetResult::Stream(stream) => {
+                let buffer = BytesMut::new();
+                Ok(GetResult::Stream(
+                    futures::stream::unfold(
+                        (stream, buffer, false, self.chunk_size),
+                        |(mut stream, mut buffer, mut exhausted, chunk_size)| async move {
+                            // Keep accumulating bytes until we reach capacity as long as
+                            // the stream can provide them:
+                            if exhausted {
+                                return None;
+                            }
+                            while buffer.len() < chunk_size {
+                                match stream.next().await {
+                                    None => {
+                                        exhausted = true;
+                                        let slice = buffer.split_off(0).freeze();
+                                        return Some((
+                                            Ok(slice),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ));
+                                    }
+                                    Some(Ok(bytes)) => {
+                                        buffer.put(bytes);
+                                    }
+                                    Some(Err(e)) => {
+                                        return Some((
+                                            Err(object_store::Error::Generic {
+                                                store: "ChunkedStore",
+                                                source: Box::new(e),
+                                            }),
+                                            (stream, buffer, exhausted, chunk_size),
+                                        ))
+                                    }
+                                };
+                            }
+                            // Return the chunked values as the next value in the stream
+                            let slice = buffer.split_to(chunk_size).freeze();
+                            Some((Ok(slice), (stream, buffer, exhausted, chunk_size)))
+                        },
+                    )
+                    .boxed(),
+                ))
+            }
+        }
     }

     async fn get_range(&self, location: &Path, range: Range<usize>) -> Result<Bytes> {
@@ -125,7 +179,9 @@ impl ObjectStore for ChunkedStore {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use futures::StreamExt;
     use object_store::memory::InMemory;
+    use object_store::path::Path;

     #[tokio::test]
     async fn test_chunked() {
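
The substance of the fix is in `get`: instead of `.bytes().await`, which buffers the entire object in memory before chunking it, the store now wraps the inner file or stream in `futures::stream::unfold`, so at most one chunk (plus a partial accumulation buffer) is resident at a time. A self-contained sketch of that technique under simplified assumptions: infallible input, no `object_store` types, and a hypothetical `rechunk` helper, using the `bytes`, `futures`, and `tokio` crates.

use bytes::{BufMut, Bytes, BytesMut};
use futures::{stream, Stream, StreamExt};

// Re-chunk a byte stream into `chunk_size`-sized pieces lazily: the unfold
// state carries the source stream, a partial buffer, and an `exhausted` flag.
fn rechunk(
    input: impl Stream<Item = Bytes> + Unpin,
    chunk_size: usize,
) -> impl Stream<Item = Bytes> {
    stream::unfold(
        (input, BytesMut::new(), false),
        move |(mut input, mut buffer, exhausted)| async move {
            if exhausted {
                return None;
            }
            // Accumulate until a full chunk is available or the input ends.
            while buffer.len() < chunk_size {
                match input.next().await {
                    Some(bytes) => buffer.put(bytes),
                    None => {
                        // Flush the (possibly short) tail, then stop.
                        if buffer.is_empty() {
                            return None;
                        }
                        let tail = buffer.split_off(0).freeze();
                        return Some((tail, (input, buffer, true)));
                    }
                }
            }
            Some((buffer.split_to(chunk_size).freeze(), (input, buffer, false)))
        },
    )
}

#[tokio::main]
async fn main() {
    let input = stream::iter([Bytes::from("hello "), Bytes::from("world")]);
    let chunks: Vec<Bytes> = rechunk(input, 4).collect().await;
    assert_eq!(
        chunks,
        vec![Bytes::from("hell"), Bytes::from("o wo"), Bytes::from("rld")]
    );
}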

datafusion/core/src/physical_plan/file_format/csv.rs

Lines changed: 46 additions & 39 deletions
@@ -559,6 +559,40 @@ mod tests {
         Ok(schema)
     }

+    async fn test_additional_stores(
+        file_compression_type: FileCompressionType,
+        store: Arc<dyn ObjectStore>,
+    ) {
+        let ctx = SessionContext::new();
+        ctx.runtime_env()
+            .register_object_store("file", "", store.clone());
+
+        let task_ctx = ctx.task_ctx();
+
+        let file_schema = aggr_test_schema();
+        let path = format!("{}/csv", arrow_test_data());
+        let filename = "aggregate_test_100.csv";
+
+        let file_groups = partitioned_file_groups(
+            path.as_str(),
+            filename,
+            1,
+            FileType::CSV,
+            file_compression_type.to_owned(),
+        )
+        .unwrap();
+
+        let config = partitioned_csv_config(file_schema, file_groups).unwrap();
+        let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned());
+
+        let it = csv.execute(0, task_ctx).unwrap();
+        let batches: Vec<_> = it.try_collect().await.unwrap();
+
+        let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
+
+        assert_eq!(total_rows, 100);
+    }
+
     #[rstest(
         file_compression_type,
         case(FileCompressionType::UNCOMPRESSED),
@@ -567,45 +601,18 @@ mod tests {
         case(FileCompressionType::XZ)
     )]
     #[tokio::test]
-    async fn test_chunked(file_compression_type: FileCompressionType) {
-        let ctx = SessionContext::new();
-        let chunk_sizes = [10, 20, 30, 40];
-
-        for chunk_size in chunk_sizes {
-            ctx.runtime_env().register_object_store(
-                "file",
-                "",
-                Arc::new(ChunkedStore::new(
-                    Arc::new(LocalFileSystem::new()),
-                    chunk_size,
-                )),
-            );
-
-            let task_ctx = ctx.task_ctx();
-
-            let file_schema = aggr_test_schema();
-            let path = format!("{}/csv", arrow_test_data());
-            let filename = "aggregate_test_100.csv";
-
-            let file_groups = partitioned_file_groups(
-                path.as_str(),
-                filename,
-                1,
-                FileType::CSV,
-                file_compression_type.to_owned(),
-            )
-            .unwrap();
-
-            let config = partitioned_csv_config(file_schema, file_groups).unwrap();
-            let csv = CsvExec::new(config, true, b',', file_compression_type.to_owned());
-
-            let it = csv.execute(0, task_ctx).unwrap();
-            let batches: Vec<_> = it.try_collect().await.unwrap();
-
-            let total_rows = batches.iter().map(|b| b.num_rows()).sum::<usize>();
-
-            assert_eq!(total_rows, 100);
-        }
+    async fn test_chunked_csv(
+        file_compression_type: FileCompressionType,
+        #[values(10, 20, 30, 40)] chunk_size: usize,
+    ) {
+        test_additional_stores(
+            file_compression_type,
+            Arc::new(ChunkedStore::new(
+                Arc::new(LocalFileSystem::new()),
+                chunk_size,
+            )),
+        )
+        .await;
     }

     #[tokio::test]

datafusion/core/src/physical_plan/file_format/delimited_stream.rs

Lines changed: 49 additions & 16 deletions
@@ -138,29 +138,37 @@ where
 {
     let delimiter = LineDelimiter::new();

-    futures::stream::unfold((s, delimiter), |(mut s, mut delimiter)| async move {
-        loop {
-            if let Some(next) = delimiter.next() {
-                return Some((Ok(next), (s, delimiter)));
-            }
+    futures::stream::unfold(
+        (s, delimiter, false),
+        |(mut s, mut delimiter, mut exhausted)| async move {
+            loop {
+                if let Some(next) = delimiter.next() {
+                    return Some((Ok(next), (s, delimiter, exhausted)));
+                } else if exhausted {
+                    return None;
+                }

-            match s.next().await {
-                Some(Ok(bytes)) => delimiter.push(bytes),
-                Some(Err(e)) => return Some((Err(e), (s, delimiter))),
-                None => match delimiter.finish() {
-                    Ok(true) => return None,
-                    Ok(false) => continue,
-                    Err(e) => return Some((Err(e), (s, delimiter))),
-                },
+                match s.next().await {
+                    Some(Ok(bytes)) => delimiter.push(bytes),
+                    Some(Err(e)) => return Some((Err(e), (s, delimiter, exhausted))),
+                    None => {
+                        exhausted = true;
+                        match delimiter.finish() {
+                            Ok(true) => return None,
+                            Ok(false) => continue,
+                            Err(e) => return Some((Err(e), (s, delimiter, exhausted))),
+                        }
+                    }
+                }
             }
-        }
-    })
+        },
+    )
 }

 #[cfg(test)]
 mod tests {
     use super::*;
-    use futures::stream::TryStreamExt;
+    use futures::stream::{BoxStream, TryStreamExt};

     #[test]
     fn test_delimiter() {
@@ -209,6 +217,31 @@ mod tests {
             futures::stream::iter(input.into_iter().map(|s| Ok(Bytes::from(s))));
         let stream = newline_delimited_stream(input_stream);

+        let results: Vec<_> = stream.try_collect().await.unwrap();
+        assert_eq!(
+            results,
+            vec![
+                Bytes::from("hello\nworld\n"),
+                Bytes::from("bingo\n"),
+                Bytes::from("cupcakes")
+            ]
+        )
+    }
+    #[tokio::test]
+    async fn test_delimiter_unfold_stream() {
+        let input_stream: BoxStream<'static, Result<Bytes>> = futures::stream::unfold(
+            VecDeque::from(["hello\nworld\nbin", "go\ncup", "cakes"]),
+            |mut input| async move {
+                if !input.is_empty() {
+                    Some((Ok(Bytes::from(input.pop_front().unwrap())), input))
+                } else {
+                    None
+                }
+            },
+        )
+        .boxed();
+        let stream = newline_delimited_stream(input_stream);
+
         let results: Vec<_> = stream.try_collect().await.unwrap();
         assert_eq!(
             results,
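
This fix shares its shape with the `ChunkedStore` stream branch above: an `exhausted` flag in the `unfold` state guarantees the inner stream is never polled again after it returns `None` (streams are not required to be fused, which is what the new unfold-based test exercises), while values the delimiter buffered up to and including `finish()` are still drained first. A reduced sketch of the pattern with plain integers and hypothetical names:

use std::collections::VecDeque;

use futures::{stream, Stream, StreamExt};

// Drain items still queued in the unfold state after the inner stream ends;
// the `exhausted` flag prevents polling the terminated stream again.
fn drain_after_end(
    input: impl Stream<Item = Vec<i32>> + Unpin,
) -> impl Stream<Item = i32> {
    stream::unfold(
        (input, VecDeque::new(), false),
        |(mut input, mut pending, mut exhausted)| async move {
            loop {
                if let Some(next) = pending.pop_front() {
                    return Some((next, (input, pending, exhausted)));
                } else if exhausted {
                    return None;
                }
                match input.next().await {
                    Some(batch) => pending.extend(batch),
                    None => exhausted = true,
                }
            }
        },
    )
}

#[tokio::main]
async fn main() {
    let out: Vec<i32> = drain_after_end(stream::iter([vec![1, 2], vec![3]]))
        .collect()
        .await;
    assert_eq!(out, vec![1, 2, 3]);
}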
