Skip to content

Commit b555b60

Browse files
committed
[Concurrency] Decode actor/task flags in signposts, make task_wait an interval, signpost continuations.
Decode all fields from the various flags values, pass each field as a separate argument to the various signposts. We were just passing the raw value of the flags and requiring the signpost client to decode them, which was ugly and required the client to know details they shouldn't need to know. Strip ptrauth bits from the task resume function when signposting, when ptrauth is supported. Add signpost events for continuation init/await/resume events. We also make task_wait into an interval, rather than a single event. The interval ends when the task resumes. As part of this change, we also skip emitting the interval when the wait completed immediately and the task didn't have to suspend. While we're in there, clean up a few SWIFT_TASK_DEBUG_LOG lines that emitted warnings when built with the logging enabled. rdar://88658803
1 parent 8da37b0 commit b555b60

File tree

6 files changed

+177
-48
lines changed

6 files changed

+177
-48
lines changed

stdlib/public/Concurrency/Actor.cpp

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,30 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
817817
return offsetof(ActiveActorStatus, DrainLock);
818818
}
819819
#endif
820+
821+
void traceStateChanged(HeapObject *actor) {
822+
// Convert our state to a consistent raw value. These values currently match
823+
// the enum values, but this explicit conversion provides room for change.
824+
uint8_t traceState = 255;
825+
switch (getActorState()) {
826+
case ActiveActorStatus::Idle:
827+
traceState = 0;
828+
break;
829+
case ActiveActorStatus::Scheduled:
830+
traceState = 1;
831+
break;
832+
case ActiveActorStatus::Running:
833+
traceState = 2;
834+
break;
835+
case ActiveActorStatus::Zombie_ReadyForDeallocation:
836+
traceState = 3;
837+
break;
838+
}
839+
concurrency::trace::actor_state_changed(
840+
actor, getFirstJob().getRawJob(), getFirstJob().needsPreprocessing(),
841+
traceState, isDistributedRemote(), isMaxPriorityEscalated(),
842+
static_cast<uint8_t>(getMaxPriority()));
843+
}
820844
};
821845

822846
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
@@ -943,7 +967,10 @@ class DefaultActorImpl : public HeapObject {
943967

944968
// Only for static assert use below, not for actual use otherwise
945969
static constexpr size_t offsetOfActiveActorStatus() {
970+
#pragma clang diagnostic push
971+
#pragma clang diagnostic ignored "-Winvalid-offsetof"
946972
return offsetof(DefaultActorImpl, StatusStorage);
973+
#pragma clang diagnostic pop
947974
}
948975

949976
private:
@@ -1124,11 +1151,10 @@ static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
11241151
static SWIFT_ATTRIBUTE_ALWAYS_INLINE void traceActorStateTransition(DefaultActorImpl *actor,
11251152
ActiveActorStatus oldState, ActiveActorStatus newState) {
11261153

1127-
SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %zx to %zx (%s)\n", actor,
1128-
oldState.getOpaqueFlags(), newState.getOpaqueFlags(), __FUNCTION__);
1129-
concurrency::trace::actor_state_changed(actor,
1130-
newState.getFirstJob().getRawJob(), newState.getFirstJob().needsPreprocessing(),
1131-
newState.getOpaqueFlags());
1154+
SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %#x to %#x (%s)\n", actor,
1155+
oldState.getOpaqueFlags(), newState.getOpaqueFlags(),
1156+
__FUNCTION__);
1157+
newState.traceStateChanged(actor);
11321158
}
11331159

11341160
void DefaultActorImpl::destroy() {
@@ -1192,7 +1218,9 @@ void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority, bool useInl
11921218
swift_retain(this);
11931219
job = new ProcessOutOfLineJob(this, priority);
11941220
}
1195-
SWIFT_TASK_DEBUG_LOG("Scheduling processing job %p for actor %p at priority %#x", job, this, priority);
1221+
SWIFT_TASK_DEBUG_LOG(
1222+
"Scheduling processing job %p for actor %p at priority %#zx", job, this,
1223+
priority);
11961224
swift_task_enqueueGlobal(job);
11971225
}
11981226

@@ -1259,7 +1287,9 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
12591287
// We can do relaxed loads here, we are just using the current head in the
12601288
// atomic state and linking that into the new job we are inserting, we don't
12611289
// need acquires
1262-
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#x", job, this, priority);
1290+
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#zx", job,
1291+
this, priority);
1292+
concurrency::trace::actor_enqueue(this, job);
12631293
auto oldState = _status().load(std::memory_order_relaxed);
12641294
while (true) {
12651295
auto newState = oldState;
@@ -1422,6 +1452,7 @@ Job * DefaultActorImpl::drainOne() {
14221452
/* failure */ std::memory_order_acquire)) {
14231453
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
14241454
traceActorStateTransition(this, oldState, newState);
1455+
concurrency::trace::actor_dequeue(this, firstJob);
14251456
return firstJob;
14261457
}
14271458

