Skip to content

Commit 56f36b6

Browse files
authored
Merge pull request #253 from dgrove-oss/linux-qos-prioritty
Convert dispatch_workq from legacy priorities to qos
2 parents dc1857c + b076935 commit 56f36b6

File tree

3 files changed

+56
-58
lines changed

3 files changed

+56
-58
lines changed

src/event/workqueue.c

Lines changed: 21 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ typedef struct dispatch_workq_monitor_s {
6969
int num_registered_tids;
7070
} dispatch_workq_monitor_s, *dispatch_workq_monitor_t;
7171

72-
static dispatch_workq_monitor_s _dispatch_workq_monitors[WORKQ_NUM_PRIORITIES];
72+
static dispatch_workq_monitor_s _dispatch_workq_monitors[DISPATCH_QOS_MAX];
7373

7474
#pragma mark Implementation of the monitoring subsystem.
7575

@@ -80,12 +80,13 @@ static void _dispatch_workq_init_once(void *context DISPATCH_UNUSED);
8080
static dispatch_once_t _dispatch_workq_init_once_pred;
8181

8282
void
83-
_dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
83+
_dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls)
8484
{
8585
dispatch_once_f(&_dispatch_workq_init_once_pred, NULL, &_dispatch_workq_init_once);
8686

8787
#if HAVE_DISPATCH_WORKQ_MONITORING
88-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
88+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
89+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
8990
dispatch_assert(mon->dq == root_q);
9091
dispatch_tid tid = _dispatch_thread_getspecific(tid);
9192
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
@@ -97,10 +98,11 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
9798
}
9899

99100
void
100-
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority)
101+
_dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls)
101102
{
102103
#if HAVE_DISPATCH_WORKQ_MONITORING
103-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[priority];
104+
dispatch_qos_t qos = _dispatch_qos_from_qos_class(cls);
105+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[qos-1];
104106
dispatch_tid tid = _dispatch_thread_getspecific(tid);
105107
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
106108
for (int i = 0; i < mon->num_registered_tids; i++) {
@@ -177,16 +179,10 @@ _dispatch_workq_count_runnable_workers(dispatch_workq_monitor_t mon)
177179
static void
178180
_dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
179181
{
180-
// TODO: Once we switch away from the legacy priorities to
181-
// newer QoS, we can loop in order of decreasing QoS
182-
// and track the total number of runnable threads seen
183-
// across pools. We can then use that number to
184-
// implement a global policy where low QoS queues
185-
// are not eligible for over-subscription if the higher
186-
// QoS queues have already consumed the target
187-
// number of threads.
188-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
189-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
182+
int global_soft_max = WORKQ_OVERSUBSCRIBE_FACTOR * dispatch_hw_config(active_cpus);
183+
int global_runnable = 0;
184+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
185+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
190186
dispatch_queue_t dq = mon->dq;
191187

192188
if (!_dispatch_queue_class_probe(dq)) {
@@ -198,16 +194,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
198194
_dispatch_debug("workq: %s has %d runnable wokers (target is %d)",
199195
dq->dq_label, mon->num_runnable, mon->target_runnable);
200196

197+
global_runnable += mon->num_runnable;
198+
201199
if (mon->num_runnable == 0) {
202-
// We are below target, and no worker is runnable.
200+
// We have work, but no worker is runnable.
203201
// It is likely the program is stalled. Therefore treat
204202
// this as if dq were an overcommit queue and call poke
205203
// with the limit being the maximum number of workers for dq.
206204
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
207205
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
208206
dq->dq_label, floor);
209207
_dispatch_global_queue_poke(dq, 1, floor);
210-
} else if (mon->num_runnable < mon->target_runnable) {
208+
global_runnable += 1; // account for poke in global estimate
209+
} else if (mon->num_runnable < mon->target_runnable &&
210+
global_runnable < global_soft_max) {
211211
// We are below target, but some workers are still runnable.
212212
// We want to oversubscribe to hit the desired load target.
213213
// However, this under-utilization may be transitory so set the
@@ -218,42 +218,20 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
218218
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
219219
dq->dq_label, floor);
220220
_dispatch_global_queue_poke(dq, 1, floor);
221+
global_runnable += 1; // account for poke in global estimate
221222
}
222223
}
223224
}
224225
#endif // HAVE_DISPATCH_WORKQ_MONITORING
225226

