Skip to content

Commit e935692

Browse files
authored
Merge pull request #40070 from mikeash/concurrency-tracing
[Concurrency] Add tracing for major operations in the concurrency runtime.
2 parents 7c63b8a + 0977920 commit e935692

File tree

13 files changed

+608
-16
lines changed

13 files changed

+608
-16
lines changed

include/swift/ABI/Executor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ class ExecutorRef {
124124
/// Is this executor the main executor?
125125
bool isMainExecutor() const;
126126

127+
/// Get the raw value of the Implementation field, for tracing.
128+
uintptr_t getRawImplementation() { return Implementation; }
129+
127130
bool operator==(ExecutorRef other) const {
128131
return Identity == other.Identity;
129132
}

include/swift/ABI/Task.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,14 @@ class AsyncTask : public Job {
272272

273273
/// Set the task's ID field to the next task ID.
274274
void setTaskId();
275+
uint64_t getTaskId();
276+
277+
/// Get the task's resume function, for logging purposes only. This will
278+
/// attempt to see through the various adapters that are sometimes used, and
279+
/// failing that will return ResumeTask. The returned function pointer may
280+
/// have a different signature than ResumeTask, and it's only for identifying
281+
/// code associated with the task.
282+
const void *getResumeFunctionForLogging();
275283

276284
/// Given that we've already fully established the job context
277285
/// in the current thread, start running this task. To establish

stdlib/public/Concurrency/Actor.cpp

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,9 @@ class JobRef {
523523
return reinterpret_cast<Job*>(Value);
524524
}
525525

526+
/// Get the Job pointer with no preconditions on its type, for tracing.
527+
Job *getRawJob() const { return reinterpret_cast<Job *>(Value & JobMask); }
528+
526529
bool operator==(JobRef other) const {
527530
return Value == other.Value;
528531
}
@@ -685,6 +688,7 @@ class DefaultActorImpl : public HeapObject {
685688
flags.setIsDistributedRemote(isDistributedRemote);
686689
new (&CurrentState) swift::atomic<State>(State{JobRef(), flags});
687690
JobStorageHeapObject.metadata = nullptr;
691+
concurrency::trace::actor_create(this);
688692
}
689693

690694
/// Properly destruct an actor, except for the heap header.
@@ -766,8 +770,10 @@ void DefaultActorImpl::destroy() {
766770
newState.Flags.setStatus(Status::Zombie_Latching);
767771
if (CurrentState.compare_exchange_weak(oldState, newState,
768772
std::memory_order_relaxed,
769-
std::memory_order_relaxed))
773+
std::memory_order_relaxed)) {
774+
concurrency::trace::actor_destroy(this);
770775
return;
776+
}
771777
}
772778
}
773779

@@ -793,6 +799,8 @@ void DefaultActorImpl::deallocate() {
793799
}
794800

795801
void DefaultActorImpl::deallocateUnconditional() {
802+
concurrency::trace::actor_deallocate(this);
803+
796804
if (JobStorageHeapObject.metadata != nullptr)
797805
JobStorage.~ProcessInlineJob();
798806
auto metadata = cast<ClassMetadata>(this->metadata);
@@ -924,12 +932,19 @@ static Job *preprocessQueue(JobRef first,
924932
return firstNewJob;
925933
}
926934

935+
static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
936+
concurrency::trace::actor_note_job_queue(actor, first, [](Job *job) {
937+
return getNextJobInQueue(job).getAsPreprocessedJob();
938+
});
939+
}
940+
927941
void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
928942
SWIFT_TASK_DEBUG_LOG("giving up thread for actor %p", this);
929943
auto oldState = CurrentState.load(std::memory_order_acquire);
930944
assert(oldState.Flags.isAnyRunningStatus());
931945

932946
auto firstNewJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr);
947+
traceJobQueue(this, firstNewJob);
933948

934949
_swift_tsan_release(this);
935950
while (true) {
@@ -976,16 +991,23 @@ void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
976991
firstNewJob = preprocessQueue(oldState.FirstJob,
977992
firstPreprocessed,
978993
firstNewJob);
994+
traceJobQueue(this, firstNewJob);
979995

980996
// Try again.
981997
continue;
982998
}
983999

