Skip to content

Commit 87f4a33

Browse files
Process incoming queue when obtaining drainer lock
1 parent c8d93a3 commit 87f4a33

File tree

3 files changed

+208
-133
lines changed

3 files changed

+208
-133
lines changed

include/swift/Basic/PriorityQueue.h

Lines changed: 9 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,6 @@
2222

2323
namespace swift {
2424

25-
/// A simple linked list of nodes.
26-
template <class Node, class NodeTraits>
27-
class SimpleQueue {
28-
public:
29-
Node head;
30-
31-
SimpleQueue() : head() {}
32-
33-
void prepend(Node newNode) {
34-
assert(newNode && "inserting a null node");
35-
NodeTraits::setNext(newNode, head);
36-
head = newNode;
37-
}
38-
};
39-
4025
/// A class for priority FIFO queue with a fixed number of priorities.
4126
///
4227
/// The `Node` type parameter represents a reference to a list node.
@@ -104,23 +89,25 @@ class PriorityQueue {
10489

10590
/// Add a chain of nodes of mixed priorities to this queue.
10691
void enqueueContentsOf(Node otherHead) {
92+
if (!otherHead) return;
10793
Node runHead = otherHead;
108-
while (runHead) {
109-
int priorityIndex = NodeTraits::getPriorityIndex(runHead);
110-
94+
int priorityIndex = NodeTraits::getPriorityIndex(runHead);
95+
do {
11196
// Find run of jobs of the same priority
11297
Node runTail = runHead;
11398
Node next = NodeTraits::getNext(runTail);
114-
while (true) {
115-
if (!next) break;
116-
if (NodeTraits::getPriorityIndex(next) != priorityIndex) break;
99+
int nextRunPriorityIndex;
100+
while (next) {
101+
nextRunPriorityIndex = NodeTraits::getPriorityIndex(next);
102+
if (nextRunPriorityIndex != priorityIndex) break;
117103
runTail = next;
118104
next = NodeTraits::getNext(runTail);
119105
}
120106

121107
enqueueRun(priorityIndex, runHead, runTail);
122108
runHead = next;
123-
}
109+
priorityIndex = nextRunPriorityIndex;
110+
} while(runHead);
124111
}
125112

126113
Node dequeue() {

stdlib/public/Concurrency/Actor.cpp

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -973,7 +973,6 @@ class DefaultActorImplHeader : public HeapObject {
973973
class DefaultActorImplFooter {
974974
protected:
975975
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
976-
using SimpleQueue = swift::SimpleQueue<Job *, JobQueueTraits>;
977976
using PriorityQueue = swift::PriorityQueue<Job *, JobQueueTraits>;
978977

979978
// When enqueued, jobs are atomically added to a linked list with the head
@@ -1078,9 +1077,13 @@ class DefaultActorImpl
10781077
/// new priority
10791078
void enqueueStealer(Job *job, JobPriority priority);
10801079

1081-
/// Dequeues one job from `prioritizedJobs`.
1080+
/// Dequeues one job from `prioritisedJobs`.
10821081
/// The calling thread must be holding the actor lock while calling this
10831082
Job *drainOne();
1083+
1084+
/// Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1085+
/// Called with actor lock held on current thread.
1086+
void processIncomingQueue();
10841087
#endif
10851088

10861089
/// Check if the actor is actually a distributed *remote* actor.
@@ -1117,14 +1120,10 @@ class DefaultActorImpl
11171120
/// when actor gets a priority override and we schedule a stealer.
11181121
void scheduleActorProcessJob(JobPriority priority);
11191122

1120-
/// Atomically takes a list of jobs from ActiveActorStatus, reversing them in
1121-
/// the process. Returns jobs of mixed priorities in FIFO order.
1122-
SimpleQueue collectJobs();
1123-
1124-
/// Check for new jobs in the incoming queue and move them to the
1125-
/// processing queue.
1123+
/// Processes claimed incoming jobs into `prioritizedJobs`.
1124+
/// Incoming jobs are of mixed priorities and in LIFO order.
11261125
/// Called with actor lock held on current thread.
1127-
void processJobs();
1126+
void handleUnprioritizedJobs(Job *head);
11281127
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
11291128

11301129
void deallocateUnconditional();
@@ -1370,13 +1369,12 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
13701369

13711370
}
13721371

1373-
DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
1372+
void DefaultActorImpl::processIncomingQueue() {
13741373
// Pairs with the store release in DefaultActorImpl::enqueue
13751374
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
13761375
auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME);
13771376
_swift_tsan_consume(this);
13781377

1379-
SimpleQueue result;
13801378
// We must ensure that any jobs not seen by the collectJobs() don't have any
13811379
// dangling references to the jobs that have been collected. For that we must
13821380
// atomically set head pointer to NULL. If it fails because more jobs have
@@ -1385,7 +1383,7 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
13851383
// If there aren't any new jobs in the incoming queue, we can return
13861384
// immediately without updating the status.
13871385
if (!oldState.getFirstUnprioritisedJob()) {
1388-
return result;
1386+
return;
13891387
}
13901388
assert(oldState.isAnyRunning());
13911389

@@ -1403,28 +1401,26 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
14031401
}
14041402
}
14051403

1406-
// Collect jobs, reversing them in the process
1407-
auto job = oldState.getFirstUnprioritisedJob();
1408-
while (job) {
1409-
auto next = getNextJob(job);
1410-
result.prepend(job);
1411-
job = next;
1412-
}
1413-
1414-
return result;
1404+
handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob());
14151405
}
14161406

14171407
// Called with actor lock held on current thread
1418-
void DefaultActorImpl::processJobs() {
1419-
SimpleQueue jobs = collectJobs();
1420-
prioritizedJobs.enqueueContentsOf(jobs.head);
1408+
void DefaultActorImpl::handleUnprioritizedJobs(Job *head) {
1409+
// Reverse jobs from LIFO to FIFO order
1410+
Job *reversed = nullptr;
1411+
while (head) {
1412+
auto next = getNextJob(head);
1413+
setNextJob(head, reversed);
1414+
reversed = head;
1415+
head = next;
1416+
}
1417+
prioritizedJobs.enqueueContentsOf(reversed);
14211418
}
14221419

14231420
// Called with actor lock held on current thread
14241421
Job *DefaultActorImpl::drainOne() {
14251422
SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);
14261423

1427-
processJobs();
14281424
traceJobQueue(this, prioritizedJobs.peek());
14291425
auto firstJob = prioritizedJobs.dequeue();
14301426
if (!firstJob) {
@@ -1488,39 +1484,40 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
14881484
TaskExecutorRef::undefined());
14891485

14901486
while (true) {
1491-
if (shouldYieldThread()) {
1492-
currentActor->unlock(true);
1493-
break;
1494-
}
1495-
14961487
Job *job = currentActor->drainOne();
14971488
if (job == NULL) {
14981489
// No work left to do, try unlocking the actor. This may fail if there is
14991490
// work concurrently enqueued in which case, we'd try again in the loop
1500-
if (!currentActor->unlock(false)) {
1501-
continue;
1491+
if (currentActor->unlock(false)) {
1492+
break;
1493+
}
1494+
} else {
1495+
if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1496+
auto taskExecutor = task->getPreferredTaskExecutor();
1497+
trackingInfo.setTaskExecutor(taskExecutor);
15021498
}
1503-
break;
1504-
}
15051499

1506-
if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1507-
auto taskExecutor = task->getPreferredTaskExecutor();
1508-
trackingInfo.setTaskExecutor(taskExecutor);
1500+
// This thread is now going to follow the task on this actor. It may hop off
1501+
// the actor
1502+
runJobInEstablishedExecutorContext(job);
1503+
1504+
// We could have come back from the job on a generic executor and not as
1505+
// part of a default actor. If so, there is no more work left for us to do
1506+
// here.
1507+
auto currentExecutor = trackingInfo.getActiveExecutor();
1508+
if (!currentExecutor.isDefaultActor()) {
1509+
currentActor = nullptr;
1510+
break;
1511+
}
1512+
currentActor = asImpl(currentExecutor.getDefaultActor());
15091513
}
15101514

1511-
// This thread is now going to follow the task on this actor. It may hop off
1512-
// the actor
1513-
runJobInEstablishedExecutorContext(job);
1514-
1515-
// We could have come back from the job on a generic executor and not as
1516-
// part of a default actor. If so, there is no more work left for us to do
1517-
// here.
1518-
auto currentExecutor = trackingInfo.getActiveExecutor();
1519-
if (!currentExecutor.isDefaultActor()) {
1520-
currentActor = nullptr;
1515+
if (shouldYieldThread()) {
1516+
currentActor->unlock(true);
15211517
break;
15221518
}
1523-
currentActor = asImpl(currentExecutor.getDefaultActor());
1519+
1520+
currentActor->processIncomingQueue();
15241521
}
15251522

15261523
// Leave the tracking info.
@@ -1666,6 +1663,17 @@ retry:;
16661663
auto newState = oldState.withRunning();
16671664
newState = newState.withoutEscalatedPriority();
16681665

1666+
// Claim incoming jobs when obtaining lock as a drainer, to save one
1667+
// round of atomic load and compare-exchange.
1668+
// This is not useful when obtaining lock for assuming thread during actor
1669+
// switching, because arbitrary use code can run between locking and
1670+
// draining the next job. So we still need to call processIncomingQueue() to
1671+
// check for higher priority jobs that could have been scheduled in the
1672+
// meantime. And processing is more efficient when done in larger batches.
1673+
if (asDrainer) {
1674+
newState = newState.withFirstUnprioritisedJob(nullptr);
1675+
}
1676+
16691677
// This needs an acquire since we are taking a lock
16701678
if (_status().compare_exchange_weak(oldState, newState,
16711679
std::memory_order_acquire,
@@ -1675,6 +1683,9 @@ retry:;
16751683
assert(prioritizedJobs.empty());
16761684
}
16771685
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
1686+
if (asDrainer) {
1687+
handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob());
1688+
}
16781689
return true;
16791690
}
16801691
}

0 commit comments

Comments
 (0)