Skip to content

Fix bpo-30596: Add close() method to multiprocessing.Process #2010

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,16 @@ The :mod:`multiprocessing` package mostly replicates the API of the
acquired a lock or semaphore etc. then terminating it is liable to
cause other processes to deadlock.

.. method:: close()

Close the :class:`Process` object, releasing all resources associated
with it. :exc:`ValueError` is raised if the underlying process
is still running. Once :meth:`close` returns successfully, most
other methods and attributes of the :class:`Process` object will
raise :exc:`ValueError`.

.. versionadded:: 3.7

Note that the :meth:`start`, :meth:`join`, :meth:`is_alive`,
:meth:`terminate` and :attr:`exitcode` methods should only be called by
the process that created the process object.
Expand Down
16 changes: 12 additions & 4 deletions Lib/multiprocessing/forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,12 @@ def sigchld_handler(*_unused):
else:
assert os.WIFEXITED(sts)
returncode = os.WEXITSTATUS(sts)
# Write the exit code to the pipe
write_signed(child_w, returncode)
# Send exit code to client process
try:
write_signed(child_w, returncode)
except BrokenPipeError:
# client vanished
pass
os.close(child_w)
else:
# This shouldn't happen really
Expand Down Expand Up @@ -241,8 +245,12 @@ def sigchld_handler(*_unused):
finally:
os._exit(code)
else:
# Send pid to client processes
write_signed(child_w, pid)
# Send pid to client process
try:
write_signed(child_w, pid)
except BrokenPipeError:
# client vanished
pass
pid_to_fd[pid] = child_w
os.close(child_r)
for fd in fds:
Expand Down
7 changes: 6 additions & 1 deletion Lib/multiprocessing/popen_fork.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ def __init__(self, process_obj):
sys.stdout.flush()
sys.stderr.flush()
self.returncode = None
self.finalizer = None
self._launch(process_obj)

def duplicate_for_child(self, fd):
Expand Down Expand Up @@ -70,5 +71,9 @@ def _launch(self, process_obj):
os._exit(code)
else:
os.close(child_w)
util.Finalize(self, os.close, (parent_r,))
self.finalizer = util.Finalize(self, os.close, (parent_r,))
self.sentinel = parent_r

def close(self):
if self.finalizer is not None:
self.finalizer()
2 changes: 1 addition & 1 deletion Lib/multiprocessing/popen_forkserver.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def _launch(self, process_obj):
set_spawning_popen(None)

self.sentinel, w = forkserver.connect_to_new_process(self._fds)
util.Finalize(self, os.close, (self.sentinel,))
self.finalizer = util.Finalize(self, os.close, (self.sentinel,))
with open(w, 'wb', closefd=True) as f:
f.write(buf.getbuffer())
self.pid = forkserver.read_signed(self.sentinel)
Expand Down
2 changes: 1 addition & 1 deletion Lib/multiprocessing/popen_spawn_posix.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def _launch(self, process_obj):
f.write(fp.getbuffer())
finally:
if parent_r is not None:
util.Finalize(self, os.close, (parent_r,))
self.finalizer = util.Finalize(self, os.close, (parent_r,))
for fd in (child_r, child_w, parent_w):
if fd is not None:
os.close(fd)
5 changes: 4 additions & 1 deletion Lib/multiprocessing/popen_spawn_win32.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ def __init__(self, process_obj):
self.returncode = None
self._handle = hp
self.sentinel = int(hp)
util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
self.finalizer = util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))

# send information to child
set_spawning_popen(self)
Expand Down Expand Up @@ -96,3 +96,6 @@ def terminate(self):
except OSError:
if self.wait(timeout=1.0) is None:
raise

def close(self):
self.finalizer()
35 changes: 35 additions & 0 deletions Lib/multiprocessing/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
self._config = _current_process._config.copy()
self._parent_pid = os.getpid()
self._popen = None
self._closed = False
self._target = target
self._args = tuple(args)
self._kwargs = dict(kwargs)
Expand All @@ -85,6 +86,10 @@ def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
self.daemon = daemon
_dangling.add(self)

def _check_closed(self):
if self._closed:
raise ValueError("process object is closed")

