Skip to content

Commit 9888026

Browse files
Using multiple insertion points to ensure all jobs are always inserted in O(1)
1 parent efc6f1a commit 9888026

File tree

1 file changed

+78
-58
lines changed

1 file changed

+78
-58
lines changed

stdlib/public/Concurrency/Actor.cpp

Lines changed: 78 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -983,28 +983,38 @@ class alignas(sizeof(void *) * 2) ActiveActorStatus {
983983

984984
/// Given that a job is enqueued normally on a default actor, get/set
985985
/// the next job in the actor's queue.
986-
static JobRef getNextJobInQueue(Job *job) {
986+
static JobRef getNextJobRef(Job *job) {
987987
return *reinterpret_cast<JobRef *>(job->SchedulerPrivate);
988988
}
989-
static void setNextJobInQueue(Job *job, JobRef next) {
989+
static void setNextJobRef(Job *job, JobRef next) {
990990
*reinterpret_cast<JobRef *>(job->SchedulerPrivate) = next;
991991
}
992992

993-
namespace {
993+
static Job* getNextJob(Job *job) {
994+
return getNextJobRef(job).getAsPreprocessedJob();
995+
}
996+
static void setNextJob(Job *job, Job* next) {
997+
setNextJobRef(job, JobRef::getPreprocessed(next));
998+
}
994999

995-
struct JobQueueTraits {
996-
static Job *getNext(Job *job) {
997-
return getNextJobInQueue(job).getAsPreprocessedJob();
998-
}
999-
static void setNext(Job *job, Job *next) {
1000-
setNextJobInQueue(job, JobRef::getPreprocessed(next));
1001-
}
1002-
static int compare(Job *lhs, Job *rhs) {
1003-
return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority());
1000+
enum { NumPriorityBuckets = 5 };
1001+
1002+
static int getPriorityIndex(JobPriority priority) {
1003+
switch (priority) {
1004+
case JobPriority::UserInteractive:
1005+
return 0;
1006+
case JobPriority::UserInitiated:
1007+
return 1;
1008+
case JobPriority::Unspecified:
1009+
assert(false);
1010+
case JobPriority::Default:
1011+
return 2;
1012+
case JobPriority::Utility:
1013+
return 3;
1014+
case JobPriority::Background:
1015+
return 4;
10041016
}
1005-
};
1006-
1007-
} // end anonymous namespace
1017+
}
10081018

10091019
#endif
10101020

@@ -1080,9 +1090,14 @@ class DefaultActorImpl : public HeapObject {
10801090
// the future
10811091
alignas(sizeof(ActiveActorStatus)) char StatusStorage[sizeof(ActiveActorStatus)];
10821092

1083-
using ListMerger = swift::ListMerger<Job *, JobQueueTraits>;
1084-
ListMerger::LastInsertionPoint lastInsertionPoint =
1085-
ListMerger::LastInsertionPoint();
1093+
// All jobs are stored in a single linked list
1094+
// Head of the list is stored inside ActiveActorStatus
1095+
// List starts with a zero or more unprocessed jobs of mixed priorities in LIFO order.
1096+
// After that list contains processed jobs sorted by priority.
1097+
// Within the same priority jobs are sorted in the FIFO order.
1098+
// Elements of `insertionPoints` point to the last job of the corresponding priority,
1099+
// or are null if no jobs of that priority exist.
1100+
Job *insertionPoints[NumPriorityBuckets];
10861101
#endif
10871102
// TODO (rokhinip): Make this a flagset
10881103
bool isDistributedRemoteActor;
@@ -1161,7 +1176,8 @@ class DefaultActorImpl : public HeapObject {
11611176

11621177
Job *preprocessQueue(JobRef start);
11631178
Job *preprocessQueue(JobRef unprocessedStart, JobRef unprocessedEnd,
1164-
Job *existingProcessedJobsToMergeInto);
1179+
Job *processedJobsStart);
1180+
Job *preprocessJobs(Job* jobsToProcess, Job *processedJobsStart);
11651181
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
11661182

11671183
void deallocateUnconditional();
@@ -1251,34 +1267,28 @@ static NonDefaultDistributedActorImpl *asImpl(NonDefaultDistributedActor *actor)
12511267
// preprocessQueue
12521268
Job *DefaultActorImpl::preprocessQueue(JobRef unprocessedStart,
12531269
JobRef unprocessedEnd,
1254-
Job *existingProcessedJobsToMergeInto) {
1255-
assert(existingProcessedJobsToMergeInto != NULL);
1270+
Job *processedJobsStart) {
1271+
assert(processedJobsStart != NULL);
12561272
assert(unprocessedStart.needsPreprocessing());
12571273
assert(unprocessedStart.getAsJob() != unprocessedEnd.getAsJob());
12581274

12591275
// Build up a list of jobs we need to preprocess
1260-
ListMerger jobsToProcess;
1276+
Job* jobsToProcess = nullptr;
12611277

12621278
// Get just the prefix list of unprocessed jobs
12631279
auto current = unprocessedStart;
12641280
while (current != unprocessedEnd) {
12651281
assert(current.needsPreprocessing());
12661282
// Advance current to next pointer and process current unprocessed job
12671283
auto job = current.getAsJob();
1268-
current = getNextJobInQueue(job);
1284+
current = getNextJobRef(job);
12691285

1270-
jobsToProcess.insertAtFront(job);
1286+
setNextJob(job, jobsToProcess);
1287+
jobsToProcess = job;
12711288
}
12721289

12731290
// Finish processing the unprocessed jobs
1274-
Job *newProcessedJobs = std::get<0>(jobsToProcess.release());
1275-
assert(newProcessedJobs);
1276-
1277-
ListMerger mergedList(existingProcessedJobsToMergeInto, lastInsertionPoint);
1278-
mergedList.merge(newProcessedJobs);
1279-
Job *result;
1280-
std::tie(result, lastInsertionPoint) = mergedList.release();
1281-
return result;
1291+
return preprocessJobs(jobsToProcess, processedJobsStart);
12821292
}
12831293

12841294
// Called with the actor drain lock held.
@@ -1297,49 +1307,56 @@ Job *DefaultActorImpl::preprocessQueue(JobRef start) {
12971307
// There exist some jobs which haven't been preprocessed
12981308

12991309
// Build up a list of jobs we need to preprocess
1300-
ListMerger jobsToProcess;
1310+
Job* jobsToProcess = nullptr;
13011311

1302-
Job *wellFormedListStart = NULL;
1312+
Job *processedJobsStart = NULL;
13031313

1304-
auto current = start;
1305-
while (current) {
1314+
for (JobRef current = start; current; ) {
13061315
if (!current.needsPreprocessing()) {
13071316
// We can assume that everything from here onwards as being well formed
13081317
// and sorted
1309-
wellFormedListStart = current.getAsPreprocessedJob();
1318+
processedJobsStart = current.getAsPreprocessedJob();
13101319
break;
13111320
}
13121321

13131322
// Advance current to next pointer and insert current fella to jobsToProcess
13141323
// list
13151324
auto job = current.getAsJob();
1316-
current = getNextJobInQueue(job);
1325+
current = getNextJobRef(job);
13171326

1318-
jobsToProcess.insertAtFront(job);
1327+
setNextJob(job, jobsToProcess);
1328+
jobsToProcess = job;
13191329
}
13201330

13211331
// Finish processing the unprocessed jobs
1322-
auto processedJobHead = std::get<0>(jobsToProcess.release());
1323-
assert(processedJobHead);
1324-
1325-
Job *firstJob = NULL;
1326-
if (wellFormedListStart) {
1327-
// Merge it with already known well formed list if we have one.
1328-
ListMerger mergedList(wellFormedListStart, lastInsertionPoint);
1329-
mergedList.merge(processedJobHead);
1330-
std::tie(firstJob, lastInsertionPoint) = mergedList.release();
1331-
} else {
1332-
// Nothing to merge with, just return the head we already have
1333-
firstJob = processedJobHead;
1334-
lastInsertionPoint = ListMerger::LastInsertionPoint();
1335-
}
1332+
return preprocessJobs(jobsToProcess, processedJobsStart);
1333+
}
13361334

1337-
return firstJob;
1335+
Job *DefaultActorImpl::preprocessJobs(Job* jobsToProcess, Job *processedJobsStart) {
1336+
for (auto current = jobsToProcess; current; ) {
1337+
auto next = getNextJob(current);
1338+
int priorityIndex = getPriorityIndex(current->getPriority());
1339+
for (int i = priorityIndex; ; i--) {
1340+
if (i < 0) {
1341+
setNextJob(current, processedJobsStart);
1342+
processedJobsStart = current;
1343+
break;
1344+
}
1345+
if (insertionPoints[i]) {
1346+
setNextJob(current, getNextJob(insertionPoints[i]));
1347+
setNextJob(insertionPoints[i], current);
1348+
break;
1349+
}
1350+
}
1351+
insertionPoints[priorityIndex] = current;
1352+
current = next;
1353+
}
1354+
return processedJobsStart;
13381355
}
13391356

13401357
static void traceJobQueue(DefaultActorImpl *actor, Job *first) {
13411358
concurrency::trace::actor_note_job_queue(actor, first, [](Job *job) {
1342-
return getNextJobInQueue(job).getAsPreprocessedJob();
1359+
return getNextJob(job);
13431360
});
13441361
}
13451362

@@ -1381,7 +1398,7 @@ void DefaultActorImpl::enqueue(Job *job, JobPriority priority) {
13811398

13821399
// Link this into the queue in the atomic state
13831400
JobRef currentHead = oldState.getFirstJob();
1384-
setNextJobInQueue(job, currentHead);
1401+
setNextJobRef(job, currentHead);
13851402
JobRef newHead = JobRef::getUnpreprocessed(job);
13861403

13871404
newState = newState.withFirstJob(newHead);
@@ -1533,11 +1550,14 @@ Job * DefaultActorImpl::drainOne() {
15331550

15341551
auto newState = oldState;
15351552
// Dequeue the first job and set up a new head
1536-
newState = newState.withFirstJob(getNextJobInQueue(firstJob));
1553+
newState = newState.withFirstJob(getNextJobRef(firstJob));
15371554
if (_status().compare_exchange_weak(oldState, newState,
15381555
/* success */ std::memory_order_relaxed,
15391556
/* failure */ std::memory_order_relaxed)) {
1540-
lastInsertionPoint.nodeWasRemoved(firstJob);
1557+
int priorityIndex = getPriorityIndex(firstJob->getPriority());
1558+
if (insertionPoints[priorityIndex] == firstJob) {
1559+
insertionPoints[priorityIndex] = nullptr;
1560+
}
15411561
SWIFT_TASK_DEBUG_LOG("Drained first job %p from actor %p", firstJob, this);
15421562
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
15431563
concurrency::trace::actor_dequeue(this, firstJob);

0 commit comments

Comments
 (0)