Skip to content

Commit d4ebc58

Browse files
committed
[Concurrency] Fix group child tasks not being released
The proper handling of task group child tasks is that: - if it completes a waiting task immediately, we don't need to retain it - we just move the value to the waiting task and can destroy the task - if we need to store the ready task and wait for a waiting task (for a task that hits `await group.next()`) then we need to retain the ready task. - as the waiting task arrives, we move the value from the ready task to the waiting task, and swift_release the ready task -- it will now be destroyed safely.
1 parent 5b10335 commit d4ebc58

File tree

6 files changed

+90
-45
lines changed

6 files changed

+90
-45
lines changed

stdlib/public/Concurrency/Task.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ void NullaryContinuationJob::process(Job *_job) {
144144
void AsyncTask::completeFuture(AsyncContext *context) {
145145
using Status = FutureFragment::Status;
146146
using WaitQueueItem = FutureFragment::WaitQueueItem;
147-
147+
SWIFT_TASK_DEBUG_LOG("complete future = %p", this);
148148
assert(isFuture());
149149
auto fragment = futureFragment();
150150

@@ -230,7 +230,6 @@ AsyncTask::~AsyncTask() {
230230
SWIFT_CC(swift)
231231
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
232232
auto task = static_cast<AsyncTask*>(obj);
233-
234233
task->~AsyncTask();
235234

236235
// The task execution itself should always hold a reference to it, so

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -490,16 +490,8 @@ void TaskGroupImpl::destroy() {
490490
// First, remove the group from the task and deallocate the record
491491
swift_task_removeStatusRecord(getTaskRecord());
492492

493-
mutex.lock(); // TODO: remove lock, and use status for synchronization
494-
// Release all ready tasks which are kept retained, the group destroyed,
495-
// so no other task will ever await on them anymore;
496-
ReadyQueueItem item;
497-
bool taskDequeued = readyQueue.dequeue(item);
498-
while (taskDequeued) {
499-
swift_release(item.getTask());
500-
taskDequeued = readyQueue.dequeue(item);
501-
}
502-
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
493+
// By the time we call destroy, all tasks inside the group must have been
494+
// awaited on already; We handle this on the swift side.
503495
}
504496

505497
// =============================================================================
@@ -514,16 +506,18 @@ bool TaskGroup::isCancelled() {
514506
}
515507

516508
static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
517-
PollResult result) {
509+
PollResult result,
510+
bool releaseResultRetainedTask) {
518511
/// Fill in the result value
519512
switch (result.status) {
520513
case PollStatus::MustWait:
521514
assert(false && "filling a waiting status?");
522515
return;
523516

524-
case PollStatus::Error:
525-
context->fillWithError(reinterpret_cast<SwiftError*>(result.storage));
526-
return;
517+
case PollStatus::Error: {
518+
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
519+
break;
520+
}
527521

528522
case PollStatus::Success: {
529523
// Initialize the result as an Optional<Success>.
@@ -534,17 +528,28 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
534528
// remaining references to it.
535529
successType->vw_initializeWithCopy(destPtr, result.storage);
536530
successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
537-
return;
531+
break;
538532
}
539533

540534
case PollStatus::Empty: {
541535
// Initialize the result as a nil Optional<Success>.
542536
const Metadata *successType = result.successType;
543537
OpaqueValue *destPtr = context->successResultPointer;
544538
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
545-
return;
539+
break;
546540
}
547541
}
542+
543+
// We only release if asked to; This is because this function is called in two
544+
// cases "immediately":
545+
// a) when a completed task arrives and a waiting one existed then we don't
546+
// need to retain the completed task at all, thus we also don't release it.
547+
// b) when the task was stored in the readyQueue it was retained. As a
548+
// waitingTask arrives we will fill-in with the value from the retained
549+
// task. In this situation we must release the ready task, to allow it to
550+
// be destroyed.
551+
if (releaseResultRetainedTask)
552+
swift_release(result.retainedTask);
548553
}
549554

550555
void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
@@ -553,16 +558,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
553558
assert(completedTask->hasChildFragment());
554559
assert(completedTask->hasGroupChildFragment());
555560
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
556-
557-
// We retain the completed task, because we will either:
558-
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
559-
// - (b) will need to store this task until the group task enters next() and
560-
// picks up this task.
561-
// either way, there is some time between us returning here, and the `completeTask`
562-
// issuing a swift_release on this very task. We need to keep it alive until
563-
// we have the chance to poll it from the queue (via the waiter task entering
564-
// calling next()).
565-
swift_retain(completedTask);
561+
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p\n", completedTask, group);
566562

567563
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
568564

@@ -586,6 +582,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
586582
// ==== a) has waiting task, so let us complete it right away
587583
if (assumed.hasWaitingTask()) {
588584
auto waitingTask = waitQueue.load(std::memory_order_acquire);
585+
SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p",
586+
waitingTask, completedTask);
589587
while (true) {
590588
// ==== a) run waiting task directly -------------------------------------
591589
assert(assumed.hasWaitingTask());
@@ -606,15 +604,16 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
606604
static_cast<TaskFutureWaitAsyncContext *>(
607605
waitingTask->ResumeContext);
608606

609-
fillGroupNextResult(waitingContext, result);
610-
607+
fillGroupNextResult(waitingContext, result, /*release*/false);
611608
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
612609

613610
// TODO: allow the caller to suggest an executor
614611
swift_task_enqueueGlobal(waitingTask);
615612
return;
616613
} // else, try again
617614
}
615+
616+
llvm_unreachable("should have enqueued and returned.");
618617
}
619618

