Skip to content

[Concurrency] Add tracing for major operations in the concurrency runtime. #40070

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions include/swift/ABI/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ class ExecutorRef {
/// Is this executor the main executor?
bool isMainExecutor() const;

/// Get the raw value of the Implementation field, for tracing.
uintptr_t getRawImplementation() { return Implementation; }

bool operator==(ExecutorRef other) const {
return Identity == other.Identity;
}
Expand Down
8 changes: 8 additions & 0 deletions include/swift/ABI/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,14 @@ class AsyncTask : public Job {

/// Set the task's ID field to the next task ID.
void setTaskId();
uint64_t getTaskId();

/// Get the task's resume function, for logging purposes only. This will
/// attempt to see through the various adapters that are sometimes used, and
/// failing that will return ResumeTask. The returned function pointer may
/// have a different signature than ResumeTask, and it's only for identifying
/// code associated with the task.
const void *getResumeFunctionForLogging();

/// Given that we've already fully established the job context
/// in the current thread, start running this task. To establish
Expand Down
51 changes: 40 additions & 11 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,9 @@ class JobRef {
return reinterpret_cast<Job*>(Value);
}

/// Get the Job pointer with no preconditions on its type, for tracing.
Job *getRawJob() const { return reinterpret_cast<Job *>(Value & JobMask); }

bool operator==(JobRef other) const {
return Value == other.Value;
}
Expand Down Expand Up @@ -687,6 +690,7 @@ class DefaultActorImpl : public HeapObject {
flags.setIsDistributedRemote(isDistributedRemote);
new (&CurrentState) swift::atomic<State>(State{JobRef(), flags});
JobStorageHeapObject.metadata = nullptr;
concurrency::trace::actor_create(this);
}

/// Properly destruct an actor, except for the heap header.
Expand Down Expand Up @@ -768,8 +772,10 @@ void DefaultActorImpl::destroy() {
newState.Flags.setStatus(Status::Zombie_Latching);
if (CurrentState.compare_exchange_weak(oldState, newState,
std::memory_order_relaxed,
std::memory_order_relaxed))
std::memory_order_relaxed)) {
concurrency::trace::actor_destroy(this);
return;
}
}
}

Expand All @@ -795,6 +801,8 @@ void DefaultActorImpl::deallocate() {
}

void DefaultActorImpl::deallocateUnconditional() {
concurrency::trace::actor_deallocate(this);

if (JobStorageHeapObject.metadata != nullptr)
JobStorage.~ProcessInlineJob();
auto metadata = cast<ClassMetadata>(this->metadata);
Expand Down Expand Up @@ -926,12 +934,19 @@ static Job *preprocessQueue(JobRef first,
return firstNewJob;
}

static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
concurrency::trace::actor_note_job_queue(actor, first, [](Job *job) {
return getNextJobInQueue(job).getAsPreprocessedJob();
});
}

void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
SWIFT_TASK_DEBUG_LOG("giving up thread for actor %p", this);
auto oldState = CurrentState.load(std::memory_order_acquire);
assert(oldState.Flags.isAnyRunningStatus());

auto firstNewJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr);
traceJobQueue(this, firstNewJob);

_swift_tsan_release(this);
while (true) {
Expand Down Expand Up @@ -978,16 +993,23 @@ void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
firstNewJob = preprocessQueue(oldState.FirstJob,
firstPreprocessed,
firstNewJob);
traceJobQueue(this, firstNewJob);

// Try again.
continue;
}

#define LOG_STATE_TRANSITION \
SWIFT_TASK_DEBUG_LOG("actor %p transitioned from %zx to %zx (%s)\n", this, \
oldState.Flags.getOpaqueValue(), \
newState.Flags.getOpaqueValue(), __FUNCTION__)
LOG_STATE_TRANSITION;
#define ACTOR_STATE_TRANSITION \
do { \
SWIFT_TASK_DEBUG_LOG("actor %p transitioned from %zx to %zx (%s)\n", this, \
oldState.Flags.getOpaqueValue(), \
newState.Flags.getOpaqueValue(), __FUNCTION__); \
concurrency::trace::actor_state_changed( \
this, newState.FirstJob.getRawJob(), \
newState.FirstJob.needsPreprocessing(), \
newState.Flags.getOpaqueValue()); \
} while (0)
ACTOR_STATE_TRANSITION;

