Skip to content

Commit d2a21fc

Browse files
committed
Preserve the relative order of delayed and non-delayed tasks in event loops
Fixed #4134
1 parent 2803a33 commit d2a21fc

File tree

2 files changed

+37
-15
lines changed

2 files changed

+37
-15
lines changed

kotlinx-coroutines-core/common/src/EventLoop.common.kt

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -256,21 +256,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
256256
// unconfined events take priority
257257
if (processUnconfinedEvent()) return 0
258258
// queue all delayed tasks that are due to be executed
259-
val delayed = _delayed.value
260-
if (delayed != null && !delayed.isEmpty) {
261-
val now = nanoTime()
262-
while (true) {
263-
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
264-
// to make sure that 'isEmpty' and `nextTime` that check both of them
265-
// do not transiently report that both delayed and queue are empty during move
266-
delayed.removeFirstIf {
267-
if (it.timeToExecute(now)) {
268-
enqueueImpl(it)
269-
} else
270-
false
271-
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
272-
}
273-
}
259+
enqueueDelayedTasks()
274260
// then process one event from queue
275261
val task = dequeue()
276262
if (task != null) {
@@ -283,6 +269,8 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
283269
final override fun dispatch(context: CoroutineContext, block: Runnable) = enqueue(block)
284270

285271
open fun enqueue(task: Runnable) {
272+
// are there some delayed tasks that should execute before this one? If so, move them to the queue first.
273+
enqueueDelayedTasks()
286274
if (enqueueImpl(task)) {
287275
// todo: we should unpark only when this delayed task became first in the queue
288276
unpark()
@@ -336,6 +324,25 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
336324
}
337325
}
338326

327+
/** Move all delayed tasks that are due to the main queue. */
328+
private fun enqueueDelayedTasks() {
329+
val delayed = _delayed.value
330+
if (delayed != null && !delayed.isEmpty) {
331+
val now = nanoTime()
332+
while (true) {
333+
// make sure that moving from delayed to queue removes from delayed only after it is added to queue
334+
// to make sure that 'isEmpty' and `nextTime` that check both of them
335+
// do not transiently report that both delayed and queue are empty during move
336+
delayed.removeFirstIf {
337+
if (it.timeToExecute(now)) {
338+
enqueueImpl(it)
339+
} else
340+
false
341+
} ?: break // quit loop when nothing more to remove or enqueueImpl returns false on "isComplete"
342+
}
343+
}
344+
}
345+
339346
private fun closeQueue() {
340347
assert { isCompleted }
341348
_queue.loop { queue ->

kotlinx-coroutines-core/jvm/test/EventLoopsTest.kt

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,21 @@ class EventLoopsTest : TestBase() {
126126
finish(4)
127127
}
128128

129+
/**
130+
* Tests that, when delayed tasks are due on an event loop, they will execute earlier than the newly-scheduled
131+
* non-delayed tasks.
132+
*/
133+
@Test
134+
fun testPendingDelayedBeingDueEarlier() = runTest {
135+
launch(start = CoroutineStart.UNDISPATCHED) {
136+
delay(1)
137+
expect(1)
138+
}
139+
Thread.sleep(100)
140+
yield()
141+
finish(2)
142+
}
143+
129144
class EventSync {
130145
private val waitingThread = atomic<Thread?>(null)
131146
private val fired = atomic(false)

0 commit comments

Comments
 (0)