Skip to content

Commit 04e1dc1

Browse files
committed
ENH independent com channel for wakeup to avoid deadlocks in shutdown
1 parent 5a27c11 commit 04e1dc1

File tree

2 files changed

+85
-43
lines changed

2 files changed

+85
-43
lines changed

Lib/concurrent/futures/process.py

Lines changed: 81 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -76,12 +76,36 @@
7676
_threads_queues = weakref.WeakKeyDictionary()
7777
_global_shutdown = False
7878

79+
80+
# This constants control the maximal wakeup. If a job is submitted to the
81+
# Executor, it might take up to _POLL_TIMEOUT for the executor to notice and
82+
# start launching the job. This _POLL_TIMEOUT is to be cumulated with the
83+
# communication overhead.
84+
_POLL_TIMEOUT = .001
85+
86+
87+
class _Sentinel:
88+
__slot__ = ["_state"]
89+
90+
def __init__(self):
91+
self._state = False
92+
93+
def set(self):
94+
self._state = True
95+
96+
def get_and_unset(self):
97+
s = self._state
98+
if s:
99+
self._state = False
100+
return s
101+
102+
79103
def _python_exit():
80104
global _global_shutdown
81105
_global_shutdown = True
82106
items = list(_threads_queues.items())
83-
for t, q in items:
84-
q.put(None)
107+
for t, wakeup in items:
108+
wakeup.set()
85109
for t, q in items:
86110
t.join()
87111

@@ -266,7 +290,8 @@ def _queue_management_worker(executor_reference,
266290
pending_work_items,
267291
work_ids_queue,
268292
call_queue,
269-
result_queue):
293+
result_queue,
294+
wakeup):
270295
"""Manages the communication between this process and the worker processes.
271296
272297
This function is run in a local thread.
@@ -284,6 +309,8 @@ def _queue_management_worker(executor_reference,
284309
derived from _WorkItems for processing by the process workers.
285310
result_queue: A ctx.SimpleQueue of _ResultItems generated by the
286311
process workers.
312+
wakeup: A _Sentinel to allow waking up the queue_manager_thread from
313+
the main Thread and avoid deadlocks caused by broken queues.
287314
"""
288315
executor = None
289316

@@ -292,38 +319,56 @@ def shutting_down():
292319
or executor._shutdown_thread)
293320

294321
def shutdown_worker():
295-
# This is an upper bound
296-
nb_children_alive = sum(p.is_alive() for p in processes.values())
297-
for i in range(0, nb_children_alive):
298-
try:
299-
call_queue.put_nowait(None)
300-
except Full:
301-
pass
322+
# This is an upper bound on the number of children alive.
323+
n_children_alive = sum(p.is_alive() for p in processes.values())
324+
n_children_to_stop = n_children_alive
325+
n_sentinels_sent = 0
326+
# Sent the right number of sentinels, to make sure all children are
327+
# properly terminated.
328+
while n_sentinels_sent < n_children_to_stop and n_children_alive > 0:
329+
for i in range(n_children_to_stop - n_sentinels_sent):
330+
try:
331+
call_queue.put_nowait(None)
332+
n_sentinels_sent += 1
333+
except Full:
334+
break
335+
n_children_alive = sum(p.is_alive() for p in processes.values())
336+
302337
# Release the queue's resources as soon as possible.
303338
call_queue.close()
304339
# If .join() is not called on the created processes then
305340
# some ctx.Queue methods may deadlock on Mac OS X.
306341
for p in processes.values():
307342
p.join()
308343

309-
reader = result_queue._reader
344+
result_reader = result_queue._reader
310345

311346
while True:
312347
_add_call_item_to_queue(pending_work_items,
313348
work_ids_queue,
314349
call_queue)
315350