// The priority of the remaining work.
auto newPriority = newState.Flags.getMaxPriority();
Expand Down Expand Up @@ -1040,7 +1062,8 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
auto success = CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire);
if (success) LOG_STATE_TRANSITION;
if (success)
ACTOR_STATE_TRANSITION;
return success;
};

Expand Down Expand Up @@ -1082,7 +1105,7 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire))
continue;
LOG_STATE_TRANSITION;
ACTOR_STATE_TRANSITION;
_swift_tsan_acquire(this);

// If that succeeded, we can proceed to the main body.
Expand All @@ -1100,6 +1123,7 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
// Okay, now it's safe to look at queue state.
// Preprocess any queue items at the front of the queue.
auto newFirstJob = preprocessQueue(oldState.FirstJob, JobRef(), nullptr);
traceJobQueue(this, newFirstJob);

_swift_tsan_release(this);
while (true) {
Expand Down Expand Up @@ -1144,11 +1168,12 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
newFirstJob = preprocessQueue(oldState.FirstJob,
firstPreprocessed,
newFirstJob);
traceJobQueue(this, newFirstJob);

// Loop to retry updating the state.
continue;
}
LOG_STATE_TRANSITION;
ACTOR_STATE_TRANSITION;

// We successfully updated the state.

Expand Down Expand Up @@ -1180,10 +1205,12 @@ static void swift_job_runImpl(Job *job, ExecutorRef executor) {
if (!executor.isGeneric()) trackingInfo.disallowSwitching();

trackingInfo.enterAndShadow(executor);
auto traceHandle = concurrency::trace::job_run_begin(job, &executor);

SWIFT_TASK_DEBUG_LOG("%s(%p)", __func__, job);
runJobInEstablishedExecutorContext(job);

concurrency::trace::job_run_end(job, &executor, traceHandle);
trackingInfo.leave();

// Give up the current executor if this is a switching context
Expand Down Expand Up @@ -1237,6 +1264,7 @@ static void processDefaultActor(DefaultActorImpl *currentActor,

SWIFT_TASK_DEBUG_LOG("processDefaultActor %p claimed job %p", currentActor,
job);
concurrency::trace::actor_dequeue(currentActor, job);

// If we failed to claim a job, we have nothing to do.
if (!job) {
Expand Down Expand Up @@ -1351,7 +1379,8 @@ void DefaultActorImpl::enqueue(Job *job) {
/*success*/ std::memory_order_release,
/*failure*/ std::memory_order_relaxed))
continue;
LOG_STATE_TRANSITION;
ACTOR_STATE_TRANSITION;
concurrency::trace::actor_enqueue(this, job);

// Okay, we successfully updated the status. Schedule a job to
// process the actor if necessary.
Expand Down Expand Up @@ -1381,7 +1410,7 @@ bool DefaultActorImpl::tryAssumeThread(RunningJobInfo runner) {
if (CurrentState.compare_exchange_weak(oldState, newState,
/*success*/ std::memory_order_relaxed,
/*failure*/ std::memory_order_acquire)) {
LOG_STATE_TRANSITION;
ACTOR_STATE_TRANSITION;
_swift_tsan_acquire(this);
return true;
}
Expand Down
1 change: 1 addition & 0 deletions stdlib/public/Concurrency/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ add_swift_target_library(swift_Concurrency ${SWIFT_STDLIB_LIBRARY_BUILD_TYPES} I
TaskLocal.swift
TaskSleep.swift
ThreadSanitizer.cpp
TracingSignpost.cpp
Mutex.cpp
AsyncStreamBuffer.swift
AsyncStream.swift
Expand Down
5 changes: 5 additions & 0 deletions stdlib/public/Concurrency/GlobalExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,13 +85,17 @@ void (*swift::swift_task_enqueueMainExecutor_hook)(
void swift::swift_task_enqueueGlobal(Job *job) {
_swift_tsan_release(job);

concurrency::trace::job_enqueue_global(job);

if (swift_task_enqueueGlobal_hook)
swift_task_enqueueGlobal_hook(job, swift_task_enqueueGlobalImpl);
else
swift_task_enqueueGlobalImpl(job);
}

void swift::swift_task_enqueueGlobalWithDelay(JobDelay delay, Job *job) {
concurrency::trace::job_enqueue_global_with_delay(delay, job);

if (swift_task_enqueueGlobalWithDelay_hook)
swift_task_enqueueGlobalWithDelay_hook(
delay, job, swift_task_enqueueGlobalWithDelayImpl);
Expand All @@ -100,6 +104,7 @@ void swift::swift_task_enqueueGlobalWithDelay(JobDelay delay, Job *job) {
}

void swift::swift_task_enqueueMainExecutor(Job *job) {
concurrency::trace::job_enqueue_main_executor(job);
if (swift_task_enqueueMainExecutor_hook)
swift_task_enqueueMainExecutor_hook(job,
swift_task_enqueueMainExecutorImpl);
Expand Down
33 changes: 33 additions & 0 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "swift/Runtime/HeapObject.h"
#include "TaskGroupPrivate.h"
#include "TaskPrivate.h"
#include "Tracing.h"
#include "Debug.h"
#include "Error.h"

Expand Down Expand Up @@ -102,6 +103,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
bool contextIntialized = false;
while (true) {
concurrency::trace::task_wait(
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
switch (queueHead.getStatus()) {
case Status::Error:
case Status::Success:
Expand Down Expand Up @@ -245,6 +248,8 @@ AsyncTask::~AsyncTask() {
}

Private.destroy();

concurrency::trace::task_destroy(this);
}

void AsyncTask::setTaskId() {
Expand All @@ -260,6 +265,12 @@ void AsyncTask::setTaskId() {
_private().Id = (Fetched >> 32) & 0xffffffff;
}

uint64_t AsyncTask::getTaskId() {
// Reconstitute a full 64-bit task ID from the 32-bit job ID and the upper
// 32 bits held in _private().
return (uint64_t)Id << _private().Id;
}

SWIFT_CC(swift)
static void destroyTask(SWIFT_CONTEXT HeapObject *obj) {
auto task = static_cast<AsyncTask*>(obj);
Expand Down Expand Up @@ -439,6 +450,26 @@ task_future_wait_resume_adapter(SWIFT_ASYNC_CONTEXT AsyncContext *_context) {
return _context->ResumeParent(_context->Parent);
}

const void *AsyncTask::getResumeFunctionForLogging() {
if (ResumeTask == non_future_adapter) {
auto asyncContextPrefix = reinterpret_cast<AsyncContextPrefix *>(
reinterpret_cast<char *>(ResumeContext) - sizeof(AsyncContextPrefix));
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
} else if (ResumeTask == future_adapter) {
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
reinterpret_cast<char *>(ResumeContext) -
sizeof(FutureAsyncContextPrefix));
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
} else if (ResumeTask == task_wait_throwing_resume_adapter) {
auto context = static_cast<TaskFutureWaitAsyncContext *>(ResumeContext);
return reinterpret_cast<const void *>(context->ResumeParent);
} else if (ResumeTask == task_future_wait_resume_adapter) {
return reinterpret_cast<const void *>(ResumeContext->ResumeParent);
}

return reinterpret_cast<const void *>(ResumeTask);
}

/// Implementation of task creation.
SWIFT_CC(swift)
static AsyncTaskAndContext swift_task_create_commonImpl(
Expand Down Expand Up @@ -702,6 +733,8 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
initialContext->Parent = nullptr;
initialContext->Flags = AsyncContextKind::Ordinary;

concurrency::trace::task_create(task, parent, group, asyncLet);

// Attach to the group, if needed.
if (group) {
swift_taskGroup_attachChild(group, task);
Expand Down
9 changes: 9 additions & 0 deletions stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#define SWIFT_CONCURRENCY_TASKPRIVATE_H

#include "Error.h"
#include "Tracing.h"
#include "swift/ABI/Metadata.h"
#include "swift/ABI/Task.h"
#include "swift/Runtime/Atomic.h"
Expand Down Expand Up @@ -266,6 +267,10 @@ class alignas(sizeof(void*) * 2) ActiveTaskStatus {
llvm::iterator_range<record_iterator> records() const {
return record_iterator::rangeBeginning(getInnermostRecord());
}

void traceStatusChanged(AsyncTask *task) {
concurrency::trace::task_status_changed(task, Flags);
}
};

/// The size of an allocator slab.
Expand Down Expand Up @@ -373,11 +378,13 @@ inline void AsyncTask::flagAsRunning() {
if (newStatus.isStoredPriorityEscalated()) {
newStatus = newStatus.withoutStoredPriorityEscalation();
Flags.setPriority(oldStatus.getStoredPriority());
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
}

if (_private().Status.compare_exchange_weak(oldStatus, newStatus,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
newStatus.traceStatusChanged(this);
adoptTaskVoucher(this);
swift_task_enterThreadLocalContext(
(char *)&_private().ExclusivityAccessSet[0]);
Expand All @@ -403,11 +410,13 @@ inline void AsyncTask::flagAsSuspended() {
if (newStatus.isStoredPriorityEscalated()) {
newStatus = newStatus.withoutStoredPriorityEscalation();
Flags.setPriority(oldStatus.getStoredPriority());
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
}

if (_private().Status.compare_exchange_weak(oldStatus, newStatus,
std::memory_order_relaxed,
std::memory_order_relaxed)) {
newStatus.traceStatusChanged(this);
swift_task_exitThreadLocalContext(
(char *)&_private().ExclusivityAccessSet[0]);
restoreTaskVoucher(this);
Expand Down
5 changes: 5 additions & 0 deletions stdlib/public/Concurrency/TaskStatus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ static bool withStatusRecordLock(AsyncTask *task,
if (task->_private().Status.compare_exchange_weak(status, newStatus,
/*success*/ std::memory_order_relaxed,
/*failure*/ loadOrdering)) {
newStatus.traceStatusChanged(task);
status = newStatus;
return false;
}
Expand All @@ -203,6 +204,7 @@ static bool withStatusRecordLock(AsyncTask *task,
if (task->_private().Status.compare_exchange_weak(status, newStatus,
/*success*/ std::memory_order_release,
/*failure*/ loadOrdering)) {
newStatus.traceStatusChanged(task);

// Update `status` for the purposes of the callback function.
// Note that we don't include the record lock, but do need to
Expand Down Expand Up @@ -230,6 +232,7 @@ static bool withStatusRecordLock(AsyncTask *task,
// atomic objects in the task status records. Because of this, we can
// actually unpublish the lock with a relaxed store.
assert(!status.isLocked());
status.traceStatusChanged(task);
task->_private().Status.store(status,
/*success*/ std::memory_order_relaxed);

Expand Down Expand Up @@ -606,6 +609,7 @@ void AsyncTask::flagAsRunning_slow() {
if (status.isStoredPriorityEscalated()) {
status = status.withoutStoredPriorityEscalation();
Flags.setPriority(status.getStoredPriority());
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
}
});
}
Expand All @@ -619,6 +623,7 @@ void AsyncTask::flagAsSuspended_slow() {
if (status.isStoredPriorityEscalated()) {
status = status.withoutStoredPriorityEscalation();
Flags.setPriority(status.getStoredPriority());
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
}
});
}
Expand Down
Loading