Skip to content

Fix a concurrent.futures.as_completed() refleak previously introduced in bpo-27144 #3270

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 3, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 19 additions & 7 deletions Lib/concurrent/futures/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,15 +171,24 @@ def _create_and_install_waiters(fs, return_when):
return waiter


def _yield_and_decref(fs, ref_collect):
def _yield_finished_futures(fs, waiter, ref_collect):
"""
Iterate on the list *fs*, yielding objects one by one in reverse order.
Before yielding an object, it is removed from each set in
the collection of sets *ref_collect*.
Iterate on the list *fs*, yielding finished futures one by one in
reverse order.
Before yielding a future, *waiter* is removed from its waiters
and the future is removed from each set in the collection of sets
*ref_collect*.

The aim of this function is to avoid keeping stale references after
the future is yielded and before the iterator resumes.
"""
while fs:
f = fs[-1]
for futures_set in ref_collect:
futures_set.remove(fs[-1])
futures_set.remove(f)
with f._condition:
f._waiters.remove(waiter)
del f
# Careful not to keep a reference to the popped value
yield fs.pop()

Expand Down Expand Up @@ -216,7 +225,8 @@ def as_completed(fs, timeout=None):
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
yield from _yield_and_decref(finished, ref_collect=(fs,))
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs,))

while pending:
if timeout is None:
Expand All @@ -237,9 +247,11 @@ def as_completed(fs, timeout=None):

# reverse to keep finishing order
finished.reverse()
yield from _yield_and_decref(finished, ref_collect=(fs, pending))
yield from _yield_finished_futures(finished, waiter,
ref_collect=(fs, pending))

finally:
# Remove waiter from unfinished futures
for f in fs:
with f._condition:
f._waiters.remove(waiter)
Expand Down
2 changes: 1 addition & 1 deletion Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,7 @@ def test_free_reference_yielded_future(self):
# to finished futures.
futures_list = [Future() for _ in range(8)]
futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
futures_list.append(create_future(state=SUCCESSFUL_FUTURE))
futures_list.append(create_future(state=FINISHED, result=42))

with self.assertRaises(futures.TimeoutError):
for future in futures.as_completed(futures_list, timeout=0):
Expand Down