/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 *	Definitions for the 'struct ptr_ring' data structure.
 *
 *	Author:
 *		Michael S. Tsirkin <mst@redhat.com>
 *
 *	Copyright (C) 2016 Red Hat, Inc.
 *
 *	This is a limited-size FIFO maintaining pointers in FIFO order, with
 *	one CPU producing entries and another consuming entries from the FIFO.
 *
 *	This implementation tries to minimize cache contention when there is a
 *	single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue;
};
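
/* Illustrative usage sketch (not part of this header's API): a minimal
 * single-producer/single-consumer flow built from the helpers declared
 * below.  The queued object type, process() and my_free() are
 * hypothetical placeholders.
 *
 *	struct ptr_ring ring;
 *	void *obj;
 *
 *	if (ptr_ring_init(&ring, 128, GFP_KERNEL))
 *		return -ENOMEM;
 *
 *	In the producer:
 *	if (ptr_ring_produce(&ring, obj))
 *		my_free(obj);	(ring full: -ENOSPC, caller keeps ownership)
 *
 *	In the consumer:
 *	while ((obj = ptr_ring_consume(&ring)))
 *		process(obj);
 *
 *	On teardown, destroy anything still queued:
 *	ptr_ring_cleanup(&ring, my_free);
 */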

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	return r->queue[r->producer];
}
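
/* For instance (a sketch, not kernel code): a producer that spins for room
 * while holding producer_lock needs cpu_relax() so the compiler re-reads
 * the slot on every iteration:
 *
 *	spin_lock(&r->producer_lock);
 *	while (__ptr_ring_full(r))
 *		cpu_relax();
 *	__ptr_ring_produce(r, ptr);
 *	spin_unlock(&r->producer_lock);
 *
 * Note that spinning like this only terminates if a consumer is making
 * progress concurrently.
 */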

static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || r->queue[r->producer])
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}
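
/* For example (an illustrative sketch): a caller that already holds
 * producer_lock can push several pointers with a single lock round trip,
 * stopping at the first -ENOSPC; "objs" and "n" are hypothetical.
 *
 *	int i;
 *
 *	spin_lock(&r->producer_lock);
 *	for (i = 0; i < n; i++)
 *		if (__ptr_ring_produce(r, objs[i]))
 *			break;
 *	spin_unlock(&r->producer_lock);
 *
 * Entries objs[i..n-1] that did not fit remain owned by the caller.
 */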

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if the ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	if (likely(r->size))
		return !r->queue[READ_ONCE(r->consumer_head)];
	return true;
}
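
/* As an illustration (a sketch, not an API provided here): a consumer that
 * wants to stop polling when the ring looks empty must publish that intent
 * and then re-check, so it either sees a newly produced entry or the
 * producer sees the intent and issues a wakeup:
 *
 *	Consumer, before going idle:
 *		announce "consumer is idle" (flag, NAPI state, ...);
 *		smp_mb();
 *		if (!__ptr_ring_empty(r))
 *			resume consuming instead of going idle;
 *
 *	Producer, after __ptr_ring_produce():
 *		smp_mb();
 *		if ("consumer is idle" was announced)
 *			wake the consumer;
 *
 * The announcement and wakeup mechanism belong to the caller; they are not
 * part of ptr_ring.
 */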

static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Zero entries from tail to the specified head.
 * NB: if consumer_head can be >= r->size, the caller needs to fix up the
 * tail afterwards.
 */
static inline void __ptr_ring_zero_tail(struct ptr_ring *r, int consumer_head)
{
	int head = consumer_head;

	/* Zero out entries in reverse order: this way the cache line that the
	 * producer might currently be reading is the last one we touch;
	 * the producer won't make progress and touch other cache lines
	 * besides the first one until we write out all entries.
	 */
	while (likely(head > r->consumer_tail))
		r->queue[--head] = NULL;

	r->consumer_tail = consumer_head;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line.  Defer these updates until a
	 * batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
	 * to work correctly.
	 */
	int consumer_head = r->consumer_head + 1;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size))
		__ptr_ring_zero_tail(r, consumer_head);

	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}
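
/* For example (an illustrative sketch): if entries are consumed from BH
 * context (say, from a softirq poll loop), producers running in process
 * context must disable BH so the lock nesting performed by resize below
 * stays deadlock-free:
 *
 *	Producer (process context):	ptr_ring_produce_bh(r, ptr);
 *	Consumer (BH context):		ptr = ptr_ring_consume(r);
 *
 * Which variant fits depends entirely on the calling contexts involved.
 */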

static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}
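
/* For instance (a sketch): drain up to a fixed budget per invocation with
 * an on-stack array; process() is a hypothetical callback.
 *
 *	void *batch[16];
 *	int i, n;
 *
 *	n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *	for (i = 0; i < n; i++)
 *		process(batch[i]);
 *
 * Compared with calling ptr_ring_consume() in a loop, this takes the
 * consumer lock once per batch instead of once per entry.
 */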

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f;\
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})
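
/* An illustrative use (a sketch): peek at the head entry and compute its
 * length without dequeueing it.  The element type and entry_len() helper
 * are hypothetical; note that the function is called with NULL when the
 * ring is empty, so it must handle that case, as entry_len() does here.
 *
 *	static int entry_len(void *ptr)
 *	{
 *		struct my_entry *e = ptr;
 *
 *		return e ? e->len : 0;
 *	}
 *
 *	int len = PTR_RING_PEEK_CALL(r, entry_len);
 */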

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp)
{
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO);
}

static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch to at least 1 to make the logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because the ring is small) would cause a lot of
	 * burstiness.  This needs tuning; for now disable batching in that case.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}
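
/* For example, assuming SMP_CACHE_BYTES == 64 and 8-byte pointers, the
 * batch above works out to 64 * 2 / 8 = 16 entries, and any ring smaller
 * than 32 entries falls back to batch == 1.  The actual values depend on
 * the architecture's cache line size and pointer width.
 */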

static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
#define ptr_ring_init(...)	alloc_hooks(ptr_ring_init_noprof(__VA_ARGS__))

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way the following
	 * code can test entries for NULL and, if not NULL, assume they are valid.
	 */
	__ptr_ring_zero_tail(r, r->consumer_head);

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		int head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}
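
/* For instance (a sketch): give back entries that were pulled out with
 * ptr_ring_consume_batched() but could not be processed, destroying any
 * that no longer fit; "batch", "n" and my_free() are hypothetical.
 *
 *	n = ptr_ring_consume_batched(r, batch, ARRAY_SIZE(batch));
 *	if (processing_failed)
 *		ptr_ring_unconsume(r, batch, n, my_free);
 *
 * The entries are pushed back in front of the consumer, so a later consume
 * sees them again in their original order.
 */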

static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp,
				  void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
#define ptr_ring_resize(...)	alloc_hooks(ptr_ring_resize_noprof(__VA_ARGS__))
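
/* For instance (a sketch): grow a ring from process context when it keeps
 * filling up; entries that would not fit into a smaller ring are handed to
 * the destroy callback.  new_size, err and my_free() are placeholders.
 *
 *	if (ptr_ring_full(r))
 *		err = ptr_ring_resize(r, new_size, GFP_KERNEL, my_free);
 *
 * The new queue is allocated before any lock is taken, so GFP_KERNEL is
 * fine from process context; the swap itself runs with both locks held and
 * interrupts disabled.
 */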

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in BH context, you must
 * disable BH when doing so.
 */
static inline int ptr_ring_resize_multiple_bh_noprof(struct ptr_ring **rings,
						     unsigned int nrings,
						     int size, gfp_t gfp,
						     void (*destroy)(void *))
{
	void ***queues;
	int i;

	queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_bh(&(rings[i])->consumer_lock);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_bh(&(rings[i])->consumer_lock);
	}

	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}
#define ptr_ring_resize_multiple_bh(...) \
		alloc_hooks(ptr_ring_resize_multiple_bh_noprof(__VA_ARGS__))

static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H  */