 #include <linux/init.h>
 #include <linux/errno.h>
 #include <linux/sched/signal.h>
-#include <linux/mm.h>
-#include <linux/sched/mm.h>
 #include <linux/percpu.h>
 #include <linux/slab.h>
 #include <linux/rculist_nulls.h>
@@ -96,28 +94,29 @@ struct io_wqe {
 
 	struct io_wq *wq;
 	struct io_wq_work *hash_tail[IO_WQ_NR_HASH_BUCKETS];
+
+	cpumask_var_t cpu_mask;
 };
 
 /*
  * Per io_wq state
  */
 struct io_wq {
-	struct io_wqe **wqes;
 	unsigned long state;
 
 	free_work_fn *free_work;
 	io_wq_work_fn *do_work;
 
 	struct io_wq_hash *hash;
 
-	refcount_t refs;
-
 	atomic_t worker_refs;
 	struct completion worker_done;
 
 	struct hlist_node cpuhp_node;
 
 	struct task_struct *task;
+
+	struct io_wqe *wqes[];
 };
 
 static enum cpuhp_state io_wq_online;
@@ -241,7 +240,8 @@ static void io_wqe_wake_worker(struct io_wqe *wqe, struct io_wqe_acct *acct)
 	 * Most likely an attempt to queue unbounded work on an io_wq that
	 * wasn't setup with any unbounded workers.
	 */
-	WARN_ON_ONCE(!acct->max_workers);
+	if (unlikely(!acct->max_workers))
+		pr_warn_once("io-wq is not configured for unbound workers");
 
 	rcu_read_lock();
 	ret = io_wqe_activate_free_worker(wqe);
@@ -560,17 +560,13 @@ static int io_wqe_worker(void *data)
 		if (ret)
 			continue;
 		/* timed out, exit unless we're the fixed worker */
-		if (test_bit(IO_WQ_BIT_EXIT, &wq->state) ||
-		    !(worker->flags & IO_WORKER_F_FIXED))
+		if (!(worker->flags & IO_WORKER_F_FIXED))
 			break;
 	}
 
 	if (test_bit(IO_WQ_BIT_EXIT, &wq->state)) {
 		raw_spin_lock_irq(&wqe->lock);
-		if (!wq_list_empty(&wqe->work_list))
-			io_worker_handle_work(worker);
-		else
-			raw_spin_unlock_irq(&wqe->lock);
+		io_worker_handle_work(worker);
 	}
 
 	io_worker_exit(worker);
@@ -645,7 +641,7 @@ static void create_io_worker(struct io_wq *wq, struct io_wqe *wqe, int index)
 
 	tsk->pf_io_worker = worker;
 	worker->task = tsk;
-	set_cpus_allowed_ptr(tsk, cpumask_of_node(wqe->node));
+	set_cpus_allowed_ptr(tsk, wqe->cpu_mask);
 	tsk->flags |= PF_NO_SETAFFINITY;
 
 	raw_spin_lock_irq(&wqe->lock);
@@ -901,23 +897,20 @@ static int io_wqe_hash_wake(struct wait_queue_entry *wait, unsigned mode,
 
 struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 {
-	int ret = -ENOMEM, node;
+	int ret, node;
 	struct io_wq *wq;
 
 	if (WARN_ON_ONCE(!data->free_work || !data->do_work))
 		return ERR_PTR(-EINVAL);
+	if (WARN_ON_ONCE(!bounded))
+		return ERR_PTR(-EINVAL);
 
-	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+	wq = kzalloc(struct_size(wq, wqes, nr_node_ids), GFP_KERNEL);
 	if (!wq)
 		return ERR_PTR(-ENOMEM);
-
-	wq->wqes = kcalloc(nr_node_ids, sizeof(struct io_wqe *), GFP_KERNEL);
-	if (!wq->wqes)
-		goto err_wq;
-
 	ret = cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node);
 	if (ret)
-		goto err_wqes;
+		goto err_wq;
 
 	refcount_inc(&data->hash->refs);
 	wq->hash = data->hash;
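
For context on the allocation change above: struct_size(wq, wqes, nr_node_ids) works out to roughly sizeof(*wq) plus nr_node_ids * sizeof(wq->wqes[0]) for the flexible wqes[] array that now sits at the end of struct io_wq, with overflow checking so an absurd count makes the allocation fail rather than wrap. A minimal userspace sketch of the same flexible-array pattern (the names here are illustrative, not from the patch):

	#include <stdio.h>
	#include <stdlib.h>

	struct node_state;			/* stand-in for struct io_wqe */

	struct table {
		unsigned long state;		/* fixed header fields come first */
		struct node_state *entries[];	/* flexible array member, must be last */
	};

	int main(void)
	{
		size_t nr = 4;			/* stand-in for nr_node_ids */
		/* one allocation covers the header plus the per-node pointer array */
		struct table *t = calloc(1, sizeof(*t) + nr * sizeof(t->entries[0]));

		if (!t)
			return 1;
		printf("allocated %zu bytes for %zu entries\n",
		       sizeof(*t) + nr * sizeof(t->entries[0]), nr);
		free(t);
		return 0;
	}

Folding the array into the struct drops one allocation and the separate err_wqes cleanup label, as the error-path hunk further down shows.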
@@ -934,6 +927,9 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 		wqe = kzalloc_node(sizeof(struct io_wqe), GFP_KERNEL, alloc_node);
 		if (!wqe)
 			goto err;
+		if (!alloc_cpumask_var(&wqe->cpu_mask, GFP_KERNEL))
+			goto err;
+		cpumask_copy(wqe->cpu_mask, cpumask_of_node(node));
 		wq->wqes[node] = wqe;
 		wqe->node = alloc_node;
 		wqe->acct[IO_WQ_ACCT_BOUND].index = IO_WQ_ACCT_BOUND;
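
A note on the cpumask handling added above: cpumask_var_t has to go through alloc_cpumask_var()/free_cpumask_var() because, with CONFIG_CPUMASK_OFFSTACK, it is a pointer to a heap-allocated bitmap rather than an embedded array. The per-wqe mask is then seeded from cpumask_of_node(), so behaviour is unchanged until io_wq_cpu_affinity() or a hotplug event modifies it. A generic kernel-style sketch of that lifecycle, with an illustrative helper name that is not part of this patch:

	#include <linux/cpumask.h>
	#include <linux/errno.h>
	#include <linux/slab.h>
	#include <linux/topology.h>

	/* illustrative only: allocate a mask and default it to one NUMA node */
	static int example_init_node_mask(cpumask_var_t *maskp, int node)
	{
		if (!alloc_cpumask_var(maskp, GFP_KERNEL))	/* allocates iff OFFSTACK */
			return -ENOMEM;
		cpumask_copy(*maskp, cpumask_of_node(node));	/* default: the node's CPUs */
		return 0;
	}

	static void example_free_node_mask(cpumask_var_t mask)
	{
		free_cpumask_var(mask);				/* no-op unless OFFSTACK */
	}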
@@ -953,17 +949,18 @@ struct io_wq *io_wq_create(unsigned bounded, struct io_wq_data *data)
 	}
 
 	wq->task = get_task_struct(data->task);
-	refcount_set(&wq->refs, 1);
 	atomic_set(&wq->worker_refs, 1);
 	init_completion(&wq->worker_done);
 	return wq;
 err:
 	io_wq_put_hash(data->hash);
 	cpuhp_state_remove_instance_nocalls(io_wq_online, &wq->cpuhp_node);
-	for_each_node(node)
+	for_each_node(node) {
+		if (!wq->wqes[node])
+			continue;
+		free_cpumask_var(wq->wqes[node]->cpu_mask);
 		kfree(wq->wqes[node]);
-err_wqes:
-	kfree(wq->wqes);
+	}
 err_wq:
 	kfree(wq);
 	return ERR_PTR(ret);
@@ -1033,10 +1030,10 @@ static void io_wq_destroy(struct io_wq *wq)
 			.cancel_all = true,
 		};
 		io_wqe_cancel_pending_work(wqe, &match);
+		free_cpumask_var(wqe->cpu_mask);
 		kfree(wqe);
 	}
 	io_wq_put_hash(wq->hash);
-	kfree(wq->wqes);
 	kfree(wq);
 }
 
@@ -1045,25 +1042,67 @@ void io_wq_put_and_exit(struct io_wq *wq)
 	WARN_ON_ONCE(!test_bit(IO_WQ_BIT_EXIT, &wq->state));
 
 	io_wq_exit_workers(wq);
-	if (refcount_dec_and_test(&wq->refs))
-		io_wq_destroy(wq);
+	io_wq_destroy(wq);
 }
 
+struct online_data {
+	unsigned int cpu;
+	bool online;
+};
+
 static bool io_wq_worker_affinity(struct io_worker *worker, void *data)
 {
-	set_cpus_allowed_ptr(worker->task, cpumask_of_node(worker->wqe->node));
+	struct online_data *od = data;
 
+	if (od->online)
+		cpumask_set_cpu(od->cpu, worker->wqe->cpu_mask);
+	else
+		cpumask_clear_cpu(od->cpu, worker->wqe->cpu_mask);
 	return false;
 }
 
+static int __io_wq_cpu_online(struct io_wq *wq, unsigned int cpu, bool online)
+{
+	struct online_data od = {
+		.cpu = cpu,
+		.online = online
+	};
+	int i;
+
+	rcu_read_lock();
+	for_each_node(i)
+		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, &od);
+	rcu_read_unlock();
+	return 0;
+}
+
 static int io_wq_cpu_online(unsigned int cpu, struct hlist_node *node)
 {
 	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
+
+	return __io_wq_cpu_online(wq, cpu, true);
+}
+
+static int io_wq_cpu_offline(unsigned int cpu, struct hlist_node *node)
+{
+	struct io_wq *wq = hlist_entry_safe(node, struct io_wq, cpuhp_node);
+
+	return __io_wq_cpu_online(wq, cpu, false);
+}
+
+int io_wq_cpu_affinity(struct io_wq *wq, cpumask_var_t mask)
+{
 	int i;
 
 	rcu_read_lock();
-	for_each_node(i)
-		io_wq_for_each_worker(wq->wqes[i], io_wq_worker_affinity, NULL);
+	for_each_node(i) {
+		struct io_wqe *wqe = wq->wqes[i];
+
+		if (mask)
+			cpumask_copy(wqe->cpu_mask, mask);
+		else
+			cpumask_copy(wqe->cpu_mask, cpumask_of_node(i));
+	}
 	rcu_read_unlock();
 	return 0;
 }
@@ -1073,7 +1112,7 @@ static __init int io_wq_init(void)
 	int ret;
 
 	ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "io-wq/online",
-					io_wq_cpu_online, NULL);
+					io_wq_cpu_online, io_wq_cpu_offline);
 	if (ret < 0)
 		return ret;
 	io_wq_online = ret;
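
For reference, the hotplug registration used here follows the multi-instance pattern: cpuhp_setup_state_multi() reserves a dynamic state (with CPUHP_AP_ONLINE_DYN the state id comes back in ret) together with an online and an offline callback, and each io_wq attaches itself with cpuhp_state_add_instance_nocalls(io_wq_online, &wq->cpuhp_node), as seen in io_wq_create() above. A condensed kernel-style sketch of the pattern with illustrative names (my_online/my_offline/my_init are not from the patch):

	#include <linux/cpuhotplug.h>
	#include <linux/init.h>
	#include <linux/list.h>

	static enum cpuhp_state my_online_state;

	/* runs once per registered instance when @cpu comes online */
	static int my_online(unsigned int cpu, struct hlist_node *node)
	{
		/* container_of(node, ...) recovers the owning object */
		return 0;
	}

	/* runs once per registered instance when @cpu is going offline */
	static int my_offline(unsigned int cpu, struct hlist_node *node)
	{
		return 0;
	}

	static __init int my_init(void)
	{
		int ret;

		ret = cpuhp_setup_state_multi(CPUHP_AP_ONLINE_DYN, "example/online",
					      my_online, my_offline);
		if (ret < 0)
			return ret;
		my_online_state = ret;	/* dynamic id, needed to add/remove instances */
		return 0;
	}

With an offline callback now registered, the offline path clears the departing CPU from the per-node cpu_mask, and the online path sets it again when the CPU returns, instead of leaving stale bits behind.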