Skip to content

Commit a64b75e

Browse files
authored
Merge pull request #41808 from mikeash/more-signposts
[Concurrency] Decode actor/task flags in signposts, make task_wait an interval.
2 parents ce2c771 + b555b60 commit a64b75e

File tree

6 files changed

+177
-48
lines changed

6 files changed

+177
-48
lines changed

stdlib/public/Concurrency/Actor.cpp

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,30 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
817817
return offsetof(ActiveActorStatus, DrainLock);
818818
}
819819
#endif
820+
821+
void traceStateChanged(HeapObject *actor) {
822+
// Convert our state to a consistent raw value. These values currently match
823+
// the enum values, but this explicit conversion provides room for change.
824+
uint8_t traceState = 255;
825+
switch (getActorState()) {
826+
case ActiveActorStatus::Idle:
827+
traceState = 0;
828+
break;
829+
case ActiveActorStatus::Scheduled:
830+
traceState = 1;
831+
break;
832+
case ActiveActorStatus::Running:
833+
traceState = 2;
834+
break;
835+
case ActiveActorStatus::Zombie_ReadyForDeallocation:
836+
traceState = 3;
837+
break;
838+
}
839+
concurrency::trace::actor_state_changed(
840+
actor, getFirstJob().getRawJob(), getFirstJob().needsPreprocessing(),
841+
traceState, isDistributedRemote(), isMaxPriorityEscalated(),
842+
static_cast<uint8_t>(getMaxPriority()));
843+
}
820844
};
821845

822846
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
@@ -943,7 +967,10 @@ class DefaultActorImpl : public HeapObject {
943967

944968
// Only for static assert use below, not for actual use otherwise
945969
static constexpr size_t offsetOfActiveActorStatus() {
970+
#pragma clang diagnostic push
971+
#pragma clang diagnostic ignored "-Winvalid-offsetof"
946972
return offsetof(DefaultActorImpl, StatusStorage);
973+
#pragma clang diagnostic pop
947974
}
948975

949976
private:
@@ -1124,11 +1151,10 @@ static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
11241151
static SWIFT_ATTRIBUTE_ALWAYS_INLINE void traceActorStateTransition(DefaultActorImpl *actor,
11251152
ActiveActorStatus oldState, ActiveActorStatus newState) {
11261153

1127-
SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %zx to %zx (%s)\n", actor,
1128-
oldState.getOpaqueFlags(), newState.getOpaqueFlags(), __FUNCTION__);
1129-
concurrency::trace::actor_state_changed(actor,
1130-
newState.getFirstJob().getRawJob(), newState.getFirstJob().needsPreprocessing(),
1131-
newState.getOpaqueFlags());
1154+
SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %#x to %#x (%s)\n", actor,
1155+
oldState.getOpaqueFlags(), newState.getOpaqueFlags(),
1156+
__FUNCTION__);
1157+
newState.traceStateChanged(actor);
11321158
}
11331159

11341160
void DefaultActorImpl::destroy() {
@@ -1192,7 +1218,9 @@ void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority, bool useInl
11921218
swift_retain(this);
11931219
job = new ProcessOutOfLineJob(this, priority);
11941220
}
1195-
SWIFT_TASK_DEBUG_LOG("Scheduling processing job %p for actor %p at priority %#x", job, this, priority);
1221+
SWIFT_TASK_DEBUG_LOG(
1222+
"Scheduling processing job %p for actor %p at priority %#zx", job, this,
1223+
priority);
11961224
swift_task_enqueueGlobal(job);
11971225
}
11981226

@@ -1259,7 +1287,9 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
12591287
// We can do relaxed loads here, we are just using the current head in the
12601288
// atomic state and linking that into the new job we are inserting, we don't
12611289
// need acquires
1262-
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#x", job, this, priority);
1290+
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#zx", job,
1291+
this, priority);
1292+
concurrency::trace::actor_enqueue(this, job);
12631293
auto oldState = _status().load(std::memory_order_relaxed);
12641294
while (true) {
12651295
auto newState = oldState;
@@ -1422,6 +1452,7 @@ Job * DefaultActorImpl::drainOne() {
14221452
/* failure */ std::memory_order_acquire)) {
14231453
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
14241454
traceActorStateTransition(this, oldState, newState);
1455+
concurrency::trace::actor_dequeue(this, firstJob);
14251456
return firstJob;
14261457
}
14271458

@@ -1650,7 +1681,8 @@ static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo,
16501681
/// do that in runOnAssumedThread.
16511682
static void giveUpThreadForSwitch(ExecutorRef currentExecutor) {
16521683
if (currentExecutor.isGeneric()) {
1653-
SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p", currentExecutor);
1684+
SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p",
1685+
currentExecutor.getIdentity());
16541686
return;
16551687
}
16561688

