bpo-24484: Avoid race condition in multiprocessing cleanup #2159


Merged: 4 commits, Jun 13, 2017
34 changes: 21 additions & 13 deletions Lib/multiprocessing/util.py
@@ -241,20 +241,28 @@ def _run_finalizers(minpriority=None):
         return
 
     if minpriority is None:
-        f = lambda p : p[0][0] is not None
+        f = lambda p : p[0] is not None
     else:
-        f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
-
-    items = [x for x in list(_finalizer_registry.items()) if f(x)]
-    items.sort(reverse=True)
-
-    for key, finalizer in items:
-        sub_debug('calling %s', finalizer)
-        try:
-            finalizer()
-        except Exception:
-            import traceback
-            traceback.print_exc()
+        f = lambda p : p[0] is not None and p[0] >= minpriority
+
+    # Careful: _finalizer_registry may be mutated while this function
+    # is running (either by a GC run or by another thread).
+
+    # list(_finalizer_registry) should be atomic, while
+    # list(_finalizer_registry.items()) is not.
Member: Actually it is possible to make list(dict.items()) almost atomic (except for allocating new tuples), or even truly atomic. Is it worth it?
Member (author): In a separate PR if you like, but this fix needs to be backported anyway.
+    keys = [key for key in list(_finalizer_registry) if f(key)]
Member: This could also be written as

    keys = list(_finalizer_registry)
    keys = sorted(filter(f, keys), reverse=True)

if you prefer.
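Both spellings produce the same key order; a quick sanity check with made-up (priority, counter) keys standing in for the real registry keys:

```python
# Hypothetical (priority, counter) keys, like those in _finalizer_registry
registry_keys = [(5, 2), (10, 1), (1, 0)]
f = lambda p: p[0] is not None

# Spelling used in the patch
a = [key for key in list(registry_keys) if f(key)]
a.sort(reverse=True)

# Reviewer's alternative
b = sorted(filter(f, registry_keys), reverse=True)

print(a == b)   # True
print(a)        # [(10, 1), (5, 2), (1, 0)]
```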

+    keys.sort(reverse=True)
+
+    for key in keys:
+        finalizer = _finalizer_registry.get(key)
+        # key may have been removed from the registry
+        if finalizer is not None:
+            sub_debug('calling %s', finalizer)
+            try:
+                finalizer()
+            except Exception:
+                import traceback
+                traceback.print_exc()
 
     if minpriority is None:
         _finalizer_registry.clear()
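The shape of the fix, outside of multiprocessing: snapshot the keys atomically, sort them, then re-fetch each callback with .get() so a concurrent removal is tolerated. The registry dict and run_callbacks below are illustrative stand-ins, not the real multiprocessing internals:

```python
def run_callbacks(registry, minpriority=None):
    # Keys are (priority, counter) tuples, as in _finalizer_registry
    if minpriority is None:
        f = lambda p: p[0] is not None
    else:
        f = lambda p: p[0] is not None and p[0] >= minpriority
    # list(registry) snapshots the keys; the dict may shrink afterwards
    keys = [key for key in list(registry) if f(key)]
    keys.sort(reverse=True)
    results = []
    for key in keys:
        cb = registry.get(key)  # key may have been removed meanwhile
        if cb is not None:
            results.append(cb())
    return results

registry = {(10, 0): (lambda: 'high'), (1, 1): (lambda: 'low')}
print(run_callbacks(registry))                  # ['high', 'low']
print(run_callbacks(registry, minpriority=5))   # ['high']
```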
63 changes: 63 additions & 0 deletions Lib/test/_test_multiprocessing.py
@@ -3110,6 +3110,14 @@ class _TestFinalize(BaseTestCase):

     ALLOWED_TYPES = ('processes',)
 
+    def setUp(self):
+        self.registry_backup = util._finalizer_registry.copy()
Member: Is this needed for every test, or just for test_thread_safety?
Member (author): Just for test_thread_safety, but it was easier to put it here.
+        util._finalizer_registry.clear()
+
+    def tearDown(self):
+        self.assertFalse(util._finalizer_registry)
+        util._finalizer_registry.update(self.registry_backup)
+
     @classmethod
     def _test_finalize(cls, conn):
         class Foo(object):
@@ -3159,6 +3167,61 @@ def test_finalize(self):
         result = [obj for obj in iter(conn.recv, 'STOP')]
         self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])

+    def test_thread_safety(self):
+        # bpo-24484: _run_finalizers() should be thread-safe
+        def cb():
+            pass
+
+        class Foo(object):
+            def __init__(self):
+                self.ref = self  # create reference cycle
+                # insert finalizer at random key
+                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
+
+        finish = False
+        exc = None
+
+        def run_finalizers():
+            nonlocal exc
+            while not finish:
Member: Maybe add "and not exc"?
Member (author): Yes, perhaps.
+                time.sleep(random.random() * 1e-1)
+                try:
+                    # A GC run will eventually happen during this,
+                    # collecting stale Foo's and mutating the registry
+                    util._run_finalizers()
+                except Exception as e:
+                    exc = e
+
+        def make_finalizers():
+            nonlocal exc
+            d = {}
+            while not finish:
+                try:
+                    # Old Foo's get gradually replaced and later
+                    # collected by the GC (because of the cyclic ref)
+                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
Member: Why a set?
Member (author): I figured it would make deallocation order less deterministic than a list.
+                except Exception as e:
+                    exc = e
+            d.clear()
+
+        old_interval = sys.getswitchinterval()
+        old_threshold = gc.get_threshold()
+        try:
+            sys.setswitchinterval(1e-6)
+            gc.set_threshold(5, 5, 5)
+            threads = [threading.Thread(target=run_finalizers),
+                       threading.Thread(target=make_finalizers)]
+            with test.support.start_threads(threads):
+                time.sleep(4.0)  # Wait a bit to trigger race condition
+            finish = True
+            if exc is not None:
+                raise exc
+        finally:
+            sys.setswitchinterval(old_interval)
+            gc.set_threshold(*old_threshold)
+            gc.collect()  # Collect remaining Foo's
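The test leans on two knobs: sys.setswitchinterval(1e-6) forces very frequent thread switches, and gc.set_threshold(5, 5, 5) makes the cycle collector run after only a few allocations, so the self-referencing Foo instances die in the middle of _run_finalizers(). A minimal sketch of why a cycle needs the collector at all (Node is a hypothetical stand-in for Foo):

```python
import gc

class Node:
    def __init__(self):
        self.ref = self  # reference cycle: refcounting alone can't free this

gc.disable()             # keep automatic collections out of the demo
n = Node()
del n                    # object stays alive: it still references itself
found = gc.collect()     # the cycle collector reclaims it
print('unreachable objects collected:', found)   # at least 1
gc.enable()
```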


#
# Test that from ... import * works for each module
#
2 changes: 2 additions & 0 deletions Misc/NEWS
@@ -362,6 +362,8 @@ Extension Modules
 Library
 -------
 
+- bpo-24484: Avoid race condition in multiprocessing cleanup.
+
 - bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite
   of the signal number when the process is killed by a signal (instead
   of 255) when using the "forkserver" method.