Skip to content

Commit 1841885

Browse files
ambvgraingertkumaraditya303
authored
[3.10] gh-95166: cancel map waited on future on timeout (GH-95169) (GH-95375)
Co-authored-by: Thomas Grainger <[email protected]> Co-authored-by: Kumar Aditya <[email protected]>
1 parent a41b51d commit 1841885

File tree

3 files changed

+42
-2
lines changed

3 files changed

+42
-2
lines changed

Lib/concurrent/futures/_base.py

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,18 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
312312
done.update(waiter.finished_futures)
313313
return DoneAndNotDoneFutures(done, fs - done)
314314

315+
316+
def _result_or_cancel(fut, timeout=None):
317+
try:
318+
try:
319+
return fut.result(timeout)
320+
finally:
321+
fut.cancel()
322+
finally:
323+
# Break a reference cycle with the exception in self._exception
324+
del fut
325+
326+
315327
class Future(object):
316328
"""Represents the result of an asynchronous computation."""
317329

@@ -606,9 +618,9 @@ def result_iterator():
606618
while fs:
607619
# Careful not to keep a reference to the popped future
608620
if timeout is None:
609-
yield fs.pop().result()
621+
yield _result_or_cancel(fs.pop())
610622
else:
611-
yield fs.pop().result(end_time - time.monotonic())
623+
yield _result_or_cancel(fs.pop(), end_time - time.monotonic())
612624
finally:
613625
for future in fs:
614626
future.cancel()

Lib/test/test_concurrent_futures.py

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,33 @@ def submit(pool):
932932
with futures.ProcessPoolExecutor(1, mp_context=mp.get_context('fork')) as workers:
933933
workers.submit(tuple)
934934

935+
def test_executor_map_current_future_cancel(self):
936+
stop_event = threading.Event()
937+
log = []
938+
939+
def log_n_wait(ident):
940+
log.append(f"{ident=} started")
941+
try:
942+
stop_event.wait()
943+
finally:
944+
log.append(f"{ident=} stopped")
945+
946+
with self.executor_type(max_workers=1) as pool:
947+
# submit work to saturate the pool
948+
fut = pool.submit(log_n_wait, ident="first")
949+
try:
950+
with contextlib.closing(
951+
pool.map(log_n_wait, ["second", "third"], timeout=0)
952+
) as gen:
953+
with self.assertRaises(futures.TimeoutError):
954+
next(gen)
955+
finally:
956+
stop_event.set()
957+
fut.result()
958+
# ident='second' is cancelled as a result of raising a TimeoutError
959+
# ident='third' is cancelled because it remained in the collection of futures
960+
self.assertListEqual(log, ["ident='first' started", "ident='first' stopped"])
961+
935962

936963
class ProcessPoolExecutorTest(ExecutorTest):
937964

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix :meth:`concurrent.futures.Executor.map` to cancel the currently waiting on future on an error - e.g. TimeoutError or KeyboardInterrupt.

0 commit comments

Comments
 (0)