stdlib/public/Concurrency/Task.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -107,8 +107,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
107107
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
108108
bool contextIntialized = false;
109109
while (true) {
110-
concurrency::trace::task_wait(
111-
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
112110
switch (queueHead.getStatus()) {
113111
case Status::Error:
114112
case Status::Success:
@@ -123,6 +121,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
123121
SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, going to sleep",
124122
waitingTask, this);
125123
_swift_tsan_release(static_cast<Job *>(waitingTask));
124+
concurrency::trace::task_wait(
125+
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
126126
// Task is not complete. We'll need to add ourselves to the queue.
127127
break;
128128
}
@@ -233,6 +233,8 @@ void AsyncTask::completeFuture(AsyncContext *context) {
233233

234234
_swift_tsan_acquire(static_cast<Job *>(waitingTask));
235235

236+
concurrency::trace::task_resume(waitingTask);
237+
236238
// Enqueue the waiter on the global executor.
237239
// TODO: allow waiters to fill in a suggested executor
238240
waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
@@ -470,23 +472,27 @@ const void
470472
reinterpret_cast<void *>(task_future_wait_resume_adapter);
471473

472474
const void *AsyncTask::getResumeFunctionForLogging() {
475+
const void *result = reinterpret_cast<const void *>(ResumeTask);
476+
473477
if (ResumeTask == non_future_adapter) {
474478
auto asyncContextPrefix = reinterpret_cast<AsyncContextPrefix *>(
475479
reinterpret_cast<char *>(ResumeContext) - sizeof(AsyncContextPrefix));
476-
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
480+
result =
481+
reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
477482
} else if (ResumeTask == future_adapter) {
478483
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
479484
reinterpret_cast<char *>(ResumeContext) -
480485
sizeof(FutureAsyncContextPrefix));
481-
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
486+
result =
487+
reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
482488
} else if (ResumeTask == task_wait_throwing_resume_adapter) {
483489
auto context = static_cast<TaskFutureWaitAsyncContext *>(ResumeContext);
484-
return reinterpret_cast<const void *>(context->ResumeParent);
490+
result = reinterpret_cast<const void *>(context->ResumeParent);
485491
} else if (ResumeTask == task_future_wait_resume_adapter) {
486-
return reinterpret_cast<const void *>(ResumeContext->ResumeParent);
492+
result = reinterpret_cast<const void *>(ResumeContext->ResumeParent);
487493
}
488494

489-
return reinterpret_cast<const void *>(ResumeTask);
495+
return __ptrauth_swift_runtime_function_entry_strip(result);
490496
}
491497

492498
JobPriority swift::swift_task_currentPriority(AsyncTask *task)
@@ -653,7 +659,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
653659
basePriority = JobPriority::Default;
654660
}
655661

656-
SWIFT_TASK_DEBUG_LOG("Task's base priority = %#x", basePriority);
662+
SWIFT_TASK_DEBUG_LOG("Task's base priority = %#zx", basePriority);
657663

658664
// Figure out the size of the header.
659665
size_t headerSize = sizeof(AsyncTask);
@@ -788,7 +794,9 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
788794
futureAsyncContextPrefix->indirectResult = futureFragment->getStoragePtr();
789795
}
790796

791-
SWIFT_TASK_DEBUG_LOG("creating task %p with parent %p at base pri %zu", task, parent, basePriority);
797+
SWIFT_TASK_DEBUG_LOG("creating task %p ID %" PRIu64
798+
" with parent %p at base pri %zu",
799+
task, task->getTaskId(), parent, basePriority);
792800

793801
// Initialize the task-local allocator.
794802
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
@@ -1059,6 +1067,8 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
10591067
task->ResumeContext = context;
10601068
task->ResumeTask = context->ResumeParent;
10611069

1070+
concurrency::trace::task_continuation_init(task, context);
1071+
10621072
return task;
10631073
}
10641074

@@ -1071,6 +1081,8 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
10711081
assert(task->ResumeTask == context->ResumeParent);
10721082
#endif
10731083

1084+
concurrency::trace::task_continuation_await(context);
1085+
10741086
auto &sync = context->AwaitSynchronization;
10751087

10761088
auto oldStatus = sync.load(std::memory_order_acquire);
@@ -1157,12 +1169,14 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
11571169
SWIFT_CC(swift)
11581170
static void swift_continuation_resumeImpl(AsyncTask *task) {
11591171
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
1172+
concurrency::trace::task_continuation_resume(context, false);
11601173
resumeTaskAfterContinuation(task, context);
11611174
}
11621175

