Skip to content

Commit be8d2c8

Browse files
committed
Give up default actors in swift_job_run if we switch to them.
Previously, if this happened, we simply left the actor in a running state, causing any further jobs submitted to it to never be executed. I can only speculate why this wasn't showing up in testing. Also, change swift_job_run so that it prevents switching if the executor passed in is not generic. This is an entrypoint for arbitrary executors and generally should not allow unexpected switching (if someday custom executors participate in that scheme). This infrastructure will also be useful for implementing the `async let` semantics of running synchronously until the task reaches a suspension point. Finally, improve the #if'ed logging code throughout the task/actor runtime.
1 parent 156264f commit be8d2c8

File tree

3 files changed

+142
-60
lines changed

3 files changed

+142
-60
lines changed

stdlib/public/Concurrency/Actor.cpp

Lines changed: 115 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@
2727
#include "llvm/ADT/PointerIntPair.h"
2828
#include "TaskPrivate.h"
2929

30-
// Uncomment to enable helpful debug spew to stderr
31-
//#define SWIFT_TASK_PRINTF_DEBUG 1
32-
3330
using namespace swift;
3431

3532
/// Should we yield the thread?
@@ -74,6 +71,11 @@ class ExecutorTrackingInfo {
7471
/// The active executor.
7572
ExecutorRef ActiveExecutor = ExecutorRef::generic();
7673

74+
/// Whether this context allows switching. Some contexts do not;
75+
/// for example, we do not allow switching from swift_job_run
76+
/// unless the passed-in executor is generic.
77+
bool AllowsSwitching = true;
78+
7779
/// The tracking info that was active when this one was entered.
7880
ExecutorTrackingInfo *SavedInfo;
7981

@@ -114,21 +116,27 @@ class ExecutorTrackingInfo {
114116
return ActiveExecutor;
115117
}
116118

117-
static ExecutorRef getActiveExecutorInThread() {
118-
if (auto activeInfo = ActiveInfoInThread.get())
119-
return activeInfo->getActiveExecutor();
120-
return ExecutorRef::generic();
119+
void setActiveExecutor(ExecutorRef newExecutor) {
120+
ActiveExecutor = newExecutor;
121121
}
122122

123-
void leave() {
124-
ActiveInfoInThread.set(SavedInfo);
123+
124+
bool allowsSwitching() const {
125+
return AllowsSwitching;
126+
}
127+
128+
/// Disallow switching in this tracking context. This should only
129+
/// be set on a new tracking info, before any jobs are run in it.
130+
void disallowSwitching() {
131+
AllowsSwitching = false;
125132
}
126-
127-
static ExecutorRef getActiveExecutorOnThread() {
128-
if (auto active = ActiveInfoInThread.get())
129-
return active->getActiveExecutor();
130133

131-
swift_unreachable("no active executor?!");
134+
static ExecutorTrackingInfo *current() {
135+
return ActiveInfoInThread.get();
136+
}
137+
138+
void leave() {
139+
ActiveInfoInThread.set(SavedInfo);
132140
}
133141
};
134142

@@ -152,16 +160,6 @@ SWIFT_RUNTIME_DECLARE_THREAD_LOCAL(
152160

153161
} // end anonymous namespace
154162

155-
SWIFT_CC(swift)
156-
static void swift_job_runImpl(Job *job, ExecutorRef executor) {
157-
ExecutorTrackingInfo trackingInfo;
158-
trackingInfo.enterAndShadow(executor);
159-
160-
runJobInEstablishedExecutorContext(job);
161-
162-
trackingInfo.leave();
163-
}
164-
165163
void swift::runJobInEstablishedExecutorContext(Job *job) {
166164
_swift_tsan_acquire(job);
167165

@@ -199,9 +197,11 @@ AsyncTask *swift::_swift_task_clearCurrent() {
199197

200198
SWIFT_CC(swift)
201199
static ExecutorRef swift_task_getCurrentExecutorImpl() {
202-
auto result = ExecutorTrackingInfo::getActiveExecutorOnThread();
200+
auto currentTracking = ExecutorTrackingInfo::current();
201+
auto result = (currentTracking ? currentTracking->getActiveExecutor()
202+
: ExecutorRef::generic());
203203
#if SWIFT_TASK_PRINTF_DEBUG
204-
fprintf(stderr, "%p getting current executor %p\n", pthread_self(), (void*)result.getRawValue());
204+
fprintf(stderr, "[%p] getting current executor %p\n", pthread_self(), result.getIdentity());
205205
#endif
206206
return result;
207207
}
@@ -885,7 +885,7 @@ static Job *preprocessQueue(JobRef first,
885885

886886
void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
887887
#if SWIFT_TASK_PRINTF_DEBUG
888-
fprintf(stderr, "%p %p.giveUpThread\n", pthread_self(), this);
888+
fprintf(stderr, "[%p] giving up thread for actor %p\n", pthread_self(), this);
889889
#endif
890890
auto oldState = CurrentState.load(std::memory_order_acquire);
891891
assert(oldState.Flags.getStatus() == Status::Running);
@@ -936,8 +936,8 @@ void DefaultActorImpl::giveUpThread(RunningJobInfo runner) {
936936
}
937937

938938
#if SWIFT_TASK_PRINTF_DEBUG
939-
# define LOG_STATE_TRANSITION fprintf(stderr, "%p transition from %zx to %zx in %p.%s\n", \
940-
pthread_self(), oldState.Flags.getOpaqueValue(), newState.Flags.getOpaqueValue(), this, __FUNCTION__)
939+
# define LOG_STATE_TRANSITION fprintf(stderr, "[%p] actor %p transitioned from %zx to %zx (%s)\n", \
940+
pthread_self(), this, oldState.Flags.getOpaqueValue(), newState.Flags.getOpaqueValue(), __FUNCTION__)
941941
#else
942942
# define LOG_STATE_TRANSITION ((void)0)
943943
#endif
@@ -1141,6 +1141,34 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
11411141
}
11421142
}
11431143

1144+
SWIFT_CC(swift)
1145+
static void swift_job_runImpl(Job *job, ExecutorRef executor) {
1146+
ExecutorTrackingInfo trackingInfo;
1147+
1148+
// swift_job_run is a primary entrypoint for executors telling us to
1149+
// run jobs. Actor executors won't expect us to switch off them
1150+
// during this operation. But do allow switching if the executor
1151+
// is generic.
1152+
if (!executor.isGeneric()) trackingInfo.disallowSwitching();
1153+
1154+
trackingInfo.enterAndShadow(executor);
1155+
1156+
runJobInEstablishedExecutorContext(job);
1157+
1158+
trackingInfo.leave();
1159+
1160+
// Give up the current executor if this is a switching context
1161+
// (which, remember, only happens if we started out on a generic
1162+
// executor) and we've switched to a default actor.
1163+
auto currentExecutor = trackingInfo.getActiveExecutor();
1164+
if (trackingInfo.allowsSwitching() && currentExecutor.isDefaultActor()) {
1165+
// Use an underestimated priority; if this means we create an
1166+
// extra processing job in some cases, that's probably okay.
1167+
auto runner = RunningJobInfo::forOther(JobPriority(0));
1168+
asImpl(currentExecutor.getDefaultActor())->giveUpThread(runner);
1169+
}
1170+
}
1171+
11441172
/// The primary function for processing an actor on a thread. Start
11451173
/// processing the given default actor as the active default actor on
11461174
/// the current thread, and keep processing whatever actor we're
@@ -1154,15 +1182,18 @@ Job *DefaultActorImpl::claimNextJobOrGiveUp(bool actorIsOwned,
11541182
static void processDefaultActor(DefaultActorImpl *currentActor,
11551183
RunningJobInfo runner) {
11561184
#if SWIFT_TASK_PRINTF_DEBUG
1157-
fprintf(stderr, "%p processDefaultActor %p\n", pthread_self(), currentActor);
1185+
fprintf(stderr, "[%p] processDefaultActor %p\n", pthread_self(), currentActor);
11581186
#endif
11591187
DefaultActorImpl *actor = currentActor;
11601188

1161-
// Register that we're processing a default actor in this frame.
1189+
// If we actually have work to do, we'll need to set up tracking info.
1190+
// Optimistically assume that we will; the alternative (an override job
1191+
// took over the actor first) is rare.
11621192
ExecutorTrackingInfo trackingInfo;
1163-
auto activeTrackingInfo = trackingInfo.enterOrUpdate(
1193+
trackingInfo.enterAndShadow(
11641194
ExecutorRef::forDefaultActor(asAbstract(currentActor)));
11651195

1196+
// Remember whether we've already taken over the actor.
11661197
bool threadIsRunningActor = false;
11671198
while (true) {
11681199
assert(currentActor);
@@ -1171,12 +1202,13 @@ static void processDefaultActor(DefaultActorImpl *currentActor,
11711202
if (shouldYieldThread())
11721203
break;
11731204

1174-
// Claim another job from the current actor.
1205+
// Try to claim another job from the current actor, taking it over
1206+
// if we haven't already.
11751207
auto job = currentActor->claimNextJobOrGiveUp(threadIsRunningActor,
11761208
runner);
11771209

11781210
#if SWIFT_TASK_PRINTF_DEBUG
1179-
fprintf(stderr, "%p processDefaultActor %p claimed job %p\n", pthread_self(), currentActor, job);
1211+
fprintf(stderr, "[%p] processDefaultActor %p claimed job %p\n", pthread_self(), currentActor, job);
11801212
#endif
11811213

11821214
// If we failed to claim a job, we have nothing to do.
@@ -1187,15 +1219,18 @@ static void processDefaultActor(DefaultActorImpl *currentActor,
11871219
break;
11881220
}
11891221

1222+
// This thread now owns the current actor.
1223+
threadIsRunningActor = true;
1224+
11901225
// Run the job.
11911226
runJobInEstablishedExecutorContext(job);
11921227

11931228
// The current actor may have changed after the job.
11941229
// If it's become nil, or not a default actor, we have nothing to do.
1195-
auto currentExecutor = activeTrackingInfo->getActiveExecutor();
1230+
auto currentExecutor = trackingInfo.getActiveExecutor();
11961231

11971232
#if SWIFT_TASK_PRINTF_DEBUG
1198-
fprintf(stderr, "%p processDefaultActor %p current executor now %p\n", pthread_self(), currentActor, (void*)currentExecutor.getRawValue());
1233+
fprintf(stderr, "[%p] processDefaultActor %p current executor now %p\n", pthread_self(), currentActor, currentExecutor.getIdentity());
11991234
#endif
12001235

12011236
if (!currentExecutor.isDefaultActor()) {
@@ -1205,17 +1240,15 @@ static void processDefaultActor(DefaultActorImpl *currentActor,
12051240
break;
12061241
}
12071242
currentActor = asImpl(currentExecutor.getDefaultActor());
1208-
1209-
// Otherwise, we know that we're running the actor on this thread.
1210-
threadIsRunningActor = true;
12111243
}
12121244

1213-
if (activeTrackingInfo == &trackingInfo)
1214-
trackingInfo.leave();
1245+
// Leave the tracking info.
1246+
trackingInfo.leave();
12151247

12161248
// If we still have an active actor, we should give it up.
1217-
if (currentActor)
1249+
if (threadIsRunningActor && currentActor) {
12181250
currentActor->giveUpThread(runner);
1251+
}
12191252

12201253
swift_release(actor);
12211254
}
@@ -1386,7 +1419,14 @@ void swift::swift_MainActor_register(HeapObject *actor) {
13861419
/*****************************************************************************/
13871420

13881421
/// Can the current executor give up its thread?
1389-
static bool canGiveUpThreadForSwitch(ExecutorRef currentExecutor) {
1422+
static bool canGiveUpThreadForSwitch(ExecutorTrackingInfo *trackingInfo,
1423+
ExecutorRef currentExecutor) {
1424+
assert(trackingInfo || currentExecutor.isGeneric());
1425+
1426+
// Some contexts don't allow switching at all.
1427+
if (trackingInfo && !trackingInfo->allowsSwitching())
1428+
return false;
1429+
13901430
// We can certainly "give up" a generic executor to try to run
13911431
// a task for an actor.
13921432
if (currentExecutor.isGeneric())
@@ -1439,31 +1479,38 @@ static bool tryAssumeThreadForSwitch(ExecutorRef newExecutor,
14391479
/// continue to run the given task on it.
14401480
SWIFT_CC(swiftasync)
14411481
static void runOnAssumedThread(AsyncTask *task, ExecutorRef executor,
1482+
ExecutorTrackingInfo *oldTracking,
14421483
RunningJobInfo runner) {
1443-
// Set that this actor is now the active default actor on this thread,
1444-
// and set up tracking info if there isn't any already.
1445-
ExecutorTrackingInfo trackingInfo;
1446-
auto activeTrackingInfo = trackingInfo.enterOrUpdate(executor);
1484+
// If there's alreaady tracking info set up, just change the executor
1485+
// there and tail-call the task. We don't want these frames to
1486+
// potentially accumulate linearly.
1487+
if (oldTracking) {
1488+
oldTracking->setActiveExecutor(executor);
14471489

1448-
// If one already existed, we should just tail-call the task; we don't
1449-
// want these frames to potentially accumulate linearly.
1450-
if (activeTrackingInfo != &trackingInfo) {
14511490
// FIXME: force tail call
14521491
return task->runInFullyEstablishedContext();
14531492
}
14541493

1455-
// Otherwise, run the new task.
1494+
// Otherwise, set up tracking info.
1495+
ExecutorTrackingInfo trackingInfo;
1496+
trackingInfo.enterAndShadow(executor);
1497+
1498+
// Run the new task.
14561499
task->runInFullyEstablishedContext();
14571500

14581501
// Leave the tracking frame, and give up the current actor if
14591502
// we have one.
14601503
//
1461-
// In principle, we could execute more tasks here, but that's probably
1462-
// not a reasonable thing to do in an assumed context rather than a
1463-
// dedicated actor-processing job.
1504+
// In principle, we could execute more tasks from the actor here, but
1505+
// that's probably not a reasonable thing to do in an assumed context
1506+
// rather than a dedicated actor-processing job.
14641507
executor = trackingInfo.getActiveExecutor();
14651508
trackingInfo.leave();
14661509

1510+
#if SWIFT_TASK_PRINTF_DEBUG
1511+
fprintf(stderr, "[%p] leaving assumed thread, current executor is %p\n", pthread_self(), executor.getIdentity());
1512+
#endif
1513+
14671514
if (executor.isDefaultActor())
14681515
asImpl(executor.getDefaultActor())->giveUpThread(runner);
14691516
}
@@ -1472,9 +1519,12 @@ SWIFT_CC(swiftasync)
14721519
static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContext,
14731520
TaskContinuationFunction *resumeFunction,
14741521
ExecutorRef newExecutor) {
1475-
auto currentExecutor = ExecutorTrackingInfo::getActiveExecutorInThread();
1522+
auto trackingInfo = ExecutorTrackingInfo::current();
1523+
auto currentExecutor =
1524+
(trackingInfo ? trackingInfo->getActiveExecutor()
1525+
: ExecutorRef::generic());
14761526
#if SWIFT_TASK_PRINTF_DEBUG
1477-
fprintf(stderr, "%p switch %p -> %p\n", pthread_self(), (void*)currentExecutor.getRawValue(), (void*)newExecutor.getRawValue());
1527+
fprintf(stderr, "[%p] trying to switch from executor %p to %p\n", pthread_self(), currentExecutor.getIdentity(), newExecutor.getIdentity());
14781528
#endif
14791529

14801530
// If the current executor is compatible with running the new executor,
@@ -1499,16 +1549,22 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
14991549
// If the current executor can give up its thread, and the new executor
15001550
// can take over a thread, try to do so; but don't do this if we've
15011551
// been asked to yield the thread.
1502-
if (canGiveUpThreadForSwitch(currentExecutor) &&
1552+
if (canGiveUpThreadForSwitch(trackingInfo, currentExecutor) &&
15031553
!shouldYieldThread() &&
15041554
tryAssumeThreadForSwitch(newExecutor, runner)) {
1555+
#if SWIFT_TASK_PRINTF_DEBUG
1556+
fprintf(stderr, "[%p] switch succeeded, task %p assumed thread for executor %p\n", pthread_self(), task, newExecutor.getIdentity());
1557+
#endif
15051558
giveUpThreadForSwitch(currentExecutor, runner);
15061559
// FIXME: force tail call
1507-
return runOnAssumedThread(task, newExecutor, runner);
1560+
return runOnAssumedThread(task, newExecutor, trackingInfo, runner);
15081561
}
15091562

15101563
// Otherwise, just asynchronously enqueue the task on the given
15111564
// executor.
1565+
#if SWIFT_TASK_PRINTF_DEBUG
1566+
fprintf(stderr, "[%p] switch failed, task %p enqueued on executor %p\n", pthread_self(), task, newExecutor.getIdentity());
1567+
#endif
15121568
swift_task_enqueue(task, newExecutor);
15131569
}
15141570

@@ -1519,7 +1575,7 @@ static void swift_task_switchImpl(SWIFT_ASYNC_CONTEXT AsyncContext *resumeContex
15191575
SWIFT_CC(swift)
15201576
static void swift_task_enqueueImpl(Job *job, ExecutorRef executor) {
15211577
#if SWIFT_TASK_PRINTF_DEBUG
1522-
fprintf(stderr, "%p enqueue %p\n", pthread_self(), (void*)executor.getRawValue());
1578+
fprintf(stderr, "[%p] enqueue job %p on executor %p\n", pthread_self(), job, executor.getIdentity());
15231579
#endif
15241580

15251581
assert(job && "no job provided");

0 commit comments

Comments
 (0)