Skip to content

Commit 22551cf

Browse files
authored
Merge pull request #39158 from ktoso/wip-fix-group-leak
[Concurrency] Stop TaskGroup from holding onto Tasks forever (leaking)
2 parents 9a75847 + f336404 commit 22551cf

11 files changed

+187
-49
lines changed

include/swift/ABI/TaskStatus.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
182182

183183
/// Attach the passed in `child` task to this group.
184184
void attachChild(AsyncTask *child) {
185-
assert(child->groupChildFragment());
186185
assert(child->hasGroupChildFragment());
187186
assert(child->groupChildFragment()->getGroup() == getGroup());
188187

@@ -211,6 +210,33 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
211210
cur->childFragment()->setNextChild(child);
212211
}
213212

213+
/// Detach the passed in `child` task from this group's list of children.
///
/// The children form a singly linked list that starts at `FirstChild` and is
/// chained through each task's child fragment. `child` must not be null.
/// If `child` is not found in the list, the list is left unchanged.
/// Note that the next-child link stored in `child` itself is not reset here.
void detachChild(AsyncTask *child) {
214+
assert(child && "cannot remove a null child from group");
215+
// Removing the head: whatever followed `child` becomes the new head.
if (FirstChild == child) {
216+
FirstChild = getNextChildTask(child);
217+
return;
218+
}
219+
220+
AsyncTask *prev = FirstChild;
221+
// Otherwise find the predecessor of `child` and unlink `child`, i.e.:
222+
//   prev -> afterPrev -> afterChild
223+
// where
224+
//   afterPrev == child
225+
// becomes:
226+
//   prev --------------> afterChild
227+
while (prev) {
228+
auto afterPrev = getNextChildTask(prev);
229+
230+
if (afterPrev == child) {
231+
auto afterChild = getNextChildTask(child);
232+
prev->childFragment()->setNextChild(afterChild);
233+
return;
234+
}
235+
236+
prev = afterPrev;
237+
}
238+
}
239+
214240
/// Return the task that follows `task` in the child list, as recorded in
/// `task`'s child fragment. `task` is dereferenced unconditionally, so it
/// must not be null.
static AsyncTask *getNextChildTask(AsyncTask *task) {
215241
return task->childFragment()->getNextChild();
216242
}

