@@ -2014,11 +2014,10 @@ struct ggml_threadpool {
2014
2014
// these are atomic as an annotation for thread-sanitizer
2015
2015
atomic_bool stop; // Used for stopping the threadpool altogether
2016
2016
atomic_bool pause; // Used for pausing the threadpool or individual threads
2017
- atomic_bool abort; // Used for aborting processing of a graph
2018
2017
2019
2018
struct ggml_compute_state * workers; // per thread state
2020
2019
int n_threads_max; // number of threads in the pool
2021
- atomic_int n_threads_cur; // number of threads used in the current graph
2020
+ int n_threads_cur; // number of threads used in the current graph
2022
2021
2023
2022
int32_t prio; // Scheduling priority
2024
2023
uint32_t poll; // Polling level (0 - no polling)
@@ -3182,36 +3181,41 @@ inline static void ggml_critical_section_start(void) {
3182
3181
}
3183
3182
}
3184
3183
3185
- static void ggml_barrier(struct ggml_threadpool * tp) {
3186
- int n_threads = atomic_load_explicit(&tp->n_threads_cur, memory_order_relaxed);
3187
- if (n_threads == 1) {
3184
+ #ifdef GGML_USE_OPENMP
3185
+ static void ggml_barrier(struct ggml_threadpool * threadpool) {
3186
+ if (threadpool->n_threads_cur == 1) {
3188
3187
return;
3189
3188
}
3190
3189
3191
- #ifdef GGML_USE_OPENMP
3192
3190
#pragma omp barrier
3191
+ }
3193
3192
#else
3194
- int n_passed = atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed);
3193
+ static void ggml_barrier(struct ggml_threadpool * threadpool) {
3194
+ if (threadpool->n_threads_cur == 1) {
3195
+ return;
3196
+ }
3197
+
3198
+ atomic_int * n_barrier = &threadpool->n_barrier;
3199
+ atomic_int * n_barrier_passed = &threadpool->n_barrier_passed;
3195
3200
3196
- // enter barrier (full seq-cst fence)
3197
- int n_barrier = atomic_fetch_add_explicit(&tp->n_barrier, 1, memory_order_seq_cst );
3201
+ int n_threads = threadpool->n_threads_cur;
3202
+ int passed_old = atomic_load_explicit(n_barrier_passed, memory_order_relaxed );
3198
3203
3199
- int last = 0;
3200
- if (n_barrier == (n_threads - 1)) {
3204
+ if (atomic_fetch_add(n_barrier, 1) == n_threads - 1) {
3201
3205
// last thread
3202
- atomic_store_explicit(&tp-> n_barrier, 0, memory_order_relaxed );
3203
- last = 1 ;
3206
+ atomic_store( n_barrier, 0);
3207
+ atomic_fetch_add_explicit(n_barrier_passed, 1, memory_order_relaxed) ;
3204
3208
} else {
3205
3209
// wait for other threads
3206
- while (atomic_load_explicit(&tp->n_barrier_passed, memory_order_relaxed) == n_passed) {
3210
+ while (true) {
3211
+ if (atomic_load_explicit(n_barrier_passed, memory_order_relaxed) != passed_old) {
3212
+ return;
3213
+ }
3207
3214
ggml_thread_cpu_relax();
3208
3215
}
3209
3216
}
3210
-
3211
- // exit barrier (full seq-cst fence)
3212
- atomic_fetch_add_explicit(&tp->n_barrier_passed, last, memory_order_seq_cst);
3213
- #endif
3214
3217
}
3218
+ #endif
3215
3219
3216
3220
// TODO: make this somehow automatically executed
3217
3221
// some sort of "sentry" mechanism
@@ -20181,84 +20185,64 @@ struct ggml_cplan ggml_graph_plan(
20181
20185
20182
20186
static thread_ret_t ggml_graph_compute_thread(void * data) {
20183
20187
struct ggml_compute_state * state = (struct ggml_compute_state *) data;
20184
- struct ggml_threadpool * tp = state->threadpool;
20185
20188
20186
- const struct ggml_cgraph * cgraph = tp ->cgraph;
20187
- const struct ggml_cplan * cplan = tp ->cplan;
20189
+ const struct ggml_cgraph * cgraph = state->threadpool ->cgraph;
20190
+ const struct ggml_cplan * cplan = state->threadpool ->cplan;
20188
20191
20189
20192
set_numa_thread_affinity(state->ith);
20190
20193
20191
20194
struct ggml_compute_params params = {
20192
20195
/*.ith =*/ state->ith,
20193
- /*.nth =*/ atomic_load_explicit(&tp-> n_threads_cur, memory_order_relaxed) ,
20196
+ /*.nth =*/ state->threadpool-> n_threads_cur,
20194
20197
/*.wsize =*/ cplan->work_size,
20195
20198
/*.wdata =*/ cplan->work_data,
20196
- /*.threadpool=*/ tp ,
20199
+ /*.threadpool=*/ state->threadpool ,
20197
20200
};
20198
20201
20199
- for (int node_n = 0; node_n < cgraph->n_nodes && !tp->abort ; node_n++) {
20202
+ for (int node_n = 0; node_n < cgraph->n_nodes; node_n++) {
20200
20203
struct ggml_tensor * node = cgraph->nodes[node_n];
20201
20204
20202
20205
ggml_compute_forward(¶ms, node);
20203
20206
20204
- if (state->ith == 0 && cplan->abort_callback &&
20205
- cplan->abort_callback(cplan->abort_callback_data)) {
20206
- tp->abort = true;
20207
- tp->ec = GGML_STATUS_ABORTED;
20207
+ if (state->ith == 0 && cplan->abort_callback && cplan->abort_callback(cplan->abort_callback_data)) {
20208
+ state->threadpool->ec = GGML_STATUS_ABORTED;
20208
20209
}
20209
20210
20210
20211
ggml_barrier(state->threadpool);
20212
+
20213
+ if (state->threadpool->ec != GGML_STATUS_SUCCESS) {
20214
+ break;
20215
+ }
20211
20216
}
20212
20217
20213
20218
return 0;
20214
20219
}
20215
20220
20216
20221
#ifndef GGML_USE_OPENMP
20217
20222
20218
- // check if thread is active
20219
- static inline bool ggml_graph_compute_thread_active(struct ggml_compute_state * state) {
20220
- struct ggml_threadpool * threadpool = state->threadpool;
20221
- int n_threads = atomic_load_explicit(&threadpool->n_threads_cur, memory_order_relaxed);
20222
- return (state->ith < n_threads);
20223
- }
20224
-
20225
- // check if thread is ready to proceed (exit from polling or sleeping)
20226
- static inline bool ggml_graph_compute_thread_ready(struct ggml_compute_state * state) {
20223
+ static inline bool ggml_graph_compute_ready(struct ggml_compute_state * state) {
20227
20224
struct ggml_threadpool * threadpool = state->threadpool;
20228
20225
20229
20226
if (state->pending || threadpool->stop || threadpool->pause) { return true; }
20230
20227
20231
20228
// check for new graph/work
20232
20229
int new_graph = atomic_load_explicit(&threadpool->n_graph, memory_order_relaxed);
20233
20230
if (new_graph != state->last_graph) {
20234
- state->pending = ggml_graph_compute_thread_active (state);
20231
+ state->pending = (state->ith < threadpool->n_threads_cur );
20235
20232
state->last_graph = new_graph;
20236
20233
}
20237
20234
20238
20235
return state->pending;
20239
20236
}
20240
20237
20241
- // sync thread state after polling
20242
- static inline void ggml_graph_compute_thread_sync(struct ggml_compute_state * state) {
20243
- struct ggml_threadpool * threadpool = state->threadpool;
20244
- // this should just be atomic_thread_fence(seq_cst) but it confuses thread-sanitizer
20245
- // so instead we just use a dummy read-modify-write
20246
- atomic_fetch_add_explicit(&threadpool->n_graph, 0, memory_order_seq_cst);
20247
- }
20248
-
20249
20238
static inline bool ggml_graph_compute_poll_for_work(struct ggml_compute_state * state) {
20250
20239
struct ggml_threadpool * threadpool = state->threadpool;
20251
20240
20252
- // Skip polling for unused threads
20253
- if (!ggml_graph_compute_thread_active(state)) {
20254
- return state->pending;
20255
- }
20256
-
20257
20241
// This seems to make 0 ... 100 a decent range for polling level across modern processors.
20258
20242
// Perhaps, we can adjust it dynamically based on load and things.
20259
20243
const uint64_t n_rounds = 1024UL * 128 * threadpool->poll;
20260
20244
20261
- for (uint64_t i=0; !ggml_graph_compute_thread_ready (state) && i < n_rounds; i++) {
20245
+ for (uint64_t i=0; !ggml_graph_compute_ready (state) && i< n_rounds; i++) {
20262
20246
// No new work. Keep polling.
20263
20247
ggml_thread_cpu_relax();
20264
20248
}
@@ -20270,14 +20254,13 @@ static inline bool ggml_graph_compute_check_for_work(struct ggml_compute_state *
20270
20254
struct ggml_threadpool * threadpool = state->threadpool;
20271
20255
20272
20256
if (ggml_graph_compute_poll_for_work(state)) {
20273
- ggml_graph_compute_thread_sync(state);
20274
20257
return state->pending;
20275
20258
}
20276
20259
20277
20260
ggml_mutex_lock_shared(&threadpool->mutex);
20278
- while (!ggml_graph_compute_thread_ready (state)) {
20261
+ while (!ggml_graph_compute_ready (state)) {
20279
20262
// No new work. Wait for the signal.
20280
- GGML_PRINT_DEBUG("thread #%d waiting for work (sleeping) \n", state->ith);
20263
+ GGML_PRINT_DEBUG("thread #%d waiting for work\n", state->ith);
20281
20264
ggml_cond_wait(&threadpool->cond, &threadpool->mutex);
20282
20265
}
20283
20266
ggml_mutex_unlock_shared(&threadpool->mutex);
@@ -20324,20 +20307,13 @@ static thread_ret_t ggml_graph_compute_secondary_thread(void* data) {
20324
20307
}
20325
20308
20326
20309
// Start processing new graph
20327
- static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool, int n_threads )
20310
+ static void ggml_graph_compute_kickoff(struct ggml_threadpool * threadpool)
20328
20311
{
20329
- // Always take the mutex here because the worker threads are doing hybrid poll/wait
20312
+ // always take the mutex here because the worker threads are doing hybrid poll/wait
20330
20313
20331
20314
ggml_mutex_lock(&threadpool->mutex);
20332
20315
20333
- GGML_PRINT_DEBUG("threadpool: n_threads_cur %d n_threads %d\n", threadpool->n_threads_cur, n_threads);
20334
-
20335
- // Update the number of active threads
20336
- atomic_store_explicit(&threadpool->n_threads_cur, n_threads, memory_order_relaxed);
20337
-
20338
- // Indicate the graph is ready to be processed
20339
- // We need the full seq-cst fence here because of the polling threads (used in thread_sync)
20340
- atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_seq_cst);
20316
+ atomic_fetch_add_explicit(&threadpool->n_graph, 1, memory_order_relaxed);
20341
20317
20342
20318
if (threadpool->pause) {
20343
20319
// Update main thread prio and affinity to match the threadpool settings
@@ -20396,7 +20372,6 @@ static struct ggml_threadpool * ggml_threadpool_new_impl(
20396
20372
threadpool->current_chunk = 0;
20397
20373
threadpool->stop = false;
20398
20374
threadpool->pause = tpp->paused;
20399
- threadpool->abort = false;
20400
20375
threadpool->workers = NULL;
20401
20376
threadpool->n_threads_max = tpp->n_threads;
20402
20377
threadpool->n_threads_cur = tpp->n_threads;
@@ -20472,11 +20447,15 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20472
20447
// No worker threads should be accessing the parameters below at this stage
20473
20448
threadpool->cgraph = cgraph;
20474
20449
threadpool->cplan = cplan;
20450
+ threadpool->n_threads_cur = n_threads;
20475
20451
threadpool->current_chunk = 0;
20476
- threadpool->abort = false;
20477
20452
threadpool->ec = GGML_STATUS_SUCCESS;
20478
20453
}
20479
20454
20455
+ if (n_threads > threadpool->n_threads_max) {
20456
+ GGML_PRINT("WARNING: cplan is requesting more threads than the threadpool contains. Expect a bad time!\n");
20457
+ }
20458
+
20480
20459
#ifdef GGML_USE_OPENMP
20481
20460
if (n_threads > 1) {
20482
20461
#pragma omp parallel num_threads(n_threads)
@@ -20485,7 +20464,7 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20485
20464
{
20486
20465
// update the number of threads from the actual number of threads that we got from OpenMP
20487
20466
n_threads = omp_get_num_threads();
20488
- atomic_store_explicit(& threadpool->n_threads_cur, n_threads, memory_order_relaxed) ;
20467
+ threadpool->n_threads_cur = n_threads ;
20489
20468
}
20490
20469
20491
20470
ggml_graph_compute_thread(&threadpool->workers[omp_get_thread_num()]);
@@ -20495,13 +20474,8 @@ enum ggml_status ggml_graph_compute(struct ggml_cgraph * cgraph, struct ggml_cpl
20495
20474
ggml_graph_compute_thread(&threadpool->workers[0]);
20496
20475
}
20497
20476
#else
20498
- if (n_threads > threadpool->n_threads_max) {
20499
- GGML_PRINT("WARNING: cplan requested more threads (%d) than available (%d)\n", n_threads, threadpool->n_threads_max);
20500
- n_threads = threadpool->n_threads_max;
20501
- }
20502
-
20503
20477
// Kick all threads to start the new graph
20504
- ggml_graph_compute_kickoff(threadpool, n_threads );
20478
+ ggml_graph_compute_kickoff(threadpool);
20505
20479
20506
20480
// This is a work thread too
20507
20481
ggml_graph_compute_thread(&threadpool->workers[0]);
0 commit comments