@@ -107,6 +107,7 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
107
107
108
108
auto queueHead = fragment->waitQueue .load (std::memory_order_acquire);
109
109
bool contextIntialized = false ;
110
+ auto escalatedPriority = JobPriority::Unspecified;
110
111
while (true ) {
111
112
switch (queueHead.getStatus ()) {
112
113
case Status::Error:
@@ -139,6 +140,47 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
139
140
waitingTask->flagAsSuspended ();
140
141
}
141
142
143
+ // Escalate the blocking task to the priority of the waiting task.
144
+ // FIXME: Also record that the waiting task is now waiting on the
145
+ // blocking task so that escalators of the waiting task can propagate
146
+ // the escalation to the blocking task.
147
+ //
148
+ // Recording this dependency is tricky because we need escalators
149
+ // to be able to escalate without worrying about the blocking task
150
+ // concurrently finishing, resuming the escalated task, and being
151
+ // invalidated. So we're not doing that yet. In the meantime, we
152
+ // do the best-effort alternative of escalating the blocking task
153
+ // as a one-time deal to the current priority of the waiting task.
154
+ // If the waiting task is escalated after this point, the priority
155
+ // will not be escalated, but that's inevitable in the absence of
156
+ // propagation during escalation.
157
+ //
158
+ // We have to do the escalation before we successfully enqueue the
159
+ // waiting task on the blocking task's wait queue, because as soon as
160
+ // we do, this thread is no longer blocking the resumption of the
161
+ // waiting task, and so both the blocking task (which is retained
162
+ // during the wait only from the waiting task's perspective) and the
163
+ // waiting task (which can simply terminate) must be treat as
164
+ // invalidated from this thread's perspective.
165
+ //
166
+ // When we do fix this bug to record the dependency, we will have to
167
+ // do it before this escalation of the blocking task so that there
168
+ // isn't a race where an escalation of the waiting task can fail
169
+ // to propagate to the blocking task. The correct priority to
170
+ // escalate to is the priority we observe when we successfully record
171
+ // the dependency; any later escalations will automatically propagate.
172
+ //
173
+ // If the blocking task finishes while we're doing this escalation,
174
+ // the escalation will be innocuous. The wasted effort is acceptable;
175
+ // programmers should be encouraged to give tasks that will block
176
+ // other tasks the correct priority to begin with.
177
+ auto waitingStatus =
178
+ waitingTask->_private ()._status ().load (std::memory_order_relaxed);
179
+ if (waitingStatus.getStoredPriority () > escalatedPriority) {
180
+ swift_task_escalate (this , waitingStatus.getStoredPriority ());
181
+ escalatedPriority = waitingStatus.getStoredPriority ();
182
+ }
183
+
142
184
// Put the waiting task at the beginning of the wait queue.
143
185
waitingTask->getNextWaitingTask () = queueHead.getTask ();
144
186
auto newQueueHead = WaitQueueItem::get (Status::Executing, waitingTask);
@@ -147,11 +189,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
147
189
/* success*/ std::memory_order_release,
148
190
/* failure*/ std::memory_order_acquire)) {
149
191
150
- // Escalate the priority of this task based on the priority
151
- // of the waiting task.
152
- auto status = waitingTask->_private ()._status ().load (std::memory_order_relaxed);
153
- swift_task_escalate (this , status.getStoredPriority ());
154
-
155
192
_swift_task_clearCurrent ();
156
193
return FutureFragment::Status::Executing;
157
194
}
0 commit comments