Skip to content

bpo-36668: FIX reuse semaphore tracker for child processes #5172

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 24, 2019
Merged
36 changes: 26 additions & 10 deletions Lib/multiprocessing/semaphore_tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,23 @@ def ensure_running(self):
This can be run from any process. Usually a child process will use
the semaphore created by its parent.'''
with self._lock:
if self._pid is not None:
if self._fd is not None:
# semaphore tracker was launched before, is it still running?
if self._check_alive():
# => still alive
return
# => dead, launch it again
os.close(self._fd)

# Clean-up to avoid dangling processes.
try:
pid, _ = os.waitpid(self._pid, os.WNOHANG)
# _pid can be None if this process is a child from another
# python process, which has started the semaphore_tracker.
if self._pid is not None:
os.waitpid(self._pid, 0)
except ChildProcessError:
# The process terminated
# The semaphore_tracker has already been terminated.
pass
else:
if not pid:
# => still alive
return

# => dead, launch it again
os.close(self._fd)
self._fd = None
self._pid = None

Expand Down Expand Up @@ -99,6 +102,17 @@ def ensure_running(self):
finally:
os.close(r)

def _check_alive(self):
'''Check that the pipe has not been closed by sending a probe.'''
try:
# We cannot use send here as it calls ensure_running, creating
# a cycle.
os.write(self._fd, b'PROBE:0\n')
except OSError:
return False
else:
return True

def register(self, name):
'''Register name of semaphore with semaphore tracker.'''
self._send('REGISTER', name)
Expand Down Expand Up @@ -150,6 +164,8 @@ def main(fd):
cache.add(name)
elif cmd == b'UNREGISTER':
cache.remove(name)
elif cmd == b'PROBE':
pass
else:
raise RuntimeError('unrecognized command %r' % cmd)
except Exception:
Expand Down
28 changes: 28 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -4895,6 +4895,34 @@ def test_semaphore_tracker_sigkill(self):
# Uncatchable signal.
self.check_semaphore_tracker_death(signal.SIGKILL, True)

@staticmethod
def _is_semaphore_tracker_reused(conn, pid):
from multiprocessing.semaphore_tracker import _semaphore_tracker
_semaphore_tracker.ensure_running()
# The pid should be None in the child process, expect for the fork
# context. It should not be a new value.
reused = _semaphore_tracker._pid in (None, pid)
reused &= _semaphore_tracker._check_alive()
conn.send(reused)

def test_semaphore_tracker_reused(self):
from multiprocessing.semaphore_tracker import _semaphore_tracker
_semaphore_tracker.ensure_running()
pid = _semaphore_tracker._pid

r, w = multiprocessing.Pipe(duplex=False)
p = multiprocessing.Process(target=self._is_semaphore_tracker_reused,
args=(w, pid))
p.start()
is_semaphore_tracker_reused = r.recv()

# Clean up
p.join()
w.close()
r.close()

self.assertTrue(is_semaphore_tracker_reused)


class TestSimpleQueue(unittest.TestCase):

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix the multiprocessing.semaphore_tracker so it is reused by child processes