@@ -87,7 +87,6 @@ struct io_wqe {
 	struct {
 		raw_spinlock_t lock;
 		struct io_wq_work_list work_list;
-		unsigned long hash_map;
 		unsigned flags;
 	} ____cacheline_aligned_in_smp;
 
@@ -97,6 +96,8 @@ struct io_wqe {
 	struct hlist_nulls_head free_list;
 	struct list_head all_list;
 
+	struct wait_queue_entry wait;
+
 	struct io_wq *wq;
 	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
 };
@@ -113,6 +114,9 @@ struct io_wq {
 
 	struct task_struct *manager;
 	struct user_struct *user;
+
+	struct io_wq_hash *hash;
+
 	refcount_t refs;
 	struct completion done;
 
@@ -328,14 +332,31 @@ static inline unsigned int io_get_work_hash(struct io_wq_work *work)
 	return work->flags >> IO_WQ_HASH_SHIFT;
 }
 
+static void io_wait_on_hash(struct io_wqe *wqe, unsigned int hash)
+{
+	struct io_wq *wq = wqe->wq;
+
+	spin_lock(&wq->hash->wait.lock);
+	if (list_empty(&wqe->wait.entry)) {
+		__add_wait_queue(&wq->hash->wait, &wqe->wait);
+		if (!test_bit(hash, &wq->hash->map)) {
+			__set_current_state(TASK_RUNNING);
+			list_del_init(&wqe->wait.entry);
+		}
+	}
+	spin_unlock(&wq->hash->wait.lock);
+}
+
 static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 	__must_hold(wqe->lock)
 {
 	struct io_wq_work_node *node, *prev;
 	struct io_wq_work *work, *tail;
-	unsigned int hash;
+	unsigned int stall_hash = -1U;
 
 	wq_list_for_each(node, prev, &wqe->work_list) {
+		unsigned int hash;
+
 		work = container_of(node, struct io_wq_work, list);
 
 		/* not hashed, can run anytime */
@@ -344,16 +365,26 @@ static struct io_wq_work *io_get_next_work(struct io_wqe *wqe)
 			return work;
 		}
 
-		/* hashed, can run if not already running */
 		hash = io_get_work_hash(work);
-		if (!(wqe->hash_map & BIT(hash))) {
-			wqe->hash_map |= BIT(hash);
-			/* all items with this hash lie in [work, tail] */
-			tail = wqe->hash_tail[hash];
+		/* all items with this hash lie in [work, tail] */
+		tail = wqe->hash_tail[hash];
+
+		/* hashed, can run if not already running */
+		if (!test_and_set_bit(hash, &wqe->wq->hash->map)) {
 			wqe->hash_tail[hash] = NULL;
 			wq_list_cut(&wqe->work_list, &tail->list, prev);
 			return work;
 		}
+		if (stall_hash == -1U)
+			stall_hash = hash;
+		/* fast forward to a next hash, for-each will fix up @prev */
+		node = &tail->list;
+	}
+
+	if (stall_hash != -1U) {
+		raw_spin_unlock(&wqe->lock);
+		io_wait_on_hash(wqe, stall_hash);
+		raw_spin_lock(&wqe->lock);
 	}
 
 	return NULL;
@@ -421,6 +452,7 @@ static void io_worker_handle_work(struct io_worker *worker)
 		if (!work)
 			break;
 		io_assign_current_work(worker, work);
+		__set_current_state(TASK_RUNNING);
 
 		/* handle a whole dependent link */
 		do {
@@ -444,8 +476,10 @@ static void io_worker_handle_work(struct io_worker *worker)
 				io_wqe_enqueue(wqe, linked);
 
 			if (hash != -1U && !next_hashed) {
+				clear_bit(hash, &wq->hash->map);
+				if (wq_has_sleeper(&wq->hash->wait))
+					wake_up(&wq->hash->wait);
 				raw_spin_lock_irq(&wqe->lock);
-				wqe->hash_map &= ~BIT_ULL(hash);
 				wqe->flags &= ~IO_WQE_FLAG_STALLED;
 				/* skip unnecessary unlock-lock wqe->lock */
 				if (!work)
@@ -471,7 +505,6 @@ static int io_wqe_worker(void *data)
 loop:
 		raw_spin_lock_irq(&wqe->lock);
 		if (io_wqe_run_queue(wqe)) {
-			__set_current_state(TASK_RUNNING);
 			io_worker_handle_work(worker);
 			goto loop;
 		}
@@ -928,6 +961,24 @@ enum io_wq_cancel io_wq_cancel_cb(struct io_wq *wq, work_cancel_fn *cancel,
 	return IO_WQ_CANCEL_NOTFOUND;
 }
 
+static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
+			    int sync, void *key)
+{
+	struct io_wqe *wqe = container_of(wait, struct io_wqe, wait);
+	int ret;
+
+	list_del_init(&wait->entry);
+
+	rcu_read_lock();
+	ret = io_wqe_activate_free_worker(wqe);
+	rcu_read_unlock();
+
+	if (!ret)
+		wake_up_process(wqe->wq->manager);
+
+	return 1;
+}
+
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
 	int ret = -ENOMEM, node;
@@ -948,6 +999,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	if (ret)
 		goto err_wqes;
 
+	refcount_inc(&data->hash->refs);
+	wq->hash = data->hash;
 	wq->free_work = data->free_work;
 	wq->do_work = data->do_work;
 
@@ -968,6 +1021,8 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		wqe->acct[IO_WQ_ACCT_UNBOUND].max_workers =
 					task_rlimit(current, RLIMIT_NPROC);
 		atomic_set(&wqe->acct[IO_WQ_ACCT_UNBOUND].nr_running, 0);
+		wqe->wait.func = io_wqe_hash_wake;
+		INIT_LIST_HEAD(&wqe->wait.entry);
 		wqe->wq = wq;
 		raw_spin_lock_init(&wqe->lock);
 		INIT_WQ_LIST(&wqe->work_list);
@@ -989,6 +1044,7 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 
 	if (refcount_dec_and_test(&wq->refs))
 		complete(&wq->done);
+	io_wq_put_hash(data->hash);
 err:
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	for_each_node(node)
@@ -1017,8 +1073,15 @@ void io_wq_destroy(struct io_wq *wq)
 
 	wait_for_completion(&wq->done);
 
-	for_each_node(node)
-		kfree(wq->wqes[node]);
+	spin_lock_irq(&wq->hash->wait.lock);
+	for_each_node(node) {
+		struct io_wqe *wqe = wq->wqes[node];
+
+		list_del_init(&wqe->wait.entry);
+		kfree(wqe);
+	}
+	spin_unlock_irq(&wq->hash->wait.lock);
+	io_wq_put_hash(wq->hash);
 	kfree(wq->wqes);
 	kfree(wq);
 }
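
Note: the hunks above use a shared `struct io_wq_hash` (fields `refs`, `map`, `wait`) and an `io_wq_put_hash()` helper that are declared outside this file and are not part of this diff. Inferred from how those fields are used here, the shared state presumably looks roughly like the sketch below; treat it as an illustration of the assumed layout, not the exact header contents.

/* Sketch of the shared hash state assumed by this patch (inferred from usage
 * in the hunks above; the actual definition lives in the io-wq header).
 */
struct io_wq_hash {
	refcount_t refs;		/* shared across io_wq instances; see refcount_inc() in io_wq_create() */
	unsigned long map;		/* one bit per hash bucket; test_and_set_bit()/clear_bit() above */
	struct wait_queue_head wait;	/* io_wqe::wait entries park here while a bucket is busy */
};

static inline void io_wq_put_hash(struct io_wq_hash *hash)
{
	/* free the shared state once the last io_wq reference is dropped */
	if (refcount_dec_and_test(&hash->refs))
		kfree(hash);
}

With the bitmap and wait queue living in this shared object rather than in each io_wqe, a worker that finds every pending bucket busy parks its io_wqe::wait entry on hash->wait via io_wait_on_hash(), and whoever clears the bucket bit wakes it through io_wqe_hash_wake().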