Skip to content

Commit 62f900e

Browse files
committed
rework counting of pending thread creations
use poke with negative floor to drive oversubscription
1 parent 0df3bd1 commit 62f900e

File tree

4 files changed

+85
-109
lines changed

4 files changed

+85
-109
lines changed

src/event/workqueue.c

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -88,10 +88,9 @@ _dispatch_workq_worker_register(dispatch_queue_t root_q, int priority)
8888
dispatch_assert(mon->dq == root_q);
8989
dispatch_tid tid = _dispatch_thread_getspecific(tid);
9090
_dispatch_unfair_lock_lock(&mon->registered_tid_lock);
91-
if (mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1) {
92-
int worker_id = mon->num_registered_tids++;
93-
mon->registered_tids[worker_id] = tid;
94-
}
91+
dispatch_assert(mon->num_registered_tids < WORKQ_MAX_TRACKED_TIDS-1);
92+
int worker_id = mon->num_registered_tids++;
93+
mon->registered_tids[worker_id] = tid;
9594
_dispatch_unfair_lock_unlock(&mon->registered_tid_lock);
9695
}
9796

@@ -187,36 +186,29 @@ _dispatch_workq_monitor_pools(void *context DISPATCH_UNUSED)
187186
}
188187

189188
_dispatch_workq_count_runnable_workers(mon);
190-
int32_t count = _dispatch_pthread_root_queue_thread_pool_size(dq);
191-
_dispatch_debug("workq: %s is non-empty with pool_size %d (%d runnable)",
192-
dq->dq_label, count, mon->num_runnable);
193-
194-
if (mon->num_runnable < mon->target_runnable) {
195-
// If we are below target there are two cases to consider:
196-
// (a) We are below target, but some workers are still runnable.
197-
// We want to oversubscribe to hit the target, but this
198-
// may be transitory so only go up to a small multiple
199-
// of threads per core.
200-
// (b) We are below target, and no worker is runnable.
201-
// It is likely the program is stalled. Therefore treat
202-
// this as if dq was an overcommit queue and create
203-
// another worker unless we have already hit the hard
204-
// limit on the maximum number of workers for dq.
205-
int32_t limit_a = WORKQ_OVERSUBSCRIBE_FACTOR * mon->target_runnable;
206-
int32_t limit_b = WORKQ_MAX_TRACKED_TIDS - mon->target_runnable;
207-
int32_t limit =
208-
mon->num_runnable == 0 ? limit_b : MIN(limit_a, limit_b);
209-
if (count + limit > 0) {
210-
bool r = _dispatch_pthread_root_queue_oversubscribe(dq, 1);
211-
if (r) {
212-
_dispatch_debug("workq: requested oversubscribe worker for %s", dq->dq_label);
213-
} else {
214-
_dispatch_debug("workq: still pending worker create on %s", dq->dq_label);
215-
}
216-
} else {
217-
_dispatch_debug("workq: %s already over by %d; taking no action",
218-
dq->dq_label, -count);
219-
}
189+
_dispatch_debug("workq: %s has %d runnable workers (target is %d)",
190+
dq->dq_label, mon->num_runnable, mon->target_runnable);
191+
192+
if (mon->num_runnable == 0) {
193+
// We are below target, and no worker is runnable.
194+
// It is likely the program is stalled. Therefore treat
195+
// this as if dq were an overcommit queue and call poke
196+
// with the limit being the maximum number of workers for dq.
197+
int32_t floor = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
198+
_dispatch_debug("workq: %s has no runnable workers; poking with floor %d",
199+
dq->dq_label, floor);
200+
_dispatch_global_queue_poke(dq, 1, floor);
201+
} else if (mon->num_runnable < mon->target_runnable) {
202+
// We are below target, but some workers are still runnable.
203+
// We want to oversubscribe to hit the desired load target.
204+
// However, this under-utilization may be transitory so set the
205+
// floor as a small multiple of threads per core.
206+
int32_t floor = (1 - WORKQ_OVERSUBSCRIBE_FACTOR) * mon->target_runnable;
207+
int32_t floor2 = mon->target_runnable - WORKQ_MAX_TRACKED_TIDS;
208+
floor = MAX(floor, floor2);
209+
_dispatch_debug("workq: %s under utilization target; poking with floor %d",
210+
dq->dq_label, floor);
211+
_dispatch_global_queue_poke(dq, 1, floor);
220212
}
221213
}
222214
}