def run(self):
'''
Method to be run in sub-process; can be overridden in sub-class
Expand All @@ -96,6 +101,7 @@ def start(self):
'''
Start child process
'''
self._check_closed()
assert self._popen is None, 'cannot start a process twice'
assert self._parent_pid == os.getpid(), \
'can only start a process object created by current process'
Expand All @@ -110,12 +116,14 @@ def terminate(self):
'''
Terminate process; sends SIGTERM signal or uses TerminateProcess()
'''
self._check_closed()
self._popen.terminate()

def join(self, timeout=None):
'''
Wait until child process terminates
'''
self._check_closed()
assert self._parent_pid == os.getpid(), 'can only join a child process'
assert self._popen is not None, 'can only join a started process'
res = self._popen.wait(timeout)
Expand All @@ -126,6 +134,7 @@ def is_alive(self):
'''
Return whether process is alive
'''
self._check_closed()
if self is _current_process:
return True
assert self._parent_pid == os.getpid(), 'can only test a child process'
Expand All @@ -134,6 +143,23 @@ def is_alive(self):
self._popen.poll()
return self._popen.returncode is None

def close(self):
'''
Close the Process object.

This method releases resources held by the Process object. It is
an error to call this method if the child process is still running.
'''
if self._popen is not None:
if self._popen.poll() is None:
raise ValueError("Cannot close a process while it is still running. "
"You should first call join() or terminate().")
self._popen.close()
self._popen = None
del self._sentinel
_children.discard(self)
self._closed = True

@property
def name(self):
return self._name
Expand Down Expand Up @@ -174,6 +200,7 @@ def exitcode(self):
'''
Return exit code of process or `None` if it has yet to stop
'''
self._check_closed()
if self._popen is None:
return self._popen
return self._popen.poll()
Expand All @@ -183,6 +210,7 @@ def ident(self):
'''
Return identifier (PID) of process or `None` if it has yet to start
'''
self._check_closed()
if self is _current_process:
return os.getpid()
else:
Expand All @@ -196,6 +224,7 @@ def sentinel(self):
Return a file descriptor (Unix) or handle (Windows) suitable for
waiting for process termination.
'''
self._check_closed()
try:
return self._sentinel
except AttributeError:
Expand All @@ -204,6 +233,8 @@ def sentinel(self):
def __repr__(self):
if self is _current_process:
status = 'started'
elif self._closed:
status = 'closed'
elif self._parent_pid != os.getpid():
status = 'unknown'
elif self._popen is None:
Expand Down Expand Up @@ -295,6 +326,7 @@ def __init__(self):
self._name = 'MainProcess'
self._parent_pid = None
self._popen = None
self._closed = False
self._config = {'authkey': AuthenticationString(os.urandom(32)),
'semprefix': '/mp'}
# Note that some versions of FreeBSD only allow named
Expand All @@ -307,6 +339,9 @@ def __init__(self):
# Everything in self._config will be inherited by descendant
# processes.

def close(self):
pass


_current_process = _MainProcess()
_process_counter = itertools.count(1)
Expand Down
36 changes: 36 additions & 0 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,42 @@ def test_sentinel(self):
p.join()
self.assertTrue(wait_for_handle(sentinel, timeout=1))

@classmethod
def _test_close(cls, rc=0, q=None):
if q is not None:
q.get()
sys.exit(rc)

def test_close(self):
if self.TYPE == "threads":
self.skipTest('test not appropriate for {}'.format(self.TYPE))
q = self.Queue()
p = self.Process(target=self._test_close, kwargs={'q': q})
p.daemon = True
p.start()
self.assertEqual(p.is_alive(), True)
# Child is still alive, cannot close
with self.assertRaises(ValueError):
p.close()

q.put(None)
p.join()
self.assertEqual(p.is_alive(), False)
self.assertEqual(p.exitcode, 0)
p.close()
with self.assertRaises(ValueError):
p.is_alive()
with self.assertRaises(ValueError):
p.join()
with self.assertRaises(ValueError):
p.terminate()
p.close()

wr = weakref.ref(p)
del p
gc.collect()
self.assertIs(wr(), None)

def test_many_processes(self):
if self.TYPE == 'threads':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a ``close()`` method to ``multiprocessing.Process``.