Skip to content

Commit e13a614

Browse files
authored
Merge pull request #39204 from DougGregor/task-group-leak-fix-5.5
Fix group child tasks not being released
2 parents 0b57c76 + f4bcc3e commit e13a614

11 files changed

+187
-49
lines changed

include/swift/ABI/TaskStatus.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
182182

183183
/// Attach the passed in `child` task to this group.
184184
void attachChild(AsyncTask *child) {
185-
assert(child->groupChildFragment());
186185
assert(child->hasGroupChildFragment());
187186
assert(child->groupChildFragment()->getGroup() == getGroup());
188187

@@ -211,6 +210,33 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
211210
cur->childFragment()->setNextChild(child);
212211
}
213212

213+
/// Detach the passed in `child` task from this group.
///
/// Unlinks `child` from the singly linked list of children that starts at
/// `FirstChild`. If `child` is not present in the list, the list is left
/// unchanged. The next-pointer stored in `child` itself is not modified.
void detachChild(AsyncTask *child) {
214+
assert(child && "cannot remove a null child from group");
215+
// The child is the head of the list: advance the head past it.
if (FirstChild == child) {
216+
FirstChild = getNextChildTask(child);
217+
return;
218+
}
219+
220+
AsyncTask *prev = FirstChild;
221+
// Remove the child from the linked list, i.e.:
222+
//     prev -> afterPrev -> afterChild
223+
//                 ==
224+
//               child   -> afterChild
225+
// Becomes:
226+
//     prev --------------> afterChild
227+
while (prev) {
228+
auto afterPrev = getNextChildTask(prev);
229+
230+
// Found the predecessor of `child`: link it directly to the successor.
if (afterPrev == child) {
231+
auto afterChild = getNextChildTask(child);
232+
prev->childFragment()->setNextChild(afterChild);
233+
return;
234+
}
235+
236+
prev = afterPrev;
237+
}
238+
}
239+
214240
/// Return the task linked after `task` in the child list, as stored in
/// `task`'s child fragment.
static AsyncTask *getNextChildTask(AsyncTask *task) {
215241
return task->childFragment()->getNextChild();
216242
}

stdlib/public/Concurrency/Task.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ void NullaryContinuationJob::process(Job *_job) {
141141
void AsyncTask::completeFuture(AsyncContext *context) {
142142
using Status = FutureFragment::Status;
143143
using WaitQueueItem = FutureFragment::WaitQueueItem;
144-
144+
SWIFT_TASK_DEBUG_LOG("complete future = %p", this);
145145
assert(isFuture());
146146
auto fragment = futureFragment();
147147

@@ -227,7 +227,6 @@ AsyncTask::~AsyncTask() {
227227
SWIFT_CC(swift)
228228
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
229229
auto task = static_cast<AsyncTask*>(obj);
230-
231230
task->~AsyncTask();
232231

233232
// The task execution itself should always hold a reference to it, so

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift.org open source project
44
//
5-
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5+
// Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
66
// Licensed under Apache License v2.0 with Runtime Library Exception
77
//
88
// See https://swift.org/LICENSE.txt for license information
@@ -278,7 +278,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
278278

279279
private:
280280

281-
// // TODO: move to lockless via the status atomic
281+
// TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
282282
mutable std::mutex mutex;
283283

284284
/// Used for queue management, counting number of waiting and ready tasks
@@ -289,7 +289,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
289289
/// The low bits contain the status, the rest of the pointer is the
290290
/// AsyncTask.
291291
NaiveQueue<ReadyQueueItem> readyQueue;
292-
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
293292

294293
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
295294
/// or `nullptr` if no task is currently waiting.
@@ -304,10 +303,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
304303
: TaskGroupTaskStatusRecord(),
305304
status(GroupStatus::initial().status),
306305
readyQueue(),
307-
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
308306
waitQueue(nullptr), successType(T) {}
309307

310-
311308
TaskGroupTaskStatusRecord *getTaskRecord() {
312309
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
313310
}
@@ -471,11 +468,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
471468

472469
// =============================================================================
473470
// ==== add / attachChild ------------------------------------------------------
471+
474472
// Attach `child` to `group` by forwarding to the group's task status record.
// Emits a debug log line first when SWIFT_TASK_DEBUG_LOG is enabled.
SWIFT_CC(swift)
475473
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
476474
AsyncTask *child) {
475+
// No trailing "\n" in the format: SWIFT_TASK_DEBUG_LOG appends one itself.
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p",
476+
child, group);
477+
478+
// The counterpart of this (detachChild) is performed by the group itself:
479+
// either when it offers the completed (child) task's value to a waiting task,
480+
// or when `await group.next()` polls a child that has already completed.
477481
auto groupRecord = asImpl(group)->getTaskRecord();
478-
return groupRecord->attachChild(child);
482+
groupRecord->attachChild(child);
479483
}
480484

481485
// =============================================================================
@@ -489,16 +493,8 @@ void TaskGroupImpl::destroy() {
489493
// First, remove the group from the task and deallocate the record
490494
swift_task_removeStatusRecord(getTaskRecord());
491495

492-
mutex.lock(); // TODO: remove lock, and use status for synchronization
493-
// Release all ready tasks which are kept retained, the group destroyed,
494-
// so no other task will ever await on them anymore;
495-
ReadyQueueItem item;
496-
bool taskDequeued = readyQueue.dequeue(item);
497-
while (taskDequeued) {
498-
swift_release(item.getTask());
499-
taskDequeued = readyQueue.dequeue(item);
500-
}
501-
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
496+
// By the time we call destroy, all tasks inside the group must have been
497+
// awaited on already; we handle this on the Swift side.
502498
}
503499

504500
// =============================================================================
@@ -520,9 +516,10 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
520516
assert(false && "filling a waiting status?");
521517
return;
522518

523-
case PollStatus::Error:
524-
context->fillWithError(reinterpret_cast<SwiftError*>(result.storage));
519+
case PollStatus::Error: {
520+
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
525521
return;
522+
}
526523

527524
case PollStatus::Success: {
528525
// Initialize the result as an Optional<Success>.
@@ -552,16 +549,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
552549
assert(completedTask->hasChildFragment());
553550
assert(completedTask->hasGroupChildFragment());
554551
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
555-
556-
// We retain the completed task, because we will either:
557-
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
558-
// - (b) will need to store this task until the group task enters next() and
559-
// picks up this task.
560-
// either way, there is some time between us returning here, and the `completeTask`
561-
// issuing a swift_release on this very task. We need to keep it alive until
562-
// we have the chance to poll it from the queue (via the waiter task entering
563-
// calling next()).
564-
swift_retain(completedTask);
552+
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);
565553

566554
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
567555

@@ -585,6 +573,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
585573
// ==== a) has waiting task, so let us complete it right away
586574
if (assumed.hasWaitingTask()) {
587575
auto waitingTask = waitQueue.load(std::memory_order_acquire);
576+
SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p",
577+
waitingTask, completedTask);
588578
while (true) {
589579
// ==== a) run waiting task directly -------------------------------------
590580
assert(assumed.hasWaitingTask());
@@ -606,6 +596,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
606596
waitingTask->ResumeContext);
607597

608598
fillGroupNextResult(waitingContext, result);
599+
detachChild(result.retainedTask);
609600

610601
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
611602

@@ -614,6 +605,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
614605
return;
615606
} // else, try again
616607
}
608+
609+
llvm_unreachable("should have enqueued and returned.");
617610
}
618611

