Skip to content

[Concurrency runtime] Don't read from the actor after transitioning state #66008

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 19, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 14 additions & 9 deletions stdlib/public/Concurrency/Actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
}
#endif

void traceStateChanged(HeapObject *actor) {
void traceStateChanged(HeapObject *actor, bool distributedActorIsRemote) {
// Convert our state to a consistent raw value. These values currently match
// the enum values, but this explicit conversion provides room for change.
uint8_t traceState = 255;
Expand All @@ -814,7 +814,7 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
}
concurrency::trace::actor_state_changed(
actor, getFirstJob().getRawJob(), getFirstJob().needsPreprocessing(),
traceState, swift_distributed_actor_is_remote((HeapObject *) actor),
traceState, distributedActorIsRemote,
isMaxPriorityEscalated(), static_cast<uint8_t>(getMaxPriority()));
}
};
Expand Down Expand Up @@ -1176,12 +1176,12 @@ static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
}

static SWIFT_ATTRIBUTE_ALWAYS_INLINE void traceActorStateTransition(DefaultActorImpl *actor,
ActiveActorStatus oldState, ActiveActorStatus newState) {
ActiveActorStatus oldState, ActiveActorStatus newState, bool distributedActorIsRemote) {

SWIFT_TASK_DEBUG_LOG("Actor %p transitioned from %#x to %#x (%s)", actor,
oldState.getOpaqueFlags(), newState.getOpaqueFlags(),
__FUNCTION__);
newState.traceStateChanged(actor);
newState.traceStateChanged(actor, distributedActorIsRemote);
}

#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
Expand All @@ -1206,6 +1206,7 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
SWIFT_TASK_DEBUG_LOG("Enqueueing job %p onto actor %p at priority %#zx", job,
this, priority);
concurrency::trace::actor_enqueue(this, job);
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(std::memory_order_relaxed);
while (true) {
auto newState = oldState;
Expand Down Expand Up @@ -1233,7 +1234,7 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_release,
/* failure */ std::memory_order_relaxed)) {
traceActorStateTransition(this, oldState, newState);
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);

if (!oldState.isScheduled() && newState.isScheduled()) {
// We took responsibility to schedule the actor for the first time. See
Expand Down Expand Up @@ -1276,6 +1277,7 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {

SWIFT_TASK_DEBUG_LOG("[Override] Escalating an actor %p due to job that is enqueued being escalated", this);

bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(std::memory_order_relaxed);
while (true) {
// Until we figure out how to safely enqueue a stealer and rendevouz with
Expand Down Expand Up @@ -1314,7 +1316,7 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
if (_status().compare_exchange_weak(oldState, newState,
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
traceActorStateTransition(this, oldState, newState);
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
#if SWIFT_CONCURRENCY_ENABLE_PRIORITY_ESCALATION
if (newState.isRunning()) {
// Actor is running on a thread, escalate the thread running it
Expand Down Expand Up @@ -1344,6 +1346,7 @@ Job * DefaultActorImpl::drainOne() {
SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);

// Pairs with the store release in DefaultActorImpl::enqueue
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME);
_swift_tsan_consume(this);

Expand All @@ -1367,7 +1370,7 @@ Job * DefaultActorImpl::drainOne() {
/* success */ std::memory_order_relaxed,
/* failure */ std::memory_order_relaxed)) {
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
traceActorStateTransition(this, oldState, newState);
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
concurrency::trace::actor_dequeue(this, firstJob);
return firstJob;
}
Expand Down Expand Up @@ -1556,6 +1559,7 @@ retry:;
SWIFT_TASK_DEBUG_LOG("Thread attempting to jump onto %p, as drainer = %d", this, asDrainer);
#endif

bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(std::memory_order_relaxed);
while (true) {

Expand Down Expand Up @@ -1612,7 +1616,7 @@ retry:;
std::memory_order_acquire,
std::memory_order_relaxed)) {
_swift_tsan_acquire(this);
traceActorStateTransition(this, oldState, newState);
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
return true;
}
}
Expand All @@ -1635,6 +1639,7 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
this->drainLock.unlock();
return true;
#else
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
auto oldState = _status().load(std::memory_order_relaxed);
SWIFT_TASK_DEBUG_LOG("Try unlock-ing actor %p with forceUnlock = %d", this, forceUnlock);

Expand Down Expand Up @@ -1693,7 +1698,7 @@ bool DefaultActorImpl::unlock(bool forceUnlock)
/* success */ std::memory_order_release,
/* failure */ std::memory_order_relaxed)) {
_swift_tsan_release(this);
traceActorStateTransition(this, oldState, newState);
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);

if (newState.isScheduled()) {
// See ownership rule (6) in DefaultActorImpl
Expand Down