src/inline_internal.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1479,7 +1479,7 @@ _dispatch_root_queue_push_inline(dispatch_queue_t dq, dispatch_object_t _head,
14791479
struct dispatch_object_s *head = _head._do, *tail = _tail._do;
14801480
if (unlikely(_dispatch_queue_push_update_tail_list(dq, head, tail))) {
14811481
_dispatch_queue_push_update_head(dq, head);
1482-
return _dispatch_global_queue_poke(dq, n);
1482+
return _dispatch_global_queue_poke(dq, n, 0);
14831483
}
14841484
}
14851485

src/queue.c

Lines changed: 57 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -2077,13 +2077,6 @@ dispatch_pthread_root_queue_copy_current(void)
20772077
return (dispatch_queue_t)_os_object_retain_with_resurrect(dq->_as_os_obj);
20782078
}
20792079

2080-
int32_t
2081-
_dispatch_pthread_root_queue_thread_pool_size(dispatch_queue_t dq)
2082-
{
2083-
dispatch_root_queue_context_t qc = dq->do_ctxt;
2084-
return os_atomic_load2o(qc, dgq_thread_pool_size, relaxed);
2085-
}
2086-
20872080
#endif // DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
20882081

20892082
void
@@ -3956,50 +3949,12 @@ _dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos,
39563949
}
39573950
#endif
39583951

3959-
#if DISPATCH_USE_PTHREAD_POOL
3960-
static void
3961-
_dispatch_pthread_root_queue_spawn_workers(dispatch_queue_t dq,
3962-
pthread_attr_t *attr, int n)
3963-
{
3964-
pthread_t tid, *pthr = &tid;
3965-
int r;
3966-
3967-
do {
3968-
_dispatch_retain(dq);
3969-
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
3970-
if (r != EAGAIN) {
3971-
(void)dispatch_assume_zero(r);
3972-
}
3973-
_dispatch_temporary_resource_shortage();
3974-
}
3975-
} while (--n);
3976-
}
3977-
3978-
bool
3979-
_dispatch_pthread_root_queue_oversubscribe(dispatch_queue_t dq, int n)
3980-
{
3981-
dispatch_root_queue_context_t qc = dq->do_ctxt;
3982-
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
3983-
pthread_attr_t *attr = &pqc->dpq_thread_attr;
3984-
3985-
if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
3986-
return false;
3987-
}
3988-
3989-
(void)os_atomic_sub2o(qc, dgq_thread_pool_size, n, release);
3990-
3991-
_dispatch_pthread_root_queue_spawn_workers(dq, attr, n);
3992-
3993-
return true;
3994-
}
3995-
#endif
3996-
39973952
DISPATCH_NOINLINE
39983953
static void
3999-
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n)
3954+
_dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n, int floor)
40003955
{
40013956
dispatch_root_queue_context_t qc = dq->do_ctxt;
4002-
int32_t i = n;
3957+
int remaining = n;
40033958
int r = ENOSYS;
40043959

40053960
_dispatch_root_queues_init();
@@ -4019,16 +3974,16 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n)
40193974
r = pthread_workqueue_additem_np(qc->dgq_kworkqueue,
40203975
_dispatch_worker_thread4, dq, &wh, &gen_cnt);
40213976
(void)dispatch_assume_zero(r);
4022-
} while (--i);
3977+
} while (--remaining);
40233978
return;
40243979
}
40253980
#endif // DISPATCH_USE_LEGACY_WORKQUEUE_FALLBACK
40263981
#if HAVE_PTHREAD_WORKQUEUE_QOS
4027-
r = _pthread_workqueue_addthreads((int)i,
3982+
r = _pthread_workqueue_addthreads(remaining,
40283983
_dispatch_priority_to_pp(dq->dq_priority));
40293984
#elif HAVE_PTHREAD_WORKQUEUE_SETDISPATCH_NP
40303985
r = pthread_workqueue_addthreads_np(qc->dgq_wq_priority,
4031-
qc->dgq_wq_options, (int)i);
3986+
qc->dgq_wq_options, remaining);
40323987
#endif
40333988
(void)dispatch_assume_zero(r);
40343989
return;
@@ -4038,28 +3993,43 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n)
40383993
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
40393994
if (fastpath(pqc->dpq_thread_mediator.do_vtable)) {
40403995
while (dispatch_semaphore_signal(&pqc->dpq_thread_mediator)) {
4041-
if (!--i) {
3996+
_dispatch_root_queue_debug("signaled sleeping worker for "
3997+
"global queue: %p", dq);
3998+
if (!--remaining) {
40423999
return;
40434000
}
40444001
}
40454002
}
4046-
int32_t j, t_count;
4003+
4004+
bool overcommit = dq->dq_priority & DISPATCH_PRIORITY_FLAG_OVERCOMMIT;
4005+
if (overcommit) {
4006+
os_atomic_add2o(qc, dgq_pending, remaining, relaxed);
4007+
} else {
4008+
if (!os_atomic_cmpxchg2o(qc, dgq_pending, 0, remaining, relaxed)) {
4009+
_dispatch_root_queue_debug("worker thread request still pending for "
4010+
"global queue: %p", dq);
4011+
return;
4012+
}
4013+
}
4014+
4015+
int32_t can_request, t_count;
40474016
// seq_cst with atomic store to tail <rdar://problem/16932833>
40484017
t_count = os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
40494018
do {
4050-
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
4051-
bool pool_full = t_count <= 0; // oversubscription encoded by negative dgq_thread_pool_size
4052-
#else
4053-
bool pool_full = t_count == 0;
4054-
#endif
4055-
if (pool_full) {
4019+
can_request = t_count < floor ? 0 : t_count - floor;
4020+
if (remaining > can_request) {
4021+
_dispatch_root_queue_debug("pthread pool reducing request from %d to %d",
4022+
remaining, can_request);
4023+
os_atomic_sub2o(qc, dgq_pending, remaining - can_request, relaxed);
4024+
remaining = can_request;
4025+
}
4026+
if (remaining == 0) {
40564027
_dispatch_root_queue_debug("pthread pool is full for root queue: "
40574028
"%p", dq);
40584029
return;
40594030
}
4060-
j = i > t_count ? t_count : i;
40614031
} while (!os_atomic_cmpxchgvw2o(qc, dgq_thread_pool_size, t_count,
4062-
t_count - j, &t_count, acquire));
4032+
t_count - remaining, &t_count, acquire));
40634033