619612
// ==== b) enqueue completion ------------------------------------------------
@@ -622,10 +615,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
622615
// queue when a task polls during next() it will notice that we have a value
623616
// ready for it, and will process it immediately without suspending.
624617
assert(!waitQueue.load(std::memory_order_relaxed));
625-
618+
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
619+
completedTask);
626620
// Retain the task while it is in the queue;
627621
// it must remain alive for as long as the task group is alive.
628622
swift_retain(completedTask);
623+
629624
auto readyItem = ReadyQueueItem::get(
630625
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
631626
completedTask
@@ -667,6 +662,7 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t
667662

668663
// =============================================================================
669664
// ==== group.next() implementation (wait_next and groupPoll) ------------------
665+
670666
SWIFT_CC(swiftasync)
671667
static void swift_taskGroup_wait_next_throwingImpl(
672668
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
@@ -690,6 +686,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
690686
PollResult polled = group->poll(waitingTask);
691687
switch (polled.status) {
692688
case PollStatus::MustWait:
689+
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
690+
group, waitingTask);
693691
// The waiting task has been queued on the channel,
694692
// there were pending tasks so it will be woken up eventually.
695693
#ifdef __ARM_ARCH_7K__
@@ -702,13 +700,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
702700
case PollStatus::Empty:
703701
case PollStatus::Error:
704702
case PollStatus::Success:
703+
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
704+
group, waitingTask, polled.retainedTask);
705705
fillGroupNextResult(context, polled);
706+
if (auto completedTask = polled.retainedTask) {
707+
// retainedTask is null for PollStatus::Empty; then there is nothing to detach or release
708+
group->detachChild(polled.retainedTask);
709+
swift_release(polled.retainedTask);
710+
}
711+
706712
return waitingTask->runInFullyEstablishedContext();
707713
}
708714
}
709715

710716
PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
711717
mutex.lock(); // TODO: remove group lock, and use status for synchronization
718+
SWIFT_TASK_DEBUG_LOG("poll group = %p", this);
712719
auto assumed = statusMarkWaitingAssumeAcquire();
713720

714721
PollResult result;
@@ -718,6 +725,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
718725

