Escalate the actor and thread running the actor when job in actor queue is escalated #63977

Merged 3 commits on Mar 2, 2023

145 changes: 124 additions & 21 deletions stdlib/public/Concurrency/Actor.cpp
@@ -589,6 +589,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
}

public:
bool operator==(ActiveActorStatus other) const {
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
return (Flags == other.Flags) && (DrainLock == other.DrainLock) && (FirstJob == other.FirstJob);
#else
return (Flags == other.Flags) && (FirstJob == other.FirstJob);
#endif
}

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
constexpr ActiveActorStatus()
@@ -879,10 +886,13 @@ class DefaultActorImpl : public HeapObject {
bool unlock(bool forceUnlock);

#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
/// Enqueue a job onto the actor. This typically means that the actor hit
/// contention during the tryLock and so we're taking the slow path
/// Enqueue a job onto the actor.
void enqueue(Job *job, JobPriority priority);

/// Enqueue a stealer for the given task since it has been escalated to the
/// new priority
void enqueueStealer(Job *job, JobPriority priority);

// The calling thread must be holding the actor lock while calling this
Job *drainOne();
#endif
@@ -1126,7 +1136,6 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
newState = newState.withFirstJob(newHead);

if (oldState.isIdle()) {
// Someone gave up the actor lock after we failed fast path.
// Schedule the actor
newState = newState.withScheduled();
newState = newState.withNewPriority(priority);
@@ -1137,8 +1146,8 @@
}

// This needs to be a store release so that we also publish the contents of
// the new Job we are adding to the atomic job queue. Pairs with load
// acquire in drainOne.
// the new Job we are adding to the atomic job queue. Pairs with consume
// in drainOne.
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_release,
/* failure */ std::memory_order_relaxed)) {
@@ -1175,18 +1184,91 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
}
}

// The input job is already escalated to the new priority and has already been
// enqueued into the actor. Push a stealer job for it on the actor.
//
// The caller of this function is escalating the input task and holding its
// TaskStatusRecordLock and escalating this executor via the
// TaskDependencyStatusRecord.
void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {

SWIFT_TASK_DEBUG_LOG("[Override] Escalating an actor %p due to job that is enqueued being escalated", this);

auto oldState = _status().load(std::memory_order_relaxed);
while (true) {
// Until we figure out how to safely enqueue a stealer and rendezvous with
// the original job so that we don't double-invoke the job, we shall simply
// escalate the actor's max priority to match the new one.
//
// Ideally, we'd also re-sort the job queue so that the escalated job gets
// to the front of the queue. But since the actor's max QoS is a saturating
// function, this still handles the priority inversion correctly, albeit with
// priority overhang instead.

if (oldState.isIdle()) {
// We are observing a race. Possible scenarios:
//
// 1. Escalator is racing with the drain of the actor/task. The task has
// just been popped off the actor and is about to run. The thread running
// the task will readjust its own priority once it runs since it should
// see the escalation in the ActiveTaskStatus and we don't need to
// escalate the actor as it will be spurious.
//
// 2. Escalator is racing with the enqueue of the task. The task marks
// the place it will enqueue in the dependency record before it enqueues
// itself. Escalator raced in between these two operations and escalated the
// task. Pushing a stealer job for the task onto the actor should fix it.
return;
}
auto newState = oldState;

if (priority > oldState.getMaxPriority()) {
newState = newState.withEscalatedPriority(priority);
}

if (oldState == newState)
return;

if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
traceActorStateTransition(this, oldState, newState);
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
if (newState.isRunning()) {
// Actor is running on a thread, escalate the thread running it
SWIFT_TASK_DEBUG_LOG("[Override] Escalating actor %p which is running on %#x to %#x priority", this, newState.currentDrainer(), priority);
dispatch_lock_t *lockAddr = this->drainLockAddr();
swift_dispatch_lock_override_start_with_debounce(lockAddr, newState.currentDrainer(),
(qos_class_t) priority);

} else if (newState.isEnqueued()) {
// We are scheduling a stealer for an actor due to priority override.
// This extra processing job has a reference on the actor. See
// ownership rule (2).
SWIFT_TASK_DEBUG_LOG(
"[Override] Scheduling a stealer for actor %p at %#x priority",
this, newState.getMaxPriority());
swift_retain(this);
scheduleActorProcessJob(newState.getMaxPriority());
}
#endif
}
}

}
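The saturating-escalation loop above is compact; a self-contained sketch of the same pattern (hypothetical types, not the runtime's) shows why the new operator== matters: it lets the loop bail out before issuing a CAS when the escalation is already subsumed by the actor's current max priority.

#include <algorithm>
#include <atomic>
#include <cstdint>

// Hypothetical stand-in for the actor's packed status word; only the
// max-priority bits are modeled.
struct Status {
  uint8_t maxPriority;
  bool operator==(Status other) const {
    return maxPriority == other.maxPriority;
  }
};

std::atomic<Status> gStatus{Status{0}};

// Saturating escalation: the stored priority only moves up, so a
// concurrent escalation to a lower priority becomes a no-op.
void escalate(uint8_t newPriority) {
  Status oldState = gStatus.load(std::memory_order_relaxed);
  while (true) {
    Status newState = oldState;
    newState.maxPriority = std::max(oldState.maxPriority, newPriority);
    if (newState == oldState)
      return; // the operator== early-exit, as in enqueueStealer
    // On failure the CAS reloads oldState and the loop retries.
    if (gStatus.compare_exchange_weak(oldState, newState,
                                      std::memory_order_relaxed,
                                      std::memory_order_relaxed))
      return;
  }
}

int main() {
  escalate(21);
  escalate(4); // no-op: saturated at 21
}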

// Called with actor lock held on current thread
Job * DefaultActorImpl::drainOne() {
SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);

// Pairs with the store release in DefaultActorImpl::enqueue
auto oldState = _status().load(std::memory_order_acquire);
auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME);
_swift_tsan_consume(this);

auto jobToPreprocessFrom = oldState.getFirstJob();
Job *firstJob = preprocessQueue(jobToPreprocessFrom);
traceJobQueue(this, firstJob);

_swift_tsan_release(this);
while (true) {
assert(oldState.isAnyRunning());

@@ -1200,8 +1282,8 @@ Job * DefaultActorImpl::drainOne() {
// Dequeue the first job and set up a new head
newState = newState.withFirstJob(getNextJobInQueue(firstJob));
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_release,
/* failure */ std::memory_order_acquire)) {
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
traceActorStateTransition(this, oldState, newState);
concurrency::trace::actor_dequeue(this, firstJob);
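enqueue publishes the new job with a release CAS and drainOne reads the queue with a consume load (compilers generally strengthen memory_order_consume to acquire, hence the explicit TSan annotations in the patch). A minimal, self-contained sketch of that publish/consume pairing, with illustrative names rather than the runtime's:

#include <atomic>
#include <cassert>
#include <thread>

struct Node {
  int payload;
  Node *next;
};

std::atomic<Node *> head{nullptr};

// Producer: fully initialize the node, then publish it with a release
// CAS so a consumer that sees the pointer also sees the payload.
void push(Node *n, int value) {
  n->payload = value;
  n->next = head.load(std::memory_order_relaxed);
  while (!head.compare_exchange_weak(n->next, n,
                                     std::memory_order_release,
                                     std::memory_order_relaxed)) {
    // on failure, n->next was refreshed with the current head; retry
  }
}

// Consumer: the acquire (standing in for consume) pairs with the
// release above, making the payload read safe.
Node *takeAll() {
  return head.exchange(nullptr, std::memory_order_acquire);
}

int main() {
  Node n;
  std::thread producer([&] { push(&n, 42); });
  producer.join();
  Node *list = takeAll();
  assert(list && list->payload == 42);
  (void)list;
}

Once the initial load has made the queue contents visible, the dequeue CAS itself publishes nothing new, which is presumably why the patch relaxes both of its orderings.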
@@ -1387,14 +1469,11 @@ bool DefaultActorImpl::tryLock(bool asDrainer) {
dispatch_thread_override_info_s threadOverrideInfo;
threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
bool receivedOverride = false;
retry:;
#else
SWIFT_TASK_DEBUG_LOG("Thread attempting to jump onto %p, as drainer = %d", this, asDrainer);
#endif

// Note: This doesn't have to be a load acquire because the jobQueue is part
// of the same atomic.
auto oldState = _status().load(std::memory_order_relaxed);
while (true) {

@@ -1408,10 +1487,6 @@ retry:;
// (4).
swift_release(this);

if (receivedOverride) {
// Reset any override as a result of contending for the actor lock.
swift_dispatch_lock_override_end(overrideFloor);
}
return false;
}
#endif
@@ -1431,7 +1506,6 @@ retry:;

(void) swift_dispatch_thread_override_self(maxActorPriority);
overrideFloor = maxActorPriority;
receivedOverride = true;
goto retry;
}
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
@@ -1446,9 +1520,14 @@
assert(!oldState.getFirstJob());
}

// Taking the drain lock clears the max priority escalated bit because we've
// already represented the current max priority of the actor on the thread.
auto newState = oldState.withRunning();
newState = newState.withoutEscalatedPriority();

// This needs an acquire since we are taking a lock
if (_status().compare_exchange_weak(oldState, newState,
std::memory_order_relaxed,
std::memory_order_acquire,
std::memory_order_relaxed)) {
_swift_tsan_acquire(this);
traceActorStateTransition(this, oldState, newState);
@@ -1527,9 +1606,11 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
newState = newState.resetPriority();
}

// This needs to be a release since we are unlocking a lock
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_relaxed,
/* success */ std::memory_order_release,
/* failure */ std::memory_order_relaxed)) {
_swift_tsan_release(this);
traceActorStateTransition(this, oldState, newState);

if (newState.isScheduled()) {
@@ -1542,8 +1623,11 @@
}

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
// Reset any override on this thread as a result of this thread running
// the actor. Only do this after we have reenqueued the actor
// Reset any asynchronous escalations we may have gotten on this thread
// after taking the drain lock.
//
// Only do this after we have reenqueued the actor so that we don't lose
// any "mojo" prior to the enqueue.
if (oldState.isMaxPriorityEscalated()) {
swift_dispatch_lock_override_end((qos_class_t) oldState.getMaxPriority());
}
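The two new ordering comments in tryLock and unlock ("an acquire since we are taking a lock", "a release since we are unlocking a lock") are the standard CAS-lock discipline. A minimal sketch, independent of the runtime's types:

#include <atomic>

std::atomic<bool> locked{false};

// Taking the lock needs an acquire so that everything the previous
// holder wrote before its release-unlock is visible to this thread.
bool tryLock() {
  bool expected = false;
  return locked.compare_exchange_strong(expected, true,
                                        std::memory_order_acquire,
                                        std::memory_order_relaxed);
}

// Unlocking needs a release so this thread's writes are visible to
// whoever acquires the lock next.
void unlock() {
  locked.store(false, std::memory_order_release);
}

int main() {
  if (tryLock())
    unlock();
}

In the patch, the same acquire CAS also clears the escalated-priority bit, since taking the drain lock re-expresses the actor's current max priority on the draining thread.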
@@ -1818,9 +1902,28 @@ static void swift_task_enqueueImpl(Job *job, ExecutorRef executor) {
_swift_task_enqueueOnExecutor(job, executorObject, executorType, wtable);
}

static void
swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPriority)
{
return actor->enqueueStealer(task, newPriority);
}

SWIFT_CC(swift)
void swift::swift_executor_escalate(ExecutorRef executor, AsyncTask *task,
JobPriority newPriority) {
if (executor.isGeneric()) {
// TODO (rokhinip): We'd push a stealer job for the task on the executor.
return;
}

if (executor.isDefaultActor()) {
return swift_actor_escalate(asImpl(executor.getDefaultActor()), task, newPriority);
}

// TODO (rokhinip): This is either the main actor or an actor with a custom
// executor. We need to let the executor know that the job has been escalated.
// For now, do nothing
return;
}
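swift_executor_escalate fans out on the kind of executor, and only default actors get the escalation path so far. A simplified, runnable model of that dispatch (all names here are illustrative stand-ins, not runtime API):

#include <cstdio>

enum class ExecutorKind { Generic, DefaultActor, Custom };

void escalateDefaultActor(int newPriority) {
  std::printf("escalating default actor to %d\n", newPriority);
}

void executorEscalate(ExecutorKind kind, int newPriority) {
  switch (kind) {
  case ExecutorKind::Generic:
    return; // TODO in the patch: push a stealer onto the global executor
  case ExecutorKind::DefaultActor:
    return escalateDefaultActor(newPriority);
  case ExecutorKind::Custom:
    return; // main actor / custom executors: no-op for now
  }
}

int main() { executorEscalate(ExecutorKind::DefaultActor, 25); }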

#define OVERRIDE_ACTOR COMPATIBILITY_OVERRIDE
2 changes: 1 addition & 1 deletion stdlib/public/Concurrency/TaskStatus.cpp
@@ -481,7 +481,7 @@ void swift::updateStatusRecord(AsyncTask *task, TaskStatusRecord *record,

SWIFT_TASK_DEBUG_LOG("Updating status record %p of task %p", record, task);
withStatusRecordLock(task, status, [&](ActiveTaskStatus lockedStatus) {
#if NDEBUG
#ifndef NDEBUG
bool foundRecord = false;
for (auto cur: lockedStatus.records()) {
if (cur == record) {
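The one-character TaskStatus.cpp fix matters because NDEBUG is defined in release builds: the old #if NDEBUG compiled the record-sanity check only into release builds (where its assert is a no-op anyway) and out of debug builds, which is the opposite of the intent. A tiny illustration:

#include <cstdio>

int main() {
#ifdef NDEBUG
  std::puts("release build: NDEBUG defined, asserts compiled out");
#else
  std::puts("debug build: the #ifndef NDEBUG sanity check runs here");
#endif
}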
50 changes: 50 additions & 0 deletions test/Concurrency/async_task_priority.swift
Original file line number Diff line number Diff line change
@@ -68,6 +68,29 @@ func childTaskWaitingForEscalation(sem: DispatchSemaphore, basePri: TaskPriority
let _ = await testNestedTaskPriority(basePri: basePri, curPri: curPri)
}

actor Test {
private var value = 0
init() { }

func increment() -> Int {
let cur = value
value = value + 1
return cur
}

func blockActorThenIncrement(semToSignal: DispatchSemaphore, semToWait : DispatchSemaphore, priExpected: TaskPriority) -> Int {
semToSignal.signal()

semToWait.wait();

sleep(1)
// TODO: insert a test to verify that thread priority has actually escalated
// to match priExpected
return increment()
}

}


@main struct Main {
static func main() async {
@@ -267,6 +290,33 @@ func childTaskWaitingForEscalation(sem: DispatchSemaphore, basePri: TaskPriority
await task2.value
}

tests.test("Task escalation of a task enqueued on an actor") {
let task1Pri: TaskPriority = .background
let task2Pri: TaskPriority = .background
let parentPri: TaskPriority = Task.currentPriority

let sem1 = DispatchSemaphore(value: 0) // to unblock enqueue of task2
let sem2 = DispatchSemaphore(value: 0)
let testActor = Test()

let task1 = Task(priority: task1Pri) {
expectedBasePri(priority: task1Pri);
await testActor.blockActorThenIncrement(semToSignal: sem1, semToWait: sem2, priExpected: parentPri);
}

sem1.wait() // Wait until task1 is on the actor

let task2 = Task(priority: task2Pri) {
expectedBasePri(priority: task2Pri);
await testActor.increment()
}

sleep(1)
sem2.signal() // task2 is probably enqueued on the actor at this point, unblock task1

await task2.value // Escalate task2 which should be queued behind task1 on the actor
}

}
await runAllTestsAsync()
}