40644034
pthread_attr_t *attr = &pqc->dpq_thread_attr;
40654035
pthread_t tid, *pthr = &tid;
@@ -4068,30 +4038,41 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, int n)
40684038
pthr = _dispatch_mgr_root_queue_init();
40694039
}
40704040
#endif
4071-
_dispatch_pthread_root_queue_spawn_workers(dq, attr, j);
4041+
do {
4042+
_dispatch_retain(dq);
4043+
while ((r = pthread_create(pthr, attr, _dispatch_worker_thread, dq))) {
4044+
if (r != EAGAIN) {
4045+
(void)dispatch_assume_zero(r);
4046+
}
4047+
_dispatch_temporary_resource_shortage();
4048+
}
4049+
} while (--remaining);
40724050
#endif // DISPATCH_USE_PTHREAD_POOL
40734051
}
40744052

40754053
DISPATCH_NOINLINE
40764054
void
4077-
_dispatch_global_queue_poke(dispatch_queue_t dq, int n)
4055+
_dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor)
40784056
{
40794057
if (!_dispatch_queue_class_probe(dq)) {
40804058
return;
40814059
}
40824060
#if HAVE_PTHREAD_WORKQUEUES
40834061
dispatch_root_queue_context_t qc = dq->do_ctxt;
40844062
if (
4063+
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
4064+
false && /* counting done in poke_slow for this config */
4065+
#endif
40854066
#if DISPATCH_USE_PTHREAD_POOL
4086-
(qc->dgq_kworkqueue != (void*)(~0ul)) &&
4067+
(qc->dgq_kworkqueue != (void*)(~0ul)) &&
40874068
#endif
4088-
!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
4089-
_dispatch_root_queue_debug("worker thread request still pending for "
4069+
!os_atomic_cmpxchg2o(qc, dgq_pending, 0, n, relaxed)) {
4070+
_dispatch_root_queue_debug("worker thread request still pending for "
40904071
"global queue: %p", dq);
40914072
return;
40924073
}
40934074
#endif // HAVE_PTHREAD_WORKQUEUES
4094-
return _dispatch_global_queue_poke_slow(dq, n);
4075+
return _dispatch_global_queue_poke_slow(dq, n, floor);
40954076
}
40964077

40974078
#pragma mark -
@@ -5296,7 +5277,7 @@ _dispatch_root_queue_drain_one_slow(dispatch_queue_t dq)
52965277
(void)os_atomic_dec2o(qc, dgq_pending, relaxed);
52975278
}
52985279
if (!available) {
5299-
_dispatch_global_queue_poke(dq, 1);
5280+
_dispatch_global_queue_poke(dq, 1, 0);
53005281
}
53015282
return available;
53025283
}
@@ -5363,7 +5344,7 @@ _dispatch_root_queue_drain_one(dispatch_queue_t dq)
53635344
}
53645345

