Skip to content

gh-104090: Add exit code to multiprocessing ResourceTracker #115410

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
70a35a2
Updated Resource Tracker to return exit code based on leaking found o…
bityob Jul 16, 2023
da7e245
Add NEWS file
bityob Jul 16, 2023
1de022d
Use subTest for the parametrize the same tests with different values
bityob Jul 17, 2023
48bea84
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 17, 2023
31bc4af
Updated tests based on PR comments, and fix the resource_tracker code…
bityob Jul 18, 2023
47551a3
Clean test
bityob Jul 18, 2023
a02fdbe
Added comment for test and simplified the logic
bityob Jul 18, 2023
9d0369e
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 18, 2023
4dbb777
Fix tests and News based on PR comments
bityob Jul 18, 2023
bfd1e8e
Added reseting the exit code when the process should run again
bityob Jul 18, 2023
043ae90
Added more clearing queue methods beside del (closing and calling gc.…
bityob Jul 18, 2023
850b5bb
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 18, 2023
6e82940
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 19, 2023
a5cb934
Merge branch 'gh-104090-fix-leaked-semaphors-on-test_concurrent_futur…
bityob Jul 19, 2023
ccd736c
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 20, 2023
3b21d5c
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
bityob Jul 23, 2023
fbd18ec
Merge main
bityob Aug 31, 2023
e258cb3
Moved FailingInitializerResourcesTest class from old path (Lib/test/t…
bityob Aug 31, 2023
5b886bf
Merge branch 'gh-104090-fix-leaked-semaphors-on-test_concurrent_futur…
bityob Aug 31, 2023
915c8a3
Added missing import
bityob Aug 31, 2023
cd377e6
Merge branch 'main' into gh-104090-fix-leaked-semaphors-on-test_concu…
pitrou Sep 19, 2023
e3babf0
Merge in the main branch
encukou Feb 5, 2024
8aef868
gh-106807: test_resource_tracker_exit_code: Use a separate ResourceTr…
encukou Feb 5, 2024
16d0142
Merge in the main branch
encukou Feb 13, 2024
72c05ac
Update the NEWS entry
encukou Feb 13, 2024
9694295
Merge branch 'main' into resourcetracker-status
encukou Feb 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 32 additions & 6 deletions Lib/multiprocessing/resource_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,12 @@
_HAVE_SIGMASK = hasattr(signal, 'pthread_sigmask')
_IGNORED_SIGNALS = (signal.SIGINT, signal.SIGTERM)

def cleanup_noop(name):
raise RuntimeError('noop should never be registered or cleaned up')

_CLEANUP_FUNCS = {
'noop': lambda: None,
'noop': cleanup_noop,
'dummy': lambda name: None, # Dummy resource used in tests
}

if os.name == 'posix':
Expand Down Expand Up @@ -61,6 +65,7 @@ def __init__(self):
self._lock = threading.RLock()
self._fd = None
self._pid = None
self._exitcode = None

def _reentrant_call_error(self):
# gh-109629: this happens if an explicit call to the ResourceTracker
Expand All @@ -84,9 +89,16 @@ def _stop(self):
os.close(self._fd)
self._fd = None

os.waitpid(self._pid, 0)
_, status = os.waitpid(self._pid, 0)

self._pid = None

try:
self._exitcode = os.waitstatus_to_exitcode(status)
except ValueError:
# os.waitstatus_to_exitcode may raise an exception for invalid values
self._exitcode = None

def getfd(self):
self.ensure_running()
return self._fd
Expand Down Expand Up @@ -119,6 +131,7 @@ def ensure_running(self):
pass
self._fd = None
self._pid = None
self._exitcode = None

warnings.warn('resource_tracker: process died unexpectedly, '
'relaunching. Some resources might leak.')
Expand Down Expand Up @@ -221,6 +234,8 @@ def main(fd):
pass

cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()}
exit_code = 0

try:
# keep track of registered/unregistered resources
with open(fd, 'rb') as f:
Expand All @@ -242,6 +257,7 @@ def main(fd):
else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
exit_code = 3
try:
sys.excepthook(*sys.exc_info())
except:
Expand All @@ -251,10 +267,17 @@ def main(fd):
for rtype, rtype_cache in cache.items():
if rtype_cache:
try:
warnings.warn(
f'resource_tracker: There appear to be {len(rtype_cache)} '
f'leaked {rtype} objects to clean up at shutdown: {rtype_cache}'
)
exit_code = 1
if rtype == 'dummy':
# The test 'dummy' resource is expected to leak.
# We skip the warning (and *only* the warning) for it.
pass
else:
warnings.warn(
f'resource_tracker: There appear to be '
f'{len(rtype_cache)} leaked {rtype} objects to '
f'clean up at shutdown: {rtype_cache}'
)
except Exception:
pass
for name in rtype_cache:
Expand All @@ -265,6 +288,9 @@ def main(fd):
try:
_CLEANUP_FUNCS[rtype](name)
except Exception as e:
exit_code = 2
warnings.warn('resource_tracker: %r: %s' % (name, e))
finally:
pass

sys.exit(exit_code)
35 changes: 34 additions & 1 deletion Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -5609,8 +5609,9 @@ def create_and_register_resource(rtype):
'''
for rtype in resource_tracker._CLEANUP_FUNCS:
with self.subTest(rtype=rtype):
if rtype == "noop":
if rtype in ("noop", "dummy"):
# Artefact resource type used by the resource_tracker
# or tests
continue
r, w = os.pipe()
p = subprocess.Popen([sys.executable,
Expand Down Expand Up @@ -5730,6 +5731,38 @@ def test_too_long_name_resource(self):
with self.assertRaises(ValueError):
resource_tracker.register(too_long_name_resource, rtype)

def _test_resource_tracker_leak_resources(self, cleanup):
# We use a separate instance for testing, since the main global
# _resource_tracker may be used to watch test infrastructure.
from multiprocessing.resource_tracker import ResourceTracker
tracker = ResourceTracker()
tracker.ensure_running()
self.assertTrue(tracker._check_alive())

self.assertIsNone(tracker._exitcode)
tracker.register('somename', 'dummy')
if cleanup:
tracker.unregister('somename', 'dummy')
expected_exit_code = 0
else:
expected_exit_code = 1

self.assertTrue(tracker._check_alive())
self.assertIsNone(tracker._exitcode)
tracker._stop()
self.assertEqual(tracker._exitcode, expected_exit_code)

def test_resource_tracker_exit_code(self):
"""
Test the exit code of the resource tracker.

If no leaked resources were found, exit code should be 0, otherwise 1
"""
for cleanup in [True, False]:
with self.subTest(cleanup=cleanup):
self._test_resource_tracker_leak_resources(
cleanup=cleanup,
)

class TestSimpleQueue(unittest.TestCase):

Expand Down
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import queue
import time
import unittest
import sys
from concurrent.futures._base import BrokenExecutor
from logging.handlers import QueueHandler

Expand Down Expand Up @@ -109,6 +110,31 @@ def _assert_logged(self, msg):
create_executor_tests(globals(), FailingInitializerMixin)


@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows")
class FailingInitializerResourcesTest(unittest.TestCase):
"""
Source: https://github.com/python/cpython/issues/104090
"""

def _test(self, test_class):
runner = unittest.TextTestRunner()
runner.run(test_class('test_initializer'))

# GH-104090:
# Stop resource tracker manually now, so we can verify there are not leaked resources by checking
# the process exit code
from multiprocessing.resource_tracker import _resource_tracker
_resource_tracker._stop()

self.assertEqual(_resource_tracker._exitcode, 0)

def test_spawn(self):
self._test(ProcessPoolSpawnFailingInitializerTest)

def test_forkserver(self):
self._test(ProcessPoolForkserverFailingInitializerTest)


def setUpModule():
setup_module()

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
The multiprocessing resource tracker now exits with non-zero status code if a resource
leak was detected. It still exits with status code 0 otherwise.