719726
// ==== 1) bail out early if no tasks are pending ----------------------------
720727
if (assumed.isEmpty()) {
728+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this);
721729
// No tasks in flight, we know no tasks were submitted before this poll
722730
// was issued, and if we parked here we'd potentially never be woken up.
723731
// Bail out and return `nil` from `group.next()`.
@@ -735,6 +743,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
735743

736744
// ==== 2) Ready task was polled, return with it immediately -----------------
737745
if (assumed.readyTasks()) {
746+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
747+
this, assumed.readyTasks());
748+
738749
auto assumedStatus = assumed.status;
739750
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
740751
if (status.compare_exchange_weak(
@@ -838,13 +849,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
838849
}
839850

840851
bool TaskGroupImpl::cancelAll() {
852+
SWIFT_TASK_DEBUG_LOG("cancel all tasks in group = %p", this);
853+
841854
// store the cancelled bit
842855
auto old = statusCancel();
843856
if (old.isCancelled()) {
844857
// already was cancelled previously, nothing to do?
845858
return false;
846859
}
847860

861+
// FIXME: must also remove the records!!!!
848862
// cancel all existing tasks within the group
849863
swift_task_cancel_group_child_tasks(asAbstract(this));
850864
return true;

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,12 @@
3030
namespace swift {
3131

3232
// Set to 1 to enable helpful debug spew to stderr
33+
// If this is enabled, tests with `swift_task_debug_log` requirement can run.
3334
#if 0
3435
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
35-
fprintf(stderr, "[%lu] " fmt "\n", (unsigned long)_swift_get_thread_id(), \
36+
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \
37+
(unsigned long)_swift_get_thread_id(), \
38+
__FILE__, __LINE__, __FUNCTION__, \
3639
__VA_ARGS__)
3740
#else
3841
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) (void)0

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,8 @@ static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
354354
SWIFT_CC(swift)
355355
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
356356
auto task = swift_task_getCurrent();
357+
SWIFT_TASK_DEBUG_LOG("remove status record = %p, from current task = %p",
358+
record, task);
357359

358360
// Load the current state.
359361
auto &status = task->_private().Status;
@@ -454,6 +456,8 @@ static ChildTaskStatusRecord*
454456
swift_task_attachChildImpl(AsyncTask *child) {
455457
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
456458
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
459+
SWIFT_TASK_DEBUG_LOG("attach child task = %p, record = %p, to current task = %p",
460+
child, record, swift_task_getCurrent());
457461
swift_task_addStatusRecord(record);
458462
return record;
459463
}
@@ -548,6 +552,7 @@ static void performGroupCancellationAction(TaskStatusRecord *record) {
548552

549553
SWIFT_CC(swift)
550554
static void swift_task_cancelImpl(AsyncTask *task) {
555+
SWIFT_TASK_DEBUG_LOG("cancel task = %p", task);
551556
Optional<StatusRecordLockRecord> recordLockRecord;
552557

553558
// Acquire the status record lock.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: swift_task_debug_log
5+
6+
// UNSUPPORTED: use_os_stdlib
7+
// UNSUPPORTED: back_deployment_runtime
8+
9+
#if os(Linux)
10+
import Glibc
11+
#elseif os(Windows)
12+
import MSVCRT
13+
#else
14+
import Darwin
15+
#endif
16+
17+
/// Spawns a detached task that writes its `UnsafeCurrentTask` and then "DONE"
/// to stderr, and awaits the task's completion.
///
/// The FileCheck directives in the body also expect "creating task" and
/// "destroy task" lines, which this function does not print itself;
/// presumably they come from the runtime's task debug log (this test
/// requires `swift_task_debug_log`) - confirm against the runtime.
func test_withUnsafeCurrentTask() async {
18+
// The task we're running in ("main")
19+
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0
20+
21+
// CHECK: creating task [[TASK:0x.*]] with parent 0x0
22+
let t = Task.detached {
23+
withUnsafeCurrentTask { task in
24+
fputs("OK: \(task!)", stderr)
25+
}
26+
fputs("DONE", stderr)
27+
}
28+
29+
// CHECK: OK: UnsafeCurrentTask(_task: (Opaque Value))
30+
// CHECK: DONE
31+
// CHECK: destroy task [[TASK]]
32+
await t.value
33+
}
34+
35+
/// Program entry point: awaits `test_withUnsafeCurrentTask()` and returns.
@main struct Main {
36+
static func main() async {
37+
await test_withUnsafeCurrentTask()
38+
}
39+
}

test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ func test_taskGroup_next() async {
2727
do {
2828
while let r = try await group.next() {
2929
sum += r
30+
print("add \(r) -> sum: \(sum)")
3031
}
3132
} catch {
3233
catches += 1
34+
print("catch: \(catches)")
3335
}
3436
}
3537

0 commit comments

Comments
 (0)