49
49
import os
50
50
from concurrent .futures import _base
51
51
import queue
52
- from queue import Full
53
52
import multiprocessing as mp
54
53
import multiprocessing .connection
55
54
from multiprocessing .queues import Queue
@@ -272,7 +271,7 @@ class _ExecutorManagerThread(threading.Thread):
272
271
273
272
def __init__ (self , executor ):
274
273
275
- # When the executor gets garbarge collected, the weakref callback
274
+ # When the executor gets garbage collected, the weakref callback
276
275
# will wake up the queue management thread so that it can terminate
277
276
# if there is no pending work item.
278
277
self .thread_wakeup = executor ._executor_manager_thread_wakeup
@@ -312,9 +311,14 @@ def run(self):
312
311
# while waiting on new results.
313
312
del result_item
314
313
315
- if (self .is_shutting_down ()
316
- and self .shutdown_executor_when_no_pending_tasks ()):
317
- return
314
+ if self .is_shutting_down ():
315
+ self .flag_executor_shutting_down ()
316
+
317
+ # Since no new work items can be added, it is safe to shutdown
318
+ # this thread if there are no pending work items.
319
+ if not self .pending_work_items :
320
+ self .join_executor_internals ()
321
+ return
318
322
319
323
def add_call_item_to_queue (self ):
320
324
# Fills call_queue with _WorkItems from pending_work_items.
@@ -363,7 +367,6 @@ def wait_result_broken_or_wakeup(self):
363
367
364
368
elif wakeup_reader in ready :
365
369
is_broken = False
366
- result_item = None
367
370
self .thread_wakeup .clear ()
368
371
369
372
return result_item , is_broken , cause
@@ -379,7 +382,7 @@ def process_result_item(self, result_item):
379
382
p = self .processes .pop (result_item )
380
383
p .join ()
381
384
if not self .processes :
382
- self .shutdown_worker ()
385
+ self .join_executor_internals ()
383
386
return
384
387
else :
385
388
# Received a _ResultItem so mark the future as completed.
@@ -402,9 +405,9 @@ def is_shutting_down(self):
402
405
or executor ._shutdown_thread )
403
406
404
407
def terminate_broken (self , cause ):
405
- # Terminate the executor because it is in broken state. The cause
408
+ # Terminate the executor because it is in a broken state. The cause
406
409
# argument can be used to display more information on the error that
407
- # leaded the executor to be broken.
410
+ # lead the executor into becoming broken.
408
411
409
412
# Mark the process pool broken so that submits fail right now.
410
413
executor = self .executor_reference ()
@@ -436,10 +439,10 @@ def terminate_broken(self, cause):
436
439
for p in self .processes .values ():
437
440
p .terminate ()
438
441
439
- # clean up ressources
440
- self .shutdown_worker ()
442
+ # clean up resources
443
+ self .join_executor_internals ()
441
444
442
- def shutdown_executor_when_no_pending_tasks (self ):
445
+ def flag_executor_shutting_down (self ):
443
446
# Flag the executor as shutting down and cancel remaining tasks if
444
447
# requested as early as possible if it is not gc-ed yet.
445
448
executor = self .executor_reference ()
@@ -465,13 +468,7 @@ def shutdown_executor_when_no_pending_tasks(self):
465
468
# on running processes over and over.
466
469
executor ._cancel_pending_futures = False
467
470
468
- # Since no new work items can be added, it is safe to shutdown
469
- # this thread if there are no pending work items.
470
- if not self .pending_work_items :
471
- self .shutdown_worker ()
472
- return True
473
-
474
- def shutdown_worker (self ):
471
+ def shutdown_workers (self ):
475
472
n_children_to_stop = self .get_n_children_alive ()
476
473
n_sentinels_sent = 0
477
474
# Send the right number of sentinels, to make sure all children are
@@ -482,9 +479,11 @@ def shutdown_worker(self):
482
479
try :
483
480
self .call_queue .put_nowait (None )
484
481
n_sentinels_sent += 1
485
- except Full :
482
+ except queue . Full :
486
483
break
487
484
485
+ def join_executor_internals (self ):
486
+ self .shutdown_workers ()
488
487
# Release the queue's resources as soon as possible.
489
488
self .call_queue .close ()
490
489
self .call_queue .join_thread ()
0 commit comments