8
8
|======================= In-process =====================|== Out-of-process ==|
9
9
10
10
+----------+ +----------+ +--------+ +-----------+ +---------+
11
- | | => | Work Ids | => | | => | Call Q | => | |
12
- | | +----------+ | | +-----------+ | |
13
- | | | ... | | | | ... | | |
14
- | | | 6 | | | | 5, call() | | |
11
+ | | => | Work Ids | | | | Call Q | | Process |
12
+ | | +----------+ | | +-----------+ | Pool |
13
+ | | | ... | | | | ... | +---------+
14
+ | | | 6 | => | | => | 5, call() | => | |
15
15
| | | 7 | | | | ... | | |
16
16
| Process | | ... | | Local | +-----------+ | Process |
17
17
| Pool | +----------+ | Worker | | #1..n |
52
52
from queue import Full
53
53
import multiprocessing as mp
54
54
from multiprocessing .connection import wait
55
+ from multiprocessing .queues import Queue
55
56
import threading
56
57
import weakref
57
58
from functools import partial
72
73
# workers to exit when their work queues are empty and then waits until the
73
74
# threads/processes finish.
74
75
75
- _threads_queues = weakref .WeakKeyDictionary ()
76
+ _threads_wakeups = weakref .WeakKeyDictionary ()
76
77
_global_shutdown = False
77
78
79
+
80
+ class _ThreadWakeup :
81
+ __slot__ = ["_state" ]
82
+
83
+ def __init__ (self ):
84
+ self ._reader , self ._writer = mp .Pipe (duplex = False )
85
+
86
+ def wakeup (self ):
87
+ self ._writer .send_bytes (b"" )
88
+
89
+ def clear (self ):
90
+ while self ._reader .poll ():
91
+ self ._reader .recv_bytes ()
92
+
93
+
78
94
def _python_exit ():
79
95
global _global_shutdown
80
96
_global_shutdown = True
81
- items = list (_threads_queues .items ())
82
- for t , q in items :
83
- q . put ( None )
84
- for t , q in items :
97
+ items = list (_threads_wakeups .items ())
98
+ for _ , thread_wakeup in items :
99
+ thread_wakeup . wakeup ( )
100
+ for t , _ in items :
85
101
t .join ()
86
102
87
103
# Controls how many more calls than processes will be queued in the call queue.
@@ -90,6 +106,7 @@ def _python_exit():
90
106
# (Futures in the call queue cannot be cancelled).
91
107
EXTRA_QUEUED_CALLS = 1
92
108
109
+
93
110
# Hack to embed stringification of remote traceback in local traceback
94
111
95
112
class _RemoteTraceback (Exception ):
@@ -132,6 +149,25 @@ def __init__(self, work_id, fn, args, kwargs):
132
149
self .kwargs = kwargs
133
150
134
151
152
+ class _SafeQueue (Queue ):
153
+ """Safe Queue set exception to the future object linked to a job"""
154
+ def __init__ (self , max_size = 0 , * , ctx , pending_work_items ):
155
+ self .pending_work_items = pending_work_items
156
+ super ().__init__ (max_size , ctx = ctx )
157
+
158
+ def _on_queue_feeder_error (self , e , obj ):
159
+ if isinstance (obj , _CallItem ):
160
+ tb = traceback .format_exception (type (e ), e , e .__traceback__ )
161
+ e .__cause__ = _RemoteTraceback ('\n """\n {}"""' .format ('' .join (tb )))
162
+ work_item = self .pending_work_items .pop (obj .work_id , None )
163
+ # work_item can be None if another process terminated. In this case,
164
+ # the queue_manager_thread fails all work_items with BrokenProcessPool
165
+ if work_item is not None :
166
+ work_item .future .set_exception (e )
167
+ else :
168
+ super ()._on_queue_feeder_error (e , obj )
169
+
170
+
135
171
def _get_chunks (* iterables , chunksize ):
136
172
""" Iterates over zip()ed iterables in chunks. """
137
173
it = zip (* iterables )
@@ -152,6 +188,17 @@ def _process_chunk(fn, chunk):
152
188
"""
153
189
return [fn (* args ) for args in chunk ]
154
190
191
+
192
+ def _sendback_result (result_queue , work_id , result = None , exception = None ):
193
+ """Safely send back the given result or exception"""
194
+ try :
195
+ result_queue .put (_ResultItem (work_id , result = result ,
196
+ exception = exception ))
197
+ except BaseException as e :
198
+ exc = _ExceptionWithTraceback (e , e .__traceback__ )
199
+ result_queue .put (_ResultItem (work_id , exception = exc ))
200
+
201
+
155
202
def _process_worker (call_queue , result_queue , initializer , initargs ):
156
203
"""Evaluates calls from call_queue and places the results in result_queue.
157
204
@@ -183,10 +230,9 @@ def _process_worker(call_queue, result_queue, initializer, initargs):
183
230
r = call_item .fn (* call_item .args , ** call_item .kwargs )
184
231
except BaseException as e :
185
232
exc = _ExceptionWithTraceback (e , e .__traceback__ )
186
- result_queue . put ( _ResultItem ( call_item .work_id , exception = exc ) )
233
+ _sendback_result ( result_queue , call_item .work_id , exception = exc )
187
234
else :
188
- result_queue .put (_ResultItem (call_item .work_id ,
189
- result = r ))
235
+ _sendback_result (result_queue , call_item .work_id , result = r )
190
236
191
237
# Liberate the resource as soon as possible, to avoid holding onto
192
238
# open files or shared memory that is not needed anymore
@@ -230,12 +276,14 @@ def _add_call_item_to_queue(pending_work_items,
230
276
del pending_work_items [work_id ]
231
277
continue
232
278
279
+
233
280
def _queue_management_worker (executor_reference ,
234
281
processes ,
235
282
pending_work_items ,
236
283
work_ids_queue ,
237
284
call_queue ,
238
- result_queue ):
285
+ result_queue ,
286
+ thread_wakeup ):
239
287
"""Manages the communication between this process and the worker processes.
240
288
241
289
This function is run in a local thread.
@@ -253,6 +301,9 @@ def _queue_management_worker(executor_reference,
253
301
derived from _WorkItems for processing by the process workers.
254
302
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
255
303
process workers.
304
+ thread_wakeup: A _ThreadWakeup to allow waking up the
305
+ queue_manager_thread from the main Thread and avoid deadlocks
306
+ caused by permanently locked queues.
256
307
"""
257
308
executor = None
258
309
@@ -261,30 +312,59 @@ def shutting_down():
261
312
or executor ._shutdown_thread )
262
313
263
314
def shutdown_worker ():
264
- # This is an upper bound
265
- nb_children_alive = sum (p .is_alive () for p in processes .values ())
266
- for i in range (0 , nb_children_alive ):
267
- call_queue .put_nowait (None )
315
+ # This is an upper bound on the number of children alive.
316
+ n_children_alive = sum (p .is_alive () for p in processes .values ())
317
+ n_children_to_stop = n_children_alive
318
+ n_sentinels_sent = 0
319
+ # Send the right number of sentinels, to make sure all children are
320
+ # properly terminated.
321
+ while n_sentinels_sent < n_children_to_stop and n_children_alive > 0 :
322
+ for i in range (n_children_to_stop - n_sentinels_sent ):
323
+ try :
324
+ call_queue .put_nowait (None )
325
+ n_sentinels_sent += 1
326
+ except Full :
327
+ break
328
+ n_children_alive = sum (p .is_alive () for p in processes .values ())
329
+
268
330
# Release the queue's resources as soon as possible.
269
331
call_queue .close ()
270
332
# If .join() is not called on the created processes then
271
333
# some ctx.Queue methods may deadlock on Mac OS X.
272
334
for p in processes .values ():
273
335
p .join ()
274
336
275
- reader = result_queue ._reader
337
+ result_reader = result_queue ._reader
338
+ wakeup_reader = thread_wakeup ._reader
339
+ readers = [result_reader , wakeup_reader ]
276
340
277
341
while True :
278
342
_add_call_item_to_queue (pending_work_items ,
279
343
work_ids_queue ,
280
344
call_queue )
281
345
282
- sentinels = [p .sentinel for p in processes .values ()]
283
- assert sentinels
284
- ready = wait ([reader ] + sentinels )
285
- if reader in ready :
286
- result_item = reader .recv ()
287
- else :
346
+ # Wait for a result to be ready in the result_queue while checking
347
+ # that all worker processes are still running, or for a wake up
348
+ # signal send. The wake up signals come either from new tasks being
349
+ # submitted, from the executor being shutdown/gc-ed, or from the
350
+ # shutdown of the python interpreter.
351
+ worker_sentinels = [p .sentinel for p in processes .values ()]
352
+ ready = wait (readers + worker_sentinels )
353
+
354
+ cause = None
355
+ is_broken = True
356
+ if result_reader in ready :
357
+ try :
358
+ result_item = result_reader .recv ()
359
+ is_broken = False
360
+ except BaseException as e :
361
+ cause = traceback .format_exception (type (e ), e , e .__traceback__ )
362
+
363
+ elif wakeup_reader in ready :
364
+ is_broken = False
365
+ result_item = None
366
+ thread_wakeup .clear ()
367
+ if is_broken :
288
368
# Mark the process pool broken so that submits fail right now.
289
369
executor = executor_reference ()
290
370
if executor is not None :
@@ -293,14 +373,15 @@ def shutdown_worker():
293
373
'usable anymore' )
294
374
executor ._shutdown_thread = True
295
375
executor = None
376
+ bpe = BrokenProcessPool ("A process in the process pool was "
377
+ "terminated abruptly while the future was "
378
+ "running or pending." )
379
+ if cause is not None :
380
+ bpe .__cause__ = _RemoteTraceback (
381
+ f"\n '''\n { '' .join (cause )} '''" )
296
382
# All futures in flight must be marked failed
297
383
for work_id , work_item in pending_work_items .items ():
298
- work_item .future .set_exception (
299
- BrokenProcessPool (
300
- "A process in the process pool was "
301
- "terminated abruptly while the future was "
302
- "running or pending."
303
- ))
384
+ work_item .future .set_exception (bpe )
304
385
# Delete references to object. See issue16284
305
386
del work_item
306
387
pending_work_items .clear ()
@@ -329,6 +410,9 @@ def shutdown_worker():
329
410
work_item .future .set_result (result_item .result )
330
411
# Delete references to object. See issue16284
331
412
del work_item
413
+ # Delete reference to result_item
414
+ del result_item
415
+
332
416
# Check whether we should start shutting down.
333
417
executor = executor_reference ()
334
418
# No more work items can be added if:
@@ -348,8 +432,11 @@ def shutdown_worker():
348
432
pass
349
433
executor = None
350
434
435
+
351
436
_system_limits_checked = False
352
437
_system_limited = None
438
+
439
+
353
440
def _check_system_limits ():
354
441
global _system_limits_checked , _system_limited
355
442
if _system_limits_checked :
@@ -369,7 +456,8 @@ def _check_system_limits():
369
456
# minimum number of semaphores available
370
457
# according to POSIX
371
458
return
372
- _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
459
+ _system_limited = ("system provides too few semaphores (%d"
460
+ " available, 256 necessary)" % nsems_max )
373
461
raise NotImplementedError (_system_limited )
374
462
375
463
@@ -415,6 +503,7 @@ def __init__(self, max_workers=None, mp_context=None,
415
503
raise ValueError ("max_workers must be greater than 0" )
416
504
417
505
self ._max_workers = max_workers
506
+
418
507
if mp_context is None :
419
508
mp_context = mp .get_context ()
420
509
self ._mp_context = mp_context
@@ -424,34 +513,52 @@ def __init__(self, max_workers=None, mp_context=None,
424
513
self ._initializer = initializer
425
514
self ._initargs = initargs
426
515
516
+ # Management thread
517
+ self ._queue_management_thread = None
518
+
519
+ # Map of pids to processes
520
+ self ._processes = {}
521
+
522
+ # Shutdown is a two-step process.
523
+ self ._shutdown_thread = False
524
+ self ._shutdown_lock = threading .Lock ()
525
+ self ._broken = False
526
+ self ._queue_count = 0
527
+ self ._pending_work_items = {}
528
+
529
+ # Create communication channels for the executor
427
530
# Make the call queue slightly larger than the number of processes to
428
531
# prevent the worker processes from idling. But don't make it too big
429
532
# because futures in the call queue cannot be cancelled.
430
533
queue_size = self ._max_workers + EXTRA_QUEUED_CALLS
431
- self ._call_queue = mp_context .Queue (queue_size )
534
+ self ._call_queue = _SafeQueue (
535
+ max_size = queue_size , ctx = self ._mp_context ,
536
+ pending_work_items = self ._pending_work_items )
432
537
# Killed worker processes can produce spurious "broken pipe"
433
538
# tracebacks in the queue's own worker thread. But we detect killed
434
539
# processes anyway, so silence the tracebacks.
435
540
self ._call_queue ._ignore_epipe = True
436
541
self ._result_queue = mp_context .SimpleQueue ()
437
542
self ._work_ids = queue .Queue ()
438
- self ._queue_management_thread = None
439
- # Map of pids to processes
440
- self ._processes = {}
441
543
442
- # Shutdown is a two-step process.
443
- self ._shutdown_thread = False
444
- self ._shutdown_lock = threading .Lock ()
445
- self ._broken = False
446
- self ._queue_count = 0
447
- self ._pending_work_items = {}
544
+ # _ThreadWakeup is a communication channel used to interrupt the wait
545
+ # of the main loop of queue_manager_thread from another thread (e.g.
546
+ # when calling executor.submit or executor.shutdown). We do not use the
547
+ # _result_queue to send the wakeup signal to the queue_manager_thread
548
+ # as it could result in a deadlock if a worker process dies with the
549
+ # _result_queue write lock still acquired.
550
+ self ._queue_management_thread_wakeup = _ThreadWakeup ()
448
551
449
552
def _start_queue_management_thread (self ):
450
- # When the executor gets lost, the weakref callback will wake up
451
- # the queue management thread.
452
- def weakref_cb (_ , q = self ._result_queue ):
453
- q .put (None )
454
553
if self ._queue_management_thread is None :
554
+ # When the executor gets garbarge collected, the weakref callback
555
+ # will wake up the queue management thread so that it can terminate
556
+ # if there is no pending work item.
557
+ def weakref_cb (_ ,
558
+ thread_wakeup = self ._queue_management_thread_wakeup ):
559
+ mp .util .debug ('Executor collected: triggering callback for'
560
+ ' QueueManager wakeup' )
561
+ thread_wakeup .wakeup ()
455
562
# Start the processes so that their sentinels are known.
456
563
self ._adjust_process_count ()
457
564
self ._queue_management_thread = threading .Thread (
@@ -461,10 +568,13 @@ def weakref_cb(_, q=self._result_queue):
461
568
self ._pending_work_items ,
462
569
self ._work_ids ,
463
570
self ._call_queue ,
464
- self ._result_queue ))
571
+ self ._result_queue ,
572
+ self ._queue_management_thread_wakeup ),
573
+ name = "QueueManagerThread" )
465
574
self ._queue_management_thread .daemon = True
466
575
self ._queue_management_thread .start ()
467
- _threads_queues [self ._queue_management_thread ] = self ._result_queue
576
+ _threads_wakeups [self ._queue_management_thread ] = \
577
+ self ._queue_management_thread_wakeup
468
578
469
579
def _adjust_process_count (self ):
470
580
for _ in range (len (self ._processes ), self ._max_workers ):
@@ -491,7 +601,7 @@ def submit(self, fn, *args, **kwargs):
491
601
self ._work_ids .put (self ._queue_count )
492
602
self ._queue_count += 1
493
603
# Wake up queue management thread
494
- self ._result_queue . put ( None )
604
+ self ._queue_management_thread_wakeup . wakeup ( )
495
605
496
606
self ._start_queue_management_thread ()
497
607
return f
@@ -531,7 +641,7 @@ def shutdown(self, wait=True):
531
641
self ._shutdown_thread = True
532
642
if self ._queue_management_thread :
533
643
# Wake up queue management thread
534
- self ._result_queue . put ( None )
644
+ self ._queue_management_thread_wakeup . wakeup ( )
535
645
if wait :
536
646
self ._queue_management_thread .join ()
537
647
# To reduce the risk of opening too many files, remove references to
0 commit comments