Skip to content

Commit ac4fab8

Browse files
committed
Priority escalation support when actors run into contention
Radar-Id: rdar://problem/86100521
1 parent b318f65 commit ac4fab8

File tree

1 file changed

+113
-21
lines changed

1 file changed

+113
-21
lines changed

stdlib/public/Concurrency/Actor.cpp

Lines changed: 113 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -639,10 +639,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
639639

640640
// Bit 3
641641
DistributedRemote = 0x8,
642+
// Bit 4
643+
isPriorityEscalated = 0x10,
642644

643645
// Bits 8 - 15. We only need 8 bits of the whole size_t to represent Job
644646
// Priority
645647
PriorityMask = 0xFF00,
648+
PriorityAndOverrideMask = PriorityMask | isPriorityEscalated,
646649
PriorityShift = 0x8,
647650
};
648651

@@ -752,11 +755,37 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
752755
JobPriority getMaxPriority() const {
753756
return (JobPriority) ((Flags & PriorityMask) >> PriorityShift);
754757
}
755-
ActiveActorStatus withMaxPriority(JobPriority priority) const {
758+
ActiveActorStatus withNewPriority(JobPriority priority) const {
759+
uint32_t flags = Flags & ~PriorityAndOverrideMask;
760+
flags |= (uint32_t(priority) << PriorityShift);
756761
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
757-
return ActiveActorStatus((Flags & ~PriorityMask) | (uint32_t(priority) << PriorityShift), DrainLock, FirstJob);
762+
return ActiveActorStatus(flags, DrainLock, FirstJob);
758763
#else
759-
return ActiveActorStatus((Flags & ~PriorityMask) | (uint32_t(priority) << PriorityShift), FirstJob);
764+
return ActiveActorStatus(flags, FirstJob);
765+
#endif
766+
}
767+
ActiveActorStatus resetPriority() const {
768+
return withNewPriority(JobPriority::Unspecified);
769+
}
770+
771+
bool isMaxPriorityEscalated() const { return Flags & isPriorityEscalated; }
772+
ActiveActorStatus withEscalatedPriority(JobPriority priority) const {
773+
JobPriority currentPriority = JobPriority((Flags & PriorityMask) >> PriorityShift);
774+
assert(priority > currentPriority);
775+
776+
uint32_t flags = (Flags & ~PriorityMask) | (uint32_t(priority) << PriorityShift);
777+
flags |= isPriorityEscalated;
778+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
779+
return ActiveActorStatus(flags, DrainLock, FirstJob);
780+
#else
781+
return ActiveActorStatus(flags, FirstJob);
782+
#endif
783+
}
784+
ActiveActorStatus withoutEscalatedPriority() const {
785+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
786+
return ActiveActorStatus(Flags & ~isPriorityEscalated, DrainLock, FirstJob);
787+
#else
788+
return ActiveActorStatus(Flags & ~isPriorityEscalated, FirstJob);
760789
#endif
761790
}
762791

@@ -774,13 +803,20 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
774803
uint32_t getOpaqueFlags() const {
775804
return Flags;
776805
}
806+
777807
uint32_t currentDrainer() const {
778808
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
779809
return dispatch_lock_owner(DrainLock);
780810
#else
781811
return 0;
782812
#endif
783813
}
814+
815+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
816+
static size_t drainLockOffset() {
817+
return offsetof(ActiveActorStatus, DrainLock);
818+
}
819+
#endif
784820
};
785821

786822
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
@@ -911,6 +947,10 @@ class DefaultActorImpl : public HeapObject {
911947
}
912948

913949
private:
950+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
951+
dispatch_lock_t *drainLockAddr();
952+
#endif
953+
914954
void deallocateUnconditional();
915955

916956
/// Schedule an inline processing job. This can generally only be
@@ -1121,6 +1161,13 @@ void DefaultActorImpl::deallocate() {
11211161
deallocateUnconditional();
11221162
}
11231163

