Skip to content

Commit 7e3599a

Browse files
montekkiyoshuawuyts
andcommitted
add stream::min_by method (#146)
* add stream::min_by method * Update src/stream/stream.rs Co-Authored-By: Yoshua Wuyts <[email protected]>
1 parent bac74c2 commit 7e3599a

File tree

3 files changed

+90
-0
lines changed

3 files changed

+90
-0
lines changed

src/stream/min_by.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::cmp::Ordering;
2+
use std::pin::Pin;
3+
4+
use super::stream::Stream;
5+
use crate::future::Future;
6+
use crate::task::{Context, Poll};
7+
8+
/// A future that yields the minimum item in a stream by a given comparison function.
9+
#[derive(Clone, Debug)]
10+
pub struct MinBy<S: Stream, F> {
11+
stream: S,
12+
compare: F,
13+
min: Option<S::Item>,
14+
}
15+
16+
impl<S: Stream + Unpin, F> Unpin for MinBy<S, F> {}
17+
18+
impl<S: Stream + Unpin, F> MinBy<S, F> {
19+
pub(super) fn new(stream: S, compare: F) -> Self {
20+
MinBy {
21+
stream,
22+
compare,
23+
min: None,
24+
}
25+
}
26+
}
27+
28+
impl<S, F> Future for MinBy<S, F>
29+
where
30+
S: futures_core::stream::Stream + Unpin,
31+
S::Item: Copy,
32+
F: FnMut(&S::Item, &S::Item) -> Ordering,
33+
{
34+
type Output = Option<S::Item>;
35+
36+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37+
let next = futures_core::ready!(Pin::new(&mut self.stream).poll_next(cx));
38+
39+
match next {
40+
Some(new) => {
41+
cx.waker().wake_by_ref();
42+
match self.as_mut().min.take() {
43+
None => self.as_mut().min = Some(new),
44+
Some(old) => match (&mut self.as_mut().compare)(&new, &old) {
45+
Ordering::Less => self.as_mut().min = Some(new),
46+
_ => self.as_mut().min = Some(old),
47+
},
48+
}
49+
Poll::Pending
50+
}
51+
None => Poll::Ready(self.min),
52+
}
53+
}
54+
}

src/stream/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ pub use repeat::{repeat, Repeat};
2727
pub use stream::{Stream, Take};
2828

2929
mod empty;
30+
mod min_by;
3031
mod once;
3132
mod repeat;
3233
mod stream;

src/stream/stream.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
//! # }) }
2222
//! ```
2323
24+
use std::cmp::Ordering;
2425
use std::pin::Pin;
2526

2627
use cfg_if::cfg_if;
2728

29+
use super::min_by::MinBy;
2830
use crate::future::Future;
2931
use crate::task::{Context, Poll};
3032
use std::marker::PhantomData;
@@ -118,6 +120,39 @@ pub trait Stream {
118120
}
119121
}
120122

123+
/// Returns the element that gives the minimum value with respect to the
124+
/// specified comparison function. If several elements are equally minimum,
125+
/// the first element is returned. If the stream is empty, `None` is returned.
126+
///
127+
/// # Examples
128+
///
129+
/// ```
130+
/// # fn main() { async_std::task::block_on(async {
131+
/// #
132+
/// use std::collections::VecDeque;
133+
/// use async_std::stream::Stream;
134+
///
135+
/// let s: VecDeque<usize> = vec![1, 2, 3].into_iter().collect();
136+
///
137+
/// let min = Stream::min_by(s.clone(), |x, y| x.cmp(y)).await;
138+
/// assert_eq!(min, Some(1));
139+
///
140+
/// let min = Stream::min_by(s, |x, y| y.cmp(x)).await;
141+
/// assert_eq!(min, Some(3));
142+
///
143+
/// let min = Stream::min_by(VecDeque::<usize>::new(), |x, y| x.cmp(y)).await;
144+
/// assert_eq!(min, None);
145+
/// #
146+
/// # }) }
147+
/// ```
148+
fn min_by<F>(self, compare: F) -> MinBy<Self, F>
149+
where
150+
Self: Sized + Unpin,
151+
F: FnMut(&Self::Item, &Self::Item) -> Ordering,
152+
{
153+
MinBy::new(self, compare)
154+
}
155+
121156
/// Tests if every element of the stream matches a predicate.
122157
///
123158
/// `all()` takes a closure that returns `true` or `false`. It applies

0 commit comments

Comments
 (0)