Skip to content

Commit d581bba

Browse files
committed
bpo-27144: concurrent.futures as_complie and map iterators do not keep
reference to returned object
1 parent c67bae0 commit d581bba

File tree

4 files changed

+80
-10
lines changed

4 files changed

+80
-10
lines changed

Lib/concurrent/futures/_base.py

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,16 @@ def _create_and_install_waiters(fs, return_when):
170170

171171
return waiter
172172

173+
174+
def _yield_and_decref(fs, ref_collect):
175+
"""Yields a future. Before yielding, removes the future
176+
from each set in collection of sets (`ref_collect`)."""
177+
while fs:
178+
for futures_set in ref_collect:
179+
futures_set.remove(fs[-1])
180+
yield fs.pop()
181+
182+
173183
def as_completed(fs, timeout=None):
174184
"""An iterator over the given futures that yields each as it completes.
175185
@@ -191,16 +201,18 @@ def as_completed(fs, timeout=None):
191201
if timeout is not None:
192202
end_time = timeout + time.time()
193203

204+
total_futures = len(fs)
205+
194206
fs = set(fs)
195207
with _AcquireFutures(fs):
196208
finished = set(
197209
f for f in fs
198210
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
199211
pending = fs - finished
200212
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
201-
213+
finished = list(finished)
202214
try:
203-
yield from finished
215+
yield from _yield_and_decref(finished, ref_collect=(fs,))
204216

205217
while pending:
206218
if timeout is None:
@@ -210,7 +222,7 @@ def as_completed(fs, timeout=None):
210222
if wait_timeout < 0:
211223
raise TimeoutError(
212224
'%d (of %d) futures unfinished' % (
213-
len(pending), len(fs)))
225+
len(pending), total_futures))
214226

215227
waiter.event.wait(wait_timeout)
216228

@@ -219,9 +231,9 @@ def as_completed(fs, timeout=None):
219231
waiter.finished_futures = []
220232
waiter.event.clear()
221233

222-
for future in finished:
223-
yield future
224-
pending.remove(future)
234+
# reverse to keep finishing order
235+
finished.reverse()
236+
yield from _yield_and_decref(finished, ref_collect=(fs, pending))
225237

226238
finally:
227239
for f in fs:
@@ -551,11 +563,13 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
551563
# before the first iterator value is required.
552564
def result_iterator():
553565
try:
554-
for future in fs:
566+
# reverse to keep finishing order
567+
fs.reverse()
568+
while fs:
555569
if timeout is None:
556-
yield future.result()
570+
yield fs.pop().result()
557571
else:
558-
yield future.result(end_time - time.time())
572+
yield fs.pop().result(end_time - time.time())
559573
finally:
560574
for future in fs:
561575
future.cancel()

Lib/concurrent/futures/process.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,18 @@ def _check_system_limits():
357357
raise NotImplementedError(_system_limited)
358358

359359

360+
def _chain_from_iterable(iterable):
361+
"""
362+
Different implementation of itertools.chain.from_iterable.
363+
The difference is _chain_from_iterable do not keep reference to returned objects.
364+
"""
365+
366+
for element in iterable:
367+
element.reverse()
368+
while element:
369+
yield element.pop()
370+
371+
360372
class BrokenProcessPool(RuntimeError):
361373
"""
362374
Raised when a process in a ProcessPoolExecutor terminated abruptly
@@ -482,7 +494,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
482494
results = super().map(partial(_process_chunk, fn),
483495
_get_chunks(*iterables, chunksize=chunksize),
484496
timeout=timeout)
485-
return itertools.chain.from_iterable(results)
497+
return _chain_from_iterable(results)
486498

487499
def shutdown(self, wait=True):
488500
with self._shutdown_lock:

Lib/test/test_concurrent_futures.py

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ def sleep_and_print(t, msg):
5454
sys.stdout.flush()
5555

5656

57+
def _dummy_object_fn(_):
58+
return object()
59+
60+
5761
class MyObject(object):
5862
def my_method(self):
5963
pass
@@ -396,6 +400,34 @@ def test_duplicate_futures(self):
396400
completed = [f for f in futures.as_completed([future1,future1])]
397401
self.assertEqual(len(completed), 1)
398402

403+
def test_free_reference_yielded_future(self):
404+
# Issue #14406: Generator should not keep reference
405+
# for finished futures.
406+
futures_list = [Future() for _ in range(8)]
407+
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
408+
futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
409+
410+
with self.assertRaises(futures.TimeoutError):
411+
for future in futures.as_completed(futures_list, timeout=0):
412+
futures_list.remove(future)
413+
self.assertEqual(sys.getrefcount(future), 2)
414+
415+
futures_list[0].set_result("test")
416+
for future in futures.as_completed(futures_list):
417+
futures_list.remove(future)
418+
self.assertEqual(sys.getrefcount(future), 2)
419+
if futures_list:
420+
futures_list[0].set_result("test")
421+
422+
def test_correct_timeout_exception_msg(self):
423+
futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
424+
RUNNING_FUTURE, SUCCESSFUL_FUTURE]
425+
426+
with self.assertRaises(futures.TimeoutError) as cm:
427+
list(futures.as_completed(futures_list, timeout=0))
428+
429+
self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
430+
399431

400432
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, BaseTestCase):
401433
pass
@@ -421,6 +453,10 @@ def test_map(self):
421453
list(self.executor.map(pow, range(10), range(10))),
422454
list(map(pow, range(10), range(10))))
423455

456+
self.assertEqual(
457+
list(self.executor.map(pow, range(10), range(10), chunksize=3)),
458+
list(map(pow, range(10), range(10))))
459+
424460
def test_map_exception(self):
425461
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
426462
self.assertEqual(i.__next__(), (0, 1))
@@ -471,6 +507,12 @@ def test_max_workers_negative(self):
471507
"than 0"):
472508
self.executor_type(max_workers=number)
473509

510+
def test_free_reference(self):
511+
# Issue #14406: Result iterator should not keep reference
512+
# for finished futures.
513+
for result_object in self.executor.map(_dummy_object_fn, range(10)):
514+
self.assertEqual(sys.getrefcount(result_object), 2)
515+
474516

475517
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
476518
def test_map_submits_without_iteration(self):
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
concurrent.futures as_complie and map iterators do not keep reference to
2+
returned object

0 commit comments

Comments
 (0)