Skip to content

Commit 2380060

Browse files
committed
When waiting on a task, escalate it before enqueuing the waiting task.
As soon as the waiting task is successfully enqueued on the blocking task, both tasks have to be considered invalidated because the blocking task can concurrently complete and resume its waiters: - The waiting task ensures that the blocking task is valid while it's waiting. However, that's measured from the perspective of the waiting task, not from the perspective of the thread that was previously executing it. As soon as the waiting task is resumed, the wait call completes and the validity guarantee on the blocking task disappears, so the blocking task must be treated as invalidated. - The waiting task ensures that it is valid as long as it isn't complete. Since it's trying to wait, it must not be complete. However, as soon we resume it, it can complete, so the waiting task must also be treated as invalidated. This is one of those things that's not really easy to test, and the need for a fix is pretty urgent, so I'm submitting this patch without a test. I'll try to land a race test that demonstrates the bug in the next few days. @kavon deserves all the credit here for some truly heroic debugging and finally recognizing the flaw in the code; I'm just popping in at the last minute to sheepishly patch the bug. Fixes rdar://92666987
1 parent 7611f3f commit 2380060

File tree

1 file changed

+42
-5
lines changed

1 file changed

+42
-5
lines changed

stdlib/public/Concurrency/Task.cpp

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
107107

108108
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
109109
bool contextIntialized = false;
110+
auto escalatedPriority = JobPriority::Unspecified;
110111
while (true) {
111112
switch (queueHead.getStatus()) {
112113
case Status::Error:
@@ -139,6 +140,47 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
139140
waitingTask->flagAsSuspended();
140141
}
141142

143+
// Escalate the blocking task to the priority of the waiting task.
144+
// FIXME: Also record that the waiting task is now waiting on the
145+
// blocking task so that escalators of the waiting task can propagate
146+
// the escalation to the blocking task.
147+
//
148+
// Recording this dependency is tricky because we need escalators
149+
// to be able to escalate without worrying about the blocking task
150+
// concurrently finishing, resuming the escalated task, and being
151+
// invalidated. So we're not doing that yet. In the meantime, we
152+
// do the best-effort alternative of escalating the blocking task
153+
// as a one-time deal to the current priority of the waiting task.
154+
// If the waiting task is escalated after this point, the priority
155+
// will not be escalated, but that's inevitable in the absence of
156+
// propagation during escalation.
157+
//
158+
// We have to do the escalation before we successfully enqueue the
159+
// waiting task on the blocking task's wait queue, because as soon as
160+
// we do, this thread is no longer blocking the resumption of the
161+
// waiting task, and so both the blocking task (which is retained
162+
// during the wait only from the waiting task's perspective) and the
163+
// waiting task (which can simply terminate) must be treat as
164+
// invalidated from this thread's perspective.
165+
//
166+
// When we do fix this bug to record the dependency, we will have to
167+
// do it before this escalation of the blocking task so that there
168+
// isn't a race where an escalation of the waiting task can fail
169+
// to propagate to the blocking task. The correct priority to
170+
// escalate to is the priority we observe when we successfully record
171+
// the dependency; any later escalations will automatically propagate.
172+
//
173+
// If the blocking task finishes while we're doing this escalation,
174+
// the escalation will be innocuous. The wasted effort is acceptable;
175+
// programmers should be encouraged to give tasks that will block
176+
// other tasks the correct priority to begin with.
177+
auto waitingStatus =
178+
waitingTask->_private()._status().load(std::memory_order_relaxed);
179+
if (waitingStatus.getStoredPriority() > escalatedPriority) {
180+
swift_task_escalate(this, waitingStatus.getStoredPriority());
181+
escalatedPriority = waitingStatus.getStoredPriority();
182+
}
183+
142184
// Put the waiting task at the beginning of the wait queue.
143185
waitingTask->getNextWaitingTask() = queueHead.getTask();
144186
auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
@@ -147,11 +189,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
147189
/*success*/ std::memory_order_release,
148190
/*failure*/ std::memory_order_acquire)) {
149191

150-
// Escalate the priority of this task based on the priority
151-
// of the waiting task.
152-
auto status = waitingTask->_private()._status().load(std::memory_order_relaxed);
153-
swift_task_escalate(this, status.getStoredPriority());
154-
155192
_swift_task_clearCurrent();
156193
return FutureFragment::Status::Executing;
157194
}

0 commit comments

Comments
 (0)