Skip to content

Commit 8525a45

Browse files
committed
detach child task after filling result; dont mutate context outside of lock
1 parent 27de643 commit 8525a45

File tree

2 files changed

+75
-98
lines changed

2 files changed

+75
-98
lines changed

stdlib/public/Concurrency/DiscardingTaskGroup.swift

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,11 +81,12 @@ public func withDiscardingTaskGroup<GroupResult>(
8181

8282
let _group = Builtin.createTaskGroupWithFlags(flags, GroupResult.self)
8383
var group = DiscardingTaskGroup(group: _group)
84-
defer { Builtin.destroyTaskGroup(_group) }
8584

8685
let result = await body(&group)
8786

8887
try! await group.awaitAllRemainingTasks() // try!-safe, cannot throw since this is a non throwing group
88+
89+
Builtin.destroyTaskGroup(_group)
8990
return result
9091
#else
9192
fatalError("Swift compiler is incompatible with this SDK version")
@@ -385,7 +386,6 @@ public func withThrowingDiscardingTaskGroup<GroupResult>(
385386

386387
let _group = Builtin.createTaskGroupWithFlags(flags, GroupResult.self)
387388
var group = ThrowingDiscardingTaskGroup<Error>(group: _group)
388-
defer { Builtin.destroyTaskGroup(_group) }
389389

390390
let result: GroupResult
391391
do {
@@ -395,11 +395,13 @@ public func withThrowingDiscardingTaskGroup<GroupResult>(
395395

396396
try await group.awaitAllRemainingTasks(bodyError: error)
397397

398+
Builtin.destroyTaskGroup(_group)
398399
throw error
399400
}
400401

401402
try await group.awaitAllRemainingTasks(bodyError: nil)
402403

404+
Builtin.destroyTaskGroup(_group)
403405
return result
404406
#else
405407
fatalError("Swift compiler is incompatible with this SDK version")

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 71 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050

5151
using namespace swift;
5252

53-
#if 1
53+
#if 0
5454
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
5555
fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n", \
5656
(unsigned long)Thread::current().platformThreadId(), \
@@ -114,7 +114,7 @@ class NaiveTaskGroupQueue {
114114
queue = std::move(other.queue);
115115
}
116116

117-
virtual ~NaiveTaskGroupQueue() {}
117+
~NaiveTaskGroupQueue() {}
118118

119119
bool dequeue(T &output) {
120120
if (queue.empty()) {
@@ -367,8 +367,10 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
367367
/// \param bodyError error thrown by the body of a with...TaskGroup method
368368
/// \param waitingTask the task waiting on the group
369369
/// \param rawContext used to resume the waiting task
370-
/// \return how the waiting task should be handled, e.g. must wait or can be completed immediately
371-
PollResult waitAll(SwiftError* bodyError, AsyncTask *waitingTask, AsyncContext* rawContext);
370+
void waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
371+
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
372+
ThrowingTaskFutureWaitContinuationFunction *resumeFunction,
373+
AsyncContext *rawContext);
372374

373375
// Enqueue the completed task onto ready queue if there are no waiting tasks yet
374376
virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0;
@@ -1035,7 +1037,7 @@ static void _enqueueRawError(DiscardingTaskGroup* _Nonnull group,
10351037
// TaskGroup is locked upon entry and exit
10361038
void AccumulatingTaskGroup::enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) {
10371039
// Retain the task while it is in the queue; it must remain alive until
1038-
// it is found by poll. This retain will balanced by the release in poll.
1040+
// it is found by poll. This retain will be balanced by the release in poll.
10391041
swift_retain(completedTask);
10401042

10411043
_enqueueCompletedTask(&readyQueue, completedTask, hadErrorResult);
@@ -1181,9 +1183,11 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11811183
if (readyQueue.dequeue(readyErrorItem)) {
11821184
switch (readyErrorItem.getStatus()) {
11831185
case ReadyStatus::RawError:
1186+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with raw error:%p", readyErrorItem.getRawError(this));
11841187
resumeWaitingTaskWithError(readyErrorItem.getRawError(this), assumed, alreadyDecrementedStatus);
11851188
break;
11861189
case ReadyStatus::Error:
1190+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, resume with errorItem.task:%p", readyErrorItem.getTask());
11871191
resumeWaitingTask(readyErrorItem.getTask(), assumed, /*hadErrorResult=*/true, alreadyDecrementedStatus);
11881192
break;
11891193
default:
@@ -1197,16 +1201,18 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11971201
} else if (readyQueue.isEmpty()) {
11981202
// There was no waiting task, or other tasks are still pending, so we cannot
11991203
// it is the first error we encountered, thus we need to store it for future throwing
1204+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, enqueue child task:%p", completedTask);
12001205
enqueueCompletedTask(completedTask, hadErrorResult);
12011206
} else {
1207+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, complete, discard child task:%p", completedTask);
12021208
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
12031209
}
12041210

12051211
unlock();
12061212
return;
12071213
}
12081214

1209-
assert(!hadErrorResult);
1215+
assert(!hadErrorResult && "only successfully completed tasks can reach here");
12101216
if (afterComplete.hasWaitingTask() && afterComplete.pendingTasks(this) == 0) {
12111217
SWIFT_TASK_GROUP_DEBUG_LOG(this,
12121218
"offer, last pending task completed successfully, resume waitingTask with completedTask:%p",
@@ -1230,7 +1236,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12301236
} else {
12311237
// This is the last task, we have a waiting task and there was no error stored previously;
12321238
// We must resume the waiting task with a success, so let us return here.
1233-
resumeWaitingTask(completedTask, assumed, hadErrorResult, alreadyDecrementedStatus);
1239+
resumeWaitingTask(completedTask, assumed, /*hadErrorResult=*/false, alreadyDecrementedStatus);
12341240
}
12351241
} else {
12361242
// it wasn't the last pending task, and there is no-one to resume;
@@ -1292,8 +1298,14 @@ void TaskGroupBase::resumeWaitingTask(
12921298
// Run the task.
12931299
auto result = PollResult::get(completedTask, hadErrorResult);
12941300
SWIFT_TASK_GROUP_DEBUG_LOG(this,
1295-
"resume waiting DONE, task = %p, backup = %p, complete with = %p, status = %s",
1296-
waitingTask, backup, completedTask, statusString().c_str());
1301+
"resume waiting DONE, task = %p, backup = %p, error:%d, complete with = %p, status = %s",
1302+
waitingTask, backup, hadErrorResult, completedTask, statusString().c_str());
1303+
1304+
auto waitingContext =
1305+
static_cast<TaskFutureWaitAsyncContext *>(
1306+
waitingTask->ResumeContext);
1307+
1308+
fillGroupNextResult(waitingContext, result);
12971309

12981310
// Remove the child from the task group's running tasks list.
12991311
// The parent task isn't currently running (we're about to wake
@@ -1305,12 +1317,6 @@ void TaskGroupBase::resumeWaitingTask(
13051317
// we can't be holding its locks ourselves.
13061318
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
13071319

1308-
auto waitingContext =
1309-
static_cast<TaskFutureWaitAsyncContext *>(
1310-
waitingTask->ResumeContext);
1311-
1312-
fillGroupNextResult(waitingContext, result);
1313-
13141320
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
13151321
// TODO: allow the caller to suggest an executor
13161322
waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
@@ -1651,80 +1657,29 @@ static void swift_taskGroup_waitAllImpl(
16511657
auto waitingTask = swift_task_getCurrent();
16521658

16531659
auto group = asBaseImpl(_group);
1654-
PollResult polled = group->waitAll(bodyError, waitingTask, rawContext);
1655-
1656-
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
1657-
context->ResumeParent =
1658-
reinterpret_cast<TaskContinuationFunction *>(resumeFunction);
1659-
context->Parent = callerContext;
1660-
context->errorResult = nullptr;
1661-
context->successResultPointer = resultPointer;
1662-
1663-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s, status.NOW=%s",
1664-
waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str(), group->statusString().c_str());
1665-
1666-
switch (polled.status) {
1667-
case PollStatus::MustWait: {
1668-
// The waiting task has been queued on the channel,
1669-
// there were pending tasks so it will be woken up eventually.
1670-
#ifdef __ARM_ARCH_7K__
1671-
workaround_function_swift_taskGroup_waitAllImpl(
1672-
resultPointer, callerContext, _group, bodyError, resumeFunction, rawContext);
1673-
#endif /* __ARM_ARCH_7K__ */
1674-
return;
1675-
}
1676-
1677-
case PollStatus::Error: {
1678-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl Error, waiting task = %p, body error = %p, status:%s",
1679-
waitingTask, bodyError, group->statusString().c_str());
1680-
#if SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
1681-
if (bodyError) {
1682-
fillGroupNextErrorResult(context, bodyError);
1683-
} else {
1684-
fillGroupNextResult(context, polled);
1685-
}
1686-
#else // so, not SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
1687-
fillGroupNextResult(context, polled);
1688-
#endif // SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
1689-
if (auto completedTask = polled.retainedTask) {
1690-
// Remove the child from the task group's running tasks list.
1691-
_swift_taskGroup_detachChild(asAbstract(group), completedTask);
1692-
1693-
// Balance the retain done by enqueueCompletedTask.
1694-
swift_release(completedTask);
1695-
}
1696-
1697-
return waitingTask->runInFullyEstablishedContext();
1698-
}
1699-
1700-
case PollStatus::Empty:
1701-
case PollStatus::Success: {
1702-
/// Anything else than a "MustWait" can be treated as a successful poll.
1703-
/// Only if there are in flight pending tasks do we need to wait after all.
1704-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl %s, waiting task = %p, status:%s",
1705-
polled.status == TaskGroupBase::PollStatus::Empty ? "empty" : "success",
1706-
waitingTask, group->statusString().c_str());
1707-
1708-
if (bodyError) {
1709-
// None of the inner tasks have thrown, so we have to "re throw" the body error:
1710-
fillGroupNextErrorResult(context, bodyError);
1711-
} else {
1712-
fillGroupNextNilResult(context, polled);
1713-
}
1714-
1715-
return waitingTask->runInFullyEstablishedContext();
1716-
}
1717-
}
1660+
return group->waitAll(
1661+
bodyError, waitingTask,
1662+
resultPointer, callerContext, resumeFunction, rawContext);
17181663
}
17191664

1720-
PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask, AsyncContext *rawContext) {
1665+
void TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
1666+
OpaqueValue *resultPointer, SWIFT_ASYNC_CONTEXT AsyncContext *callerContext,
1667+
ThrowingTaskFutureWaitContinuationFunction *resumeFunction,
1668+
AsyncContext *rawContext) {
17211669
lock();
17221670

17231671
// must mutate the waiting task while holding the group lock,
17241672
// so we don't get an offer concurrently trying to do so
17251673
waitingTask->ResumeTask = task_group_wait_resume_adapter;
17261674
waitingTask->ResumeContext = rawContext;
17271675

1676+
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
1677+
context->ResumeParent =
1678+
reinterpret_cast<TaskContinuationFunction *>(resumeFunction);
1679+
context->Parent = callerContext;
1680+
context->errorResult = nullptr;
1681+
context->successResultPointer = resultPointer;
1682+
17281683
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, bodyError = %p, status = %s", bodyError, statusString().c_str());
17291684
PollResult result = PollResult::getEmpty(this->successType);
17301685
result.status = PollStatus::Empty;
@@ -1748,6 +1703,7 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
17481703
/// out of waitAll - providing the "the first error gets thrown" semantics of the group.
17491704
/// The readyQueue is allowed to have exactly one error element in this case.
17501705
if (isDiscardingResults()) {
1706+
// ---- 1.1) A discarding group needs to check if there is a stored error to throw
17511707
auto discardingGroup = asDiscardingImpl(this);
17521708
ReadyQueueItem firstErrorItem;
17531709
if (readyQueue.dequeue(firstErrorItem)) {
@@ -1758,25 +1714,41 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
17581714
result.status = PollStatus::Error;
17591715
}
17601716
} // else, we're definitely Empty
1761-
unlock();
1762-
return result;
1763-
}
1717+
} // else (in an accumulating group), a waitAll can bail out early Empty
17641718

1765-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "group is empty, no pending tasks, status = %s", assumed.to_string(this).c_str());
1719+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, early return, no pending tasks, bodyError:%p, status = %s",
1720+
bodyError, assumed.to_string(this).c_str());
17661721
// No tasks in flight, we know no tasks were submitted before this poll
17671722
// was issued, and if we parked here we'd potentially never be woken up.
1768-
// Bail out and return `nil` from `group.next()`.
1723+
1724+
#if SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
1725+
if (bodyError) {
1726+
fillGroupNextErrorResult(context, bodyError);
1727+
} else {
1728+
fillGroupNextResult(context, result);
1729+
}
1730+
#else // so, not SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
1731+
fillGroupNextResult(context, polled);
1732+
#endif // SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
1733+
if (auto completedTask = result.retainedTask) {
1734+
// Remove the child from the task group's running tasks list.
1735+
_swift_taskGroup_detachChild(asAbstract(this), completedTask);
1736+
1737+
// Balance the retain done by enqueueCompletedTask.
1738+
swift_release(completedTask);
1739+
}
1740+
1741+
waitingTask->runInFullyEstablishedContext();
1742+
17691743
unlock();
1770-
return result;
1744+
return;
17711745
}
17721746