226-
227-
// temporary until we switch over to QoS based interface.
228-
static dispatch_queue_t
229-
get_root_queue_from_legacy_priority(int priority)
230-
{
231-
switch (priority) {
232-
case WORKQ_HIGH_PRIOQUEUE:
233-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS];
234-
case WORKQ_DEFAULT_PRIOQUEUE:
235-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS];
236-
case WORKQ_LOW_PRIOQUEUE:
237-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS];
238-
case WORKQ_BG_PRIOQUEUE:
239-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS];
240-
case WORKQ_BG_PRIOQUEUE_CONDITIONAL:
241-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS];
242-
case WORKQ_HIGH_PRIOQUEUE_CONDITIONAL:
243-
return &_dispatch_root_queues[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS];
244-
default:
245-
return NULL;
246-
}
247-
}
248-
249227
static void
250228
_dispatch_workq_init_once(void *context DISPATCH_UNUSED)
251229
{
252230
#if HAVE_DISPATCH_WORKQ_MONITORING
253231
int target_runnable = dispatch_hw_config(active_cpus);
254-
for (int i = 0; i < WORKQ_NUM_PRIORITIES; i++) {
255-
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i];
256-
mon->dq = get_root_queue_from_legacy_priority(i);
232+
for (dispatch_qos_t i = DISPATCH_QOS_MAX; i > DISPATCH_QOS_UNSPECIFIED; i--) {
233+
dispatch_workq_monitor_t mon = &_dispatch_workq_monitors[i-1];
234+
mon->dq = _dispatch_get_root_queue(i, false);
257235
void *buf = _dispatch_calloc(WORKQ_MAX_TRACKED_TIDS, sizeof(dispatch_tid));
258236
mon->registered_tids = buf;
259237
mon->target_runnable = target_runnable;

src/event/workqueue_internal.h

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,12 @@
2727
#ifndef __DISPATCH_WORKQUEUE_INTERNAL__
2828
#define __DISPATCH_WORKQUEUE_INTERNAL__
2929

30-
/* Work queue priority attributes. */
31-
#define WORKQ_HIGH_PRIOQUEUE 0
32-
#define WORKQ_DEFAULT_PRIOQUEUE 1
33-
#define WORKQ_LOW_PRIOQUEUE 2
34-
#define WORKQ_BG_PRIOQUEUE 3
35-
#define WORKQ_BG_PRIOQUEUE_CONDITIONAL 4
36-
#define WORKQ_HIGH_PRIOQUEUE_CONDITIONAL 5
37-
38-
#define WORKQ_NUM_PRIORITIES 6
39-
4030
#define WORKQ_ADDTHREADS_OPTION_OVERCOMMIT 0x1
4131

4232
#define DISPATCH_WORKQ_MAX_PTHREAD_COUNT 255
4333

44-
void _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority);
45-
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, int priority);
34+
void _dispatch_workq_worker_register(dispatch_queue_t root_q, qos_class_t cls);
35+
void _dispatch_workq_worker_unregister(dispatch_queue_t root_q, qos_class_t cls);
4636

4737
#if defined(__linux__)
4838
#define HAVE_DISPATCH_WORKQ_MONITORING 1

