@@ -366,8 +366,9 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
366
366
// /
367
367
// / \param bodyError error thrown by the body of a with...TaskGroup method
368
368
// / \param waitingTask the task waiting on the group
369
+ // / \param rawContext used to resume the waiting task
369
370
// / \return how the waiting task should be handled, e.g. must wait or can be completed immediately
370
- PollResult waitAll (SwiftError* bodyError, AsyncTask *waitingTask);
371
+ PollResult waitAll (SwiftError* bodyError, AsyncTask *waitingTask, AsyncContext* rawContext );
371
372
372
373
// Enqueue the completed task onto ready queue if there are no waiting tasks yet
373
374
virtual void enqueueCompletedTask (AsyncTask *completedTask, bool hadErrorResult) = 0;
@@ -378,6 +379,7 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
378
379
// ==== Status manipulation -------------------------------------------------
379
380
380
381
TaskGroupStatus statusLoadRelaxed () const ;
382
+ TaskGroupStatus statusLoadAcquire () const ;
381
383
382
384
std::string statusString () const ;
383
385
@@ -409,6 +411,10 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
409
411
// / Remove waiting status bit.
410
412
TaskGroupStatus statusRemoveWaitingRelease ();
411
413
414
+ // / Mark the waiting status bit.
415
+ // / A waiting task MUST have been already enqueued in the `waitQueue`.
416
+ TaskGroupStatus statusMarkWaitingAssumeRelease ();
417
+
412
418
// / Cancels the group and returns true if was already cancelled before.
413
419
// / After this function returns, the group is guaranteed to be cancelled.
414
420
// /
@@ -521,7 +527,7 @@ struct TaskGroupStatus {
521
527
// / TaskGroupStatus{ C:{cancelled} W:{waiting task} R:{ready tasks} P:{pending tasks} {binary repr} }
522
528
// / If discarding results:
523
529
// / TaskGroupStatus{ C:{cancelled} W:{waiting task} P:{pending tasks} {binary repr} }
524
- std::string to_string (const TaskGroupBase* _Nonnull group) {
530
+ std::string to_string (const TaskGroupBase* group) {
525
531
std::string str;
526
532
str.append (" TaskGroupStatus{ " );
527
533
str.append (" C:" ); // cancelled
@@ -548,7 +554,7 @@ struct TaskGroupStatus {
548
554
bool TaskGroupBase::statusCompletePendingReadyWaiting (TaskGroupStatus &old) {
549
555
return status.compare_exchange_strong (
550
556
old.status , old.completingPendingReadyWaiting (this ).status ,
551
- /* success*/ std::memory_order_relaxed ,
557
+ /* success*/ std::memory_order_release ,
552
558
/* failure*/ std::memory_order_relaxed);
553
559
}
554
560
@@ -561,6 +567,10 @@ TaskGroupStatus TaskGroupBase::statusLoadRelaxed() const {
561
567
return TaskGroupStatus{status.load (std::memory_order_relaxed)};
562
568
}
563
569
570
+ TaskGroupStatus TaskGroupBase::statusLoadAcquire () const {
571
+ return TaskGroupStatus{status.load (std::memory_order_acquire)};
572
+ }
573
+
564
574
std::string TaskGroupBase::statusString () const {
565
575
return statusLoadRelaxed ().to_string (this );
566
576
}
@@ -580,6 +590,12 @@ TaskGroupStatus TaskGroupBase::statusMarkWaitingAssumeAcquire() {
580
590
return TaskGroupStatus{old | TaskGroupStatus::waiting};
581
591
}
582
592
593
+ TaskGroupStatus TaskGroupBase::statusMarkWaitingAssumeRelease () {
594
+ auto old = status.fetch_or (TaskGroupStatus::waiting,
595
+ std::memory_order_release);
596
+ return TaskGroupStatus{old | TaskGroupStatus::waiting};
597
+ }
598
+
583
599
TaskGroupStatus TaskGroupBase::statusRemoveWaitingRelease () {
584
600
auto old = status.fetch_and (~TaskGroupStatus::waiting,
585
601
std::memory_order_release);
@@ -702,18 +718,6 @@ class DiscardingTaskGroup: public TaskGroupBase {
702
718
return true ;
703
719
}
704
720
705
- // / Returns *assumed* new status, including the just performed +1.
706
- TaskGroupStatus statusMarkWaitingAssumeAcquire () {
707
- auto old = status.fetch_or (TaskGroupStatus::waiting, std::memory_order_acquire);
708
- return TaskGroupStatus{old | TaskGroupStatus::waiting};
709
- }
710
-
711
- TaskGroupStatus statusRemoveWaitingRelease () {
712
- auto old = status.fetch_and (~TaskGroupStatus::waiting,
713
- std::memory_order_release);
714
- return TaskGroupStatus{old};
715
- }
716
-
717
721
// / Returns *assumed* new status.
718
722
TaskGroupStatus statusAddReadyAssumeAcquire (const DiscardingTaskGroup *group) {
719
723
assert (group->isDiscardingResults ());
@@ -1145,7 +1149,7 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
1145
1149
hadErrorResult = true ;
1146
1150
}
1147
1151
1148
- SWIFT_TASK_GROUP_DEBUG_LOG (this , " ready: %d, pending: %u " ,
1152
+ SWIFT_TASK_GROUP_DEBUG_LOG (this , " ready: %d, pending: %llu " ,
1149
1153
assumed.readyTasks (this ), assumed.pendingTasks (this ));
1150
1154
1151
1155
// ==== a) has waiting task, so let us complete it right away
@@ -1198,13 +1202,14 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
1198
1202
1199
1203
// / If we're the last task we've been waiting for, and there is a waiting task on the group
1200
1204
bool lastPendingTaskAndWaitingTask =
1201
- assumed.pendingTasks (this ) == 1 && assumed.hasWaitingTask ();
1205
+ assumed.pendingTasks (this ) == 1 &&
1206
+ assumed.hasWaitingTask ();
1202
1207
1203
1208
// Immediately decrement the pending count.
1204
1209
// We can do this, since in this mode there is no ready count to keep track of,
1205
1210
// and we immediately discard the result.
1206
- SWIFT_TASK_GROUP_DEBUG_LOG (this , " discard result, hadError:%d, was pending:%llu" ,
1207
- hadErrorResult, assumed.pendingTasks (this ));
1211
+ SWIFT_TASK_GROUP_DEBUG_LOG (this , " discard result, hadError:%d, was pending:%llu, status = %s " ,
1212
+ hadErrorResult, assumed.pendingTasks (this ), assumed. to_string ( this ). c_str () );
1208
1213
// If this was the last pending task, and there is a waiting task (from waitAll),
1209
1214
// we must resume the task; but not otherwise. There cannot be any waiters on next()
1210
1215
// while we're discarding results.
@@ -1294,6 +1299,8 @@ void TaskGroupBase::resumeWaitingTask(
1294
1299
if (statusCompletePendingReadyWaiting (assumed)) {
1295
1300
// Run the task.
1296
1301
auto result = PollResult::get (completedTask, hadErrorResult);
1302
+ SWIFT_TASK_GROUP_DEBUG_LOG (this , " resume waiting DONE, task = %p, complete with = %p, status = %s" ,
1303
+ waitingTask, completedTask, statusString ().c_str ());
1297
1304
1298
1305
// Remove the child from the task group's running tasks list.
1299
1306
// The parent task isn't currently running (we're about to wake
@@ -1645,11 +1652,9 @@ static void swift_taskGroup_waitAllImpl(
1645
1652
ThrowingTaskFutureWaitContinuationFunction *resumeFunction,
1646
1653
AsyncContext *rawContext) {
1647
1654
auto waitingTask = swift_task_getCurrent ();
1648
- waitingTask->ResumeTask = task_group_wait_resume_adapter;
1649
- waitingTask->ResumeContext = rawContext;
1650
1655
1651
1656
auto group = asBaseImpl (_group);
1652
- PollResult polled = group->waitAll (bodyError, waitingTask);
1657
+ PollResult polled = group->waitAll (bodyError, waitingTask, rawContext );
1653
1658
1654
1659
auto context = static_cast <TaskFutureWaitAsyncContext *>(rawContext);
1655
1660
context->ResumeParent =
@@ -1662,19 +1667,17 @@ static void swift_taskGroup_waitAllImpl(
1662
1667
waitingTask, bodyError, group->statusString ().c_str (), to_string (polled.status ).c_str ());
1663
1668
1664
1669
switch (polled.status ) {
1665
- case PollStatus::MustWait:
1666
- SWIFT_TASK_GROUP_DEBUG_LOG (group, " waitAllImpl MustWait, pending tasks exist, waiting task = %p" ,
1667
- waitingTask);
1670
+ case PollStatus::MustWait: {
1668
1671
// The waiting task has been queued on the channel,
1669
1672
// there were pending tasks so it will be woken up eventually.
1670
1673
#ifdef __ARM_ARCH_7K__
1671
- return workaround_function_swift_taskGroup_waitAllImpl (
1674
+ workaround_function_swift_taskGroup_waitAllImpl (
1672
1675
resultPointer, callerContext, _group, bodyError, resumeFunction, rawContext);
1673
- #else /* __ARM_ARCH_7K__ */
1674
- return ;
1675
1676
#endif /* __ARM_ARCH_7K__ */
1677
+ return ;
1678
+ }
1676
1679
1677
- case PollStatus::Error:
1680
+ case PollStatus::Error: {
1678
1681
SWIFT_TASK_GROUP_DEBUG_LOG (group, " waitAllImpl Error, waiting task = %p, body error = %p, status:%s" ,
1679
1682
waitingTask, bodyError, group->statusString ().c_str ());
1680
1683
#if SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
@@ -1695,9 +1698,10 @@ static void swift_taskGroup_waitAllImpl(
1695
1698
}
1696
1699
1697
1700
return waitingTask->runInFullyEstablishedContext ();
1701
+ }
1698
1702
1699
1703
case PollStatus::Empty:
1700
- case PollStatus::Success:
1704
+ case PollStatus::Success: {
1701
1705
// / Anything else than a "MustWait" can be treated as a successful poll.
1702
1706
// / Only if there are in flight pending tasks do we need to wait after all.
1703
1707
SWIFT_TASK_GROUP_DEBUG_LOG (group, " waitAllImpl %s, waiting task = %p, status:%s" ,
@@ -1712,14 +1716,17 @@ static void swift_taskGroup_waitAllImpl(
1712
1716
}
1713
1717
1714
1718
return waitingTask->runInFullyEstablishedContext ();
1719
+ }
1715
1720
}
1716
1721
}
1717
1722
1718
- // / Must be called while holding the `taskGroup.lock`!
1719
- // / This is because the discarding task group still has some follow-up operations that must
1720
- // / be performed atomically after this operation sometimes, so we cannot unlock inside `waitAll` itself.
1721
- PollResult TaskGroupBase::waitAll (SwiftError* bodyError, AsyncTask *waitingTask) {
1722
- lock (); // TODO: remove group lock, and use status for synchronization
1723
+ PollResult TaskGroupBase::waitAll (SwiftError* bodyError, AsyncTask *waitingTask, AsyncContext *rawContext) {
1724
+ lock ();
1725
+
1726
+ // must mutate the waiting task while holding the group lock,
1727
+ // so we don't get an offer concurrently trying to do so
1728
+ waitingTask->ResumeTask = task_group_wait_resume_adapter;
1729
+ waitingTask->ResumeContext = rawContext;
1723
1730
1724
1731
SWIFT_TASK_GROUP_DEBUG_LOG (this , " waitAll, bodyError = %p, status = %s" , bodyError, statusString ().c_str ());
1725
1732
PollResult result = PollResult::getEmpty (this ->successType );
@@ -1732,7 +1739,11 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
1732
1739
bool haveRunOneChildTaskInline = false ;
1733
1740
1734
1741
reevaluate_if_TaskGroup_has_results:;
1735
- auto assumed = statusMarkWaitingAssumeAcquire ();
1742
+ // Paired with a release when marking Waiting,
1743
+ // otherwise we don't modify the status
1744
+ auto assumed = statusLoadAcquire ();
1745
+
1746
+ SWIFT_TASK_GROUP_DEBUG_LOG (this , " waitAll, status = %s" , assumed.to_string (this ).c_str ());
1736
1747
1737
1748
// ==== 1) may be able to bail out early if no tasks are pending -------------
1738
1749
if (assumed.isEmpty (this )) {
@@ -1750,7 +1761,6 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
1750
1761
result.status = PollStatus::Error;
1751
1762
}
1752
1763
} // else, we're definitely Empty
1753
-
1754
1764
unlock ();
1755
1765
return result;
1756
1766
}
@@ -1759,7 +1769,6 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
1759
1769
// No tasks in flight, we know no tasks were submitted before this poll
1760
1770
// was issued, and if we parked here we'd potentially never be woken up.
1761
1771
// Bail out and return `nil` from `group.next()`.
1762
- statusRemoveWaitingRelease ();
1763
1772
unlock ();
1764
1773
return result;
1765
1774
}
@@ -1787,7 +1796,9 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
1787
1796
waitHead, waitingTask,
1788
1797
/* success*/ std::memory_order_release,
1789
1798
/* failure*/ std::memory_order_acquire)) {
1790
- unlock (); // TODO: remove fragment lock, and use status for synchronization
1799
+ statusMarkWaitingAssumeRelease ();
1800
+ unlock ();
1801
+
1791
1802
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
1792
1803
// The logic here is paired with the logic in TaskGroupBase::offer. Once
1793
1804
// we run the
0 commit comments