17731747
// ==== 2) Add to wait queue -------------------------------------------------
17741748

17751749
// ---- 2.1) Discarding task group may need to story the bodyError before we park
1776-
if (bodyError && isDiscardingResults()) {
1750+
if (bodyError && isDiscardingResults() && readyQueue.isEmpty()) {
17771751
auto discardingGroup = asDiscardingImpl(this);
1778-
assert(readyQueue.isEmpty() &&
1779-
"only a single error may be stored in discarding task group, but something was enqueued already");
17801752
auto readyItem = ReadyQueueItem::getRawError(discardingGroup, bodyError);
17811753
readyQueue.enqueue(readyItem);
17821754
}
@@ -1789,16 +1761,13 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
17891761
waitingTask->flagAsSuspended();
17901762
}
17911763
// Put the waiting task at the beginning of the wait queue.
1792-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "WATCH OUT, set waiter onto... waitQueue.head = %p", waitQueue.load(std::memory_order_relaxed));
1793-
17941764
if (waitQueue.compare_exchange_strong(
17951765
waitHead, waitingTask,
17961766
/*success*/ std::memory_order_release,
17971767
/*failure*/ std::memory_order_acquire)) {
17981768
statusMarkWaitingAssumeRelease();
17991769
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, marked waiting status = %s", statusString().c_str());
1800-
unlock();
1801-
1770+
18021771
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
18031772
// The logic here is paired with the logic in TaskGroupBase::offer. Once
18041773
// we run the
@@ -1821,10 +1790,16 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask,
18211790
_swift_task_setCurrent(oldTask);
18221791
goto reevaluate_if_TaskGroup_has_results;
18231792
#endif
1824-
// no ready tasks, so we must wait.
1825-
result.status = PollStatus::MustWait;
1793+
// The waiting task has been queued on the channel,
1794+
// there were pending tasks so it will be woken up eventually.
1795+
#ifdef __ARM_ARCH_7K__
1796+
workaround_function_swift_taskGroup_waitAllImpl(
1797+
resultPointer, callerContext, _group, bodyError, resumeFunction, rawContext);
1798+
#endif /* __ARM_ARCH_7K__ */
1799+
18261800
_swift_task_clearCurrent();
1827-
return result;
1801+
unlock();
1802+
return;
18281803
} // else, try again
18291804
}
18301805
}

0 commit comments

Comments
 (0)