diff --git a/src/stream/stream/min_by.rs b/src/stream/stream/min_by.rs index ab12aa05b..806e75416 100644 --- a/src/stream/stream/min_by.rs +++ b/src/stream/stream/min_by.rs @@ -10,25 +10,41 @@ use crate::task::{Context, Poll}; pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] - pub struct MinByFuture { + pub struct MinMaxByFuture { #[pin] stream: S, compare: F, - min: Option, + value: Option, + direction: Direction, } } -impl MinByFuture { - pub(super) fn new(stream: S, compare: F) -> Self { - MinByFuture { +#[derive(PartialEq, Eq)] +enum Direction { + Maximizing, + Minimizing, +} + +impl MinMaxByFuture { + pub(super) fn new_min(stream: S, compare: F) -> Self { + MinMaxByFuture::new(stream, compare, Direction::Minimizing) + } + + pub(super) fn new_max(stream: S, compare: F) -> Self { + MinMaxByFuture::new(stream, compare, Direction::Maximizing) + } + + fn new(stream: S, compare: F, direction: Direction) -> Self { + MinMaxByFuture { stream, compare, - min: None, + value: None, + direction, } } } -impl Future for MinByFuture +impl Future for MinMaxByFuture where S: Stream + Unpin + Sized, S::Item: Copy, @@ -37,22 +53,26 @@ where type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + use Direction::*; + use Ordering::*; + let this = self.project(); let next = futures_core::ready!(this.stream.poll_next(cx)); match next { Some(new) => { cx.waker().wake_by_ref(); - match this.min.take() { - None => *this.min = Some(new), - Some(old) => match (this.compare)(&new, &old) { - Ordering::Less => *this.min = Some(new), - _ => *this.min = Some(old), + + match this.value.take() { + None => this.value.replace(new), + Some(old) => match ((this.compare)(&new, &old), this.direction) { + (Less, Minimizing) | (Greater, Maximizing) => this.value.replace(new), + _ => this.value.replace(old), }, - } + }; Poll::Pending } - None => Poll::Ready(*this.min), + None => Poll::Ready(*this.value), } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 501ece1b2..8355b81c2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -68,7 +68,7 @@ use gt::GtFuture; use last::LastFuture; use le::LeFuture; use lt::LtFuture; -use min_by::MinByFuture; +use min_by::MinMaxByFuture; use next::NextFuture; use nth::NthFuture; use partial_cmp::PartialCmpFuture; @@ -600,6 +600,45 @@ extension_trait! { FilterMap::new(self, f) } + #[doc = r#" + Returns the element that gives the maximum value with respect to the + specified comparison function. If several elements are equally maximum, + the first element is returned. If the stream is empty, `None` is returned. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use std::collections::VecDeque; + + use async_std::prelude::*; + + let s: VecDeque = vec![1, 2, 3].into_iter().collect(); + + let min = s.clone().max_by(|x, y| x.cmp(y)).await; + assert_eq!(min, Some(3)); + + let min = s.max_by(|x, y| y.cmp(x)).await; + assert_eq!(min, Some(1)); + + let min = VecDeque::::new().max_by(|x, y| x.cmp(y)).await; + assert_eq!(min, None); + # + # }) } + ``` + "#] + fn max_by( + self, + compare: F, + ) -> impl Future> [MinMaxByFuture] + where + Self: Sized, + F: FnMut(&Self::Item, &Self::Item) -> Ordering, + { + MinMaxByFuture::new_max(self, compare) + } + #[doc = r#" Returns the element that gives the minimum value with respect to the specified comparison function. If several elements are equally minimum, @@ -631,12 +670,12 @@ extension_trait! { fn min_by( self, compare: F, - ) -> impl Future> [MinByFuture] + ) -> impl Future> [MinMaxByFuture] where Self: Sized, F: FnMut(&Self::Item, &Self::Item) -> Ordering, { - MinByFuture::new(self, compare) + MinMaxByFuture::new_min(self, compare) } #[doc = r#"