Skip to content

Commit c49bd0a

Browse files
committed
back to locking around
1 parent f2e0c54 commit c49bd0a

File tree

1 file changed

+19
-15
lines changed

1 file changed

+19
-15
lines changed

stdlib/public/Concurrency/TaskGroup.cpp

Lines changed: 19 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,6 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
273273
}
274274
};
275275

276-
protected:
277276
#if SWIFT_STDLIB_SINGLE_THREADED_CONCURRENCY || SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
278277
// Synchronization is simple here. In a single threaded mode, all swift tasks
279278
// run on a single thread so no coordination is needed. In a task-to-thread
@@ -294,6 +293,7 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
294293
void unlock() const { mutex_.unlock(); }
295294
#endif
296295

296+
protected:
297297
/// Used for queue management, counting number of waiting and ready tasks
298298
std::atomic<uint64_t> status;
299299

@@ -351,6 +351,8 @@ class TaskGroupBase : public TaskGroupTaskStatusRecord {
351351

352352
/// Attempt to park the `waitingTask` in the waiting queue.
353353
///
354+
/// Must be called while holding `group.lock`.
355+
///
354356
/// If unable to complete the waiting task immediately (with an readily
355357
/// available completed task), either returns an `PollStatus::Empty`
356358
/// result if it is known that there are no pending tasks in the group,
@@ -526,7 +528,7 @@ struct TaskGroupStatus {
526528
/// TaskGroupStatus{ C:{cancelled} W:{waiting task} R:{ready tasks} P:{pending tasks} {binary repr} }
527529
/// If discarding results:
528530
/// TaskGroupStatus{ C:{cancelled} W:{waiting task} P:{pending tasks} {binary repr} }
529-
std::string to_string(const TaskGroupBase* _Nonnull group) {
531+
std::string to_string(const TaskGroupBase* group) {
530532
std::string str;
531533
str.append("TaskGroupStatus{ ");
532534
str.append("C:"); // cancelled
@@ -553,7 +555,7 @@ struct TaskGroupStatus {
553555
bool TaskGroupBase::statusCompletePendingReadyWaiting(TaskGroupStatus &old) {
554556
return status.compare_exchange_strong(
555557
old.status, old.completingPendingReadyWaiting(this).status,
556-
/*success*/ std::memory_order_relaxed,
558+
/*success*/ std::memory_order_release,
557559
/*failure*/ std::memory_order_relaxed);
558560
}
559561

@@ -1148,7 +1150,7 @@ void AccumulatingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *contex
11481150
hadErrorResult = true;
11491151
}
11501152

1151-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "ready: %d, pending: %u",
1153+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "ready: %d, pending: %llu",
11521154
assumed.readyTasks(this), assumed.pendingTasks(this));
11531155

11541156
// ==== a) has waiting task, so let us complete it right away
@@ -1201,7 +1203,8 @@ void DiscardingTaskGroup::offer(AsyncTask *completedTask, AsyncContext *context)
12011203

12021204
/// If we're the last task we've been waiting for, and there is a waiting task on the group
12031205
bool lastPendingTaskAndWaitingTask =
1204-
assumed.pendingTasks(this) == 1 && assumed.hasWaitingTask();
1206+
assumed.pendingTasks(this) == 1 &&
1207+
assumed.hasWaitingTask();
12051208

12061209
// Immediately decrement the pending count.
12071210
// We can do this, since in this mode there is no ready count to keep track of,
@@ -1297,6 +1300,8 @@ void TaskGroupBase::resumeWaitingTask(
12971300
if (statusCompletePendingReadyWaiting(assumed)) {
12981301
// Run the task.
12991302
auto result = PollResult::get(completedTask, hadErrorResult);
1303+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "resume waiting DONE, task = %p, complete with = %p, status = %s",
1304+
waitingTask, completedTask, statusString().c_str());
13001305

13011306
// Remove the child from the task group's running tasks list.
13021307
// The parent task isn't currently running (we're about to wake
@@ -1652,6 +1657,7 @@ static void swift_taskGroup_waitAllImpl(
16521657
waitingTask->ResumeContext = rawContext;
16531658

16541659
auto group = asBaseImpl(_group);
1660+
group->lock();
16551661
PollResult polled = group->waitAll(bodyError, waitingTask);
16561662

16571663
auto context = static_cast<TaskFutureWaitAsyncContext *>(rawContext);
@@ -1666,8 +1672,8 @@ static void swift_taskGroup_waitAllImpl(
16661672

16671673
switch (polled.status) {
16681674
case PollStatus::MustWait: {
1669-
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl MustWait, pending tasks exist, waiting task = %p",
1670-
waitingTask);
1675+
group->unlock();
1676+
16711677
// The waiting task has been queued on the channel,
16721678
// there were pending tasks so it will be woken up eventually.
16731679
#ifdef __ARM_ARCH_7K__
@@ -1679,6 +1685,8 @@ static void swift_taskGroup_waitAllImpl(
16791685
}
16801686

16811687
case PollStatus::Error: {
1688+
group->unlock();
1689+
16821690
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl Error, waiting task = %p, body error = %p, status:%s",
16831691
waitingTask, bodyError, group->statusString().c_str());
16841692
#if SWIFT_TASK_GROUP_BODY_THROWN_ERROR_WINS
@@ -1703,6 +1711,8 @@ static void swift_taskGroup_waitAllImpl(
17031711

17041712
case PollStatus::Empty:
17051713
case PollStatus::Success: {
1714+
group->unlock();
1715+
17061716
/// Anything else than a "MustWait" can be treated as a successful poll.
17071717
/// Only if there are in flight pending tasks do we need to wait after all.
17081718
SWIFT_TASK_GROUP_DEBUG_LOG(group, "waitAllImpl %s, waiting task = %p, status:%s",
@@ -1721,9 +1731,8 @@ static void swift_taskGroup_waitAllImpl(
17211731
}
17221732
}
17231733

1734+
/// Must be called while holding `group.lock`.
17241735
PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask) {
1725-
lock(); // TODO: remove group lock, and use status for synchronization
1726-
17271736
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, bodyError = %p, status = %s", bodyError, statusString().c_str());
17281737
PollResult result = PollResult::getEmpty(this->successType);
17291738
result.status = PollStatus::Empty;
@@ -1739,8 +1748,7 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
17391748
// otherwise we don't modify the status
17401749
auto assumed = statusLoadAcquire();
17411750

1742-
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, LOAD STATUS, status = %s",
1743-
assumed.to_string(this).c_str());
1751+
SWIFT_TASK_GROUP_DEBUG_LOG(this, "waitAll, status = %s", assumed.to_string(this).c_str());
17441752

17451753
// ==== 1) may be able to bail out early if no tasks are pending -------------
17461754
if (assumed.isEmpty(this)) {
@@ -1758,16 +1766,13 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
17581766
result.status = PollStatus::Error;
17591767
}
17601768
} // else, we're definitely Empty
1761-
1762-
unlock();
17631769
return result;
17641770
}
17651771

17661772
SWIFT_TASK_GROUP_DEBUG_LOG(this, "group is empty, no pending tasks, status = %s", assumed.to_string(this).c_str());
17671773
// No tasks in flight, we know no tasks were submitted before this poll
17681774
// was issued, and if we parked here we'd potentially never be woken up.
17691775
// Bail out and return `nil` from `group.next()`.
1770-
unlock();
17711776
return result;
17721777
}
17731778

@@ -1795,7 +1800,6 @@ PollResult TaskGroupBase::waitAll(SwiftError* bodyError, AsyncTask *waitingTask)
17951800
/*success*/ std::memory_order_release,
17961801
/*failure*/ std::memory_order_acquire)) {
17971802
statusMarkWaitingAssumeRelease();
1798-
unlock(); // TODO: remove fragment lock, and use status for synchronization
17991803
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
18001804
// The logic here is paired with the logic in TaskGroupBase::offer. Once
18011805
// we run the

0 commit comments

Comments
 (0)