@@ -1650,7 +1681,8 @@ static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo,
16501681
/// do that in runOnAssumedThread.
16511682
static void giveUpThreadForSwitch(ExecutorRef currentExecutor) {
16521683
if (currentExecutor.isGeneric()) {
1653-
SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p", currentExecutor);
1684+
SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p",
1685+
currentExecutor.getIdentity());
16541686
return;
16551687
}
16561688

stdlib/public/Concurrency/Task.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
107107
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
108108
bool contextIntialized = false;
109109
while (true) {
110-
concurrency::trace::task_wait(
111-
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
112110
switch (queueHead.getStatus()) {
113111
case Status::Error:
114112
case Status::Success:
@@ -123,6 +121,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
123121
SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, going to sleep",
124122
waitingTask, this);
125123
_swift_tsan_release(static_cast<Job *>(waitingTask));
124+
concurrency::trace::task_wait(
125+
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
126126
// Task is not complete. We'll need to add ourselves to the queue.
127127
break;
128128
}
@@ -233,6 +233,8 @@ void AsyncTask::completeFuture(AsyncContext *context) {
233233

234234
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
235235

236+
concurrency::trace::task_resume(waitingTask);
237+
236238
// Enqueue the waiter on the global executor.
237239
// TODO: allow waiters to fill in a suggested executor
238240
waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
@@ -470,23 +472,27 @@ const void
470472
reinterpret_cast<void *>(task_future_wait_resume_adapter);
471473

472474
const void *AsyncTask::getResumeFunctionForLogging() {
475+
const void *result = reinterpret_cast<const void *>(ResumeTask);
476+
473477
if (ResumeTask == non_future_adapter) {
474478
auto asyncContextPrefix = reinterpret_cast<AsyncContextPrefix *>(
475479
reinterpret_cast<char *>(ResumeContext) - sizeof(AsyncContextPrefix));
476-
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
480+
result =
481+
reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
477482
} else if (ResumeTask == future_adapter) {
478483
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
479484
reinterpret_cast<char *>(ResumeContext) -
480485
sizeof(FutureAsyncContextPrefix));
481-
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
486+
result =
487+
reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
482488
} else if (ResumeTask == task_wait_throwing_resume_adapter) {
483489
auto context = static_cast<TaskFutureWaitAsyncContext *>(ResumeContext);
484-
return reinterpret_cast<const void *>(context->ResumeParent);
490+
result = reinterpret_cast<const void *>(context->ResumeParent);
485491
} else if (ResumeTask == task_future_wait_resume_adapter) {
486-
return reinterpret_cast<const void *>(ResumeContext->ResumeParent);
492+
result = reinterpret_cast<const void *>(ResumeContext->ResumeParent);
487493
}
488494

489-
return reinterpret_cast<const void *>(ResumeTask);
495+
return __ptrauth_swift_runtime_function_entry_strip(result);
490496
}
491497

492498
JobPriority swift::swift_task_currentPriority(AsyncTask *task)
@@ -653,7 +659,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
653659
basePriority = JobPriority::Default;
654660
}
655661

656-
SWIFT_TASK_DEBUG_LOG("Task's base priority = %#x", basePriority);
662+
SWIFT_TASK_DEBUG_LOG("Task's base priority = %#zx", basePriority);
657663

658664
// Figure out the size of the header.
659665
size_t headerSize = sizeof(AsyncTask);
@@ -788,7 +794,9 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
788794
futureAsyncContextPrefix->indirectResult = futureFragment->getStoragePtr();
789795
}
790796

791-
SWIFT_TASK_DEBUG_LOG("creating task %p with parent %p at base pri %zu", task, parent, basePriority);
797+
SWIFT_TASK_DEBUG_LOG("creating task %p ID %" PRIu64
798+
" with parent %p at base pri %zu",
799+
task, task->getTaskId(), parent, basePriority);
792800

793801
// Initialize the task-local allocator.
794802
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
@@ -1059,6 +1067,8 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
10591067
task->ResumeContext = context;
10601068
task->ResumeTask = context->ResumeParent;
10611069

1070+
concurrency::trace::task_continuation_init(task, context);
1071+
10621072
return task;
10631073
}
10641074

@@ -1071,6 +1081,8 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
10711081
assert(task->ResumeTask == context->ResumeParent);
10721082
#endif
10731083

1084+
concurrency::trace::task_continuation_await(context);
1085+
10741086
auto &sync = context->AwaitSynchronization;
10751087

10761088
auto oldStatus = sync.load(std::memory_order_acquire);
@@ -1157,12 +1169,14 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
11571169
SWIFT_CC(swift)
11581170
static void swift_continuation_resumeImpl(AsyncTask *task) {
11591171
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
1172+
concurrency::trace::task_continuation_resume(context, false);
11601173
resumeTaskAfterContinuation(task, context);
11611174
}
11621175

