Skip to content

Commit d9a6b39

Browse files
committed
add crate::stream::Stream bound
Signed-off-by: Yoshua Wuyts <[email protected]>
1 parent f8e1695 commit d9a6b39

File tree

6 files changed

+78
-77
lines changed

6 files changed

+78
-77
lines changed

src/result/from_stream.rs

Lines changed: 42 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,47 +1,47 @@
1-
use crate::stream::{FromStream, IntoStream, Stream};
1+
// use crate::stream::{FromStream, IntoStream, Stream};
22

3-
use std::pin::Pin;
3+
// use std::pin::Pin;
44

5-
impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
6-
where
7-
V: FromStream<T>,
8-
{
9-
/// Takes each element in the stream: if it is an `Err`, no further
10-
/// elements are taken, and the `Err` is returned. Should no `Err`
11-
/// occur, a container with the values of each `Result` is returned.
12-
#[inline]
13-
fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
14-
stream: S,
15-
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
16-
where
17-
<S as IntoStream>::IntoStream: Send + 'a,
18-
{
19-
let stream = stream.into_stream();
5+
// impl<T: Send, E: Send, V> FromStream<Result<T, E>> for Result<V, E>
6+
// where
7+
// V: FromStream<T>,
8+
// {
9+
// /// Takes each element in the stream: if it is an `Err`, no further
10+
// /// elements are taken, and the `Err` is returned. Should no `Err`
11+
// /// occur, a container with the values of each `Result` is returned.
12+
// #[inline]
13+
// fn from_stream<'a, S: IntoStream<Item = Result<T, E>>>(
14+
// stream: S,
15+
// ) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
16+
// where
17+
// <S as IntoStream>::IntoStream: Send + 'a,
18+
// {
19+
// let stream = stream.into_stream();
2020

21-
Pin::from(Box::new(async move {
22-
pin_utils::pin_mut!(stream);
21+
// Pin::from(Box::new(async move {
22+
// pin_utils::pin_mut!(stream);
2323

24-
// Using `scan` here because it is able to stop the stream early
25-
// if a failure occurs
26-
let mut found_error = None;
27-
let out: V = stream
28-
.scan((), |_, elem| {
29-
match elem {
30-
Ok(elem) => Some(elem),
31-
Err(err) => {
32-
found_error = Some(err);
33-
// Stop processing the stream on error
34-
None
35-
}
36-
}
37-
})
38-
.collect()
39-
.await;
24+
// // Using `scan` here because it is able to stop the stream early
25+
// // if a failure occurs
26+
// let mut found_error = None;
27+
// let out: V = stream
28+
// .scan((), |_, elem| {
29+
// match elem {
30+
// Ok(elem) => Some(elem),
31+
// Err(err) => {
32+
// found_error = Some(err);
33+
// // Stop processing the stream on error
34+
// None
35+
// }
36+
// }
37+
// })
38+
// .collect()
39+
// .await;
4040

41-
match found_error {
42-
Some(err) => Err(err),
43-
None => Ok(out),
44-
}
45-
}))
46-
}
47-
}
41+
// match found_error {
42+
// Some(err) => Err(err),
43+
// None => Ok(out),
44+
// }
45+
// }))
46+
// }
47+
// }

