diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 50ee296ac89b78..31732cd7af448a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -7,11 +7,14 @@ |======================= In-process =====================|== Out-of-process ==| -+----------+ +----------+ +--------+ +-----------+ +---------+ -| | => | Work Ids | => | | => | Call Q | => | | -| | +----------+ | | +-----------+ | | -| | | ... | | | | ... | | | -| | | 6 | | | | 5, call() | | | + +------------------+ + | Thread watcher | + +------------------+ ++----------+ +----------+ | +-----------+ +---------+ +| | => | Work Ids | v | Call Q | | Process | +| | +----------+ +--------+ +-----------+ | Pool | +| | | ... | | | | ... | +---------+ +| | | 6 | => | | => | 5, call() | => | | | | | 7 | | | | ... | | | | Process | | ... | | Local | +-----------+ | Process | | Pool | +----------+ | Worker | | #1..n | @@ -47,71 +50,144 @@ import atexit import os +import sys from concurrent.futures import _base import queue -from queue import Full -import multiprocessing -from multiprocessing import SimpleQueue +from pickle import PicklingError +import multiprocessing as mp from multiprocessing.connection import wait +from multiprocessing.queues import Queue import threading import weakref from functools import partial import itertools import traceback +import time -# Workers are created as daemon threads and processes. This is done to allow the -# interpreter to exit when there are still idle processes in a + +# Workers are created as daemon threads and processes. This is done to allow +# the interpreter to exit when there are still idle processes in a # ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However, # allowing workers to die with the interpreter has two undesirable properties: -# - The workers would still be running during interpreter shutdown, -# meaning that they would fail in unpredictable ways. -# - The workers could be killed while evaluating a work item, which could -# be bad if the callable being evaluated has external side-effects e.g. +# - The workers would still be running during interpreter shutdown, meaning +# that they would fail in unpredictable ways. +# - The workers could be killed while evaluating a work item, which could be +# bad if the callable being evaluated has external side-effects e.g. # writing to a file. # # To work around this problem, an exit handler is installed which tells the # workers to exit when their work queues are empty and then waits until the # threads/processes finish. -_threads_queues = weakref.WeakKeyDictionary() -_shutdown = False +_threads_wakeup = weakref.WeakKeyDictionary() +_global_shutdown = False + +_POLL_TIMEOUT = .001 + + +class _Sentinel: + __slot__ = ["_state"] + + def __init__(self): + self._state = False + + def set(self): + self._state = True + + def get_and_unset(self): + s = self._state + if s: + self._state = False + return s + + +class _ExecutorFlags(object): + """necessary references to maintain executor states without preventing gc + + It permits to keep the information needed by queue_management_thread + and crash_detection_thread to maintain the pool without preventing the + garbage collection of unreferenced executors. 
+ """ + def __init__(self): + + self.shutdown = False + self.broken = False + self.kill_workers = False + self.shutdown_lock = threading.Lock() + + def flag_as_shutting_down(self, kill_workers=False): + with self.shutdown_lock: + self.shutdown = True + self.kill_workers = kill_workers + + def flag_as_broken(self): + with self.shutdown_lock: + self.shutdown = True + self.broken = True + def _python_exit(): - global _shutdown - _shutdown = True - items = list(_threads_queues.items()) - for t, q in items: - q.put(None) - for t, q in items: + global _global_shutdown + _global_shutdown = True + items = list(_threads_wakeup.items()) + for t, wakeup in items: + if t.is_alive(): + wakeup.set() + for t, _ in items: t.join() + +def _flag_current_thread_clean_exit(): + """Put a ``_clean_exit`` flag on the current thread.""" + thread = threading.current_thread() + thread._clean_exit = True + + +def _is_crashed(thread): + """helper to check if a thread has started and then crashed. + + This thread target should call _flag_current_thread_clean_exit when it + exits cleanly to avoid false alarms. + """ + if thread is None: + return False + terminate = thread._started.is_set() and not thread.is_alive() + return terminate and not getattr(thread, "_clean_exit", False) + + # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for # work while a larger number will make Future.cancel() succeed less frequently # (Futures in the call queue cannot be cancelled). EXTRA_QUEUED_CALLS = 1 + # Hack to embed stringification of remote traceback in local traceback class _RemoteTraceback(Exception): def __init__(self, tb): self.tb = tb + def __str__(self): return self.tb + class _ExceptionWithTraceback: def __init__(self, exc, tb): tb = traceback.format_exception(type(exc), exc, tb) tb = ''.join(tb) self.exc = exc self.tb = '\n"""\n%s"""' % tb + def __reduce__(self): return _rebuild_exc, (self.exc, self.tb) + def _rebuild_exc(exc, tb): exc.__cause__ = _RemoteTraceback(tb) return exc + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): self.future = future @@ -119,12 +195,14 @@ def __init__(self, future, fn, args, kwargs): self.args = args self.kwargs = kwargs + class _ResultItem(object): def __init__(self, work_id, exception=None, result=None): self.work_id = work_id self.exception = exception self.result = result + class _CallItem(object): def __init__(self, work_id, fn, args, kwargs): self.work_id = work_id @@ -132,6 +210,28 @@ def __init__(self, work_id, fn, args, kwargs): self.args = args self.kwargs = kwargs + +class _SafeQueue(Queue): + """Safe Queue set exception to the future object linked to a job""" + def __init__(self, max_size=0, *, ctx, pending_work_items, wakeup): + self.wakeup = wakeup + self.pending_work_items = pending_work_items + super().__init__(max_size, ctx=ctx) + + def _on_queue_feeder_error(self, e, obj): + if isinstance(obj, _CallItem): + tb = traceback.format_exception(type(e), e, e.__traceback__) + e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) + work_item = self.pending_work_items.pop(obj.work_id, None) + # work_item can be None if another process terminated (see above) + if work_item is not None: + work_item.future.set_exception(e) + del work_item + self.wakeup.set() + else: + super()._on_queue_feeder_error(e, obj) + + def _get_chunks(*iterables, chunksize): """ Iterates over zip()ed iterables in chunks. 
""" it = zip(*iterables) @@ -141,6 +241,7 @@ def _get_chunks(*iterables, chunksize): return yield chunk + def _process_chunk(fn, chunk): """ Processes a chunk of an iterable passed to map. @@ -152,23 +253,22 @@ def _process_chunk(fn, chunk): """ return [fn(*args) for args in chunk] + def _process_worker(call_queue, result_queue): """Evaluates calls from call_queue and places the results in result_queue. This worker is run in a separate process. Args: - call_queue: A multiprocessing.Queue of _CallItems that will be read and + call_queue: A ctx.Queue of _CallItems that will be read and evaluated by the worker. - result_queue: A multiprocessing.Queue of _ResultItems that will written + result_queue: A ctx.Queue of _ResultItems that will written to by the worker. - shutdown: A multiprocessing.Event that will be set as a signal to the - worker that it should exit when call_queue is empty. """ while True: call_item = call_queue.get(block=True) if call_item is None: - # Wake up queue management thread + # Notify queue management thread about clean worker shutdown result_queue.put(os.getpid()) return try: @@ -177,8 +277,19 @@ def _process_worker(call_queue, result_queue): exc = _ExceptionWithTraceback(e, e.__traceback__) result_queue.put(_ResultItem(call_item.work_id, exception=exc)) else: - result_queue.put(_ResultItem(call_item.work_id, - result=r)) + try: + result_queue.put(_ResultItem(call_item.work_id, result=r)) + except PicklingError as e: + exc = _ExceptionWithTraceback(e, e.__traceback__) + result_queue.put(_ResultItem(call_item.work_id, exception=exc)) + except BaseException as e: + traceback.print_exc() + sys.exit(1) + + # Liberate the resource as soon as possible, to avoid holding onto + # open files or shared memory that is not needed anymore + del call_item + def _add_call_item_to_queue(pending_work_items, work_ids, @@ -217,12 +328,15 @@ def _add_call_item_to_queue(pending_work_items, del pending_work_items[work_id] continue + def _queue_management_worker(executor_reference, + executor_flags, processes, pending_work_items, work_ids_queue, call_queue, - result_queue): + result_queue, + wakeup): """Manages the communication between this process and the worker processes. This function is run in a local thread. @@ -231,32 +345,58 @@ def _queue_management_worker(executor_reference, executor_reference: A weakref.ref to the ProcessPoolExecutor that owns this thread. Used to determine if the ProcessPoolExecutor has been garbage collected and that this function can exit. - process: A list of the multiprocessing.Process instances used as + executor_flags: A ExecutorFlags holding internal states of the + ProcessPoolExecutor. It permits to know if the executor is broken + even the object has been gc. + process: A list of the ctx.Process instances used as workers. pending_work_items: A dict mapping work ids to _WorkItems e.g. {5: <_WorkItem...>, 6: <_WorkItem...>, ...} work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]). - call_queue: A multiprocessing.Queue that will be filled with _CallItems + call_queue: A ctx.Queue that will be filled with _CallItems derived from _WorkItems for processing by the process workers. - result_queue: A multiprocessing.Queue of _ResultItems generated by the + result_queue: A ctx.SimpleQueue of _ResultItems generated by the process workers. + wakeup: A _Sentinel to allow waking up the queue_manager_thread from + the main Thread and avoid deadlocks caused by broken queues. 
""" executor = None - def shutting_down(): - return _shutdown or executor is None or executor._shutdown_thread - - def shutdown_worker(): - # This is an upper bound - nb_children_alive = sum(p.is_alive() for p in processes.values()) + def is_shutting_down(): + # No more work items can be added if: + # - The interpreter is shutting down OR + # - The executor that own this worker is not broken AND + # * The executor that owns this worker has been collected OR + # * The executor that owns this worker has been shutdown. + # If the executor is broken, it should be detected in the next loop. + return (_global_shutdown or + ((executor is None or executor_flags.shutdown) + and not executor_flags.broken)) + + def shutdown_all_workers(): + mp.util.debug("queue management thread shutting down") + executor_flags.flag_as_shutting_down() + # Create a list to avoid RuntimeError due to concurrent modification of + # processe. nb_children_alive is thus an upper bound + nb_children_alive = sum(p.is_alive() for p in list(processes.values())) for i in range(0, nb_children_alive): call_queue.put_nowait(None) - # Release the queue's resources as soon as possible. + + # Release the queue's resources as soon as possible. Flag the feeder + # thread for clean exit to avoid the manager_thread flagging the + # Executor as broken during the shutdown. This is safe as either: + # * We don't need to communicate with the workers anymore + # * There is nothing left in the Queue buffer except None sentinels + call_queue._thread._clean_exit = True call_queue.close() + # If .join() is not called on the created processes then - # some multiprocessing.Queue methods may deadlock on Mac OS X. - for p in processes.values(): + # some ctx.Queue methods may deadlock on Mac OS X. + while processes: + _, p = processes.popitem() p.join() + mp.util.debug("queue management thread clean shutdown of worker " + "processes: {}".format(processes)) reader = result_queue._reader @@ -265,43 +405,54 @@ def shutdown_worker(): work_ids_queue, call_queue) - sentinels = [p.sentinel for p in processes.values()] - assert sentinels - ready = wait([reader] + sentinels) + # As the wakeup object is not waitable, we need to implement a polling + # loop. This loop is similar to the one in the wait function. A + # specific waitable event object complient to wait could be implemented + while not wakeup.get_and_unset(): + sentinels = [p.sentinel for p in processes.values()] + assert sentinels + ready = wait([reader] + sentinels, timeout=_POLL_TIMEOUT) + if len(ready) > 0: + break + else: + # The thread have just been awaken + ready = [] + result_item = None + if reader in ready: result_item = reader.recv() - else: + elif len(ready) > 0: # Mark the process pool broken so that submits fail right now. - executor = executor_reference() - if executor is not None: - executor._broken = True - executor._shutdown_thread = True - executor = None + executor_flags.flag_as_broken() + # All futures in flight must be marked failed for work_id, work_item in pending_work_items.items(): work_item.future.set_exception( BrokenProcessPool( - "A process in the process pool was " - "terminated abruptly while the future was " - "running or pending." + "A process in the process pool was terminated abruptly" + " while the future was running or pending." )) # Delete references to object. See issue16284 del work_item pending_work_items.clear() + # Terminate remaining workers forcibly: the queues or their # locks may be in a dirty state and block forever. 
- for p in processes.values(): + while processes: + _, p = processes.popitem() p.terminate() - shutdown_worker() + p.join() + shutdown_all_workers() + _flag_current_thread_clean_exit() return if isinstance(result_item, int): # Clean shutdown of a worker using its PID # (avoids marking the executor broken) - assert shutting_down() + assert is_shutting_down() p = processes.pop(result_item) p.join() if not processes: - shutdown_worker() + shutdown_all_workers() return elif result_item is not None: work_item = pending_work_items.pop(result_item.work_id, None) @@ -319,21 +470,113 @@ def shutdown_worker(): # - The interpreter is shutting down OR # - The executor that owns this worker has been collected OR # - The executor that owns this worker has been shutdown. - if shutting_down(): - try: - # Since no new work items can be added, it is safe to shutdown - # this thread if there are no pending work items. - if not pending_work_items: - shutdown_worker() - return - except Full: - # This is not a problem: we will eventually be woken up (in - # result_queue.get()) and be able to send a sentinel again. - pass + if is_shutting_down(): + if executor_flags.kill_workers: + while pending_work_items: + _, work_item = pending_work_items.popitem() + work_item.future.set_exception(ShutdownExecutorError( + "The Executor was shutdown before this job could " + "complete.")) + del work_item + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + while processes: + _, p = processes.popitem() + p.terminate() + p.join() + shutdown_all_workers() + _flag_current_thread_clean_exit() + return + # Since no new work items can be added, it is safe to shutdown + # this thread if there are no pending work items. + if not pending_work_items: + shutdown_all_workers() + _flag_current_thread_clean_exit() + return + elif executor_flags.broken: + return executor = None + +def _crash_detection_worker(executor_reference, executor_flags, + queue_management_thread, processes, + pending_work_items, call_queue): + """Checks the state of the executor management and communications. + + This function is run in a local thread. + + Args: + executor_reference: A weakref.ref to the ProcessPoolExecutor that owns + this thread. Used to determine if the ProcessPoolExecutor has been + garbage collected and that this function can exit. + executor_flags: Flags holding the state of the ProcessPoolExecutor. + queue_management_thread: the Queue manager thread of the Executor. It + is used to ensure that the management is still running. + processes: A list of the ctx.Process instances used as workers. + pending_work_items: A dict mapping work ids to _WorkItems e.g. + {5: <_WorkItem...>, 6: <_WorkItem...>, ...} + call_queue: A ctx.Queue that will be filled with _CallItems + derived from _WorkItems for processing by the process workers. + """ + while True: + + if _is_crashed(queue_management_thread): + cause_msg = ("The QueueManagerThread was terminated " + "abruptly while the future was running or " + "pending. This can be caused by an " + "unpickling error of a result.") + _shutdown_crash(executor_flags, processes, + pending_work_items, call_queue, cause_msg) + return + + elif _is_crashed(call_queue._thread): + cause_msg = ("The QueueFeederThread was terminated abruptly " + "while feeding a new job. 
This can be due to " + "a job pickling error.") + _shutdown_crash(executor_flags, processes, pending_work_items, + call_queue, cause_msg) + return + if getattr(queue_management_thread, "_clean_exit", False): + mp.util.debug("shutting down") + return + + # Detect if all the worker timed out while a new job was submitted and + # launch new workers if it is the case. + executor = executor_reference() + if (executor is not None and len(processes) == 0 and + len(executor._pending_work_items) > 0): + mp.util.debug("All workers timed out. Adjusting process count.") + executor._adjust_process_count() + executor = None + time.sleep(.1) + + +def _shutdown_crash(executor_flags, processes, pending_work_items, + call_queue, cause_msg): + mp.util.info("Crash detected, marking executor as broken and terminating " + "worker processes. " + cause_msg) + executor_flags.flag_as_broken() + call_queue.close() + # All futures in flight must be marked failed. We do that before killing + # the processes so that when process get killed, queue_manager_thread is + # woken up and realizes it can shutdown + for work_id, work_item in pending_work_items.items(): + work_item.future.set_exception(BrokenProcessPool(cause_msg)) + # Delete references to object. See issue16284 + del work_item + # Terminate remaining workers forcibly: the queues or their + # locks may be in a dirty state and block forever. + while processes: + _, p = processes.popitem() + p.terminate() + p.join() + pending_work_items.clear() + + _system_limits_checked = False _system_limited = None + + def _check_system_limits(): global _system_limits_checked, _system_limited if _system_limits_checked: @@ -346,7 +589,7 @@ def _check_system_limits(): # sysconf not available or setting not available return if nsems_max == -1: - # indetermined limit, assume that limit is determined + # undetermined limit, assume that limit is determined # by available memory only return if nsems_max >= 256: @@ -370,20 +613,27 @@ def _chain_from_iterable_of_lists(iterable): class BrokenProcessPool(RuntimeError): - """ - Raised when a process in a ProcessPoolExecutor terminated abruptly + """Raised when a process in a ProcessPoolExecutor terminated abruptly while a future was in the running state. """ +class ShutdownExecutorError(RuntimeError): + """Raised when a ProcessPoolExecutor is shutdown while a future was in the + running or pending state. + """ + + class ProcessPoolExecutor(_base.Executor): - def __init__(self, max_workers=None): + def __init__(self, max_workers=None, context=None): """Initializes a new ProcessPoolExecutor instance. Args: max_workers: The maximum number of processes that can be used to execute the given calls. If None or not given then as many worker processes will be created as the machine has processors. + context: A multiprocessing context to launch the workers. This + object should provide SimpleQueue, Queue and Process. 
""" _check_system_limits() @@ -392,67 +642,117 @@ def __init__(self, max_workers=None): else: if max_workers <= 0: raise ValueError("max_workers must be greater than 0") - self._max_workers = max_workers + if context is None: + context = mp.get_context() + self._context = context + + # Internal variables of the ProcessPoolExecutor + self._processes = {} + self._queue_count = 0 + self._pending_work_items = {} + self._work_ids = queue.Queue() + self._processes_management_lock = self._context.Lock() + self._crash_detection_thread = None + self._queue_management_thread = None + + # Permits to wake_up the queue_manager_thread independently of + # result_queue state. This avoid deadlocks caused by the non + # transmission of wakeup signal when a worker died with the + # _result_queue write lock. + self._wakeup = _Sentinel() + + # Flag to hold the state of the Executor. This permits to introspect + # the Executor state even once it has been garbage collected. + self._flags = _ExecutorFlags() + + # Finally setup the queues for interprocess communication + self._setup_queues() + + def _setup_queues(self): # Make the call queue slightly larger than the number of processes to # prevent the worker processes from idling. But don't make it too big # because futures in the call queue cannot be cancelled. - self._call_queue = multiprocessing.Queue(self._max_workers + - EXTRA_QUEUED_CALLS) + queue_size = 2 * self._max_workers + EXTRA_QUEUED_CALLS + self._call_queue = _SafeQueue( + max_size=queue_size, pending_work_items=self._pending_work_items, + wakeup=self._wakeup, ctx=self._context) # Killed worker processes can produce spurious "broken pipe" # tracebacks in the queue's own worker thread. But we detect killed # processes anyway, so silence the tracebacks. self._call_queue._ignore_epipe = True - self._result_queue = SimpleQueue() - self._work_ids = queue.Queue() - self._queue_management_thread = None - # Map of pids to processes - self._processes = {} - # Shutdown is a two-step process. - self._shutdown_thread = False - self._shutdown_lock = threading.Lock() - self._broken = False - self._queue_count = 0 - self._pending_work_items = {} + self._result_queue = self._context.SimpleQueue() def _start_queue_management_thread(self): - # When the executor gets lost, the weakref callback will wake up - # the queue management thread. - def weakref_cb(_, q=self._result_queue): - q.put(None) + if self._queue_management_thread is None: + # When the executor gets lost, the weakref callback will wake up + # the queue management thread. + def weakref_cb(_, wakeup=self._wakeup): + wakeup.set() + # Start the processes so that their sentinels are known. - self._adjust_process_count() self._queue_management_thread = threading.Thread( - target=_queue_management_worker, - args=(weakref.ref(self, weakref_cb), - self._processes, - self._pending_work_items, - self._work_ids, - self._call_queue, - self._result_queue)) + target=_queue_management_worker, + args=(weakref.ref(self, weakref_cb), + self._flags, + self._processes, + self._pending_work_items, + self._work_ids, + self._call_queue, + self._result_queue, + self._wakeup), + name="QueueManagerThread") self._queue_management_thread.daemon = True self._queue_management_thread.start() - _threads_queues[self._queue_management_thread] = self._result_queue + + # register this executor in a mechanism that ensures it will wakeup + # when the interpreter is exiting. 
+ _threads_wakeup[self._queue_management_thread] = self._wakeup + + def _start_crash_detection_thread(self): + if self._crash_detection_thread is None: + mp.util.debug('_start_thread_management_thread called') + # Start the processes so that their sentinels are known. + self._crash_detection_thread = threading.Thread( + target=_crash_detection_worker, + args=(weakref.ref(self), + self._flags, + self._queue_management_thread, + self._processes, + self._pending_work_items, + self._call_queue), + name="ThreadManager") + self._crash_detection_thread.daemon = True + self._crash_detection_thread.start() def _adjust_process_count(self): for _ in range(len(self._processes), self._max_workers): - p = multiprocessing.Process( - target=_process_worker, - args=(self._call_queue, - self._result_queue)) + p = self._context.Process( + target=_process_worker, + args=(self._call_queue, self._result_queue)) p.start() self._processes[p.pid] = p + def _ensure_executor_running(self): + """ensures all workers and management thread are running + """ + if len(self._processes) != self._max_workers: + self._adjust_process_count() + self._start_queue_management_thread() + self._start_crash_detection_thread() + def submit(self, fn, *args, **kwargs): - with self._shutdown_lock: - if self._broken: - raise BrokenProcessPool('A child process terminated ' - 'abruptly, the process pool is not usable anymore') - if self._shutdown_thread: - raise RuntimeError('cannot schedule new futures after shutdown') + with self._flags.shutdown_lock: + if self._flags.broken: + raise BrokenProcessPool( + 'A child process terminated abruptly, the process pool is' + ' not usable anymore') + if self._flags.shutdown: + raise ShutdownExecutorError( + 'cannot schedule new futures after shutdown') f = _base.Future() w = _WorkItem(f, fn, args, kwargs) @@ -461,9 +761,9 @@ def submit(self, fn, *args, **kwargs): self._work_ids.put(self._queue_count) self._queue_count += 1 # Wake up queue management thread - self._result_queue.put(None) + self._wakeup.set() - self._start_queue_management_thread() + self._ensure_executor_running() return f submit.__doc__ = _base.Executor.submit.__doc__ @@ -477,7 +777,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): is no limit on the wait time. chunksize: If greater than one, the iterables will be chopped into chunks of size chunksize and submitted to the process pool. - If set to one, the items in the list will be sent one at a time. + If set to one, the items in the list will be sent one at a + time. Returns: An iterator equivalent to: map(func, *iterables) but the calls may @@ -496,24 +797,28 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True): - with self._shutdown_lock: - self._shutdown_thread = True + def shutdown(self, wait=True, kill_workers=False): + mp.util.debug('shutting down executor %s' % self) + self._flags.flag_as_shutting_down(kill_workers) if self._queue_management_thread: # Wake up queue management thread - self._result_queue.put(None) - if wait: + self._wakeup.set() + if wait and self._queue_management_thread.is_alive(): self._queue_management_thread.join() - # To reduce the risk of opening too many files, remove references to - # objects that use file descriptors. 
- self._queue_management_thread = None - if self._call_queue is not None: + if self._crash_detection_thread: + if wait and self._crash_detection_thread.is_alive(): + self._crash_detection_thread.join() + if self._call_queue: self._call_queue.close() if wait: self._call_queue.join_thread() - self._call_queue = None + # To reduce the risk of opening too many files, remove references to + # objects that use file descriptors. + self._queue_management_thread = None + self._crash_detection_thread = None + self._call_queue = None self._result_queue = None - self._processes = None + self._processes.clear() shutdown.__doc__ = _base.Executor.shutdown.__doc__ atexit.register(_python_exit) diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 328efbd95fe63d..d66d37a5c3e2eb 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -160,9 +160,10 @@ def _start_thread(self): self._thread = threading.Thread( target=Queue._feed, args=(self._buffer, self._notempty, self._send_bytes, - self._wlock, self._writer.close, self._ignore_epipe), + self._wlock, self._writer.close, self._ignore_epipe, + self._on_queue_feeder_error), name='QueueFeederThread' - ) + ) self._thread.daemon = True debug('doing self._thread.start()') @@ -201,7 +202,8 @@ def _finalize_close(buffer, notempty): notempty.notify() @staticmethod - def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): + def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe, + onerror): debug('starting thread to feed data to pipe') nacquire = notempty.acquire nrelease = notempty.release @@ -253,8 +255,17 @@ def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe): info('error in queue thread: %s', e) return else: - import traceback - traceback.print_exc() + onerror(e, obj) + + @staticmethod + def _on_queue_feeder_error(e, obj): + """ + Private API hook called when feeding data in the background thread + raises an exception. For overriding by concurrent.futures. + """ + import traceback + traceback.print_exc() + _sentinel = object() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index d6fe7d62675631..c2a653a402cd2a 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -944,6 +944,42 @@ def __reduce__(self): self.assertTrue(q.get(timeout=1.0)) close_queue(q) + def test_queue_feeder_on_queue_feeder_error(self): + # bpo-30006: verify feeder handles exceptions using the + # _on_queue_feeder_error hook. 
+ if self.TYPE != 'processes': + self.skipTest('test not appropriate for {}'.format(self.TYPE)) + + class NotSerializable(object): + """Mock unserializable object""" + def __init__(self): + self.reduce_was_called = False + self.on_queue_feeder_error_was_called = False + + def __reduce__(self): + self.reduce_was_called = True + raise AttributeError + + class SafeQueue(multiprocessing.queues.Queue): + """Queue with overloaded _on_queue_feeder_error hook""" + @staticmethod + def _on_queue_feeder_error(e, obj): + if (isinstance(e, AttributeError) and + isinstance(obj, NotSerializable)): + obj.on_queue_feeder_error_was_called = True + + not_serializable_obj = NotSerializable() + with test.support.captured_stderr(): + q = SafeQueue(ctx=multiprocessing.get_context()) + q.put(not_serializable_obj) + + # Verify that q is still functionning correctly + q.put(True) + self.assertTrue(q.get(timeout=1.0)) + + # Assert that the serialization and the hook have been called correctly + self.assertTrue(not_serializable_obj.reduce_was_called) + self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) # # # diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index a888dcacc49160..dc43c722d103d2 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -13,11 +13,13 @@ import time import unittest import weakref +from pickle import PicklingError from concurrent import futures from concurrent.futures._base import ( PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future) -from concurrent.futures.process import BrokenProcessPool +from concurrent.futures.process import BrokenProcessPool, ShutdownExecutorError +from multiprocessing import get_context def create_future(state=PENDING, exception=None, result=None): @@ -50,6 +52,11 @@ def sleep_and_print(t, msg): sys.stdout.flush() +def sleep_and_return(t, x): + time.sleep(t) + return x + + class MyObject(object): def my_method(self): pass @@ -76,7 +83,13 @@ def setUp(self): self.t1 = time.time() try: - self.executor = self.executor_type(max_workers=self.worker_count) + if hasattr(self, "ctx"): + self.executor = self.executor_type( + max_workers=self.worker_count, + context=get_context(self.ctx)) + else: + self.executor = self.executor_type( + max_workers=self.worker_count) except NotImplementedError as e: self.skipTest(str(e)) self._prime_executor() @@ -106,8 +119,29 @@ class ThreadPoolMixin(ExecutorMixin): executor_type = futures.ThreadPoolExecutor -class ProcessPoolMixin(ExecutorMixin): +class ProcessPoolForkMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor + ctx = "fork" + + def setUp(self): + if sys.platform == "win32": + self.skipTest("require unix system") + super().setUp() + + +class ProcessPoolSpawnMixin(ExecutorMixin): + executor_type = futures.ProcessPoolExecutor + ctx = "spawn" + + +class ProcessPoolForkserverMixin(ExecutorMixin): executor_type = futures.ProcessPoolExecutor + ctx = "forkserver" + + def setUp(self): + if sys.platform == "win32": + self.skipTest("require unix system") + super().setUp() class ExecutorShutdownTest: @@ -123,8 +157,9 @@ def test_interpreter_shutdown(self): from concurrent.futures import {executor_type} from time import sleep from test.test_concurrent_futures import sleep_and_print - t = {executor_type}(5) - t.submit(sleep_and_print, 1.0, "apple") + if __name__ == "__main__": + t = {executor_type}(5) + t.submit(sleep_and_print, 1.0, "apple") """.format(executor_type=self.executor_type.__name__)) # Errors in atexit 
hooks don't change the process exit code, check # stderr manually. @@ -193,10 +228,31 @@ def test_thread_names_default(self): t.join() -class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, BaseTestCase): +class ProcessPoolShutdownTest(ExecutorShutdownTest): def _prime_executor(self): pass + @staticmethod + def _id(x): + return x + + def test_shutdown(self): + processes = [p for p in self.executor._processes.values()] + self.assertEqual(self.executor.submit(self._id, 42).result(), 42) + self.executor.shutdown(wait=True) + + self.assertListEqual([p.exitcode for p in processes], + [0 for _ in processes]) + + def test_shutdown_wait(self): + processes = [p for p in self.executor._processes.values()] + f = self.executor.submit(sleep_and_return, .5, 42) + self.executor.shutdown(wait=True) + self.assertEqual(f.result(), 42) + + self.assertListEqual([p.exitcode for p in processes], + [0 for _ in processes]) + def test_processes_terminate(self): self.executor.submit(mul, 21, 2) self.executor.submit(mul, 6, 7) @@ -229,6 +285,22 @@ def test_del_shutdown(self): p.join() +class ProcessPoolForkShutdownTest(ProcessPoolForkMixin, BaseTestCase, + ProcessPoolShutdownTest): + pass + + +class ProcessPoolForkserverShutdownTest(ProcessPoolForkserverMixin, + BaseTestCase, + ProcessPoolShutdownTest): + pass + + +class ProcessPoolSpawnShutdownTest(ProcessPoolSpawnMixin, BaseTestCase, + ProcessPoolShutdownTest): + pass + + class WaitTests: def test_first_completed(self): @@ -348,7 +420,17 @@ def future_func(): sys.setswitchinterval(oldswitchinterval) -class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests, BaseTestCase): +class ProcessPoolForkWaitTests(ProcessPoolForkMixin, WaitTests, BaseTestCase): + pass + + +class ProcessPoolForkserverWaitTests(ProcessPoolForkserverMixin, WaitTests, + BaseTestCase): + pass + + +class ProcessPoolSpawnWaitTests(ProcessPoolSpawnMixin, BaseTestCase, + WaitTests): pass @@ -433,7 +515,19 @@ class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase pass -class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests, BaseTestCase): +class ProcessPoolForkAsCompletedTests(ProcessPoolForkMixin, AsCompletedTests, + BaseTestCase): + pass + + +class ProcessPoolForkserverAsCompletedTests(ProcessPoolForkserverMixin, + AsCompletedTests, + BaseTestCase): + pass + + +class ProcessPoolSpawnAsCompletedTests(ProcessPoolSpawnMixin, AsCompletedTests, + BaseTestCase): pass @@ -533,7 +627,7 @@ def test_default_workers(self): (os.cpu_count() or 1) * 5) -class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest, BaseTestCase): +class ProcessPoolExecutorTest(ExecutorTest): def test_killed_child(self): # When a child process is abruptly terminated, the whole pool gets # "broken". 
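The fork, spawn and forkserver mixins above all exercise the new `context` argument of `ProcessPoolExecutor`. A minimal usage sketch of that argument follows; the `square` helper and the chosen start method are illustrative only, not part of the patch:

```python
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    # "spawn" exists on every platform; "fork" and "forkserver" are
    # POSIX-only, which is why the mixins above skip them on win32.
    ctx = mp.get_context("spawn")
    with ProcessPoolExecutor(max_workers=2, context=ctx) as executor:
        print(list(executor.map(square, range(4))))  # -> [0, 1, 4, 9]
```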
@@ -589,6 +683,283 @@ def test_traceback(self): f1.getvalue()) +class ProcessPoolForkExecutorTest(ProcessPoolForkMixin, + ProcessPoolExecutorTest, + BaseTestCase): + pass + + +class ProcessPoolForkserverExecutorTest(ProcessPoolForkserverMixin, + ProcessPoolExecutorTest, + BaseTestCase): + pass + + +class ProcessPoolSpawnExecutorTest(ProcessPoolSpawnMixin, + ProcessPoolExecutorTest, + BaseTestCase): + pass + + +def hide_process_stderr(): + import io + setattr(sys, "stderr", io.StringIO()) + + +def _crash(): + """Induces a segfault""" + import faulthandler + faulthandler.disable() + faulthandler._sigsegv() + + +def _exit(): + """Induces a sys exit with exitcode 1""" + sys.exit(1) + + +def _raise_error(Err): + """Function that raises an Exception in process""" + hide_process_stderr() + raise Err() + + +def _return_instance(cls): + """Function that returns a instance of cls""" + hide_process_stderr() + return cls() + + +class CrashAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _crash() + + +class CrashAtUnpickle(object): + """Bad object that triggers a segfault at unpickling time.""" + def __reduce__(self): + return _crash, () + + +class ExitAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + _exit() + + +class ExitAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + return _exit, () + + +class ErrorAtPickle(object): + """Bad object that triggers a segfault at pickling time.""" + def __reduce__(self): + from pickle import PicklingError + raise PicklingError("Error in pickle") + + +class ErrorAtUnpickle(object): + """Bad object that triggers a process exit at unpickling time.""" + def __reduce__(self): + from pickle import UnpicklingError + return _raise_error, (UnpicklingError, ) + + +class TimingWrapper(object): + """Creates a wrapper for a function which records the time it takes to + finish + """ + def __init__(self, func): + self.func = func + self.elapsed = None + + def __call__(self, *args, **kwds): + t = time.time() + try: + return self.func(*args, **kwds) + finally: + self.elapsed = time.time() - t + + +class ExecutorDeadlockTest: + # If ExecutorDeadlockTest takes more than 100secs to complete, it is very + # likely caught in a deadlock. As there is no easy way to detect it, + # faulthandler will print the traceback and exit. 
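The `*AtPickle`/`*AtUnpickle` helpers above inject faults at serialization time. Combined with the `_SafeQueue._on_queue_feeder_error` hook added earlier in this patch, a task whose arguments cannot be pickled now fails its own future instead of hanging the pool. A hedged sketch of the behaviour the deadlock tests below rely on (the class mirrors the `ErrorAtPickle` test helper; the timeout value is arbitrary):

```python
from pickle import PicklingError
from concurrent.futures import ProcessPoolExecutor

class ErrorAtPickle:
    """Raises at pickling time, like the test helper of the same name."""
    def __reduce__(self):
        raise PicklingError("Error in pickle")

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=1) as executor:
        # The feeder thread fails to pickle the call item; the hook routes
        # the PicklingError back to this future.
        future = executor.submit(id, ErrorAtPickle())
        try:
            future.result(timeout=15)
        except PicklingError:
            print("pickling error was set on the future")
```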
+ TIMEOUT = 15 + + def _fail_on_deadlock(self, executor): + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + import faulthandler + from tempfile import TemporaryFile + with TemporaryFile(mode="w+") as f: + faulthandler.dump_traceback(file=f) + f.seek(0) + tb = f.read() + executor.shutdown(wait=True, kill_workers=False) + print(f"\nTraceback:\n {tb}", file=sys.__stderr__) + self.fail(f"Deadlock executor:\n\n{tb}") + + def test_crash(self): + # extensive testing for deadlock caused by crash in a pool + crash_cases = [ + # Check problem occuring while pickling a task in + # the task_handler thread + (id, (ExitAtPickle(),), BrokenProcessPool, "exit at task pickle"), + (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"), + # Check problem occuring while unpickling a task on workers + (id, (ExitAtUnpickle(),), BrokenProcessPool, + "exit at task unpickle"), + (id, (ErrorAtUnpickle(),), BrokenProcessPool, + "error at task unpickle"), + (id, (CrashAtUnpickle(),), BrokenProcessPool, + "crash at task unpickle"), + # Check problem occuring during func execution on workers + (_crash, (), BrokenProcessPool, + "crash during func execution on worker"), + (_exit, (), SystemExit, + "exit during func execution on worker"), + (_raise_error, (RuntimeError, ), RuntimeError, + "error during func execution on worker"), + # Check problem occuring while pickling a task result + # on workers + (_return_instance, (CrashAtPickle,), BrokenProcessPool, + "crash during result pickle on worker"), + (_return_instance, (ExitAtPickle,), BrokenProcessPool, + "exit during result pickle on worker"), + (_return_instance, (ErrorAtPickle,), PicklingError, + "error during result pickle on worker"), + # Check problem occuring while unpickling a task in + # the result_handler thread + (_return_instance, (ExitAtUnpickle,), BrokenProcessPool, + "exit during result unpickle in result_handler"), + (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool, + "error during result unpickle in result_handler") + ] + for func, args, error, name in crash_cases: + with self.subTest(name): + # skip the test involving pickle errors with manager as it + # breaks the manager and not the pool in this cases + # skip the test involving pickle errors with thread as the + # tasks and results are not pickled in this case + with test.support.captured_stderr(): + executor = self.executor_type( + max_workers=2, context=get_context(self.ctx)) + res = executor.submit(func, *args) + with self.assertRaises(error): + try: + res.result(timeout=self.TIMEOUT) + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + @classmethod + def _test_getpid(cls, a): + return os.getpid() + + @classmethod + def _test_kill_worker(cls, pid=None, delay=0.01): + """Function that send SIGKILL at process pid after delay second""" + time.sleep(delay) + if pid is None: + pid = os.getpid() + try: + from signal import SIGKILL + except ImportError: + from signal import SIGTERM as SIGKILL + try: + os.kill(pid, SIGKILL) + time.sleep(.01) + except (ProcessLookupError, PermissionError): + pass + + def test_crash_races(self): + + from itertools import repeat + for n_proc in [1, 2, 5, 17]: + with self.subTest(n_proc=n_proc): + # Test for external crash signal comming from neighbor + # with various race setup + executor = self.executor_type( + max_workers=2, 
context=get_context(self.ctx)) + try: + raise AttributeError() + pids = [p.pid for p in executor._processes] + assert len(pids) == n_proc + except AttributeError: + pids = [pid for pid in executor.map( + self._test_getpid, [None] * n_proc)] + assert None not in pids + res = self.executor.map( + sleep_and_return, + [.001 * (j // 2) for j in range(2 * n_proc)], + repeat(True, 2 * n_proc), + chunksize=1) + assert all(res) + res = executor.map(self._test_kill_worker, pids[::-1], + timeout=self.TIMEOUT) + with self.assertRaises(BrokenProcessPool): + try: + [v for v in res] + except futures.TimeoutError: + # If we did not recover before TIMEOUT seconds, + # consider that the executor is in a deadlock state + self._fail_on_deadlock(executor) + executor.shutdown(wait=True) + + def test_shutdown_deadlock(self): + # Test that the pool calling shutdown do not cause deadlock + # if a worker failed + + with self.executor_type(max_workers=2, + context=get_context(self.ctx)) as executor: + executor.submit(self._test_kill_worker, ()) + time.sleep(.01) + executor.shutdown() + + def test_shutdown_kill(self): + """ProcessPoolExecutor does not wait job end and shutdowm with + kill_worker flag set to True. Ensure that the future returns + ShutdownExecutorError. + """ + from itertools import repeat + executor = self.executor_type(max_workers=2, + context=get_context(self.ctx)) + res1 = executor.map(sleep_and_return, repeat(.001), range(50)) + res2 = executor.map(sleep_and_return, repeat(.1), range(50)) + assert list(res1) == list(range(50)) + # We should get an error as the executor shutdowned before we fetched + # the results from the operation. + shutdown = TimingWrapper(executor.shutdown) + shutdown(wait=True, kill_workers=True) + assert shutdown.elapsed < .5 + with self.assertRaises(ShutdownExecutorError): + list(res2) + + +class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + +class ProcessPoolForkserverExecutorDeadlockTest(ProcessPoolForkserverMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + +class ProcessPoolSpawnExecutorDeadlockTest(ProcessPoolSpawnMixin, + ExecutorDeadlockTest, + unittest.TestCase): + pass + + class FutureTests(BaseTestCase): def test_done_callback_with_result(self): callback_result = None diff --git a/Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst b/Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst new file mode 100644 index 00000000000000..b59c9067bf908b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2017-06-26-12-46-36.bpo-30006.V2CEg4.rst @@ -0,0 +1,4 @@ +Add context argument for `concurrent.fututres.ProcessPoolExecutor`. Make +`ProcessPoolExecutor` more robust to faulty user code such as unpicklable +items and fix other race conditions. Add tests for possible deadlocks in +`ProcessPoolExecutor`. Patch by Thomas Moreau and Olivier Grisel
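
For reference, a minimal sketch of the `kill_workers` shutdown path exercised by `test_shutdown_kill` above; the `slow` helper and the timings are illustrative only, not part of the patch:

```python
import time
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures.process import ShutdownExecutorError

def slow(x):
    time.sleep(.1)
    return x

if __name__ == "__main__":
    executor = ProcessPoolExecutor(max_workers=2)
    results = executor.map(slow, range(50))
    # Do not wait for the queued calls to finish: kill the workers and
    # fail the pending futures with ShutdownExecutorError instead.
    executor.shutdown(wait=True, kill_workers=True)
    try:
        print(list(results))
    except ShutdownExecutorError:
        print("pending calls were cancelled by kill_workers=True")
```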