Skip to content

Commit 9bcc3b4

Browse files
committed
Use lock free queue
1 parent 94ddd62 commit 9bcc3b4

File tree

1 file changed

+85
-61
lines changed

1 file changed

+85
-61
lines changed

thpool.c

Lines changed: 85 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,31 @@ this is not part of original thpool, thats me hacking it to work on windows
2525
// ref: https://github.com/ggerganov/whisper.cpp/issues/168
2626
#include <windows.h>
2727
#endif
28+
#include <stdbool.h>
29+
30+
typedef volatile LONG atomic_int;
31+
typedef atomic_int atomic_bool;
32+
33+
/* Atomically write `val` to `*ptr` (Win32 shim for C11 atomic_store). */
static void atomic_store(atomic_int* ptr, LONG val) {
	/* InterlockedExchange returns the previous value; we discard it. */
	(void)InterlockedExchange(ptr, val);
}
36+
/* Atomically read `*ptr` (Win32 shim for C11 atomic_load). */
static LONG atomic_load(atomic_int* ptr) {
	/* Compare-exchanging 0 with 0 never modifies *ptr but returns its
	 * current value, giving a full-barrier atomic read. */
	LONG current = InterlockedCompareExchange(ptr, 0, 0);
	return current;
}
39+
/* Atomically add `inc` to `*ptr`; returns the value held before the add. */
static LONG atomic_fetch_add(atomic_int* ptr, LONG inc) {
	LONG previous = InterlockedExchangeAdd(ptr, inc);
	return previous;
}
42+
/* Atomically subtract `dec` from `*ptr`; returns the value held before
 * the subtraction. Subtraction is addition of the negated operand. */
static LONG atomic_fetch_sub(atomic_int* ptr, LONG dec) {
	return InterlockedExchangeAdd(ptr, -dec);
}
45+
/* CAS shim mirroring C11 atomic_compare_exchange_strong:
 * if *ptr == *expected, store `desired` into *ptr and return true;
 * otherwise copy the observed value into *expected and return false.
 *
 * Fix: use LONG throughout — the Interlocked API traffics in LONG, and
 * the original mixed `int` locals with the LONG-typed atomics (benign
 * only because Windows is LLP64). */
static bool atomic_compare_exchange_strong(atomic_int* ptr, atomic_int* expected, int desired) {
	LONG old_value = InterlockedCompareExchange(ptr, (LONG)desired, *expected);
	bool success = (old_value == *expected);
	if (!success) {
		*expected = old_value; /* report the value we actually observed */
	}
	return success;
}
2853

