Skip to content

Commit 13de654

Browse files
committed
Offering body error must be done while holding lock
1 parent ee6b902 commit 13de654

File tree

1 file changed

+52
-51
lines changed

1 file changed

+52
-51
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -51,15 +51,24 @@
5151
using namespace swift;
5252

5353
#if 0
54-
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
54+
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) \
5555
fprintf(stderr, "[%#lx] [%s:%d][group(%p%s)] (%s) " fmt "\n", \
5656
(unsigned long)Thread::current().platformThreadId(), \
5757
__FILE__, __LINE__, \
5858
group, group->isDiscardingResults() ? ",discardResults" : "", \
5959
__FUNCTION__, \
6060
__VA_ARGS__)
61+
62+
#define SWIFT_TASK_GROUP_DEBUG_LOG_0(group, fmt, ...) \
63+
fprintf(stderr, "[%#lx] [%s:%d][group(%p)] (%s) " fmt "\n", \
64+
(unsigned long)Thread::current().platformThreadId(), \
65+
__FILE__, __LINE__, \
66+
group, \
67+
__FUNCTION__, \
68+
__VA_ARGS__)
6169
#else
6270
#define SWIFT_TASK_GROUP_DEBUG_LOG(group, fmt, ...) (void)0
71+
#define SWIFT_TASK_GROUP_DEBUG_LOG_0(group, fmt, ...) (void)0
6372
#endif
6473

6574
using FutureFragment = AsyncTask::FutureFragment;
@@ -354,7 +363,11 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
354363
/// There can be only at-most-one waiting task on a group at any given time,
355364
/// and the waiting task is expected to be the parent task in which the group
356365
/// body is running.
357-
PollResult waitAll(AsyncTask *waitingTask);
366+
///
367+
/// \param bodyError error thrown by the body of a with...TaskGroup method
368+
/// \param waitingTask the task waiting on the group
369+
/// \return how the waiting task should be handled, e.g. must wait or can be completed immediately
370+
PollResult waitAll(SwiftError* bodyError, AsyncTask *waitingTask);
358371

359372
// Enqueue the completed task onto ready queue if there are no waiting tasks yet
360373
virtual void enqueueCompletedTask(AsyncTask *completedTask, bool hadErrorResult) = 0;
@@ -411,6 +424,16 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
411424
virtual TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) = 0;
412425
};
413426

