-
-
Notifications
You must be signed in to change notification settings - Fork 32.2k
bpo-39104: Fix hanging ProcessPoolExecutor on shutdown nowait with pickling failure #17670
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
44f70fd
38d8c98
b4ec893
de5ef88
8d6b285
9d47cf9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -87,7 +87,12 @@ def close(self): | |
self._reader.close() | ||
|
||
def wakeup(self): | ||
self._writer.send_bytes(b"") | ||
try: | ||
self._writer.send_bytes(b"") | ||
except OSError: | ||
# This can happen if the QueueManagerThread has been shut down by | |
# another thread before this wakeup call. | ||
pass | ||
|
||
def clear(self): | ||
while self._reader.poll(): | ||
|
@@ -160,15 +165,17 @@ def __init__(self, work_id, fn, args, kwargs): | |
|
||
class _SafeQueue(Queue): | ||
"""Safe Queue set exception to the future object linked to a job""" | ||
def __init__(self, max_size=0, *, ctx, pending_work_items): | ||
def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup): | ||
self.pending_work_items = pending_work_items | ||
self.thread_wakeup = thread_wakeup | ||
super().__init__(max_size, ctx=ctx) | ||
|
||
def _on_queue_feeder_error(self, e, obj): | ||
if isinstance(obj, _CallItem): | ||
tb = traceback.format_exception(type(e), e, e.__traceback__) | ||
e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb))) | ||
work_item = self.pending_work_items.pop(obj.work_id, None) | ||
self.thread_wakeup.wakeup() | ||
# work_item can be None if another process terminated. In this case, | ||
# the queue_manager_thread fails all work_items with BrokenProcessPool | ||
if work_item is not None: | ||
|
@@ -339,6 +346,8 @@ def shutdown_worker(): | |
|
||
# Release the queue's resources as soon as possible. | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Not a requirement for this PR, but it seems to me that the There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yes, this would be very much easier to read indeed. I will try to make a new PR with this change when I get a chance. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Looking forward to your PR then @tomMoral :-) |
||
call_queue.close() | ||
call_queue.join_thread() | ||
thread_wakeup.close() | ||
# If .join() is not called on the created processes then | ||
# some ctx.Queue methods may deadlock on Mac OS X. | ||
for p in processes.values(): | ||
|
@@ -547,29 +556,30 @@ def __init__(self, max_workers=None, mp_context=None, | |
self._queue_count = 0 | ||
self._pending_work_items = {} | ||
|
||
# _ThreadWakeup is a communication channel used to interrupt the wait | ||
# of the main loop of queue_manager_thread from another thread (e.g. | ||
# when calling executor.submit or executor.shutdown). We do not use the | ||
# _result_queue to send the wakeup signal to the queue_manager_thread | ||
# as it could result in a deadlock if a worker process dies with the | ||
# _result_queue write lock still acquired. | ||
self._queue_management_thread_wakeup = _ThreadWakeup() | ||
|
||
# Create communication channels for the executor | ||
# Make the call queue slightly larger than the number of processes to | ||
# prevent the worker processes from idling. But don't make it too big | ||
# because futures in the call queue cannot be cancelled. | ||
queue_size = self._max_workers + EXTRA_QUEUED_CALLS | ||
self._call_queue = _SafeQueue( | ||
max_size=queue_size, ctx=self._mp_context, | ||
pending_work_items=self._pending_work_items) | ||
pending_work_items=self._pending_work_items, | ||
thread_wakeup=self._queue_management_thread_wakeup) | ||
# Killed worker processes can produce spurious "broken pipe" | ||
# tracebacks in the queue's own worker thread. But we detect killed | ||
# processes anyway, so silence the tracebacks. | ||
self._call_queue._ignore_epipe = True | ||
self._result_queue = mp_context.SimpleQueue() | ||
self._work_ids = queue.Queue() | ||
|
||
# _ThreadWakeup is a communication channel used to interrupt the wait | ||
# of the main loop of queue_manager_thread from another thread (e.g. | ||
# when calling executor.submit or executor.shutdown). We do not use the | ||
# _result_queue to send the wakeup signal to the queue_manager_thread | ||
# as it could result in a deadlock if a worker process dies with the | ||
# _result_queue write lock still acquired. | ||
self._queue_management_thread_wakeup = _ThreadWakeup() | ||
|
||
def _start_queue_management_thread(self): | ||
if self._queue_management_thread is None: | ||
# When the executor gets garbage collected, the weakref callback | |
|
@@ -671,16 +681,11 @@ def shutdown(self, wait=True): | |
# To reduce the risk of opening too many files, remove references to | ||
# objects that use file descriptors. | ||
self._queue_management_thread = None | ||
if self._call_queue is not None: | ||
self._call_queue.close() | ||
if wait: | ||
self._call_queue.join_thread() | ||
self._call_queue = None | ||
self._call_queue = None | ||
self._result_queue = None | ||
self._processes = None | ||
|
||
if self._queue_management_thread_wakeup: | ||
self._queue_management_thread_wakeup.close() | ||
self._queue_management_thread_wakeup = None | ||
|
||
shutdown.__doc__ = _base.Executor.shutdown.__doc__ | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a _closed flag might be better than catching the exception? You could also then check the flag in the clear method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes you are right, this looks better.