
bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure #17670


Merged

Changes from 2 commits
39 changes: 22 additions & 17 deletions Lib/concurrent/futures/process.py
@@ -87,7 +87,12 @@ def close(self):
self._reader.close()

def wakeup(self):
try:

Maybe a _closed flag might be better than catching the exception? You could also then check the flag in the clear method?

Contributor Author

Yes, you are right, this looks better.

self._writer.send_bytes(b"")
except OSError:
# This can happen if the QueueManagerThread has been shut down by
# another thread before this wakeup call.
pass
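As a rough illustration of the `_closed`-flag suggestion above, here is a minimal sketch (hypothetical code, not what this PR merges; a lock would still be needed to make `close()` fully race-free against a concurrent `wakeup()`):

```python
import multiprocessing as mp

class _ThreadWakeup:
    def __init__(self):
        self._closed = False
        self._reader, self._writer = mp.Pipe(duplex=False)

    def close(self):
        self._closed = True
        self._writer.close()
        self._reader.close()

    def wakeup(self):
        # Check the flag instead of catching OSError after the fact.
        if not self._closed:
            self._writer.send_bytes(b"")

    def clear(self):
        # The same flag guards the drain loop, per the review suggestion.
        if not self._closed:
            while self._reader.poll():
                self._reader.recv_bytes()
```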

def clear(self):
while self._reader.poll():
@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
"""Safe Queue set exception to the future object linked to a job"""
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
self.pending_work_items = pending_work_items
self.thread_wakeup = thread_wakeup
super().__init__(max_size, ctx=ctx)

def _on_queue_feeder_error(self, e, obj):
if isinstance(obj, _CallItem):
tb = traceback.format_exception(type(e), e, e.__traceback__)
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
work_item = self.pending_work_items.pop(obj.work_id, None)
self.thread_wakeup.wakeup()
# work_item can be None if another process terminated. In this case,
# the queue_manager_thread fails all work_items with BrokenProcessPool
if work_item is not None:
@@ -339,6 +346,8 @@ def shutdown_worker():

# Release the queue's resources as soon as possible.
Member

Not a requirement for this PR, but it seems to me that the _queue_management_worker function has grown quite long and complicated. Do you think it could be turned into an object with a bunch of short and readable helper methods?

Contributor Author

Yes, this would be much easier to read indeed. I will try to make a new PR with this change when I get a chance.

Member

Looking forward to your PR then @tomMoral :-)
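For illustration, a hypothetical skeleton of such a refactor (all class and method names are invented here and are not part of this PR): the monolithic loop becomes a `run()` method that delegates to small, readable helpers.

```python
# Hypothetical sketch only: names are invented for illustration.
class _QueueManager:
    """Object form of the _queue_management_worker loop."""

    def __init__(self, executor_reference, processes, pending_work_items,
                 work_ids_queue, call_queue, result_queue, thread_wakeup):
        self.executor_reference = executor_reference
        self.processes = processes
        self.pending_work_items = pending_work_items
        self.work_ids_queue = work_ids_queue
        self.call_queue = call_queue
        self.result_queue = result_queue
        self.thread_wakeup = thread_wakeup

    def run(self):
        # The former monolithic loop, now delegating to helpers.
        while True:
            self._fill_call_queue()
            result_item, is_broken = self._wait_for_result_or_wakeup()
            if is_broken:
                self._terminate_broken_pool()
                return
            if result_item is not None:
                self._dispatch_result(result_item)
            if self._shutting_down() and not self.pending_work_items:
                self._join_internals()
                return

    # Placeholder stubs; each would hold one chunk of the old function body.
    def _fill_call_queue(self): ...
    def _wait_for_result_or_wakeup(self): ...
    def _terminate_broken_pool(self): ...
    def _dispatch_result(self, result_item): ...
    def _shutting_down(self): ...
    def _join_internals(self): ...
```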

call_queue.close()
call_queue.join_thread()
thread_wakeup.close()
# If .join() is not called on the created processes then
# some ctx.Queue methods may deadlock on Mac OS X.
for p in processes.values():
@@ -547,29 +556,30 @@ def __init__(self, max_workers=None, mp_context=None,
self._queue_count = 0
self._pending_work_items = {}

# _ThreadWakeup is a communication channel used to interrupt the wait
# of the main loop of queue_manager_thread from another thread (e.g.
# when calling executor.submit or executor.shutdown). We do not use the
# _result_queue to send the wakeup signal to the queue_manager_thread
# as it could result in a deadlock if a worker process dies with the
# _result_queue write lock still acquired.
self._queue_management_thread_wakeup = _ThreadWakeup()

# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
queue_size = self._max_workers + EXTRA_QUEUED_CALLS
self._call_queue = _SafeQueue(
max_size=queue_size, ctx=self._mp_context,
pending_work_items=self._pending_work_items,
thread_wakeup=self._queue_management_thread_wakeup)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = mp_context.SimpleQueue()
self._work_ids = queue.Queue()

def _start_queue_management_thread(self):
if self._queue_management_thread is None:
# When the executor gets garbage collected, the weakref callback
@@ -671,16 +681,11 @@ def shutdown(self, wait=True):
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
self._processes = None

if self._queue_management_thread_wakeup:
self._queue_management_thread_wakeup.close()
self._queue_management_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__
26 changes: 26 additions & 0 deletions Lib/test/test_concurrent_futures.py
@@ -1027,6 +1027,32 @@ def test_shutdown_deadlock(self):
with self.assertRaises(BrokenProcessPool):
f.result()

def test_shutdown_deadlock_pickle(self):
# Test that calling shutdown(wait=False) on the pool does not cause
# a deadlock if a task fails to pickle after the shutdown call.
# Reported in GH#6034.
self.executor.shutdown(wait=True)
with self.executor_type(max_workers=2,
mp_context=get_context(self.ctx)) as executor:
self.executor = executor # Allow clean up in fail_on_deadlock

# Start the executor and get the queue_management_thread so we can
# join it later, avoiding a dangling thread that would otherwise only
# be cleaned up asynchronously.
executor.submit(id, 42).result()
queue_manager = executor._queue_management_thread

# Submit a task that fails at pickle and shut down the executor
# without waiting.
f = executor.submit(id, ErrorAtPickle())
executor.shutdown(wait=False)
with self.assertRaises(PicklingError):
f.result()

# Make sure the executor is eventually shut down and does not leave
# dangling threads.
queue_manager.join()


create_executor_tests(ExecutorDeadlockTest,
executor_mixins=(ProcessPoolForkMixin,
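For context, here is a standalone sketch of the scenario the new test exercises (illustrative script; the ErrorAtPickle class mirrors the test-suite helper of the same name): before this fix, the queue management thread could wait forever after shutdown(wait=False), leaving a dangling thread that hangs interpreter exit.

```python
# Illustrative reproduction sketch; mirrors the new test above.
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from pickle import PicklingError


class ErrorAtPickle:
    """Raises when the call queue's feeder thread tries to pickle it."""
    def __reduce__(self):
        raise PicklingError("Error in pickle")


if __name__ == "__main__":
    ctx = multiprocessing.get_context("spawn")
    executor = ProcessPoolExecutor(max_workers=2, mp_context=ctx)
    executor.submit(id, 42).result()  # ensure the pool is fully started

    # Pickling of ErrorAtPickle fails in the feeder thread, possibly
    # *after* the shutdown call below; the fix makes the feeder wake the
    # queue management thread so everything still exits cleanly.
    f = executor.submit(id, ErrorAtPickle())
    executor.shutdown(wait=False)
    try:
        f.result()
    except PicklingError:
        print("future failed with PicklingError; no hang on exit")
```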