Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
From e26200a16b12befc4f2be816ef1f6f0a0b34653d Mon Sep 17 00:00:00 2001
From: Daniel Jordan <daniel.m.jordan@oracle.com>
Date: Tue, 26 Nov 2019 02:38:09 -0500
Subject: [PATCH] padata: allocate work structures for parallel jobs from a pool
padata allocates per-CPU, per-instance work structs for parallel jobs.
A do_parallel call assigns a job to a sequence number and hashes the
number to a CPU, where the job will eventually run using the
corresponding work.
This approach fit with how padata used to bind a job to each CPU
round-robin, makes less sense after commit bfde23ce200e6 ("padata:
unbind parallel jobs from specific CPUs") because a work isn't bound to
a particular CPU anymore, and isn't needed at all for multithreaded jobs
because they don't have sequence numbers.
Replace the per-CPU works with a preallocated pool, which allows sharing
them between existing padata users and the upcoming multithreaded user.
The pool will also facilitate setting NUMA-aware concurrency limits with
later users.
The pool is sized according to the number of possible CPUs. With this
limit, MAX_OBJ_NUM no longer makes sense, so remove it.
If the global pool is exhausted, a parallel job is run in the current
task instead to throttle a system trying to do too much in parallel.
Signed-off-by: Daniel Jordan <daniel.m.jordan@oracle.com>
---
include/linux/padata.h | 8 +---
kernel/padata.c | 118 +++++++++++++++++++++++++++++++-----------------
2 files changed, 78 insertions(+), 48 deletions(-)
diff --git a/include/linux/padata.h b/include/linux/padata.h
index 476ecfa..3bfa503 100644
--- a/include/linux/padata.h
+++ b/include/linux/padata.h
@@ -24,7 +24,6 @@
* @list: List entry, to attach to the padata lists.
* @pd: Pointer to the internal control structure.
* @cb_cpu: Callback cpu for serializatioon.
- * @cpu: Cpu for parallelization.
* @seq_nr: Sequence number of the parallelized data object.
* @info: Used to pass information from the parallel to the serial function.
* @parallel: Parallel execution function.
@@ -34,7 +33,6 @@ struct padata_priv {
struct list_head list;
struct parallel_data *pd;
int cb_cpu;
- int cpu;
unsigned int seq_nr;
int info;
void (*parallel)(struct padata_priv *padata);
@@ -68,15 +66,11 @@ struct padata_serial_queue {
/**
* struct padata_parallel_queue - The percpu padata parallel queue
*
- * @parallel: List to wait for parallelization.
* @reorder: List to wait for reordering after parallel processing.
- * @work: work struct for parallelization.
* @num_obj: Number of objects that are processed by this cpu.
*/
struct padata_parallel_queue {
- struct padata_list parallel;
struct padata_list reorder;
- struct work_struct work;
atomic_t num_obj;
};
@@ -111,7 +105,7 @@ struct parallel_data {
struct padata_parallel_queue __percpu *pqueue;
struct padata_serial_queue __percpu *squeue;
atomic_t refcnt;
- atomic_t seq_nr;
+ unsigned int seq_nr;
unsigned int processed;
int cpu;
struct padata_cpumask cpumask;
diff --git a/kernel/padata.c b/kernel/padata.c
index b05cd30..edd3ff5 100644
--- a/kernel/padata.c
+++ b/kernel/padata.c
@@ -32,7 +32,15 @@
#include <linux/sysfs.h>
#include <linux/rcupdate.h>
-#define MAX_OBJ_NUM 1000
+struct padata_work {
+ struct work_struct pw_work;
+ struct list_head pw_list; /* padata_free_works linkage */
+ void *pw_data;
+};
+
+static DEFINE_SPINLOCK(padata_works_lock);
+static struct padata_work *padata_works;
+static LIST_HEAD(padata_free_works);
static void padata_free_pd(struct parallel_data *pd);
@@ -58,30 +66,44 @@ static int padata_cpu_hash(struct parallel_data *pd, unsigned int seq_nr)
return padata_index_to_cpu(pd, cpu_index);
}
-static void padata_parallel_worker(struct work_struct *parallel_work)
+static struct padata_work *padata_work_alloc(void)
{
- struct padata_parallel_queue *pqueue;
- LIST_HEAD(local_list);
+ struct padata_work *pw;
- local_bh_disable();
- pqueue = container_of(parallel_work,
- struct padata_parallel_queue, work);
+ lockdep_assert_held(&padata_works_lock);
- spin_lock(&pqueue->parallel.lock);
- list_replace_init(&pqueue->parallel.list, &local_list);
- spin_unlock(&pqueue->parallel.lock);
+ if (list_empty(&padata_free_works))
+ return NULL; /* No more work items allowed to be queued. */
- while (!list_empty(&local_list)) {
- struct padata_priv *padata;
+ pw = list_first_entry(&padata_free_works, struct padata_work, pw_list);
+ list_del(&pw->pw_list);
+ return pw;
+}
- padata = list_entry(local_list.next,
- struct padata_priv, list);
+static void padata_work_init(struct padata_work *pw, work_func_t work_fn,
+ void *data)
+{
+ INIT_WORK(&pw->pw_work, work_fn);
+ pw->pw_data = data;
+}
- list_del_init(&padata->list);
+static void padata_work_free(struct padata_work *pw)
+{
+ lockdep_assert_held(&padata_works_lock);
+ list_add(&pw->pw_list, &padata_free_works);
+}
- padata->parallel(padata);
- }
+static void padata_parallel_worker(struct work_struct *parallel_work)
+{
+ struct padata_work *pw = container_of(parallel_work, struct padata_work,
+ pw_work);
+ struct padata_priv *padata = pw->pw_data;
+ local_bh_disable();
+ padata->parallel(padata);
+ spin_lock(&padata_works_lock);
+ padata_work_free(pw);
+ spin_unlock(&padata_works_lock);
local_bh_enable();
}
@@ -105,9 +127,9 @@ int padata_do_parallel(struct padata_shell *ps,
struct padata_priv *padata, int *cb_cpu)
{
struct padata_instance *pinst = ps->pinst;
- int i, cpu, cpu_index, target_cpu, err;
- struct padata_parallel_queue *queue;
+ int i, cpu, cpu_index, err;
struct parallel_data *pd;
+ struct padata_work *pw;
rcu_read_lock_bh();
@@ -135,25 +157,25 @@ int padata_do_parallel(struct padata_shell *ps,
if ((pinst->flags & PADATA_RESET))
goto out;
- if (atomic_read(&pd->refcnt) >= MAX_OBJ_NUM)
- goto out;
-
- err = 0;
atomic_inc(&pd->refcnt);
padata->pd = pd;
padata->cb_cpu = *cb_cpu;
- padata->seq_nr = atomic_inc_return(&pd->seq_nr);
- target_cpu = padata_cpu_hash(pd, padata->seq_nr);
- padata->cpu = target_cpu;
- queue = per_cpu_ptr(pd->pqueue, target_cpu);
-
- spin_lock(&queue->parallel.lock);
- list_add_tail(&padata->list, &queue->parallel.list);
- spin_unlock(&queue->parallel.lock);
+ rcu_read_unlock_bh();
- queue_work(pinst->parallel_wq, &queue->work);
+ spin_lock(&padata_works_lock);
+ padata->seq_nr = ++pd->seq_nr;
+ pw = padata_work_alloc();
+ spin_unlock(&padata_works_lock);
+ if (pw) {
+ padata_work_init(pw, padata_parallel_worker, padata);
+ queue_work(pinst->parallel_wq, &pw->pw_work);
+ } else {
+ /* Maximum works limit exceeded, run in the current task. */
+ padata->parallel(padata);
+ }
+ return 0;
out:
rcu_read_unlock_bh();
@@ -324,8 +346,9 @@ static void padata_serial_worker(struct work_struct *serial_work)
void padata_do_serial(struct padata_priv *padata)
{
struct parallel_data *pd = padata->pd;
+ int hashed_cpu = padata_cpu_hash(pd, padata->seq_nr);
struct padata_parallel_queue *pqueue = per_cpu_ptr(pd->pqueue,
- padata->cpu);
+ hashed_cpu);
struct padata_priv *cur;
spin_lock(&pqueue->reorder.lock);
@@ -416,8 +439,6 @@ static void padata_init_pqueues(struct parallel_data *pd)
pqueue = per_cpu_ptr(pd->pqueue, cpu);
__padata_list_init(&pqueue->reorder);
- __padata_list_init(&pqueue->parallel);
- INIT_WORK(&pqueue->work, padata_parallel_worker);
atomic_set(&pqueue->num_obj, 0);
}
}
@@ -451,7 +472,7 @@ static struct parallel_data *padata_alloc_pd(struct padata_shell *ps)
padata_init_pqueues(pd);
padata_init_squeues(pd);
- atomic_set(&pd->seq_nr, -1);
+ pd->seq_nr = -1;
atomic_set(&pd->refcnt, 1);
spin_lock_init(&pd->lock);
pd->cpu = cpumask_first(pd->cpumask.pcpu);
@@ -1050,6 +1071,7 @@ EXPORT_SYMBOL(padata_free_shell);
void __init padata_init(void)
{
+ unsigned int i, possible_cpus;
#ifdef CONFIG_HOTPLUG_CPU
int ret;
@@ -1061,13 +1083,27 @@ void __init padata_init(void)
ret = cpuhp_setup_state_multi(CPUHP_PADATA_DEAD, "padata:dead",
NULL, padata_cpu_dead);
- if (ret < 0) {
- cpuhp_remove_multi_state(hp_online);
- goto err;
- }
+ if (ret < 0)
+ goto remove_online_state;
+#endif
+
+ possible_cpus = num_possible_cpus();
+ padata_works = kmalloc_array(possible_cpus, sizeof(struct padata_work),
+ GFP_KERNEL);
+ if (!padata_works)
+ goto remove_dead_state;
+
+ for (i = 0; i < possible_cpus; ++i)
+ list_add(&padata_works[i].pw_list, &padata_free_works);
return;
+
+remove_dead_state:
+#ifdef CONFIG_HOTPLUG_CPU
+ cpuhp_remove_multi_state(CPUHP_PADATA_DEAD);
+remove_online_state:
+ cpuhp_remove_multi_state(hp_online);
err:
- pr_warn("padata: initialization failed\n");
#endif
+ pr_warn("padata: initialization failed\n");
}
--
1.7.4.1