
Commit 9db1503

tidy up to prepare patch for review
1 parent 534dd0a commit 9db1503

File tree

3 files changed: +61, -58 lines


src/event/workqueue.c

Lines changed: 56 additions & 53 deletions
@@ -41,40 +41,40 @@
 
 #pragma mark static data for management subsystem
 
-/*
- * The number of runnable worker threads as observed by
- * the scheduling management subsystem.
- */
-static volatile int32_t _dispatch_workq_runnable_workers;
-
-/*
- * The desired number of runnable worker threads
- * for a workqueue (assuming sufficient work).
- */
-static int32_t _dispatch_workq_target_runnable_workers;
-
 #if DISPATCH_ENABLE_PWQ_KEXT
 /* Are we using user-level or kext based management? */
 static bool _dispatch_workq_kext_active;
 #endif
 
 /*
- * State for the user-level pool management.
+ * State for the user-level monitoring of a workqueue.
  */
 typedef struct dispatch_workq_manager_s {
+	/* The observed number of runnable registered workers */
+	int32_t runnable_workers;
+
+	/* The desired number of runnable registered workers */
+	int32_t target_runnable_workers;
+
 	/*
-	 * Tracking of registered workers; all updates and reads
-	 * are performed while holding the lock.
-	 */
+	 * Tracking of registered workers; all accesses must hold lock.
+	 * Invariant: registered_workers[0]...registered_workers[num_registered_workers-1]
+	 * contain the pids of the workers that we are managing.
+	 */
 	dispatch_unfair_lock_s registered_worker_lock;
-	int num_registered_workers;
 	pid_t *registered_workers;
-
+	int num_registered_workers;
 } dispatch_workq_manager_s, *dispatch_workq_manager_t;
 
 static dispatch_workq_manager_s _dispatch_workq_manager;
 
-#pragma mark backdoors into queue.c internals
+#pragma mark inline helper functions
+
+DISPATCH_INLINE
+dispatch_workq_manager_t
+_dispatch_workq_get_default_manager() {
+	return &_dispatch_workq_manager;
+}
 
 DISPATCH_INLINE
 dispatch_queue_t
@@ -114,17 +114,18 @@ dispatch_workq_worker_register(dispatch_queue_t root_q)
 		return true;
 	}
 #endif
+	dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
 	bool rc;
 	int tid = syscall(SYS_gettid);
-	_dispatch_unfair_lock_lock(&_dispatch_workq_manager.registered_worker_lock);
-	if (_dispatch_workq_manager.num_registered_workers < WORKQ_MAX_TRACKED_WORKERS-1) {
-		int worker_id = _dispatch_workq_manager.num_registered_workers++;
-		_dispatch_workq_manager.registered_workers[worker_id] = tid;
+	_dispatch_unfair_lock_lock(&mgr->registered_worker_lock);
+	if (mgr->num_registered_workers < WORKQ_MAX_TRACKED_WORKERS-1) {
+		int worker_id = mgr->num_registered_workers++;
+		mgr->registered_workers[worker_id] = tid;
 		rc = true;
 	} else {
 		rc = false;
 	}
-	_dispatch_unfair_lock_unlock(&_dispatch_workq_manager.registered_worker_lock);
+	_dispatch_unfair_lock_unlock(&mgr->registered_worker_lock);
 
 	return rc;
 }
@@ -143,18 +144,19 @@ dispatch_workq_worker_unregister(dispatch_queue_t root_q)
 		return;
 	}
 #endif
+	dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
 	int tid = syscall(SYS_gettid);
-	_dispatch_unfair_lock_lock(&_dispatch_workq_manager.registered_worker_lock);
-	for (int i=0; i<_dispatch_workq_manager.num_registered_workers; i++) {
-		if (_dispatch_workq_manager.registered_workers[i] == tid) {
-			int last = _dispatch_workq_manager.num_registered_workers - 1;
-			_dispatch_workq_manager.registered_workers[i] = _dispatch_workq_manager.registered_workers[last];
-			_dispatch_workq_manager.registered_workers[last] = 0;
-			_dispatch_workq_manager.num_registered_workers--;
+	_dispatch_unfair_lock_lock(&mgr->registered_worker_lock);
+	for (int i=0; i<mgr->num_registered_workers; i++) {
+		if (mgr->registered_workers[i] == tid) {
+			int last = mgr->num_registered_workers - 1;
+			mgr->registered_workers[i] = mgr->registered_workers[last];
+			mgr->registered_workers[last] = 0;
+			mgr->num_registered_workers--;
 			break;
 		}
 	}
-	_dispatch_unfair_lock_unlock(&_dispatch_workq_manager.registered_worker_lock);
+	_dispatch_unfair_lock_unlock(&mgr->registered_worker_lock);
 }
 
 
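Note on the unregister path above: a departing worker's pid is removed with an unordered swap-with-last, so the registered_workers array stays dense and later scans never have to skip holes. A minimal standalone sketch of the idiom, with hypothetical names that are not part of this patch:

    #include <sys/types.h>

    /* Remove tid from a dense array of n pids; ordering is not preserved.
     * Returns the new element count. */
    static int
    remove_pid_unordered(pid_t *pids, int n, pid_t tid)
    {
        for (int i = 0; i < n; i++) {
            if (pids[i] == tid) {
                pids[i] = pids[n - 1]; /* move the last entry into the hole */
                pids[n - 1] = 0;       /* scrub the vacated slot */
                return n - 1;
            }
        }
        return n; /* tid was not registered */
    }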
@@ -163,19 +165,19 @@ dispatch_workq_worker_unregister(dispatch_queue_t root_q)
  * to get a count of the number of them that are actually runnable.
  * See the proc(5) man page for the format of the contents of /proc/[pid]/stat
  */
-static int
-_dispatch_workq_count_runnable_workers(void)
+static void
+_dispatch_workq_count_runnable_workers(dispatch_workq_manager_t mgr)
 {
 	char path[128];
 	char buf[4096];
 	int running_count = 0;
 
 	memset(buf, 0, sizeof(buf));
 
-	_dispatch_unfair_lock_lock(&_dispatch_workq_manager.registered_worker_lock);
+	_dispatch_unfair_lock_lock(&mgr->registered_worker_lock);
 
-	for (int i=0; i<_dispatch_workq_manager.num_registered_workers; i++) {
-		pid_t worker_pid = _dispatch_workq_manager.registered_workers[i];
+	for (int i=0; i<mgr->num_registered_workers; i++) {
+		pid_t worker_pid = mgr->registered_workers[i];
 		int fd;
 		size_t bytes_read = -1;
 
@@ -188,10 +190,10 @@ _dispatch_workq_count_runnable_workers(void)
 			// Must mean worker exited uncleanly (without executing _dispatch_worker_unregister())
 			// Clean up by removing pid and decrementing number of registered workers
 			_dispatch_debug("workq: Unable to open /proc/%d/stat; removing worker from monitoring list", worker_pid);
-			int last = _dispatch_workq_manager.num_registered_workers-1;
-			_dispatch_workq_manager.registered_workers[i] = _dispatch_workq_manager.registered_workers[last];
-			_dispatch_workq_manager.registered_workers[last] = 0;
-			_dispatch_workq_manager.num_registered_workers--;
+			int last = mgr->num_registered_workers-1;
+			mgr->registered_workers[i] = mgr->registered_workers[last];
+			mgr->registered_workers[last] = 0;
+			mgr->num_registered_workers--;
 		} else {
 			bytes_read = read(fd, buf, sizeof(buf));
 			(void)close(fd);
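Note for reviewers on the success branch above: buf now holds the contents of /proc/[pid]/stat, and the rest of the loop (not shown in this hunk) decides whether the worker is runnable. Per proc(5), the state field is the third field, after the pid and the parenthesized comm, and comm may itself contain spaces and ')' characters, so the robust parse scans backward for the last ')'. A hedged sketch of such a check, using a hypothetical helper that is not taken from this file:

    #include <stdbool.h>
    #include <string.h>

    /* Return true if a NUL-terminated /proc/[pid]/stat buffer describes a
     * runnable ('R') task. */
    static bool
    stat_buffer_is_runnable(const char *buf)
    {
        const char *p = strrchr(buf, ')'); /* end of the "(comm)" field */
        if (p == NULL || p[1] == '\0' || p[2] == '\0') {
            return false; /* malformed or truncated stat line */
        }
        return p[2] == 'R'; /* state char follows the ") " separator */
    }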
@@ -213,31 +215,33 @@ _dispatch_workq_count_runnable_workers(void)
 		}
 	}
 
-	_dispatch_unfair_lock_unlock(&_dispatch_workq_manager.registered_worker_lock);
+	mgr->runnable_workers = running_count;
 
-	return running_count;
+	_dispatch_unfair_lock_unlock(&mgr->registered_worker_lock);
 }
 
 static void
 _dispatch_workq_monitor_thread_pool(void *context DISPATCH_UNUSED)
 {
+	dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
 	dispatch_queue_t dq = _dispatch_workq_get_default_root_queue();
 	bool work_available = _dispatch_workq_root_queue_has_work(dq);
 	if (work_available) {
-		_dispatch_workq_runnable_workers = _dispatch_workq_count_runnable_workers();
+		_dispatch_workq_count_runnable_workers(mgr);
 
 		_dispatch_debug("workq: %s is non-empty and has %d runnable workers\n",
-				dq->dq_label, _dispatch_workq_runnable_workers);
+				dq->dq_label, mgr->runnable_workers);
 
-		if (_dispatch_workq_runnable_workers < _dispatch_workq_target_runnable_workers) {
-			int32_t count = _dispatch_pthread_root_queue_size(dq);
-			int32_t allowed_over = WORKQ_OVERSUBSCRIBE_FACTOR * _dispatch_workq_target_runnable_workers;
+		if (mgr->runnable_workers < mgr->target_runnable_workers) {
+			int32_t count = _dispatch_pthread_root_queue_thread_pool_size(dq);
+			int32_t allowed_over = WORKQ_OVERSUBSCRIBE_FACTOR * mgr->target_runnable_workers;
+			allowed_over = MIN(allowed_over, WORKQ_MAX_TRACKED_WORKERS - mgr->target_runnable_workers);
 			if (count + allowed_over > 0) {
 				_dispatch_debug("workq: %s has count %d; requesting 1 additional worker",
 						dq->dq_label, count);
 				_dispatch_pthread_root_queue_oversubscribe(dq, 1);
 			} else {
-				_dispatch_debug("workq: %s is already oversubscribed by %d; taking no action",
+				_dispatch_debug("workq: %s already oversubscribed by %d; taking no action",
 						dq->dq_label, -count);
 			}
 		}
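Note on the bound above: count is the pthread pool's remaining thread-pool size (dgq_thread_pool_size), and _dispatch_pthread_root_queue_oversubscribe() keeps decrementing it, so a negative count means the pool is already oversubscribed by -count workers. The test count + allowed_over > 0 therefore caps oversubscription at allowed_over extra workers. A small worked example under assumed values (the constants below are illustrative only, not from this patch):

    #define MIN(a, b) (((a) < (b)) ? (a) : (b)) /* stand-in for the macro used above */

    int32_t target = 4;                            /* target_runnable_workers */
    int32_t allowed_over = 2 * target;             /* WORKQ_OVERSUBSCRIBE_FACTOR == 2 -> 8 */
    allowed_over = MIN(allowed_over, 64 - target); /* WORKQ_MAX_TRACKED_WORKERS == 64 -> still 8 */

    /* count ==  0: pool exhausted but not oversubscribed;  0 + 8 > 0 -> request one more worker. */
    /* count == -7: already 7 over the base pool;          -7 + 8 > 0 -> one more is still allowed. */
    /* count == -8: at the cap;                            -8 + 8 == 0 -> take no action. */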
@@ -257,10 +261,9 @@ _dispatch_workq_init_once(void *context DISPATCH_UNUSED)
 		return;
 	}
 #endif
-	_dispatch_workq_manager.registered_workers =
-		_dispatch_calloc(WORKQ_MAX_TRACKED_WORKERS, sizeof(pid_t));
-
-	_dispatch_workq_target_runnable_workers = dispatch_hw_config(active_cpus);
+	dispatch_workq_manager_t mgr = _dispatch_workq_get_default_manager();
+	mgr->registered_workers = _dispatch_calloc(WORKQ_MAX_TRACKED_WORKERS, sizeof(pid_t));
+	mgr->target_runnable_workers = dispatch_hw_config(active_cpus);
 
 	// Create monitoring timer that will periodically run on dispatch_mgr_q
 	dispatch_source_t ds = dispatch_source_create(DISPATCH_SOURCE_TYPE_TIMER, 0, 0, &_dispatch_mgr_q);
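Note: the dispatch source created on the last line of this hunk is the periodic timer that drives _dispatch_workq_monitor_thread_pool() from the manager queue. For reviewers less familiar with dispatch sources, a minimal sketch of arming such a timer with the public API; the interval and leeway are placeholder values, and the patched code may use internal wrappers rather than these exact calls:

    #include <dispatch/dispatch.h>

    /* Arm a timer source to call handler(ctxt) periodically.
     * 100ms / 10ms are illustrative values only. */
    static void
    arm_monitor_timer(dispatch_source_t ds, void *ctxt, dispatch_function_t handler)
    {
        dispatch_source_set_timer(ds,
                dispatch_time(DISPATCH_TIME_NOW, 0), /* first fire: now */
                100 * NSEC_PER_MSEC,                 /* interval */
                10 * NSEC_PER_MSEC);                 /* leeway */
        dispatch_set_context(ds, ctxt);
        dispatch_source_set_event_handler_f(ds, handler);
        dispatch_resume(ds); /* sources are created suspended */
    }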

src/queue.c

Lines changed: 4 additions & 4 deletions
@@ -2078,7 +2078,7 @@ dispatch_pthread_root_queue_copy_current(void)
 }
 
 int32_t
-_dispatch_pthread_root_queue_size(dispatch_queue_t dq) {
+_dispatch_pthread_root_queue_thread_pool_size(dispatch_queue_t dq) {
 	dispatch_root_queue_context_t qc = dq->do_ctxt;
 	return os_atomic_load2o(qc, dgq_thread_pool_size, ordered);
 }
@@ -3958,7 +3958,7 @@ _dispatch_runloop_queue_poke(dispatch_queue_t dq, dispatch_qos_t qos,
 #if DISPATCH_USE_PTHREAD_POOL
 DISPATCH_INLINE
 void
-_dispatch_spawn_pthread_root_queue_workers(dispatch_queue_t dq,
+_dispatch_pthread_root_queue_spawn_workers(dispatch_queue_t dq,
 		pthread_attr_t *attr, int32_t n)
 {
 	pthread_t tid, *pthr = &tid;
@@ -3984,7 +3984,7 @@ _dispatch_pthread_root_queue_oversubscribe(dispatch_queue_t dq, int n)
 
 	(void)os_atomic_sub2o(qc, dgq_thread_pool_size, n, release);
 
-	_dispatch_spawn_pthread_root_queue_workers(dq, attr, n);
+	_dispatch_pthread_root_queue_spawn_workers(dq, attr, n);
 }
 #endif
 
@@ -4062,7 +4062,7 @@ _dispatch_global_queue_poke_slow(dispatch_queue_t dq, unsigned int n)
 		pthr = _dispatch_mgr_root_queue_init();
 	}
 #endif
-	_dispatch_spawn_pthread_root_queue_workers(dq, attr, j);
+	_dispatch_pthread_root_queue_spawn_workers(dq, attr, j);
 #endif // DISPATCH_USE_PTHREAD_POOL
 }
 
src/queue_internal.h

Lines changed: 1 addition & 1 deletion
@@ -593,7 +593,7 @@ void _dispatch_runloop_queue_dispose(dispatch_queue_t dq);
 void _dispatch_mgr_queue_drain(void);
 #if DISPATCH_USE_MGR_THREAD && DISPATCH_ENABLE_PTHREAD_ROOT_QUEUES
 void _dispatch_mgr_priority_init(void);
-int32_t _dispatch_pthread_root_queue_size(dispatch_queue_t dq);
+int32_t _dispatch_pthread_root_queue_thread_pool_size(dispatch_queue_t dq);
 void _dispatch_pthread_root_queue_oversubscribe(dispatch_queue_t dq, int n);
 #else
 static inline void _dispatch_mgr_priority_init(void) {}