11631176
SWIFT_CC(swift)
11641177
static void swift_continuation_throwingResumeImpl(AsyncTask *task) {
11651178
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
1179+
concurrency::trace::task_continuation_resume(context, false);
11661180
resumeTaskAfterContinuation(task, context);
11671181
}
11681182

@@ -1171,6 +1185,7 @@ SWIFT_CC(swift)
11711185
static void swift_continuation_throwingResumeWithErrorImpl(AsyncTask *task,
11721186
/* +1 */ SwiftError *error) {
11731187
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
1188+
concurrency::trace::task_continuation_resume(context, true);
11741189
context->ErrorResult = error;
11751190
resumeTaskAfterContinuation(task, context);
11761191
}

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -507,7 +507,9 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus {
507507
}
508508

509509
void traceStatusChanged(AsyncTask *task) {
510-
concurrency::trace::task_status_changed(task, Flags);
510+
concurrency::trace::task_status_changed(
511+
task, static_cast<uint8_t>(getStoredPriority()), isCancelled(),
512+
isStoredPriorityEscalated(), isRunning(), isEnqueued());
511513
}
512514
};
513515

@@ -771,7 +773,10 @@ inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
771773

772774
// Set up task for enqueue to next location by setting the Job priority field
773775
Flags.setPriority(newStatus.getStoredPriority());
774-
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
776+
concurrency::trace::task_flags_changed(
777+
this, static_cast<uint8_t>(Flags.getPriority()), Flags.task_isChildTask(),
778+
Flags.task_isFuture(), Flags.task_isGroupChildTask(),
779+
Flags.task_isAsyncLetTask());
775780

776781
swift_task_enqueue(this, newExecutor);
777782
}

stdlib/public/Concurrency/Tracing.h

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
namespace swift {
2323
class AsyncLet;
2424
class AsyncTask;
25+
class ContinuationAsyncContext;
2526
class ExecutorRef;
2627
struct HeapObject;
2728
class Job;
@@ -43,10 +44,13 @@ void actor_enqueue(HeapObject *actor, Job *job);
4344

4445
void actor_dequeue(HeapObject *actor, Job *job);
4546

46-
// The `flags` parameter is the raw values of the actor's
47-
// DefaultActorImpl::State::Flags.
47+
// State values are:
48+
// Idle = 0, Scheduled = 1, Running = 2, Zombie_ReadyForDeallocation = 3,
49+
// invalid/unknown = 255
4850
void actor_state_changed(HeapObject *actor, Job *firstJob,
49-
bool needsPreprocessing, uintptr_t flags);
51+
bool needsPreprocessing, uint8_t state,
52+
bool isDistributedRemote, bool isPriorityEscalated,
53+
uint8_t maxPriority);
5054

5155
void actor_note_job_queue(HeapObject *actor, Job *first,
5256
Job *(*getNext)(Job *));
@@ -58,17 +62,28 @@ void task_create(AsyncTask *task, AsyncTask *parent, TaskGroup *group,
5862

5963
void task_destroy(AsyncTask *task);
6064

61-
// The `flags` parameter is the raw value of the ActiveTaskStatus::Flags field
62-
// in the task.
63-
void task_status_changed(AsyncTask *task, uintptr_t flags);
65+
void task_status_changed(AsyncTask *task, uint8_t maxPriority, bool isCancelled,
66+
bool isEscalated, bool isRunning, bool isEnqueued);
6467

65-
// The `flags` parameter is the raw value of Job::Flags.
66-
void task_flags_changed(AsyncTask *task, uint32_t flags);
68+
void task_flags_changed(AsyncTask *task, uint8_t jobPriority, bool isChildTask,
69+
bool isFuture, bool isGroupChildTask,
70+
bool isAsyncLetTask);
6771

6872
// The `status` parameter is the value of the corresponding
6973
// FutureFragment::Status.
7074
void task_wait(AsyncTask *task, AsyncTask *waitingOn, uintptr_t status);
7175

76+
void task_resume(AsyncTask *task);
77+
78+
// The context parameter is the context pointer used to create the continuation.
79+
// This same pointer will be passed to the corresponding call to
80+
// task_continuation_await and task_continuation_resume.
81+
void task_continuation_init(AsyncTask *task, ContinuationAsyncContext *context);
82+
83+
void task_continuation_await(ContinuationAsyncContext *context);
84+
85+
void task_continuation_resume(ContinuationAsyncContext *context, bool error);
86+
7287
void job_enqueue_global(Job *job);
7388

7489
void job_enqueue_global_with_delay(unsigned long long delay, Job *job);

0 commit comments

Comments
 (0)