Skip to content

When waiting on a task, escalate it before enqueuing the waiting task #59377

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,

auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
bool contextInitialized = false;
auto escalatedPriority = JobPriority::Unspecified;
while (true) {
switch (queueHead.getStatus()) {
case Status::Error:
Expand Down Expand Up @@ -145,6 +146,47 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
waitingTask->flagAsSuspended();
}

// Escalate the blocking task to the priority of the waiting task.
// FIXME: Also record that the waiting task is now waiting on the
// blocking task so that escalators of the waiting task can propagate
// the escalation to the blocking task.
//
// Recording this dependency is tricky because we need escalators
// to be able to escalate without worrying about the blocking task
// concurrently finishing, resuming the escalated task, and being
// invalidated. So we're not doing that yet. In the meantime, we
// do the best-effort alternative of escalating the blocking task
// as a one-time deal to the current priority of the waiting task.
// If the waiting task is escalated after this point, the priority
// will not be escalated, but that's inevitable in the absence of
// propagation during escalation.
//
// We have to do the escalation before we successfully enqueue the
// waiting task on the blocking task's wait queue, because as soon as
// we do, this thread is no longer blocking the resumption of the
// waiting task, and so both the blocking task (which is retained
// during the wait only from the waiting task's perspective) and the
// waiting task (which can simply terminate) must be treat as
// invalidated from this thread's perspective.
//
// When we do fix this bug to record the dependency, we will have to
// do it before this escalation of the blocking task so that there
// isn't a race where an escalation of the waiting task can fail
// to propagate to the blocking task. The correct priority to
// escalate to is the priority we observe when we successfully record
// the dependency; any later escalations will automatically propagate.
//
// If the blocking task finishes while we're doing this escalation,
// the escalation will be innocuous. The wasted effort is acceptable;
// programmers should be encouraged to give tasks that will block
// other tasks the correct priority to begin with.
auto waitingStatus =
waitingTask->_private()._status().load(std::memory_order_relaxed);
if (waitingStatus.getStoredPriority() > escalatedPriority) {
swift_task_escalate(this, waitingStatus.getStoredPriority());
escalatedPriority = waitingStatus.getStoredPriority();
}

// Put the waiting task at the beginning of the wait queue.
waitingTask->getNextWaitingTask() = queueHead.getTask();
auto newQueueHead = WaitQueueItem::get(Status::Executing, waitingTask);
Expand All @@ -153,11 +195,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_acquire)) {

// Escalate the priority of this task based on the priority
// of the waiting task.
auto status = waitingTask->_private()._status().load(std::memory_order_relaxed);
swift_task_escalate(this, status.getStoredPriority());

_swift_task_clearCurrent();
return FutureFragment::Status::Executing;
}
Expand Down