427+
[[maybe_unused]]
428+
static std::string to_string(TaskGroupBase::PollStatus status) {
429+
switch (status) {
430+
case TaskGroupBase::PollStatus::Empty: return "Empty";
431+
case TaskGroupBase::PollStatus::MustWait: return "MustWait";
432+
case TaskGroupBase::PollStatus::Success: return "Success";
433+
case TaskGroupBase::PollStatus::Error: return "Error";
434+
}
435+
}
436+
414437
/// The status of a task group.
415438
///
416439
/// Its exact structure depends on the type of group, and therefore a group must be passed to operations
@@ -619,7 +642,7 @@ class AccumulatingTaskGroup: public TaskGroupBase {
619642
/// so unconditionally.
620643
///
621644
/// Returns *assumed* new status, including the just performed +1.
622-
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) {
645+
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) override {
623646
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
624647
std::memory_order_relaxed);
625648
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
@@ -713,7 +736,7 @@ class DiscardingTaskGroup: public TaskGroupBase {
713736
/// so unconditionally.
714737
///
715738
/// Returns *assumed* new status, including the just performed +1.
716-
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) {
739+
TaskGroupStatus statusAddPendingTaskRelaxed(bool unconditionally) override {
717740
auto old = status.fetch_add(TaskGroupStatus::onePendingTask,
718741
std::memory_order_relaxed);
719742
auto s = TaskGroupStatus{old + TaskGroupStatus::onePendingTask};
@@ -772,8 +795,6 @@ class DiscardingTaskGroup: public TaskGroupBase {
772795
/// and the waitingTask eventually be woken up by a completion.
773796
PollResult poll(AsyncTask *waitingTask);
774797

775-
bool offerBodyError(SwiftError* _Nonnull bodyError);
776-
777798
private:
778799
/// Resume waiting task with specified error
779800
void resumeWaitingTaskWithError(SwiftError *error, TaskGroupStatus &assumed);
@@ -855,8 +876,8 @@ static void swift_taskGroup_initializeWithFlagsImpl(size_t rawGroupFlags,
855876
TaskGroup *group, const Metadata *T) {
856877

857878
TaskGroupFlags groupFlags(rawGroupFlags);
858-
SWIFT_TASK_DEBUG_LOG("group(%p) create; flags: isDiscardingResults=%d",
859-
group, groupFlags.isDiscardResults());
879+
SWIFT_TASK_GROUP_DEBUG_LOG_0(group, "create group; flags: isDiscardingResults=%d",
880+
groupFlags.isDiscardResults());
860881

861882
TaskGroupBase *impl;
862883
if (groupFlags.isDiscardResults()) {
@@ -1151,7 +1172,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
11511172
assert(completedTask->hasChildFragment());
11521173
assert(completedTask->hasGroupChildFragment());
11531174
assert(completedTask->groupChildFragment()->getGroup() == asAbstract(this));
1154-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p , status:%s", completedTask, statusString().c_str());
1175+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, completedTask:%p, status:%s", completedTask, statusString().c_str());
11551176

11561177
// The current ownership convention is that we are *not* given ownership
11571178
// of a retain on completedTask; we're called from the task completion
@@ -1217,7 +1238,7 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12171238
}
12181239

12191240
auto afterComplete = statusCompletePendingAssumeRelease();
1220-
(void)afterComplete; // silence "not used" warning
1241+
(void) afterComplete;
12211242
SWIFT_TASK_GROUP_DEBUG_LOG(this, "offer, either more pending tasks, or no waiting task, status:%s",
12221243
afterComplete.to_string(this).c_str());
12231244
}
@@ -1628,7 +1649,7 @@ static void swift_taskGroup_waitAllImpl(
16281649
waitingTask->ResumeContext = rawContext;
16291650

16301651
auto group = asBaseImpl(_group);
1631-
PollResult polled = group->waitAll(waitingTask);
1652+
PollResult polled = group->waitAll(bodyError, waitingTask);
16321653

16331654
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
16341655
context->ResumeParent =
@@ -1637,23 +1658,13 @@ static void swift_taskGroup_waitAllImpl(
16371658
context->errorResult = nullptr;
16381659
context->successResultPointer = resultPointer;
16391660

1640-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s",
1641-
waitingTask, bodyError, group->statusString().c_str());
1661+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl, waiting task = %p, bodyError = %p, status:%s, polled.status = %s",
1662+
waitingTask, bodyError, group->statusString().c_str(), to_string(polled.status).c_str());
16421663

16431664
switch (polled.status) {
16441665
case PollStatus::MustWait:
1645-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll MustWait, pending tasks exist, waiting task = %p",
1666+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl MustWait, pending tasks exist, waiting task = %p",
16461667
waitingTask);
1647-
if (bodyError && group->isDiscardingResults()) {
1648-
auto discardingGroup = asDiscardingImpl(_group);
1649-
bool storedBodyError = discardingGroup->offerBodyError(bodyError);
1650-
if (storedBodyError) {
1651-
SWIFT_TASK_GROUP_DEBUG_LOG(
1652-
group, "waitAll, stored error thrown by with...Group body, error = %p",
1653-
bodyError);
1654-
}
1655-
}
1656-
16571668
// The waiting task has been queued on the channel,
16581669
// there were pending tasks so it will be woken up eventually.
16591670
#ifdef __ARM_ARCH_7K__
@@ -1664,7 +1675,7 @@ static void swift_taskGroup_waitAllImpl(
16641675
#endif /* __ARM_ARCH_7K__ */
16651676

16661677
case PollStatus::Error:
1667-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll found error, waiting task = %p, body error = %p, status:%s",
1678+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl Error, waiting task = %p, body error = %p, status:%s",
16681679
waitingTask, bodyError, group->statusString().c_str());
16691680
#if SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
16701681
if (bodyError) {
@@ -1686,17 +1697,10 @@ static void swift_taskGroup_waitAllImpl(
16861697
return waitingTask->runInFullyEstablishedContext();
16871698

16881699
case PollStatus::Empty:
1689-
/// Anything else than a "MustWait" can be treated as a successful poll.
1690-
/// Only if there are in flight pending tasks do we need to wait after all.
1691-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll %s, waiting task = %p, status:%s",
1692-
polled.status == TaskGroupBase::PollStatus::Empty ? "empty" : "success",
1693-
waitingTask, group->statusString().c_str());
1694-
1695-
16961700
case PollStatus::Success:
16971701
/// Anything else than a "MustWait" can be treated as a successful poll.
16981702
/// Only if there are in flight pending tasks do we need to wait after all.
1699-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAll %s, waiting task = %p, status:%s",
1703+
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl %s, waiting task = %p, status:%s",
17001704
polled.status == TaskGroupBase::PollStatus::Empty ? "empty" : "success",
17011705
waitingTask, group->statusString().c_str());
17021706

@@ -1711,26 +1715,13 @@ static void swift_taskGroup_waitAllImpl(
17111715
}
17121716
}
17131717

1714-
bool DiscardingTaskGroup::offerBodyError(SwiftError* _Nonnull bodyError) {
1718+
/// Must be called while holding the `taskGroup.lock`!
1719+
/// This is because the discarding task group still has some follow-up operations that must
1720+
/// be performed atomically after this operation sometimes, so we cannot unlock inside `waitAll` itself.
1721+
PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask) {
17151722
lock(); // TODO: remove group lock, and use status for synchronization
17161723

1717-
if (!readyQueue.isEmpty()) {
1718-
// already other error stored, discard this one
1719-
unlock();
1720-
return false;
1721-
}
1722-
1723-
auto readyItem = ReadyQueueItem::getRawError(this, bodyError);
1724-
readyQueue.enqueue(readyItem);
1725-
unlock();
1726-
1727-
return true;
1728-
}
1729-
1730-
PollResult TaskGroupBase::waitAll(AsyncTask *waitingTask) {
1731-
lock(); // TODO: remove group lock, and use status for synchronization
1732-
1733-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, status = %s", statusString().c_str());
1724+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, bodyError = %p, status = %s", bodyError, statusString().c_str());
17341725
PollResult result = PollResult::getEmpty(this->successType);
17351726
result.status = PollStatus::Empty;
17361727
result.storage = nullptr;
@@ -1774,6 +1765,16 @@ PollResult TaskGroupBase::waitAll(AsyncTask *waitingTask) {
17741765
}
17751766

17761767
// ==== 2) Add to wait queue -------------------------------------------------
1768+
1769+
// ---- 2.1) Discarding task group may need to story the bodyError before we park
1770+
if (bodyError && isDiscardingResults()) {
1771+
auto discardingGroup = asDiscardingImpl(this);
1772+
assert(readyQueue.isEmpty() &&
1773+
"only a single error may be stored in discarding task group, but something was enqueued already");
1774+
auto readyItem = ReadyQueueItem::getRawError(discardingGroup, bodyError);
1775+
readyQueue.enqueue(readyItem);
1776+
}
1777+
17771778
auto waitHead = waitQueue.load(std::memory_order_acquire);
17781779
_swift_tsan_release(static_cast<Job *>(waitingTask));
17791780
while (true) {

0 commit comments

Comments
 (0)