Skip to content

Commit 188c222

Browse files
committed
bpo-39098 Fix OSError: handle is closed in ProcessPoolExecutor on shutdown(wait=False)
When a ProcessPoolExecutor was created, a job added and then shutdown with wait=False, an OSError: handle is closed error was raised by the ThreadWakeup class. The threadwakeup should not be closed on shutdown if wait is False, but in the shutdown_worker method.
1 parent 0d63bac commit 188c222

File tree

2 files changed

+38
-7
lines changed

2 files changed

+38
-7
lines changed

Lib/concurrent/futures/process.py

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -80,18 +80,23 @@
8080

8181
class _ThreadWakeup:
8282
def __init__(self):
83+
self._closed = False
8384
self._reader, self._writer = mp.Pipe(duplex=False)
8485

8586
def close(self):
86-
self._writer.close()
87-
self._reader.close()
87+
if not self._closed:
88+
self._closed = True
89+
self._writer.close()
90+
self._reader.close()
8891

8992
def wakeup(self):
90-
self._writer.send_bytes(b"")
93+
if not self._closed:
94+
self._writer.send_bytes(b"")
9195

9296
def clear(self):
93-
while self._reader.poll():
94-
self._reader.recv_bytes()
97+
if not self._closed:
98+
while self._reader.poll():
99+
self._reader.recv_bytes()
95100

96101

97102
def _python_exit():
@@ -339,6 +344,7 @@ def shutdown_worker():
339344

340345
# Release the queue's resources as soon as possible.
341346
call_queue.close()
347+
thread_wakeup.close()
342348
# If .join() is not called on the created processes then
343349
# some ctx.Queue methods may deadlock on Mac OS X.
344350
for p in processes.values():
@@ -439,6 +445,7 @@ def shutdown_worker():
439445
# this thread if there are no pending work items.
440446
if not pending_work_items:
441447
shutdown_worker()
448+
thread_wakeup = None
442449
return
443450
except Full:
444451
# This is not a problem: we will eventually be woken up (in
@@ -672,15 +679,16 @@ def shutdown(self, wait=True):
672679
# objects that use file descriptors.
673680
self._queue_management_thread = None
674681
if self._call_queue is not None:
675-
self._call_queue.close()
676682
if wait:
683+
self._call_queue.close()
677684
self._call_queue.join_thread()
678685
self._call_queue = None
679686
self._result_queue = None
680687
self._processes = None
681688

682689
if self._queue_management_thread_wakeup:
683-
self._queue_management_thread_wakeup.close()
690+
if wait:
691+
self._queue_management_thread_wakeup.close()
684692
self._queue_management_thread_wakeup = None
685693

686694
shutdown.__doc__ = _base.Executor.shutdown.__doc__

Lib/test/test_concurrent_futures.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,14 @@ def test_del_shutdown(self):
380380
for t in threads:
381381
t.join()
382382

383+
def test_shutdown_no_wait(self):
384+
executor = futures.ThreadPoolExecutor(max_workers=5)
385+
executor.map(abs, range(-5, 5))
386+
threads = executor._threads
387+
executor.shutdown(wait=False)
388+
for t in threads:
389+
t.join()
390+
383391
def test_thread_names_assigned(self):
384392
executor = futures.ThreadPoolExecutor(
385393
max_workers=5, thread_name_prefix='SpecialPool')
@@ -444,6 +452,21 @@ def test_del_shutdown(self):
444452
p.join()
445453
call_queue.join_thread()
446454

455+
def test_shutdown_no_wait(self):
456+
executor = futures.ProcessPoolExecutor(max_workers=5)
457+
list(executor.map(abs, range(-5, 5)))
458+
processes = executor._processes
459+
call_queue = executor._call_queue
460+
queue_management_thread = executor._queue_management_thread
461+
executor.shutdown(wait=False)
462+
463+
# Make sure that all the executor resources were properly cleaned by
464+
# the shutdown process
465+
queue_management_thread.join()
466+
for p in processes.values():
467+
p.join()
468+
call_queue.join_thread()
469+
447470

448471
create_executor_tests(ProcessPoolShutdownTest,
449472
executor_mixins=(ProcessPoolForkMixin,

0 commit comments

Comments
 (0)