
bpo-31540 Add context management for concurrent.futures.ProcessPoolExecutor #3682


Merged: 9 commits into python:master from tomMoral:PR_contextual_executor on Oct 3, 2017

Conversation

@tomMoral (Contributor) commented Sep 21, 2017:

The start method of the ProcessPoolExecutor worker processes can currently only be changed by setting the global default context with set_start_method at the beginning of a script. We propose to allow passing a context argument to the constructor, for more flexible control of the executor. Doing so, we also add tests for all the available contexts, to make sure the executor works correctly with each of them.
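
For illustration (an editorial sketch, not part of the PR text), this is the usage the change enables, written with the mp_context parameter name the argument ended up with after review (the first revision of the PR called it context):

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

def square(x):
    return x * x

if __name__ == "__main__":
    # Before this change: the start method could only be picked globally,
    # once per process, e.g. mp.set_start_method("forkserver").
    # After this change: each executor can receive its own context.
    ctx = mp.get_context("forkserver")
    with ProcessPoolExecutor(max_workers=2, mp_context=ctx) as executor:
        print(list(executor.map(square, range(4))))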

In addition, we made the following changes, which can be put in another PR if necessary:

  • Rename the global variable _shutdown to _global_shutdown to make its role in the code clearer.
  • Release resources earlier in _process_worker: with the current behavior, the resources held by a finished task are not freed until the worker receives a new task or a shutdown signal (see the sketch below).
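
To make the second bullet concrete, here is a simplified, hypothetical worker loop (not the actual CPython code) showing why dropping the reference early matters:

# Hypothetical simplification of the worker loop; the real _process_worker
# also handles exceptions and uses _CallItem/_ResultItem objects.
def process_worker(call_queue, result_queue):
    while True:
        call_item = call_queue.get(block=True)
        if call_item is None:
            return  # shutdown sentinel
        result = call_item.fn(*call_item.args, **call_item.kwargs)
        result_queue.put((call_item.work_id, result))
        # Without this, call_item (and anything its arguments reference,
        # such as open files or shared memory) stays alive while the
        # worker blocks on the next call_queue.get().
        del call_item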

This work was done as part of the loky project in collaboration with
@ogrisel. See #1013 for the details.

https://bugs.python.org/issue31540

@tomMoral changed the title from "Add context management for concurrent.futures.ProcessPoolExecutor" to "bpo-31540 Add context management for concurrent.futures.ProcessPoolExecutor" on Sep 21, 2017
tomMoral/loky#48
* Add context argument to allow non-forking ProcessPoolExecutor
* Do some cleaning (pep8 + unused code + naming)
* Release the resource earlier in `_process_worker`
@tomMoral force-pushed the PR_contextual_executor branch from f2f41e0 to 6376291 on September 22, 2017
@@ -180,6 +177,11 @@ def _process_worker(call_queue, result_queue):
result_queue.put(_ResultItem(call_item.work_id,
result=r))

# Liberate the resource as soon as possible, to avoid holding onto
# open files or shared memory that is not needed anymore
del call_item
Member:

Would it be easy to add a test for this?

Contributor Author:

I added a test for this behavior.

"""Initializes a new ProcessPoolExecutor instance.

Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
context: A multiprocessing context to launch the workers. This
    object should provide SimpleQueue, Queue and Process.
Member:

As a nit, I think calling this parameter mp_context would be a bit more explicit.

t = {executor_type}(5)
t.submit(sleep_and_print, 1.0, "apple")
if __name__ == "__main__":
    t = {executor_type}(5)
Member:

Perhaps we want to pass the right context argument here?
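
For reference, a hedged sketch of the fix being suggested, keeping the {placeholders} of the test's script template (the exact wording in the merged test may differ):

if __name__ == "__main__":
    context = '{context}'
    if context == "":
        t = {executor_type}(5)
    else:
        from multiprocessing import get_context
        t = {executor_type}(5, mp_context=get_context(context))
    t.submit(sleep_and_print, 1.0, "apple")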

@@ -56,7 +56,7 @@ matrix:
./venv/bin/python -m test.pythoninfo
script:
# Skip tests that re-run the entire test suite.
-      - ./venv/bin/python -m coverage run --pylib -m test --fail-env-changed -uall,-cpu -x test_multiprocessing_fork -x test_multiprocessing_forkserver -x test_multiprocessing_spawn
+      - ./venv/bin/python -m coverage run --pylib -m test --fail-env-changed -uall,-cpu -x test_multiprocessing_fork -x test_multiprocessing_forkserver -x test_multiprocessing_spawn -x test_concurrent_futures
Member:

I forgot the rationale for this?

Contributor Author:

When coverage is used with multiprocessing, spawning a new interpreter launches a new test session, resulting in a mess. I think this previously worked with the fork context, but when I launch the tests with all three backends, new test sessions are started as well.

Thus, I disabled the coverage run for test_concurrent_futures. I am not sure what I should do if this is not acceptable. The duplicated test sessions could result from command-line argument parsing in either the semaphore tracker or the forkserver, but I do not think it is linked to this PR.

Let me know if this makes sense.
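
As background (an editorial note, not part of the conversation): the duplicated-session effect comes from how the spawn and forkserver start methods bootstrap workers by re-importing the parent's main module, so unguarded module-level code runs again in every child; under `coverage run -m test` that module-level code is the test session itself. A minimal illustration:

import multiprocessing as mp

def where():
    return mp.current_process().name

# Runs again in every spawned child, because the child re-imports this
# module when bootstrapping.
print("module-level code executing")

if __name__ == "__main__":
    ctx = mp.get_context("spawn")
    with ctx.Pool(1) as pool:
        print(pool.apply(where))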

Member:

It does make sense, thank you.

@pitrou (Member) commented Oct 2, 2017:

Thank you! This looks fine in principle (apart from a couple of small things mentioned in the review). You'll need to update the docs in Doc/library/concurrent.futures.rst.

- Rename context to mp_context in the ProcessPoolExecutor constructor
- Fix the context used in test_interpreter_shutdown
- Ensure that the job arguments passed to workers are freed as soon as possible

An :class:`Executor` subclass that executes calls asynchronously using a pool
of at most *max_workers* processes. If *max_workers* is ``None`` or not
given, it will default to the number of processors on the machine.
If *max_workers* is less than or equal to ``0``, then a :exc:`ValueError`
will be raised.
*mp_context* can be a multiprocessing context or any object providing a compatible API.
Member:

I'd rather restrict it to a multiprocessing context. Perhaps we'll use other multiprocessing APIs in the future.

@@ -55,6 +56,15 @@ def my_method(self):
pass


class EventfulGCObj():
    def __init__(self, ctx):
        mgr = get_context(ctx).Manager()
        self.event = mgr.Event()
Member:

What is the rationale for using a manager here? Since the object is instantiated in the parent, it should be inheritable by the child anyway.

@tomMoral (Contributor Author) commented Oct 3, 2017:

The executor is launched in the setUp method of the TestCase, so there is no way to pass the Event object through inheritance; the job (id, obj) is passed via pickle, which requires the Manager.
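
An editorial sketch of the constraint being described: a plain multiprocessing Event can only be shared through inheritance at process start, while a Manager().Event() returns a proxy that can be pickled into workers that are already running (names here are illustrative, not the test's):

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import get_context

def set_event(event):
    event.set()

if __name__ == "__main__":
    ctx = get_context("spawn")
    executor = ProcessPoolExecutor(2, mp_context=ctx)
    # A raw ctx.Event() could not be submitted here: pickling it for an
    # already-running worker raises RuntimeError.
    event = ctx.Manager().Event()  # a proxy object, safe to pickle
    executor.submit(set_event, event).result()
    assert event.is_set()
    executor.shutdown()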

Member:

Oh, right. I've never used managers and I was surprised to see this...

future = self.executor.submit(id, obj)
future.result()

assert obj.event.wait(timeout=1)
Member:

By convention, we'd use self.assertTrue (also so that assertions are still checked if running with -O).
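
Concretely, the suggested form would presumably read as below; plain assert statements are compiled away under python -O, while unittest assertion methods always run:

self.assertTrue(obj.event.wait(timeout=1))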

@pitrou pitrou merged commit e8c368d into python:master Oct 3, 2017
@pitrou (Member) commented Oct 3, 2017:

Thank you @tomMoral !

@tomMoral deleted the PR_contextual_executor branch on October 3, 2017
@tomMoral (Contributor Author) commented Oct 3, 2017:

Thanks @pitrou!
I will follow up with the other PR from #1013 as soon as possible.
