bpo-39995: CLN remove some locks in ProcessPoolExecutor #19788

Open · wants to merge 2 commits into main
27 changes: 15 additions & 12 deletions Lib/concurrent/futures/process.py
@@ -89,18 +89,25 @@ def _python_exit():
    global _global_shutdown
    _global_shutdown = True
    items = list(_threads_wakeups.items())
-   for _, thread_wakeup in items:
-       # call not protected by ProcessPoolExecutor._shutdown_lock
-       thread_wakeup.wakeup()
+   for _, (shutdown_lock, thread_wakeup) in items:
+       with shutdown_lock:
+           thread_wakeup.wakeup()
    for t, _ in items:
        t.join()
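Pairing each wakeup pipe with the executor's shutdown lock is what lets _python_exit() wake the manager threads safely: wakeup() and close() can no longer interleave. A minimal sketch of the idea, with ThreadWakeup as a simplified stand-in for the private _ThreadWakeup class, not the real implementation:

import threading
import multiprocessing as mp

class ThreadWakeup:
    """Simplified stand-in for concurrent.futures.process._ThreadWakeup."""
    def __init__(self):
        self._reader, self._writer = mp.Pipe(duplex=False)

    def wakeup(self):
        # A zero-length message unblocks a selector waiting on
        # self._reader in the executor manager thread.
        self._writer.send_bytes(b"")

    def close(self):
        self._writer.close()
        self._reader.close()

shutdown_lock = threading.Lock()
wakeup = ThreadWakeup()

# Holding one lock around both wakeup() and close() removes the window
# where wakeup() could write to a pipe that shutdown already closed.
with shutdown_lock:
    wakeup.wakeup()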


# Register for `_python_exit()` to be called just before joining all
# non-daemon threads. This is used instead of `atexit.register()` for
# compatibility with subinterpreters, which no longer support daemon threads.
# See bpo-39812 for context.
threading._register_atexit(_python_exit)
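For context, this private hook behaves like atexit.register() but fires earlier. A minimal sketch, assuming CPython 3.9+ where threading._register_atexit(func, *args, **kwargs) exists (a private API that may change):

import threading

def _on_interpreter_exit():
    # Runs at interpreter shutdown, just before the non-daemon threads
    # are joined; atexit callbacks only run after that join, which is
    # too late to wake a manager thread that is still blocked.
    print("shutting down")

threading._register_atexit(_on_interpreter_exit)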


+# With the fork context, _threads_wakeups is propagated to children.
+# Clear it after fork to avoid situations that can cause a freeze
+# when joining the workers.
+mp.util.register_after_fork(_threads_wakeups, lambda obj: obj.clear())
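As a hedged illustration of the hook registered above: multiprocessing calls the function with the registered object in every fork-started child, before the child's own code runs. The names below are illustrative:

import multiprocessing as mp
import multiprocessing.util
import os

registry = {"parent_pid": os.getpid()}

# In a fork-started child, the hook runs with the registered object
# before the child's target executes.
mp.util.register_after_fork(registry, lambda obj: obj.clear())

def show():
    print("child sees:", registry)    # prints an empty dict

if __name__ == "__main__":
    ctx = mp.get_context("fork")      # "fork" is unavailable on Windows
    p = ctx.Process(target=show)
    p.start()
    p.join()

With the spawn context the child imports a fresh module, so the hook is unnecessary there; it only matters when state is inherited through fork.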

# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
@@ -158,10 +165,8 @@ def __init__(self, work_id, fn, args, kwargs):

class _SafeQueue(Queue):
    """Safe Queue set exception to the future object linked to a job"""
-   def __init__(self, max_size=0, *, ctx, pending_work_items, shutdown_lock,
-                thread_wakeup):
+   def __init__(self, max_size=0, *, ctx, pending_work_items, thread_wakeup):
        self.pending_work_items = pending_work_items
-       self.shutdown_lock = shutdown_lock
        self.thread_wakeup = thread_wakeup
        super().__init__(max_size, ctx=ctx)

@@ -170,8 +175,7 @@ def _on_queue_feeder_error(self, e, obj):
            tb = traceback.format_exception(type(e), e, e.__traceback__)
            e.__cause__ = _RemoteTraceback('\n"""\n{}"""'.format(''.join(tb)))
            work_item = self.pending_work_items.pop(obj.work_id, None)
-           with self.shutdown_lock:
-               self.thread_wakeup.wakeup()
+           self.thread_wakeup.wakeup()
            # work_item can be None if another process terminated. In this
            # case, the executor_manager_thread fails all work_items
            # with BrokenProcessPool
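_on_queue_feeder_error is a hook on multiprocessing.queues.Queue that the queue's feeder thread invokes when serializing an item fails; overriding it is how _SafeQueue routes the error back to the matching future. A minimal sketch of the mechanism (LoggingQueue is an illustrative name, not part of the stdlib):

import time
import multiprocessing as mp
from multiprocessing.queues import Queue

class LoggingQueue(Queue):
    def _on_queue_feeder_error(self, e, obj):
        # Called in the feeder thread when pickling `obj` raises; the
        # default implementation just prints the traceback.
        print(f"could not enqueue {obj!r}: {e!r}")

if __name__ == "__main__":
    q = LoggingQueue(ctx=mp.get_context())
    q.put(lambda: None)   # lambdas are unpicklable, so the hook fires
    time.sleep(1)         # give the feeder thread time to attempt it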
@@ -390,8 +394,7 @@ def wait_result_broken_or_wakeup(self):
        elif wakeup_reader in ready:
            is_broken = False

-       with self.shutdown_lock:
-           self.thread_wakeup.clear()
+       self.thread_wakeup.clear()

        return result_item, is_broken, cause

@@ -643,7 +646,6 @@ def __init__(self, max_workers=None, mp_context=None,
        self._call_queue = _SafeQueue(
            max_size=queue_size, ctx=self._mp_context,
            pending_work_items=self._pending_work_items,
-           shutdown_lock=self._shutdown_lock,
            thread_wakeup=self._executor_manager_thread_wakeup)
        # Killed worker processes can produce spurious "broken pipe"
        # tracebacks in the queue's own worker thread. But we detect killed
@@ -658,7 +660,8 @@ def _start_executor_manager_thread(self):
            self._executor_manager_thread = _ExecutorManagerThread(self)
            self._executor_manager_thread.start()
            _threads_wakeups[self._executor_manager_thread] = \
-               self._executor_manager_thread_wakeup
+               (self._shutdown_lock,
+                self._executor_manager_thread_wakeup)
Member:
Oh, I never tried a weakref.WeakKeyDictionary() where the value is a tuple of two objects.

Contributor (author):
Do you think this could be a problem? I did not even think about this.

Member:
As long as the key isn't a temporary object, this is fine.
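To back the exchange above with something concrete: a WeakKeyDictionary holds its values (tuples included) with ordinary strong references, and an entry lives exactly as long as its key. A quick sketch (class name illustrative):

import weakref

class ManagerThread:              # stand-in for the thread used as a key
    pass

d = weakref.WeakKeyDictionary()

t = ManagerThread()
d[t] = (object(), object())       # tuple values work like any other value
print(len(d))                     # 1

del t                             # drop the last strong reference to the key
print(len(d))                     # 0 -- entry and its tuple are gone
                                  # (immediate on CPython's refcounting)

In the executor's case the key is the manager thread, which stays strongly referenced until _python_exit() joins it, so the entry cannot disappear early.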


    def _adjust_process_count(self):
        # if there's an idle process, we don't need to spawn a new one.