@@ -284,6 +284,16 @@ struct wq_flusher {
 
 struct wq_device;
 
+/*
+ * Unlike in a per-cpu workqueue where max_active limits its concurrency level
+ * on each CPU, in an unbound workqueue, max_active applies to the whole system.
+ * As sharing a single nr_active across multiple sockets can be very expensive,
+ * the counting and enforcement is per NUMA node.
+ */
+struct wq_node_nr_active {
+	atomic_t		nr;		/* per-node nr_active count */
+};
+
 /*
  * The externally visible workqueue.  It relays the issued work items to
  * the appropriate worker_pool through its pool_workqueues.
@@ -330,6 +340,7 @@ struct workqueue_struct {
 	/* hot fields used during command issue, aligned to cacheline */
 	unsigned int		flags ____cacheline_aligned; /* WQ: WQ_* flags */
 	struct pool_workqueue __percpu __rcu **cpu_pwq; /* I: per-cpu pwqs */
+	struct wq_node_nr_active *node_nr_active[]; /* I: per-node nr_active */
 };
 
 static struct kmem_cache *pwq_cache;
@@ -1425,6 +1436,31 @@ work_func_t wq_worker_last_func(struct task_struct *task)
 	return worker->last_func;
 }
 
+/**
+ * wq_node_nr_active - Determine wq_node_nr_active to use
+ * @wq: workqueue of interest
+ * @node: NUMA node, can be %NUMA_NO_NODE
+ *
+ * Determine wq_node_nr_active to use for @wq on @node. Returns:
+ *
+ * - %NULL for per-cpu workqueues as they don't need to use shared nr_active.
+ *
+ * - node_nr_active[nr_node_ids] if @node is %NUMA_NO_NODE.
+ *
+ * - Otherwise, node_nr_active[@node].
+ */
+static struct wq_node_nr_active *wq_node_nr_active(struct workqueue_struct *wq,
+						   int node)
+{
+	if (!(wq->flags & WQ_UNBOUND))
+		return NULL;
+
+	if (node == NUMA_NO_NODE)
+		node = nr_node_ids;
+
+	return wq->node_nr_active[node];
+}
+
 /**
  * get_pwq - get an extra reference on the specified pool_workqueue
  * @pwq: pool_workqueue to get
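
Not part of the patch: the wq_node_nr_active() helper above resolves a NUMA node to its counter slot, with node_nr_active[nr_node_ids] doubling as the %NUMA_NO_NODE fallback. The standalone userspace C sketch below illustrates that lookup-with-fallback-slot pattern only; NR_NODE_IDS, struct node_counter, and lookup_counter() are made-up stand-ins for nr_node_ids, struct wq_node_nr_active, and wq_node_nr_active().

/* Sketch only: userspace stand-in for the per-node counter lookup above. */
#include <stdio.h>
#include <stdlib.h>

#define NR_NODE_IDS	4	/* assumed node count, stands in for nr_node_ids */
#define NUMA_NO_NODE	(-1)

struct node_counter {
	int nr;			/* stand-in for atomic_t nr */
};

/* one slot per node plus a fallback slot at [NR_NODE_IDS] */
static struct node_counter *counters[NR_NODE_IDS + 1];

static struct node_counter *lookup_counter(int node)
{
	if (node == NUMA_NO_NODE)
		node = NR_NODE_IDS;	/* NUMA_NO_NODE maps to the extra slot */
	return counters[node];
}

int main(void)
{
	for (int i = 0; i <= NR_NODE_IDS; i++)
		counters[i] = calloc(1, sizeof(*counters[i]));

	lookup_counter(2)->nr++;		/* work item bound to node 2 */
	lookup_counter(NUMA_NO_NODE)->nr++;	/* node unknown: fallback slot */

	printf("node 2: %d, fallback: %d\n",
	       counters[2]->nr, counters[NR_NODE_IDS]->nr);

	for (int i = 0; i <= NR_NODE_IDS; i++)
		free(counters[i]);
	return 0;
}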
@@ -1506,12 +1542,17 @@ static bool pwq_activate_work(struct pool_workqueue *pwq,
 			      struct work_struct *work)
 {
 	struct worker_pool *pool = pwq->pool;
+	struct wq_node_nr_active *nna;
 
 	lockdep_assert_held(&pool->lock);
 
 	if (!(*work_data_bits(work) & WORK_STRUCT_INACTIVE))
 		return false;
 
+	nna = wq_node_nr_active(pwq->wq, pool->node);
+	if (nna)
+		atomic_inc(&nna->nr);
+
 	pwq->nr_active++;
 	__pwq_activate_work(pwq, work);
 	return true;
@@ -1528,14 +1569,18 @@ static bool pwq_tryinc_nr_active(struct pool_workqueue *pwq)
 {
 	struct workqueue_struct *wq = pwq->wq;
 	struct worker_pool *pool = pwq->pool;
+	struct wq_node_nr_active *nna = wq_node_nr_active(wq, pool->node);
 	bool obtained;
 
 	lockdep_assert_held(&pool->lock);
 
 	obtained = pwq->nr_active < READ_ONCE(wq->max_active);
 
-	if (obtained)
+	if (obtained) {
 		pwq->nr_active++;
+		if (nna)
+			atomic_inc(&nna->nr);
+	}
 	return obtained;
 }
 
@@ -1572,10 +1617,26 @@ static bool pwq_activate_first_inactive(struct pool_workqueue *pwq)
 static void pwq_dec_nr_active(struct pool_workqueue *pwq)
 {
 	struct worker_pool *pool = pwq->pool;
+	struct wq_node_nr_active *nna = wq_node_nr_active(pwq->wq, pool->node);
 
 	lockdep_assert_held(&pool->lock);
 
+	/*
+	 * @pwq->nr_active should be decremented for both percpu and unbound
+	 * workqueues.
+	 */
 	pwq->nr_active--;
+
+	/*
+	 * For a percpu workqueue, it's simple. Just need to kick the first
+	 * inactive work item on @pwq itself.
+	 */
+	if (!nna) {
+		pwq_activate_first_inactive(pwq);
+		return;
+	}
+
+	atomic_dec(&nna->nr);
 	pwq_activate_first_inactive(pwq);
 }
 
@@ -4039,11 +4100,63 @@ static void wq_free_lockdep(struct workqueue_struct *wq)
 }
 #endif
 
+static void free_node_nr_active(struct wq_node_nr_active **nna_ar)
+{
+	int node;
+
+	for_each_node(node) {
+		kfree(nna_ar[node]);
+		nna_ar[node] = NULL;
+	}
+
+	kfree(nna_ar[nr_node_ids]);
+	nna_ar[nr_node_ids] = NULL;
+}
+
+static void init_node_nr_active(struct wq_node_nr_active *nna)
+{
+	atomic_set(&nna->nr, 0);
+}
+
+/*
+ * Each node's nr_active counter will be accessed mostly from its own node and
+ * should be allocated in the node.
+ */
+static int alloc_node_nr_active(struct wq_node_nr_active **nna_ar)
+{
+	struct wq_node_nr_active *nna;
+	int node;
+
+	for_each_node(node) {
+		nna = kzalloc_node(sizeof(*nna), GFP_KERNEL, node);
+		if (!nna)
+			goto err_free;
+		init_node_nr_active(nna);
+		nna_ar[node] = nna;
+	}
+
+	/* [nr_node_ids] is used as the fallback */
+	nna = kzalloc_node(sizeof(*nna), GFP_KERNEL, NUMA_NO_NODE);
+	if (!nna)
+		goto err_free;
+	init_node_nr_active(nna);
+	nna_ar[nr_node_ids] = nna;
+
+	return 0;
+
+err_free:
+	free_node_nr_active(nna_ar);
+	return -ENOMEM;
+}
+
 static void rcu_free_wq(struct rcu_head *rcu)
 {
 	struct workqueue_struct *wq =
 		container_of(rcu, struct workqueue_struct, rcu);
 
+	if (wq->flags & WQ_UNBOUND)
+		free_node_nr_active(wq->node_nr_active);
+
 	wq_free_lockdep(wq);
 	free_percpu(wq->cpu_pwq);
 	free_workqueue_attrs(wq->unbound_attrs);
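
Not part of the patch: alloc_node_nr_active() above allocates one counter per node on that node plus the fallback at [nr_node_ids], and its error path simply re-runs free_node_nr_active() over a partially filled, zero-initialized array (kfree(NULL) is a no-op). The userspace sketch below mirrors that allocate/unwind shape; NODE_COUNT, struct counter, and the calloc()/free() calls are stand-ins, not kernel API.

/* Sketch only: userspace stand-in for alloc_node_nr_active()/free_node_nr_active(). */
#include <stdlib.h>

#define NODE_COUNT 4			/* assumed node count */

struct counter { int nr; };

static void free_counters(struct counter **ar)
{
	/* free every node slot plus the fallback; free(NULL) is harmless */
	for (int i = 0; i <= NODE_COUNT; i++) {
		free(ar[i]);
		ar[i] = NULL;
	}
}

static int alloc_counters(struct counter **ar)
{
	for (int i = 0; i <= NODE_COUNT; i++) {
		ar[i] = calloc(1, sizeof(*ar[i]));
		if (!ar[i]) {
			free_counters(ar);	/* unwind the partial allocation */
			return -1;
		}
	}
	return 0;
}

int main(void)
{
	/* zero-initialized so the free path may run over never-filled slots */
	struct counter *counters[NODE_COUNT + 1] = { 0 };

	if (alloc_counters(counters) == 0)
		free_counters(counters);
	return 0;
}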
@@ -4785,7 +4898,8 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 {
 	va_list args;
 	struct workqueue_struct *wq;
-	int len;
+	size_t wq_size;
+	int name_len;
 
 	/*
 	 * Unbound && max_active == 1 used to imply ordered, which is no longer
@@ -4801,7 +4915,12 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 		flags |= WQ_UNBOUND;
 
 	/* allocate wq and format name */
-	wq = kzalloc(sizeof(*wq), GFP_KERNEL);
+	if (flags & WQ_UNBOUND)
+		wq_size = struct_size(wq, node_nr_active, nr_node_ids + 1);
+	else
+		wq_size = sizeof(*wq);
+
+	wq = kzalloc(wq_size, GFP_KERNEL);
 	if (!wq)
 		return NULL;
 
@@ -4812,11 +4931,12 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 	}
 
 	va_start(args, max_active);
-	len = vsnprintf(wq->name, sizeof(wq->name), fmt, args);
+	name_len = vsnprintf(wq->name, sizeof(wq->name), fmt, args);
 	va_end(args);
 
-	if (len >= WQ_NAME_LEN)
-		pr_warn_once("workqueue: name exceeds WQ_NAME_LEN. Truncating to: %s\n", wq->name);
+	if (name_len >= WQ_NAME_LEN)
+		pr_warn_once("workqueue: name exceeds WQ_NAME_LEN. Truncating to: %s\n",
+			     wq->name);
 
 	max_active = max_active ?: WQ_DFL_ACTIVE;
 	max_active = wq_clamp_max_active(max_active, flags, wq->name);
@@ -4835,8 +4955,13 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 	wq_init_lockdep(wq);
 	INIT_LIST_HEAD(&wq->list);
 
+	if (flags & WQ_UNBOUND) {
+		if (alloc_node_nr_active(wq->node_nr_active) < 0)
+			goto err_unreg_lockdep;
+	}
+
 	if (alloc_and_link_pwqs(wq) < 0)
-		goto err_unreg_lockdep;
+		goto err_free_node_nr_active;
 
 	if (wq_online && init_rescuer(wq) < 0)
 		goto err_destroy;
@@ -4861,6 +4986,9 @@ struct workqueue_struct *alloc_workqueue(const char *fmt,
 
 	return wq;
 
+err_free_node_nr_active:
+	if (wq->flags & WQ_UNBOUND)
+		free_node_nr_active(wq->node_nr_active);
 err_unreg_lockdep:
 	wq_unregister_lockdep(wq);
 	wq_free_lockdep(wq);
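
Not part of the patch: for the unbound case, alloc_workqueue() above sizes the allocation with struct_size(wq, node_nr_active, nr_node_ids + 1) so that the node_nr_active[] flexible array member gets one pointer slot per node plus the fallback. The standalone sketch below shows that flexible-array sizing under assumed names (struct wq_like, NODE_SLOTS) and hand-computes the size rather than using the kernel's struct_size() helper.

/* Sketch only: flexible-array sizing analogous to struct_size() in alloc_workqueue(). */
#include <stdio.h>
#include <stdlib.h>

#define NODE_SLOTS (4 + 1)		/* assumed nr_node_ids + 1 fallback slot */

struct counter { int nr; };

struct wq_like {
	unsigned int flags;
	char name[24];
	struct counter *node_nr_active[];	/* flexible array member */
};

int main(void)
{
	/* like struct_size(wq, node_nr_active, NODE_SLOTS), minus overflow checking */
	size_t sz = sizeof(struct wq_like) + NODE_SLOTS * sizeof(struct counter *);
	struct wq_like *wq = calloc(1, sz);

	if (!wq)
		return 1;
	printf("allocated %zu bytes with %d trailing slots\n", sz, NODE_SLOTS);
	free(wq);
	return 0;
}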