@@ -270,24 +270,40 @@ class _ExecutorManagerThread(threading.Thread):
270
270
"""
271
271
272
272
def __init__ (self , executor ):
273
+ # Store references to necessary internals of the executor.
274
+
275
+ # A _ThreadWakeup to allow waking up the queue_manager_thread from the
276
+ # main Thread and avoid deadlocks caused by permanently locked queues.
277
+ self .thread_wakeup = executor ._executor_manager_thread_wakeup
273
278
279
+ # A weakref.ref to the ProcessPoolExecutor that owns this thread. Used
280
+ # to determine if the ProcessPoolExecutor has been garbage collected
281
+ # and that the manager can exit.
274
282
# When the executor gets garbage collected, the weakref callback
275
283
# will wake up the queue management thread so that it can terminate
276
284
# if there is no pending work item.
277
- self .thread_wakeup = executor ._executor_manager_thread_wakeup
278
-
279
285
def weakref_cb (_ , thread_wakeup = self .thread_wakeup ):
280
286
mp .util .debug ('Executor collected: triggering callback for'
281
287
' QueueManager wakeup' )
282
288
thread_wakeup .wakeup ()
283
289
284
290
self .executor_reference = weakref .ref (executor , weakref_cb )
285
291
286
- # Store references to necessary internals of the executor .
292
+ # A list of the ctx.Process instances used as workers .
287
293
self .processes = executor ._processes
294
+
295
+ # A ctx.Queue that will be filled with _CallItems derived from
296
+ # _WorkItems for processing by the process workers.
288
297
self .call_queue = executor ._call_queue
298
+
299
+ # A ctx.SimpleQueue of _ResultItems generated by the process workers.
289
300
self .result_queue = executor ._result_queue
301
+
302
+ # A queue.Queue of work ids e.g. Queue([5, 6, ...]).
290
303
self .work_ids_queue = executor ._work_ids
304
+
305
+ # A dict mapping work ids to _WorkItems e.g.
306
+ # {5: <_WorkItem...>, 6: <_WorkItem...>, ...}
291
307
self .pending_work_items = executor ._pending_work_items
292
308
293
309
# Set this thread to be daemonized
0 commit comments