src/stream/from_stream.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use std::pin::Pin;
1111
///
1212
/// [`IntoStream`]: trait.IntoStream.html
1313
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
14-
pub trait FromStream<T: Send> {
14+
pub trait FromStream<T: Send + Unpin> {
1515
/// Creates a value from a stream.
1616
///
1717
/// # Examples
@@ -23,7 +23,7 @@ pub trait FromStream<T: Send> {
2323
///
2424
/// // let _five_fives = async_std::stream::repeat(5).take(5);
2525
/// ```
26-
fn from_stream<'a, S: IntoStream<Item = T> + Send + 'a>(
26+
fn from_stream<'a, S: IntoStream<Item = T> + Send + Unpin + 'a>(
2727
stream: S,
28-
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>;
28+
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + Unpin + 'a>>;
2929
}

src/stream/into_stream.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use futures_core::stream::Stream;
1+
use crate::stream::Stream;
22

33
/// Conversion into a `Stream`.
44
///

src/stream/stream/mod.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -750,18 +750,18 @@ pub trait Stream {
750750
#[must_use = "if you really need to exhaust the iterator, consider `.for_each(drop)` instead (TODO)"]
751751
fn collect<'a, B>(self) -> dyn_ret!('a, B)
752752
where
753-
Self: futures_core::stream::Stream + Sized + Send + 'a,
754-
<Self as futures_core::stream::Stream>::Item: Send,
755-
B: FromStream<<Self as futures_core::stream::Stream>::Item>,
753+
Self: Stream + Sized + Send + Unpin + 'a,
754+
<Self as Stream>::Item: Send + Unpin,
755+
B: FromStream<<Self as crate::stream::Stream>::Item>,
756756
{
757757
FromStream::from_stream(self)
758758
}
759759
}
760760

761-
impl<T: futures_core::stream::Stream + ?Sized> Stream for T {
762-
type Item = <Self as futures_core::stream::Stream>::Item;
761+
// impl<T: futures_core::stream::Stream + ?Sized> Stream for T {
762+
// type Item = <Self as futures_core::stream::Stream>::Item;
763763

764-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
765-
futures_core::stream::Stream::poll_next(self, cx)
766-
}
767-
}
764+
// fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
765+
// futures_core::stream::Stream::poll_next(self, cx)
766+
// }
767+
// }

src/vec/from_stream.rs

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
1-
use crate::stream::{FromStream, IntoStream, Stream};
1+
use crate::stream::{FromStream, IntoStream};
22

33
use std::pin::Pin;
44

5-
impl<T: Send> FromStream<T> for Vec<T> {
5+
impl<T: Send + Unpin> FromStream<T> for Vec<T> {
66
#[inline]
77
fn from_stream<'a, S: IntoStream<Item = T>>(
88
stream: S,
9-
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + 'a>>
9+
) -> Pin<Box<dyn core::future::Future<Output = Self> + Send + Unpin + 'a>>
1010
where
1111
<S as IntoStream>::IntoStream: Send + 'a,
1212
{
13-
let stream = stream.into_stream();
13+
let _stream = stream.into_stream();
1414

15-
Pin::from(Box::new(async move {
16-
pin_utils::pin_mut!(stream);
15+
// Box::pin(async move {
16+
// pin_utils::pin_mut!(stream);
1717

18-
let mut out = vec![];
19-
while let Some(item) = stream.next().await {
20-
out.push(item);
21-
}
22-
out
23-
}))
18+
// let mut out = vec![];
19+
// while let Some(item) = stream.next().await {
20+
// out.push(item);
21+
// }
22+
// out
23+
// })
24+
panic!();
2425
}
2526
}

src/vec/into_stream.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub struct IntoStream<T> {
77
iter: std::vec::IntoIter<T>,
88
}
99

10-
impl<T: Send> crate::stream::IntoStream for Vec<T> {
10+
impl<T: Send + Unpin> crate::stream::IntoStream for Vec<T> {
1111
type Item = T;
1212
type IntoStream = IntoStream<T>;
1313

@@ -25,30 +25,30 @@ impl<T: Send> crate::stream::IntoStream for Vec<T> {
2525
/// }
2626
/// ```
2727
#[inline]
28-
fn into_stream(mut self) -> IntoStream<T> {
28+
fn into_stream(self) -> IntoStream<T> {
2929
let iter = self.into_iter();
3030
IntoStream { iter }
3131
}
3232
}
3333

34-
impl<T: Send> futures_core::stream::Stream for IntoStream<T> {
34+
impl<T: Send + Unpin> crate::stream::Stream for IntoStream<T> {
3535
type Item = T;
3636

3737
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38-
Poll::Ready(self.iter.next())
38+
Poll::Ready(Pin::new(&mut *self).iter.next())
3939
}
4040
}
4141

4242
/// Slice stream.
4343
#[derive(Debug)]
44-
pub struct Stream<'a, T: 'a> {
44+
pub struct Stream<'a, T> {
4545
iter: std::slice::Iter<'a, T>,
4646
}
4747

48-
impl<'a, T: Sync> futures_core::stream::Stream for Stream<'a, T> {
48+
impl<'a, T: Sync> crate::stream::Stream for Stream<'a, T> {
4949
type Item = &'a T;
5050

51-
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
51+
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
5252
Poll::Ready(self.iter.next())
5353
}
5454
}
@@ -65,14 +65,14 @@ impl<'a, T: Sync> crate::stream::IntoStream for &'a Vec<T> {
6565

6666
/// Mutable slice stream.
6767
#[derive(Debug)]
68-
pub struct StreamMut<'a, T: 'a> {
68+
pub struct StreamMut<'a, T> {
6969
iter: std::slice::IterMut<'a, T>,
7070
}
7171

72-
impl<'a, T: Sync> futures_core::stream::Stream for StreamMut<'a, T> {
72+
impl<'a, T: Sync> crate::stream::Stream for StreamMut<'a, T> {
7373
type Item = &'a mut T;
7474

75-
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
75+
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
7676
Poll::Ready(self.iter.next())
7777
}
7878
}

0 commit comments

Comments
 (0)