984-
#define LOG_STATE_TRANSITION \
985-
SWIFT_TASK_DEBUG_LOG("actor %p transitioned from %zx to %zx (%s)\n", this, \
986-
oldState.Flags.getOpaqueValue(), \
987-
newState.Flags.getOpaqueValue(), __FUNCTION__)
988-
LOG_STATE_TRANSITION;
1000+
#define ACTOR_STATE_TRANSITION \
1001+
do { \
1002+
SWIFT_TASK_DEBUG_LOG("actor %p transitioned from %zx to %zx (%s)\n", this, \
1003+
oldState.Flags.getOpaqueValue(), \
1004+
newState.Flags.getOpaqueValue(), __FUNCTION__); \
1005+
concurrency::trace::actor_state_changed( \
1006+
this, newState.FirstJob.getRawJob(), \
1007+
newState.FirstJob.needsPreprocessing(), \
1008+
newState.Flags.getOpaqueValue()); \
1009+
} while (0)
1010+
ACTOR_STATE_TRANSITION;
9891011

9901012
// The priority of the remaining work.
9911013
auto newPriority = newState.Flags.getMaxPriority();
@@ -1038,7 +1060,8 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
10381060
auto success = CurrentState.compare_exchange_weak(oldState, newState,
10391061
/*success*/ std::memory_order_relaxed,
10401062
/*failure*/ std::memory_order_acquire);
1041-
if (success) LOG_STATE_TRANSITION;
1063+
if (success)
1064+
ACTOR_STATE_TRANSITION;
10421065
return success;
10431066
};
10441067

@@ -1080,7 +1103,7 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
10801103
/*success*/ std::memory_order_relaxed,
10811104
/*failure*/ std::memory_order_acquire))
10821105
continue;
1083-
LOG_STATE_TRANSITION;
1106+
ACTOR_STATE_TRANSITION;
10841107
_swift_tsan_acquire(this);
10851108

10861109
// If that succeeded, we can proceed to the main body.
@@ -1098,6 +1121,7 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
10981121
// Okay, now it's safe to look at queue state.
10991122
// Preprocess any queue items at the front of the queue.
11001123
auto newFirstJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr);
1124+
traceJobQueue(this, newFirstJob);
11011125

11021126
_swift_tsan_release(this);
11031127
while (true) {
@@ -1142,11 +1166,12 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
11421166
newFirstJob = preprocessQueue(oldState.FirstJob,
11431167
firstPreprocessed,
11441168
newFirstJob);
1169+
traceJobQueue(this, newFirstJob);
11451170

11461171
// Loop to retry updating the state.
11471172
continue;
11481173
}
1149-
LOG_STATE_TRANSITION;
1174+
ACTOR_STATE_TRANSITION;
11501175

11511176
// We successfully updated the state.
11521177

