Skip to content

Commit 0977920

Browse files
committed
[Concurrency] Add tracing for major operations in the concurrency runtime.
Each trace point is declared as a function in the new `Tracing.h` header. These functions are called from the appropriate places in the concurrency runtime. On Darwin, an implementation of these functions is provided which uses the `os/signpost.h` API to emit signpost events/intervals. When the signpost API is not available, no-op stub implementations are provided. Other OSes can be supported by supplying their own implementations of the trace functions. rdar://81858487
1 parent 127528a commit 0977920

File tree

13 files changed

+608
-16
lines changed

13 files changed

+608
-16
lines changed

include/swift/ABI/Executor.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,9 @@ class ExecutorRef {
124124
/// Is this executor the main executor?
125125
bool isMainExecutor() const;
126126

127+
/// Get the raw value of the Implementation field, for tracing.
128+
uintptr_t getRawImplementation() { return Implementation; }
129+
127130
bool operator==(ExecutorRef other) const {
128131
return Identity == other.Identity;
129132
}

include/swift/ABI/Task.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,14 @@ class AsyncTask : public Job {
272272

273273
/// Set the task's ID field to the next task ID.
274274
void setTaskId();
275+
uint64_t getTaskId();
276+
277+
/// Get the task's resume function, for logging purposes only. This will
278+
/// attempt to see through the various adapters that are sometimes used, and
279+
/// failing that will return ResumeTask. The returned function pointer may
280+
/// have a different signature than ResumeTask, and it's only for identifying
281+
/// code associated with the task.
282+
const void *getResumeFunctionForLogging();
275283

276284
/// Given that we've already fully established the job context
277285
/// in the current thread, start running this task. To establish

stdlib/public/Concurrency/Actor.cpp

Lines changed: 40 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -525,6 +525,9 @@ class JobRef {
525525
return reinterpret_cast<Job*>(Value);
526526
}
527527

528+
/// Get the Job pointer with no preconditions on its type, for tracing.
529+
Job *getRawJob() const {
  // Apply JobMask to the stored value, then view the result as a Job pointer.
  const auto maskedValue = Value & JobMask;
  return reinterpret_cast<Job *>(maskedValue);
}
530+
528531
bool operator==(JobRef other) const {
529532
return Value == other.Value;
530533
}
@@ -687,6 +690,7 @@ class DefaultActorImpl : public HeapObject {
687690
flags.setIsDistributedRemote(isDistributedRemote);
688691
new (&CurrentState) swift::atomic<State>(State{JobRef(), flags});
689692
JobStorageHeapObject.metadata = nullptr;
693+
concurrency::trace::actor_create(this);
690694
}
691695

692696
/// Properly destruct an actor, except for the heap header.
@@ -768,8 +772,10 @@ void DefaultActorImpl::destroy() {
768772
newState.Flags.setStatus(Status::Zombie_Latching);
769773
if (CurrentState.compare_exchange_weak(oldState, newState,
770774
std::memory_order_relaxed,
771-
std::memory_order_relaxed))
775+
std::memory_order_relaxed)) {
776+
concurrency::trace::actor_destroy(this);
772777
return;
778+
}
773779
}
774780
}
775781

@@ -795,6 +801,8 @@ void DefaultActorImpl::deallocate() {
795801
}
796802

797803
void DefaultActorImpl::deallocateUnconditional() {
804+
concurrency::trace::actor_deallocate(this);
805+
798806
if (JobStorageHeapObject.metadata != nullptr)
799807
JobStorage.~ProcessInlineJob();
800808
auto metadata = cast<ClassMetadata>(this->metadata);
@@ -926,12 +934,19 @@ static Job *preprocessQueue(JobRef first,
926934
return firstNewJob;
927935
}
928936

937+
/// Report the job queue of `actor`, beginning at `first`, to the tracing
/// layer. The tracer is handed a callback that steps from one job to the
/// next job in the queue.
static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
  auto stepToNextJob = [](Job *current) {
    return getNextJobInQueue(current).getAsPreprocessedJob();
  };
  concurrency::trace::actor_note_job_queue(actor, first, stepToNextJob);
}
942+
929943
void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
930944
SWIFT_TASK_DEBUG_LOG("giving up thread for actor %p", this);
931945
auto oldState = CurrentState.load(std::memory_order_acquire);
932946
assert(oldState.Flags.isAnyRunningStatus());
933947

934948
auto firstNewJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr);
949+
traceJobQueue(this, firstNewJob);
935950

936951
_swift_tsan_release(this);
937952
while (true) {
@@ -978,16 +993,23 @@ void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
978993
firstNewJob = preprocessQueue(oldState.FirstJob,
979994
firstPreprocessed,
980995
firstNewJob);
996+
traceJobQueue(this, firstNewJob);
981997

982998
// Try again.
983999
continue;
9841000
}
9851001