stdlib/public/Concurrency/Task.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ void NullaryContinuationJob::process(Job *_job) {
149149
void AsyncTask::completeFuture(AsyncContext *context) {
150150
using Status = FutureFragment::Status;
151151
using WaitQueueItem = FutureFragment::WaitQueueItem;
152-
152+
SWIFT_TASK_DEBUG_LOG("complete future = %p", this);
153153
assert(isFuture());
154154
auto fragment = futureFragment();
155155

@@ -235,7 +235,6 @@ AsyncTask::~AsyncTask() {
235235
SWIFT_CC(swift)
236236
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
237237
auto task = static_cast<AsyncTask*>(obj);
238-
239238
task->~AsyncTask();
240239

241240
// The task execution itself should always hold a reference to it, so

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift.org open source project
44
//
5-
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5+
// Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
66
// Licensed under Apache License v2.0 with Runtime Library Exception
77
//
88
// See https://swift.org/LICENSE.txt for license information
@@ -279,7 +279,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
279279

280280
private:
281281

282-
// // TODO: move to lockless via the status atomic
282+
// TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
283283
mutable std::mutex mutex;
284284

285285
/// Used for queue management, counting number of waiting and ready tasks
@@ -290,7 +290,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
290290
/// The low bits contain the status, the rest of the pointer is the
291291
/// AsyncTask.
292292
NaiveQueue<ReadyQueueItem> readyQueue;
293-
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
294293

295294
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
296295
/// or `nullptr` if no task is currently waiting.
@@ -305,10 +304,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
305304
: TaskGroupTaskStatusRecord(),
306305
status(GroupStatus::initial().status),
307306
readyQueue(),
308-
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
309307
waitQueue(nullptr), successType(T) {}
310308

311-
312309
TaskGroupTaskStatusRecord *getTaskRecord() {
313310
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
314311
}
@@ -472,11 +469,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
472469

473470
// =============================================================================
474471
// ==== add / attachChild ------------------------------------------------------
472+
475473
SWIFT_CC(swift)
476474
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
477475
AsyncTask *child) {
476+
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p\n",
477+
child, group);
478+
479+
// The counterpart of this (detachChild) is performed by the group itself,
480+
// when it offers the completed (child) task's value to a waiting task -
481+
// during the implementation of `await group.next()`.
478482
auto groupRecord = asImpl(group)->getTaskRecord();
479-
return groupRecord->attachChild(child);
483+
groupRecord->attachChild(child);
480484
}
481485

482486
// =============================================================================
@@ -490,16 +494,8 @@ void TaskGroupImpl::destroy() {
490494
// First, remove the group from the task and deallocate the record
491495
swift_task_removeStatusRecord(getTaskRecord());
492496

493-
mutex.lock(); // TODO: remove lock, and use status for synchronization
494-
// Release all ready tasks which are kept retained, the group destroyed,
495-
// so no other task will ever await on them anymore;
496-
ReadyQueueItem item;
497-
bool taskDequeued = readyQueue.dequeue(item);
498-
while (taskDequeued) {
499-
swift_release(item.getTask());
500-
taskDequeued = readyQueue.dequeue(item);
501-
}
502-
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
497+
// By the time we call destroy, all tasks inside the group must have been
498+
// awaited on already; we handle this on the Swift side.
503499
}
504500

505501
// =============================================================================
@@ -521,9 +517,10 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
521517
assert(false && "filling a waiting status?");
522518
return;
523519

524-
case PollStatus::Error:
525-
context->fillWithError(reinterpret_cast<SwiftError*>(result.storage));
520+
case PollStatus::Error: {
521+
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
526522
return;
523+
}
527524

528525
case PollStatus::Success: {
529526
// Initialize the result as an Optional<Success>.
@@ -553,16 +550,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
553550
assert(completedTask->hasChildFragment());
554551
assert(completedTask->hasGroupChildFragment());
555552
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
556-
557-
// We retain the completed task, because we will either:
558-
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
559-
// - (b) will need to store this task until the group task enters next() and
560-
// picks up this task.
561-
// either way, there is some time between us returning here, and the `completeTask`
562-
// issuing a swift_release on this very task. We need to keep it alive until
563-
// we have the chance to poll it from the queue (via the waiter task entering
564-
// calling next()).
565-
swift_retain(completedTask);
553+
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);
566554

567555
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
568556

@@ -586,6 +574,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
586574
// ==== a) has waiting task, so let us complete it right away
587575
if (assumed.hasWaitingTask()) {
588576
auto waitingTask = waitQueue.load(std::memory_order_acquire);
577+
SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p",
578+
waitingTask, completedTask);
589579
while (true) {
590580
// ==== a) run waiting task directly -------------------------------------
591581
assert(assumed.hasWaitingTask());
@@ -607,6 +597,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
607597
waitingTask->ResumeContext);
608598

609599
fillGroupNextResult(waitingContext, result);
600+
detachChild(result.retainedTask);
610601

611602
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
612603

@@ -615,6 +606,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
615606
return;
616607
} // else, try again
617608
}
609+
610+
llvm_unreachable("should have enqueued and returned.");
618611
}
619612

620613
// ==== b) enqueue completion ------------------------------------------------
@@ -623,10 +616,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
623616
// queue when a task polls during next() it will notice that we have a value
624617
// ready for it, and will process it immediately without suspending.
625618
assert(!waitQueue.load(std::memory_order_relaxed));
626-
619+
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
620+
completedTask);
627621
// Retain the task while it is in the queue;
628622
// it must remain alive for as long as the task group is alive.
629623
swift_retain(completedTask);
624+
630625
auto readyItem = ReadyQueueItem::get(
631626
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
632627
completedTask
@@ -668,6 +663,7 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t
668663

669664
// =============================================================================
670665
// ==== group.next() implementation (wait_next and groupPoll) ------------------
666+
671667
SWIFT_CC(swiftasync)
672668
static void swift_taskGroup_wait_next_throwingImpl(
673669
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
@@ -691,6 +687,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
691687
PollResult polled = group->poll(waitingTask);
692688
switch (polled.status) {
693689
case PollStatus::MustWait:
690+
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
691+
group, waitingTask);
694692
// The waiting task has been queued on the channel,
695693
// there were pending tasks so it will be woken up eventually.
696694
#ifdef __ARM_ARCH_7K__
@@ -703,13 +701,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
703701
case PollStatus::Empty:
704702
case PollStatus::Error:
705703
case PollStatus::Success:
704+
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
705+
group, waitingTask, polled.retainedTask);
706706
fillGroupNextResult(context, polled);
707+
if (auto completedTask = polled.retainedTask) {
708+
// It is null for PollStatus::Empty; in that case there is nothing to detach or release.
709+
group->detachChild(polled.retainedTask);
710+
swift_release(polled.retainedTask);
711+
}
712+
707713
return waitingTask->runInFullyEstablishedContext();
708714
}
709715
}
710716

711717
PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
712718
mutex.lock(); // TODO: remove group lock, and use status for synchronization
719+
SWIFT_TASK_DEBUG_LOG("poll group = %p", this);
713720
auto assumed = statusMarkWaitingAssumeAcquire();
714721

715722
PollResult result;
@@ -719,6 +726,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
719726