@@ -1178,10 +1203,12 @@ static void swift_job_runImpl(Job *job, ExecutorRef executor) {
11781203
if (!executor.isGeneric()) trackingInfo.disallowSwitching();
11791204

11801205
trackingInfo.enterAndShadow(executor);
1206+
auto traceHandle = concurrency::trace::job_run_begin(job, &executor);
11811207

11821208
SWIFT_TASK_DEBUG_LOG("%s(%p)", __func__, job);
11831209
runJobInEstablishedExecutorContext(job);
11841210

1211+
concurrency::trace::job_run_end(job, &executor, traceHandle);
11851212
trackingInfo.leave();
11861213

11871214
// Give up the current executor if this is a switching context
@@ -1235,6 +1262,7 @@ static void processDefaultActor(DefaultActorImpl *currentActor,
12351262

12361263
SWIFT_TASK_DEBUG_LOG("processDefaultActor %p claimed job %p", currentActor,
12371264
job);
1265+
concurrency::trace::actor_dequeue(currentActor, job);
12381266

12391267
// If we failed to claim a job, we have nothing to do.
12401268
if (!job) {
@@ -1349,7 +1377,8 @@ void DefaultActorImpl::enqueue(Job *job) {
13491377
/*success*/ std::memory_order_release,
13501378
/*failure*/ std::memory_order_relaxed))
13511379
continue;
1352-
LOG_STATE_TRANSITION;
1380+
ACTOR_STATE_TRANSITION;
1381+
concurrency::trace::actor_enqueue(this, job);
13531382

13541383
// Okay, we successfully updated the status. Schedule a job to
13551384
// process the actor if necessary.
@@ -1379,7 +1408,7 @@ bool DefaultActorImpl::tryAssumeThread(RunningJobInfo runner) {
13791408
if (CurrentState.compare_exchange_weak(oldState, newState,
13801409
/*success*/ std::memory_order_relaxed,
13811410
/*failure*/ std::memory_order_acquire)) {
1382-
LOG_STATE_TRANSITION;
1411+
ACTOR_STATE_TRANSITION;
13831412
_swift_tsan_acquire(this);
13841413
return true;
13851414
}

stdlib/public/Concurrency/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
103103
TaskLocal.swift
104104
TaskSleep.swift
105105
ThreadSanitizer.cpp
106+
TracingSignpost.cpp
106107
Mutex.cpp
107108
AsyncStreamBuffer.swift
108109
AsyncStream.swift

stdlib/public/Concurrency/GlobalExecutor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,17 @@ void (*swift::swift_task_enqueueMainExecutor_hook)(
8585
void swift::swift_task_enqueueGlobal(Job *job) {
8686
_swift_tsan_release(job);
8787

88+
concurrency::trace::job_enqueue_global(job);
89+
8890
if (swift_task_enqueueGlobal_hook)
8991
swift_task_enqueueGlobal_hook(job, swift_task_enqueueGlobalImpl);
9092
else
9193
swift_task_enqueueGlobalImpl(job);
9294
}
9395

9496
void swift::swift_task_enqueueGlobalWithDelay(JobDelay delay, Job *job) {
97+
concurrency::trace::job_enqueue_global_with_delay(delay, job);
98+
9599
if (swift_task_enqueueGlobalWithDelay_hook)
96100
swift_task_enqueueGlobalWithDelay_hook(
97101
delay, job, swift_task_enqueueGlobalWithDelayImpl);
@@ -100,6 +104,7 @@ void swift::swift_task_enqueueGlobalWithDelay(JobDelay delay, Job *job) {
100104
}
101105

102106
void swift::swift_task_enqueueMainExecutor(Job *job) {
107+
concurrency::trace::job_enqueue_main_executor(job);
103108
if (swift_task_enqueueMainExecutor_hook)
104109
swift_task_enqueueMainExecutor_hook(job,
105110
swift_task_enqueueMainExecutorImpl);

stdlib/public/Concurrency/Task.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "swift/Runtime/HeapObject.h"
2525
#include "TaskGroupPrivate.h"
2626
#include "TaskPrivate.h"
27+
#include "Tracing.h"
2728
#include "Debug.h"
2829
#include "Error.h"
2930

@@ -102,6 +103,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
102103
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
103104
bool contextIntialized = false;
104105
while (true) {
106+
concurrency::trace::task_wait(
107+
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
105108
switch (queueHead.getStatus()) {
106109
case Status::Error:
107110
case Status::Success:
@@ -245,6 +248,8 @@ AsyncTask::~AsyncTask() {
245248
}
246249

247250
Private.destroy();
251+
252+
concurrency::trace::task_destroy(this);
248253
}
249254

250255
void AsyncTask::setTaskId() {
@@ -260,6 +265,12 @@ void AsyncTask::setTaskId() {
260265
_private().Id = (Fetched >> 32) & 0xffffffff;
261266
}
262267

268+
uint64_t AsyncTask::getTaskId() {
269+
// Reconstitute a full 64-bit task ID from the 32-bit job ID and the upper
270+
// 32 bits held in _private().
271+
return (static_cast<uint64_t>(_private().Id) << 32) | Id; // _private().Id is the upper half stored by setTaskId(); Id is the low 32 bits.
272+
}
273+
263274
SWIFT_CC(swift)
264275
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
265276
auto task = static_cast<AsyncTask*>(obj);
@@ -439,6 +450,26 @@ task_future_wait_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *_context) {
439450
return _context->ResumeParent(_context->Parent);
440451
}
441452

453+
const void *AsyncTask::getResumeFunctionForLogging() {
454+
if (ResumeTask == non_future_adapter) {
455+
auto asyncContextPrefix = reinterpret_cast<AsyncContextPrefix *>(
456+
reinterpret_cast<char *>(ResumeContext) - sizeof(AsyncContextPrefix));
457+
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
458+
} else if (ResumeTask == future_adapter) {
459+
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
460+
reinterpret_cast<char *>(ResumeContext) -
461+
sizeof(FutureAsyncContextPrefix));
462+
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
463+
} else if (ResumeTask == task_wait_throwing_resume_adapter) {
464+
auto context = static_cast<TaskFutureWaitAsyncContext *>(ResumeContext);
465+
return reinterpret_cast<const void *>(context->ResumeParent);
466+
} else if (ResumeTask == task_future_wait_resume_adapter) {
467+
return reinterpret_cast<const void *>(ResumeContext->ResumeParent);
468+
}
469+
470+
return reinterpret_cast<const void *>(ResumeTask);
471+
}
472+
442473
/// Implementation of task creation.
443474
SWIFT_CC(swift)
444475
static AsyncTaskAndContext swift_task_create_commonImpl(
@@ -702,6 +733,8 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
702733
initialContext->Parent = nullptr;
703734
initialContext->Flags = AsyncContextKind::Ordinary;
704735

736+
concurrency::trace::task_create(task, parent, group, asyncLet);
737+
705738
// Attach to the group, if needed.
706739
if (group) {
707740
swift_taskGroup_attachChild(group, task);

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define SWIFT_CONCURRENCY_TASKPRIVATE_H
1919

2020
#include "Error.h"
21+
#include "Tracing.h"
2122
#include "swift/ABI/Metadata.h"
2223
#include "swift/ABI/Task.h"
2324
#include "swift/Runtime/Atomic.h"
@@ -266,6 +267,10 @@ class alignas(sizeof(void*) * 2) ActiveTaskStatus {
266267
llvm::iterator_range<record_iterator> records() const {
267268
return record_iterator::rangeBeginning(getInnermostRecord());
268269
}
270+
271+
void traceStatusChanged(AsyncTask *task) {
272+
concurrency::trace::task_status_changed(task, Flags);
273+
}
269274
};
270275

271276
/// The size of an allocator slab.
@@ -373,11 +378,13 @@ inline void AsyncTask::flagAsRunning() {
373378
if (newStatus.isStoredPriorityEscalated()) {
374379
newStatus = newStatus.withoutStoredPriorityEscalation();
375380
Flags.setPriority(oldStatus.getStoredPriority());
381+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
376382
}
377383

378384
if (_private().Status.compare_exchange_weak(oldStatus, newStatus,
379385
std::memory_order_relaxed,
380386
std::memory_order_relaxed)) {
387+
newStatus.traceStatusChanged(this);
381388
adoptTaskVoucher(this);
382389
swift_task_enterThreadLocalContext(
383390
(char *)&_private().ExclusivityAccessSet[0]);
@@ -403,11 +410,13 @@ inline void AsyncTask::flagAsSuspended() {
403410
if (newStatus.isStoredPriorityEscalated()) {
404411
newStatus = newStatus.withoutStoredPriorityEscalation();
405412
Flags.setPriority(oldStatus.getStoredPriority());
413+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
406414
}
407415

408416
if (_private().Status.compare_exchange_weak(oldStatus, newStatus,
409417
std::memory_order_relaxed,
410418
std::memory_order_relaxed)) {
419+
newStatus.traceStatusChanged(this);
411420
swift_task_exitThreadLocalContext(
412421
(char *)&_private().ExclusivityAccessSet[0]);
413422
restoreTaskVoucher(this);

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ static bool withStatusRecordLock(AsyncTask *task,
184184
if (task->_private().Status.compare_exchange_weak(status, newStatus,
185185
/*success*/ std::memory_order_relaxed,
186186
/*failure*/ loadOrdering)) {
187+
newStatus.traceStatusChanged(task);
187188
status = newStatus;
188189
return false;
189190
}
@@ -203,6 +204,7 @@ static bool withStatusRecordLock(AsyncTask *task,
203204
if (task->_private().Status.compare_exchange_weak(status, newStatus,
204205
/*success*/ std::memory_order_release,
205206
/*failure*/ loadOrdering)) {
207+
newStatus.traceStatusChanged(task);
206208

207209
// Update `status` for the purposes of the callback function.
208210
// Note that we don't include the record lock, but do need to
@@ -230,6 +232,7 @@ static bool withStatusRecordLock(AsyncTask *task,
230232
// atomic objects in the task status records. Because of this, we can
231233
// actually unpublish the lock with a relaxed store.
232234
assert(!status.isLocked());
235+
status.traceStatusChanged(task);
233236
task->_private().Status.store(status,
234237
/*success*/ std::memory_order_relaxed);
235238

@@ -606,6 +609,7 @@ void AsyncTask::flagAsRunning_slow() {
606609
if (status.isStoredPriorityEscalated()) {
607610
status = status.withoutStoredPriorityEscalation();
608611
Flags.setPriority(status.getStoredPriority());
612+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
609613
}
610614
});
611615
}
@@ -619,6 +623,7 @@ void AsyncTask::flagAsSuspended_slow() {
619623
if (status.isStoredPriorityEscalated()) {
620624
status = status.withoutStoredPriorityEscalation();
621625
Flags.setPriority(status.getStoredPriority());
626+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
622627
}
623628
});
624629
}

0 commit comments

Comments
 (0)