
Commit f1078cd

Merge pull request #63977 from apple/rokhinip/101864092-escalate-actor-executor
Escalate the actor and thread running the actor when job in actor queue is escalated
2 parents fbac545 + 9b6c873 commit f1078cd

3 files changed: +175 additions, −22 deletions

stdlib/public/Concurrency/Actor.cpp

Lines changed: 124 additions & 21 deletions
@@ -589,6 +589,13 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
   }
 
 public:
+  bool operator==(ActiveActorStatus other) const {
+#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
+    return (Flags == other.Flags) && (DrainLock == other.DrainLock) && (FirstJob == other.FirstJob);
+#else
+    return (Flags == other.Flags) && (FirstJob == other.FirstJob);
+#endif
+  }
 
 #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
   constexpr ActiveActorStatus()
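
The new equality operator exists so that the escalation path added below (`enqueueStealer`) can compare a proposed status against the current one and return early instead of issuing a no-op compare-exchange. A standalone sketch of that shape, not the runtime code (the `Status` fields and priority values here are illustrative assumptions):

// Sketch: a saturating "escalate priority" CAS loop in the same shape as
// DefaultActorImpl::enqueueStealer. The equality check is what operator==
// enables: if nothing would change, skip the compare-exchange entirely.
#include <atomic>
#include <cstdint>
#include <cstdio>

struct Status {
  uint8_t maxPriority;   // stands in for the actor's max priority
  bool    running;       // stands in for the "running" state bit
  bool operator==(const Status &other) const {
    return maxPriority == other.maxPriority && running == other.running;
  }
};

static std::atomic<Status> gStatus{{4, true}};

void escalate(uint8_t newPriority) {
  Status oldState = gStatus.load(std::memory_order_relaxed);
  while (true) {
    Status newState = oldState;
    if (newPriority > oldState.maxPriority)   // saturating max, never lowered
      newState.maxPriority = newPriority;
    if (oldState == newState)                 // nothing to do; avoid a no-op CAS
      return;
    if (gStatus.compare_exchange_weak(oldState, newState,
                                      std::memory_order_relaxed,
                                      std::memory_order_relaxed)) {
      std::printf("escalated to %d\n", (int)newState.maxPriority);
      return;
    }
    // CAS failure reloads oldState; loop and retry.
  }
}

int main() {
  escalate(9);
  escalate(6);   // already at 9, the equality check short-circuits
}
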
@@ -879,10 +886,13 @@ class DefaultActorImpl : public HeapObject {
   bool unlock(bool forceUnlock);
 
 #if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
-  /// Enqueue a job onto the actor. This typically means that the actor hit
-  /// contention during the tryLock and so we're taking the slow path
+  /// Enqueue a job onto the actor.
   void enqueue(Job *job, JobPriority priority);
 
+  /// Enqueue a stealer for the given task since it has been escalated to the
+  /// new priority
+  void enqueueStealer(Job *job, JobPriority priority);
+
   // The calling thread must be holding the actor lock while calling this
   Job *drainOne();
 #endif
@@ -1126,7 +1136,6 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
     newState = newState.withFirstJob(newHead);
 
     if (oldState.isIdle()) {
-      // Someone gave up the actor lock after we failed fast path.
       // Schedule the actor
       newState = newState.withScheduled();
       newState = newState.withNewPriority(priority);
@@ -1137,8 +1146,8 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
     }
 
     // This needs to be a store release so that we also publish the contents of
-    // the new Job we are adding to the atomic job queue. Pairs with load
-    // acquire in drainOne.
+    // the new Job we are adding to the atomic job queue. Pairs with consume
+    // in drainOne.
     if (_status().compare_exchange_weak(oldState, newState,
                    /* success */ std::memory_order_release,
                    /* failure */ std::memory_order_relaxed)) {
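
The comment rewrite reflects the new pairing: enqueue still publishes the freshly linked job with a release store, and the drain side (next hunk) now reads the head with a consume load, which is enough because every later access to the job goes through the loaded pointer. A standalone sketch of that pattern, not the runtime code (the `Job` type and the `enqueue`/`peek` functions are illustrative):

// Sketch: release publication of a newly linked node, paired with a
// data-dependent consume load on the reader side. In practice most
// compilers promote memory_order_consume to memory_order_acquire,
// which is still correct, just slightly stronger.
#include <atomic>
#include <cstdio>

struct Job {
  int payload;
  Job *next;
};

static std::atomic<Job *> head{nullptr};

void enqueue(Job *job) {
  Job *oldHead = head.load(std::memory_order_relaxed);
  do {
    job->next = oldHead;
  } while (!head.compare_exchange_weak(oldHead, job,
                                       /* success */ std::memory_order_release,
                                       /* failure */ std::memory_order_relaxed));
}

Job *peek() {
  // Pairs with the release above; the job's fields are visible through
  // the returned pointer.
  return head.load(std::memory_order_consume);
}

int main() {
  static Job j{42, nullptr};
  enqueue(&j);
  if (Job *job = peek())
    std::printf("first job payload = %d\n", job->payload);
}
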
@@ -1175,18 +1184,91 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
   }
 }
 
+// The input job is already escalated to the new priority and has already been
+// enqueued into the actor. Push a stealer job for it on the actor.
+//
+// The caller of this function is escalating the input task and holding its
+// TaskStatusRecordLock and escalating this executor via the
+// TaskDependencyStatusRecord.
+void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
+
+  SWIFT_TASK_DEBUG_LOG("[Override] Escalating an actor %p due to job that is enqueued being escalated", this);
+
+  auto oldState = _status().load(std::memory_order_relaxed);
+  while (true) {
+    // Until we figure out how to safely enqueue a stealer and rendezvous with
+    // the original job so that we don't double-invoke the job, we shall simply
+    // escalate the actor's max priority to match the new one.
+    //
+    // Ideally, we'd also re-sort the job queue so that the escalated job gets
+    // to the front of the queue but since the actor's max QoS is a saturating
+    // function, this still handles the priority inversion correctly but with
+    // priority overhang instead.
+
+    if (oldState.isIdle()) {
+      // We are observing a race. Possible scenarios:
+      //
+      // 1. Escalator is racing with the drain of the actor/task. The task has
+      // just been popped off the actor and is about to run. The thread running
+      // the task will readjust its own priority once it runs since it should
+      // see the escalation in the ActiveTaskStatus and we don't need to
+      // escalate the actor as it will be spurious.
+      //
+      // 2. Escalator is racing with the enqueue of the task. The task marks
+      // the place it will enqueue in the dependency record before it enqueues
+      // itself. Escalator raced in between these two operations and escalated the
+      // task. Pushing a stealer job for the task onto the actor should fix it.
+      return;
+    }
+    auto newState = oldState;
+
+    if (priority > oldState.getMaxPriority()) {
+      newState = newState.withEscalatedPriority(priority);
+    }
+
+    if (oldState == newState)
+      return;
+
+    if (_status().compare_exchange_weak(oldState, newState,
+                   /* success */ std::memory_order_relaxed,
+                   /* failure */ std::memory_order_relaxed)) {
+      traceActorStateTransition(this, oldState, newState);
+#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
+      if (newState.isRunning()) {
+        // Actor is running on a thread, escalate the thread running it
+        SWIFT_TASK_DEBUG_LOG("[Override] Escalating actor %p which is running on %#x to %#x priority", this, newState.currentDrainer(), priority);
+        dispatch_lock_t *lockAddr = this->drainLockAddr();
+        swift_dispatch_lock_override_start_with_debounce(lockAddr, newState.currentDrainer(),
+                   (qos_class_t) priority);
+
+      } else if (newState.isEnqueued()) {
+        // We are scheduling a stealer for an actor due to priority override.
+        // This extra processing job has a reference on the actor. See
+        // ownership rule (2).
+        SWIFT_TASK_DEBUG_LOG(
+            "[Override] Scheduling a stealer for actor %p at %#x priority",
+            this, newState.getMaxPriority());
+        swift_retain(this);
+        scheduleActorProcessJob(newState.getMaxPriority());
+      }
+#endif
+    }
+  }
+
+}
+
 // Called with actor lock held on current thread
 Job * DefaultActorImpl::drainOne() {
   SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);
 
   // Pairs with the store release in DefaultActorImpl::enqueue
-  auto oldState = _status().load(std::memory_order_acquire);
+  auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME);
+  _swift_tsan_consume(this);
 
   auto jobToPreprocessFrom = oldState.getFirstJob();
   Job *firstJob = preprocessQueue(jobToPreprocessFrom);
   traceJobQueue(this, firstJob);
 
-  _swift_tsan_release(this);
   while (true) {
     assert(oldState.isAnyRunning());
 
@@ -1200,8 +1282,8 @@ Job * DefaultActorImpl::drainOne() {
     // Dequeue the first job and set up a new head
     newState = newState.withFirstJob(getNextJobInQueue(firstJob));
     if (_status().compare_exchange_weak(oldState, newState,
-                   /* success */ std::memory_order_release,
-                   /* failure */ std::memory_order_acquire)) {
+                   /* success */ std::memory_order_relaxed,
+                   /* failure */ std::memory_order_relaxed)) {
       SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
       traceActorStateTransition(this, oldState, newState);
       concurrency::trace::actor_dequeue(this, firstJob);
@@ -1387,14 +1469,11 @@ bool DefaultActorImpl::tryLock(bool asDrainer) {
   dispatch_thread_override_info_s threadOverrideInfo;
   threadOverrideInfo = swift_dispatch_thread_get_current_override_qos_floor();
   qos_class_t overrideFloor = threadOverrideInfo.override_qos_floor;
-  bool receivedOverride = false;
 retry:;
 #else
   SWIFT_TASK_DEBUG_LOG("Thread attempting to jump onto %p, as drainer = %d", this, asDrainer);
 #endif
 
-  // Note: This doesn't have to be a load acquire because the jobQueue is part
-  // of the same atomic.
   auto oldState = _status().load(std::memory_order_relaxed);
   while (true) {
 
@@ -1408,10 +1487,6 @@ retry:;
       // (4).
       swift_release(this);
 
-      if (receivedOverride) {
-        // Reset any override as a result of contending for the actor lock.
-        swift_dispatch_lock_override_end(overrideFloor);
-      }
       return false;
     }
 #endif
@@ -1431,7 +1506,6 @@ retry:;
 
       (void) swift_dispatch_thread_override_self(maxActorPriority);
       overrideFloor = maxActorPriority;
-      receivedOverride = true;
       goto retry;
     }
 #endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
@@ -1446,9 +1520,14 @@ retry:;
       assert(!oldState.getFirstJob());
     }
 
+    // Taking the drain lock clears the max priority escalated bit because we've
+    // already represented the current max priority of the actor on the thread.
     auto newState = oldState.withRunning();
+    newState = newState.withoutEscalatedPriority();
+
+    // This needs an acquire since we are taking a lock
     if (_status().compare_exchange_weak(oldState, newState,
-                   std::memory_order_relaxed,
+                   std::memory_order_acquire,
                    std::memory_order_relaxed)) {
       _swift_tsan_acquire(this);
       traceActorStateTransition(this, oldState, newState);
@@ -1527,9 +1606,11 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
     newState = newState.resetPriority();
   }
 
+  // This needs to be a release since we are unlocking a lock
   if (_status().compare_exchange_weak(oldState, newState,
-                   /* success */ std::memory_order_relaxed,
+                   /* success */ std::memory_order_release,
                    /* failure */ std::memory_order_relaxed)) {
+    _swift_tsan_release(this);
     traceActorStateTransition(this, oldState, newState);
 
     if (newState.isScheduled()) {
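
The ordering changes in this hunk and the previous one are the standard lock pairing: taking the drain lock is an acquire so the critical section cannot be reordered before it, and dropping it is a release so writes made while holding the lock are visible to the next thread that takes it. A minimal standalone sketch of the same pairing, not the runtime code (the `SpinLock` type and its methods are illustrative):

// Sketch: acquire-on-lock / release-on-unlock, mirroring the tryLock and
// unlock CAS ordering used for the actor's drain lock.
#include <atomic>

class SpinLock {
  std::atomic<bool> locked{false};
public:
  bool tryLock() {
    bool expected = false;
    // Acquire on success, relaxed on failure; the runtime retries in a loop,
    // here a single strong CAS keeps the one-shot tryLock honest.
    return locked.compare_exchange_strong(expected, true,
                                          std::memory_order_acquire,
                                          std::memory_order_relaxed);
  }
  void unlock() {
    // Release pairs with the acquire in tryLock.
    locked.store(false, std::memory_order_release);
  }
};

int main() {
  SpinLock lock;
  if (lock.tryLock()) {
    // ... critical section: mutate state protected by the lock ...
    lock.unlock();
  }
}
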
@@ -1542,8 +1623,11 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
     }
 
 #if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
-    // Reset any override on this thread as a result of this thread running
-    // the actor. Only do this after we have reenqueued the actor
+    // Reset any asynchronous escalations we may have gotten on this thread
+    // after taking the drain lock.
+    //
+    // Only do this after we have reenqueued the actor so that we don't lose
+    // any "mojo" prior to the enqueue.
     if (oldState.isMaxPriorityEscalated()) {
       swift_dispatch_lock_override_end((qos_class_t) oldState.getMaxPriority());
     }
@@ -1818,9 +1902,28 @@ static void swift_task_enqueueImpl(Job *job, ExecutorRef executor) {
   _swift_task_enqueueOnExecutor(job, executorObject, executorType, wtable);
 }
 
+static void
+swift_actor_escalate(DefaultActorImpl *actor, AsyncTask *task, JobPriority newPriority)
+{
+  return actor->enqueueStealer(task, newPriority);
+}
+
 SWIFT_CC(swift)
 void swift::swift_executor_escalate(ExecutorRef executor, AsyncTask *task,
                                     JobPriority newPriority) {
+  if (executor.isGeneric()) {
+    // TODO (rokhinip): We'd push a stealer job for the task on the executor.
+    return;
+  }
+
+  if (executor.isDefaultActor()) {
+    return swift_actor_escalate(asImpl(executor.getDefaultActor()), task, newPriority);
+  }
+
+  // TODO (rokhinip): This is either the main actor or an actor with a custom
+  // executor. We need to let the executor know that the job has been escalated.
+  // For now, do nothing
+  return;
 }
 
 #define OVERRIDE_ACTOR COMPATIBILITY_OVERRIDE

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 1 addition & 1 deletion
@@ -481,7 +481,7 @@ void swift::updateStatusRecord(AsyncTask *task, TaskStatusRecord *record,
 
   SWIFT_TASK_DEBUG_LOG("Updating status record %p of task %p", record, task);
   withStatusRecordLock(task, status, [&](ActiveTaskStatus lockedStatus) {
-#if NDEBUG
+#ifndef NDEBUG
     bool foundRecord = false;
     for (auto cur: lockedStatus.records()) {
       if (cur == record) {
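
This one-token fix inverts the condition's meaning: NDEBUG is defined in release builds, so the old `#if NDEBUG` compiled the record sanity check into release builds and compiled it out of assert-enabled builds, the opposite of the intent; `#ifndef NDEBUG` restores the debug-only check. A standalone illustration of the preprocessor behaviour (the printf strings are illustrative):

#include <cstdio>

int main() {
#ifndef NDEBUG
  // Runs only in assert-enabled (debug) builds, where NDEBUG is not defined.
  std::printf("debug-only sanity check runs here\n");
#else
  // NDEBUG defined: release build, the check is elided.
  std::printf("release build: check elided\n");
#endif
}
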

test/Concurrency/async_task_priority.swift

Lines changed: 50 additions & 0 deletions
@@ -68,6 +68,29 @@ func childTaskWaitingForEscalation(sem: DispatchSemaphore, basePri: TaskPriority
   let _ = await testNestedTaskPriority(basePri: basePri, curPri: curPri)
 }
 
+actor Test {
+  private var value = 0
+  init() { }
+
+  func increment() -> Int {
+    let cur = value
+    value = value + 1
+    return cur
+  }
+
+  func blockActorThenIncrement(semToSignal: DispatchSemaphore, semToWait : DispatchSemaphore, priExpected: TaskPriority) -> Int {
+    semToSignal.signal()
+
+    semToWait.wait();
+
+    sleep(1)
+    // TODO: insert a test to verify that thread priority has actually escalated
+    // to match priExpected
+    return increment()
+  }
+
+}
+
 
 @main struct Main {
   static func main() async {
@@ -267,6 +290,33 @@ func childTaskWaitingForEscalation(sem: DispatchSemaphore, basePri: TaskPriority
       await task2.value
     }
 
+    tests.test("Task escalation of a task enqueued on an actor") {
+      let task1Pri: TaskPriority = .background
+      let task2Pri: TaskPriority = .background
+      let parentPri: TaskPriority = Task.currentPriority
+
+      let sem1 = DispatchSemaphore(value: 0) // to unblock enqueue of task2
+      let sem2 = DispatchSemaphore(value: 0)
+      let testActor = Test()
+
+      let task1 = Task(priority: task1Pri) {
+        expectedBasePri(priority: task1Pri);
+        await testActor.blockActorThenIncrement(semToSignal: sem1, semToWait: sem2, priExpected: parentPri);
+      }
+
+      sem1.wait() // Wait until task1 is on the actor
+
+      let task2 = Task(priority: task2Pri) {
+        expectedBasePri(priority: task2Pri);
+        await testActor.increment()
+      }
+
+      sleep(1)
+      sem2.signal() // task2 is probably enqueued on the actor at this point, unblock task1
+
+      await task2.value // Escalate task2 which should be queued behind task1 on the actor
+    }
+
   }
   await runAllTestsAsync()
 }