src/queue.c

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
!defined(DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK)
3838
#define DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK 1
3939
#endif
40+
#if HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP || DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
41+
#define DISPATCH_USE_WORKQ_PRIORITY 1
42+
#endif
4043
#if DISPATCH_USE_WORKQUEUES && DISPATCH_USE_PTHREAD_POOL && \
4144
!DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
4245
#define pthread_workqueue_t void*
@@ -158,7 +161,10 @@ struct dispatch_root_queue_context_s {
158161
int volatile dgq_pending;
159162
#if DISPATCH_USE_WORKQUEUES
160163
qos_class_t dgq_qos;
161-
int dgq_wq_priority, dgq_wq_options;
164+
#if DISPATCH_USE_WORKQ_PRIORITY
165+
int dgq_wq_priority;
166+
#endif
167+
int dgq_wq_options;
162168
#if DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK || DISPATCH_USE_PTHREAD_POOL
163169
pthread_workqueue_t dgq_kworkqueue;
164170
#endif
@@ -186,7 +192,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
186192
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS] = {{{
187193
#if DISPATCH_USE_WORKQUEUES
188194
.dgq_qos = QOS_CLASS_MAINTENANCE,
195+
#if DISPATCH_USE_WORKQ_PRIORITY
189196
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
197+
#endif
190198
.dgq_wq_options = 0,
191199
#endif
192200
#if DISPATCH_ENABLE_THREAD_POOL
@@ -197,7 +205,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
197205
[DISPATCH_ROOT_QUEUE_IDX_MAINTENANCE_QOS_OVERCOMMIT] = {{{
198206
#if DISPATCH_USE_WORKQUEUES
199207
.dgq_qos = QOS_CLASS_MAINTENANCE,
208+
#if DISPATCH_USE_WORKQ_PRIORITY
200209
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE,
210+
#endif
201211
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
202212
#endif
203213
#if DISPATCH_ENABLE_THREAD_POOL
@@ -208,7 +218,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
208218
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS] = {{{
209219
#if DISPATCH_USE_WORKQUEUES
210220
.dgq_qos = QOS_CLASS_BACKGROUND,
221+
#if DISPATCH_USE_WORKQ_PRIORITY
211222
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
223+
#endif
212224
.dgq_wq_options = 0,
213225
#endif
214226
#if DISPATCH_ENABLE_THREAD_POOL
@@ -219,7 +231,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
219231
[DISPATCH_ROOT_QUEUE_IDX_BACKGROUND_QOS_OVERCOMMIT] = {{{
220232
#if DISPATCH_USE_WORKQUEUES
221233
.dgq_qos = QOS_CLASS_BACKGROUND,
234+
#if DISPATCH_USE_WORKQ_PRIORITY
222235
.dgq_wq_priority = WORKQ_BG_PRIOQUEUE_CONDITIONAL,
236+
#endif
223237
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
224238
#endif
225239
#if DISPATCH_ENABLE_THREAD_POOL
@@ -230,7 +244,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
230244
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS] = {{{
231245
#if DISPATCH_USE_WORKQUEUES
232246
.dgq_qos = QOS_CLASS_UTILITY,
247+
#if DISPATCH_USE_WORKQ_PRIORITY
233248
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
249+
#endif
234250
.dgq_wq_options = 0,
235251
#endif
236252
#if DISPATCH_ENABLE_THREAD_POOL
@@ -241,7 +257,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
241257
[DISPATCH_ROOT_QUEUE_IDX_UTILITY_QOS_OVERCOMMIT] = {{{
242258
#if DISPATCH_USE_WORKQUEUES
243259
.dgq_qos = QOS_CLASS_UTILITY,
260+
#if DISPATCH_USE_WORKQ_PRIORITY
244261
.dgq_wq_priority = WORKQ_LOW_PRIOQUEUE,
262+
#endif
245263
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
246264
#endif
247265
#if DISPATCH_ENABLE_THREAD_POOL
@@ -252,7 +270,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
252270
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS] = {{{
253271
#if DISPATCH_USE_WORKQUEUES
254272
.dgq_qos = QOS_CLASS_DEFAULT,
273+
#if DISPATCH_USE_WORKQ_PRIORITY
255274
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
275+
#endif
256276
.dgq_wq_options = 0,
257277
#endif
258278
#if DISPATCH_ENABLE_THREAD_POOL
@@ -263,7 +283,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
263283
[DISPATCH_ROOT_QUEUE_IDX_DEFAULT_QOS_OVERCOMMIT] = {{{
264284
#if DISPATCH_USE_WORKQUEUES
265285
.dgq_qos = QOS_CLASS_DEFAULT,
286+
#if DISPATCH_USE_WORKQ_PRIORITY
266287
.dgq_wq_priority = WORKQ_DEFAULT_PRIOQUEUE,
288+
#endif
267289
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
268290
#endif
269291
#if DISPATCH_ENABLE_THREAD_POOL
@@ -274,7 +296,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
274296
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS] = {{{
275297
#if DISPATCH_USE_WORKQUEUES
276298
.dgq_qos = QOS_CLASS_USER_INITIATED,
299+
#if DISPATCH_USE_WORKQ_PRIORITY
277300
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
301+
#endif
278302
.dgq_wq_options = 0,
279303
#endif
280304
#if DISPATCH_ENABLE_THREAD_POOL
@@ -285,7 +309,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
285309
[DISPATCH_ROOT_QUEUE_IDX_USER_INITIATED_QOS_OVERCOMMIT] = {{{
286310
#if DISPATCH_USE_WORKQUEUES
287311
.dgq_qos = QOS_CLASS_USER_INITIATED,
312+
#if DISPATCH_USE_WORKQ_PRIORITY
288313
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE,
314+
#endif
289315
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
290316
#endif
291317
#if DISPATCH_ENABLE_THREAD_POOL
@@ -296,7 +322,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
296322
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS] = {{{
297323
#if DISPATCH_USE_WORKQUEUES
298324
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
325+
#if DISPATCH_USE_WORKQ_PRIORITY
299326
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
327+
#endif
300328
.dgq_wq_options = 0,
301329
#endif
302330
#if DISPATCH_ENABLE_THREAD_POOL
@@ -307,7 +335,9 @@ static struct dispatch_root_queue_context_s _dispatch_root_queue_contexts[] = {
307335
[DISPATCH_ROOT_QUEUE_IDX_USER_INTERACTIVE_QOS_OVERCOMMIT] = {{{
308336
#if DISPATCH_USE_WORKQUEUES
309337
.dgq_qos = QOS_CLASS_USER_INTERACTIVE,
338+
#if DISPATCH_USE_WORKQ_PRIORITY
310339
.dgq_wq_priority = WORKQ_HIGH_PRIOQUEUE_CONDITIONAL,
340+
#endif
311341
.dgq_wq_options = WORKQ_ADDTHREADS_OPTION_OVERCOMMIT,
312342
#endif
313343
#if DISPATCH_ENABLE_THREAD_POOL
@@ -5809,7 +5839,7 @@ _dispatch_worker_thread(void *context)
58095839
bool manager = (dq == &_dispatch_mgr_root_queue);
58105840
bool monitored = !(overcommit || manager);
58115841
if (monitored) {
5812-
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
5842+
_dispatch_workq_worker_register(dq, qc->dgq_qos);
58135843
}
58145844
#endif
58155845

@@ -5823,7 +5853,7 @@ _dispatch_worker_thread(void *context)
58235853

58245854
#if DISPATCH_USE_INTERNAL_WORKQUEUE
58255855
if (monitored) {
5826-
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
5856+
_dispatch_workq_worker_unregister(dq, qc->dgq_qos);
58275857
}
58285858
#endif
58295859
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);

0 commit comments

Comments
 (0)