From f2f41e0272350398885a3245cdbd6c2758113e0d Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 18 Feb 2017 01:11:45 -0500 Subject: [PATCH 1/8] Add context management for ProcessPoolExecutor+CLN tomMoral/loky#48 * Add context argument to allow non forking ProcessPoolExecutor * Do some cleaning (pep8+nonused code+naming) * Liberate the ressource earlier in the `_worker_process` --- Lib/concurrent/futures/process.py | 59 +++++++++-------- Lib/test/test_concurrent_futures.py | 99 +++++++++++++++++++++++++++-- 2 files changed, 128 insertions(+), 30 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 50ee296ac89b78..aa0670a5e33210 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -50,8 +50,7 @@ from concurrent.futures import _base import queue from queue import Full -import multiprocessing -from multiprocessing import SimpleQueue +import multiprocessing as mp from multiprocessing.connection import wait import threading import weakref @@ -74,11 +73,11 @@ # threads/processes finish. _threads_queues = weakref.WeakKeyDictionary() -_shutdown = False +_global_shutdown = False def _python_exit(): - global _shutdown - _shutdown = True + global _global_shutdown + _global_shutdown = True items = list(_threads_queues.items()) for t, q in items: q.put(None) @@ -158,12 +157,10 @@ def _process_worker(call_queue, result_queue): This worker is run in a separate process. Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and + call_queue: A ctx.Queue of _CallItems that will be read and evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written + result_queue: A ctx.Queue of _ResultItems that will written to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. """ while True: call_item = call_queue.get(block=True) @@ -180,6 +177,11 @@ def _process_worker(call_queue, result_queue): result_queue.put(_ResultItem(call_item.work_id, result=r)) + # Liberate the resource as soon as possible, to avoid holding onto + # open files or shared memory that is not needed anymore + del call_item + + def _add_call_item_to_queue(pending_work_items, work_ids, call_queue): @@ -231,20 +233,21 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as + process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems + call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the + result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. """ executor = None def shutting_down(): - return _shutdown or executor is None or executor._shutdown_thread + return (_global_shutdown or executor is None + or executor._shutdown_thread) def shutdown_worker(): # This is an upper bound @@ -254,7 +257,7 @@ def shutdown_worker(): # Release the queue's resources as soon as possible. call_queue.close() # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS X. + # some ctx.Queue methods may deadlock on Mac OS X. for p in processes.values(): p.join() @@ -377,13 +380,15 @@ class BrokenProcessPool(RuntimeError): class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, ctx=None): """Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. + ctx: A multiprocessing context to launch the workers. This object + should provide SimpleQueue, Queue and Process. """ _check_system_limits() @@ -394,17 +399,19 @@ def __init__(self, max_workers=None): raise ValueError("max_workers must be greater than 0") self._max_workers = max_workers + if ctx is None: + ctx = mp.get_context() + self._ctx = ctx # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) + self._call_queue = ctx.Queue(self._max_workers + EXTRA_QUEUED_CALLS) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. self._call_queue._ignore_epipe = True - self._result_queue = SimpleQueue() + self._result_queue = ctx.SimpleQueue() self._work_ids = queue.Queue() self._queue_management_thread = None # Map of pids to processes @@ -426,20 +433,20 @@ def weakref_cb(_, q=self._result_queue): # Start the processes so that their sentinels are known. self._adjust_process_count() self._queue_management_thread = threading.Thread( - target=_queue_management_worker, - args=(weakref.ref(self, weakref_cb), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue)) + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue)) self._queue_management_thread.daemon = True self._queue_management_thread.start() _threads_queues[self._queue_management_thread] = self._result_queue def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( + p = self._ctx.Process( target=_process_worker, args=(self._call_queue, self._result_queue)) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index a888dcacc49160..5b4258d86a455b 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -18,6 +18,7 @@ from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) from concurrent.futures.process import BrokenProcessPool +from multiprocessing import get_context def create_future(state=PENDING, exception=None, result=None): @@ -59,7 +60,7 @@ def make_dummy_object(_): return MyObject() -class BaseTestCase(unittest.TestCase): +class BaseTestCase(BaseTestCase): def setUp(self): self._thread_key = test.support.threading_setup() @@ -109,6 +110,38 @@ class ThreadPoolMixin(ExecutorMixin): class ProcessPoolMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor + def setUp(self): + self.t1 = time.time() + try: + self.executor = self.executor_type( + max_workers=self.worker_count, + ctx=get_context(self.ctx)) + except NotImplementedError as e: + self.skipTest(str(e)) + self._prime_executor() + + +class ProcessPoolForkMixin(ProcessPoolMixin): + ctx = "fork" + + def setUp(self): + if sys.platform == "win32": + self.skipTest("require unix system") + super().setUp() + + +class ProcessPoolSpawnMixin(ProcessPoolMixin): + ctx = "spawn" + + +class ProcessPoolForkserverMixin(ProcessPoolMixin): + ctx = "forkserver" + + def setUp(self): + if sys.platform == "win32": + self.skipTest("require unix system") + super().setUp() + class ExecutorShutdownTest: def test_run_after_shutdown(self): @@ -229,6 +262,22 @@ def test_del_shutdown(self): p.join() +class ProcessPoolForkShutdownTest(ProcessPoolForkMixin, BaseTestCase, + ProcessPoolShutdownTest): + pass + + +class ProcessPoolForkserverShutdownTest(ProcessPoolForkserverMixin, + BaseTestCase, + ProcessPoolShutdownTest): + pass + + +class ProcessPoolSpawnShutdownTest(ProcessPoolSpawnMixin, BaseTestCase, + ProcessPoolShutdownTest): + pass + + class WaitTests: def test_first_completed(self): @@ -348,7 +397,17 @@ def future_func(): sys.setswitchinterval(oldswitchinterval) -class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, BaseTestCase): +class ProcessPoolForkWaitTests(ProcessPoolForkMixin, WaitTests, BaseTestCase): + pass + + +class ProcessPoolForkserverWaitTests(ProcessPoolForkserverMixin, WaitTests, + BaseTestCase): + pass + + +class ProcessPoolSpawnWaitTests(ProcessPoolSpawnMixin, BaseTestCase, + WaitTests): pass @@ -433,7 +492,20 @@ class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase pass -class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, BaseTestCase): + +class ProcessPoolForkAsCompletedTests(ProcessPoolForkMixin, AsCompletedTests, + BaseTestCase): + pass + + +class ProcessPoolForkserverAsCompletedTests(ProcessPoolForkserverMixin, + AsCompletedTests, + BaseTestCase): + pass + + +class ProcessPoolSpawnAsCompletedTests(ProcessPoolSpawnMixin, AsCompletedTests, + BaseTestCase): pass @@ -533,7 +605,7 @@ def test_default_workers(self): (os.cpu_count() or 1) * 5) -class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, BaseTestCase): +class ProcessPoolExecutorTest(ExecutorTest): def test_killed_child(self): # When a child process is abruptly terminated, the whole pool gets # "broken". @@ -589,6 +661,25 @@ def test_traceback(self): f1.getvalue()) +class ProcessPoolForkExecutorTest(ProcessPoolForkMixin, + ProcessPoolExecutorTest, + BaseTestCase): + pass + + +class ProcessPoolForkserverExecutorTest(ProcessPoolForkserverMixin, + ProcessPoolExecutorTest, + BaseTestCase): + pass + + +class ProcessPoolSpawnExecutorTest(ProcessPoolSpawnMixin, + ProcessPoolExecutorTest, + BaseTestCase): + pass + + + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): callback_result = None From d469da1eb5e558b75daef6ee9e58c37337801a89 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 18 Feb 2017 01:59:42 -0500 Subject: [PATCH 2/8] Makes wakeup independent from result_queue tomMoral/loky#48 This avoids deadlocks if a Process dies while: * Unpickling the _CallItem * Pickling a _ResultItem Wakeups are done with _Sentinel object that cannot be used with wait. We do not use a Connection/Queue as it brings lot of overhead in the Executor to use only a small part of it. We might want to implement a Sentinel object that can be waited upon to simplify and robustify the code. Test no deadlock with crashes * TST crash in CallItem unpickling * TST crash in func call run * TST crash in REsult pickling the test include crashes with PythonError/SystemExist/SegFault --- Lib/concurrent/futures/process.py | 75 +++++++-- Lib/test/test_concurrent_futures.py | 234 ++++++++++++++++++++++++++++ 2 files changed, 293 insertions(+), 16 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index aa0670a5e33210..24c401d78b28ae 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -72,16 +72,36 @@ # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_queues = weakref.WeakKeyDictionary() +_threads_wakeup = weakref.WeakKeyDictionary() _global_shutdown = False +_POLL_TIMEOUT = .001 + + +class _Sentinel: + __slot__ = ["_state"] + + def __init__(self): + self._state = False + + def set(self): + self._state = True + + def get_and_unset(self): + s = self._state + if s: + self._state = False + return s + + def _python_exit(): global _global_shutdown _global_shutdown = True - items = list(_threads_queues.items()) - for t, q in items: - q.put(None) - for t, q in items: + items = list(_threads_wakeup.items()) + for t, wakeup in items: + if t.is_alive(): + wakeup.set() + for t, _ in items: t.join() # Controls how many more calls than processes will be queued in the call queue. @@ -224,7 +244,8 @@ def _queue_management_worker(executor_reference, pending_work_items, work_ids_queue, call_queue, - result_queue): + result_queue, + wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -242,6 +263,8 @@ def _queue_management_worker(executor_reference, derived from _WorkItems for processing by the process workers. result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. + wakeup: A _Sentinel to allow waking up the queue_manager_thread from + the main Thread and avoid deadlocks caused by broken queues. """ executor = None @@ -268,12 +291,23 @@ def shutdown_worker(): work_ids_queue, call_queue) - sentinels = [p.sentinel for p in processes.values()] - assert sentinels - ready = wait([reader] + sentinels) + # As the wakeup object is not waitable, we need to implement a polling + # loop. This loop is similar to the one in the wait function. A + # specific waitable event object complient to wait could be implemented + while not wakeup.get_and_unset(): + sentinels = [p.sentinel for p in processes.values()] + assert sentinels + ready = wait([reader] + sentinels, timeout=_POLL_TIMEOUT) + if len(ready) > 0: + break + else: + # The thread have just been awaken + ready = [] + result_item = None + if reader in ready: result_item = reader.recv() - else: + elif len(ready) > 0: # Mark the process pool broken so that submits fail right now. executor = executor_reference() if executor is not None: @@ -424,11 +458,18 @@ def __init__(self, max_workers=None, ctx=None): self._queue_count = 0 self._pending_work_items = {} + # Permits to wake_up the queue_manager_thread independently of + # result_queue state. This avoid deadlocks caused by the non + # transmission of wakeup signal when a worker died with the + # _result_queue write lock. + self._wakeup = _Sentinel() + def _start_queue_management_thread(self): # When the executor gets lost, the weakref callback will wake up # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) + def weakref_cb(_, wakeup=self._wakeup): + wakeup.set() + if self._queue_management_thread is None: # Start the processes so that their sentinels are known. self._adjust_process_count() @@ -439,10 +480,12 @@ def weakref_cb(_, q=self._result_queue): self._pending_work_items, self._work_ids, self._call_queue, - self._result_queue)) + self._result_queue, + self._wakeup), + name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue + _threads_wakeup[self._queue_management_thread] = self._wakeup def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): @@ -468,7 +511,7 @@ def submit(self, fn, *args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._result_queue.put(None) + self._wakeup.set() self._start_queue_management_thread() return f @@ -508,7 +551,7 @@ def shutdown(self, wait=True): self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread - self._result_queue.put(None) + self._wakeup.set() if wait: self._queue_management_thread.join() # To reduce the risk of opening too many files, remove references to diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 5b4258d86a455b..d1cab014dbc6cd 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -679,6 +679,240 @@ class ProcessPoolSpawnExecutorTest(ProcessPoolSpawnMixin, pass +def hide_process_stderr(): + import io + setattr(sys, "stderr", io.StringIO()) + + +def _crash(): + """Induces a segfault""" + import faulthandler + faulthandler.disable() + faulthandler._sigsegv() + + +def _exit(): + """Induces a sys exit with exitcode 1""" + sys.exit(1) + + +def _raise_error(Err): + """Function that raises an Exception in process""" + hide_process_stderr() + raise Err() + + +def _return_instance(cls): + """Function that returns a instance of cls""" + hide_process_stderr() + return cls() + + +class CrashAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _crash() + + +class CrashAtUnpickle(object): + """Bad object that triggers a segfault at unpickling time.""" + def __reduce__(self): + return _crash, () + + +class ExitAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _exit() + + +class ExitAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + return _exit, () + + +class ErrorAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + from pickle import PicklingError + raise PicklingError("Error in pickle") + + +class ErrorAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + from pickle import UnpicklingError + return _raise_error, (UnpicklingError, ) + + +class TimingWrapper(object): + """Creates a wrapper for a function which records the time it takes to + finish + """ + + def __init__(self, func): + self.func = func + self.elapsed = None + + def __call__(self, *args, **kwds): + t = time.time() + try: + return self.func(*args, **kwds) + finally: + self.elapsed = time.time() - t + + +class ExecutorDeadlockTest: + # If ExecutorDeadlockTest takes more than 100secs to complete, it is very + # likely caught in a deadlock. As there is no easy way to detect it, + # faulthandler will print the traceback and exit. + TIMEOUT = 15 + + @classmethod + def _sleep_id(cls, x, delay): + time.sleep(delay) + return x + + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + executor.shutdown(wait=True, kill_workers=True) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Deadlock executor:\n\n{tb}") + + def test_crash(self): + # extensive testing for deadlock caused by crash in a pool + crash_cases = [ + # Check problem occuring while unpickling a task on workers + (id, (ExitAtUnpickle(),), BrokenProcessPool, + "exit at task unpickle"), + (id, (ErrorAtUnpickle(),), BrokenProcessPool, + "error at task unpickle"), + (id, (CrashAtUnpickle(),), BrokenProcessPool, + "crash at task unpickle"), + # Check problem occuring during func execution on workers + (_crash, (), BrokenProcessPool, + "crash during func execution on worker"), + (_exit, (), SystemExit, + "exit during func execution on worker"), + (_raise_error, (RuntimeError, ), RuntimeError, + "error during func execution on worker"), + # Check problem occuring while pickling a task result + # on workers + (_return_instance, (CrashAtPickle,), BrokenProcessPool, + "crash during result pickle on worker"), + (_return_instance, (ExitAtPickle,), BrokenProcessPool, + "exit during result pickle on worker"), + (_return_instance, (ErrorAtPickle,), BrokenProcessPool, + "error during result pickle on worker"), + ] + for func, args, error, name in crash_cases: + with self.subTest(name): + # skip the test involving pickle errors with manager as it + # breaks the manager and not the pool in this cases + # skip the test involving pickle errors with thread as the + # tasks and results are not pickled in this case + with test.support.captured_stderr(): + executor = self.executor_type(max_workers=2, + ctx=get_context(self.ctx)) + res = executor.submit(func, *args) + with self.assertRaises(error): + try: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + @classmethod + def _test_getpid(cls, a): + return os.getpid() + + @classmethod + def _test_kill_worker(cls, pid=None, delay=0.01): + """Function that send SIGKILL at process pid after delay second""" + time.sleep(delay) + if pid is None: + pid = os.getpid() + try: + from signal import SIGKILL + except ImportError: + from signal import SIGTERM as SIGKILL + try: + os.kill(pid, SIGKILL) + time.sleep(.01) + except (ProcessLookupError, PermissionError): + pass + + def test_crash_races(self): + + from itertools import repeat + for n_proc in [1, 2, 5, 17]: + with self.subTest(n_proc=n_proc): + # Test for external crash signal comming from neighbor + # with various race setup + executor = self.executor_type(max_workers=2, + ctx=get_context(self.ctx)) + try: + raise AttributeError() + pids = [p.pid for p in executor._processes] + assert len(pids) == n_proc + except AttributeError: + pids = [pid for pid in executor.map( + self._test_getpid, [None] * n_proc)] + assert None not in pids + res = self.executor.map( + self._sleep_id, repeat(True, 2 * n_proc), + [.001 * (j // 2) for j in range(2 * n_proc)], + chunksize=1) + assert all(res) + res = executor.map(self._test_kill_worker, pids[::-1], + timeout=self.TIMEOUT) + with self.assertRaises(BrokenProcessPool): + try: + [v for v in res] + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_shutdown_deadlock(self): + # Test that the pool calling shutdown do not cause deadlock + # if a worker failed + + with self.executor_type(max_workers=2, + ctx=get_context(self.ctx)) as executor: + executor.submit(self._test_kill_worker, ()) + time.sleep(.01) + executor.shutdown() + + +class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + +class ProcessPoolForkserverExecutorDeadlockTest(ProcessPoolForkserverMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + +class ProcessPoolSpawnExecutorDeadlockTest(ProcessPoolSpawnMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): From 7e8d7598660ea50e9cc941d51341aee58043176e Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Sat, 18 Feb 2017 07:29:37 -0500 Subject: [PATCH 3/8] Add _management_thread to check queue_mgr failures tomMoral/loky#48 This extra thread checks that the _queue_manager_thread is alive and working. If not, it permits to avoid deadlocks and raise an appropriate Error. It also checks that the QueueFeederThread is alive. --- Lib/concurrent/futures/process.py | 161 ++++++++++++++++++++++++++-- Lib/multiprocessing/queues.py | 21 +++- Lib/test/_test_multiprocessing.py | 36 +++++++ Lib/test/test_concurrent_futures.py | 16 ++- 4 files changed, 218 insertions(+), 16 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 24c401d78b28ae..7c644050a7f46d 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -7,11 +7,14 @@ |======================= In-process =====================|== Out-of-process ==| -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | + +------------------+ + | Thread watcher | + +------------------+ ++----------+ +----------+ | +-----------+ +---------+ +| | => | Work Ids | v | Call Q | | Process | +| | +----------+ +--------+ +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | @@ -52,6 +55,7 @@ from queue import Full import multiprocessing as mp from multiprocessing.connection import wait +from multiprocessing.queues import Queue import threading import weakref from functools import partial @@ -110,6 +114,7 @@ def _python_exit(): # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): @@ -151,6 +156,26 @@ def __init__(self, work_id, fn, args, kwargs): self.args = args self.kwargs = kwargs + +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items): + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + work_item.future.set_exception(e) + del work_item + else: + super()._on_queue_feeder_error(e, obj) + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. """ it = zip(*iterables) @@ -369,8 +394,101 @@ def shutdown_worker(): pass executor = None + +def _management_worker(executor_reference, queue_management_thread, processes, + pending_work_items, call_queue): + """Checks the state of the executor management and communications. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + queue_management_thread: the Queue manager thread of the Executor. It + is used to ensure that the management is still running. + process: A list of the ctx.Process instances used as workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + call_queue: A ctx.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + """ + executor = None + from time import sleep + + def is_shutting_down(): + return (_global_shutdown or executor is None + or executor._shutdown_thread) + + while True: + broken_qm = not queue_management_thread.is_alive() + + if broken_qm: + broken = (call_queue._thread is not None and + not call_queue._thread.is_alive()) + broken |= any([p.exitcode for p in processes.values()]) + if not broken: + cause_msg = ("The QueueManagerThread was terminated " + "abruptly while the future was running or " + "pending. This can be caused by an " + "unpickling error of a result.") + _shutdown_crash(executor_reference, processes, + pending_work_items, call_queue, cause_msg) + return + elif _is_crashed(call_queue._thread): + executor = executor_reference() + if is_shutting_down(): + mp.util.debug("shutting down") + return + executor = None + cause_msg = ("The QueueFeederThread was terminated abruptly " + "while feeding a new job. This can be due to " + "a job pickling error.") + _shutdown_crash(executor_reference, processes, pending_work_items, + call_queue, cause_msg) + return + executor = executor_reference() + if is_shutting_down(): + mp.util.debug("shutting down") + return + executor = None + sleep(.1) + + +def _shutdown_crash(executor_reference, processes, pending_work_items, + call_queue, cause_msg): + mp.util.info("Crash detected, marking executor as broken and terminating " + "worker processes. " + cause_msg) + executor = executor_reference() + if executor: + executor.broken = True + executor = None + call_queue.close() + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + while processes: + _, p = processes.popitem() + p.terminate() + p.join() + # All futures in flight must be marked failed + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception(BrokenProcessPool(cause_msg)) + # Delete references to object. See issue16284 + del work_item + pending_work_items.clear() + + +def _is_crashed(thread): + """helper to check if a thread is started for any version of python""" + if thread is None: + return False + return thread._started.is_set() and not thread.is_alive() + + _system_limits_checked = False _system_limited = None + + def _check_system_limits(): global _system_limits_checked, _system_limited if _system_limits_checked: @@ -436,17 +554,21 @@ def __init__(self, max_workers=None, ctx=None): if ctx is None: ctx = mp.get_context() self._ctx = ctx + self._pending_work_items = {} # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. - self._call_queue = ctx.Queue(self._max_workers + EXTRA_QUEUED_CALLS) + self._call_queue = _SafeQueue( + max_size=self._max_workers + EXTRA_QUEUED_CALLS, ctx=ctx, + pending_work_items=self._pending_work_items) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. self._call_queue._ignore_epipe = True self._result_queue = ctx.SimpleQueue() self._work_ids = queue.Queue() + self._management_thread = None self._queue_management_thread = None # Map of pids to processes self._processes = {} @@ -456,7 +578,6 @@ def __init__(self, max_workers=None, ctx=None): self._shutdown_lock = threading.Lock() self._broken = False self._queue_count = 0 - self._pending_work_items = {} # Permits to wake_up the queue_manager_thread independently of # result_queue state. This avoid deadlocks caused by the non @@ -472,7 +593,6 @@ def weakref_cb(_, wakeup=self._wakeup): if self._queue_management_thread is None: # Start the processes so that their sentinels are known. - self._adjust_process_count() self._queue_management_thread = threading.Thread( target=_queue_management_worker, args=(weakref.ref(self, weakref_cb), @@ -487,6 +607,29 @@ def weakref_cb(_, wakeup=self._wakeup): self._queue_management_thread.start() _threads_wakeup[self._queue_management_thread] = self._wakeup + def _start_thread_management_thread(self): + if self._management_thread is None: + mp.util.debug('_start_thread_management_thread called') + # Start the processes so that their sentinels are known. + self._management_thread = threading.Thread( + target=_management_worker, + args=(weakref.ref(self), + self._queue_management_thread, + self._processes, + self._pending_work_items, + self._call_queue), + name="ThreadManager") + self._management_thread.daemon = True + self._management_thread.start() + + def _ensure_executor_running(self): + """ensures all workers and management thread are running + """ + if len(self._processes) != self._max_workers: + self._adjust_process_count() + self._start_queue_management_thread() + self._start_thread_management_thread() + def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): p = self._ctx.Process( @@ -513,7 +656,7 @@ def submit(self, fn, *args, **kwargs): # Wake up queue management thread self._wakeup.set() - self._start_queue_management_thread() + self._ensure_executor_running() return f submit.__doc__ = _base.Executor.submit.__doc__ diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 328efbd95fe63d..d66d37a5c3e2eb 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -160,9 +160,10 @@ def _start_thread(self): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe), + self._wlock, self._writer.close, self._ignore_epipe, + self._on_queue_feeder_error), name='QueueFeederThread' - ) + ) self._thread.daemon = True debug('doing self._thread.start()') @@ -201,7 +202,8 @@ def _finalize_close(buffer, notempty): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, + onerror): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -253,8 +255,17 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): info('error in queue thread: %s', e) return else: - import traceback - traceback.print_exc() + onerror(e, obj) + + @staticmethod + def _on_queue_feeder_error(e, obj): + """ + Private API hook called when feeding data in the background thread + raises an exception. For overriding by concurrent.futures. + """ + import traceback + traceback.print_exc() + _sentinel = object() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d6fe7d62675631..c2a653a402cd2a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -944,6 +944,42 @@ def __reduce__(self): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): + # bpo-30006: verify feeder handles exceptions using the + # _on_queue_feeder_error hook. + if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + """Mock unserializable object""" + def __init__(self): + self.reduce_was_called = False + self.on_queue_feeder_error_was_called = False + + def __reduce__(self): + self.reduce_was_called = True + raise AttributeError + + class SafeQueue(multiprocessing.queues.Queue): + """Queue with overloaded _on_queue_feeder_error hook""" + @staticmethod + def _on_queue_feeder_error(e, obj): + if (isinstance(e, AttributeError) and + isinstance(obj, NotSerializable)): + obj.on_queue_feeder_error_was_called = True + + not_serializable_obj = NotSerializable() + with test.support.captured_stderr(): + q = SafeQueue(ctx=multiprocessing.get_context()) + q.put(not_serializable_obj) + + # Verify that q is still functionning correctly + q.put(True) + self.assertTrue(q.get(timeout=1.0)) + + # Assert that the serialization and the hook have been called correctly + self.assertTrue(not_serializable_obj.reduce_was_called) + self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) # # # diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index d1cab014dbc6cd..404bb1f8731a2b 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -13,6 +13,7 @@ import time import unittest import weakref +from pickle import PicklingError from concurrent import futures from concurrent.futures._base import ( @@ -750,7 +751,6 @@ class TimingWrapper(object): """Creates a wrapper for a function which records the time it takes to finish """ - def __init__(self, func): self.func = func self.elapsed = None @@ -783,13 +783,19 @@ def _fail_on_deadlock(self, executor): faulthandler.dump_traceback(file=f) f.seek(0) tb = f.read() - executor.shutdown(wait=True, kill_workers=True) + for p in executor._processes.values(): + p.terminate() + executor.shutdown(wait=True) print(f"\nTraceback:\n {tb}", file=sys.__stderr__) self.fail(f"Deadlock executor:\n\n{tb}") def test_crash(self): # extensive testing for deadlock caused by crash in a pool crash_cases = [ + # Check problem occuring while pickling a task in + # the task_handler thread + (id, (ExitAtPickle(),), BrokenProcessPool, "exit at task pickle"), + (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), # Check problem occuring while unpickling a task on workers (id, (ExitAtUnpickle(),), BrokenProcessPool, "exit at task unpickle"), @@ -812,6 +818,12 @@ def test_crash(self): "exit during result pickle on worker"), (_return_instance, (ErrorAtPickle,), BrokenProcessPool, "error during result pickle on worker"), + # Check problem occuring while unpickling a task in + # the result_handler thread + (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, + "exit during result unpickle in result_handler"), + (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, + "error during result unpickle in result_handler") ] for func, args, error, name in crash_cases: with self.subTest(name): From 820ea14fb4838f237e583f09a67c80bf8c757a46 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Mon, 27 Feb 2017 17:59:24 -0500 Subject: [PATCH 4/8] Improve shutdown to correctly flag canceled jobs Add a _ExecutorFlags object that hold the state of the ProcessPoolExecutor. This permits to introspect the executor state even after it has been gc and allow to handle correctly the Errors. It also introduces a ShutdownExecutorError for jobs that were cancel on shutdown. Also, this changes the `for` loop on `processes` to while loop to avoid concurrent dictionary updates errors. --- Lib/concurrent/futures/process.py | 182 +++++++++++++++++++--------- Lib/test/test_concurrent_futures.py | 59 +++++++-- 2 files changed, 174 insertions(+), 67 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7c644050a7f46d..76aa23d4ef0167 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -98,6 +98,31 @@ def get_and_unset(self): return s +class _ExecutorFlags(object): + """necessary references to maintain executor states without preventing gc + + It permits to keep the information needed by queue_management_thread + and management_thread to maintain the pool without preventing the garbage + collection of unreferenced executors. + """ + def __init__(self): + + self.shutdown = False + self.broken = False + self.kill_workers = False + self.shutdown_lock = threading.Lock() + + def flag_as_shutting_down(self, kill_workers=False): + with self.shutdown_lock: + self.shutdown = True + self.kill_workers = kill_workers + + def flag_as_broken(self): + with self.shutdown_lock: + self.shutdown = True + self.broken = True + + def _python_exit(): global _global_shutdown _global_shutdown = True @@ -264,7 +289,9 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue + def _queue_management_worker(executor_reference, + executor_flags, processes, pending_work_items, work_ids_queue, @@ -279,6 +306,7 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. + executor_flags: Flags holding the state of the ProcessPoolExecutor. process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. @@ -293,21 +321,35 @@ def _queue_management_worker(executor_reference, """ executor = None - def shutting_down(): - return (_global_shutdown or executor is None - or executor._shutdown_thread) - - def shutdown_worker(): + def is_shutting_down(): + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that own this worker is not broken AND + # * The executor that owns this worker has been collected OR + # * The executor that owns this worker has been shutdown. + # If the executor is broken, it should be detected in the next loop. + return (_global_shutdown or + ((executor is None or executor_flags.shutdown) + and not executor_flags.broken)) + + def shutdown_all_workers(): + mp.util.debug("queue management thread shutting down") + executor_flags.flag_as_shutting_down() # This is an upper bound nb_children_alive = sum(p.is_alive() for p in processes.values()) for i in range(0, nb_children_alive): call_queue.put_nowait(None) + # Release the queue's resources as soon as possible. call_queue.close() + # If .join() is not called on the created processes then # some ctx.Queue methods may deadlock on Mac OS X. - for p in processes.values(): + while processes: + _, p = processes.popitem() p.join() + mp.util.debug("queue management thread clean shutdown of worker " + "processes: {}".format(processes)) reader = result_queue._reader @@ -334,36 +376,34 @@ def shutdown_worker(): result_item = reader.recv() elif len(ready) > 0: # Mark the process pool broken so that submits fail right now. - executor = executor_reference() - if executor is not None: - executor._broken = True - executor._shutdown_thread = True - executor = None + executor_flags.flag_as_broken() + # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): work_item.future.set_exception( BrokenProcessPool( - "A process in the process pool was " - "terminated abruptly while the future was " - "running or pending." + "A process in the process pool was terminated abruptly" + " while the future was running or pending." )) # Delete references to object. See issue16284 del work_item pending_work_items.clear() # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. - for p in processes.values(): + while processes: + _, p = processes.popitem() p.terminate() - shutdown_worker() + p.join() + shutdown_all_workers() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert shutting_down() + assert is_shutting_down() p = processes.pop(result_item) p.join() if not processes: - shutdown_worker() + shutdown_all_workers() return elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) @@ -381,21 +421,34 @@ def shutdown_worker(): # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if shutting_down(): - try: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_worker() - return - except Full: - # This is not a problem: we will eventually be woken up (in - # result_queue.get()) and be able to send a sentinel again. - pass + if is_shutting_down(): + if executor_flags.kill_workers: + while pending_work_items: + _, work_item = pending_work_items.popitem() + work_item.future.set_exception(ShutdownExecutorError( + "The Executor was shutdown before this job could " + "complete.")) + del work_item + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + while processes: + _, p = processes.popitem() + p.terminate() + p.join() + shutdown_all_workers() + return + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_all_workers() + return + elif executor_flags.broken: + return executor = None -def _management_worker(executor_reference, queue_management_thread, processes, +def _management_worker(executor_reference, executor_flags, + queue_management_thread, processes, pending_work_items, call_queue): """Checks the state of the executor management and communications. @@ -405,6 +458,7 @@ def _management_worker(executor_reference, queue_management_thread, processes, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. + executor_flags: Flags holding the state of the ProcessPoolExecutor. queue_management_thread: the Queue manager thread of the Executor. It is used to ensure that the management is still running. process: A list of the ctx.Process instances used as workers. @@ -417,8 +471,8 @@ def _management_worker(executor_reference, queue_management_thread, processes, from time import sleep def is_shutting_down(): - return (_global_shutdown or executor is None - or executor._shutdown_thread) + return (_global_shutdown or executor_flags.shutdown or + (executor is None and not queue_management_thread.is_alive())) while True: broken_qm = not queue_management_thread.is_alive() @@ -432,7 +486,7 @@ def is_shutting_down(): "abruptly while the future was running or " "pending. This can be caused by an " "unpickling error of a result.") - _shutdown_crash(executor_reference, processes, + _shutdown_crash(executor_flags, processes, pending_work_items, call_queue, cause_msg) return elif _is_crashed(call_queue._thread): @@ -444,7 +498,7 @@ def is_shutting_down(): cause_msg = ("The QueueFeederThread was terminated abruptly " "while feeding a new job. This can be due to " "a job pickling error.") - _shutdown_crash(executor_reference, processes, pending_work_items, + _shutdown_crash(executor_flags, processes, pending_work_items, call_queue, cause_msg) return executor = executor_reference() @@ -455,14 +509,11 @@ def is_shutting_down(): sleep(.1) -def _shutdown_crash(executor_reference, processes, pending_work_items, +def _shutdown_crash(executor_flags, processes, pending_work_items, call_queue, cause_msg): mp.util.info("Crash detected, marking executor as broken and terminating " "worker processes. " + cause_msg) - executor = executor_reference() - if executor: - executor.broken = True - executor = None + executor_flags.flag_as_broken() call_queue.close() # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. @@ -525,12 +576,17 @@ def _chain_from_iterable_of_lists(iterable): class BrokenProcessPool(RuntimeError): - """ - Raised when a process in a ProcessPoolExecutor terminated abruptly + """Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. """ +class ShutdownExecutorError(RuntimeError): + """Raised when a ProcessPoolExecutor is shutdown while a future was in the + running or pending state. + """ + + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None, ctx=None): """Initializes a new ProcessPoolExecutor instance. @@ -585,6 +641,10 @@ def __init__(self, max_workers=None, ctx=None): # _result_queue write lock. self._wakeup = _Sentinel() + # Flag to hold the state of the Executor. This permits to introspect + # the Executor state even once it has been garbage collected. + self._flags = _ExecutorFlags() + def _start_queue_management_thread(self): # When the executor gets lost, the weakref callback will wake up # the queue management thread. @@ -596,6 +656,7 @@ def weakref_cb(_, wakeup=self._wakeup): self._queue_management_thread = threading.Thread( target=_queue_management_worker, args=(weakref.ref(self, weakref_cb), + self._flags, self._processes, self._pending_work_items, self._work_ids, @@ -614,6 +675,7 @@ def _start_thread_management_thread(self): self._management_thread = threading.Thread( target=_management_worker, args=(weakref.ref(self), + self._flags, self._queue_management_thread, self._processes, self._pending_work_items, @@ -640,12 +702,14 @@ def _adjust_process_count(self): self._processes[p.pid] = p def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._broken: - raise BrokenProcessPool('A child process terminated ' - 'abruptly, the process pool is not usable anymore') - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') + with self._flags.shutdown_lock: + if self._flags.broken: + raise BrokenProcessPool( + 'A child process terminated abruptly, the process pool is' + ' not usable anymore') + if self._flags.shutdown: + raise ShutdownExecutorError( + 'cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -689,24 +753,28 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True + def shutdown(self, wait=True, kill_workers=False): + mp.util.debug('shutting down executor %s' % self) + self._flags.flag_as_shutting_down(kill_workers) if self._queue_management_thread: # Wake up queue management thread self._wakeup.set() - if wait: + if wait and self._queue_management_thread.is_alive(): self._queue_management_thread.join() - # To reduce the risk of opening too many files, remove references to - # objects that use file descriptors. - self._queue_management_thread = None - if self._call_queue is not None: + if self._management_thread: + if wait and self._management_thread.is_alive(): + self._management_thread.join() + if self._call_queue: self._call_queue.close() if wait: self._call_queue.join_thread() - self._call_queue = None + # To reduce the risk of opening too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._management_thread = None + self._call_queue = None self._result_queue = None - self._processes = None + self._processes.clear() shutdown.__doc__ = _base.Executor.shutdown.__doc__ atexit.register(_python_exit) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 404bb1f8731a2b..fb5b900ccae789 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -18,7 +18,7 @@ from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) -from concurrent.futures.process import BrokenProcessPool +from concurrent.futures.process import BrokenProcessPool, ShutdownExecutorError from multiprocessing import get_context @@ -52,6 +52,11 @@ def sleep_and_print(t, msg): sys.stdout.flush() +def sleep_and_return(t, x): + time.sleep(t) + return x + + class MyObject(object): def my_method(self): pass @@ -231,6 +236,27 @@ class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, BaseTestCa def _prime_executor(self): pass + @staticmethod + def _id(x): + return x + + def test_shutdown(self): + processes = [p for p in self.executor._processes.values()] + self.assertEqual(self.executor.submit(self._id, 42).result(), 42) + self.executor.shutdown(wait=True) + + self.assertListEqual([p.exitcode for p in processes], + [0 for _ in processes]) + + def test_shutdown_wait(self): + processes = [p for p in self.executor._processes.values()] + f = self.executor.submit(sleep_and_return, .5, 42) + self.executor.shutdown(wait=True) + self.assertEqual(f.result(), 42) + + self.assertListEqual([p.exitcode for p in processes], + [0 for _ in processes]) + def test_processes_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) @@ -769,11 +795,6 @@ class ExecutorDeadlockTest: # faulthandler will print the traceback and exit. TIMEOUT = 15 - @classmethod - def _sleep_id(cls, x, delay): - time.sleep(delay) - return x - def _fail_on_deadlock(self, executor): # If we did not recover before TIMEOUT seconds, # consider that the executor is in a deadlock state @@ -783,9 +804,7 @@ def _fail_on_deadlock(self, executor): faulthandler.dump_traceback(file=f) f.seek(0) tb = f.read() - for p in executor._processes.values(): - p.terminate() - executor.shutdown(wait=True) + executor.shutdown(wait=True, kill_workers=False) print(f"\nTraceback:\n {tb}", file=sys.__stderr__) self.fail(f"Deadlock executor:\n\n{tb}") @@ -882,8 +901,9 @@ def test_crash_races(self): self._test_getpid, [None] * n_proc)] assert None not in pids res = self.executor.map( - self._sleep_id, repeat(True, 2 * n_proc), + sleep_and_return, [.001 * (j // 2) for j in range(2 * n_proc)], + repeat(True, 2 * n_proc), chunksize=1) assert all(res) res = executor.map(self._test_kill_worker, pids[::-1], @@ -907,6 +927,25 @@ def test_shutdown_deadlock(self): time.sleep(.01) executor.shutdown() + def test_shutdown_kill(self): + """ProcessPoolExecutor does not wait job end and shutdowm with + kill_worker flag set to True. Ensure that the future returns + ShutdownExecutorError. + """ + from itertools import repeat + executor = self.executor_type(max_workers=2, + ctx=get_context(self.ctx)) + res1 = executor.map(sleep_and_return, repeat(.001), range(50)) + res2 = executor.map(sleep_and_return, repeat(.1), range(50)) + assert list(res1) == list(range(50)) + # We should get an error as the executor shutdowned before we fetched + # the results from the operation. + shutdown = TimingWrapper(executor.shutdown) + shutdown(wait=True, kill_workers=True) + assert shutdown.elapsed < .5 + with self.assertRaises(ShutdownExecutorError): + list(res2) + class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin, ExecutorDeadlockTest, From 37e928cd9e202c654dd068a9f26ce705e7504f16 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Mon, 26 Jun 2017 12:47:34 +0200 Subject: [PATCH 5/8] CLN add NEWS.d entry --- .../next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst diff --git a/Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst b/Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst new file mode 100644 index 00000000000000..b59c9067bf908b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst @@ -0,0 +1,4 @@ +Add context argument for `concurrent.fututres.ProcessPoolExecutor`. Make +`ProcessPoolExecutor` more robust to faulty user code such as unpicklable +items and fix other race conditions. Add tests for possible deadlocks in +`ProcessPoolExecutor`. Patch by Thomas Moreau and Olivier Grisel From ad34f5612f65af65dfd8f2a5f54cd06aaed0908a Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 8 Sep 2017 15:45:29 +0200 Subject: [PATCH 6/8] FIX test broken by rebasing --- Lib/test/test_concurrent_futures.py | 34 ++++++++++++----------------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index fb5b900ccae789..9c0ad047171813 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -66,7 +66,7 @@ def make_dummy_object(_): return MyObject() -class BaseTestCase(BaseTestCase): +class BaseTestCase(unittest.TestCase): def setUp(self): self._thread_key = test.support.threading_setup() @@ -83,7 +83,13 @@ def setUp(self): self.t1 = time.time() try: - self.executor = self.executor_type(max_workers=self.worker_count) + if hasattr(self, "ctx"): + self.executor = self.executor_type( + max_workers=self.worker_count, + ctx=get_context(self.ctx)) + else: + self.executor = self.executor_type( + max_workers=self.worker_count) except NotImplementedError as e: self.skipTest(str(e)) self._prime_executor() @@ -113,21 +119,8 @@ class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor -class ProcessPoolMixin(ExecutorMixin): +class ProcessPoolForkMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor - - def setUp(self): - self.t1 = time.time() - try: - self.executor = self.executor_type( - max_workers=self.worker_count, - ctx=get_context(self.ctx)) - except NotImplementedError as e: - self.skipTest(str(e)) - self._prime_executor() - - -class ProcessPoolForkMixin(ProcessPoolMixin): ctx = "fork" def setUp(self): @@ -136,11 +129,13 @@ def setUp(self): super().setUp() -class ProcessPoolSpawnMixin(ProcessPoolMixin): +class ProcessPoolSpawnMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor ctx = "spawn" -class ProcessPoolForkserverMixin(ProcessPoolMixin): +class ProcessPoolForkserverMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor ctx = "forkserver" def setUp(self): @@ -232,7 +227,7 @@ def test_thread_names_default(self): t.join() -class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, BaseTestCase): +class ProcessPoolShutdownTest(ExecutorShutdownTest): def _prime_executor(self): pass @@ -519,7 +514,6 @@ class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase pass - class ProcessPoolForkAsCompletedTests(ProcessPoolForkMixin, AsCompletedTests, BaseTestCase): pass From 3b07a2bcc21003c1d61dab9a3da7ae1dc81f157e Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 8 Sep 2017 18:12:32 +0200 Subject: [PATCH 7/8] ENH improve shutdown robustness from loky experience --- Lib/concurrent/futures/process.py | 274 ++++++++++++++++------------ Lib/test/test_concurrent_futures.py | 16 +- 2 files changed, 167 insertions(+), 123 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 76aa23d4ef0167..31732cd7af448a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -50,9 +50,10 @@ import atexit import os +import sys from concurrent.futures import _base import queue -from queue import Full +from pickle import PicklingError import multiprocessing as mp from multiprocessing.connection import wait from multiprocessing.queues import Queue @@ -61,15 +62,17 @@ from functools import partial import itertools import traceback +import time -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a + +# Workers are created as daemon threads and processes. This is done to allow +# the interpreter to exit when there are still idle processes in a # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, # allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. +# - The workers would still be running during interpreter shutdown, meaning +# that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could be +# bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the @@ -102,8 +105,8 @@ class _ExecutorFlags(object): """necessary references to maintain executor states without preventing gc It permits to keep the information needed by queue_management_thread - and management_thread to maintain the pool without preventing the garbage - collection of unreferenced executors. + and crash_detection_thread to maintain the pool without preventing the + garbage collection of unreferenced executors. """ def __init__(self): @@ -133,6 +136,25 @@ def _python_exit(): for t, _ in items: t.join() + +def _flag_current_thread_clean_exit(): + """Put a ``_clean_exit`` flag on the current thread.""" + thread = threading.current_thread() + thread._clean_exit = True + + +def _is_crashed(thread): + """helper to check if a thread has started and then crashed. + + This thread target should call _flag_current_thread_clean_exit when it + exits cleanly to avoid false alarms. + """ + if thread is None: + return False + terminate = thread._started.is_set() and not thread.is_alive() + return terminate and not getattr(thread, "_clean_exit", False) + + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently @@ -145,22 +167,27 @@ def _python_exit(): class _RemoteTraceback(Exception): def __init__(self, tb): self.tb = tb + def __str__(self): return self.tb + class _ExceptionWithTraceback: def __init__(self, exc, tb): tb = traceback.format_exception(type(exc), exc, tb) tb = ''.join(tb) self.exc = exc self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): return _rebuild_exc, (self.exc, self.tb) + def _rebuild_exc(exc, tb): exc.__cause__ = _RemoteTraceback(tb) return exc + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future @@ -168,12 +195,14 @@ def __init__(self, future, fn, args, kwargs): self.args = args self.kwargs = kwargs + class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): self.work_id = work_id self.exception = exception self.result = result + class _CallItem(object): def __init__(self, work_id, fn, args, kwargs): self.work_id = work_id @@ -184,7 +213,8 @@ def __init__(self, work_id, fn, args, kwargs): class _SafeQueue(Queue): """Safe Queue set exception to the future object linked to a job""" - def __init__(self, max_size=0, *, ctx, pending_work_items): + def __init__(self, max_size=0, *, ctx, pending_work_items, wakeup): + self.wakeup = wakeup self.pending_work_items = pending_work_items super().__init__(max_size, ctx=ctx) @@ -197,6 +227,7 @@ def _on_queue_feeder_error(self, e, obj): if work_item is not None: work_item.future.set_exception(e) del work_item + self.wakeup.set() else: super()._on_queue_feeder_error(e, obj) @@ -210,6 +241,7 @@ def _get_chunks(*iterables, chunksize): return yield chunk + def _process_chunk(fn, chunk): """ Processes a chunk of an iterable passed to map. @@ -221,6 +253,7 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] + def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. @@ -235,7 +268,7 @@ def _process_worker(call_queue, result_queue): while True: call_item = call_queue.get(block=True) if call_item is None: - # Wake up queue management thread + # Notify queue management thread about clean worker shutdown result_queue.put(os.getpid()) return try: @@ -244,8 +277,14 @@ def _process_worker(call_queue, result_queue): exc = _ExceptionWithTraceback(e, e.__traceback__) result_queue.put(_ResultItem(call_item.work_id, exception=exc)) else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + try: + result_queue.put(_ResultItem(call_item.work_id, result=r)) + except PicklingError as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(call_item.work_id, exception=exc)) + except BaseException as e: + traceback.print_exc() + sys.exit(1) # Liberate the resource as soon as possible, to avoid holding onto # open files or shared memory that is not needed anymore @@ -306,7 +345,9 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. - executor_flags: Flags holding the state of the ProcessPoolExecutor. + executor_flags: A ExecutorFlags holding internal states of the + ProcessPoolExecutor. It permits to know if the executor is broken + even the object has been gc. process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. @@ -335,12 +376,18 @@ def is_shutting_down(): def shutdown_all_workers(): mp.util.debug("queue management thread shutting down") executor_flags.flag_as_shutting_down() - # This is an upper bound - nb_children_alive = sum(p.is_alive() for p in processes.values()) + # Create a list to avoid RuntimeError due to concurrent modification of + # processe. nb_children_alive is thus an upper bound + nb_children_alive = sum(p.is_alive() for p in list(processes.values())) for i in range(0, nb_children_alive): call_queue.put_nowait(None) - # Release the queue's resources as soon as possible. + # Release the queue's resources as soon as possible. Flag the feeder + # thread for clean exit to avoid the manager_thread flagging the + # Executor as broken during the shutdown. This is safe as either: + # * We don't need to communicate with the workers anymore + # * There is nothing left in the Queue buffer except None sentinels + call_queue._thread._clean_exit = True call_queue.close() # If .join() is not called on the created processes then @@ -388,6 +435,7 @@ def shutdown_all_workers(): # Delete references to object. See issue16284 del work_item pending_work_items.clear() + # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. while processes: @@ -395,6 +443,7 @@ def shutdown_all_workers(): p.terminate() p.join() shutdown_all_workers() + _flag_current_thread_clean_exit() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID @@ -436,20 +485,22 @@ def shutdown_all_workers(): p.terminate() p.join() shutdown_all_workers() + _flag_current_thread_clean_exit() return # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: shutdown_all_workers() + _flag_current_thread_clean_exit() return elif executor_flags.broken: return executor = None -def _management_worker(executor_reference, executor_flags, - queue_management_thread, processes, - pending_work_items, call_queue): +def _crash_detection_worker(executor_reference, executor_flags, + queue_management_thread, processes, + pending_work_items, call_queue): """Checks the state of the executor management and communications. This function is run in a local thread. @@ -461,52 +512,43 @@ def _management_worker(executor_reference, executor_flags, executor_flags: Flags holding the state of the ProcessPoolExecutor. queue_management_thread: the Queue manager thread of the Executor. It is used to ensure that the management is still running. - process: A list of the ctx.Process instances used as workers. + processes: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. """ - executor = None - from time import sleep - - def is_shutting_down(): - return (_global_shutdown or executor_flags.shutdown or - (executor is None and not queue_management_thread.is_alive())) - while True: - broken_qm = not queue_management_thread.is_alive() - - if broken_qm: - broken = (call_queue._thread is not None and - not call_queue._thread.is_alive()) - broken |= any([p.exitcode for p in processes.values()]) - if not broken: - cause_msg = ("The QueueManagerThread was terminated " - "abruptly while the future was running or " - "pending. This can be caused by an " - "unpickling error of a result.") - _shutdown_crash(executor_flags, processes, - pending_work_items, call_queue, cause_msg) + + if _is_crashed(queue_management_thread): + cause_msg = ("The QueueManagerThread was terminated " + "abruptly while the future was running or " + "pending. This can be caused by an " + "unpickling error of a result.") + _shutdown_crash(executor_flags, processes, + pending_work_items, call_queue, cause_msg) return + elif _is_crashed(call_queue._thread): - executor = executor_reference() - if is_shutting_down(): - mp.util.debug("shutting down") - return - executor = None cause_msg = ("The QueueFeederThread was terminated abruptly " "while feeding a new job. This can be due to " "a job pickling error.") _shutdown_crash(executor_flags, processes, pending_work_items, call_queue, cause_msg) return - executor = executor_reference() - if is_shutting_down(): + if getattr(queue_management_thread, "_clean_exit", False): mp.util.debug("shutting down") return + + # Detect if all the worker timed out while a new job was submitted and + # launch new workers if it is the case. + executor = executor_reference() + if (executor is not None and len(processes) == 0 and + len(executor._pending_work_items) > 0): + mp.util.debug("All workers timed out. Adjusting process count.") + executor._adjust_process_count() executor = None - sleep(.1) + time.sleep(.1) def _shutdown_crash(executor_flags, processes, pending_work_items, @@ -515,27 +557,22 @@ def _shutdown_crash(executor_flags, processes, pending_work_items, "worker processes. " + cause_msg) executor_flags.flag_as_broken() call_queue.close() + # All futures in flight must be marked failed. We do that before killing + # the processes so that when process get killed, queue_manager_thread is + # woken up and realizes it can shutdown + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception(BrokenProcessPool(cause_msg)) + # Delete references to object. See issue16284 + del work_item # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. while processes: _, p = processes.popitem() p.terminate() p.join() - # All futures in flight must be marked failed - for work_id, work_item in pending_work_items.items(): - work_item.future.set_exception(BrokenProcessPool(cause_msg)) - # Delete references to object. See issue16284 - del work_item pending_work_items.clear() -def _is_crashed(thread): - """helper to check if a thread is started for any version of python""" - if thread is None: - return False - return thread._started.is_set() and not thread.is_alive() - - _system_limits_checked = False _system_limited = None @@ -552,7 +589,7 @@ def _check_system_limits(): # sysconf not available or setting not available return if nsems_max == -1: - # indetermined limit, assume that limit is determined + # undetermined limit, assume that limit is determined # by available memory only return if nsems_max >= 256: @@ -588,15 +625,15 @@ class ShutdownExecutorError(RuntimeError): class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None, ctx=None): + def __init__(self, max_workers=None, context=None): """Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. - ctx: A multiprocessing context to launch the workers. This object - should provide SimpleQueue, Queue and Process. + context: A multiprocessing context to launch the workers. This + object should provide SimpleQueue, Queue and Process. """ _check_system_limits() @@ -605,35 +642,20 @@ def __init__(self, max_workers=None, ctx=None): else: if max_workers <= 0: raise ValueError("max_workers must be greater than 0") - self._max_workers = max_workers - if ctx is None: - ctx = mp.get_context() - self._ctx = ctx - self._pending_work_items = {} - # Make the call queue slightly larger than the number of processes to - # prevent the worker processes from idling. But don't make it too big - # because futures in the call queue cannot be cancelled. - self._call_queue = _SafeQueue( - max_size=self._max_workers + EXTRA_QUEUED_CALLS, ctx=ctx, - pending_work_items=self._pending_work_items) - # Killed worker processes can produce spurious "broken pipe" - # tracebacks in the queue's own worker thread. But we detect killed - # processes anyway, so silence the tracebacks. - self._call_queue._ignore_epipe = True - self._result_queue = ctx.SimpleQueue() - self._work_ids = queue.Queue() - self._management_thread = None - self._queue_management_thread = None - # Map of pids to processes - self._processes = {} + if context is None: + context = mp.get_context() + self._context = context - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._broken = False + # Internal variables of the ProcessPoolExecutor + self._processes = {} self._queue_count = 0 + self._pending_work_items = {} + self._work_ids = queue.Queue() + self._processes_management_lock = self._context.Lock() + self._crash_detection_thread = None + self._queue_management_thread = None # Permits to wake_up the queue_manager_thread independently of # result_queue state. This avoid deadlocks caused by the non @@ -645,13 +667,32 @@ def __init__(self, max_workers=None, ctx=None): # the Executor state even once it has been garbage collected. self._flags = _ExecutorFlags() + # Finally setup the queues for interprocess communication + self._setup_queues() + + def _setup_queues(self): + # Make the call queue slightly larger than the number of processes to + # prevent the worker processes from idling. But don't make it too big + # because futures in the call queue cannot be cancelled. + queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, pending_work_items=self._pending_work_items, + wakeup=self._wakeup, ctx=self._context) + # Killed worker processes can produce spurious "broken pipe" + # tracebacks in the queue's own worker thread. But we detect killed + # processes anyway, so silence the tracebacks. + self._call_queue._ignore_epipe = True + + self._result_queue = self._context.SimpleQueue() + def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, wakeup=self._wakeup): - wakeup.set() if self._queue_management_thread is None: + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, wakeup=self._wakeup): + wakeup.set() + # Start the processes so that their sentinels are known. self._queue_management_thread = threading.Thread( target=_queue_management_worker, @@ -666,14 +707,17 @@ def weakref_cb(_, wakeup=self._wakeup): name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() + + # register this executor in a mechanism that ensures it will wakeup + # when the interpreter is exiting. _threads_wakeup[self._queue_management_thread] = self._wakeup - def _start_thread_management_thread(self): - if self._management_thread is None: + def _start_crash_detection_thread(self): + if self._crash_detection_thread is None: mp.util.debug('_start_thread_management_thread called') # Start the processes so that their sentinels are known. - self._management_thread = threading.Thread( - target=_management_worker, + self._crash_detection_thread = threading.Thread( + target=_crash_detection_worker, args=(weakref.ref(self), self._flags, self._queue_management_thread, @@ -681,8 +725,16 @@ def _start_thread_management_thread(self): self._pending_work_items, self._call_queue), name="ThreadManager") - self._management_thread.daemon = True - self._management_thread.start() + self._crash_detection_thread.daemon = True + self._crash_detection_thread.start() + + def _adjust_process_count(self): + for _ in range(len(self._processes), self._max_workers): + p = self._context.Process( + target=_process_worker, + args=(self._call_queue, self._result_queue)) + p.start() + self._processes[p.pid] = p def _ensure_executor_running(self): """ensures all workers and management thread are running @@ -690,16 +742,7 @@ def _ensure_executor_running(self): if len(self._processes) != self._max_workers: self._adjust_process_count() self._start_queue_management_thread() - self._start_thread_management_thread() - - def _adjust_process_count(self): - for _ in range(len(self._processes), self._max_workers): - p = self._ctx.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue)) - p.start() - self._processes[p.pid] = p + self._start_crash_detection_thread() def submit(self, fn, *args, **kwargs): with self._flags.shutdown_lock: @@ -734,7 +777,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. - If set to one, the items in the list will be sent one at a time. + If set to one, the items in the list will be sent one at a + time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -761,9 +805,9 @@ def shutdown(self, wait=True, kill_workers=False): self._wakeup.set() if wait and self._queue_management_thread.is_alive(): self._queue_management_thread.join() - if self._management_thread: - if wait and self._management_thread.is_alive(): - self._management_thread.join() + if self._crash_detection_thread: + if wait and self._crash_detection_thread.is_alive(): + self._crash_detection_thread.join() if self._call_queue: self._call_queue.close() if wait: @@ -771,7 +815,7 @@ def shutdown(self, wait=True, kill_workers=False): # To reduce the risk of opening too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None - self._management_thread = None + self._crash_detection_thread = None self._call_queue = None self._result_queue = None self._processes.clear() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 9c0ad047171813..c59b15942582df 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -86,7 +86,7 @@ def setUp(self): if hasattr(self, "ctx"): self.executor = self.executor_type( max_workers=self.worker_count, - ctx=get_context(self.ctx)) + context=get_context(self.ctx)) else: self.executor = self.executor_type( max_workers=self.worker_count) @@ -829,7 +829,7 @@ def test_crash(self): "crash during result pickle on worker"), (_return_instance, (ExitAtPickle,), BrokenProcessPool, "exit during result pickle on worker"), - (_return_instance, (ErrorAtPickle,), BrokenProcessPool, + (_return_instance, (ErrorAtPickle,), PicklingError, "error during result pickle on worker"), # Check problem occuring while unpickling a task in # the result_handler thread @@ -845,8 +845,8 @@ def test_crash(self): # skip the test involving pickle errors with thread as the # tasks and results are not pickled in this case with test.support.captured_stderr(): - executor = self.executor_type(max_workers=2, - ctx=get_context(self.ctx)) + executor = self.executor_type( + max_workers=2, context=get_context(self.ctx)) res = executor.submit(func, *args) with self.assertRaises(error): try: @@ -884,8 +884,8 @@ def test_crash_races(self): with self.subTest(n_proc=n_proc): # Test for external crash signal comming from neighbor # with various race setup - executor = self.executor_type(max_workers=2, - ctx=get_context(self.ctx)) + executor = self.executor_type( + max_workers=2, context=get_context(self.ctx)) try: raise AttributeError() pids = [p.pid for p in executor._processes] @@ -916,7 +916,7 @@ def test_shutdown_deadlock(self): # if a worker failed with self.executor_type(max_workers=2, - ctx=get_context(self.ctx)) as executor: + context=get_context(self.ctx)) as executor: executor.submit(self._test_kill_worker, ()) time.sleep(.01) executor.shutdown() @@ -928,7 +928,7 @@ def test_shutdown_kill(self): """ from itertools import repeat executor = self.executor_type(max_workers=2, - ctx=get_context(self.ctx)) + context=get_context(self.ctx)) res1 = executor.map(sleep_and_return, repeat(.001), range(50)) res2 = executor.map(sleep_and_return, repeat(.1), range(50)) assert list(res1) == list(range(50)) From 16ffe65e9c98ddc9df138188d7263c498d44c674 Mon Sep 17 00:00:00 2001 From: Thomas Moreau Date: Fri, 8 Sep 2017 18:49:58 +0200 Subject: [PATCH 8/8] FIX avoid launching multiple tests --- Lib/test/test_concurrent_futures.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c59b15942582df..dc43c722d103d2 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -157,8 +157,9 @@ def test_interpreter_shutdown(self): from concurrent.futures import {executor_type} from time import sleep from test.test_concurrent_futures import sleep_and_print - t = {executor_type}(5) - t.submit(sleep_and_print, 1.0, "apple") + if __name__ == "__main__": + t = {executor_type}(5) + t.submit(sleep_and_print, 1.0, "apple") """.format(executor_type=self.executor_type.__name__)) # Errors in atexit hooks don't change the process exit code, check # stderr manually.