620619
// ==== b) enqueue completion ------------------------------------------------
@@ -623,7 +622,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
623622
// queue when a task polls during next() it will notice that we have a value
624623
// ready for it, and will process it immediately without suspending.
625624
assert(!waitQueue.load(std::memory_order_relaxed));
626-
625+
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, store ready task = %p",
626+
completedTask);
627627
// Retain the task while it is in the queue;
628628
// it must remain alive until the task group is alive.
629629
swift_retain(completedTask);
@@ -668,6 +668,7 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t
668668

669669
// =============================================================================
670670
// ==== group.next() implementation (wait_next and groupPoll) ------------------
671+
671672
SWIFT_CC(swiftasync)
672673
static void swift_taskGroup_wait_next_throwingImpl(
673674
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
@@ -691,6 +692,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
691692
PollResult polled = group->poll(waitingTask);
692693
switch (polled.status) {
693694
case PollStatus::MustWait:
695+
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p\n",
696+
group, waitingTask);
694697
// The waiting task has been queued on the channel,
695698
// there were pending tasks so it will be woken up eventually.
696699
#ifdef __ARM_ARCH_7K__
@@ -703,7 +706,9 @@ static void swift_taskGroup_wait_next_throwingImpl(
703706
case PollStatus::Empty:
704707
case PollStatus::Error:
705708
case PollStatus::Success:
706-
fillGroupNextResult(context, polled);
709+
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p\n",
710+
group, waitingTask, polled.retainedTask);
711+
fillGroupNextResult(context, polled, /*release*/true);
707712
return waitingTask->runInFullyEstablishedContext();
708713
}
709714
}

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,9 @@ namespace swift {
4444
// Set to 1 to enable helpful debug spew to stderr
4545
#if 0
4646
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
47-
fprintf(stderr, "[%lu] " fmt "\n", (unsigned long)_swift_get_thread_id(), \
47+
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \
48+
(unsigned long)_swift_get_thread_id(), \
49+
__FILE__, __LINE__, __FUNCTION__, \
4850
__VA_ARGS__)
4951
#else
5052
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) (void)0

test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,11 @@ func test_taskGroup_next() async {
2626
do {
2727
while let r = try await group.next() {
2828
sum += r
29+
print("add \(r) -> sum: \(sum)")
2930
}
3031
} catch {
3132
catches += 1
33+
print("catch: \(catches)")
3234
}
3335
}
3436

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// UNSUPPORTED: use_os_stdlib
5+
// UNSUPPORTED: back_deployment_runtime
6+
// UNSUPPORTED: linux
7+
8+
#if os(Linux)
9+
import Glibc
10+
#elseif os(Windows)
11+
import MSVCRT
12+
#else
13+
import Darwin
14+
#endif
15+
16+
@available(SwiftStdlib 5.5, *)
17+
func test_taskGroup_next() async {
18+
_ = await withTaskGroup(of: Int.self, returning: Int.self) { group in
19+
for n in 0..<100 {
20+
group.spawn {
21+
return n
22+
}
23+
}
24+
await Task.sleep(2_000_000)
25+
26+
var sum = 0
27+
for await value in group {
28+
sum += 1
29+
}
30+
31+
return sum
32+
}
33+
34+
// CHECK: result with group.next(): 100
35+
print("result with group.next(): \(100)")
36+
}
37+
38+
@available(SwiftStdlib 5.5, *)
39+
@main struct Main {
40+
static func main() async {
41+
await test_taskGroup_next()
42+
}
43+
}

test/Concurrency/Runtime/async_taskgroup_next_on_completed.swift

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,23 @@ func test_sum_nextOnCompleted() async {
1515
let numbers = [1, 2, 3, 4, 5]
1616
let expected = 15 // FIXME: numbers.reduce(0, +) this hangs?
1717

18-
let sum = try! await withTaskGroup(of: Int.self) {
19-
(group) async -> Int in
18+
let sum = try! await withTaskGroup(of: Int.self) { group async -> Int in
2019
for n in numbers {
2120
group.spawn {
22-
() async -> Int in
2321
print(" complete group.spawn { \(n) }")
2422
return n
2523
}
2624
}
2725

2826
// We specifically want to await on completed child tasks in this test,
2927
// so give them some time to complete before we hit group.next()
30-
await Task.sleep(2_000_000_000)
28+
try! await Task.sleep(nanoseconds: 2_000_000_000)
3129

3230
var sum = 0
33-
do {
34-
while let r = try await group.next() {
35-
print("next: \(r)")
36-
sum += r
37-
print("sum: \(sum)")
38-
}
39-
} catch {
40-
print("ERROR: \(error)")
31+
while let r = await group.next() {
32+
print("next: \(r)")
33+
sum += r
34+
print("sum: \(sum)")
4135
}
4236

4337
assert(group.isEmpty, "Group must be empty after we consumed all tasks")

0 commit comments

Comments
 (0)