Skip to content

Commit e1ddd40

Browse files
ktosoDougGregor
authored andcommitted
[Concurrency] TaskGroup children remove their records as they complete
If we didn't do this (and we didn't), the tasks get released as we perform the next() impl, and move the value from the ready task to the waiting task. Then, the ready task gets destroyed. But as the task group exists, it performs a cancelAll() and that iterates over all records. Those records were not removed previously (!!!) which meant we were pointing at now deallocated tasks. Previously this worked because we didn't deallocate the tasks, so they leaked, but we didn't crash. With the memory leak fixed, this began to crash since we'd attempt to cancel already destroyed tasks. Solution: - Remove task records whenever they complete a waiting task. - This can ONLY be done by the "group owning task" itself, becuause the contract of ONLY this task being allowed to modify records. o It MUST NOT be done by the completing tasks as they complete, as it would race with the owning task modifying this linked list of child tasks in the group record. (cherry picked from commit f336404)
1 parent 740e87b commit e1ddd40

File tree

8 files changed

+142
-41
lines changed

8 files changed

+142
-41
lines changed

include/swift/ABI/TaskStatus.h

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,6 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
182182

183183
/// Attach the passed in `child` task to this group.
184184
void attachChild(AsyncTask *child) {
185-
assert(child->groupChildFragment());
186185
assert(child->hasGroupChildFragment());
187186
assert(child->groupChildFragment()->getGroup() == getGroup());
188187

@@ -211,6 +210,33 @@ class TaskGroupTaskStatusRecord : public TaskStatusRecord {
211210
cur->childFragment()->setNextChild(child);
212211
}
213212

213+
void detachChild(AsyncTask *child) {
214+
assert(child && "cannot remove a null child from group");
215+
if (FirstChild == child) {
216+
FirstChild = getNextChildTask(child);
217+
return;
218+
}
219+
220+
AsyncTask *prev = FirstChild;
221+
// Remove the child from the linked list, i.e.:
222+
// prev -> afterPrev -> afterChild
223+
// ==
224+
// child -> afterChild
225+
// Becomes:
226+
// prev --------------> afterChild
227+
while (prev) {
228+
auto afterPrev = getNextChildTask(prev);
229+
230+
if (afterPrev == child) {
231+
auto afterChild = getNextChildTask(child);
232+
prev->childFragment()->setNextChild(afterChild);
233+
return;
234+
}
235+
236+
prev = afterPrev;
237+
}
238+
}
239+
214240
static AsyncTask *getNextChildTask(AsyncTask *task) {
215241
return task->childFragment()->getNextChild();
216242
}

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
//
33
// This source file is part of the Swift.org open source project
44
//
5-
// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
5+
// Copyright (c) 2014 - 2021 Apple Inc. and the Swift project authors
66
// Licensed under Apache License v2.0 with Runtime Library Exception
77
//
88
// See https://swift.org/LICENSE.txt for license information
@@ -278,7 +278,7 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
278278

279279
private:
280280

281-
// // TODO: move to lockless via the status atomic
281+
// TODO: move to lockless via the status atomic (make readyQueue an mpsc_queue_t<ReadyQueueItem>)
282282
mutable std::mutex mutex;
283283

284284
/// Used for queue management, counting number of waiting and ready tasks
@@ -289,7 +289,6 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
289289
/// The low bits contain the status, the rest of the pointer is the
290290
/// AsyncTask.
291291
NaiveQueue<ReadyQueueItem> readyQueue;
292-
// mpsc_queue_t<ReadyQueueItem> readyQueue; // TODO: can we get away with an MPSC queue here once actor executors land?
293292

294293
/// Single waiting `AsyncTask` currently waiting on `group.next()`,
295294
/// or `nullptr` if no task is currently waiting.
@@ -304,10 +303,8 @@ class TaskGroupImpl: public TaskGroupTaskStatusRecord {
304303
: TaskGroupTaskStatusRecord(),
305304
status(GroupStatus::initial().status),
306305
readyQueue(),
307-
// readyQueue(ReadyQueueItem::get(ReadyStatus::Empty, nullptr)),
308306
waitQueue(nullptr), successType(T) {}
309307

310-
311308
TaskGroupTaskStatusRecord *getTaskRecord() {
312309
return reinterpret_cast<TaskGroupTaskStatusRecord *>(this);
313310
}
@@ -471,11 +468,18 @@ static void swift_taskGroup_initializeImpl(TaskGroup *group, const Metadata *T)
471468

472469
// =============================================================================
473470
// ==== add / attachChild ------------------------------------------------------
471+
474472
SWIFT_CC(swift)
475473
static void swift_taskGroup_attachChildImpl(TaskGroup *group,
476474
AsyncTask *child) {
475+
SWIFT_TASK_DEBUG_LOG("attach child task = %p to group = %p\n",
476+
child, group);
477+
478+
// The counterpart of this (detachChild) is performed by the group itself,
479+
// when it offers the completed (child) task's value to a waiting task -
480+
// during the implementation of `await group.next()`.
477481
auto groupRecord = asImpl(group)->getTaskRecord();
478-
return groupRecord->attachChild(child);
482+
groupRecord->attachChild(child);
479483
}
480484

481485
// =============================================================================
@@ -505,8 +509,7 @@ bool TaskGroup::isCancelled() {
505509
}
506510

507511
static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
508-
PollResult result,
509-
bool releaseResultRetainedTask) {
512+
PollResult result) {
510513
/// Fill in the result value
511514
switch (result.status) {
512515
case PollStatus::MustWait:
@@ -515,7 +518,7 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
515518

516519
case PollStatus::Error: {
517520
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
518-
break;
521+
return;
519522
}
520523

521524
case PollStatus::Success: {
@@ -527,28 +530,17 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
527530
// remaining references to it.
528531
successType->vw_initializeWithCopy(destPtr, result.storage);
529532
successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
530-
break;
533+
return;
531534
}
532535

533536
case PollStatus::Empty: {
534537
// Initialize the result as a nil Optional<Success>.
535538
const Metadata *successType = result.successType;
536539
OpaqueValue *destPtr = context->successResultPointer;
537540
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
538-
break;
541+
return;
539542
}
540543
}
541-
542-
// We only release if asked to; This is because this function is called in two
543-
// cases "immediately":
544-
// a) when a completed task arrives and a waiting one existed then we don't
545-
// need to retain the completed task at all, thus we also don't release it.
546-
// b) when the task was stored in the readyQueue it was retained. As a
547-
// waitingTask arrives we will fill-in with the value from the retained
548-
// task. In this situation we must release the ready task, to allow it to
549-
// be destroyed.
550-
if (releaseResultRetainedTask)
551-
swift_release(result.retainedTask);
552544
}
553545

554546
void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
@@ -557,7 +549,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
557549
assert(completedTask->hasChildFragment());
558550
assert(completedTask->hasGroupChildFragment());
559551
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
560-
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p\n", completedTask, group);
552+
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p", completedTask, this);
561553

562554
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
563555

@@ -603,7 +595,9 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
603595
static_cast<TaskFutureWaitAsyncContext *>(
604596
waitingTask->ResumeContext);
605597

606-
fillGroupNextResult(waitingContext, result, /*release*/false);
598+
fillGroupNextResult(waitingContext, result);
599+
detachChild(result.retainedTask);
600+
607601
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
608602

609603
// TODO: allow the caller to suggest an executor
@@ -621,11 +615,12 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
621615
// queue when a task polls during next() it will notice that we have a value
622616
// ready for it, and will process it immediately without suspending.
623617
assert(!waitQueue.load(std::memory_order_relaxed));
624-
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, store ready task = %p",
618+
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, RETAIN and store ready task = %p",
625619
completedTask);
626620
// Retain the task while it is in the queue;
627621
// it must remain alive until the task group is alive.
628622
swift_retain(completedTask);
623+
629624
auto readyItem = ReadyQueueItem::get(
630625
hadErrorResult ? ReadyStatus::Error : ReadyStatus::Success,
631626
completedTask
@@ -691,7 +686,7 @@ static void swift_taskGroup_wait_next_throwingImpl(
691686
PollResult polled = group->poll(waitingTask);
692687
switch (polled.status) {
693688
case PollStatus::MustWait:
694-
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p\n",
689+
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p",
695690
group, waitingTask);
696691
// The waiting task has been queued on the channel,
697692
// there were pending tasks so it will be woken up eventually.
@@ -705,15 +700,22 @@ static void swift_taskGroup_wait_next_throwingImpl(
705700
case PollStatus::Empty:
706701
case PollStatus::Error:
707702
case PollStatus::Success:
708-
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p\n",
703+
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p",
709704
group, waitingTask, polled.retainedTask);
710-
fillGroupNextResult(context, polled, /*release*/true);
705+
fillGroupNextResult(context, polled);
706+
if (auto completedTask = polled.retainedTask) {
707+
// it would be null for PollStatus::Empty, then we don't need to release
708+
group->detachChild(polled.retainedTask);
709+
swift_release(polled.retainedTask);
710+
}
711+
711712
return waitingTask->runInFullyEstablishedContext();
712713
}
713714
}
714715

