Skip to content

[Concurrency] Decode actor/task flags in signposts, make task_wait an interval. #41808

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 17, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 40 additions & 8 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,30 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
return offsetof(ActiveActorStatus, DrainLock);
}
#endif

void traceStateChanged(HeapObject *actor) {
// Convert our state to a consistent raw value. These values currently match
// the enum values, but this explicit conversion provides room for change.
uint8_t traceState = 255;
switch (getActorState()) {
case ActiveActorStatus::Idle:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these values be exported in a private header somewhere for Instruments to slurp up?

And maybe we have a version number as well?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or do we think we'd just change the signpost ID value if we had a breaking change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think a header probably wouldn't make things any easier. @Harjas12 any thoughts on that?

We have several options for making changes. We can refrain from using traceState values that are no longer in use and add new ones, we can keep this parameter in some sort of degraded state and add a new one with new info, or we can change the signpost ID. Adding a version number doesn't give us anything particularly useful on top of that.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think the best way to handle this is to add the new values by appending to the log string. Parsers should be smart enough that the old versions ignore anything new and new version know to ignore the old fields and look at the new fields. If we do have to "deprecate" a field, we would ideally keep it in a good enough state for 1 year before we completely removed it. Any changes that remove from the format would need a 1 year lead time.

traceState = 0;
break;
case ActiveActorStatus::Scheduled:
traceState = 1;
break;
case ActiveActorStatus::Running:
traceState = 2;
break;
case ActiveActorStatus::Zombie_ReadyForDeallocation:
traceState = 3;
break;
}
concurrency::trace::actor_state_changed(
actor, getFirstJob().getRawJob(), getFirstJob().needsPreprocessing(),
traceState, isDistributedRemote(), isMaxPriorityEscalated(),
static_cast<uint8_t>(getMaxPriority()));
}
};

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION && SWIFT_POINTER_IS_4_BYTES
Expand Down Expand Up @@ -943,7 +967,10 @@ class DefaultActorImpl : public HeapObject {

// Only for static assert use below, not for actual use otherwise
static constexpr size_t offsetOfActiveActorStatus() {
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Winvalid-offsetof"
return offsetof(DefaultActorImpl, StatusStorage);
#pragma clang diagnostic pop
}

private:
Expand Down Expand Up @@ -1124,11 +1151,10 @@ static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
static SWIFT_ATTRIBUTE_ALWAYS_INLINE void traceActorStateTransition(DefaultActorImpl *actor,
ActiveActorStatus oldState, ActiveActorStatus newState) {

SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %zx to %zx (%s)\n", actor,
oldState.getOpaqueFlags(), newState.getOpaqueFlags(), __FUNCTION__);
concurrency::trace::actor_state_changed(actor,
newState.getFirstJob().getRawJob(), newState.getFirstJob().needsPreprocessing(),
newState.getOpaqueFlags());
SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %#x to %#x (%s)\n", actor,
oldState.getOpaqueFlags(), newState.getOpaqueFlags(),
__FUNCTION__);
newState.traceStateChanged(actor);
}

void DefaultActorImpl::destroy() {
Expand Down Expand Up @@ -1192,7 +1218,9 @@ void DefaultActorImpl::scheduleActorProcessJob(JobPriority priority, bool useInl
swift_retain(this);
job = new ProcessOutOfLineJob(this, priority);
}
SWIFT_TASK_DEBUG_LOG("Scheduling processing job %p for actor %p at priority %#x", job, this, priority);
SWIFT_TASK_DEBUG_LOG(
"Scheduling processing job %p for actor %p at priority %#zx", job, this,
priority);
swift_task_enqueueGlobal(job);
}

Expand Down Expand Up @@ -1259,7 +1287,9 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
// We can do relaxed loads here, we are just using the current head in the
// atomic state and linking that into the new job we are inserting, we don't
// need acquires
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#x", job, this, priority);
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#zx", job,
this, priority);
concurrency::trace::actor_enqueue(this, job);
auto oldState = _status().load(std::memory_order_relaxed);
while (true) {
auto newState = oldState;
Expand Down Expand Up @@ -1422,6 +1452,7 @@ Job * DefaultActorImpl::drainOne() {
/* failure */ std::memory_order_acquire)) {
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
traceActorStateTransition(this, oldState, newState);
concurrency::trace::actor_dequeue(this, firstJob);
return firstJob;
}

