Skip to content

Commit 534811c

Browse files
committed
When a task gets enqueued on an executor, start tracking which executor
it is enqueued on. This way, we have the necessary bookkeeping to escalate an executor when a task that is enqueued, is escalated. Radar-Id: rdar://problem/101864092
1 parent d773067 commit 534811c

File tree

4 files changed

+86
-79
lines changed

4 files changed

+86
-79
lines changed

include/swift/ABI/TaskStatus.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,11 @@ class TaskDependencyStatusRecord : public TaskStatusRecord {
369369
return record->getKind() == TaskStatusRecordKind::TaskDependency;
370370
}
371371

372+
void updateDependencyToEnqueuedOn(ExecutorRef executor) {
373+
DependencyKind = EnqueuedOnExecutor;
374+
DependentOn.Executor = executor;
375+
}
376+
372377
void performEscalationAction(JobPriority newPriority);
373378
};
374379

stdlib/public/Concurrency/Actor.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,19 +1808,21 @@ static void swift_task_enqueueImpl(Job *job, ExecutorRef executor) {
18081808
return swift_task_enqueueGlobal(job);
18091809

18101810
if (executor.isDefaultActor()) {
1811-
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
1812-
assert(false && "Should not enqueue tasks on actors in actors as locks model");
1813-
#else
1814-
return asImpl(executor.getDefaultActor())->enqueue(job, job->getPriority());
1815-
#endif
1811+
return swift_defaultActor_enqueue(job, executor.getDefaultActor());
18161812
}
18171813

1814+
// For main actor or actors with custom executors
18181815
auto wtable = executor.getSerialExecutorWitnessTable();
18191816
auto executorObject = executor.getIdentity();
18201817
auto executorType = swift_getObjectType(executorObject);
18211818
_swift_task_enqueueOnExecutor(job, executorObject, executorType, wtable);
18221819
}
18231820

1821+
SWIFT_CC(swift)
1822+
void swift::swift_executor_escalate(ExecutorRef executor, AsyncTask *task,
1823+
JobPriority newPriority) {
1824+
}
1825+
18241826
#define OVERRIDE_ACTOR COMPATIBILITY_OVERRIDE
18251827
#include COMPATIBILITY_OVERRIDE_INCLUDE_PATH
18261828

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 72 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ _swift_task_getDispatchQueueSerialExecutorWitnessTable() {
122122
}
123123
#endif
124124

125+
// The task listed as argument is escalated to a new priority. Pass that
126+
// inforamtion along to the executor that it is enqueued into.
127+
SWIFT_CC(swift)
128+
void
129+
swift_executor_escalate(ExecutorRef executor, AsyncTask *task, JobPriority newPriority);
130+
125131
/*************** Methods for Status records manipulation ******************/
126132