715716
PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
716717
mutex.lock(); // TODO: remove group lock, and use status for synchronization
718+
SWIFT_TASK_DEBUG_LOG("poll group = %p", this);
717719
auto assumed = statusMarkWaitingAssumeAcquire();
718720

719721
PollResult result;
@@ -723,6 +725,7 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
723725

724726
// ==== 1) bail out early if no tasks are pending ----------------------------
725727
if (assumed.isEmpty()) {
728+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group is empty, no pending tasks", this);
726729
// No tasks in flight, we know no tasks were submitted before this poll
727730
// was issued, and if we parked here we'd potentially never be woken up.
728731
// Bail out and return `nil` from `group.next()`.
@@ -740,6 +743,9 @@ PollResult TaskGroupImpl::poll(AsyncTask *waitingTask) {
740743

741744
// ==== 2) Ready task was polled, return with it immediately -----------------
742745
if (assumed.readyTasks()) {
746+
SWIFT_TASK_DEBUG_LOG("poll group = %p, group has ready tasks = %d",
747+
this, assumed.readyTasks());
748+
743749
auto assumedStatus = assumed.status;
744750
auto newStatus = TaskGroupImpl::GroupStatus{assumedStatus};
745751
if (status.compare_exchange_weak(
@@ -843,13 +849,16 @@ static void swift_taskGroup_cancelAllImpl(TaskGroup *group) {
843849
}
844850

845851
bool TaskGroupImpl::cancelAll() {
852+
SWIFT_TASK_DEBUG_LOG("cancel all tasks in group = %p", this);
853+
846854
// store the cancelled bit
847855
auto old = statusCancel();
848856
if (old.isCancelled()) {
849857
// already was cancelled previously, nothing to do?
850858
return false;
851859
}
852860

861+
// FIXME: must also remove the records!!!!
853862
// cancel all existing tasks within the group
854863
swift_task_cancel_group_child_tasks(asAbstract(this));
855864
return true;

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ namespace swift {
3333
//#define SWIFT_TASK_PRINTF_DEBUG 1
3434

3535
// Set to 1 to enable helpful debug spew to stderr
36+
// If this is enabled, tests with `swift_task_debug_log` requirement can run.
3637
#if 0
3738
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
3839
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,8 @@ static bool swift_task_tryAddStatusRecordImpl(TaskStatusRecord *newRecord) {
354354
SWIFT_CC(swift)
355355
static bool swift_task_removeStatusRecordImpl(TaskStatusRecord *record) {
356356
auto task = swift_task_getCurrent();
357+
SWIFT_TASK_DEBUG_LOG("remove status record = %p, from current task = %p",
358+
record, task);
357359

358360
// Load the current state.
359361
auto &status = task->_private().Status;
@@ -454,6 +456,8 @@ static ChildTaskStatusRecord*
454456
swift_task_attachChildImpl(AsyncTask *child) {
455457
void *allocation = malloc(sizeof(swift::ChildTaskStatusRecord));
456458
auto record = new (allocation) swift::ChildTaskStatusRecord(child);
459+
SWIFT_TASK_DEBUG_LOG("attach child task = %p, record = %p, to current task = %p",
460+
child, record, swift_task_getCurrent());
457461
swift_task_addStatusRecord(record);
458462
return record;
459463
}
@@ -548,6 +552,7 @@ static void performGroupCancellationAction(TaskStatusRecord *record) {
548552

549553
SWIFT_CC(swift)
550554
static void swift_task_cancelImpl(AsyncTask *task) {
555+
SWIFT_TASK_DEBUG_LOG("cancel task = %p", task);
551556
Optional<StatusRecordLockRecord> recordLockRecord;
552557

553558
// Acquire the status record lock.
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// REQUIRES: swift_task_debug_log
5+
6+
// UNSUPPORTED: use_os_stdlib
7+
// UNSUPPORTED: back_deployment_runtime
8+
9+
#if os(Linux)
10+
import Glibc
11+
#elseif os(Windows)
12+
import MSVCRT
13+
#else
14+
import Darwin
15+
#endif
16+
17+
func test_withUnsafeCurrentTask() async {
18+
// The task we're running in ("main")
19+
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0
20+
21+
// CHECK: creating task [[TASK:0x.*]] with parent 0x0
22+
let t = Task.detached {
23+
withUnsafeCurrentTask { task in
24+
fputs("OK: \(task!)", stderr)
25+
}
26+
fputs("DONE", stderr)
27+
}
28+
29+
// CHECK: OK: UnsafeCurrentTask(_task: (Opaque Value))
30+
// CHECK: DONE
31+
// CHECK: destroy task [[TASK]]
32+
await t.value
33+
}
34+
35+
@main struct Main {
36+
static func main() async {
37+
await test_withUnsafeCurrentTask()
38+
}
39+
}

test/Concurrency/Runtime/async_taskgroup_dontLeakTasks.swift

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) 2>&1 | %FileCheck %s --dump-input=always
22
// REQUIRES: executable_test
33
// REQUIRES: concurrency
4+
// REQUIRES: swift_task_debug_log
5+
46
// UNSUPPORTED: use_os_stdlib
57
// UNSUPPORTED: back_deployment_runtime
6-
// UNSUPPORTED: linux
78

89
#if os(Linux)
910
import Glibc
@@ -13,10 +14,16 @@ import MSVCRT
1314
import Darwin
1415
#endif
1516

16-
@available(SwiftStdlib 5.5, *)
1717
func test_taskGroup_next() async {
18+
// CHECK: creating task [[MAIN_TASK:0x.*]] with parent 0x0
19+
// CHECK: creating task [[GROUP_TASK_1:0x.*]] with parent [[MAIN_TASK]]
20+
// CHECK: creating task [[GROUP_TASK_2:0x.*]] with parent [[MAIN_TASK]]
21+
// CHECK: creating task [[GROUP_TASK_3:0x.*]] with parent [[MAIN_TASK]]
22+
// CHECK: creating task [[GROUP_TASK_4:0x.*]] with parent [[MAIN_TASK]]
23+
// CHECK: creating task [[GROUP_TASK_5:0x.*]] with parent [[MAIN_TASK]]
24+
1825
_ = await withTaskGroup(of: Int.self, returning: Int.self) { group in
19-
for n in 0..<100 {
26+
for n in 0..<5 {
2027
group.spawn {
2128
return n
2229
}
@@ -30,12 +37,18 @@ func test_taskGroup_next() async {
3037

3138
return sum
3239
}
33-
34-
// CHECK: result with group.next(): 100
35-
print("result with group.next(): \(100)")
40+
// as we exit the group, it must be guaranteed that its child tasks were destroyed
41+
//
42+
// NOTE: there is no great way to express "any of GROUP_TASK_n",
43+
// so we just check that 5 tasks were destroyed
44+
//
45+
// CHECK: destroy task [[DESTROY_GROUP_TASK_1:0x.*]]
46+
// CHECK: destroy task [[DESTROY_GROUP_TASK_2:0x.*]]
47+
// CHECK: destroy task [[DESTROY_GROUP_TASK_3:0x.*]]
48+
// CHECK: destroy task [[DESTROY_GROUP_TASK_4:0x.*]]
49+
// CHECK: destroy task [[DESTROY_GROUP_TASK_5:0x.*]]
3650
}
3751

38-
@available(SwiftStdlib 5.5, *)
3952
@main struct Main {
4053
static func main() async {
4154
await test_taskGroup_next()

test/Concurrency/Runtime/async_taskgroup_throw_recover.swift

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,15 @@ func test_taskGroup_throws() async {
4141
return 3
4242
}
4343

44-
guard let third = try! await group.next() else {
44+
switch await group.nextResult() {
45+
case .success(let third):
46+
print("task group returning normally: \(third)")
47+
return third
48+
49+
case .failure(let error):
50+
fatalError("got an erroneous third result: \(error)")
51+
52+
case .none:
4553
print("task group failed to get 3")
4654
return 0
4755
}

0 commit comments

Comments
 (0)