Skip to content

Commit 20279eb

Browse files
olegnntaiki-e
authored andcommitted
FlattenUnordered: improve wakers behavior (#2566)
1 parent 75dca5a commit 20279eb

File tree

1 file changed

+16
-8
lines changed

1 file changed

+16
-8
lines changed

futures-util/src/stream/stream/flatten_unordered.rs

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,17 @@ impl SharedPollState {
9292
let value = self
9393
.state
9494
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
95-
let next_value = value | to_poll;
96-
95+
// Waking process for this waker already started
96+
if value & waking != NONE {
97+
return None;
98+
}
99+
let mut next_value = value | to_poll;
100+
// Only start the waking process if we're not in the polling phase and the stream isn't woken already
97101
if value & (WOKEN | POLLING) == NONE {
98-
Some(next_value | waking)
99-
} else if next_value != value {
102+
next_value |= waking;
103+
}
104+
105+
if next_value != value {
100106
Some(next_value)
101107
} else {
102108
None
@@ -141,11 +147,13 @@ impl SharedPollState {
141147
fn stop_waking(&self, waking: u8) -> u8 {
142148
self.state
143149
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |value| {
144-
let next_value = value & !waking;
145-
150+
let mut next_value = value & !waking;
151+
// Waker will be called only if the current waking state is the same as the specified waker state
146152
if value & WAKING_ALL == waking {
147-
Some(next_value | WOKEN)
148-
} else if next_value != value {
153+
next_value |= WOKEN;
154+
}
155+
156+
if next_value != value {
149157
Some(next_value)
150158
} else {
151159
None

0 commit comments

Comments
 (0)