Expand Down Expand Up @@ -1650,7 +1681,8 @@ static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo,
/// do that in runOnAssumedThread.
static void giveUpThreadForSwitch(ExecutorRef currentExecutor) {
if (currentExecutor.isGeneric()) {
SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p", currentExecutor);
SWIFT_TASK_DEBUG_LOG("Giving up current generic executor %p",
currentExecutor.getIdentity());
return;
}

Expand Down
33 changes: 24 additions & 9 deletions stdlib/public/Concurrency/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,6 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
auto queueHead = fragment->waitQueue.load(std::memory_order_acquire);
bool contextIntialized = false;
while (true) {
concurrency::trace::task_wait(
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
switch (queueHead.getStatus()) {
case Status::Error:
case Status::Success:
Expand All @@ -123,6 +121,8 @@ FutureFragment::Status AsyncTask::waitFuture(AsyncTask *waitingTask,
SWIFT_TASK_DEBUG_LOG("task %p waiting on task %p, going to sleep",
waitingTask, this);
_swift_tsan_release(static_cast<Job *>(waitingTask));
concurrency::trace::task_wait(
waitingTask, this, static_cast<uintptr_t>(queueHead.getStatus()));
// Task is not complete. We'll need to add ourselves to the queue.
break;
}
Expand Down Expand Up @@ -233,6 +233,8 @@ void AsyncTask::completeFuture(AsyncContext *context) {

_swift_tsan_acquire(static_cast<Job *>(waitingTask));

concurrency::trace::task_resume(waitingTask);

// Enqueue the waiter on the global executor.
// TODO: allow waiters to fill in a suggested executor
waitingTask->flagAsAndEnqueueOnExecutor(ExecutorRef::generic());
Expand Down Expand Up @@ -470,23 +472,27 @@ const void
reinterpret_cast<void *>(task_future_wait_resume_adapter);

const void *AsyncTask::getResumeFunctionForLogging() {
const void *result = reinterpret_cast<const void *>(ResumeTask);

if (ResumeTask == non_future_adapter) {
auto asyncContextPrefix = reinterpret_cast<AsyncContextPrefix *>(
reinterpret_cast<char *>(ResumeContext) - sizeof(AsyncContextPrefix));
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
result =
reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
} else if (ResumeTask == future_adapter) {
auto asyncContextPrefix = reinterpret_cast<FutureAsyncContextPrefix *>(
reinterpret_cast<char *>(ResumeContext) -
sizeof(FutureAsyncContextPrefix));
return reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
result =
reinterpret_cast<const void *>(asyncContextPrefix->asyncEntryPoint);
} else if (ResumeTask == task_wait_throwing_resume_adapter) {
auto context = static_cast<TaskFutureWaitAsyncContext *>(ResumeContext);
return reinterpret_cast<const void *>(context->ResumeParent);
result = reinterpret_cast<const void *>(context->ResumeParent);
} else if (ResumeTask == task_future_wait_resume_adapter) {
return reinterpret_cast<const void *>(ResumeContext->ResumeParent);
result = reinterpret_cast<const void *>(ResumeContext->ResumeParent);
}

return reinterpret_cast<const void *>(ResumeTask);
return __ptrauth_swift_runtime_function_entry_strip(result);
}

JobPriority swift::swift_task_currentPriority(AsyncTask *task)
Expand Down Expand Up @@ -653,7 +659,7 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
basePriority = JobPriority::Default;
}

SWIFT_TASK_DEBUG_LOG("Task's base priority = %#x", basePriority);
SWIFT_TASK_DEBUG_LOG("Task's base priority = %#zx", basePriority);

// Figure out the size of the header.
size_t headerSize = sizeof(AsyncTask);
Expand Down Expand Up @@ -788,7 +794,9 @@ static AsyncTaskAndContext swift_task_create_commonImpl(
futureAsyncContextPrefix->indirectResult = futureFragment->getStoragePtr();
}

SWIFT_TASK_DEBUG_LOG("creating task %p with parent %p at base pri %zu", task, parent, basePriority);
SWIFT_TASK_DEBUG_LOG("creating task %p ID %" PRIu64
" with parent %p at base pri %zu",
task, task->getTaskId(), parent, basePriority);

// Initialize the task-local allocator.
initialContext->ResumeParent = reinterpret_cast<TaskContinuationFunction *>(
Expand Down Expand Up @@ -1059,6 +1067,8 @@ static AsyncTask *swift_continuation_initImpl(ContinuationAsyncContext *context,
task->ResumeContext = context;
task->ResumeTask = context->ResumeParent;

concurrency::trace::task_continuation_init(task, context);

return task;
}

Expand All @@ -1071,6 +1081,8 @@ static void swift_continuation_awaitImpl(ContinuationAsyncContext *context) {
assert(task->ResumeTask == context->ResumeParent);
#endif

concurrency::trace::task_continuation_await(context);

auto &sync = context->AwaitSynchronization;

auto oldStatus = sync.load(std::memory_order_acquire);
Expand Down Expand Up @@ -1157,12 +1169,14 @@ static void resumeTaskAfterContinuation(AsyncTask *task,
SWIFT_CC(swift)
static void swift_continuation_resumeImpl(AsyncTask *task) {
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
concurrency::trace::task_continuation_resume(context, false);
resumeTaskAfterContinuation(task, context);
}

SWIFT_CC(swift)
static void swift_continuation_throwingResumeImpl(AsyncTask *task) {
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
concurrency::trace::task_continuation_resume(context, false);
resumeTaskAfterContinuation(task, context);
}

Expand All @@ -1171,6 +1185,7 @@ SWIFT_CC(swift)
static void swift_continuation_throwingResumeWithErrorImpl(AsyncTask *task,
/* +1 */ SwiftError *error) {
auto context = static_cast<ContinuationAsyncContext*>(task->ResumeContext);
concurrency::trace::task_continuation_resume(context, true);
context->ErrorResult = error;
resumeTaskAfterContinuation(task, context);
}
Expand Down
9 changes: 7 additions & 2 deletions stdlib/public/Concurrency/TaskPrivate.h
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,9 @@ class alignas(2 * sizeof(void*)) ActiveTaskStatus {
}

void traceStatusChanged(AsyncTask *task) {
concurrency::trace::task_status_changed(task, Flags);
concurrency::trace::task_status_changed(
task, static_cast<uint8_t>(getStoredPriority()), isCancelled(),
isStoredPriorityEscalated(), isRunning(), isEnqueued());
}
};

Expand Down Expand Up @@ -771,7 +773,10 @@ inline void AsyncTask::flagAsAndEnqueueOnExecutor(ExecutorRef newExecutor) {

// Set up task for enqueue to next location by setting the Job priority field
Flags.setPriority(newStatus.getStoredPriority());
concurrency::trace::task_flags_changed(this, Flags.getOpaqueValue());
concurrency::trace::task_flags_changed(
this, static_cast<uint8_t>(Flags.getPriority()), Flags.task_isChildTask(),
Flags.task_isFuture(), Flags.task_isGroupChildTask(),
Flags.task_isAsyncLetTask());

swift_task_enqueue(this, newExecutor);
}
Expand Down
31 changes: 23 additions & 8 deletions stdlib/public/Concurrency/Tracing.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
namespace swift {
class AsyncLet;
class AsyncTask;
class ContinuationAsyncContext;
class ExecutorRef;
struct HeapObject;
class Job;
Expand All @@ -43,10 +44,13 @@ void actor_enqueue(HeapObject *actor, Job *job);

void actor_dequeue(HeapObject *actor, Job *job);

// The `flags` parameter is the raw values of the actor's
// DefaultActorImpl::State::Flags.
// State values are:
// Idle = 0, Scheduled = 1, Running = 2, Zombie_ReadyForDeallocation = 3,
// invalid/unknown = 255
void actor_state_changed(HeapObject *actor, Job *firstJob,
bool needsPreprocessing, uintptr_t flags);
bool needsPreprocessing, uint8_t state,
bool isDistributedRemote, bool isPriorityEscalated,
uint8_t maxPriority);

void actor_note_job_queue(HeapObject *actor, Job *first,
Job *(*getNext)(Job *));
Expand All @@ -58,17 +62,28 @@ void task_create(AsyncTask *task, AsyncTask *parent, TaskGroup *group,

void task_destroy(AsyncTask *task);

// The `flags` parameter is the raw value of the ActiveTaskStatus::Flags field
// in the task.
void task_status_changed(AsyncTask *task, uintptr_t flags);
void task_status_changed(AsyncTask *task, uint8_t maxPriority, bool isCancelled,
bool isEscalated, bool isRunning, bool isEnqueued);

// The `flags` parameter is the raw value of Job::Flags.
void task_flags_changed(AsyncTask *task, uint32_t flags);
void task_flags_changed(AsyncTask *task, uint8_t jobPriority, bool isChildTask,
bool isFuture, bool isGroupChildTask,
bool isAsyncLetTask);

// The `status` parameter is the value of the corresponding
// FutureFragment::Status.
void task_wait(AsyncTask *task, AsyncTask *waitingOn, uintptr_t status);

void task_resume(AsyncTask *task);

// The context parameter is the context pointer used to create the continuation.
// This same pointer will be passed to the corresponding call to
// task_continuation_await and task_continuation_resume.
void task_continuation_init(AsyncTask *task, ContinuationAsyncContext *context);

void task_continuation_await(ContinuationAsyncContext *context);

void task_continuation_resume(ContinuationAsyncContext *context, bool error);

void job_enqueue_global(Job *job);

void job_enqueue_global_with_delay(unsigned long long delay, Job *job);
Expand Down
Loading