Skip to content

Commit 26ea1fe

Browse files
committed
Miscellaneous improvements to the cooperative executor:
- Switch to using ListMerger for priority insertion. - Move ready delayed jobs to the back to the queue instead of running them immediately, before anything currently in the queue. - Respect priorities in delayed jobs. - Use the proper std::chrono facilities for managing deadlines instead of doing all the math on nanoseconds since the epoch. - Take advantage of the second word of queue-private storage to avoid allocating a separate job when a deadline fits there.
1 parent c45f2c4 commit 26ea1fe

File tree

1 file changed

+112
-56
lines changed

1 file changed

+112
-56
lines changed

stdlib/public/Concurrency/CooperativeGlobalExecutor.inc

Lines changed: 112 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -24,44 +24,80 @@
2424

2525
#include <chrono>
2626
#include <thread>
27+
#include "swift/Basic/ListMerger.h"
2728

28-
static Job *JobQueue = nullptr;
29+
namespace {
2930

30-
class DelayedJob {
31-
public:
32-
Job *job;
33-
unsigned long long when;
34-
DelayedJob *next;
31+
struct JobQueueTraits {
32+
static Job *&storage(Job *cur) {
33+
return reinterpret_cast<Job*&>(cur->SchedulerPrivate[0]);
34+
}
3535

36-
DelayedJob(Job *job, unsigned long long when) : job(job), when(when), next(nullptr) {}
36+
static Job *getNext(Job *job) {
37+
return storage(job);
38+
}
39+
static void setNext(Job *job, Job *next) {
40+
storage(job) = next;
41+
}
42+
static int compare(Job *lhs, Job *rhs) {
43+
return descendingPriorityOrder(lhs->getPriority(), rhs->getPriority());
44+
}
3745
};
46+
using JobQueueMerger = ListMerger<Job*, JobQueueTraits>;
3847

39-
static DelayedJob *DelayedJobQueue = nullptr;
48+
using JobDeadline = std::chrono::time_point<std::chrono::steady_clock>;
4049

41-
/// Get the next-in-queue storage slot.
42-
static Job *&nextInQueue(Job *cur) {
43-
return reinterpret_cast<Job*&>(cur->SchedulerPrivate[Job::NextWaitingTaskIndex]);
44-
}
50+
template <bool = (sizeof(JobDeadline) <= sizeof(void*) &&
51+
alignof(JobDeadline) <= alignof(void*))>
52+
struct JobDeadlineStorage;
53+
54+
/// Specialization for when JobDeadline fits in SchedulerPrivate.
55+
template <>
56+
struct JobDeadlineStorage<true> {
57+
static JobDeadline &storage(Job *job) {
58+
return reinterpret_cast<JobDeadline&>(job->SchedulerPrivate[1]);
59+
}
60+
static JobDeadline get(Job *job) {
61+
return storage(job);
62+
}
63+
static void set(Job *job, JobDeadline deadline) {
64+
new(static_cast<void*>(&storage(job))) JobDeadline(deadline);
65+
}
66+
static void destroy(Job *job) {
67+
storage(job).~JobDeadline();
68+
}
69+
};
70+
71+
/// Specialization for when JobDeadline doesn't fit in SchedulerPrivate.
72+
template <>
73+
struct JobDeadlineStorage<false> {
74+
static JobDeadline *&storage(Job *job) {
75+
return reinterpret_cast<JobDeadline*&>(job->SchedulerPrivate[1]);
76+
}
77+
static JobDeadline get(Job *job) {
78+
return *storage(job);
79+
}
80+
static void set(Job *job, JobDeadline deadline) {
81+
storage(job) = new JobDeadline(deadline);
82+
}
83+
static void destroy(Job *job) {
84+
delete storage(job);
85+
}
86+
};
87+
88+
} // end anonymous namespace
89+
90+
static Job *JobQueue = nullptr;
91+
static Job *DelayedJobQueue = nullptr;
4592

4693
/// Insert a job into the cooperative global queue.
4794
SWIFT_CC(swift)
4895
static void swift_task_enqueueGlobalImpl(Job *job) {
4996
assert(job && "no job provided");
5097

51-
Job **position = &JobQueue;
52-
while (auto cur = *position) {
53-
// If we find a job with lower priority, insert here.
54-
if (cur->getPriority() < newJob->getPriority()) {
55-
nextInQueue(newJob) = cur;
56-
*position = newJob;
57-
return;
58-
}
59-
60-
// Otherwise, keep advancing through the queue.
61-
position = &nextInQueue(cur);
62-
}
63-
nextInQueue(newJob) = nullptr;
64-
*position = newJob;
98+
JobQueueMerger merger(JobQueue);
99+
merger.insert(job);
100+
JobQueue = merger.release();
65101
}
66102

67103
/// Enqueues a task on the main executor.
@@ -72,60 +108,80 @@ static void swift_task_enqueueMainExecutorImpl(Job *job) {
72108
swift_task_enqueueGlobalImpl(job);
73109
}
74110

75-
static unsigned long long currentNanos() {
76-
auto now = std::chrono::steady_clock::now();
77-
auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
78-
auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch());
79-
return value.count();
80-
}
81-
82111
/// Insert a job into the cooperative global queue with a delay.
83112
SWIFT_CC(swift)
84113
static void swift_task_enqueueGlobalWithDelayImpl(JobDelay delay,
85-
Job *job) {
86-
assert(job && "no job provided");
114+
Job *newJob) {
115+
assert(newJob && "no job provided");
87116

88-
DelayedJob **position = &DelayedJobQueue;
89-
DelayedJob *newJob = new DelayedJob(job, currentNanos() + delay);
117+
auto deadline = std::chrono::steady_clock::now()
118+
+ std::chrono::duration_cast<JobDeadline::duration>(
119+
std::chrono::nanoseconds(delay));
120+
JobDeadlineStorage<>::set(newJob, deadline);
90121

122+
Job **position = &DelayedJobQueue;
91123
while (auto cur = *position) {
92-
// If we find a job with lower priority, insert here.
93-
if (cur->when > newJob->when) {
94-
newJob->next = cur;
124+
// If we find a job with a later deadline, insert here.
125+
// Note that we maintain FIFO order.
126+
if (deadline < JobDeadlineStorage<>::get(cur)) {
127+
JobQueueTraits::setNext(newJob, cur);
95128
*position = newJob;
96129
return;
97130
}
98131

99132
// Otherwise, keep advancing through the queue.
100-
position = &cur->next;
133+
position = &JobQueueTraits::storage(cur);
101134
}
135+
JobQueueTraits::setNext(newJob, nullptr);
102136
*position = newJob;
103137
}
104138

139+
/// Recognize jobs in the delayed-jobs queue that are ready to execute
140+
/// and move them to the primary queue.
141+
static void recognizeReadyDelayedJobs() {
142+
// Process all the delayed jobs.
143+
auto nextDelayedJob = DelayedJobQueue;
144+
if (!nextDelayedJob) return;
145+
146+
auto now = std::chrono::steady_clock::now();
147+
JobQueueMerger readyJobs(JobQueue);
148+
149+
// Pull jobs off of the delayed-jobs queue whose deadline has been
150+
// reached, and add them to the ready queue.
151+
while (nextDelayedJob &&
152+
JobDeadlineStorage<>::get(nextDelayedJob) <= now) {
153+
// Destroy the storage of the deadline in the job.
154+
JobDeadlineStorage<>::destroy(nextDelayedJob);
155+
156+
auto next = JobQueueTraits::getNext(nextDelayedJob);
157+
readyJobs.insert(nextDelayedJob);
158+
nextDelayedJob = next;
159+
}
160+
161+
JobQueue = readyJobs.release();
162+
DelayedJobQueue = nextDelayedJob;
163+
}
164+
105165
/// Claim the next job from the cooperative global queue.
106166
static Job *claimNextFromCooperativeGlobalQueue() {
107-
// Check delayed jobs first
108167
while (true) {
109-
if (auto delayedJob = DelayedJobQueue) {
110-
if (delayedJob->when < currentNanos()) {
111-
DelayedJobQueue = delayedJob->next;
112-
auto job = delayedJob->job;
113-
114-
delete delayedJob;
115-
116-
return job;
117-
}
118-
}
168+
// Move any delayed jobs that are now ready into the primary queue.
169+
recognizeReadyDelayedJobs();
170+
171+
// If there's a job in the primary queue, run it.
119172
if (auto job = JobQueue) {
120-
JobQueue = nextInQueue(job);
173+
JobQueue = JobQueueTraits::getNext(job);
121174
return job;
122175
}
123-
// there are only delayed jobs left, but they are not ready,
124-
// so we sleep until the first one is
176+
177+
// If there are only delayed jobs left, sleep until the next deadline.
178+
// TODO: should the donator have some say in this?
125179
if (auto delayedJob = DelayedJobQueue) {
126-
std::this_thread::sleep_for(std::chrono::nanoseconds(delayedJob->when - currentNanos()));
180+
auto deadline = JobDeadlineStorage<>::get(delayedJob);
181+
std::this_thread::sleep_until(deadline);
127182
continue;
128183
}
184+
129185
return nullptr;
130186
}
131187
}

0 commit comments

Comments
 (0)