2954
unsigned int sleep(unsigned int seconds) {
3055
Sleep(seconds * 1000);
@@ -179,8 +204,7 @@ typedef struct thpool_{
179204
thread** threads; /* pointer to threads */
180205
volatile int num_threads_alive; /* threads currently alive */
181206
volatile int num_threads_working; /* threads currently working */
182-
pthread_mutex_t thcount_lock; /* used for thread count etc */
183-
pthread_cond_t threads_all_idle; /* signal to thpool_wait */
207+
bsem threads_all_idle; /* signal to thpool_wait */
184208
jobqueue jobqueue; /* job queue */
185209
} thpool_;
186210

@@ -251,8 +275,7 @@ struct thpool_* thpool_init(int num_threads){
251275
return NULL;
252276
}
253277

254-
pthread_mutex_init(&(thpool_p->thcount_lock), NULL);
255-
pthread_cond_init(&thpool_p->threads_all_idle, NULL);
278+
bsem_init(&thpool_p->threads_all_idle, 0);
256279

257280
/* Thread init */
258281
int n;
@@ -283,11 +306,9 @@ int thpool_add_work(thpool_* thpool_p, void (*function_p)(void*), job * newjob){
283306

284307
/* Wait until all jobs have finished */
285308
void thpool_wait(thpool_* thpool_p){
286-
pthread_mutex_lock(&thpool_p->thcount_lock);
287309
while (thpool_p->jobqueue.len || thpool_p->num_threads_working) {
288-
pthread_cond_wait(&thpool_p->threads_all_idle, &thpool_p->thcount_lock);
310+
bsem_wait(&thpool_p->threads_all_idle);
289311
}
290-
pthread_mutex_unlock(&thpool_p->thcount_lock);
291312
}
292313

293314

@@ -422,19 +443,15 @@ static void* thread_do(struct thread* thread_p){
422443
}*/
423444

424445
/* Mark thread as alive (initialized) */
425-
pthread_mutex_lock(&thpool_p->thcount_lock);
426-
thpool_p->num_threads_alive += 1;
427-
pthread_mutex_unlock(&thpool_p->thcount_lock);
446+
atomic_fetch_add(&thpool_p->num_threads_alive, 1);
428447

429448
while(threads_keepalive){
430449

431450
bsem_wait(thpool_p->jobqueue.has_jobs);
432451

433452
if (threads_keepalive){
434453

435-
pthread_mutex_lock(&thpool_p->thcount_lock);
436-
thpool_p->num_threads_working++;
437-
pthread_mutex_unlock(&thpool_p->thcount_lock);
454+
atomic_fetch_add(&thpool_p->num_threads_working, 1);
438455

439456
/* Read job from queue and execute it */
440457
void (*func_buff)(void*);
@@ -446,18 +463,13 @@ static void* thread_do(struct thread* thread_p){
446463
func_buff(arg_buff);
447464
}
448465

449-
pthread_mutex_lock(&thpool_p->thcount_lock);
450-
thpool_p->num_threads_working--;
451-
if (!thpool_p->num_threads_working) {
452-
pthread_cond_signal(&thpool_p->threads_all_idle);
466+
if (atomic_fetch_sub(&thpool_p->num_threads_working, 1) == 1) {
467+
bsem_post_all(&thpool_p->threads_all_idle);
453468
}
454-
pthread_mutex_unlock(&thpool_p->thcount_lock);
455-
456469
}
457470
}
458-
pthread_mutex_lock(&thpool_p->thcount_lock);
459-
thpool_p->num_threads_alive --;
460-
pthread_mutex_unlock(&thpool_p->thcount_lock);
471+
472+
atomic_fetch_sub(&thpool_p->num_threads_working, 1);
461473

462474
return NULL;
463475
}
@@ -507,65 +519,77 @@ static void jobqueue_clear(jobqueue* jobqueue_p){
507519

508520
}
509521

510-
511522
/* Add (allocated) job to queue
512523
*/
513-
static void jobqueue_push(jobqueue* jobqueue_p, struct job* newjob){
524+
static void jobqueue_push(jobqueue *jobqueue_p, struct job *newjob)
525+
{
514526

515-
pthread_mutex_lock(&jobqueue_p->rwmutex);
516527
newjob->prev = NULL;
517-
518-
switch(jobqueue_p->len){
519-
520-
case 0: /* if no jobs in queue */
521-
jobqueue_p->front = newjob;
522-
jobqueue_p->rear = newjob;
523-
break;
524-
525-
default: /* if jobs in queue */
526-
jobqueue_p->rear->prev = newjob;
527-
jobqueue_p->rear = newjob;
528-
528+
struct job *expected_rear = NULL;
529+
530+
// Use atomic compare-and-swap to add newjob to the end of the queue
531+
if (atomic_compare_exchange_strong(&jobqueue_p->rear, &expected_rear, newjob))
532+
{
533+
if (jobqueue_p->front == NULL)
534+
{
535+
// if no jobs in queue, set front to newjob
536+
atomic_store(&jobqueue_p->front, newjob);
537+
}
538+
}
539+
else
540+
{
541+
// if jobs in queue, update previous pointer of previous rear and set rear to newjob
542+
struct job *prev_rear = (struct job *)atomic_load(&jobqueue_p->rear);
543+
atomic_store(&prev_rear->prev, newjob);
544+
atomic_store(&jobqueue_p->rear, newjob);
529545
}
530-
jobqueue_p->len++;
531546

547+
atomic_fetch_add(&jobqueue_p->len, 1);
532548
bsem_post(jobqueue_p->has_jobs);
533-
pthread_mutex_unlock(&jobqueue_p->rwmutex);
534549
}
535550

536-
537551
/* Get first job from queue(removes it from queue)
538552
* Notice: Caller MUST hold a mutex
539553
*/
540-
static struct job* jobqueue_pull(jobqueue* jobqueue_p){
541-
542-
pthread_mutex_lock(&jobqueue_p->rwmutex);
543-
job* job_p = jobqueue_p->front;
554+
static struct job *jobqueue_pull(jobqueue *jobqueue_p)
555+
{
544556

545-
switch(jobqueue_p->len){
557+
struct job *expected_front = NULL;
558+
struct job *expected_rear = NULL;
559+
struct job *job_p = NULL;
546560

547-
case 0: /* if no jobs in queue */
548-
break;
549-
550-
case 1: /* if one job in queue */
551-
jobqueue_p->front = NULL;
552-
jobqueue_p->rear = NULL;
553-
jobqueue_p->len = 0;
554-
break;
555-
556-
default: /* if >1 jobs in queue */
557-
jobqueue_p->front = job_p->prev;
558-
jobqueue_p->len--;
559-
/* more than one job in queue -> post it */
560-
bsem_post(jobqueue_p->has_jobs);
561+
// Use atomic compare-and-swap to remove the front job from the queue
562+
if (atomic_compare_exchange_strong(&jobqueue_p->front, &expected_front, NULL))
563+
{
564+
// Queue is empty
565+
return NULL;
566+
}
567+
else
568+
{
569+
job_p = expected_front;
570+
}
561571

572+
// Update rear pointer if necessary
573+
if (jobqueue_p->rear == job_p)
574+
{
575+
if (atomic_compare_exchange_strong(&jobqueue_p->rear, &expected_rear, NULL))
576+
{
577+
// The queue only had one element
578+
atomic_fetch_sub(&jobqueue_p->len, 1);
579+
return job_p;
580+
}
581+
else
582+
{
583+
atomic_store(&jobqueue_p->rear, expected_front->prev);
584+
}
562585
}
563586

564-
pthread_mutex_unlock(&jobqueue_p->rwmutex);
587+
// Remove the job from the queue
588+
atomic_fetch_sub(&jobqueue_p->len, 1);
589+
bsem_post(jobqueue_p->has_jobs);
565590
return job_p;
566591
}
567592

568-
569593
/* Free all queue resources back to the system */
570594
static void jobqueue_destroy(jobqueue* jobqueue_p){
571595
jobqueue_clear(jobqueue_p);

0 commit comments

Comments
 (0)