From 4f9078ab360aab3baf385d3dbbde6e14d415c7fe Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 13 Jun 2017 12:27:26 +0200 Subject: [PATCH 1/3] bpo-24484: Avoid race condition in multiprocessing cleanup The finalizer registry can be mutated while inspected by multiprocessing at process exit. --- Lib/multiprocessing/util.py | 34 +++++++++------ Lib/test/_test_multiprocessing.py | 70 +++++++++++++++++++++++++++++++ 2 files changed, 91 insertions(+), 13 deletions(-) diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py index 0ce274ceca6057..b490caa7e64333 100644 --- a/Lib/multiprocessing/util.py +++ b/Lib/multiprocessing/util.py @@ -241,20 +241,28 @@ def _run_finalizers(minpriority=None): return if minpriority is None: - f = lambda p : p[0][0] is not None + f = lambda p : p[0] is not None else: - f = lambda p : p[0][0] is not None and p[0][0] >= minpriority - - items = [x for x in list(_finalizer_registry.items()) if f(x)] - items.sort(reverse=True) - - for key, finalizer in items: - sub_debug('calling %s', finalizer) - try: - finalizer() - except Exception: - import traceback - traceback.print_exc() + f = lambda p : p[0] is not None and p[0] >= minpriority + + # Careful: _finalizer_registry may be mutated while this function + # is running (either by a GC run or by another thread). + + # list(_finalizer_registry) should be atomic, while + # list(_finalizer_registry.items()) is not. + keys = [key for key in list(_finalizer_registry) if f(key)] + keys.sort(reverse=True) + + for key in keys: + finalizer = _finalizer_registry.get(key) + # key may have been removed from the registry + if finalizer is not None: + sub_debug('calling %s', finalizer) + try: + finalizer() + except Exception: + import traceback + traceback.print_exc() if minpriority is None: _finalizer_registry.clear() diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 70ecc54bfec2c5..97d293996ae759 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3110,6 +3110,14 @@ class _TestFinalize(BaseTestCase): ALLOWED_TYPES = ('processes',) + def setUp(self): + self.registry_backup = util._finalizer_registry.copy() + util._finalizer_registry.clear() + + def tearDown(self): + self.assertFalse(util._finalizer_registry) + util._finalizer_registry.update(self.registry_backup) + @classmethod def _test_finalize(cls, conn): class Foo(object): @@ -3159,6 +3167,68 @@ def test_finalize(self): result = [obj for obj in iter(conn.recv, 'STOP')] self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) + def test_thread_safety(self): + # bpo-24484: _run_finalizers() should be thread-safe + def cb(): + pass + + class Foo(object): + def __init__(self): + self.ref = self # create reference cycle + # insert finalizer at random key + util.Finalize(self, cb, exitpriority=random.randint(1, 100)) + + finish = False + exc = None + + def run_finalizers(): + nonlocal exc + while not finish: + time.sleep(random.random() * 1e-1) + try: + # A GC run will eventually happen during this, + # collecting stale Foo's and mutating the registry + util._run_finalizers() + except Exception as e: + exc = e + + def make_finalizers(): + nonlocal exc + d = {} + threshold = 60 + while not finish: + try: + # Old Foo's get gradually replaced and later + # collected by the GC (because of the cyclic ref) + d[random.getrandbits(5)] = {Foo() for i in range(10)} + except Exception as e: + exc = e + d.clear() + + old_interval = sys.getswitchinterval() + old_threshold = gc.get_threshold() + try: + sys.setswitchinterval(1e-6) + gc.set_threshold(5, 5, 5) + threads = [] + t = threading.Thread(target=run_finalizers) + t.start() + threads.append(t) + t = threading.Thread(target=make_finalizers) + t.start() + threads.append(t) + time.sleep(4.0) # Wait a bit to trigger race condition + finish = True + for t in threads: + t.join() + if exc is not None: + raise exc + finally: + sys.setswitchinterval(old_interval) + gc.set_threshold(*old_threshold) + gc.collect() # Collect remaining Foo's + + # # Test that from ... import * works for each module # From 9ec9cbc01961b12de4914912462fdd20c8b9ecdc Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 13 Jun 2017 14:49:30 +0200 Subject: [PATCH 2/3] Use test.support.start_threads() --- Lib/test/_test_multiprocessing.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 97d293996ae759..d49e9c68c84bc7 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3195,7 +3195,6 @@ def run_finalizers(): def make_finalizers(): nonlocal exc d = {} - threshold = 60 while not finish: try: # Old Foo's get gradually replaced and later @@ -3210,17 +3209,11 @@ def make_finalizers(): try: sys.setswitchinterval(1e-6) gc.set_threshold(5, 5, 5) - threads = [] - t = threading.Thread(target=run_finalizers) - t.start() - threads.append(t) - t = threading.Thread(target=make_finalizers) - t.start() - threads.append(t) - time.sleep(4.0) # Wait a bit to trigger race condition - finish = True - for t in threads: - t.join() + threads = [threading.Thread(target=run_finalizers), + threading.Thread(target=make_finalizers)] + with test.support.start_threads(threads): + time.sleep(4.0) # Wait a bit to trigger race condition + finish = True if exc is not None: raise exc finally: From 0e2b636f8b2a65d785c3333690303bcb70d8ee38 Mon Sep 17 00:00:00 2001 From: Antoine Pitrou Date: Tue, 13 Jun 2017 16:53:55 +0200 Subject: [PATCH 3/3] Add Misc/NEWS --- Misc/NEWS | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Misc/NEWS b/Misc/NEWS index f9aa1630ebf192..f91695d3350d46 100644 --- a/Misc/NEWS +++ b/Misc/NEWS @@ -362,6 +362,8 @@ Extension Modules Library ------- +- bpo-24484: Avoid race condition in multiprocessing cleanup. + - bpo-30589: Fix multiprocessing.Process.exitcode to return the opposite of the signal number when the process is killed by a signal (instead of 255) when using the "forkserver" method.