1164+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1165+
dispatch_lock_t *DefaultActorImpl::drainLockAddr() {
1166+
ActiveActorStatus *actorStatus = (ActiveActorStatus *) &this->StatusStorage;
1167+
return (dispatch_lock_t *) (((char *) actorStatus) + ActiveActorStatus::drainLockOffset());
1168+
}
1169+
#endif
1170+
11241171
void DefaultActorImpl::deallocateUnconditional() {
11251172
concurrency::trace::actor_deallocate(this);
11261173

@@ -1149,17 +1196,41 @@ void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority, bool useInl
11491196

11501197

11511198
bool DefaultActorImpl::tryLock(bool asDrainer) {
1152-
SWIFT_TASK_DEBUG_LOG("Attempting to jump onto %p, as drainer = %d", this, asDrainer);
1199+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1200+
SWIFT_TASK_DEBUG_LOG("Thread %#x attempting to jump onto %p, as drainer = %d", dispatch_lock_value_for_self(), this, asDrainer);
1201+
dispatch_thread_override_info_s threadOverrideInfo;
1202+
threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
1203+
qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
1204+
1205+
retry:;
1206+
#else
1207+
SWIFT_TASK_DEBUG_LOG("Thread attempting to jump onto %p, as drainer = %d", this, asDrainer);
1208+
#endif
11531209

11541210
auto oldState = _status().load(std::memory_order_relaxed);
11551211
while (true) {
11561212

1157-
while (true) {
11581213
if (asDrainer) {
11591214
// TODO (rokhinip): Once we have OOL process job support, this assert can
11601215
// potentially fail due to a race with an actor stealer that might have
11611216
// won the race and started running the actor
11621217
assert(oldState.isScheduled());
1218+
1219+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1220+
// We only want to self override a thread if we are taking the actor lock
1221+
// as a drainer because there might have been higher priority work
1222+
// enqueued that might have escalated the max priority of the actor to be
1223+
// higher than the original thread request.
1224+
qos_class_t maxActorPriority = (qos_class_t) oldState.getMaxPriority();
1225+
1226+
if (threadOverrideInfo.can_override && (maxActorPriority > overrideFloor)) {
1227+
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match max actor %p's priority %#x", overrideFloor, this, maxActorPriority);
1228+
1229+
(void) swift_dispatch_thread_override_self(maxActorPriority);
1230+
overrideFloor = maxActorPriority;
1231+
goto retry;
1232+
}
1233+
#endif
11631234
} else {
11641235
// We're trying to take the lock in an uncontended manner
11651236
if (oldState.isRunning() || oldState.isScheduled()) {
@@ -1202,11 +1273,10 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
12021273
// Someone gave up the actor lock after we failed fast path.
12031274
// Schedule the actor
12041275
newState = newState.withScheduled();
1205-
newState = newState.withMaxPriority(priority);
1206-
1276+
newState = newState.withNewPriority(priority);
12071277
} else {
12081278
if (priority > oldState.getMaxPriority()) {
1209-
newState = newState.withMaxPriority(priority);
1279+
newState = newState.withEscalatedPriority(priority);
12101280
}
12111281
}
12121282

@@ -1221,14 +1291,24 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
12211291
return scheduleActorProcessJob(newState.getMaxPriority(), true);
12221292
}
12231293

1294+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
12241295
if (oldState.getMaxPriority() != newState.getMaxPriority()) {
12251296
if (newState.isRunning()) {
1226-
// TODO (rokhinip): Override the thread running the actor
1227-
return;
1297+
// Actor is running on a thread, escalate the thread running it
1298+
SWIFT_TASK_DEBUG_LOG("[Override] Escalating actor %p which is running on %#x to %#x priority", this, newState.currentDrainer(), priority);
1299+
dispatch_lock_t *lockAddr = this->drainLockAddr();
1300+
swift_dispatch_lock_override_start_with_debounce(lockAddr, newState.currentDrainer(),
1301+
(qos_class_t) priority);
12281302
} else {
1229-
// TODO (rokhinip): Schedule the stealer
1303+
// TODO (rokhinip): Actor is scheduled - we need to schedule a
1304+
// stealer at the higher priority
1305+
//
1306+
// TODO (rokhinip): Add a signpost to flag that this is a potential
1307+
// priority inversion
1308+
SWIFT_TASK_DEBUG_LOG("[Override] Escalating actor %p which is enqueued", this);
12301309
}
12311310
}
1311+
#endif
12321312
return;
12331313
}
12341314
}
@@ -1252,30 +1332,38 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
12521332
_swift_tsan_release(this);
12531333
while (true) {
12541334
assert(oldState.isAnyRunning());
1255-
// TODO (rokhinip): Further assert that the current thread is the one
1256-
// running the actor
1335+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1336+
assert(dispatch_lock_is_locked_by_self(*(this->drainLockAddr())));
1337+
#endif
12571338

12581339
if (oldState.isZombie_ReadyForDeallocation()) {
1259-
// TODO (rokhinip): This is where we need to reset any override the thread
1260-
// might have as a result of this actor
1340+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1341+
// Reset any override on this thread as a result of this thread running
1342+
// the actor
1343+
if (oldState.isMaxPriorityEscalated()) {
1344+
swift_dispatch_lock_override_end((qos_class_t)oldState.getMaxPriority());
1345+
}
1346+
#endif
12611347
deallocateUnconditional();
12621348
SWIFT_TASK_DEBUG_LOG("Unlock-ing actor %p succeeded with full deallocation", this);
12631349
return true;
12641350
}
12651351

12661352
auto newState = oldState;
12671353
if (oldState.getFirstJob() != NULL) {
1268-
// There is work left to do
1354+
// There is work left to do, don't unlock the actor
12691355
if (!forceUnlock) {
12701356
SWIFT_TASK_DEBUG_LOG("Unlock-ing actor %p failed", this);
12711357
return false;
12721358
}
1273-
1359+
// We need to schedule the actor - remove any escalation bits since we'll
1360+
// schedule the actor at the max priority currently on it
12741361
newState = newState.withScheduled();
1362+
newState = newState.withoutEscalatedPriority();
12751363
} else {
12761364
// There is no work left to do - actor goes idle
12771365
newState = newState.withIdle();
1278-
newState = newState.withMaxPriority(JobPriority::Unspecified);
1366+
newState = newState.resetPriority();
12791367
}
12801368

12811369
if (_status().compare_exchange_weak(oldState, newState,
@@ -1292,9 +1380,13 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
12921380
SWIFT_TASK_DEBUG_LOG("Actor %p is idle now", this);
12931381
}
12941382

1295-
// TODO (rokhinip): Reset any overrides the thread might have had as a
1296-
// result of the actor
1297-
1383+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
1384+
// Reset any override on this thread as a result of this thread running
1385+
// the actor. Only do this after we have reenqueued the actor
1386+
if (oldState.isMaxPriorityEscalated()) {
1387+
swift_dispatch_lock_override_end((qos_class_t) oldState.getMaxPriority());
1388+
}
1389+
#endif
12981390
return true;
12991391
}
13001392
}

0 commit comments

Comments
 (0)