Skip to content

Commit bff10fe

Browse files
Shady Khalifayoshuawuyts
Shady Khalifa
authored andcommitted
Stream::any implementation (#135)
* add stream::any method * use `ret` macro and small improvements * fix docs return type in `ret` macro
1 parent 532c73c commit bff10fe

File tree

1 file changed

+113
-4
lines changed

1 file changed

+113
-4
lines changed

src/stream/stream.rs

Lines changed: 113 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,15 @@ cfg_if! {
3636

3737
macro_rules! ret {
3838
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
39+
($a:lifetime, $f:tt, $o:ty, $t1:ty) => (ImplFuture<$a, $o>);
40+
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => (ImplFuture<$a, $o>);
41+
3942
}
4043
} else {
4144
macro_rules! ret {
4245
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
46+
($a:lifetime, $f:tt, $o:ty, $t1:ty) => ($f<$a, Self, $t1>);
47+
($a:lifetime, $f:tt, $o:ty, $t1:ty, $t2:ty) => ($f<$a, Self, $t1, $t2>);
4348
}
4449
}
4550
}
@@ -81,7 +86,7 @@ pub trait Stream {
8186
/// #
8287
/// # }) }
8388
/// ```
84-
fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option<Self::Item>)
89+
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
8590
where
8691
Self: Unpin;
8792

@@ -157,14 +162,71 @@ pub trait Stream {
157162
/// # }) }
158163
/// ```
159164
#[inline]
160-
fn all<F>(&mut self, f: F) -> AllFuture<'_, Self, F, Self::Item>
165+
fn all<F>(&mut self, f: F) -> ret!('_, AllFuture, bool, F, Self::Item)
161166
where
162167
Self: Sized,
163168
F: FnMut(Self::Item) -> bool,
164169
{
165170
AllFuture {
166171
stream: self,
167-
result: true,
172+
result: true, // the default if the empty stream
173+
__item: PhantomData,
174+
f,
175+
}
176+
}
177+
178+
/// Tests if any element of the stream matches a predicate.
179+
///
180+
/// `any()` takes a closure that returns `true` or `false`. It applies
181+
/// this closure to each element of the stream, and if any of them return
182+
/// `true`, then so does `any()`. If they all return `false`, it
183+
/// returns `false`.
184+
///
185+
/// `any()` is short-circuiting; in other words, it will stop processing
186+
/// as soon as it finds a `true`, given that no matter what else happens,
187+
/// the result will also be `true`.
188+
///
189+
/// An empty stream returns `false`.
190+
///
191+
/// # Examples
192+
///
193+
/// Basic usage:
194+
///
195+
/// ```
196+
/// # fn main() { async_std::task::block_on(async {
197+
/// #
198+
/// use async_std::prelude::*;
199+
/// use async_std::stream;
200+
///
201+
/// let mut s = stream::repeat::<u32>(42).take(3);
202+
/// assert!(s.any(|x| x == 42).await);
203+
///
204+
/// #
205+
/// # }) }
206+
/// ```
207+
///
208+
/// Empty stream:
209+
///
210+
/// ```
211+
/// # fn main() { async_std::task::block_on(async {
212+
/// #
213+
/// use async_std::prelude::*;
214+
/// use async_std::stream;
215+
///
216+
/// let mut s = stream::empty::<u32>();
217+
/// assert!(!s.any(|_| false).await);
218+
/// #
219+
/// # }) }
220+
/// ```
221+
#[inline]
222+
fn any<F>(&mut self, f: F) -> ret!('_, AnyFuture, bool, F, Self::Item)
223+
where
224+
Self: Sized,
225+
F: FnMut(Self::Item) -> bool,
226+
{
227+
AnyFuture {
228+
stream: self,
229+
result: false, // the default if the empty stream
168230
__item: PhantomData,
169231
f,
170232
}
@@ -174,7 +236,7 @@ pub trait Stream {
174236
impl<T: futures::Stream + Unpin + ?Sized> Stream for T {
175237
type Item = <Self as futures::Stream>::Item;
176238

177-
fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option<Self::Item>)
239+
fn next(&mut self) -> ret!('_, NextFuture, Option<Self::Item>)
178240
where
179241
Self: Unpin,
180242
{
@@ -273,3 +335,50 @@ where
273335
}
274336
}
275337
}
338+
339+
#[derive(Debug)]
340+
pub struct AnyFuture<'a, S, F, T>
341+
where
342+
F: FnMut(T) -> bool,
343+
{
344+
stream: &'a mut S,
345+
f: F,
346+
result: bool,
347+
__item: PhantomData<T>,
348+
}
349+
350+
impl<'a, S, F, T> AnyFuture<'a, S, F, T>
351+
where
352+
F: FnMut(T) -> bool,
353+
{
354+
pin_utils::unsafe_pinned!(stream: &'a mut S);
355+
pin_utils::unsafe_unpinned!(result: bool);
356+
pin_utils::unsafe_unpinned!(f: F);
357+
}
358+
359+
impl<S, F> Future for AnyFuture<'_, S, F, S::Item>
360+
where
361+
S: futures::Stream + Unpin + Sized,
362+
F: FnMut(S::Item) -> bool,
363+
{
364+
type Output = bool;
365+
366+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
367+
use futures::Stream;
368+
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
369+
match next {
370+
Some(v) => {
371+
let result = (self.as_mut().f())(v);
372+
*self.as_mut().result() = result;
373+
if result {
374+
Poll::Ready(true)
375+
} else {
376+
// don't forget to wake this task again to pull the next item from stream
377+
cx.waker().wake_by_ref();
378+
Poll::Pending
379+
}
380+
}
381+
None => Poll::Ready(self.result),
382+
}
383+
}
384+
}

0 commit comments

Comments
 (0)