Skip to content

Commit 9ba09ff

Browse files
Process incoming queue when obtaining drainer lock
1 parent 40c38f9 commit 9ba09ff

File tree

3 files changed

+213
-133
lines changed

3 files changed

+213
-133
lines changed

include/swift/Basic/PriorityQueue.h

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,21 +22,6 @@
2222

2323
namespace swift {
2424

25-
/// A simple linked list of nodes.
26-
template <class Node, class NodeTraits>
27-
class SimpleQueue {
28-
public:
29-
Node head;
30-
31-
SimpleQueue() : head() {}
32-
33-
void prepend(Node newNode) {
34-
assert(newNode && "inserting a null node");
35-
NodeTraits::setNext(newNode, head);
36-
head = newNode;
37-
}
38-
};
39-
4025
/// A class for priority FIFO queue with a fixed number of priorities.
4126
///
4227
/// The `Node` type parameter represents a reference to a list node.
@@ -104,23 +89,25 @@ class PriorityQueue {
10489

10590
/// Add a chain of nodes of mixed priorities to this queue.
10691
void enqueueContentsOf(Node otherHead) {
92+
if (!otherHead) return;
10793
Node runHead = otherHead;
108-
while (runHead) {
109-
int priorityIndex = NodeTraits::getPriorityIndex(runHead);
110-
94+
int priorityIndex = NodeTraits::getPriorityIndex(runHead);
95+
do {
11196
// Find run of jobs of the same priority
11297
Node runTail = runHead;
11398
Node next = NodeTraits::getNext(runTail);
114-
while (true) {
115-
if (!next) break;
116-
if (NodeTraits::getPriorityIndex(next) != priorityIndex) break;
99+
int nextRunPriorityIndex = badIndex;
100+
while (next) {
101+
nextRunPriorityIndex = NodeTraits::getPriorityIndex(next);
102+
if (nextRunPriorityIndex != priorityIndex) break;
117103
runTail = next;
118104
next = NodeTraits::getNext(runTail);
119105
}
120106

121107
enqueueRun(priorityIndex, runHead, runTail);
122108
runHead = next;
123-
}
109+
priorityIndex = nextRunPriorityIndex;
110+
} while(runHead);
124111
}
125112

126113
Node dequeue() {
@@ -139,6 +126,11 @@ class PriorityQueue {
139126
Node peek() const { return head; }
140127
bool empty() const { return !head; }
141128
private:
129+
// Use large negative value to increase chance of causing segfault if this
130+
// value ends up being used for indexing. Using -1 would cause accessing
131+
// `head`, which is less noticeable.
132+
static const int badIndex = std::numeric_limits<int>::min();
133+
142134
void enqueueRun(int priorityIndex, Node runHead, Node runTail) {
143135
for (int i = priorityIndex;; i--) {
144136
if (i < 0) {

stdlib/public/Concurrency/Actor.cpp

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -975,7 +975,6 @@ class DefaultActorImplHeader : public HeapObject {
975975
class DefaultActorImplFooter {
976976
protected:
977977
#if !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS
978-
using SimpleQueue = swift::SimpleQueue<Job *, JobQueueTraits>;
979978
using PriorityQueue = swift::PriorityQueue<Job *, JobQueueTraits>;
980979

981980
// When enqueued, jobs are atomically added to a linked list with the head
@@ -1080,9 +1079,13 @@ class DefaultActorImpl
10801079
/// new priority
10811080
void enqueueStealer(Job *job, JobPriority priority);
10821081

1083-
/// Dequeues one job from `prioritizedJobs`.
1082+
/// Dequeues one job from `prioritisedJobs`.
10841083
/// The calling thread must be holding the actor lock while calling this
10851084
Job *drainOne();
1085+
1086+
/// Atomically claims incoming jobs from ActiveActorStatus, and calls `handleUnprioritizedJobs()`.
1087+
/// Called with actor lock held on current thread.
1088+
void processIncomingQueue();
10861089
#endif
10871090

10881091
/// Check if the actor is actually a distributed *remote* actor.
@@ -1119,14 +1122,10 @@ class DefaultActorImpl
11191122
/// when actor gets a priority override and we schedule a stealer.
11201123
void scheduleActorProcessJob(JobPriority priority);
11211124

1122-
/// Atomically takes a list of jobs from ActiveActorStatus, reversing them in
1123-
/// the process. Returns jobs of mixed priorities in FIFO order.
1124-
SimpleQueue collectJobs();
1125-
1126-
/// Check for new jobs in the incoming queue and move them to the
1127-
/// processing queue.
1125+
/// Processes claimed incoming jobs into `prioritizedJobs`.
1126+
/// Incoming jobs are of mixed priorities and in LIFO order.
11281127
/// Called with actor lock held on current thread.
1129-
void processJobs();
1128+
void handleUnprioritizedJobs(Job *head);
11301129
#endif /* !SWIFT_CONCURRENCY_ACTORS_AS_LOCKS */
11311130

11321131
void deallocateUnconditional();
@@ -1372,13 +1371,12 @@ void DefaultActorImpl::enqueueStealer(Job *job, JobPriority priority) {
13721371

13731372
}
13741373

1375-
DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
1374+
void DefaultActorImpl::processIncomingQueue() {
13761375
// Pairs with the store release in DefaultActorImpl::enqueue
13771376
bool distributedActorIsRemote = swift_distributed_actor_is_remote(this);
13781377
auto oldState = _status().load(SWIFT_MEMORY_ORDER_CONSUME);
13791378
_swift_tsan_consume(this);
13801379

1381-
SimpleQueue result;
13821380
// We must ensure that any jobs not seen by collectJobs() don't have any
13831381
// dangling references to the jobs that have been collected. For that we must
13841382
// atomically set head pointer to NULL. If it fails because more jobs have
@@ -1387,7 +1385,7 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
13871385
// If there aren't any new jobs in the incoming queue, we can return
13881386
// immediately without updating the status.
13891387
if (!oldState.getFirstUnprioritisedJob()) {
1390-
return result;
1388+
return;
13911389
}
13921390
assert(oldState.isAnyRunning());
13931391

@@ -1405,28 +1403,26 @@ DefaultActorImpl::SimpleQueue DefaultActorImpl::collectJobs() {
14051403
}
14061404
}
14071405

1408-
// Collect jobs, reversing them in the process
1409-
auto job = oldState.getFirstUnprioritisedJob();
1410-
while (job) {
1411-
auto next = getNextJob(job);
1412-
result.prepend(job);
1413-
job = next;
1414-
}
1415-
1416-
return result;
1406+
handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob());
14171407
}
14181408

14191409
// Called with actor lock held on current thread
1420-
void DefaultActorImpl::processJobs() {
1421-
SimpleQueue jobs = collectJobs();
1422-
prioritizedJobs.enqueueContentsOf(jobs.head);
1410+
void DefaultActorImpl::handleUnprioritizedJobs(Job *head) {
1411+
// Reverse jobs from LIFO to FIFO order
1412+
Job *reversed = nullptr;
1413+
while (head) {
1414+
auto next = getNextJob(head);
1415+
setNextJob(head, reversed);
1416+
reversed = head;
1417+
head = next;
1418+
}
1419+
prioritizedJobs.enqueueContentsOf(reversed);
14231420
}
14241421

14251422
// Called with actor lock held on current thread
14261423
Job *DefaultActorImpl::drainOne() {
14271424
SWIFT_TASK_DEBUG_LOG("Draining one job from default actor %p", this);
14281425

1429-
processJobs();
14301426
traceJobQueue(this, prioritizedJobs.peek());
14311427
auto firstJob = prioritizedJobs.dequeue();
14321428
if (!firstJob) {
@@ -1490,39 +1486,40 @@ static void defaultActorDrain(DefaultActorImpl *actor) {
14901486
TaskExecutorRef::undefined());
14911487

14921488
while (true) {
1493-
if (shouldYieldThread()) {
1494-
currentActor->unlock(true);
1495-
break;
1496-
}
1497-
14981489
Job *job = currentActor->drainOne();
14991490
if (job == NULL) {
15001491
// No work left to do, try unlocking the actor. This may fail if there is
15011492
// work concurrently enqueued in which case, we'd try again in the loop
1502-
if (!currentActor->unlock(false)) {
1503-
continue;
1493+
if (currentActor->unlock(false)) {
1494+
break;
1495+
}
1496+
} else {
1497+
if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1498+
auto taskExecutor = task->getPreferredTaskExecutor();
1499+
trackingInfo.setTaskExecutor(taskExecutor);
15041500
}
1505-
break;
1506-
}
15071501

1508-
if (AsyncTask *task = dyn_cast<AsyncTask>(job)) {
1509-
auto taskExecutor = task->getPreferredTaskExecutor();
1510-
trackingInfo.setTaskExecutor(taskExecutor);
1502+
// This thread is now going to follow the task on this actor. It may hop off
1503+
// the actor
1504+
runJobInEstablishedExecutorContext(job);
1505+
1506+
// We could have come back from the job on a generic executor and not as
1507+
// part of a default actor. If so, there is no more work left for us to do
1508+
// here.
1509+
auto currentExecutor = trackingInfo.getActiveExecutor();
1510+
if (!currentExecutor.isDefaultActor()) {
1511+
currentActor = nullptr;
1512+
break;
1513+
}
1514+
currentActor = asImpl(currentExecutor.getDefaultActor());
15111515
}
15121516

1513-
// This thread is now going to follow the task on this actor. It may hop off
1514-
// the actor
1515-
runJobInEstablishedExecutorContext(job);
1516-
1517-
// We could have come back from the job on a generic executor and not as
1518-
// part of a default actor. If so, there is no more work left for us to do
1519-
// here.
1520-
auto currentExecutor = trackingInfo.getActiveExecutor();
1521-
if (!currentExecutor.isDefaultActor()) {
1522-
currentActor = nullptr;
1517+
if (shouldYieldThread()) {
1518+
currentActor->unlock(true);
15231519
break;
15241520
}
1525-
currentActor = asImpl(currentExecutor.getDefaultActor());
1521+
1522+
currentActor->processIncomingQueue();
15261523
}
15271524

15281525
// Leave the tracking info.
@@ -1668,6 +1665,17 @@ retry:;
16681665
auto newState = oldState.withRunning();
16691666
newState = newState.withoutEscalatedPriority();
16701667

1668+
// Claim incoming jobs when obtaining lock as a drainer, to save one
1669+
// round of atomic load and compare-exchange.
1670+
// This is not useful when obtaining lock for assuming thread during actor
1671+
// switching, because arbitrary use code can run between locking and
1672+
// draining the next job. So we still need to call processIncomingQueue() to
1673+
// check for higher priority jobs that could have been scheduled in the
1674+
// meantime. And processing is more efficient when done in larger batches.
1675+
if (asDrainer) {
1676+
newState = newState.withFirstUnprioritisedJob(nullptr);
1677+
}
1678+
16711679
// This needs an acquire since we are taking a lock
16721680
if (_status().compare_exchange_weak(oldState, newState,
16731681
std::memory_order_acquire,
@@ -1677,6 +1685,9 @@ retry:;
16771685
assert(prioritizedJobs.empty());
16781686
}
16791687
traceActorStateTransition(this, oldState, newState, distributedActorIsRemote);
1688+
if (asDrainer) {
1689+
handleUnprioritizedJobs(oldState.getFirstUnprioritisedJob());
1690+
}
16801691
return true;
16811692
}
16821693
}

0 commit comments

Comments
 (0)