@@ -589,6 +589,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
589
589
}
590
590
591
591
public:
592
+ bool operator ==(ActiveActorStatus other) const {
593
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
594
+ return (Flags == other.Flags ) && (DrainLock == other.DrainLock ) && (FirstJob == other.FirstJob );
595
+ #else
596
+ return (Flags == other.Flags ) && (FirstJob == other.FirstJob );
597
+ #endif
598
+ }
592
599
593
600
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
594
601
constexpr ActiveActorStatus ()
@@ -879,10 +886,13 @@ class DefaultActorImpl : public HeapObject {
879
886
bool unlock (bool forceUnlock);
880
887
881
888
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
882
- // / Enqueue a job onto the actor. This typically means that the actor hit
883
- // / contention during the tryLock and so we're taking the slow path
889
+ // / Enqueue a job onto the actor.
884
890
void enqueue (Job *job, JobPriority priority);
885
891
892
+ // / Enqueue a stealer for the given task since it has been escalated to the
893
+ // / new priority
894
+ void enqueueStealer (Job *job, JobPriority priority);
895
+
886
896
// The calling thread must be holding the actor lock while calling this
887
897
Job *drainOne ();
888
898
#endif
@@ -1126,7 +1136,6 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1126
1136
newState = newState.withFirstJob (newHead);
1127
1137
1128
1138
if (oldState.isIdle ()) {
1129
- // Someone gave up the actor lock after we failed fast path.
1130
1139
// Schedule the actor
1131
1140
newState = newState.withScheduled ();
1132
1141
newState = newState.withNewPriority (priority);
@@ -1175,6 +1184,79 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1175
1184
}
1176
1185
}
1177
1186
1187
+ // The input job is already escalated to the new priority and has already been
1188
+ // enqueued into the actor. Push a stealer job for it on the actor.
1189
+ //
1190
+ // The caller of this function is escalating the input task and holding its
1191
+ // TaskStatusRecordLock and escalating this executor via the
1192
+ // TaskDependencyStatusRecord.
1193
+ void DefaultActorImpl::enqueueStealer (Job *job, JobPriority priority) {
1194
+
1195
+ SWIFT_TASK_DEBUG_LOG (" [Override] Escalating an actor %p due to job that is enqueued being escalated" , this );
1196
+
1197
+ auto oldState = _status ().load (std::memory_order_relaxed);
1198
+ while (true ) {
1199
+ // Until we figure out how to safely enqueue a stealer and rendevouz with
1200
+ // the original job so that we don't double-invoke the job, we shall simply
1201
+ // escalate the actor's max priority to match the new one.
1202
+ //
1203
+ // Ideally, we'd also re-sort the job queue so that the escalated job gets
1204
+ // to the front of the queue but since the actor's max QoS is a saturating
1205
+ // function, this still handles the priority inversion correctly but with
1206
+ // priority overhang instead.
1207
+
1208
+ if (oldState.isIdle ()) {
1209
+ // We are observing a race. Possible scenarios:
1210
+ //
1211
+ // 1. Escalator is racing with the drain of the actor/task. The task has
1212
+ // just been popped off the actor and is about to run. The thread running
1213
+ // the task will readjust its own priority once it runs since it should
1214
+ // see the escalation in the ActiveTaskStatus and we don't need to
1215
+ // escalate the actor as it will be spurious.
1216
+ //
1217
+ // 2. Escalator is racing with the enqueue of the task. The task marks
1218
+ // the place it will enqueue in the dependency record before it enqueues
1219
+ // itself. Escalator raced in between these two operations and escalated the
1220
+ // task. Pushing a stealer job for the task onto the actor should fix it.
1221
+ return ;
1222
+ }
1223
+ auto newState = oldState;
1224
+
1225
+ if (priority > oldState.getMaxPriority ()) {
1226
+ newState = newState.withEscalatedPriority (priority);
1227
+ }
1228
+
1229
+ if (oldState == newState)
1230
+ return ;
1231
+
1232
+ if (_status ().compare_exchange_weak (oldState, newState,
1233
+ /* success */ std::memory_order_relaxed,
1234
+ /* failure */ std::memory_order_relaxed)) {
1235
+ traceActorStateTransition (this , oldState, newState);
1236
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1237
+ if (newState.isRunning ()) {
1238
+ // Actor is running on a thread, escalate the thread running it
1239
+ SWIFT_TASK_DEBUG_LOG (" [Override] Escalating actor %p which is running on %#x to %#x priority" , this , newState.currentDrainer (), priority);
1240
+ dispatch_lock_t *lockAddr = this ->drainLockAddr ();
1241
+ swift_dispatch_lock_override_start_with_debounce (lockAddr, newState.currentDrainer (),
1242
+ (qos_class_t ) priority);
1243
+
1244
+ } else if (newState.isEnqueued ()) {
1245
+ // We are scheduling a stealer for an actor due to priority override.
1246
+ // This extra processing job has a reference on the actor. See
1247
+ // ownership rule (2).
1248
+ SWIFT_TASK_DEBUG_LOG (
1249
+ " [Override] Scheduling a stealer for actor %p at %#x priority" ,
1250
+ this , newState.getMaxPriority ());
1251
+ swift_retain (this );
1252
+ scheduleActorProcessJob (newState.getMaxPriority ());
1253
+ }
1254
+ #endif
1255
+ }
1256
+ }
1257
+
1258
+ }
1259
+
1178
1260
// Called with actor lock held on current thread
1179
1261
Job * DefaultActorImpl::drainOne () {
1180
1262
SWIFT_TASK_DEBUG_LOG (" Draining one job from default actor %p" , this );
@@ -1820,9 +1902,28 @@ static void swift_task_enqueueImpl(Job *job, ExecutorRef executor) {
1820
1902
_swift_task_enqueueOnExecutor (job, executorObject, executorType, wtable);
1821
1903
}
1822
1904
1905
+ static void
1906
+ swift_actor_escalate (DefaultActorImpl *actor, AsyncTask *task, JobPriority newPriority)
1907
+ {
1908
+ return actor->enqueueStealer (task, newPriority);
1909
+ }
1910
+
1823
1911
SWIFT_CC (swift)
1824
1912
void swift::swift_executor_escalate(ExecutorRef executor, AsyncTask *task,
1825
1913
JobPriority newPriority) {
1914
+ if (executor.isGeneric ()) {
1915
+ // TODO (rokhinip): We'd push a stealer job for the task on the executor.
1916
+ return ;
1917
+ }
1918
+
1919
+ if (executor.isDefaultActor ()) {
1920
+ return swift_actor_escalate (asImpl (executor.getDefaultActor ()), task, newPriority);
1921
+ }
1922
+
1923
+ // TODO (rokhinip): This is either the main actor or an actor with a custom
1924
+ // executor. We need to let the executor know that the job has been escalated.
1925
+ // For now, do nothing
1926
+ return ;
1826
1927
}
1827
1928
1828
1929
#define OVERRIDE_ACTOR COMPATIBILITY_OVERRIDE
0 commit comments