Skip to content

bpo-29861: release references to multiprocessing Pool tasks (#743) #801

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ def worker(inqueue, outqueue, initializer=None, initargs=(), maxtasks=None,
util.debug("Possible encoding error while sending result: %s" % (
wrapped))
put((job, i, (False, wrapped)))

task = job = result = func = args = kwds = None
completed += 1
util.debug('worker exiting after %d tasks' % completed)

Expand Down Expand Up @@ -402,10 +404,11 @@ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
if set_length:
util.debug('doing set_length()')
set_length(i+1)
finally:
task = taskseq = job = None
else:
util.debug('task handler got sentinel')


try:
# tell result handler to finish when cache is empty
util.debug('task handler sending sentinel to result handler')
Expand Down Expand Up @@ -445,6 +448,7 @@ def _handle_results(outqueue, get, cache):
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None

while cache and thread._state != TERMINATE:
try:
Expand All @@ -461,6 +465,7 @@ def _handle_results(outqueue, get, cache):
cache[job]._set(i, obj)
except KeyError:
pass
task = job = obj = None

if hasattr(outqueue, '_reader'):
util.debug('ensuring that outqueue is not full')
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import logging
import struct
import operator
import weakref
import test.support
import test.support.script_helper

Expand Down Expand Up @@ -1668,6 +1669,19 @@ def sqr(x, wait=0.0):
def mul(x, y):
return x*y

def identity(x):
return x

class CountedObject(object):
n_instances = 0

def __new__(cls):
cls.n_instances += 1
return object.__new__(cls)

def __del__(self):
type(self).n_instances -= 1

class SayWhenError(ValueError): pass

def exception_throwing_generator(total, when):
Expand All @@ -1676,6 +1690,7 @@ def exception_throwing_generator(total, when):
raise SayWhenError("Somebody said when")
yield i


class _TestPool(BaseTestCase):

@classmethod
Expand Down Expand Up @@ -1910,6 +1925,19 @@ def test_wrapped_exception(self):
with self.assertRaises(RuntimeError):
p.apply(self._test_wrapped_exception)

def test_release_task_refs(self):
# Issue #29861: task arguments and results should not be kept
# alive after we are done with them.
objs = [CountedObject() for i in range(10)]
refs = [weakref.ref(o) for o in objs]
self.pool.map(identity, objs)

del objs
self.assertEqual(set(wr() for wr in refs), {None})
# With a process pool, copies of the objects are returned, check
# they were released too.
self.assertEqual(CountedObject.n_instances, 0)


def raising():
raise KeyError("key")
Expand Down
3 changes: 3 additions & 0 deletions Misc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Extension Modules
Library
-------

- bpo-29861: Release references to tasks, their arguments and their results
as soon as they are finished in multiprocessing.Pool.

- bpo-29884: faulthandler: Restore the old sigaltstack during teardown.
Patch by Christophe Zeitouny.

Expand Down