986-
#define LOG_STATE_TRANSITION \
987-
SWIFT_TASK_DEBUG_LOG("actor %p transitioned from %zx to %zx (%s)\n", this, \
988-
oldState.Flags.getOpaqueValue(), \
989-
newState.Flags.getOpaqueValue(), __FUNCTION__)
990-
LOG_STATE_TRANSITION;
1002+
// Record an actor state transition. Expands to a single statement that
// (1) emits a debug log line with the old and new opaque flag values and the
// name of the enclosing function, and (2) reports the new state (first job,
// whether it needs preprocessing, and opaque flags) to the tracing layer.
// Must be used where `this`, `oldState`, and `newState` are in scope. The
// do { ... } while (0) wrapper lets it be used as the body of an unbraced
// `if`.
#define ACTOR_STATE_TRANSITION \
1003+
do { \
1004+
SWIFT_TASK_DEBUG_LOG("actor %p transitioned from %zx to %zx (%s)\n", this, \
1005+
oldState.Flags.getOpaqueValue(), \
1006+
newState.Flags.getOpaqueValue(), __FUNCTION__); \
1007+
concurrency::trace::actor_state_changed( \
1008+
this, newState.FirstJob.getRawJob(), \
1009+
newState.FirstJob.needsPreprocessing(), \
1010+
newState.Flags.getOpaqueValue()); \
1011+
} while (0)
1012+
ACTOR_STATE_TRANSITION;
9911013

9921014
// The priority of the remaining work.
9931015
auto newPriority = newState.Flags.getMaxPriority();
@@ -1040,7 +1062,8 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
10401062
auto success = CurrentState.compare_exchange_weak(oldState, newState,
10411063
/*success*/ std::memory_order_relaxed,
10421064
/*failure*/ std::memory_order_acquire);
1043-
if (success) LOG_STATE_TRANSITION;
1065+
if (success)
1066+
ACTOR_STATE_TRANSITION;
10441067
return success;
10451068
};
10461069

@@ -1082,7 +1105,7 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
10821105
/*success*/ std::memory_order_relaxed,
10831106
/*failure*/ std::memory_order_acquire))
10841107
continue;
1085-
LOG_STATE_TRANSITION;
1108+
ACTOR_STATE_TRANSITION;
10861109
_swift_tsan_acquire(this);
10871110

10881111
// If that succeeded, we can proceed to the main body.
@@ -1100,6 +1123,7 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
11001123
// Okay, now it's safe to look at queue state.
11011124
// Preprocess any queue items at the front of the queue.
11021125
auto newFirstJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr);
1126+
traceJobQueue(this, newFirstJob);
11031127

11041128
_swift_tsan_release(this);
11051129
while (true) {
@@ -1144,11 +1168,12 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
11441168
newFirstJob = preprocessQueue(oldState.FirstJob,
11451169
firstPreprocessed,
11461170
newFirstJob);
1171+
traceJobQueue(this, newFirstJob);
11471172

11481173
// Loop to retry updating the state.
11491174
continue;
11501175
}
1151-
LOG_STATE_TRANSITION;
1176+
ACTOR_STATE_TRANSITION;
11521177

11531178
// We successfully updated the state.
11541179

