@@ -617,10 +617,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
617
617
618
618
// Bit 3
619
619
DistributedRemote = 0x8 ,
620
+ // Bit 4
621
+ isPriorityEscalated = 0x10 ,
620
622
621
623
// Bits 8 - 15. We only need 8 bits of the whole size_t to represent Job
622
624
// Priority
623
625
PriorityMask = 0xFF00 ,
626
+ PriorityAndOverrideMask = PriorityMask | isPriorityEscalated,
624
627
PriorityShift = 0x8 ,
625
628
};
626
629
@@ -725,11 +728,37 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
725
728
JobPriority getMaxPriority () const {
726
729
return (JobPriority) ((Flags & PriorityMask) >> PriorityShift);
727
730
}
728
- ActiveActorStatus withMaxPriority (JobPriority priority) const {
731
+ ActiveActorStatus withNewPriority (JobPriority priority) const {
732
+ uint32_t flags = Flags & ~PriorityAndOverrideMask;
733
+ flags |= (uint32_t (priority) << PriorityShift);
729
734
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
730
- return ActiveActorStatus ((Flags & ~PriorityMask) | ( uint32_t (priority) << PriorityShift) , DrainLock, FirstJob);
735
+ return ActiveActorStatus (flags , DrainLock, FirstJob);
731
736
#else
732
- return ActiveActorStatus ((Flags & ~PriorityMask) | (uint32_t (priority) << PriorityShift), FirstJob);
737
+ return ActiveActorStatus (flags, FirstJob);
738
+ #endif
739
+ }
740
+ ActiveActorStatus resetPriority () const {
741
+ return withNewPriority (JobPriority::Unspecified);
742
+ }
743
+
744
+ bool isMaxPriorityEscalated () const { return Flags & isPriorityEscalated; }
745
+ ActiveActorStatus withEscalatedPriority (JobPriority priority) const {
746
+ JobPriority currentPriority = JobPriority ((Flags & PriorityMask) >> PriorityShift);
747
+ assert (priority > currentPriority);
748
+
749
+ uint32_t flags = (Flags & ~PriorityMask) | (uint32_t (priority) << PriorityShift);
750
+ flags |= isPriorityEscalated;
751
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
752
+ return ActiveActorStatus (flags, DrainLock, FirstJob);
753
+ #else
754
+ return ActiveActorStatus (flags, FirstJob);
755
+ #endif
756
+ }
757
+ ActiveActorStatus withoutEscalatedPriority () const {
758
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
759
+ return ActiveActorStatus (Flags & ~isPriorityEscalated, DrainLock, FirstJob);
760
+ #else
761
+ return ActiveActorStatus (Flags & ~isPriorityEscalated, FirstJob);
733
762
#endif
734
763
}
735
764
@@ -747,13 +776,20 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
747
776
uint32_t getOpaqueFlags () const {
748
777
return Flags;
749
778
}
779
+
750
780
uint32_t currentDrainer () const {
751
781
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
752
782
return dispatch_lock_owner (DrainLock);
753
783
#else
754
784
return 0 ;
755
785
#endif
756
786
}
787
+
788
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
789
+ static size_t drainLockOffset () {
790
+ return offsetof (ActiveActorStatus, DrainLock);
791
+ }
792
+ #endif
757
793
};
758
794
759
795
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
@@ -865,6 +901,10 @@ class DefaultActorImpl : public HeapObject {
865
901
bool isDistributedRemote ();
866
902
867
903
private:
904
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
905
+ dispatch_lock_t *drainLockAddr ();
906
+ #endif
907
+
868
908
void deallocateUnconditional ();
869
909
870
910
// / Schedule an inline processing job. This can generally only be
@@ -1031,6 +1071,13 @@ void DefaultActorImpl::deallocate() {
1031
1071
deallocateUnconditional ();
1032
1072
}
1033
1073
1074
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1075
+ dispatch_lock_t *DefaultActorImpl::drainLockAddr () {
1076
+ ActiveActorStatus *actorStatus = (ActiveActorStatus *) &this ->CurrentState ;
1077
+ return (dispatch_lock_t *) (((char *) actorStatus) + ActiveActorStatus::drainLockOffset ());
1078
+ }
1079
+ #endif
1080
+
1034
1081
void DefaultActorImpl::deallocateUnconditional () {
1035
1082
concurrency::trace::actor_deallocate (this );
1036
1083
@@ -1059,21 +1106,48 @@ void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority, bool useInl
1059
1106
1060
1107
1061
1108
bool DefaultActorImpl::tryLock (bool asDrainer) {
1062
- SWIFT_TASK_DEBUG_LOG (" Attempting to jump onto %p, as drainer = %d" , this , asDrainer);
1063
- auto oldState = CurrentState.load (std::memory_order_relaxed);
1109
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1110
+ SWIFT_TASK_DEBUG_LOG (" Thread %#x attempting to jump onto %p, as drainer = %d" , dispatch_lock_value_for_self (), this , asDrainer);
1111
+ dispatch_thread_override_info_s threadOverrideInfo;
1112
+ threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor ();
1113
+ qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor ;
1114
+
1115
+ retry:;
1116
+ #else
1117
+ SWIFT_TASK_DEBUG_LOG (" Thread attempting to jump onto %p, as drainer = %d" , this , asDrainer);
1118
+ #endif
1064
1119
1120
+ auto oldState = CurrentState.load (std::memory_order_relaxed);
1065
1121
while (true ) {
1122
+
1066
1123
if (asDrainer) {
1067
1124
// TODO (rokhinip): Once we have OOL process job support, this assert can
1068
1125
// potentially fail due to a race with an actor stealer that might have
1069
1126
// won and started running the task
1070
1127
assert (oldState.isScheduled ());
1128
+
1129
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1130
+ // We only want to self override a thread if we are taking the actor lock
1131
+ // as a drainer because there might have been higher priority work
1132
+ // enqueued that might have escalated the max priority of the actor to be
1133
+ // higher than the original thread request.
1134
+ qos_class_t maxActorPriority = (qos_class_t ) oldState.getMaxPriority ();
1135
+
1136
+ if (threadOverrideInfo.can_override && (maxActorPriority > overrideFloor)) {
1137
+ SWIFT_TASK_DEBUG_LOG (" [Override] Self-override thread with oq_floor %#x to match max actor %p's priority %#x" , overrideFloor, this , maxActorPriority);
1138
+
1139
+ (void ) swift_dispatch_thread_override_self (maxActorPriority);
1140
+ overrideFloor = maxActorPriority;
1141
+ goto retry;
1142
+ }
1143
+ #endif
1071
1144
} else {
1072
1145
// We're trying to take the lock in an uncontended manner
1073
1146
if (oldState.isRunning () || oldState.isScheduled ()) {
1074
1147
SWIFT_TASK_DEBUG_LOG (" Failed to jump to %p in fast path" , this );
1075
1148
return false ;
1076
1149
}
1150
+ assert (oldState.getMaxPriority () == JobPriority::Unspecified);
1077
1151
}
1078
1152
1079
1153
auto newState = oldState.withRunning ();
@@ -1106,11 +1180,10 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1106
1180
// Someone gave up the actor lock after we failed fast path.
1107
1181
// Schedule the actor
1108
1182
newState = newState.withScheduled ();
1109
- newState = newState.withMaxPriority (priority);
1110
-
1183
+ newState = newState.withNewPriority (priority);
1111
1184
} else {
1112
1185
if (priority > oldState.getMaxPriority ()) {
1113
- newState = newState.withMaxPriority (priority);
1186
+ newState = newState.withEscalatedPriority (priority);
1114
1187
}
1115
1188
}
1116
1189
@@ -1124,14 +1197,24 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
1124
1197
return scheduleActorProcessJob (newState.getMaxPriority (), true );
1125
1198
}
1126
1199
1200
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1127
1201
if (oldState.getMaxPriority () != newState.getMaxPriority ()) {
1128
1202
if (newState.isRunning ()) {
1129
- // TODO (rokhinip): Override the thread running the actor
1130
- return ;
1203
+ // Actor is running on a thread, escalate the thread running it
1204
+ SWIFT_TASK_DEBUG_LOG (" [Override] Escalating actor %p which is running on %#x to %#x priority" , this , newState.currentDrainer (), priority);
1205
+ dispatch_lock_t *lockAddr = this ->drainLockAddr ();
1206
+ swift_dispatch_lock_override_start_with_debounce (lockAddr, newState.currentDrainer (),
1207
+ (qos_class_t ) priority);
1131
1208
} else {
1132
- // TODO (rokhinip): Schedule the stealer
1209
+ // TODO (rokhinip): Actor is scheduled - we need to schedule a
1210
+ // stealer at the higher priority
1211
+ //
1212
+ // TODO (rokhinip): Add a signpost to flag that this is a potential
1213
+ // priority inversion
1214
+ SWIFT_TASK_DEBUG_LOG (" [Override] Escalating actor %p which is enqueued" , this );
1133
1215
}
1134
1216
}
1217
+ #endif
1135
1218
return ;
1136
1219
}
1137
1220
}
@@ -1153,30 +1236,38 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
1153
1236
_swift_tsan_release (this );
1154
1237
while (true ) {
1155
1238
assert (oldState.isAnyRunning ());
1156
- // TODO (rokhinip): Further assert that the current thread is the one
1157
- // running the actor
1239
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1240
+ assert (dispatch_lock_is_locked_by_self (*(this ->drainLockAddr ())));
1241
+ #endif
1158
1242
1159
1243
if (oldState.isZombie_ReadyForDeallocation ()) {
1160
- // TODO (rokhinip): This is where we need to reset any override the thread
1161
- // might have as a result of this actor
1244
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1245
+ // Reset any override on this thread as a result of this thread running
1246
+ // the actor
1247
+ if (oldState.isMaxPriorityEscalated ()) {
1248
+ swift_dispatch_lock_override_end ((qos_class_t )oldState.getMaxPriority ());
1249
+ }
1250
+ #endif
1162
1251
deallocateUnconditional ();
1163
1252
SWIFT_TASK_DEBUG_LOG (" Unlock-ing actor %p succeeded with full deallocation" , this );
1164
1253
return true ;
1165
1254
}
1166
1255
1167
1256
auto newState = oldState;
1168
1257
if (oldState.getFirstJob () != NULL ) {
1169
- // There is work left to do
1258
+ // There is work left to do, don't unlock the actor
1170
1259
if (!forceUnlock) {
1171
1260
SWIFT_TASK_DEBUG_LOG (" Unlock-ing actor %p failed" , this );
1172
1261
return false ;
1173
1262
}
1174
-
1263
+ // We need to schedule the actor - remove any escalation bits since we'll
1264
+ // schedule the actor at the max priority currently on it
1175
1265
newState = newState.withScheduled ();
1266
+ newState = newState.withoutEscalatedPriority ();
1176
1267
} else {
1177
1268
// There is no work left to do - actor goes idle
1178
1269
newState = newState.withIdle ();
1179
- newState = newState.withMaxPriority (JobPriority::Unspecified );
1270
+ newState = newState.resetPriority ( );
1180
1271
}
1181
1272
1182
1273
if (CurrentState.compare_exchange_weak (oldState, newState,
@@ -1191,9 +1282,13 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
1191
1282
SWIFT_TASK_DEBUG_LOG (" Actor %p is idle now" , this );
1192
1283
}
1193
1284
1194
- // TODO (rokhinip): Reset any overrides the thread might have had as a
1195
- // result of the actor
1196
-
1285
+ #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1286
+ // Reset any override on this thread as a result of this thread running
1287
+ // the actor. Only do this after we have reenqueued the actor
1288
+ if (oldState.isMaxPriorityEscalated ()) {
1289
+ swift_dispatch_lock_override_end ((qos_class_t ) oldState.getMaxPriority ());
1290
+ }
1291
+ #endif
1197
1292
return true ;
1198
1293
}
1199
1294
}
0 commit comments