Skip to content

Commit 740e87b

Browse files
ktosoDougGregor
authored andcommitted
[Concurrency] Fix group child tasks not being released
The proper handling of task group child tasks is that: - if it completes a waiting task immediately, we don't need to retain it - we just move the value to the waiting task and can destroy the task - if we need to store the ready task and wait for a waiting task (for a task that hits `await group.next()`) then we need to retain the ready task. - as the waiting task arrives, we move the value from the ready task to the waiting task, and swift_release the ready task -- it will now be destroyed safely. (cherry picked from commit d4ebc58)
1 parent 83fa59d commit 740e87b

File tree

6 files changed

+112
-44
lines changed

6 files changed

+112
-44
lines changed

stdlib/public/Concurrency/Task.cpp

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ void NullaryContinuationJob::process(Job *_job) {
143143
void AsyncTask::completeFuture(AsyncContext *context) {
144144
using Status = FutureFragment::Status;
145145
using WaitQueueItem = FutureFragment::WaitQueueItem;
146-
146+
SWIFT_TASK_DEBUG_LOG("complete future = %p", this);
147147
assert(isFuture());
148148
auto fragment = futureFragment();
149149

@@ -232,7 +232,6 @@ AsyncTask::~AsyncTask() {
232232
SWIFT_CC(swift)
233233
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
234234
auto task = static_cast<AsyncTask*>(obj);
235-
236235
task->~AsyncTask();
237236

238237
// The task execution itself should always hold a reference to it, so

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 35 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -489,16 +489,8 @@ void TaskGroupImpl::destroy() {
489489
// First, remove the group from the task and deallocate the record
490490
swift_task_removeStatusRecord(getTaskRecord());
491491

492-
mutex.lock(); // TODO: remove lock, and use status for synchronization
493-
// Release all ready tasks which are kept retained, the group destroyed,
494-
// so no other task will ever await on them anymore;
495-
ReadyQueueItem item;
496-
bool taskDequeued = readyQueue.dequeue(item);
497-
while (taskDequeued) {
498-
swift_release(item.getTask());
499-
taskDequeued = readyQueue.dequeue(item);
500-
}
501-
mutex.unlock(); // TODO: remove fragment lock, and use status for synchronization
492+
// By the time we call destroy, all tasks inside the group must have been
493+
// awaited on already; We handle this on the swift side.
502494
}
503495

504496
// =============================================================================
@@ -513,16 +505,18 @@ bool TaskGroup::isCancelled() {
513505
}
514506

515507
static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
516-
PollResult result) {
508+
PollResult result,
509+
bool releaseResultRetainedTask) {
517510
/// Fill in the result value
518511
switch (result.status) {
519512
case PollStatus::MustWait:
520513
assert(false && "filling a waiting status?");
521514
return;
522515

523-
case PollStatus::Error:
524-
context->fillWithError(reinterpret_cast<SwiftError*>(result.storage));
525-
return;
516+
case PollStatus::Error: {
517+
context->fillWithError(reinterpret_cast<SwiftError *>(result.storage));
518+
break;
519+
}
526520

527521
case PollStatus::Success: {
528522
// Initialize the result as an Optional<Success>.
@@ -533,17 +527,28 @@ static void fillGroupNextResult(TaskFutureWaitAsyncContext *context,
533527
// remaining references to it.
534528
successType->vw_initializeWithCopy(destPtr, result.storage);
535529
successType->vw_storeEnumTagSinglePayload(destPtr, 0, 1);
536-
return;
530+
break;
537531
}
538532

539533
case PollStatus::Empty: {
540534
// Initialize the result as a nil Optional<Success>.
541535
const Metadata *successType = result.successType;
542536
OpaqueValue *destPtr = context->successResultPointer;
543537
successType->vw_storeEnumTagSinglePayload(destPtr, 1, 1);
544-
return;
538+
break;
545539
}
546540
}
541+
542+
// We only release if asked to; This is because this function is called in two
543+
// cases "immediately":
544+
// a) when a completed task arrives and a waiting one existed then we don't
545+
// need to retain the completed task at all, thus we also don't release it.
546+
// b) when the task was stored in the readyQueue it was retained. As a
547+
// waitingTask arrives we will fill-in with the value from the retained
548+
// task. In this situation we must release the ready task, to allow it to
549+
// be destroyed.
550+
if (releaseResultRetainedTask)
551+
swift_release(result.retainedTask);
547552
}
548553

549554
void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
@@ -552,16 +557,7 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
552557
assert(completedTask->hasChildFragment());
553558
assert(completedTask->hasGroupChildFragment());
554559
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
555-
556-
// We retain the completed task, because we will either:
557-
// - (a) schedule the waiter to resume on the next() that it is waiting on, or
558-
// - (b) will need to store this task until the group task enters next() and
559-
// picks up this task.
560-
// either way, there is some time between us returning here, and the `completeTask`
561-
// issuing a swift_release on this very task. We need to keep it alive until
562-
// we have the chance to poll it from the queue (via the waiter task entering
563-
// calling next()).
564-
swift_retain(completedTask);
560+
SWIFT_TASK_DEBUG_LOG("offer task %p to group %p\n", completedTask, group);
565561

566562
mutex.lock(); // TODO: remove fragment lock, and use status for synchronization
567563

@@ -585,6 +581,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
585581
// ==== a) has waiting task, so let us complete it right away
586582
if (assumed.hasWaitingTask()) {
587583
auto waitingTask = waitQueue.load(std::memory_order_acquire);
584+
SWIFT_TASK_DEBUG_LOG("group has waiting task = %p, complete with = %p",
585+
waitingTask, completedTask);
588586
while (true) {
589587
// ==== a) run waiting task directly -------------------------------------
590588
assert(assumed.hasWaitingTask());
@@ -605,15 +603,16 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
605603
static_cast<TaskFutureWaitAsyncContext *>(
606604
waitingTask->ResumeContext);
607605

608-
fillGroupNextResult(waitingContext, result);
609-
606+
fillGroupNextResult(waitingContext, result, /*release*/false);
610607
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
611608

612609
// TODO: allow the caller to suggest an executor
613610
swift_task_enqueueGlobal(waitingTask);
614611
return;
615612
} // else, try again
616613
}
614+
615+
llvm_unreachable("should have enqueued and returned.");
617616
}
618617

619618
// ==== b) enqueue completion ------------------------------------------------
@@ -622,7 +621,8 @@ void TaskGroupImpl::offer(AsyncTask *completedTask, AsyncContext *context) {
622621
// queue when a task polls during next() it will notice that we have a value
623622
// ready for it, and will process it immediately without suspending.
624623
assert(!waitQueue.load(std::memory_order_relaxed));
625-
624+
SWIFT_TASK_DEBUG_LOG("group has no waiting tasks, store ready task = %p",
625+
completedTask);
626626
// Retain the task while it is in the queue;
627627
// it must remain alive until the task group is alive.
628628
swift_retain(completedTask);
@@ -667,6 +667,7 @@ SWIFT_CC(swiftasync) static void workaround_function_swift_taskGroup_wait_next_t
667667

668668
// =============================================================================
669669
// ==== group.next() implementation (wait_next and groupPoll) ------------------
670+
670671
SWIFT_CC(swiftasync)
671672
static void swift_taskGroup_wait_next_throwingImpl(
672673
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
@@ -690,6 +691,8 @@ static void swift_taskGroup_wait_next_throwingImpl(
690691
PollResult polled = group->poll(waitingTask);
691692
switch (polled.status) {
692693
case PollStatus::MustWait:
694+
SWIFT_TASK_DEBUG_LOG("poll group = %p, no ready tasks, waiting task = %p\n",
695+
group, waitingTask);
693696
// The waiting task has been queued on the channel,
694697
// there were pending tasks so it will be woken up eventually.
695698
#ifdef __ARM_ARCH_7K__
@@ -702,7 +705,9 @@ static void swift_taskGroup_wait_next_throwingImpl(
702705
case PollStatus::Empty:
703706
case PollStatus::Error:
704707
case PollStatus::Success:
705-
fillGroupNextResult(context, polled);
708+
SWIFT_TASK_DEBUG_LOG("poll group = %p, task = %p, ready task available = %p\n",
709+
group, waitingTask, polled.retainedTask);
710+
fillGroupNextResult(context, polled, /*release*/true);
706711
return waitingTask->runInFullyEstablishedContext();
707712
}
708713
}

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,31 @@ namespace swift {
3232
// Uncomment to enable helpful debug spew to stderr
3333
//#define SWIFT_TASK_PRINTF_DEBUG 1
3434

35+
// Set to 1 to enable helpful debug spew to stderr
36+
#if 0
37+
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) \
38+
fprintf(stderr, "[%lu] [%s:%d](%s) " fmt "\n", \
39+
(unsigned long)_swift_get_thread_id(), \
40+
__FILE__, __LINE__, __FUNCTION__, \
41+
__VA_ARGS__)
42+
#else
43+
#define SWIFT_TASK_DEBUG_LOG(fmt, ...) (void)0
44+
#endif
45+
46+
#if defined(_WIN32)
47+
using ThreadID = decltype(GetCurrentThreadId());
48+
#else
49+
using ThreadID = decltype(pthread_self());
50+
#endif
51+
52+
inline ThreadID _swift_get_thread_id() {
53+
#if defined(_WIN32)
54+
return GetCurrentThreadId();
55+
#else
56+
return pthread_self();
57+
#endif
58+
}
59+
3560
class AsyncTask;
3661
class TaskGroup;
3762

test/Concurrency/Runtime/async_taskgroup_asynciterator_semantics.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ func test_taskGroup_next() async {
2727
do {
2828
while let r = try await group.next() {
2929
sum += r
30+
print("add \(r) -> sum: \(sum)")
3031
}
3132
} catch {
3233
catches += 1
34+
print("catch: \(catches)")
3335
}
3436
}
3537

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
// RUN: %target-run-simple-swift( -Xfrontend -disable-availability-checking -parse-as-library) | %FileCheck %s
2+
// REQUIRES: executable_test
3+
// REQUIRES: concurrency
4+
// UNSUPPORTED: use_os_stdlib
5+
// UNSUPPORTED: back_deployment_runtime
6+
// UNSUPPORTED: linux
7+
8+
#if os(Linux)
9+
import Glibc
10+
#elseif os(Windows)
11+
import MSVCRT
12+
#else
13+
import Darwin
14+
#endif
15+
16+
@available(SwiftStdlib 5.5, *)
17+
func test_taskGroup_next() async {
18+
_ = await withTaskGroup(of: Int.self, returning: Int.self) { group in
19+
for n in 0..<100 {
20+
group.spawn {
21+
return n
22+
}
23+
}
24+
await Task.sleep(2_000_000)
25+
26+
var sum = 0
27+
for await value in group {
28+
sum += 1
29+
}
30+
31+
return sum
32+
}
33+
34+
// CHECK: result with group.next(): 100
35+
print("result with group.next(): \(100)")
36+
}
37+
38+
@available(SwiftStdlib 5.5, *)
39+
@main struct Main {
40+
static func main() async {
41+
await test_taskGroup_next()
42+
}
43+
}

test/Concurrency/Runtime/async_taskgroup_next_on_completed.swift

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,29 +15,23 @@ func test_sum_nextOnCompleted() async {
1515
let numbers = [1, 2, 3, 4, 5]
1616
let expected = 15 // FIXME: numbers.reduce(0, +) this hangs?
1717

18-
let sum = try! await withTaskGroup(of: Int.self) {
19-
(group) async -> Int in
18+
let sum = try! await withTaskGroup(of: Int.self) { group async -> Int in
2019
for n in numbers {
2120
group.spawn {
22-
() async -> Int in
2321
print(" complete group.spawn { \(n) }")
2422
return n
2523
}
2624
}
2725

2826
// We specifically want to await on completed child tasks in this test,
2927
// so give them some time to complete before we hit group.next()
30-
await Task.sleep(2_000_000_000)
28+
try! await Task.sleep(nanoseconds: 2_000_000_000)
3129

3230
var sum = 0
33-
do {
34-
while let r = try await group.next() {
35-
print("next: \(r)")
36-
sum += r
37-
print("sum: \(sum)")
38-
}
39-
} catch {
40-
print("ERROR: \(error)")
31+
while let r = await group.next() {
32+
print("next: \(r)")
33+
sum += r
34+
print("sum: \(sum)")
4135
}
4236

4337
assert(group.isEmpty, "Group must be empty after we consumed all tasks")

0 commit comments

Comments
 (0)