Skip to content

Commit 8a930ca

Browse files
authored
Merge pull request #63919 from apple/rokhinip/105932276-track-task-enqueued-executor
Track the executor that a task is enqueued in, in dependency records
2 parents 7dd16da + 534811c commit 8a930ca

File tree

5 files changed

+146
-105
lines changed

5 files changed

+146
-105
lines changed

include/swift/ABI/Executor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ class ExecutorRef {
6767
: Identity(identity), Implementation(implementation) {}
6868

6969
public:
70+
7071
/// A generic execution environment. When running in a generic
7172
/// environment, it's presumed to be okay to switch synchronously
7273
/// to an actor. As an executor request, this represents a request

include/swift/ABI/TaskStatus.h

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
#include "swift/ABI/MetadataValues.h"
2424
#include "swift/ABI/Task.h"
25+
#include "swift/ABI/Executor.h"
2526
#include "swift/Runtime/HeapObject.h"
2627

2728
namespace swift {
@@ -278,22 +279,23 @@ class EscalationNotificationStatusRecord : public TaskStatusRecord {
278279
// This record is allocated for a task to record what it is dependent on before
279280
// the task can make progress again.
280281
class TaskDependencyStatusRecord : public TaskStatusRecord {
281-
// A word sized storage which references what this task is suspended waiting
282-
// for. Note that this is different from the waitQueue in the future fragment
283-
// of a task since that denotes all the tasks which this specific task, will
284-
// unblock.
282+
// A word sized storage which references what this task is waiting for. Note
283+
// that this is different from the waitQueue in the future fragment of a task
284+
// since that denotes all the tasks which this specific task, will unblock.
285285
//
286286
// This field is only really pointing to something valid when the
287-
// ActiveTaskStatus specifies that the task is suspended. It can be accessed
288-
// asynchronous to the task during escalation which will therefore require the
289-
// task status record lock for synchronization.
287+
// ActiveTaskStatus specifies that the task is suspended or enqueued. It can
288+
// be accessed asynchronous to the task during escalation which will therefore
289+
// require the task status record lock for synchronization.
290290
//
291291
// When a task has TaskDependencyStatusRecord in the status record list, it
292292
// must be the innermost status record, barring the status record lock which
293293
// could be taken while this record is present.
294294
//
295295
// The type of thing we are waiting on, is specified in the enum below
296-
union {
296+
union Dependent {
297+
constexpr Dependent() {}
298+
297299
// This task is suspended waiting on another task. This could be an async
298300
// let child task or it could be another unstructured task.
299301
AsyncTask *Task;
@@ -310,40 +312,68 @@ class TaskDependencyStatusRecord : public TaskStatusRecord {
310312
// the duration of the wait. We do not need to take an additional +1 on this
311313
// task group in this dependency record.
312314
TaskGroup *TaskGroup;
313-
} WaitingOn;
315+
316+
// The task is enqueued waiting on an executor. It could be any kind of
317+
// executor - the generic executor, the default actor's executor, or an
318+
// actor with a custom executor.
319+
//
320+
// This information is helpful to know *where* a task is enqueued into
321+
// (potentially intrusively), so that the appropriate escalation effect
322+
// (which may be different for each type of executor) can happen if a task
323+
// is escalated while enqueued.
324+
ExecutorRef Executor;
325+
} DependentOn;
314326

315327
// Enum specifying the type of dependency this task has
316328
enum DependencyKind {
317329
WaitingOnTask = 1,
318330
WaitingOnContinuation,
319331
WaitingOnTaskGroup,
332+
333+
EnqueuedOnExecutor,
320334
} DependencyKind;
321335

336+
// The task that has this task status record - ie a backpointer from the
337+
// record to the task with the record. This is not its own +1, we rely on the
338+
// fact that since this status record is linked into a task, the task is
339+
// already alive and maintained by someone and we can safely borrow the
340+
// reference.
341+
AsyncTask *WaitingTask;
342+
322343
public:
323-
TaskDependencyStatusRecord(AsyncTask *task) :
344+
TaskDependencyStatusRecord(AsyncTask *waitingTask, AsyncTask *task) :
324345
TaskStatusRecord(TaskStatusRecordKind::TaskDependency),
325-
DependencyKind(WaitingOnTask) {
326-
WaitingOn.Task = task;
346+
DependencyKind(WaitingOnTask), WaitingTask(waitingTask) {
347+
DependentOn.Task = task;
327348
}
328349

329-
TaskDependencyStatusRecord(ContinuationAsyncContext *context) :
350+
TaskDependencyStatusRecord(AsyncTask *waitingTask, ContinuationAsyncContext *context) :
330351
TaskStatusRecord(TaskStatusRecordKind::TaskDependency),
331-
DependencyKind(WaitingOnContinuation) {
332-
WaitingOn.Continuation = context;
352+
DependencyKind(WaitingOnContinuation), WaitingTask(waitingTask) {
353+
DependentOn.Continuation = context;
333354
}
334355

335-
TaskDependencyStatusRecord(TaskGroup *taskGroup) :
356+
TaskDependencyStatusRecord(AsyncTask *waitingTask, TaskGroup *taskGroup) :
336357
TaskStatusRecord(TaskStatusRecordKind::TaskDependency),
337-
DependencyKind(WaitingOnTaskGroup) {
338-
WaitingOn.TaskGroup = taskGroup;
358+
DependencyKind(WaitingOnTaskGroup), WaitingTask(waitingTask){
359+
DependentOn.TaskGroup = taskGroup;
339360
}
340361

341-
void destroy() { }
362+
TaskDependencyStatusRecord(AsyncTask *waitingTask, ExecutorRef executor) :
363+
TaskStatusRecord(TaskStatusRecordKind::TaskDependency),
364+
DependencyKind(EnqueuedOnExecutor), WaitingTask(waitingTask) {
365+
DependentOn.Executor = executor;
366+
}
342367

343368
static bool classof(const TaskStatusRecord *record) {
344369
return record->getKind() == TaskStatusRecordKind::TaskDependency;
345370
}
346371

372+
void updateDependencyToEnqueuedOn(ExecutorRef executor) {
373+
DependencyKind = EnqueuedOnExecutor;
374+
DependentOn.Executor = executor;
375+
}
376+
347377
void performEscalationAction(JobPriority newPriority);
348378
};
349379

stdlib/public/Concurrency/Actor.cpp

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1808,19 +1808,21 @@ static void swift_task_enqueueImpl(Job *job, ExecutorRef executor) {
18081808
return swift_task_enqueueGlobal(job);
18091809

18101810
if (executor.isDefaultActor()) {
1811-
#if SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
1812-
assert(false && "Should not enqueue tasks on actors in actors as locks model");
1813-
#else
1814-
return asImpl(executor.getDefaultActor())->enqueue(job, job->getPriority());
1815-
#endif
1811+
return swift_defaultActor_enqueue(job, executor.getDefaultActor());
18161812
}
18171813

1814+
// For main actor or actors with custom executors
18181815
auto wtable = executor.getSerialExecutorWitnessTable();
18191816
auto executorObject = executor.getIdentity();
18201817
auto executorType = swift_getObjectType(executorObject);
18211818
_swift_task_enqueueOnExecutor(job, executorObject, executorType, wtable);
18221819
}
18231820

1821+
SWIFT_CC(swift)
1822+
void swift::swift_executor_escalate(ExecutorRef executor, AsyncTask *task,
1823+
JobPriority newPriority) {
1824+
}
1825+
18241826
#define OVERRIDE_ACTOR COMPATIBILITY_OVERRIDE
18251827
#include COMPATIBILITY_OVERRIDE_INCLUDE_PATH
18261828

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 75 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ _swift_task_getDispatchQueueSerialExecutorWitnessTable() {
122122
}
123123
#endif
124124

125+
// The task listed as argument is escalated to a new priority. Pass that
126+
// inforamtion along to the executor that it is enqueued into.
127+
SWIFT_CC(swift)
128+
void
129+
swift_executor_escalate(ExecutorRef executor, AsyncTask *task, JobPriority newPriority);
130+
125131
/*************** Methods for Status records manipulation ******************/
126132

127133
/// Add a status record to the input task.
@@ -839,17 +845,19 @@ inline void AsyncTask::flagAsRunning() {
839845

840846
if (!oldStatus.hasTaskDependency()) {
841847
SWIFT_TASK_DEBUG_LOG("%p->flagAsRunning() with no task dependency", this);
848+
assert(_private().dependencyRecord == nullptr);
849+
842850
while (true) {
843851
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
844-
// Task's priority is greater than the thread's - do a self escalation
845-
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
846-
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
847-
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
848-
overrideFloor, this, maxTaskPriority);
849-
850-
(void) swift_dispatch_thread_override_self(maxTaskPriority);
851-
overrideFloor = maxTaskPriority;
852-
}
852+
// Task's priority is greater than the thread's - do a self escalation
853+
qos_class_t maxTaskPriority = (qos_class_t) oldStatus.getStoredPriority();
854+
if (threadOverrideInfo.can_override && (maxTaskPriority > overrideFloor)) {
855+
SWIFT_TASK_DEBUG_LOG("[Override] Self-override thread with oq_floor %#x to match task %p's max priority %#x",
856+
overrideFloor, this, maxTaskPriority);
857+
858+
(void) swift_dispatch_thread_override_self(maxTaskPriority);
859+
overrideFloor = maxTaskPriority;
860+
}
853861
#endif
854862
// Set self as executor and remove escalation bit if any - the task's
855863
// priority escalation has already been reflected on the thread.
@@ -923,83 +931,75 @@ inline void AsyncTask::flagAsRunning() {
923931
inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
924932
#if SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL
925933
assert(false && "Should not enqueue any tasks to execute in task-to-thread model");
926-
#else
934+
#else /* SWIFT_CONCURRENCY_TASK_TO_THREAD_MODEL */
927935
auto oldStatus = _private()._status().load(std::memory_order_relaxed);
928-
JobPriority priority = JobPriority::Unspecified;
929-
930-
if (oldStatus.isRunning()) {
931-
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor() running to enqueued", this);
932-
// Case 1:
933-
// running -> enqueued
934-
// Most likely due to task running into actor contention
935-
// TODO: Need to record a new task dependency
936-
while (true) {
937-
// Drop execution lock and any override the thread might have received as
938-
// a result of executing it previously. Mark the task as being enqueued
939-
auto newStatus = oldStatus.withRunning(false);
940-
newStatus = newStatus.withoutStoredPriorityEscalation();
941-
newStatus = newStatus.withEnqueued();
936+
assert(!oldStatus.isEnqueued());
942937

943-
if (_private()._status().compare_exchange_weak(oldStatus, newStatus,
944-
/* success */std::memory_order_relaxed,
945-
/* failure */std::memory_order_relaxed)) {
946-
newStatus.traceStatusChanged(this);
947-
priority = newStatus.getStoredPriority();
948-
break;
949-
}
950-
}
951-
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
952-
// The thread was previously running the task, now that we aren't, we need
953-
// to remove any task escalation on the thread as a result of the task.
954-
if (oldStatus.isStoredPriorityEscalated()) {
955-
SWIFT_TASK_DEBUG_LOG("[Override] Reset override %#x on thread from task %p",
956-
oldStatus.getStoredPriority(), this);
957-
swift_dispatch_lock_override_end((qos_class_t) oldStatus.getStoredPriority());
958-
}
959-
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
960-
swift_task_exitThreadLocalContext((char *)&_private().ExclusivityAccessSet[0]);
961-
restoreTaskVoucher(this);
962-
963-
} else if (oldStatus.hasTaskDependency()) {
964-
// Case 2: suspended -> enqueued
965-
// Subcase 2a: Task had a dependency which is now cleared - remove task
966-
// dependency record and destroy it
967-
auto dependencyRecord = _private().dependencyRecord;
938+
if (!oldStatus.isRunning() && oldStatus.hasTaskDependency()) {
939+
// Task went from suspended --> enqueued and has a previous
940+
// dependency record.
941+
//
942+
// Atomically update the existing dependency record with new dependency
943+
// information.
944+
TaskDependencyStatusRecord *dependencyRecord = _private().dependencyRecord;
968945
assert(dependencyRecord != nullptr);
969-
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor() suspended to enqueued and remove dependency %p", this, dependencyRecord);
970946

971-
removeStatusRecord(this, dependencyRecord, [&](ActiveTaskStatus unused,
972-
ActiveTaskStatus& newStatus) {
947+
SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsAndEnqueueOnExecutor() and update dependencyRecord %p",
948+
this, dependencyRecord);
949+
950+
updateStatusRecord(this, dependencyRecord, [&] {
951+
952+
// Update dependency record to the new dependency
953+
dependencyRecord->updateDependencyToEnqueuedOn(newExecutor);
954+
955+
}, oldStatus, [&](ActiveTaskStatus unused, ActiveTaskStatus &newStatus) {
956+
957+
// Remove escalation bits + set enqueued bit
973958
newStatus = newStatus.withoutStoredPriorityEscalation();
974959
newStatus = newStatus.withEnqueued();
975-
newStatus = newStatus.withoutTaskDependency();
976-
977-
priority = newStatus.getStoredPriority();
960+
assert(newStatus.hasTaskDependency());
978961
});
979-
this->destroyTaskDependency(dependencyRecord);
980-
981962
} else {
982-
// Case 2: suspended -> enqueued
983-
// Subcase 2b: Task is newly created and enqueued to run - no task
984-
// dependency record present
985-
SWIFT_TASK_DEBUG_LOG("%p->flagAsAndEnqueueOnExecutor() suspended to enqueued with no dependency", this);
986-
auto newStatus = oldStatus;
987-
while (true) {
963+
// 2 subcases:
964+
// * Task went from running on this thread --> enqueued on executor
965+
// * Task went from suspended to enqueued on this executor and has no
966+
// dependency record (Eg. newly created)
967+
assert(_private().dependencyRecord == nullptr);
968+
969+
void *allocation = _swift_task_alloc_specific(this, sizeof(class TaskDependencyStatusRecord));
970+
TaskDependencyStatusRecord *dependencyRecord = _private().dependencyRecord = ::new (allocation) TaskDependencyStatusRecord(this, newExecutor);
971+
SWIFT_TASK_DEBUG_LOG("[Dependency] %p->flagAsAndEnqueueOnExecutor() with dependencyRecord %p", this,
972+
dependencyRecord);
973+
974+
addStatusRecord(this, dependencyRecord, oldStatus, [&](ActiveTaskStatus unused,
975+
ActiveTaskStatus &newStatus) {
976+
977+
newStatus = newStatus.withRunning(false);
988978
newStatus = newStatus.withoutStoredPriorityEscalation();
989979
newStatus = newStatus.withEnqueued();
980+
newStatus = newStatus.withTaskDependency();
990981

991-
if (_private()._status().compare_exchange_weak(oldStatus, newStatus,
992-
/* success */std::memory_order_relaxed,
993-
/* failure */std::memory_order_relaxed)) {
994-
newStatus.traceStatusChanged(this);
995-
priority = newStatus.getStoredPriority();
996-
break;
982+
return true;
983+
});
984+
985+
if (oldStatus.isRunning()) {
986+
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
987+
// The thread was previously running the task, now that we aren't and
988+
// we've successfully escalated the thing the task is waiting on. We need
989+
// to remove any task escalation on the thread as a result of the task.
990+
if (oldStatus.isStoredPriorityEscalated()) {
991+
SWIFT_TASK_DEBUG_LOG("[Override] Reset override %#x on thread from task %p",
992+
oldStatus.getStoredPriority(), this);
993+
swift_dispatch_lock_override_end((qos_class_t) oldStatus.getStoredPriority());
997994
}
995+
#endif /* SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION */
996+
swift_task_exitThreadLocalContext((char *)&_private().ExclusivityAccessSet[0]);
997+
restoreTaskVoucher(this);
998998
}
999999
}
10001000

10011001
// Set up task for enqueue to next location by setting the Job priority field
1002-
Flags.setPriority(priority);
1002+
Flags.setPriority(oldStatus.getStoredPriority());
10031003
concurrency::trace::task_flags_changed(
10041004
this, static_cast<uint8_t>(Flags.getPriority()), Flags.task_isChildTask(),
10051005
Flags.task_isFuture(), Flags.task_isGroupChildTask(),
@@ -1036,9 +1036,8 @@ void AsyncTask::flagAsSuspended(TaskDependencyStatusRecord *dependencyStatusReco
10361036
// Note that we have to do this escalation while adding the status record
10371037
// and not after - we are not guaranteed to be able to have a valid
10381038
// reference to the dependencyStatusRecord or its contents, once we have
1039-
// published it in the ActiveTaskStatus.
1040-
SWIFT_TASK_DEBUG_LOG("[Dependency] Escalate the dependency %p of task %p",
1041-
dependencyStatusRecord, this);
1039+
// published it in the ActiveTaskStatus since someone else could
1040+
// concurrently made us runnable.
10421041
dependencyStatusRecord->performEscalationAction(newStatus.getStoredPriority());
10431042

10441043
// Always add the dependency status record
@@ -1062,8 +1061,6 @@ void AsyncTask::flagAsSuspended(TaskDependencyStatusRecord *dependencyStatusReco
10621061

10631062
inline void AsyncTask::destroyTaskDependency(TaskDependencyStatusRecord *dependencyRecord) {
10641063
assert(_private().dependencyRecord == dependencyRecord);
1065-
1066-
dependencyRecord->destroy();
10671064
_swift_task_dealloc_specific(this, dependencyRecord);
10681065

10691066
_private().dependencyRecord = nullptr;
@@ -1075,7 +1072,7 @@ inline void AsyncTask::flagAsSuspendedOnTask(AsyncTask *task) {
10751072
assert(_private().dependencyRecord == nullptr);
10761073

10771074
void *allocation = _swift_task_alloc_specific(this, sizeof(class TaskDependencyStatusRecord));
1078-
auto record = ::new (allocation) TaskDependencyStatusRecord(task);
1075+
auto record = ::new (allocation) TaskDependencyStatusRecord(this, task);
10791076
SWIFT_TASK_DEBUG_LOG("[Dependency] Create a dependencyRecord %p for dependency on task %p", allocation, task);
10801077
_private().dependencyRecord = record;
10811078

@@ -1086,7 +1083,7 @@ inline void AsyncTask::flagAsSuspendedOnContinuation(ContinuationAsyncContext *c
10861083
assert(_private().dependencyRecord == nullptr);
10871084

10881085
void *allocation = _swift_task_alloc_specific(this, sizeof(class TaskDependencyStatusRecord));
1089-
auto record = ::new (allocation) TaskDependencyStatusRecord(context);
1086+
auto record = ::new (allocation) TaskDependencyStatusRecord(this, context);
10901087
SWIFT_TASK_DEBUG_LOG("[Dependency] Create a dependencyRecord %p for dependency on continuation %p", allocation, context);
10911088
_private().dependencyRecord = record;
10921089

@@ -1097,7 +1094,7 @@ inline void AsyncTask::flagAsSuspendedOnTaskGroup(TaskGroup *taskGroup) {
10971094
assert(_private().dependencyRecord == nullptr);
10981095

10991096
void *allocation = _swift_task_alloc_specific(this, sizeof(class TaskDependencyStatusRecord));
1100-
auto record = ::new (allocation) TaskDependencyStatusRecord(taskGroup);
1097+
auto record = ::new (allocation) TaskDependencyStatusRecord(this, taskGroup);
11011098
SWIFT_TASK_DEBUG_LOG("[Dependency] Create a dependencyRecord %p for dependency on taskGroup %p", allocation, taskGroup);
11021099
_private().dependencyRecord = record;
11031100

0 commit comments

Comments
 (0)