@@ -639,10 +639,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
639
639
640
640
// Bit 3
641
641
DistributedRemote = 0x8 ,
642
+ // Bit 4
643
+ isPriorityEscalated = 0x10 ,
642
644
643
645
// Bits 8 - 15. We only need 8 bits of the whole size_t to represent Job
644
646
// Priority
645
647
PriorityMask = 0xFF00 ,
648
+ PriorityAndOverrideMask = PriorityMask | isPriorityEscalated,
646
649
PriorityShift = 0x8 ,
647
650
};
648
651
@@ -758,11 +761,37 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
758
761
JobPriority getMaxPriority () const {
759
762
return (JobPriority) ((Flags & PriorityMask) >> PriorityShift);
760
763
}
761
- ActiveActorStatus withMaxPriority (JobPriority priority) const {
764
+ ActiveActorStatus withNewPriority (JobPriority priority) const {
765
+ uint32_t flags = Flags & ~PriorityAndOverrideMask;
766
+ flags |= (uint32_t (priority) << PriorityShift);
762
767
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
763
- return ActiveActorStatus ((Flags & ~PriorityMask) | ( uint32_t (priority) << PriorityShift) , DrainLock, FirstJob);
768
+ return ActiveActorStatus (flags , DrainLock, FirstJob);
764
769
#else
765
- return ActiveActorStatus ((Flags & ~PriorityMask) | (uint32_t (priority) << PriorityShift), FirstJob);
770
+ return ActiveActorStatus (flags, FirstJob);
771
+ #endif
772
+ }
773
+ ActiveActorStatus resetPriority () const {
774
+ return withNewPriority (JobPriority::Unspecified);
775
+ }
776
+
777
+ bool isMaxPriorityEscalated () const { return Flags & isPriorityEscalated; }
778
+ ActiveActorStatus withEscalatedPriority (JobPriority priority) const {
779
+ JobPriority currentPriority = JobPriority ((Flags & PriorityMask) >> PriorityShift);
780
+ assert (priority > currentPriority);
781
+
782
+ uint32_t flags = (Flags & ~PriorityMask) | (uint32_t (priority) << PriorityShift);
783
+ flags |= isPriorityEscalated;
784
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
785
+ return ActiveActorStatus (flags, DrainLock, FirstJob);
786
+ #else
787
+ return ActiveActorStatus (flags, FirstJob);
788
+ #endif
789
+ }
790
+ ActiveActorStatus withoutEscalatedPriority () const {
791
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
792
+ return ActiveActorStatus (Flags & ~isPriorityEscalated, DrainLock, FirstJob);
793
+ #else
794
+ return ActiveActorStatus (Flags & ~isPriorityEscalated, FirstJob);
766
795
#endif
767
796
}
768
797
@@ -780,13 +809,20 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
780
809
uint32_t getOpaqueFlags () const {
781
810
return Flags;
782
811
}
812
+
783
813
uint32_t currentDrainer () const {
784
814
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
785
815
return dispatch_lock_owner (DrainLock);
786
816
#else
787
817
return 0 ;
788
818
#endif
789
819
}
820
+
821
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
822
+ static size_t drainLockOffset () {
823
+ return offsetof (ActiveActorStatus, DrainLock);
824
+ }
825
+ #endif
790
826
};
791
827
792
828
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
@@ -917,6 +953,10 @@ class DefaultActorImpl : public HeapObject {
917
953
}
918
954
919
955
private:
956
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
957
+ dispatch_lock_t *drainLockAddr ();
958
+ #endif
959
+
920
960
void deallocateUnconditional ();
921
961
922
962
// / Schedule an inline processing job. This can generally only be
@@ -1085,6 +1125,13 @@ void DefaultActorImpl::deallocate() {
1085
1125
deallocateUnconditional ();
1086
1126
}
1087
1127
1128
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1129
+ dispatch_lock_t *DefaultActorImpl::drainLockAddr () {
1130
+ ActiveActorStatus *actorStatus = (ActiveActorStatus *) &this ->StatusStorage ;
1131
+ return (dispatch_lock_t *) (((char *) actorStatus) + ActiveActorStatus::drainLockOffset ());
1132
+ }
1133
+ #endif
1134
+
1088
1135
void DefaultActorImpl::deallocateUnconditional () {
1089
1136
concurrency::trace::actor_deallocate (this );
1090
1137
@@ -1113,23 +1160,48 @@ void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority, bool useInl
1113
1160
1114
1161
1115
1162
bool DefaultActorImpl::tryLock (bool asDrainer) {
1116
- SWIFT_TASK_DEBUG_LOG (" Attempting to jump onto %p, as drainer = %d" , this , asDrainer);
1163
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1164
+ SWIFT_TASK_DEBUG_LOG (" Thread %#x attempting to jump onto %p, as drainer = %d" , dispatch_lock_value_for_self (), this , asDrainer);
1165
+ dispatch_thread_override_info_s threadOverrideInfo;
1166
+ threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor ();
1167
+ qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor ;
1168
+
1169
+ retry:;
1170
+ #else
1171
+ SWIFT_TASK_DEBUG_LOG (" Thread attempting to jump onto %p, as drainer = %d" , this , asDrainer);
1172
+ #endif
1117
1173
1118
1174
auto oldState = _status ().load (std::memory_order_relaxed);
1119
1175
while (true ) {
1120
1176
1121
- while (true ) {
1122
1177
if (asDrainer) {
1123
1178
// TODO (rokhinip): Once we have OOL process job support, this assert can
1124
1179
// potentially fail due to a race with an actor stealer that might have
1125
1180
// won the race and started running the actor
1126
1181
assert (oldState.isScheduled ());
1182
+
1183
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1184
+ // We only want to self override a thread if we are taking the actor lock
1185
+ // as a drainer because there might have been higher priority work
1186
+ // enqueued that might have escalated the max priority of the actor to be
1187
+ // higher than the original thread request.
1188
+ qos_class_t maxActorPriority = (qos_class_t ) oldState.getMaxPriority ();
1189
+
1190
+ if (threadOverrideInfo.can_override && (maxActorPriority > overrideFloor)) {
1191
+ SWIFT_TASK_DEBUG_LOG (" [Override] Self-override thread with oq_floor %#x to match max actor %p's priority %#x" , overrideFloor, this , maxActorPriority);
1192
+
1193
+ (void ) swift_dispatch_thread_override_self (maxActorPriority);
1194
+ overrideFloor = maxActorPriority;
1195
+ goto retry;
1196
+ }
1197
+ #endif
1127
1198
} else {
1128
1199
// We're trying to take the lock in an uncontended manner
1129
1200
if (oldState.isRunning () || oldState.isScheduled ()) {
1130
1201
SWIFT_TASK_DEBUG_LOG (" Failed to jump to %p in fast path" , this );
1131
1202
return false ;
1132
1203
}
1204
+ assert (oldState.getMaxPriority () == JobPriority::Unspecified);
1133
1205
}
1134
1206
1135
1207
auto newState = oldState.withRunning ();
@@ -1162,11 +1234,10 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1162
1234
// Someone gave up the actor lock after we failed fast path.
1163
1235
// Schedule the actor
1164
1236
newState = newState.withScheduled ();
1165
- newState = newState.withMaxPriority (priority);
1166
-
1237
+ newState = newState.withNewPriority (priority);
1167
1238
} else {
1168
1239
if (priority > oldState.getMaxPriority ()) {
1169
- newState = newState.withMaxPriority (priority);
1240
+ newState = newState.withEscalatedPriority (priority);
1170
1241
}
1171
1242
}
1172
1243
@@ -1181,14 +1252,24 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1181
1252
return scheduleActorProcessJob (newState.getMaxPriority (), true );
1182
1253
}
1183
1254
1255
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1184
1256
if (oldState.getMaxPriority () != newState.getMaxPriority ()) {
1185
1257
if (newState.isRunning ()) {
1186
- // TODO (rokhinip): Override the thread running the actor
1187
- return ;
1258
+ // Actor is running on a thread, escalate the thread running it
1259
+ SWIFT_TASK_DEBUG_LOG (" [Override] Escalating actor %p which is running on %#x to %#x priority" , this , newState.currentDrainer (), priority);
1260
+ dispatch_lock_t *lockAddr = this ->drainLockAddr ();
1261
+ swift_dispatch_lock_override_start_with_debounce (lockAddr, newState.currentDrainer (),
1262
+ (qos_class_t ) priority);
1188
1263
} else {
1189
- // TODO (rokhinip): Schedule the stealer
1264
+ // TODO (rokhinip): Actor is scheduled - we need to schedule a
1265
+ // stealer at the higher priority
1266
+ //
1267
+ // TODO (rokhinip): Add a signpost to flag that this is a potential
1268
+ // priority inversion
1269
+ SWIFT_TASK_DEBUG_LOG (" [Override] Escalating actor %p which is enqueued" , this );
1190
1270
}
1191
1271
}
1272
+ #endif
1192
1273
return ;
1193
1274
}
1194
1275
}
@@ -1212,30 +1293,38 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
1212
1293
_swift_tsan_release (this );
1213
1294
while (true ) {
1214
1295
assert (oldState.isAnyRunning ());
1215
- // TODO (rokhinip): Further assert that the current thread is the one
1216
- // running the actor
1296
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1297
+ assert (dispatch_lock_is_locked_by_self (*(this ->drainLockAddr ())));
1298
+ #endif
1217
1299
1218
1300
if (oldState.isZombie_ReadyForDeallocation ()) {
1219
- // TODO (rokhinip): This is where we need to reset any override the thread
1220
- // might have as a result of this actor
1301
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1302
+ // Reset any override on this thread as a result of this thread running
1303
+ // the actor
1304
+ if (oldState.isMaxPriorityEscalated ()) {
1305
+ swift_dispatch_lock_override_end ((qos_class_t )oldState.getMaxPriority ());
1306
+ }
1307
+ #endif
1221
1308
deallocateUnconditional ();
1222
1309
SWIFT_TASK_DEBUG_LOG (" Unlock-ing actor %p succeeded with full deallocation" , this );
1223
1310
return true ;
1224
1311
}
1225
1312
1226
1313
auto newState = oldState;
1227
1314
if (oldState.getFirstJob () != NULL ) {
1228
- // There is work left to do
1315
+ // There is work left to do, don't unlock the actor
1229
1316
if (!forceUnlock) {
1230
1317
SWIFT_TASK_DEBUG_LOG (" Unlock-ing actor %p failed" , this );
1231
1318
return false ;
1232
1319
}
1233
-
1320
+ // We need to schedule the actor - remove any escalation bits since we'll
1321
+ // schedule the actor at the max priority currently on it
1234
1322
newState = newState.withScheduled ();
1323
+ newState = newState.withoutEscalatedPriority ();
1235
1324
} else {
1236
1325
// There is no work left to do - actor goes idle
1237
1326
newState = newState.withIdle ();
1238
- newState = newState.withMaxPriority (JobPriority::Unspecified );
1327
+ newState = newState.resetPriority ( );
1239
1328
}
1240
1329
1241
1330
if (_status ().compare_exchange_weak (oldState, newState,
@@ -1252,9 +1341,13 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
1252
1341
SWIFT_TASK_DEBUG_LOG (" Actor %p is idle now" , this );
1253
1342
}
1254
1343
1255
- // TODO (rokhinip): Reset any overrides the thread might have had as a
1256
- // result of the actor
1257
-
1344
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1345
+ // Reset any override on this thread as a result of this thread running
1346
+ // the actor. Only do this after we have reenqueued the actor
1347
+ if (oldState.isMaxPriorityEscalated ()) {
1348
+ swift_dispatch_lock_override_end ((qos_class_t ) oldState.getMaxPriority ());
1349
+ }
1350
+ #endif
1258
1351
return true ;
1259
1352
}
1260
1353
}
0 commit comments