mirror of
https://github.com/python/cpython.git
synced 2026-05-06 12:49:07 -04:00
[3.13] gh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (GH-146316) (#148426)
gh-146313: Fix multiprocessing ResourceTracker deadlock after os.fork() (GH-146316)
`ResourceTracker.__del__` (added in gh-88887 circa Python 3.12) calls
os.waitpid(pid, 0), which blocks indefinitely if a process created via os.fork()
still holds the tracker pipe's write end. The tracker never sees EOF, never
exits, and the parent hangs at interpreter shutdown.
Fix with two layers:
- **At-fork handler.** An os.register_at_fork(after_in_child=...)
handler closes the inherited pipe fd in the child unless a preserve
flag is set. popen_fork.Popen._launch() sets the flag before its
fork so mp.Process(fork) children keep the fd and reuse the parent's
tracker (preserving gh-80849). Raw os.fork() children close the fd,
letting the parent reap promptly.
- **Timeout safety-net.** _stop_locked() gains a wait_timeout
parameter. When called from `__del__`, it polls with WNOHANG using
exponential backoff for up to 1 second instead of blocking
indefinitely. The at-fork handler makes this unreachable in
well-behaved paths; it remains for abnormal shutdowns.
(cherry picked from commit 3a7df632c9)
Co-authored-by: Gregory P. Smith <68491+gpshead@users.noreply.github.com>
Co-authored-by: Itamar Oren <itamarost@gmail.com>
This commit is contained in:
committed by
GitHub
parent
4830d291e7
commit
a268d3fcef
@@ -64,7 +64,17 @@ class Popen(object):
|
||||
code = 1
|
||||
parent_r, child_w = os.pipe()
|
||||
child_r, parent_w = os.pipe()
|
||||
self.pid = os.fork()
|
||||
# gh-146313: Tell the resource tracker's at-fork handler to keep
|
||||
# the inherited pipe fd so this child reuses the parent's tracker
|
||||
# (gh-80849) rather than closing it and launching its own.
|
||||
from .resource_tracker import _fork_intent
|
||||
_fork_intent.preserve_fd = True
|
||||
try:
|
||||
self.pid = os.fork()
|
||||
finally:
|
||||
# Reset in both parent and child so the flag does not leak
|
||||
# into a subsequent raw os.fork() or nested Process launch.
|
||||
_fork_intent.preserve_fd = False
|
||||
if self.pid == 0:
|
||||
try:
|
||||
atexit._clear()
|
||||
|
||||
@@ -20,6 +20,7 @@ import os
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import warnings
|
||||
from collections import deque
|
||||
|
||||
@@ -79,6 +80,10 @@ class ResourceTracker(object):
|
||||
# The reader should understand all formats.
|
||||
self._use_simple_format = True
|
||||
|
||||
# Set to True by _stop_locked() if the waitpid polling loop ran to
|
||||
# its timeout without reaping the tracker. Exposed for tests.
|
||||
self._waitpid_timed_out = False
|
||||
|
||||
def _reentrant_call_error(self):
|
||||
# gh-109629: this happens if an explicit call to the ResourceTracker
|
||||
# gets interrupted by a garbage collection, invoking a finalizer (*)
|
||||
@@ -91,16 +96,51 @@ class ResourceTracker(object):
|
||||
# making sure child processes are cleaned before ResourceTracker
|
||||
# gets destructed.
|
||||
# see https://github.com/python/cpython/issues/88887
|
||||
self._stop(use_blocking_lock=False)
|
||||
# gh-146313: use a timeout to avoid deadlocking if a forked child
|
||||
# still holds the pipe's write end open.
|
||||
self._stop(use_blocking_lock=False, wait_timeout=1.0)
|
||||
|
||||
def _stop(self, use_blocking_lock=True):
|
||||
def _after_fork_in_child(self):
|
||||
# gh-146313: Called in the child right after os.fork().
|
||||
#
|
||||
# The tracker process is a child of the *parent*, not of us, so we
|
||||
# could never waitpid() it anyway. Clearing _pid means our __del__
|
||||
# becomes a no-op (the early return for _pid is None).
|
||||
#
|
||||
# Whether we keep the inherited _fd depends on who forked us:
|
||||
#
|
||||
# - multiprocessing.Process with the 'fork' start method sets
|
||||
# _fork_intent.preserve_fd before forking. The child keeps the
|
||||
# fd and reuses the parent's tracker (gh-80849). This is safe
|
||||
# because multiprocessing's atexit handler joins all children
|
||||
# before the parent's __del__ runs, so by then the fd copies
|
||||
# are gone and the parent can reap the tracker promptly.
|
||||
#
|
||||
# - A raw os.fork() leaves the flag unset. We close the fd in the child after forking so
|
||||
# the parent's __del__ can reap the tracker without waiting
|
||||
# for the child to exit. If we later need a tracker, ensure_running()
|
||||
# will launch a fresh one.
|
||||
self._lock._at_fork_reinit()
|
||||
self._reentrant_messages.clear()
|
||||
self._pid = None
|
||||
self._exitcode = None
|
||||
if (self._fd is not None and
|
||||
not getattr(_fork_intent, 'preserve_fd', False)):
|
||||
fd = self._fd
|
||||
self._fd = None
|
||||
try:
|
||||
os.close(fd)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
def _stop(self, use_blocking_lock=True, wait_timeout=None):
|
||||
if use_blocking_lock:
|
||||
with self._lock:
|
||||
self._stop_locked()
|
||||
self._stop_locked(wait_timeout=wait_timeout)
|
||||
else:
|
||||
acquired = self._lock.acquire(blocking=False)
|
||||
try:
|
||||
self._stop_locked()
|
||||
self._stop_locked(wait_timeout=wait_timeout)
|
||||
finally:
|
||||
if acquired:
|
||||
self._lock.release()
|
||||
@@ -110,6 +150,10 @@ class ResourceTracker(object):
|
||||
close=os.close,
|
||||
waitpid=os.waitpid,
|
||||
waitstatus_to_exitcode=os.waitstatus_to_exitcode,
|
||||
monotonic=time.monotonic,
|
||||
sleep=time.sleep,
|
||||
WNOHANG=getattr(os, 'WNOHANG', None),
|
||||
wait_timeout=None,
|
||||
):
|
||||
# This shouldn't happen (it might when called by a finalizer)
|
||||
# so we check for it anyway.
|
||||
@@ -126,7 +170,30 @@ class ResourceTracker(object):
|
||||
self._fd = None
|
||||
|
||||
try:
|
||||
_, status = waitpid(self._pid, 0)
|
||||
if wait_timeout is None:
|
||||
_, status = waitpid(self._pid, 0)
|
||||
else:
|
||||
# gh-146313: A forked child may still hold the pipe's write
|
||||
# end open, preventing the tracker from seeing EOF and
|
||||
# exiting. Poll with WNOHANG to avoid blocking forever.
|
||||
deadline = monotonic() + wait_timeout
|
||||
delay = 0.001
|
||||
while True:
|
||||
result_pid, status = waitpid(self._pid, WNOHANG)
|
||||
if result_pid != 0:
|
||||
break
|
||||
remaining = deadline - monotonic()
|
||||
if remaining <= 0:
|
||||
# The tracker is still running; it will be
|
||||
# reparented to PID 1 (or the nearest subreaper)
|
||||
# when we exit, and reaped there once all pipe
|
||||
# holders release their fd.
|
||||
self._pid = None
|
||||
self._exitcode = None
|
||||
self._waitpid_timed_out = True
|
||||
return
|
||||
delay = min(delay * 2, remaining, 0.1)
|
||||
sleep(delay)
|
||||
except ChildProcessError:
|
||||
self._pid = None
|
||||
self._exitcode = None
|
||||
@@ -312,12 +379,24 @@ class ResourceTracker(object):
|
||||
|
||||
self._ensure_running_and_write(msg)
|
||||
|
||||
# gh-146313: Per-thread flag set by .popen_fork.Popen._launch() just before
|
||||
# os.fork(), telling _after_fork_in_child() to keep the inherited pipe fd so
|
||||
# the child can reuse this tracker (gh-80849). Unset for raw os.fork() calls,
|
||||
# where the child instead closes the fd so the parent's __del__ can reap the
|
||||
# tracker. Using threading.local() keeps multiple threads calling
|
||||
# popen_fork.Popen._launch() at once from clobbering each other's intent.
|
||||
_fork_intent = threading.local()
|
||||
|
||||
_resource_tracker = ResourceTracker()
|
||||
ensure_running = _resource_tracker.ensure_running
|
||||
register = _resource_tracker.register
|
||||
unregister = _resource_tracker.unregister
|
||||
getfd = _resource_tracker.getfd
|
||||
|
||||
# gh-146313: See the comments in _after_fork_in_child().
|
||||
if hasattr(os, 'register_at_fork'):
|
||||
os.register_at_fork(after_in_child=_resource_tracker._after_fork_in_child)
|
||||
|
||||
|
||||
def _decode_message(line):
|
||||
if line.startswith(b'{'):
|
||||
|
||||
@@ -6007,8 +6007,9 @@ class TestResourceTracker(unittest.TestCase):
|
||||
def _is_resource_tracker_reused(conn, pid):
|
||||
from multiprocessing.resource_tracker import _resource_tracker
|
||||
_resource_tracker.ensure_running()
|
||||
# The pid should be None in the child process, except for the fork
|
||||
# context. It should not be a new value.
|
||||
# The pid should be None in the child (the at-fork handler clears
|
||||
# it for fork; spawn/forkserver children never had it set). It
|
||||
# should not be a new value.
|
||||
reused = _resource_tracker._pid in (None, pid)
|
||||
reused &= _resource_tracker._check_alive()
|
||||
conn.send(reused)
|
||||
@@ -6093,6 +6094,183 @@ class TestResourceTracker(unittest.TestCase):
|
||||
# restore sigmask to what it was before executing test
|
||||
signal.pthread_sigmask(signal.SIG_SETMASK, orig_sigmask)
|
||||
|
||||
@only_run_in_forkserver_testsuite("avoids redundant testing.")
|
||||
def test_resource_tracker_fork_deadlock(self):
|
||||
# gh-146313: ResourceTracker.__del__ used to deadlock if a forked
|
||||
# child still held the pipe's write end open when the parent
|
||||
# exited, because the parent would block in waitpid() waiting for
|
||||
# the tracker to exit, but the tracker would never see EOF.
|
||||
cmd = '''if 1:
|
||||
import os, signal
|
||||
from multiprocessing.resource_tracker import ensure_running
|
||||
ensure_running()
|
||||
if os.fork() == 0:
|
||||
signal.pause()
|
||||
os._exit(0)
|
||||
# parent falls through and exits, triggering __del__
|
||||
'''
|
||||
proc = subprocess.Popen([sys.executable, '-c', cmd],
|
||||
start_new_session=True)
|
||||
try:
|
||||
try:
|
||||
proc.wait(timeout=support.SHORT_TIMEOUT)
|
||||
except subprocess.TimeoutExpired:
|
||||
self.fail(
|
||||
"Parent process deadlocked in ResourceTracker.__del__"
|
||||
)
|
||||
self.assertEqual(proc.returncode, 0)
|
||||
finally:
|
||||
try:
|
||||
os.killpg(proc.pid, signal.SIGKILL)
|
||||
except ProcessLookupError:
|
||||
pass
|
||||
proc.wait()
|
||||
|
||||
@only_run_in_forkserver_testsuite("avoids redundant testing.")
|
||||
def test_resource_tracker_mp_fork_reuse_and_prompt_reap(self):
|
||||
# gh-146313 / gh-80849: A child started via multiprocessing.Process
|
||||
# with the 'fork' start method should reuse the parent's resource
|
||||
# tracker (the at-fork handler preserves the inherited pipe fd),
|
||||
# *and* the parent should be able to reap the tracker promptly
|
||||
# after joining the child, without hitting the waitpid timeout.
|
||||
cmd = textwrap.dedent('''
|
||||
import multiprocessing as mp
|
||||
from multiprocessing.resource_tracker import _resource_tracker
|
||||
|
||||
def child(conn):
|
||||
# Prove we can talk to the parent's tracker by registering
|
||||
# and unregistering a dummy resource over the inherited fd.
|
||||
# If the fd were closed, ensure_running would launch a new
|
||||
# tracker and _pid would be non-None.
|
||||
_resource_tracker.register("x", "dummy")
|
||||
_resource_tracker.unregister("x", "dummy")
|
||||
conn.send((_resource_tracker._fd is not None,
|
||||
_resource_tracker._pid is None,
|
||||
_resource_tracker._check_alive()))
|
||||
|
||||
if __name__ == "__main__":
|
||||
mp.set_start_method("fork")
|
||||
_resource_tracker.ensure_running()
|
||||
r, w = mp.Pipe(duplex=False)
|
||||
p = mp.Process(target=child, args=(w,))
|
||||
p.start()
|
||||
child_has_fd, child_pid_none, child_alive = r.recv()
|
||||
p.join()
|
||||
w.close(); r.close()
|
||||
|
||||
# Now simulate __del__: the child has exited and released
|
||||
# its fd copy, so the tracker should see EOF and exit
|
||||
# promptly -- no timeout.
|
||||
_resource_tracker._stop(wait_timeout=5.0)
|
||||
print(child_has_fd, child_pid_none, child_alive,
|
||||
_resource_tracker._waitpid_timed_out,
|
||||
_resource_tracker._exitcode)
|
||||
''')
|
||||
rc, out, err = script_helper.assert_python_ok('-c', cmd)
|
||||
parts = out.decode().split()
|
||||
self.assertEqual(parts, ['True', 'True', 'True', 'False', '0'],
|
||||
f"unexpected: {parts!r} stderr={err!r}")
|
||||
|
||||
@only_run_in_forkserver_testsuite("avoids redundant testing.")
|
||||
def test_resource_tracker_raw_fork_prompt_reap(self):
|
||||
# gh-146313: After a raw os.fork() the at-fork handler closes the
|
||||
# child's inherited fd, so the parent can reap the tracker
|
||||
# immediately -- even while the child is still alive -- rather
|
||||
# than waiting out the 1s timeout.
|
||||
cmd = textwrap.dedent('''
|
||||
import os, signal
|
||||
from multiprocessing.resource_tracker import _resource_tracker
|
||||
|
||||
_resource_tracker.ensure_running()
|
||||
r, w = os.pipe()
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
os.close(r)
|
||||
# Report whether our fd was closed by the at-fork handler.
|
||||
os.write(w, b"1" if _resource_tracker._fd is None else b"0")
|
||||
os.close(w)
|
||||
signal.pause() # stay alive so parent's reap is meaningful
|
||||
os._exit(0)
|
||||
os.close(w)
|
||||
child_fd_closed = os.read(r, 1) == b"1"
|
||||
os.close(r)
|
||||
|
||||
# Child is still alive and paused. Because it closed its fd
|
||||
# copy, our close below is the last one and the tracker exits.
|
||||
_resource_tracker._stop(wait_timeout=5.0)
|
||||
|
||||
os.kill(pid, signal.SIGKILL)
|
||||
os.waitpid(pid, 0)
|
||||
print(child_fd_closed,
|
||||
_resource_tracker._waitpid_timed_out,
|
||||
_resource_tracker._exitcode)
|
||||
''')
|
||||
rc, out, err = script_helper.assert_python_ok('-c', cmd)
|
||||
parts = out.decode().split()
|
||||
self.assertEqual(parts, ['True', 'False', '0'],
|
||||
f"unexpected: {parts!r} stderr={err!r}")
|
||||
|
||||
@only_run_in_forkserver_testsuite("avoids redundant testing.")
|
||||
def test_resource_tracker_lock_reinit_after_fork(self):
|
||||
# gh-146313: If a parent thread held the tracker's lock at fork
|
||||
# time, the child would inherit the held lock and deadlock on
|
||||
# its next ensure_running(). The at-fork handler reinits it.
|
||||
cmd = textwrap.dedent('''
|
||||
import os, threading
|
||||
from multiprocessing.resource_tracker import _resource_tracker
|
||||
|
||||
held = threading.Event()
|
||||
release = threading.Event()
|
||||
def hold():
|
||||
with _resource_tracker._lock:
|
||||
held.set()
|
||||
release.wait()
|
||||
t = threading.Thread(target=hold)
|
||||
t.start()
|
||||
held.wait()
|
||||
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
ok = _resource_tracker._lock.acquire(timeout=5.0)
|
||||
os._exit(0 if ok else 1)
|
||||
|
||||
release.set()
|
||||
t.join()
|
||||
_, status = os.waitpid(pid, 0)
|
||||
print(os.waitstatus_to_exitcode(status))
|
||||
''')
|
||||
rc, out, err = script_helper.assert_python_ok(
|
||||
'-W', 'ignore::DeprecationWarning', '-c', cmd)
|
||||
self.assertEqual(out.strip(), b'0',
|
||||
f"child failed to acquire lock: stderr={err!r}")
|
||||
|
||||
@only_run_in_forkserver_testsuite("avoids redundant testing.")
|
||||
def test_resource_tracker_safety_net_timeout(self):
|
||||
# gh-146313: When an mp.Process(fork) child holds the preserved
|
||||
# fd and the parent calls _stop() without joining (simulating
|
||||
# abnormal shutdown), the safety-net timeout should fire rather
|
||||
# than deadlocking.
|
||||
cmd = textwrap.dedent('''
|
||||
import multiprocessing as mp
|
||||
import signal
|
||||
from multiprocessing.resource_tracker import _resource_tracker
|
||||
|
||||
if __name__ == "__main__":
|
||||
mp.set_start_method("fork")
|
||||
_resource_tracker.ensure_running()
|
||||
p = mp.Process(target=signal.pause)
|
||||
p.start()
|
||||
# Stop WITHOUT joining -- child still holds preserved fd
|
||||
_resource_tracker._stop(wait_timeout=0.5)
|
||||
print(_resource_tracker._waitpid_timed_out)
|
||||
p.terminate()
|
||||
p.join()
|
||||
''')
|
||||
rc, out, err = script_helper.assert_python_ok('-c', cmd)
|
||||
self.assertEqual(out.strip(), b'True',
|
||||
f"safety-net timeout did not fire: stderr={err!r}")
|
||||
|
||||
|
||||
class TestSimpleQueue(unittest.TestCase):
|
||||
|
||||
@classmethod
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
Fix a deadlock in :mod:`multiprocessing`'s resource tracker
|
||||
where the parent process could hang indefinitely in :func:`os.waitpid`
|
||||
during interpreter shutdown if a child created via :func:`os.fork` still
|
||||
held the resource tracker's pipe open.
|
||||
Reference in New Issue
Block a user