316-
sentinels = [p.sentinel for p in processes.values()]
317-
assert sentinels
318-
ready = wait([reader] + sentinels)
319-
351+
# Wait for a result to be ready in the result_queue while checking
352+
# that worker process are still running.
353+
worker_sentinels = [p.sentinel for p in processes.values()]
320354
received_item = False
321-
if reader in ready:
355+
while not wakeup.get_and_unset():
356+
ready = wait([result_reader] + worker_sentinels,
357+
timeout=_POLL_TIMEOUT)
358+
if len(ready) > 0:
359+
break
360+
else:
361+
# The thread has been woken up by the main thread or the gc.
362+
ready = []
363+
result_item = None
364+
received_item = True
365+
366+
if result_reader in ready:
322367
try:
323-
result_item = reader.recv()
368+
result_item = result_reader.recv()
324369
received_item = True
325-
except:
326-
pass
370+
except BaseException as e:
371+
traceback.print_exc()
327372
if not received_item:
328373
# Mark the process pool broken so that submits fail right now.
329374
executor = executor_reference()
@@ -499,12 +544,20 @@ def __init__(self, max_workers=None, mp_context=None,
499544
self._result_queue = mp_context.SimpleQueue()
500545
self._work_ids = queue.Queue()
501546

547+
# Permits to wake_up the queue_manager_thread independently of
548+
# result_queue state. This avoid deadlocks caused by the non
549+
# transmission of wakeup signal when a worker died with the
550+
# _result_queue write lock.
551+
self._wakeup = _Sentinel()
552+
502553
def _start_queue_management_thread(self):
503-
# When the executor gets lost, the weakref callback will wake up
504-
# the queue management thread.
505-
def weakref_cb(_, q=self._result_queue):
506-
q.put(None)
507554
if self._queue_management_thread is None:
555+
# When the executor gets lost, the weakref callback will wake up
556+
# the queue management thread.
557+
def weakref_cb(_, wakeup=self._wakeup):
558+
mp.util.debug('Executor collected: triggering callback for'
559+
' QueueManager wakeup')
560+
wakeup.set()
508561
# Start the processes so that their sentinels are known.
509562
self._adjust_process_count()
510563
self._queue_management_thread = threading.Thread(
@@ -514,11 +567,12 @@ def weakref_cb(_, q=self._result_queue):
514567
self._pending_work_items,
515568
self._work_ids,
516569
self._call_queue,
517-
self._result_queue),
570+
self._result_queue,
571+
self._wakeup),
518572
name="QueueManagerThread")
519573
self._queue_management_thread.daemon = True
520574
self._queue_management_thread.start()
521-
_threads_queues[self._queue_management_thread] = self._result_queue
575+
_threads_queues[self._queue_management_thread] = self._wakeup
522576

523577
def _adjust_process_count(self):
524578
for _ in range(len(self._processes), self._max_workers):
@@ -545,7 +599,7 @@ def submit(self, fn, *args, **kwargs):
545599
self._work_ids.put(self._queue_count)
546600
self._queue_count += 1
547601
# Wake up queue management thread
548-
self._result_queue.put(None)
602+
self._wakeup.set()
549603

550604
self._start_queue_management_thread()
551605
return f
@@ -585,7 +639,7 @@ def shutdown(self, wait=True):
585639
self._shutdown_thread = True
586640
if self._queue_management_thread:
587641
# Wake up queue management thread
588-
self._result_queue.put(None)
642+
self._wakeup.set()
589643
if wait:
590644
self._queue_management_thread.join()
591645
# To reduce the risk of opening too many files, remove references to

Lib/test/test_concurrent_futures.py

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -988,22 +988,10 @@ def test_shutdown_deadlock(self):
988988
executor.shutdown(wait=True)
989989

990990

991-
class ProcessPoolForkExecutorDeadlockTest(ProcessPoolForkMixin,
992-
ExecutorDeadlockTest,
993-
unittest.TestCase):
994-
pass
995-
996-
997-
class ProcessPoolForkserverExecutorDeadlockTest(ProcessPoolForkserverMixin,
998-
ExecutorDeadlockTest,
999-
unittest.TestCase):
1000-
pass
1001-
1002-
1003-
class ProcessPoolSpawnExecutorDeadlockTest(ProcessPoolSpawnMixin,
1004-
ExecutorDeadlockTest,
1005-
unittest.TestCase):
1006-
pass
991+
create_executor_tests(ExecutorDeadlockTest,
992+
executor_mixins=(ProcessPoolForkMixin,
993+
ProcessPoolForkserverMixin,
994+
ProcessPoolSpawnMixin))
1007995

1008996

1009997
class FutureTests(BaseTestCase):

0 commit comments

Comments
 (0)