Skip to content

Commit 0bd0f77

Browse files
committed
NFC: move the global executor implementations into their own files
1 parent 4c8983b commit 0bd0f77

File tree

4 files changed

+441
-359
lines changed

4 files changed

+441
-359
lines changed
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
///===--- CooperativeGlobalExecutor.inc ---------------------*- C++ -*--===///
2+
///
3+
/// This source file is part of the Swift.org open source project
4+
///
5+
/// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
6+
/// Licensed under Apache License v2.0 with Runtime Library Exception
7+
///
8+
/// See https:///swift.org/LICENSE.txt for license information
9+
/// See https:///swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
///
11+
///===------------------------------------------------------------------===///
12+
///
13+
/// The implementation of the cooperative global executor.
14+
///
15+
/// This file is included into GlobalExecutor.cpp only when
16+
/// the cooperative global executor is enabled. It is expected to
17+
/// declare the following functions:
18+
/// swift_task_enqueueGlobalImpl
19+
/// swift_task_enqueueGlobalWithDelayImpl
20+
/// swift_task_enqueueMainExecutorImpl
21+
/// as well as any cooperative-executor-specific functions in the runtime.
22+
///
23+
///===------------------------------------------------------------------===///
24+
25+
#include <chrono>
26+
#include <thread>
27+
28+
static Job *JobQueue = nullptr;
29+
30+
class DelayedJob {
31+
public:
32+
Job *job;
33+
unsigned long long when;
34+
DelayedJob *next;
35+
36+
DelayedJob(Job *job, unsigned long long when) : job(job), when(when), next(nullptr) {}
37+
};
38+
39+
static DelayedJob *DelayedJobQueue = nullptr;
40+
41+
/// Get the next-in-queue storage slot.
42+
static Job *&nextInQueue(Job *cur) {
43+
return reinterpret_cast<Job*&>(cur->SchedulerPrivate[Job::NextWaitingTaskIndex]);
44+
}
45+
46+
/// Insert a job into the cooperative global queue.
47+
SWIFT_CC(swift)
48+
static void swift_task_enqueueGlobalImpl(Job *job) {
49+
assert(job && "no job provided");
50+
51+
Job **position = &JobQueue;
52+
while (auto cur = *position) {
53+
// If we find a job with lower priority, insert here.
54+
if (cur->getPriority() < newJob->getPriority()) {
55+
nextInQueue(newJob) = cur;
56+
*position = newJob;
57+
return;
58+
}
59+
60+
// Otherwise, keep advancing through the queue.
61+
position = &nextInQueue(cur);
62+
}
63+
nextInQueue(newJob) = nullptr;
64+
*position = newJob;
65+
}
66+
67+
/// Enqueues a task on the main executor.
68+
SWIFT_CC(swift)
69+
static void swift_task_enqueueMainExecutorImpl(Job *job) {
70+
// The cooperative executor does not distinguish between the main
71+
// queue and the global queue.
72+
swift_task_enqueueGlobalImpl(job);
73+
}
74+
75+
static unsigned long long currentNanos() {
76+
auto now = std::chrono::steady_clock::now();
77+
auto nowNanos = std::chrono::time_point_cast<std::chrono::nanoseconds>(now);
78+
auto value = std::chrono::duration_cast<std::chrono::nanoseconds>(nowNanos.time_since_epoch());
79+
return value.count();
80+
}
81+
82+
/// Insert a job into the cooperative global queue with a delay.
83+
SWIFT_CC(swift)
84+
static void swift_task_enqueueGlobalWithDelayImpl(unsigned long long delay,
85+
Job *job) {
86+
assert(job && "no job provided");
87+
88+
DelayedJob **position = &DelayedJobQueue;
89+
DelayedJob *newJob = new DelayedJob(job, currentNanos() + delay);
90+
91+
while (auto cur = *position) {
92+
// If we find a job with lower priority, insert here.
93+
if (cur->when > newJob->when) {
94+
newJob->next = cur;
95+
*position = newJob;
96+
return;
97+
}
98+
99+
// Otherwise, keep advancing through the queue.
100+
position = &cur->next;
101+
}
102+
*position = newJob;
103+
}
104+
105+
/// Claim the next job from the cooperative global queue.
106+
static Job *claimNextFromCooperativeGlobalQueue() {
107+
// Check delayed jobs first
108+
while (true) {
109+
if (auto delayedJob = DelayedJobQueue) {
110+
if (delayedJob->when < currentNanos()) {
111+
DelayedJobQueue = delayedJob->next;
112+
auto job = delayedJob->job;
113+
114+
delete delayedJob;
115+
116+
return job;
117+
}
118+
}
119+
if (auto job = JobQueue) {
120+
JobQueue = nextInQueue(job);
121+
return job;
122+
}
123+
// there are only delayed jobs left, but they are not ready,
124+
// so we sleep until the first one is
125+
if (auto delayedJob = DelayedJobQueue) {
126+
std::this_thread::sleep_for(std::chrono::nanoseconds(delayedJob->when - currentNanos()));
127+
continue;
128+
}
129+
return nullptr;
130+
}
131+
}
132+
133+
void swift::
134+
swift_task_donateThreadToGlobalExecutorUntil(bool (*condition)(void *),
135+
void *conditionContext) {
136+
while (!condition(conditionContext)) {
137+
auto job = claimNextFromCooperativeGlobalQueue();
138+
if (!job) return;
139+
swift_job_run(job, ExecutorRef::generic());
140+
}
141+
}
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
///===--- DispatchGlobalExecutor.inc ------------------------*- C++ -*--===///
2+
///
3+
/// This source file is part of the Swift.org open source project
4+
///
5+
/// Copyright (c) 2014 - 2020 Apple Inc. and the Swift project authors
6+
/// Licensed under Apache License v2.0 with Runtime Library Exception
7+
///
8+
/// See https:///swift.org/LICENSE.txt for license information
9+
/// See https:///swift.org/CONTRIBUTORS.txt for the list of Swift project authors
10+
///
11+
///===------------------------------------------------------------------===///
12+
///
13+
/// The implementation of the global executor when using Dispatch.
14+
///
15+
/// This file is included into GlobalExecutor.cpp only when Dispatch
16+
/// integration is enabled. It is expected to define the following
17+
/// functions:
18+
/// swift_task_enqueueGlobalImpl
19+
/// swift_task_enqueueGlobalWithDelayImpl
20+
/// swift_task_enqueueMainExecutorImpl
21+
/// as well as any Dispatch-specific functions for the runtime.
22+
///
23+
///===------------------------------------------------------------------===///
24+
25+
#if SWIFT_CONCURRENCY_ENABLE_DISPATCH
26+
#include <dispatch/dispatch.h>
27+
28+
#if !defined(_WIN32)
29+
#include <dlfcn.h>
30+
#endif
31+
32+
#endif
33+
34+
// Ensure that Job's layout is compatible with what Dispatch expects.
35+
// Note: MinimalDispatchObjectHeader just has the fields we care about, it is
36+
// not complete and should not be used for anything other than these asserts.
37+
struct MinimalDispatchObjectHeader {
38+
const void *VTable;
39+
int Opaque0;
40+
int Opaque1;
41+
void *Linkage;
42+
};
43+
static_assert(
44+
offsetof(Job, metadata) == offsetof(MinimalDispatchObjectHeader, VTable),
45+
"Job Metadata field must match location of Dispatch VTable field.");
46+
static_assert(offsetof(Job, SchedulerPrivate[Job::DispatchLinkageIndex]) ==
47+
offsetof(MinimalDispatchObjectHeader, Linkage),
48+
"Dispatch Linkage field must match Job "
49+
"SchedulerPrivate[DispatchLinkageIndex].");
50+
51+
/// The function passed to dispatch_async_f to execute a job.
52+
static void __swift_run_job(void *_job) {
53+
Job *job = (Job*) _job;
54+
auto metadata =
55+
reinterpret_cast<const DispatchClassMetadata *>(job->metadata);
56+
metadata->VTableInvoke(job, nullptr, 0);
57+
}
58+
59+
/// The type of a function pointer for enqueueing a Job object onto a dispatch
60+
/// queue.
61+
typedef void (*dispatchEnqueueFuncType)(dispatch_queue_t queue, void *obj,
62+
dispatch_qos_class_t qos);
63+
64+
/// Initialize dispatchEnqueueFunc and then call through to the proper
65+
/// implementation.
66+
static void initializeDispatchEnqueueFunc(dispatch_queue_t queue, void *obj,
67+
dispatch_qos_class_t qos);
68+
69+
/// A function pointer to the function used to enqueue a Job onto a dispatch
70+
/// queue. Initially set to initializeDispatchEnqueueFunc, so that the first
71+
/// call will initialize it. initializeDispatchEnqueueFunc sets it to point
72+
/// either to dispatch_async_swift_job when it's available, otherwise to
73+
/// dispatchEnqueueDispatchAsync.
74+
static std::atomic<dispatchEnqueueFuncType> dispatchEnqueueFunc{
75+
initializeDispatchEnqueueFunc};
76+
77+
/// A small adapter that dispatches a Job onto a queue using dispatch_async_f.
78+
static void dispatchEnqueueDispatchAsync(dispatch_queue_t queue, void *obj,
79+
dispatch_qos_class_t qos) {
80+
dispatch_async_f(queue, obj, __swift_run_job);
81+
}
82+
83+
static void initializeDispatchEnqueueFunc(dispatch_queue_t queue, void *obj,
84+
dispatch_qos_class_t qos) {
85+
dispatchEnqueueFuncType func = nullptr;
86+
87+
// Always fall back to plain dispatch_async_f on Windows for now, and
88+
// also for back-deployed concurrency.
89+
#if !defined(_WIN32) && !defined(SWIFT_CONCURRENCY_BACK_DEPLOYMENT)
90+
if (runtime::environment::concurrencyEnableJobDispatchIntegration())
91+
func = reinterpret_cast<dispatchEnqueueFuncType>(
92+
dlsym(RTLD_NEXT, "dispatch_async_swift_job"));
93+
#endif
94+
95+
if (!func)
96+
func = dispatchEnqueueDispatchAsync;
97+
98+
dispatchEnqueueFunc.store(func, std::memory_order_relaxed);
99+
100+
func(queue, obj, qos);
101+
}
102+
103+
/// Enqueue a Job onto a dispatch queue using dispatchEnqueueFunc.
104+
static void dispatchEnqueue(dispatch_queue_t queue, Job *job,
105+
dispatch_qos_class_t qos, void *executorQueue) {
106+
job->SchedulerPrivate[Job::DispatchQueueIndex] = executorQueue;
107+
dispatchEnqueueFunc.load(std::memory_order_relaxed)(queue, job, qos);
108+
}
109+
110+
static constexpr size_t globalQueueCacheCount =
111+
static_cast<size_t>(JobPriority::UserInteractive) + 1;
112+
static std::atomic<dispatch_queue_t> globalQueueCache[globalQueueCacheCount];
113+
114+
#if defined(SWIFT_CONCURRENCY_BACK_DEPLOYMENT) || !defined(__APPLE__)
115+
extern "C" void dispatch_queue_set_width(dispatch_queue_t dq, long width);
116+
#endif
117+
118+
static dispatch_queue_t getGlobalQueue(JobPriority priority) {
119+
size_t numericPriority = static_cast<size_t>(priority);
120+
if (numericPriority >= globalQueueCacheCount)
121+
swift_Concurrency_fatalError(0, "invalid job priority %#zx");
122+
123+
#ifdef SWIFT_CONCURRENCY_BACK_DEPLOYMENT
124+
std::memory_order loadOrder = std::memory_order_acquire;
125+
#else
126+
std::memory_order loadOrder = std::memory_order_relaxed;
127+
#endif
128+
129+
auto *ptr = &globalQueueCache[numericPriority];
130+
auto queue = ptr->load(loadOrder);
131+
if (SWIFT_LIKELY(queue))
132+
return queue;
133+
134+
#if defined(SWIFT_CONCURRENCY_BACK_DEPLOYMENT) || !defined(__APPLE__)
135+
const int DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS = -3;
136+
137+
// Create a new cooperative concurrent queue and swap it in.
138+
dispatch_queue_attr_t newQueueAttr = dispatch_queue_attr_make_with_qos_class(
139+
DISPATCH_QUEUE_CONCURRENT, (dispatch_qos_class_t)priority, 0);
140+
dispatch_queue_t newQueue = dispatch_queue_create(
141+
"Swift global concurrent queue", newQueueAttr);
142+
dispatch_queue_set_width(newQueue, DISPATCH_QUEUE_WIDTH_MAX_LOGICAL_CPUS);
143+
144+
if (!ptr->compare_exchange_strong(queue, newQueue,
145+
/*success*/ std::memory_order_release,
146+
/*failure*/ std::memory_order_acquire)) {
147+
dispatch_release(newQueue);
148+
return queue;
149+
}
150+
151+
return newQueue;
152+
#else
153+
// If we don't have a queue cached for this priority, cache it now. This may
154+
// race with other threads doing this at the same time for this priority, but
155+
// that's OK, they'll all end up writing the same value.
156+
queue = dispatch_get_global_queue((dispatch_qos_class_t)priority,
157+
/*flags*/ 0);
158+
159+
// Unconditionally store it back in the cache. If we raced with another
160+
// thread, we'll just overwrite the entry with the same value.
161+
ptr->store(queue, std::memory_order_relaxed);
162+
#endif
163+
164+
return queue;
165+
}
166+
167+
SWIFT_CC(swift)
168+
static void swift_task_enqueueGlobalImpl(Job *job) {
169+
assert(job && "no job provided");
170+
171+
// We really want four things from the global execution service:
172+
// - Enqueuing work should have minimal runtime and memory overhead.
173+
// - Adding work should never result in an "explosion" where many
174+
// more threads are created than the available cores.
175+
// - Jobs should run on threads with an appropriate priority.
176+
// - Thread priorities should temporarily elevatable to avoid
177+
// priority inversions.
178+
//
179+
// Of these, the first two are the most important. Many programs
180+
// do not rely on high-usage priority scheduling, and many priority
181+
// inversions can be avoided at a higher level (albeit with some
182+
// performance cost, e.g. by creating higher-priority tasks to run
183+
// critical sections that contend with high-priority work). In
184+
// contrast, if the async feature adds too much overhead, or if
185+
// heavy use of it leads to thread explosions and memory exhaustion,
186+
// programmers will have no choice but to stop using it. So if
187+
// goals are in conflict, it's best to focus on core properties over
188+
// priority-inversion avoidance.
189+
190+
// We currently use Dispatch for our thread pool on all platforms.
191+
// Dispatch currently backs its serial queues with a global
192+
// concurrent queue that is prone to thread explosions when a flood
193+
// of jobs are added to it. That problem does not apply equally
194+
// to the global concurrent queues returned by dispatch_get_global_queue,
195+
// which are not strictly CPU-limited but are at least much more
196+
// cautious about adding new threads. We cannot safely elevate
197+
// the priorities of work added to this queue using Dispatch's public
198+
// API, but as discussed above, that is less important than avoiding
199+
// performance problems.
200+
JobPriority priority = job->getPriority();
201+
202+
auto queue = getGlobalQueue(priority);
203+
204+
dispatchEnqueue(queue, job, (dispatch_qos_class_t)priority,
205+
DISPATCH_QUEUE_GLOBAL_EXECUTOR);
206+
}
207+
208+
209+
SWIFT_CC(swift)
210+
static void swift_task_enqueueGlobalWithDelayImpl(unsigned long long delay,
211+
Job *job) {
212+
assert(job && "no job provided");
213+
214+
dispatch_function_t dispatchFunction = &__swift_run_job;
215+
void *dispatchContext = job;
216+
217+
JobPriority priority = job->getPriority();
218+
219+
auto queue = getGlobalQueue(priority);
220+
221+
job->SchedulerPrivate[Job::DispatchQueueIndex] =
222+
DISPATCH_QUEUE_GLOBAL_EXECUTOR;
223+
224+
dispatch_time_t when = dispatch_time(DISPATCH_TIME_NOW, delay);
225+
dispatch_after_f(when, queue, dispatchContext, dispatchFunction);
226+
}
227+
228+
SWIFT_CC(swift)
229+
static void swift_task_enqueueMainExecutorImpl(Job *job) {
230+
assert(job && "no job provided");
231+
232+
JobPriority priority = job->getPriority();
233+
234+
// This is an inline function that compiles down to a pointer to a global.
235+
auto mainQueue = dispatch_get_main_queue();
236+
237+
dispatchEnqueue(mainQueue, job, (dispatch_qos_class_t)priority, mainQueue);
238+
}
239+
240+
void swift::swift_task_enqueueOnDispatchQueue(Job *job,
241+
HeapObject *_queue) {
242+
JobPriority priority = job->getPriority();
243+
auto queue = reinterpret_cast<dispatch_queue_t>(_queue);
244+
dispatchEnqueue(queue, job, (dispatch_qos_class_t)priority, queue);
245+
}

0 commit comments

Comments
 (0)