11631176
SWIFT_CC(swift)
11641177
static void swift_continuation_throwingResumeImpl(AsyncTask *task) {
11651178
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
1179+
concurrency::trace::task_continuation_resume(context, false);
11661180
resumeTaskAfterContinuation(task, context);
11671181
}
11681182

@@ -1171,6 +1185,7 @@ SWIFT_CC(swift)
11711185
static void swift_continuation_throwingResumeWithErrorImpl(AsyncTask *task,
11721186
/* +1 */ SwiftError *error) {
11731187
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
1188+
concurrency::trace::task_continuation_resume(context, true);
11741189
context->ErrorResult = error;
11751190
resumeTaskAfterContinuation(task, context);
11761191
}

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -511,7 +511,9 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus {
511511
}
512512

513513
void traceStatusChanged(AsyncTask *task) {
514-
concurrency::trace::task_status_changed(task, Flags);
514+
concurrency::trace::task_status_changed(
515+
task, static_cast<uint8_t>(getStoredPriority()), isCancelled(),
516+
isStoredPriorityEscalated(), isRunning(), isEnqueued());
515517
}
516518
};
517519

@@ -775,7 +777,10 @@ inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {
775777

776778
// Set up task for enqueue to next location by setting the Job priority field
777779
Flags.setPriority(newStatus.getStoredPriority());
778-
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
780+
concurrency::trace::task_flags_changed(
781+
this, static_cast<uint8_t>(Flags.getPriority()), Flags.task_isChildTask(),
782+
Flags.task_isFuture(), Flags.task_isGroupChildTask(),
783+
Flags.task_isAsyncLetTask());
779784

780785
swift_task_enqueue(this, newExecutor);
781786
}

stdlib/public/Concurrency/Tracing.h

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
namespace swift {
2323
class AsyncLet;
2424
class AsyncTask;
25+
class ContinuationAsyncContext;
2526
class ExecutorRef;
2627
struct HeapObject;
2728
class Job;
@@ -43,10 +44,13 @@ void actor_enqueue(HeapObject *actor, Job *job);
4344

4445
void actor_dequeue(HeapObject *actor, Job *job);
4546

46-
// The `flags` parameter is the raw values of the actor's
47-
// DefaultActorImpl::State::Flags.
47+
// State values are:
48+
// Idle = 0, Scheduled = 1, Running = 2, Zombie_ReadyForDeallocation = 3,
49+
// invalid/unknown = 255
4850
void actor_state_changed(HeapObject *actor, Job *firstJob,
49-
bool needsPreprocessing, uintptr_t flags);
51+
bool needsPreprocessing, uint8_t state,
52+
bool isDistributedRemote, bool isPriorityEscalated,
53+
uint8_t maxPriority);
5054

5155
void actor_note_job_queue(HeapObject *actor, Job *first,
5256
Job *(*getNext)(Job *));
@@ -58,17 +62,28 @@ void task_create(AsyncTask *task, AsyncTask *parent, TaskGroup *group,
5862

5963
void task_destroy(AsyncTask *task);
6064

61-
// The `flags` parameter is the raw value of the ActiveTaskStatus::Flags field
62-
// in the task.
63-
void task_status_changed(AsyncTask *task, uintptr_t flags);
65+
void task_status_changed(AsyncTask *task, uint8_t maxPriority, bool isCancelled,
66+
bool isEscalated, bool isRunning, bool isEnqueued);
6467

65-
// The `flags` parameter is the raw value of Job::Flags.
66-
void task_flags_changed(AsyncTask *task, uint32_t flags);
68+
void task_flags_changed(AsyncTask *task, uint8_t jobPriority, bool isChildTask,
69+
bool isFuture, bool isGroupChildTask,
70+
bool isAsyncLetTask);
6771

6872
// The `status` parameter is the value of the corresponding
6973
// FutureFragment::Status.
7074
void task_wait(AsyncTask *task, AsyncTask *waitingOn, uintptr_t status);
7175

76+
void task_resume(AsyncTask *task);
77+
78+
// The context parameter is the context pointer used to create the continuation.
79+
// This same pointer will be passed to the corresponding call to
80+
// task_continuation_await and task_continuation_resume.
81+
void task_continuation_init(AsyncTask *task, ContinuationAsyncContext *context);
82+
83+
void task_continuation_await(ContinuationAsyncContext *context);
84+
85+
void task_continuation_resume(ContinuationAsyncContext *context, bool error);
86+
7287
void job_enqueue_global(Job *job);
7388

7489
void job_enqueue_global_with_delay(unsigned long long delay, Job *job);

0 commit comments

Comments
 (0)