@@ -27,6 +27,7 @@ use cfg_if::cfg_if;
27
27
28
28
use crate :: future:: Future ;
29
29
use crate :: task:: { Context , Poll } ;
30
+ use std:: marker:: PhantomData ;
30
31
31
32
cfg_if ! {
32
33
if #[ cfg( feature = "docs" ) ] {
@@ -111,6 +112,63 @@ pub trait Stream {
111
112
remaining : n,
112
113
}
113
114
}
115
+
116
+ /// Tests if every element of the stream matches a predicate.
117
+ ///
118
+ /// `all()` takes a closure that returns `true` or `false`. It applies
119
+ /// this closure to each element of the stream, and if they all return
120
+ /// `true`, then so does `all()`. If any of them return `false`, it
121
+ /// returns `false`.
122
+ ///
123
+ /// `all()` is short-circuiting; in other words, it will stop processing
124
+ /// as soon as it finds a `false`, given that no matter what else happens,
125
+ /// the result will also be `false`.
126
+ ///
127
+ /// An empty stream returns `true`.
128
+ ///
129
+ /// # Examples
130
+ ///
131
+ /// Basic usage:
132
+ ///
133
+ /// ```
134
+ /// # fn main() { async_std::task::block_on(async {
135
+ /// #
136
+ /// use async_std::prelude::*;
137
+ /// use async_std::stream;
138
+ ///
139
+ /// let mut s = stream::repeat::<u32>(42).take(3);
140
+ /// assert!(s.all(|x| x == 42).await);
141
+ ///
142
+ /// #
143
+ /// # }) }
144
+ /// ```
145
+ ///
146
+ /// Empty stream:
147
+ ///
148
+ /// ```
149
+ /// # fn main() { async_std::task::block_on(async {
150
+ /// #
151
+ /// use async_std::prelude::*;
152
+ /// use async_std::stream;
153
+ ///
154
+ /// let mut s = stream::empty::<u32>();
155
+ /// assert!(s.all(|_| false).await);
156
+ /// #
157
+ /// # }) }
158
+ /// ```
159
+ #[ inline]
160
+ fn all < F > ( & mut self , f : F ) -> AllFuture < ' _ , Self , F , Self :: Item >
161
+ where
162
+ Self : Sized ,
163
+ F : FnMut ( Self :: Item ) -> bool ,
164
+ {
165
+ AllFuture {
166
+ stream : self ,
167
+ result : true ,
168
+ __item : PhantomData ,
169
+ f,
170
+ }
171
+ }
114
172
}
115
173
116
174
impl < T : futures:: Stream + Unpin + ?Sized > Stream for T {
@@ -168,3 +226,50 @@ impl<S: futures::Stream> futures::Stream for Take<S> {
168
226
}
169
227
}
170
228
}
229
+
230
+ #[ derive( Debug ) ]
231
+ pub struct AllFuture < ' a , S , F , T >
232
+ where
233
+ F : FnMut ( T ) -> bool ,
234
+ {
235
+ stream : & ' a mut S ,
236
+ f : F ,
237
+ result : bool ,
238
+ __item : PhantomData < T > ,
239
+ }
240
+
241
+ impl < ' a , S , F , T > AllFuture < ' a , S , F , T >
242
+ where
243
+ F : FnMut ( T ) -> bool ,
244
+ {
245
+ pin_utils:: unsafe_pinned!( stream: & ' a mut S ) ;
246
+ pin_utils:: unsafe_unpinned!( result: bool ) ;
247
+ pin_utils:: unsafe_unpinned!( f: F ) ;
248
+ }
249
+
250
+ impl < S , F > Future for AllFuture < ' _ , S , F , S :: Item >
251
+ where
252
+ S : futures:: Stream + Unpin + Sized ,
253
+ F : FnMut ( S :: Item ) -> bool ,
254
+ {
255
+ type Output = bool ;
256
+
257
+ fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
258
+ use futures:: Stream ;
259
+ let next = futures:: ready!( self . as_mut( ) . stream( ) . poll_next( cx) ) ;
260
+ match next {
261
+ Some ( v) => {
262
+ let result = ( self . as_mut ( ) . f ( ) ) ( v) ;
263
+ * self . as_mut ( ) . result ( ) = result;
264
+ if result {
265
+ // don't forget to wake this task again to pull the next item from stream
266
+ cx. waker ( ) . wake_by_ref ( ) ;
267
+ Poll :: Pending
268
+ } else {
269
+ Poll :: Ready ( false )
270
+ }
271
+ }
272
+ None => Poll :: Ready ( self . result ) ,
273
+ }
274
+ }
275
+ }
0 commit comments