Skip to content

Commit 945cd59

Browse files
committed
detach child task after filling in result
1 parent 27de643 commit 945cd59

File tree

2 files changed

+22
-17
lines changed

2 files changed

+22
-17
lines changed

stdlib/public/Concurrency/DiscardingTaskGroup.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,12 @@ public func withDiscardingTaskGroup<GroupResult>(
8181

8282
let _group = Builtin.createTaskGroupWithFlags(flags, GroupResult.self)
8383
var group = DiscardingTaskGroup(group: _group)
84-
defer { Builtin.destroyTaskGroup(_group) }
8584

8685
let result = await body(&group)
8786

8887
try! await group.awaitAllRemainingTasks() // try!-safe, cannot throw since this is a non throwing group
88+
89+
Builtin.destroyTaskGroup(_group)
8990
return result
9091
#else
9192
fatalError("Swift compiler is incompatible with this SDK version")
@@ -385,7 +386,6 @@ public func withThrowingDiscardingTaskGroup<GroupResult>(
385386

386387
let _group = Builtin.createTaskGroupWithFlags(flags, GroupResult.self)
387388
var group = ThrowingDiscardingTaskGroup<Error>(group: _group)
388-
defer { Builtin.destroyTaskGroup(_group) }
389389

390390
let result: GroupResult
391391
do {
@@ -395,11 +395,13 @@ public func withThrowingDiscardingTaskGroup<GroupResult>(
395395

396396
try await group.awaitAllRemainingTasks(bodyError: error)
397397

398+
Builtin.destroyTaskGroup(_group)
398399
throw error
399400
}
400401

401402
try await group.awaitAllRemainingTasks(bodyError: nil)
402403

404+
Builtin.destroyTaskGroup(_group)
403405
return result
404406
#else
405407
fatalError("Swift compiler is incompatible with this SDK version")

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ class NaiveTaskGroupQueue {
114114
queue = std::move(other.queue);
115115
}
116116

117-
virtual ~NaiveTaskGroupQueue() {}
117+
~NaiveTaskGroupQueue() {}
118118

119119
bool dequeue(T &output) {
120120
if (queue.empty()) {
@@ -1035,7 +1035,7 @@ static void _enqueueRawError(DiscardingTaskGroup* _Nonnull group,
10351035
// TaskGroup is locked upon entry and exit
10361036
void AccumulatingTaskGroup::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) {
10371037
// Retain the task while it is in the queue; it must remain alive until
1038-
// it is found by poll. This retain will balanced by the release in poll.
1038+
// it is found by poll. This retain will be balanced by the release in poll.
10391039
swift_retain(completedTask);
10401040

10411041
_enqueueCompletedTask(&readyQueue, completedTask, hadErrorResult);
@@ -1181,9 +1181,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11811181
if (readyQueue.dequeue(readyErrorItem)) {
11821182
switch (readyErrorItem.getStatus()) {
11831183
case ReadyStatus::RawError:
1184+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with raw error:%p", readyErrorItem.getRawError(this));
11841185
resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus);
11851186
break;
11861187
case ReadyStatus::Error:
1188+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with errorItem.task:%p", readyErrorItem.getTask());
11871189
resumeWaitingTask(readyErrorItem.getTask(), assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus);
11881190
break;
11891191
default:
@@ -1197,16 +1199,18 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11971199
} else if (readyQueue.isEmpty()) {
11981200
// There was no waiting task, or other tasks are still pending, so we cannot
11991201
// it is the first error we encountered, thus we need to store it for future throwing
1202+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, enqueue child task:%p", completedTask);
12001203
enqueueCompletedTask(completedTask, hadErrorResult);
12011204
} else {
1205+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard child task:%p", completedTask);
12021206
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
12031207
}
12041208

12051209
unlock();
12061210
return;
12071211
}
12081212

1209-
assert(!hadErrorResult);
1213+
assert(!hadErrorResult && "only successfully completed tasks can reach here");
12101214
if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) {
12111215
SWIFT_TASK_GROUP_DEBUG_LOG(this,
12121216
"offer, last pending task completed successfully, resume waitingTask with completedTask:%p",
@@ -1295,6 +1299,12 @@ void TaskGroupBase::resumeWaitingTask(
12951299
"resume waiting DONE, task = %p, backup = %p, complete with = %p, status = %s",
12961300
waitingTask, backup, completedTask, statusString().c_str());
12971301

1302+
auto waitingContext =
1303+
static_cast<TaskFutureWaitAsyncContext *>(
1304+
waitingTask->ResumeContext);
1305+
1306+
fillGroupNextResult(waitingContext, result);
1307+
12981308
// Remove the child from the task group's running tasks list.
12991309
// The parent task isn't currently running (we're about to wake
13001310
// it up), so we're still synchronous with it. We can safely
@@ -1305,12 +1315,6 @@ void TaskGroupBase::resumeWaitingTask(
13051315
// we can't be holding its locks ourselves.
13061316
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
13071317

1308-
auto waitingContext =
1309-
static_cast<TaskFutureWaitAsyncContext *>(
1310-
waitingTask->ResumeContext);
1311-
1312-
fillGroupNextResult(waitingContext, result);
1313-
13141318
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
13151319
// TODO: allow the caller to suggest an executor
13161320
waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
@@ -1660,8 +1664,8 @@ static void swift_taskGroup_waitAllImpl(
16601664
context->errorResult = nullptr;
16611665
context->successResultPointer = resultPointer;
16621666

1663-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s, status.NOW=%s",
1664-
waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str(), group->statusString().c_str());
1667+
// SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s, status.NOW=%s",
1668+
// waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str(), group->statusString().c_str());
16651669

16661670
switch (polled.status) {
16671671
case PollStatus::MustWait: {
@@ -1789,16 +1793,13 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
17891793
waitingTask->flagAsSuspended();
17901794
}
17911795
// Put the waiting task at the beginning of the wait queue.
1792-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "WATCH OUT, set waiter onto... waitQueue.head = %p", waitQueue.load(std::memory_order_relaxed));
1793-
17941796
if (waitQueue.compare_exchange_strong(
17951797
waitHead, waitingTask,
17961798
/*success*/ std::memory_order_release,
17971799
/*failure*/ std::memory_order_acquire)) {
17981800
statusMarkWaitingAssumeRelease();
17991801
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, marked waiting status = %s", statusString().c_str());
1800-
unlock();
1801-
1802+
18021803
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
18031804
// The logic here is paired with the logic in TaskGroupBase::offer. Once
18041805
// we run the
@@ -1824,6 +1825,8 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
18241825
// no ready tasks, so we must wait.
18251826
result.status = PollStatus::MustWait;
18261827
_swift_task_clearCurrent();
1828+
1829+
unlock();
18271830
return result;
18281831
} // else, try again
18291832
}

0 commit comments

Comments
 (0)