127133
/// Add a status record to the input task.
@@ -839,17 +845,19 @@ inline void AsyncTask::flagAsRunning() {
839845

840846
if (!oldStatus.hasTaskDependency()) {
841847
SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this);
848+
assert(_private().dependencyRecord == nullptr);
849+
842850
while (true) {
843851
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
844-
// Task's priority is greater than the thread's - do a self escalation
845-
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
846-
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
847-
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
848-
overrideFloor, this, maxTaskPriority);
849-
850-
(void) swift_dispatch_thread_override_self(maxTaskPriority);
851-
overrideFloor = maxTaskPriority;
852-
}
852+
// Task's priority is greater than the thread's - do a self escalation
853+
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
854+
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
855+
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
856+
overrideFloor, this, maxTaskPriority);
857+
858+
(void) swift_dispatch_thread_override_self(maxTaskPriority);
859+
overrideFloor = maxTaskPriority;
860+
}
853861
#endif
854862
// Set self as executor and remove escalation bit if any - the task's
855863
// priority escalation has already been reflected on the thread.
@@ -923,83 +931,75 @@ inline void AsyncTask::flagAsRunning() {
923931
inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
924932
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
925933
assert(false && "Should not enqueue any tasks to execute in task-to-thread model");
926-
#else
934+
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
927935
auto oldStatus = _private()._status().load(std::memory_order_relaxed);
928-
JobPriority priority = JobPriority::Unspecified;
929-
930-
if (oldStatus.isRunning()) {
931-
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor() running to enqueued", this);
932-
// Case 1:
933-
// running -> enqueued
934-
// Most likely due to task running into actor contention
935-
// TODO: Need to record a new task dependency
936-
while (true) {
937-
// Drop execution lock and any override the thread might have received as
938-
// a result of executing it previously. Mark the task as being enqueued
939-
auto newStatus = oldStatus.withRunning(false);
940-
newStatus = newStatus.withoutStoredPriorityEscalation();
941-
newStatus = newStatus.withEnqueued();
936+
assert(!oldStatus.isEnqueued());
942937

943-
if (_private()._status().compare_exchange_weak(oldStatus, newStatus,
944-
/* success */std::memory_order_relaxed,
945-
/* failure */std::memory_order_relaxed)) {
946-
newStatus.traceStatusChanged(this);
947-
priority = newStatus.getStoredPriority();
948-
break;
949-
}
950-
}
951-
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
952-
// The thread was previously running the task, now that we aren't, we need
953-
// to remove any task escalation on the thread as a result of the task.
954-
if (oldStatus.isStoredPriorityEscalated()) {
955-
SWIFT_TASK_DEBUG_LOG("[Override] Reset override %#x on thread from task %p",
956-
oldStatus.getStoredPriority(), this);
957-
swift_dispatch_lock_override_end((qos_class_t) oldStatus.getStoredPriority());
958-
}
959-
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
960-
swift_task_exitThreadLocalContext((char *)&_private().ExclusivityAccessSet[0]);
961-
restoreTaskVoucher(this);
962-
963-
} else if (oldStatus.hasTaskDependency()) {
964-
// Case 2: suspended -> enqueued
965-
// Subcase 2a: Task had a dependency which is now cleared - remove task
966-
// dependency record and destroy it
967-
auto dependencyRecord = _private().dependencyRecord;
938+
if (!oldStatus.isRunning() && oldStatus.hasTaskDependency()) {
939+
// Task went from suspended --> enqueued and has a previous
940+
// dependency record.
941+
//
942+
// Atomically update the existing dependency record with new dependency
943+
// information.
944+
TaskDependencyStatusRecord *dependencyRecord = _private().dependencyRecord;
968945
assert(dependencyRecord != nullptr);
969-
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor() suspended to enqueued and remove dependency %p", this, dependencyRecord);
970946

971-
removeStatusRecord(this, dependencyRecord, [&](ActiveTaskStatus unused,
972-
ActiveTaskStatus& newStatus) {
947+
SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsAndEnqueueOnExecutor() and update dependencyRecord %p",
948+
this, dependencyRecord);
949+
950+
updateStatusRecord(this, dependencyRecord, [&] {
951+
952+
// Update dependency record to the new dependency
953+
dependencyRecord->updateDependencyToEnqueuedOn(newExecutor);
954+
955+
}, oldStatus, [&](ActiveTaskStatus unused, ActiveTaskStatus &newStatus) {
956+
957+
// Remove escalation bits + set enqueued bit
973958
newStatus = newStatus.withoutStoredPriorityEscalation();
974959
newStatus = newStatus.withEnqueued();
975-
newStatus = newStatus.withoutTaskDependency();
976-
977-
priority = newStatus.getStoredPriority();
960+
assert(newStatus.hasTaskDependency());
978961
});
979-
this->destroyTaskDependency(dependencyRecord);
980-
981962
} else {
982-
// Case 2: suspended -> enqueued
983-
// Subcase 2b: Task is newly created and enqueued to run - no task
984-
// dependency record present
985-
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor() suspended to enqueued with no dependency", this);
986-
auto newStatus = oldStatus;
987-
while (true) {
963+
// 2 subcases:
964+
// * Task went from running on this thread --> enqueued on executor
965+
// * Task went from suspended to enqueued on this executor and has no
966+
// dependency record (Eg. newly created)
967+
assert(_private().dependencyRecord == nullptr);
968+
969+
void *allocation = _swift_task_alloc_specific(this, sizeof(class TaskDependencyStatusRecord));
970+
TaskDependencyStatusRecord *dependencyRecord = _private().dependencyRecord = ::new (allocation) TaskDependencyStatusRecord(this, newExecutor);
971+
SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsAndEnqueueOnExecutor() with dependencyRecord %p", this,
972+
dependencyRecord);
973+
974+
addStatusRecord(this, dependencyRecord, oldStatus, [&](ActiveTaskStatus unused,
975+
ActiveTaskStatus &newStatus) {
976+
977+
newStatus = newStatus.withRunning(false);
988978
newStatus = newStatus.withoutStoredPriorityEscalation();
989979
newStatus = newStatus.withEnqueued();
980+
newStatus = newStatus.withTaskDependency();
990981

991-
if (_private()._status().compare_exchange_weak(oldStatus, newStatus,
992-
/* success */std::memory_order_relaxed,
993-
/* failure */std::memory_order_relaxed)) {
994-
newStatus.traceStatusChanged(this);
995-
priority = newStatus.getStoredPriority();
996-
break;
982+
return true;
983+
});
984+
985+
if (oldStatus.isRunning()) {
986+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
987+
// The thread was previously running the task, now that we aren't and
988+
// we've successfully escalated the thing the task is waiting on. We need
989+
// to remove any task escalation on the thread as a result of the task.
990+
if (oldStatus.isStoredPriorityEscalated()) {
991+
SWIFT_TASK_DEBUG_LOG("[Override] Reset override %#x on thread from task %p",
992+
oldStatus.getStoredPriority(), this);
993+
swift_dispatch_lock_override_end((qos_class_t) oldStatus.getStoredPriority());
997994
}
995+
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
996+
swift_task_exitThreadLocalContext((char *)&_private().ExclusivityAccessSet[0]);
997+
restoreTaskVoucher(this);
998998
}
999999
}
10001000

10011001
// Set up task for enqueue to next location by setting the Job priority field
1002-
Flags.setPriority(priority);
1002+
Flags.setPriority(oldStatus.getStoredPriority());
10031003
concurrency::trace::task_flags_changed(
10041004
this, static_cast<uint8_t>(Flags.getPriority()), Flags.task_isChildTask(),
10051005
Flags.task_isFuture(), Flags.task_isGroupChildTask(),
@@ -1036,9 +1036,8 @@ void AsyncTask::flagAsSuspended(TaskDependencyStatusRecord *dependencyStatusReco
10361036
// Note that we have to do this escalation while adding the status record
10371037
// and not after - we are not guaranteed to be able to have a valid
10381038
// reference to the dependencyStatusRecord or its contents, once we have
1039-
// published it in the ActiveTaskStatus.
1040-
SWIFT_TASK_DEBUG_LOG("[Dependency] Escalate the dependency %p of task %p",
1041-
dependencyStatusRecord, this);
1039+
// published it in the ActiveTaskStatus since someone else could
1040+
// concurrently made us runnable.
10421041
dependencyStatusRecord->performEscalationAction(newStatus.getStoredPriority());
10431042

10441043
// Always add the dependency status record
@@ -1062,7 +1061,6 @@ void AsyncTask::flagAsSuspended(TaskDependencyStatusRecord *dependencyStatusReco
10621061

10631062
inline void AsyncTask::destroyTaskDependency(TaskDependencyStatusRecord *dependencyRecord) {
10641063
assert(_private().dependencyRecord == dependencyRecord);
1065-
10661064
_swift_task_dealloc_specific(this, dependencyRecord);
10671065

10681066
_private().dependencyRecord = nullptr;

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -479,6 +479,7 @@ void swift::updateStatusRecord(AsyncTask *task, TaskStatusRecord *record,
479479
ActiveTaskStatus& status,
480480
llvm::function_ref<void(ActiveTaskStatus, ActiveTaskStatus&)>fn) {
481481

482+
SWIFT_TASK_DEBUG_LOG("Updating status record %p of task %p", record, task);
482483
withStatusRecordLock(task, status, [&](ActiveTaskStatus lockedStatus) {
483484
#if NDEBUG
484485
bool foundRecord = false;
@@ -841,6 +842,7 @@ void TaskDependencyStatusRecord::performEscalationAction(JobPriority newPriority
841842
case EnqueuedOnExecutor:
842843
SWIFT_TASK_DEBUG_LOG("[Dependency] Escalate dependent executor %p noted in %p record",
843844
this->DependentOn.Executor, this);
845+
swift_executor_escalate(this->DependentOn.Executor, this->WaitingTask, newPriority);
844846
break;
845847
}
846848
}

0 commit comments

Comments
 (0)