53655346
os_atomic_store2o(dq, dq_items_head, next, relaxed);
5366-
_dispatch_global_queue_poke(dq, 1);
5347+
_dispatch_global_queue_poke(dq, 1, 0);
53675348
out:
53685349
return head;
53695350
}
@@ -5506,6 +5487,9 @@ _dispatch_worker_thread(void *context)
55065487
dispatch_root_queue_context_t qc = dq->do_ctxt;
55075488
dispatch_pthread_root_queue_context_t pqc = qc->dgq_ctxt;
55085489

5490+
int pending = (int)os_atomic_dec2o(qc, dgq_pending, relaxed);
5491+
dispatch_assert(pending >= 0);
5492+
55095493
if (pqc->dpq_observer_hooks.queue_will_execute) {
55105494
_dispatch_set_pthread_root_queue_observer_hooks(
55115495
&pqc->dpq_observer_hooks);
@@ -5525,7 +5509,9 @@ _dispatch_worker_thread(void *context)
55255509

55265510
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
55275511
bool overcommit = qc->dgq_wq_options & WORKQ_ADDTHREADS_OPTION_OVERCOMMIT;
5528-
if (!overcommit) {
5512+
bool manager = dq == &_dispatch_mgr_root_queue;
5513+
bool monitored = !(overcommit || manager);
5514+
if (monitored) {
55295515
_dispatch_workq_worker_register(dq, qc->dgq_wq_priority);
55305516
}
55315517
#endif
@@ -5539,12 +5525,12 @@ _dispatch_worker_thread(void *context)
55395525
dispatch_time(0, timeout)) == 0);
55405526

55415527
#if HAVE_INTERNAL_PTHREAD_WORKQUEUE
5542-
if (!overcommit) {
5528+
if (monitored) {
55435529
_dispatch_workq_worker_unregister(dq, qc->dgq_wq_priority);
55445530
}
55455531
#endif
55465532
(void)os_atomic_inc2o(qc, dgq_thread_pool_size, release);
5547-
_dispatch_global_queue_poke(dq, 1);
5533+
_dispatch_global_queue_poke(dq, 1, 0);
55485534
_dispatch_release(dq);
55495535

55505536
return NULL;

src/queue_internal.h

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -563,7 +563,7 @@ void _dispatch_queue_resume(dispatch_queue_t dq, bool activate);
563563
void _dispatch_queue_finalize_activation(dispatch_queue_t dq);
564564
void _dispatch_queue_invoke(dispatch_queue_t dq,
565565
dispatch_invoke_context_t dic, dispatch_invoke_flags_t flags);
566-
void _dispatch_global_queue_poke(dispatch_queue_t dq, int n);
566+
void _dispatch_global_queue_poke(dispatch_queue_t dq, int n, int floor);
567567
void _dispatch_queue_push(dispatch_queue_t dq, dispatch_object_t dou,
568568
dispatch_qos_t qos);
569569
void _dispatch_try_lock_transfer_or_wakeup(dispatch_queue_t dq);
@@ -593,8 +593,6 @@ void _dispatch_runloop_queue_dispose(dispatch_queue_t dq);
593593
void _dispatch_mgr_queue_drain(void);
594594
#if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
595595
void _dispatch_mgr_priority_init(void);
596-
int32_t _dispatch_pthread_root_queue_thread_pool_size(dispatch_queue_t dq);
597-
bool _dispatch_pthread_root_queue_oversubscribe(dispatch_queue_t dq, int n);
598596
#else
599597
static inline void _dispatch_mgr_priority_init(void) {}
600598
#endif

0 commit comments

Comments
 (0)