Skip to content

Commit 84623ce

Browse files
committed
Teach default actors to execute jobs in priority order.
rdar://76473589
1 parent 26ea1fe commit 84623ce

File tree

2 files changed

+153
-17
lines changed

2 files changed

+153
-17
lines changed

stdlib/public/Concurrency/Actor.cpp

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include "swift/Runtime/ThreadLocalStorage.h"
3535
#include "swift/ABI/Task.h"
3636
#include "swift/ABI/Actor.h"
37+
#include "swift/Basic/ListMerger.h"
3738
#ifndef SWIFT_CONCURRENCY_BACK_DEPLOYMENT
3839
#include "llvm/Config/config.h"
3940
#else
@@ -1110,6 +1111,22 @@ void DefaultActorImpl::OverrideJobCache::commit() {
11101111
}
11111112
}
11121113

1114+
namespace {
1115+
1116+
struct JobQueueTraits {
1117+
static Job *getNext(Job *job) {
1118+
return getNextJobInQueue(job).getAsPreprocessedJob();
1119+
}
1120+
static void setNext(Job *job, Job *next) {
1121+
setNextJobInQueue(job, JobRef::getPreprocessed(next));
1122+
}
1123+
static int compare(Job *lhs, Job *rhs) {
1124+
return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority());
1125+
}
1126+
};
1127+
1128+
} // end anonymous namespace
1129+
11131130
/// Preprocess the prefix of the actor's queue that hasn't already
11141131
/// been preprocessed:
11151132
///
@@ -1128,7 +1145,8 @@ static Job *preprocessQueue(JobRef first,
11281145
if (!first.needsPreprocessing())
11291146
return first.getAsPreprocessedJob();
11301147

1131-
Job *firstNewJob = nullptr;
1148+
using ListMerger = swift::ListMerger<Job*, JobQueueTraits>;
1149+
ListMerger newJobs;
11321150

11331151
while (first != previousFirst) {
11341152
// If we find something that doesn't need preprocessing, it must've
@@ -1160,27 +1178,20 @@ static Job *preprocessQueue(JobRef first,
11601178
// jobs; since enqueue() always adds jobs to the front, reversing
11611179
// the order effectively makes the actor queue FIFO, which is what
11621180
// we want.
1163-
// FIXME: but we should also sort by priority
11641181
auto job = first.getAsJob();
11651182
first = getNextJobInQueue(job);
1166-
setNextJobInQueue(job, JobRef::getPreprocessed(firstNewJob));
1167-
firstNewJob = job;
1183+
newJobs.insertAtFront(job);
11681184
}
11691185

11701186
// If there are jobs already in the queue, put the new jobs at the end.
1187+
auto firstNewJob = newJobs.release();
11711188
if (!firstNewJob) {
11721189
firstNewJob = previousFirstNewJob;
11731190
} else if (previousFirstNewJob) {
1174-
auto cur = previousFirstNewJob;
1175-
while (true) {
1176-
auto next = getNextJobInQueue(cur).getAsPreprocessedJob();
1177-
if (!next) {
1178-
setNextJobInQueue(cur, JobRef::getPreprocessed(firstNewJob));
1179-
break;
1180-
}
1181-
cur = next;
1182-
}
1183-
firstNewJob = previousFirstNewJob;
1191+
// Merge the jobs we just processed into the existing job list.
1192+
ListMerger merge(previousFirstNewJob);
1193+
merge.merge(firstNewJob);
1194+
firstNewJob = merge.release();
11841195
}
11851196

11861197
return firstNewJob;

unittests/runtime/Actor.cpp

Lines changed: 128 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ static void enqueueGlobal(Job *job,
4848
// Add the job after (i.e. before in execution order) all jobs
4949
// with lower priority.
5050
for (auto i = globalQueue.begin(), e = globalQueue.end(); i != e; ++i) {
51-
if (job->getPriority() <= (*i)->getPriority()) {
51+
if (descendingPriorityOrder((*i)->getPriority(), job->getPriority()) <= 0) {
5252
globalQueue.insert(i, job);
5353
return;
5454
}
@@ -158,6 +158,19 @@ static AsyncTask *createTask(JobPriority priority, Fn &&fn) {
158158
.first;
159159
}
160160

161+
template <class Fn>
162+
static AsyncTask *createAndEnqueueTask(JobPriority priority,
163+
TestActor *actor,
164+
Fn &&fn) {
165+
auto task = createTaskWithContext<AsyncContext, Fn>(priority, std::move(fn))
166+
.first;
167+
if (actor)
168+
swift_task_enqueue(task, ExecutorRef::forDefaultActor(actor));
169+
else
170+
swift_task_enqueueGlobal(task);
171+
return task;
172+
}
173+
161174
template <class Context, class Fn>
162175
static void parkTask(AsyncTask *task, Context *context, Fn &&fn) {
163176
auto invoke =
@@ -312,7 +325,7 @@ TEST(ActorTest, actorContention) {
312325

313326
parkTask(task, context,
314327
[](Context *context) SWIFT_CC(swiftasync) {
315-
EXPECT_PROGRESS(3);
328+
EXPECT_PROGRESS(2);
316329
auto executor = swift_task_getCurrentExecutor();
317330
EXPECT_FALSE(executor.isGeneric());
318331
EXPECT_EQ(ExecutorRef::forDefaultActor(context->get<1>()),
@@ -336,7 +349,7 @@ TEST(ActorTest, actorContention) {
336349
auto task1 = createTaskStoring(JobPriority::Background,
337350
(AsyncTask*) nullptr, actor,
338351
[](Context *context) SWIFT_CC(swiftasync) {
339-
EXPECT_PROGRESS(2);
352+
EXPECT_PROGRESS(3);
340353
auto executor = swift_task_getCurrentExecutor();
341354
EXPECT_FALSE(executor.isGeneric());
342355
EXPECT_EQ(ExecutorRef::forDefaultActor(context->get<1>()),
@@ -361,3 +374,115 @@ TEST(ActorTest, actorContention) {
361374
EXPECT_PROGRESS(0);
362375
});
363376
}
377+
378+
TEST(ActorTest, actorPriority) {
379+
run([] {
380+
auto actor = createActor();
381+
382+
createAndEnqueueTask(JobPriority::Background, actor,
383+
[=](AsyncContext *context) {
384+
EXPECT_PROGRESS(4);
385+
return context->ResumeParent(context);
386+
});
387+
388+
createAndEnqueueTask(JobPriority::Utility, actor,
389+
[=](AsyncContext *context) {
390+
EXPECT_PROGRESS(1);
391+
return context->ResumeParent(context);
392+
});
393+
394+
createAndEnqueueTask(JobPriority::Background, actor,
395+
[=](AsyncContext *context) {
396+
EXPECT_PROGRESS(5);
397+
finishTest();
398+
return context->ResumeParent(context);
399+
});
400+
401+
createAndEnqueueTask(JobPriority::Utility, actor,
402+
[=](AsyncContext *context) {
403+
EXPECT_PROGRESS(2);
404+
return context->ResumeParent(context);
405+
});
406+
407+
createAndEnqueueTask(JobPriority::Default, actor,
408+
[=](AsyncContext *context) {
409+
EXPECT_PROGRESS(0);
410+
return context->ResumeParent(context);
411+
});
412+
413+
createAndEnqueueTask(JobPriority::Utility, actor,
414+
[=](AsyncContext *context) {
415+
EXPECT_PROGRESS(3);
416+
return context->ResumeParent(context);
417+
});
418+
});
419+
}
420+
421+
TEST(ActorTest, actorPriority2) {
422+
run([] {
423+
auto actor = createActor();
424+
425+
createAndEnqueueTask(JobPriority::Background, actor,
426+
[=](AsyncContext *context) {
427+
EXPECT_PROGRESS(7);
428+
return context->ResumeParent(context);
429+
});
430+
431+
createAndEnqueueTask(JobPriority::Utility, actor,
432+
[=](AsyncContext *context) {
433+
EXPECT_PROGRESS(1);
434+
435+
createAndEnqueueTask(JobPriority::Utility, actor,
436+
[=](AsyncContext *context) {
437+
EXPECT_PROGRESS(5);
438+
return context->ResumeParent(context);
439+
});
440+
441+
createAndEnqueueTask(JobPriority::Default, actor,
442+
[](AsyncContext *context) {
443+
EXPECT_PROGRESS(2);
444+
return context->ResumeParent(context);
445+
});
446+
447+
return context->ResumeParent(context);
448+
});
449+
450+
createAndEnqueueTask(JobPriority::Background, actor,
451+
[=](AsyncContext *context) {
452+
EXPECT_PROGRESS(8);
453+
return context->ResumeParent(context);
454+
});
455+
456+
createAndEnqueueTask(JobPriority::Utility, actor,
457+
[=](AsyncContext *context) {
458+
EXPECT_PROGRESS(3);
459+
460+
createAndEnqueueTask(JobPriority::Background, actor,
461+
[=](AsyncContext *context) {
462+
EXPECT_PROGRESS(9);
463+
finishTest();
464+
return context->ResumeParent(context);
465+
});
466+
467+
createAndEnqueueTask(JobPriority::Utility, actor,
468+
[=](AsyncContext *context) {
469+
EXPECT_PROGRESS(6);
470+
return context->ResumeParent(context);
471+
});
472+
473+
return context->ResumeParent(context);
474+
});
475+
476+
createAndEnqueueTask(JobPriority::Default, actor,
477+
[=](AsyncContext *context) {
478+
EXPECT_PROGRESS(0);
479+
return context->ResumeParent(context);
480+
});
481+
482+
createAndEnqueueTask(JobPriority::Utility, actor,
483+
[=](AsyncContext *context) {
484+
EXPECT_PROGRESS(4);
485+
return context->ResumeParent(context);
486+
});
487+
});
488+
}

0 commit comments

Comments
 (0)