bpo-30006 More robust concurrent.futures.ProcessPoolExecutor #1013
Conversation
@tomMoral, thanks for your PR! By analyzing the history of the files in this pull request, we identified @brianquinlan, @asvetlov and @shibturn to be potential reviewers.
Hello, and thanks for your contribution! I'm a bot set up to make sure that the project can legally accept your contribution by verifying you have signed the PSF contributor agreement (CLA). Unfortunately our records indicate you have not signed the CLA. For legal reasons we need you to sign this before we can look at your contribution. Please follow the steps outlined in the CPython devguide to rectify this issue. Thanks again for your contribution and we look forward to looking at it!
I have a hard time understanding the codecov report on this PR, but it seems that with the way tests are currently run on Travis, it is not possible to collect coverage data in the parts of the code that are only executed in multiprocessing worker processes. For loky we had to use an advanced coverage configuration to collect such data (see for instance the configuration in the loky repository).
Please let us know if you want us to adapt such a configuration to better collect coverage data on child-process code in the cpython project.
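For reference, a minimal sketch of how coverage.py can be told to measure code running in worker processes; this follows the coverage.py documentation and is an assumption about the approach, not necessarily the exact configuration loky uses:

```python
# sitecustomize.py -- hedged sketch of subprocess coverage measurement.
# coverage.process_startup() starts measurement in every newly spawned
# Python process when the COVERAGE_PROCESS_START environment variable
# points at a coverage config file with "parallel = True" under [run].
import coverage

coverage.process_startup()
```

After the test run, `coverage combine` merges the per-process data files into a single report.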
Lib/test/_test_multiprocessing.py (outdated)

```python
class NotSerializable(object):
    def __init__(self):
        self.pass_test = True
```
This should be set to `False` by default, otherwise the test is too easy.
If I set it to `False`, the test will always fail as I use `&=` afterward.
Ok then, to make the test more readable we should have two flags:

- one named `reduce_was_called`, set to `False` in `__init__` and subsequently set to `True` in `__reduce__`;
- another named `on_queue_feeder_error_was_called`, set to `False` in `__init__` and subsequently set to `True` in `_on_queue_feeder_error` by the Queue itself if `e` and `obj` have the expected type.

And then reverse the order of `q.put(unserializable_obj)` and `q.put(True)` so as to make sure that we can call `self.assertTrue` on the flags after `self.assertTrue(q.get(timeout=0.1))` has succeeded.
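A sketch of what the suggested helpers could look like, assuming the `_on_queue_feeder_error(e, obj)` hook discussed above is available on the Queue class; the actual test added by the PR may differ:

```python
import multiprocessing
from multiprocessing import queues

class NotSerializable(object):
    """Pickling always fails; the flags record what the queue machinery did."""
    def __init__(self):
        self.reduce_was_called = False
        self.on_queue_feeder_error_was_called = False

    def __reduce__(self):
        self.reduce_was_called = True
        raise TypeError("NotSerializable is not picklable")

class SafeQueue(queues.Queue):
    """Queue that flags the failing object when the feeder thread errors out."""
    @classmethod
    def _on_queue_feeder_error(cls, e, obj):
        # Only set the flag if the error and object have the expected types.
        if isinstance(e, TypeError) and isinstance(obj, NotSerializable):
            obj.on_queue_feeder_error_was_called = True
```

In the test, `q = SafeQueue(ctx=multiprocessing.get_context())` would be fed `q.put(unserializable_obj)` followed by `q.put(True)`; once `self.assertTrue(q.get(timeout=0.1))` succeeds, both flags can be asserted.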
Ok, I did it and also updated the timeout to match the new timeout from #2148.
I updated this PR in the last few days:
tomMoral/loky#48
- Add a context argument to allow a non-forking ProcessPoolExecutor
- Do some cleaning (pep8 + unused code + naming)
- Release the resources earlier in the `_worker_process`

tomMoral/loky#48
This avoids deadlocks if a Process dies while:
- unpickling the `_CallItem`
- pickling a `_ResultItem`

Wakeups are done with a `_Sentinel` object that cannot be used with `wait`. We do not use a Connection/Queue as it brings a lot of overhead in the Executor to use only a small part of it. We might want to implement a Sentinel object that can be waited upon to simplify and robustify the code.

Test that there is no deadlock with crashes:
- TST crash in `_CallItem` unpickling
- TST crash in the function call run
- TST crash in `_ResultItem` pickling

The tests include crashes with PythonError/SystemExit/SegFault.

tomMoral/loky#48
This extra thread checks that the `_queue_manager_thread` is alive and working. If it is not, this makes it possible to avoid deadlocks and to raise an appropriate error. It also checks that the `QueueFeederThread` is alive.

Add a `_ExecutorFlags` object that holds the state of the ProcessPoolExecutor. This makes it possible to introspect the executor state even after it has been gc'ed and allows errors to be handled correctly. It also introduces a `ShutdownExecutorError` for jobs that were cancelled on shutdown. Also, this changes the `for` loop on `processes` to a `while` loop to avoid concurrent dictionary update errors (see the sketch below).
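As an illustration of that last point, a hedged sketch of the kind of change described (not the PR's exact code, and the names are illustrative): iterating over the `processes` dict while worker entries are removed concurrently can raise `RuntimeError: dictionary changed size during iteration`, whereas draining it with a `while` loop is safe:

```python
# Hedged sketch: drain the executor's process dict instead of iterating
# over it while another thread may be mutating it.
def _join_all_processes(processes):
    # popitem() removes one entry at a time, so concurrent deletions by
    # other threads cannot invalidate an ongoing iteration.
    while processes:
        _, p = processes.popitem()
        p.join()
```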
This PR still requires some small fixes (particularly to avoid multiple test runs in the Travis GCC tests).
@tomMoral, do you think it would be possible to split this PR into several ones based on the different issues being fixed (or improvements being made)? I think I'll have a hard time reviewing the PR as is, given how delicate multiprocessing (or concurrent.futures) code generally is.
Ok, closing.
This set of patches intends to fix some silent deadlocks that can happen with the present `ProcessPoolExecutor` implementation. The deadlock situations notably include crashes of a worker process while unpickling a `_CallItem`, while running the submitted function, or while pickling a `_ResultItem`.

The commit message of each patch gives more details but in essence:

- allow contexts other than `fork` on posix systems;
- handle a worker dying while the `result_queue.wlock` is acquired;
- monitor the `queue_manager_thread` and the `queue_feeder_thread`;
- handle errors occurring after the `ProcessPoolExecutor` instance has been gc'ed.

The commits are incremental, each one adds new fixes on top of the previous ones to handle extra deadlock situations. If you think that some changes in the last commits need more discussion or refinement, let me know so I can separate them and open different tickets/PRs.
Each commit passes the test suite on its own, but the addition of the 3-context testing makes `test_concurrent_futures` longer (~140s on my computer). One fix would be to split the tests for each `context` into separate files (allowing test parallelization). I did not do it as it changes the structure of the test suite, but I can implement it if it is necessary.
This work was done as part of the `loky` project in collaboration with @ogrisel. It provides a backport of the same features for older versions of Python, including 2.7 for legacy users, and reusable `executors`.

https://bugs.python.org/issue30006