Skip to content

Stealer support for actor runtime. #62435

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 54 additions & 16 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,10 +912,9 @@ class DefaultActorImpl : public HeapObject {
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
dispatch_lock_t *drainLockAddr();
#endif
/// Schedule a processing job. This can generally only be
/// done if we know nobody else is trying to do it at the same time,
/// e.g. if this thread just successfully transitioned the actor from
/// Idle to Scheduled.
/// Schedule a processing job.
/// This can be done when the actor transitions from Idle to Scheduled, or
/// when the actor gets a priority override and we schedule a stealer.
void scheduleActorProcessJob(JobPriority priority);
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */

Expand Down Expand Up @@ -1153,12 +1152,14 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
swift_dispatch_lock_override_start_with_debounce(lockAddr, newState.currentDrainer(),
(qos_class_t) priority);
} else {
// TODO (rokhinip): Actor is scheduled - we need to schedule a
// stealer at the higher priority
//
// TODO (rokhinip): Add a signpost to flag that this is a potential
// priority inversion
SWIFT_TASK_DEBUG_LOG("[Override] Escalating actor %p which is enqueued", this);
// We are scheduling a stealer for an actor due to priority override.
// This extra processing job has a reference on the actor. See
// ownership rule (2).
SWIFT_TASK_DEBUG_LOG(
"[Override] Scheduling a stealer for actor %p at %#x priority",
this, newState.getMaxPriority());
swift_retain(this);
scheduleActorProcessJob(newState.getMaxPriority());
}
}
#endif
Expand Down Expand Up @@ -1251,8 +1252,13 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
DefaultActorImpl *currentActor = actor;

bool actorLockAcquired = actor->tryLock(true);
// We always must succeed in taking the actor lock that we are draining
// because we don't have to compete with OOL jobs. See ownership rule (3)
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
if (!actorLockAcquired) {
// tryLock may fail when we compete with other stealers for the actor.
goto done;
}
#endif

(void)actorLockAcquired;
assert(actorLockAcquired);

Expand Down Expand Up @@ -1295,6 +1301,9 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
// Leave the tracking info.
trackingInfo.leave();

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
done:
#endif
// Balances with the retain taken in ProcessOutOfLineJob::process
swift_release(actor);
}
Expand Down Expand Up @@ -1370,7 +1379,7 @@ bool DefaultActorImpl::tryLock(bool asDrainer) {
dispatch_thread_override_info_s threadOverrideInfo;
threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;

bool receivedOverride = false;
retry:;
#else
SWIFT_TASK_DEBUG_LOG("Thread attempting to jump onto %p, as drainer = %d", this, asDrainer);
Expand All @@ -1380,9 +1389,24 @@ retry:;
while (true) {

if (asDrainer) {
// TODO (rokhinip): Once we have multiple OOL process job support, this
// assert can potentially fail due to a race with an actor stealer that
// might have won the race and started running the actor
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
if (!oldState.isScheduled()) {
// Some other actor stealer won the race and started running the actor,
// and may already be done with it if the state is observed as idle here.

// This extra processing job releases its reference. See ownership rule
// (4).
swift_release(this);

if (receivedOverride) {
// Reset any override as a result of contending for the actor lock.
swift_dispatch_lock_override_end(overrideFloor);
}
return false;
}
#endif

// We are still in the race with other stealers to take over the actor.
assert(oldState.isScheduled());

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
Expand All @@ -1397,6 +1421,7 @@ retry:;

(void) swift_dispatch_thread_override_self(maxActorPriority);
overrideFloor = maxActorPriority;
receivedOverride = true;
goto retry;
}
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
Expand Down Expand Up @@ -1471,10 +1496,23 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
}
// We need to schedule the actor - remove any escalation bits since we'll
// schedule the actor at the max priority currently on it

// N decreases by 1 as this processing job is going away; but R is
// still 1. We schedule a new processing job to maintain N >= R.

// It is possible that there are stealers scheduled for the actor already;
// but, we still schedule one anyway. This is because it is possible that
// those stealers got scheduled while we were running the actor and have
// already gone away. (See tryLock function.)
newState = newState.withScheduled();
newState = newState.withoutEscalatedPriority();
} else {
// There is no work left to do - actor goes idle

// R becomes 0 and N decreases by 1.
// But, we may still have stealers scheduled so N could be > 0. This is
// fine since N >= R. Every such stealer, once scheduled, will observe the
// actor as idle, release its ref, and return. (See tryLock function.)
newState = newState.withIdle();
newState = newState.resetPriority();
}
Expand Down