@@ -1180,10 +1205,12 @@ static void swift_job_runImpl(Job *job, ExecutorRef executor) {
11801205
if (!executor.isGeneric()) trackingInfo.disallowSwitching();
11811206

11821207
trackingInfo.enterAndShadow(executor);
1208+
auto traceHandle = concurrency::trace::job_run_begin(job, &executor);
11831209

11841210
SWIFT_TASK_DEBUG_LOG("%s(%p)", __func__, job);
11851211
runJobInEstablishedExecutorContext(job);
11861212

1213+
concurrency::trace::job_run_end(job, &executor, traceHandle);
11871214
trackingInfo.leave();
11881215

11891216
// Give up the current executor if this is a switching context
@@ -1237,6 +1264,7 @@ static void processDefaultActor(DefaultActorImpl *currentActor,
12371264

12381265
SWIFT_TASK_DEBUG_LOG("processDefaultActor %p claimed job %p", currentActor,
12391266
job);
1267+
concurrency::trace::actor_dequeue(currentActor, job);
12401268

12411269
// If we failed to claim a job, we have nothing to do.
12421270
if (!job) {
@@ -1351,7 +1379,8 @@ void DefaultActorImpl::enqueue(Job *job) {
13511379
/*success*/ std::memory_order_release,
13521380
/*failure*/ std::memory_order_relaxed))
13531381
continue;
1354-
LOG_STATE_TRANSITION;
1382+
ACTOR_STATE_TRANSITION;
1383+
concurrency::trace::actor_enqueue(this, job);
13551384

13561385
// Okay, we successfully updated the status. Schedule a job to
13571386
// process the actor if necessary.
@@ -1381,7 +1410,7 @@ bool DefaultActorImpl::tryAssumeThread(RunningJobInfo runner) {
13811410
if (CurrentState.compare_exchange_weak(oldState, newState,
13821411
/*success*/ std::memory_order_relaxed,
13831412
/*failure*/ std::memory_order_acquire)) {
1384-
LOG_STATE_TRANSITION;
1413+
ACTOR_STATE_TRANSITION;
13851414
_swift_tsan_acquire(this);
13861415
return true;
13871416
}

stdlib/public/Concurrency/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
103103
TaskLocal.swift
104104
TaskSleep.swift
105105
ThreadSanitizer.cpp
106+
TracingSignpost.cpp
106107
Mutex.cpp
107108
AsyncStreamBuffer.swift
108109
AsyncStream.swift

stdlib/public/Concurrency/GlobalExecutor.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,17 @@ void (*swift::swift_task_enqueueMainExecutor_hook)(
8585
void swift::swift_task_enqueueGlobal(Job *job) {
8686
_swift_tsan_release(job);
8787

88+
concurrency::trace::job_enqueue_global(job);
89+
8890
if (swift_task_enqueueGlobal_hook)
8991
swift_task_enqueueGlobal_hook(job, swift_task_enqueueGlobalImpl);
9092
else
9193
swift_task_enqueueGlobalImpl(job);
9294
}
9395

9496
void swift::swift_task_enqueueGlobalWithDelay(JobDelay delay, Job *job) {
97+
concurrency::trace::job_enqueue_global_with_delay(delay, job);
98+
9599
if (swift_task_enqueueGlobalWithDelay_hook)
96100
swift_task_enqueueGlobalWithDelay_hook(
97101
delay, job, swift_task_enqueueGlobalWithDelayImpl);
@@ -100,6 +104,7 @@ void swift::swift_task_enqueueGlobalWithDelay(JobDelay delay, Job *job) {
100104
}
101105

102106
void swift::swift_task_enqueueMainExecutor(Job *job) {
107+
concurrency::trace::job_enqueue_main_executor(job);
103108
if (swift_task_enqueueMainExecutor_hook)
104109
swift_task_enqueueMainExecutor_hook(job,
105110
swift_task_enqueueMainExecutorImpl);

stdlib/public/Concurrency/Task.cpp

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
#include "swift/Runtime/HeapObject.h"
2525
#include "TaskGroupPrivate.h"
2626
#include "TaskPrivate.h"
27+
#include "Tracing.h"
2728
#include "Debug.h"
2829
#include "Error.h"
2930

@@ -102,6 +103,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
102103
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
103104
bool contextIntialized = false;
104105
while (true) {
106+
concurrency::trace::task_wait(
107+
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
105108
switch (queueHead.getStatus()) {
106109
case Status::Error:
107110
case Status::Success:
@@ -245,6 +248,8 @@ AsyncTask::~AsyncTask() {
245248
}
246249

247250
Private.destroy();
251+
252+
concurrency::trace::task_destroy(this);
248253
}
249254

250255
void AsyncTask::setTaskId() {
@@ -260,6 +265,12 @@ void AsyncTask::setTaskId() {
260265
_private().Id = (Fetched >> 32) & 0xffffffff;
261266
}
262267

268+
uint64_t AsyncTask::getTaskId() {
269+
// Reconstitute a full 64-bit task ID from the 32-bit job ID and the upper
270+
// 32 bits held in _private().
271+
return (uint64_t)Id << _private().Id;
272+
}
273+
263274
SWIFT_CC(swift)
264275
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
265276
auto task = static_cast<AsyncTask*>(obj);
@@ -439,6 +450,26 @@ task_future_wait_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *_context) {
439450
return _context->ResumeParent(_context->Parent);
440451
}
441452

453+
const void *AsyncTask::getResumeFunctionForLogging() {
454+
if (ResumeTask == non_future_adapter) {
455+
auto asyncContextPrefix = reinterpret_cast<AsyncContextPrefix *>(
456+
reinterpret_cast<char *>(ResumeContext) - sizeof(AsyncContextPrefix));
457+
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
458+
} else if (ResumeTask == future_adapter) {
459+
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
460+
reinterpret_cast<char *>(ResumeContext) -
461+
sizeof(FutureAsyncContextPrefix));
462+
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
463+
} else if (ResumeTask == task_wait_throwing_resume_adapter) {
464+
auto context = static_cast<TaskFutureWaitAsyncContext *>(ResumeContext);
465+
return reinterpret_cast<const void *>(context->ResumeParent);
466+
} else if (ResumeTask == task_future_wait_resume_adapter) {
467+
return reinterpret_cast<const void *>(ResumeContext->ResumeParent);
468+
}
469+
470+
return reinterpret_cast<const void *>(ResumeTask);
471+
}
472+
442473
/// Implementation of task creation.
443474
SWIFT_CC(swift)
444475
static AsyncTaskAndContext swift_task_create_commonImpl(
@@ -702,6 +733,8 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
702733
initialContext->Parent = nullptr;
703734
initialContext->Flags = AsyncContextKind::Ordinary;
704735

736+
concurrency::trace::task_create(task, parent, group, asyncLet);
737+
705738
// Attach to the group, if needed.
706739
if (group) {
707740
swift_taskGroup_attachChild(group, task);

stdlib/public/Concurrency/TaskPrivate.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define SWIFT_CONCURRENCY_TASKPRIVATE_H
1919

2020
#include "Error.h"
21+
#include "Tracing.h"
2122
#include "swift/ABI/Metadata.h"
2223
#include "swift/ABI/Task.h"
2324
#include "swift/Runtime/Atomic.h"
@@ -266,6 +267,10 @@ class alignas(sizeof(void*) * 2) ActiveTaskStatus {
266267
llvm::iterator_range<record_iterator> records() const {
267268
return record_iterator::rangeBeginning(getInnermostRecord());
268269
}
270+
271+
/// Emit a trace event for `task` carrying this status value's Flags.
void traceStatusChanged(AsyncTask *task) {
  concurrency::trace::task_status_changed(task, this->Flags);
}
269274
};
270275

271276
/// The size of an allocator slab.
@@ -373,11 +378,13 @@ inline void AsyncTask::flagAsRunning() {
373378
if (newStatus.isStoredPriorityEscalated()) {
374379
newStatus = newStatus.withoutStoredPriorityEscalation();
375380
Flags.setPriority(oldStatus.getStoredPriority());
381+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
376382
}
377383

378384
if (_private().Status.compare_exchange_weak(oldStatus, newStatus,
379385
std::memory_order_relaxed,
380386
std::memory_order_relaxed)) {
387+
newStatus.traceStatusChanged(this);
381388
adoptTaskVoucher(this);
382389
swift_task_enterThreadLocalContext(
383390
(char *)&_private().ExclusivityAccessSet[0]);
@@ -403,11 +410,13 @@ inline void AsyncTask::flagAsSuspended() {
403410
if (newStatus.isStoredPriorityEscalated()) {
404411
newStatus = newStatus.withoutStoredPriorityEscalation();
405412
Flags.setPriority(oldStatus.getStoredPriority());
413+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
406414
}
407415

408416
if (_private().Status.compare_exchange_weak(oldStatus, newStatus,
409417
std::memory_order_relaxed,
410418
std::memory_order_relaxed)) {
419+
newStatus.traceStatusChanged(this);
411420
swift_task_exitThreadLocalContext(
412421
(char *)&_private().ExclusivityAccessSet[0]);
413422
restoreTaskVoucher(this);

stdlib/public/Concurrency/TaskStatus.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,7 @@ static bool withStatusRecordLock(AsyncTask *task,
184184
if (task->_private().Status.compare_exchange_weak(status, newStatus,
185185
/*success*/ std::memory_order_relaxed,
186186
/*failure*/ loadOrdering)) {
187+
newStatus.traceStatusChanged(task);
187188
status = newStatus;
188189
return false;
189190
}
@@ -203,6 +204,7 @@ static bool withStatusRecordLock(AsyncTask *task,
203204
if (task->_private().Status.compare_exchange_weak(status, newStatus,
204205
/*success*/ std::memory_order_release,
205206
/*failure*/ loadOrdering)) {
207+
newStatus.traceStatusChanged(task);
206208

207209
// Update `status` for the purposes of the callback function.
208210
// Note that we don't include the record lock, but do need to
@@ -230,6 +232,7 @@ static bool withStatusRecordLock(AsyncTask *task,
230232
// atomic objects in the task status records. Because of this, we can
231233
// actually unpublish the lock with a relaxed store.
232234
assert(!status.isLocked());
235+
status.traceStatusChanged(task);
233236
task->_private().Status.store(status,
234237
/*success*/ std::memory_order_relaxed);
235238

@@ -606,6 +609,7 @@ void AsyncTask::flagAsRunning_slow() {
606609
if (status.isStoredPriorityEscalated()) {
607610
status = status.withoutStoredPriorityEscalation();
608611
Flags.setPriority(status.getStoredPriority());
612+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
609613
}
610614
});
611615
}
@@ -619,6 +623,7 @@ void AsyncTask::flagAsSuspended_slow() {
619623
if (status.isStoredPriorityEscalated()) {
620624
status = status.withoutStoredPriorityEscalation();
621625
Flags.setPriority(status.getStoredPriority());
626+
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
622627
}
623628
});
624629
}

0 commit comments

Comments
 (0)