720727
// ==== 1) bail out early if no tasks are pending ----------------------------
721728
if (assumed.isEmpty()) {
729+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this);
722730
// No tasks in flight, we know no tasks were submitted before this poll
723731
// was issued, and if we parked here we'd potentially never be woken up.
724732
// Bail out and return `nil` from `group.next()`.
@@ -736,6 +744,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
736744

737745
// ==== 2) Ready task was polled, return with it immediately -----------------
738746
if (assumed.readyTasks()) {
747+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
748+
this, assumed.readyTasks());
749+
739750
auto assumedStatus = assumed.status;
740751
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
741752
if (status.compare_exchange_weak(
@@ -839,13 +850,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
839850
}
840851

841852
bool TaskGroupImpl::cancelAll() {
853+
SWIFT_TASK_DEBUG_LOG("cancel all tasks in group = %p", this);
854+
842855
// store the cancelled bit
843856
auto old = statusCancel();
844857
if (old.isCancelled()) {
845858
// already was cancelled previously, nothing to do?
846859
return false;
847860
}
848861

862+
// FIXME: must also remove the records!!!!
849863
// cancel all existing tasks within the group
850864
swift_task_cancel_group_child_tasks(asAbstract(this));
851865
return true;

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,12 @@
4242
namespace swift {
4343

4444
// Set to 1 to enable helpful debug spew to stderr
45+
// When this is enabled, tests that require `swift_task_debug_log` can run.
4546
#if 0
4647
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
47-
fprintf(stderr, "[%lu] " fmt "\n", (unsigned long)_swift_get_thread_id(), \
48+
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \
49+
(unsigned long)_swift_get_thread_id(), \
50+
__FILE__, __LINE__, __FUNCTION__, \
4851
__VA_ARGS__)
4952
#else
5053
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) (void)0

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,8 @@ static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
354354
SWIFT_CC(swift)
355355
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
356356
auto task = swift_task_getCurrent();
357+
SWIFT_TASK_DEBUG_LOG("remove status record = %p, from current task = %p",
358+
record, task);
357359

358360
// Load the current state.
359361
auto &status = task->_private().Status;
@@ -454,6 +456,8 @@ static ChildTaskStatusRecord*
454456
swift_task_attachChildImpl(AsyncTask *child) {
455457
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
456458
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
459+
SWIFT_TASK_DEBUG_LOG("attach child task = %p, record = %p, to current task = %p",
460+
child, record, swift_task_getCurrent());
457461
swift_task_addStatusRecord(record);
458462
return record;
459463
}
@@ -548,6 +552,7 @@ static void performGroupCancellationAction(TaskStatusRecord *record) {
548552

549553
SWIFT_CC(swift)
550554
static void swift_task_cancelImpl(AsyncTask *task) {
555+
SWIFT_TASK_DEBUG_LOG("cancel task = %p", task);
551556
Optional<StatusRecordLockRecord> recordLockRecord;
552557

553558
// Acquire the status record lock.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: swift_task_debug_log
5+
6+
// UNSUPPORTED: use_os_stdlib
7+
// UNSUPPORTED: back_deployment_runtime
8+
9+
#if os(Linux)
10+
import Glibc
11+
#elseif os(Windows)
12+
import MSVCRT
13+
#else
14+
import Darwin
15+
#endif
16+
17+
/// Starts a detached task that writes "OK: ..." (from inside
/// `withUnsafeCurrentTask`) and "DONE" to stderr, then awaits that task.
///
/// The FileCheck directives in the body are matched against the program's
/// output; the "creating task" / "destroy task" lines they expect are
/// presumably emitted by the runtime's task debug log rather than by this
/// code (NOTE(review): confirm against the runtime's debug logging).
func test_withUnsafeCurrentTask() async {
18+
// The task we're running in ("main")
19+
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0
20+
21+
// CHECK: creating task [[TASK:0x.*]] with parent 0x0
22+
let t = Task.detached {
23+
withUnsafeCurrentTask { task in
24+
fputs("OK: \(task!)", stderr)
25+
}
26+
fputs("DONE", stderr)
27+
}
28+
29+
// CHECK: OK: UnsafeCurrentTask(_task: (Opaque Value))
30+
// CHECK: DONE
31+
// CHECK: destroy task [[TASK]]
// Wait for the detached task to finish before returning.
await t.value
33+
}
34+
35+
/// Executable entry point for this test: awaits
/// `test_withUnsafeCurrentTask()` and then returns.
@main struct Main {
36+
static func main() async {
37+
await test_withUnsafeCurrentTask()
38+
}
39+
}

test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ func test_taskGroup_next() async {
2626
do {
2727
while let r = try await group.next() {
2828
sum += r
29+
print("add \(r) -> sum: \(sum)")
2930
}
3031
} catch {
3132
catches += 1
33+
print("catch: \(catches)")
3234
}
3335
}
3436

0 commit comments

Comments
 (0)