1 /* SPDX-License-Identifier: GPL-2.0-or-later */
2 /*
3 * Definitions for the 'struct ptr_ring' datastructure.
4 *
5 * Author:
6 * Michael S. Tsirkin <mst@redhat.com>
7 *
8 * Copyright (C) 2016 Red Hat, Inc.
9 *
10 * This is a limited-size FIFO maintaining pointers in FIFO order, with
11 * one CPU producing entries and another consuming entries from a FIFO.
12 *
13 * This implementation tries to minimize cache-contention when there is a
14 * single producer and a single consumer CPU.
15 */
16
17 #ifndef _LINUX_PTR_RING_H
18 #define _LINUX_PTR_RING_H 1
19
20 #ifdef __KERNEL__
21 #include <linux/spinlock.h>
22 #include <linux/cache.h>
23 #include <linux/types.h>
24 #include <linux/compiler.h>
25 #include <linux/slab.h>
26 #include <linux/mm.h>
27 #include <asm/errno.h>
28 #endif
29
/*
 * State of one pointer ring.
 *
 * The fields are declared in three groups, each starting at a member
 * annotated with ____cacheline_aligned_in_smp: the producer index and its
 * lock, the consumer indices and their lock, and the data that both sides
 * only read.
 */
struct ptr_ring {
	/* Producer side: index of the next slot to fill, and its lock. */
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	/* Consumer side: head/tail indices, and their lock. */
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	void **queue; /* array of @size slots; a NULL slot is free for the producer */
};
42
43 /* Note: callers invoking this in a loop must use a compiler barrier,
44 * for example cpu_relax().
45 *
46 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
47 * see e.g. ptr_ring_full.
48 */
__ptr_ring_full(struct ptr_ring * r)49 static inline bool __ptr_ring_full(struct ptr_ring *r)
50 {
51 return r->queue[r->producer];
52 }
53
ptr_ring_full(struct ptr_ring * r)54 static inline bool ptr_ring_full(struct ptr_ring *r)
55 {
56 bool ret;
57
58 spin_lock(&r->producer_lock);
59 ret = __ptr_ring_full(r);
60 spin_unlock(&r->producer_lock);
61
62 return ret;
63 }
64
ptr_ring_full_irq(struct ptr_ring * r)65 static inline bool ptr_ring_full_irq(struct ptr_ring *r)
66 {
67 bool ret;
68
69 spin_lock_irq(&r->producer_lock);
70 ret = __ptr_ring_full(r);
71 spin_unlock_irq(&r->producer_lock);
72
73 return ret;
74 }
75
ptr_ring_full_any(struct ptr_ring * r)76 static inline bool ptr_ring_full_any(struct ptr_ring *r)
77 {
78 unsigned long flags;
79 bool ret;
80
81 spin_lock_irqsave(&r->producer_lock, flags);
82 ret = __ptr_ring_full(r);
83 spin_unlock_irqrestore(&r->producer_lock, flags);
84
85 return ret;
86 }
87
ptr_ring_full_bh(struct ptr_ring * r)88 static inline bool ptr_ring_full_bh(struct ptr_ring *r)
89 {
90 bool ret;
91
92 spin_lock_bh(&r->producer_lock);
93 ret = __ptr_ring_full(r);
94 spin_unlock_bh(&r->producer_lock);
95
96 return ret;
97 }
98
99 /* Note: callers invoking this in a loop must use a compiler barrier,
100 * for example cpu_relax(). Callers must hold producer_lock.
101 * Callers are responsible for making sure pointer that is being queued
102 * points to a valid data.
103 */
__ptr_ring_produce(struct ptr_ring * r,void * ptr)104 static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
105 {
106 if (unlikely(!r->size) || r->queue[r->producer])
107 return -ENOSPC;
108
109 /* Make sure the pointer we are storing points to a valid data. */
110 /* Pairs with the dependency ordering in __ptr_ring_consume. */
111 smp_wmb();
112
113 WRITE_ONCE(r->queue[r->producer++], ptr);
114 if (unlikely(r->producer >= r->size))
115 r->producer = 0;
116 return 0;
117 }
118
119 /*
120 * Note: resize (below) nests producer lock within consumer lock, so if you
121 * consume in interrupt or BH context, you must disable interrupts/BH when
122 * calling this.
123 */
ptr_ring_produce(struct ptr_ring * r,void * ptr)124 static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
125 {
126 int ret;
127
128 spin_lock(&r->producer_lock);
129 ret = __ptr_ring_produce(r, ptr);
130 spin_unlock(&r->producer_lock);
131
132 return ret;
133 }
134
ptr_ring_produce_irq(struct ptr_ring * r,void * ptr)135 static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
136 {
137 int ret;
138
139 spin_lock_irq(&r->producer_lock);
140 ret = __ptr_ring_produce(r, ptr);
141 spin_unlock_irq(&r->producer_lock);
142
143 return ret;
144 }
145
ptr_ring_produce_any(struct ptr_ring * r,void * ptr)146 static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
147 {
148 unsigned long flags;
149 int ret;
150
151 spin_lock_irqsave(&r->producer_lock, flags);
152 ret = __ptr_ring_produce(r, ptr);
153 spin_unlock_irqrestore(&r->producer_lock, flags);
154
155 return ret;
156 }
157
ptr_ring_produce_bh(struct ptr_ring * r,void * ptr)158 static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
159 {
160 int ret;
161
162 spin_lock_bh(&r->producer_lock);
163 ret = __ptr_ring_produce(r, ptr);
164 spin_unlock_bh(&r->producer_lock);
165
166 return ret;
167 }
168
__ptr_ring_peek(struct ptr_ring * r)169 static inline void *__ptr_ring_peek(struct ptr_ring *r)
170 {
171 if (likely(r->size))
172 return READ_ONCE(r->queue[r->consumer_head]);
173 return NULL;
174 }
175
176 /*
177 * Test ring empty status without taking any locks.
178 *
179 * NB: This is only safe to call if ring is never resized.
180 *
181 * However, if some other CPU consumes ring entries at the same time, the value
182 * returned is not guaranteed to be correct.
183 *
184 * In this case - to avoid incorrectly detecting the ring
185 * as empty - the CPU consuming the ring entries is responsible
186 * for either consuming all ring entries until the ring is empty,
187 * or synchronizing with some other CPU and causing it to
188 * re-test __ptr_ring_empty and/or consume the ring enteries
189 * after the synchronization point.
190 *
191 * Note: callers invoking this in a loop must use a compiler barrier,
192 * for example cpu_relax().
193 */
__ptr_ring_empty(struct ptr_ring * r)194 static inline bool __ptr_ring_empty(struct ptr_ring *r)
195 {
196 if (likely(r->size))
197 return !r->queue[READ_ONCE(r->consumer_head)];
198 return true;
199 }
200
ptr_ring_empty(struct ptr_ring * r)201 static inline bool ptr_ring_empty(struct ptr_ring *r)
202 {
203 bool ret;
204
205 spin_lock(&r->consumer_lock);
206 ret = __ptr_ring_empty(r);
207 spin_unlock(&r->consumer_lock);
208
209 return ret;
210 }
211
ptr_ring_empty_irq(struct ptr_ring * r)212 static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
213 {
214 bool ret;
215
216 spin_lock_irq(&r->consumer_lock);
217 ret = __ptr_ring_empty(r);
218 spin_unlock_irq(&r->consumer_lock);
219
220 return ret;
221 }
222
ptr_ring_empty_any(struct ptr_ring * r)223 static inline bool ptr_ring_empty_any(struct ptr_ring *r)
224 {
225 unsigned long flags;
226 bool ret;
227
228 spin_lock_irqsave(&r->consumer_lock, flags);
229 ret = __ptr_ring_empty(r);
230 spin_unlock_irqrestore(&r->consumer_lock, flags);
231
232 return ret;
233 }
234
ptr_ring_empty_bh(struct ptr_ring * r)235 static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
236 {
237 bool ret;
238
239 spin_lock_bh(&r->consumer_lock);
240 ret = __ptr_ring_empty(r);
241 spin_unlock_bh(&r->consumer_lock);
242
243 return ret;
244 }
245
246 /* Zero entries from tail to specified head.
247 * NB: if consumer_head can be >= r->size need to fixup tail later.
248 */
__ptr_ring_zero_tail(struct ptr_ring * r,int consumer_head)249 static inline void __ptr_ring_zero_tail(struct ptr_ring *r, int consumer_head)
250 {
251 int head = consumer_head;
252
253 /* Zero out entries in the reverse order: this way we touch the
254 * cache line that producer might currently be reading the last;
255 * producer won't make progress and touch other cache lines
256 * besides the first one until we write out all entries.
257 */
258 while (likely(head > r->consumer_tail))
259 r->queue[--head] = NULL;
260
261 r->consumer_tail = consumer_head;
262 }
263
264 /* Must only be called after __ptr_ring_peek returned !NULL */
__ptr_ring_discard_one(struct ptr_ring * r)265 static inline void __ptr_ring_discard_one(struct ptr_ring *r)
266 {
267 /* Fundamentally, what we want to do is update consumer
268 * index and zero out the entry so producer can reuse it.
269 * Doing it naively at each consume would be as simple as:
270 * consumer = r->consumer;
271 * r->queue[consumer++] = NULL;
272 * if (unlikely(consumer >= r->size))
273 * consumer = 0;
274 * r->consumer = consumer;
275 * but that is suboptimal when the ring is full as producer is writing
276 * out new entries in the same cache line. Defer these updates until a
277 * batch of entries has been consumed.
278 */
279 /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
280 * to work correctly.
281 */
282 int consumer_head = r->consumer_head + 1;
283
284 /* Once we have processed enough entries invalidate them in
285 * the ring all at once so producer can reuse their space in the ring.
286 * We also do this when we reach end of the ring - not mandatory
287 * but helps keep the implementation simple.
288 */
289 if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
290 consumer_head >= r->size))
291 __ptr_ring_zero_tail(r, consumer_head);
292
293 if (unlikely(consumer_head >= r->size)) {
294 consumer_head = 0;
295 r->consumer_tail = 0;
296 }
297 /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
298 WRITE_ONCE(r->consumer_head, consumer_head);
299 }
300
/*
 * Remove and return the entry at the consumer head, or return NULL when
 * __ptr_ring_peek() finds nothing there.
 */
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	/*
	 * The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	void *entry = __ptr_ring_peek(r);

	if (!entry)
		return NULL;

	__ptr_ring_discard_one(r);
	return entry;
}
315
/*
 * Consume up to @n entries into @array, stopping early at the first NULL
 * returned by __ptr_ring_consume(). Returns the number of entries stored.
 */
static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	int count = 0;

	while (count < n) {
		void *entry = __ptr_ring_consume(r);

		if (!entry)
			break;
		array[count++] = entry;
	}

	return count;
}
331
332 /*
333 * Note: resize (below) nests producer lock within consumer lock, so if you
334 * call this in interrupt or BH context, you must disable interrupts/BH when
335 * producing.
336 */
ptr_ring_consume(struct ptr_ring * r)337 static inline void *ptr_ring_consume(struct ptr_ring *r)
338 {
339 void *ptr;
340
341 spin_lock(&r->consumer_lock);
342 ptr = __ptr_ring_consume(r);
343 spin_unlock(&r->consumer_lock);
344
345 return ptr;
346 }
347
ptr_ring_consume_irq(struct ptr_ring * r)348 static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
349 {
350 void *ptr;
351
352 spin_lock_irq(&r->consumer_lock);
353 ptr = __ptr_ring_consume(r);
354 spin_unlock_irq(&r->consumer_lock);
355
356 return ptr;
357 }
358
ptr_ring_consume_any(struct ptr_ring * r)359 static inline void *ptr_ring_consume_any(struct ptr_ring *r)
360 {
361 unsigned long flags;
362 void *ptr;
363
364 spin_lock_irqsave(&r->consumer_lock, flags);
365 ptr = __ptr_ring_consume(r);
366 spin_unlock_irqrestore(&r->consumer_lock, flags);
367
368 return ptr;
369 }
370
ptr_ring_consume_bh(struct ptr_ring * r)371 static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
372 {
373 void *ptr;
374
375 spin_lock_bh(&r->consumer_lock);
376 ptr = __ptr_ring_consume(r);
377 spin_unlock_bh(&r->consumer_lock);
378
379 return ptr;
380 }
381
ptr_ring_consume_batched(struct ptr_ring * r,void ** array,int n)382 static inline int ptr_ring_consume_batched(struct ptr_ring *r,
383 void **array, int n)
384 {
385 int ret;
386
387 spin_lock(&r->consumer_lock);
388 ret = __ptr_ring_consume_batched(r, array, n);
389 spin_unlock(&r->consumer_lock);
390
391 return ret;
392 }
393
ptr_ring_consume_batched_irq(struct ptr_ring * r,void ** array,int n)394 static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
395 void **array, int n)
396 {
397 int ret;
398
399 spin_lock_irq(&r->consumer_lock);
400 ret = __ptr_ring_consume_batched(r, array, n);
401 spin_unlock_irq(&r->consumer_lock);
402
403 return ret;
404 }
405
ptr_ring_consume_batched_any(struct ptr_ring * r,void ** array,int n)406 static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
407 void **array, int n)
408 {
409 unsigned long flags;
410 int ret;
411
412 spin_lock_irqsave(&r->consumer_lock, flags);
413 ret = __ptr_ring_consume_batched(r, array, n);
414 spin_unlock_irqrestore(&r->consumer_lock, flags);
415
416 return ret;
417 }
418
ptr_ring_consume_batched_bh(struct ptr_ring * r,void ** array,int n)419 static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
420 void **array, int n)
421 {
422 int ret;
423
424 spin_lock_bh(&r->consumer_lock);
425 ret = __ptr_ring_consume_batched(r, array, n);
426 spin_unlock_bh(&r->consumer_lock);
427
428 return ret;
429 }
430
/*
 * Call @f on the entry returned by __ptr_ring_peek(), without discarding
 * the entry from the FIFO. The function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

/* __PTR_RING_PEEK_CALL() with consumer_lock taken via spin_lock(). */
#define PTR_RING_PEEK_CALL(r, f) ({					\
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_ret;			\
									\
	spin_lock(&(r)->consumer_lock);					\
	__PTR_RING_PEEK_CALL_ret = __PTR_RING_PEEK_CALL(r, f);		\
	spin_unlock(&(r)->consumer_lock);				\
	__PTR_RING_PEEK_CALL_ret;					\
})
445
/* __PTR_RING_PEEK_CALL() with consumer_lock taken via spin_lock_irq(). */
#define PTR_RING_PEEK_CALL_IRQ(r, f) ({					\
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_ret;			\
									\
	spin_lock_irq(&(r)->consumer_lock);				\
	__PTR_RING_PEEK_CALL_ret = __PTR_RING_PEEK_CALL(r, f);		\
	spin_unlock_irq(&(r)->consumer_lock);				\
	__PTR_RING_PEEK_CALL_ret;					\
})
454
/* __PTR_RING_PEEK_CALL() with consumer_lock taken via spin_lock_bh(). */
#define PTR_RING_PEEK_CALL_BH(r, f) ({					\
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_ret;			\
									\
	spin_lock_bh(&(r)->consumer_lock);				\
	__PTR_RING_PEEK_CALL_ret = __PTR_RING_PEEK_CALL(r, f);		\
	spin_unlock_bh(&(r)->consumer_lock);				\
	__PTR_RING_PEEK_CALL_ret;					\
})
463
/*
 * __PTR_RING_PEEK_CALL() with consumer_lock taken via spin_lock_irqsave();
 * the saved flags are handed back to spin_unlock_irqrestore().
 */
#define PTR_RING_PEEK_CALL_ANY(r, f) ({					\
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_ret;			\
	unsigned long __PTR_RING_PEEK_CALL_flags;			\
									\
	spin_lock_irqsave(&(r)->consumer_lock,				\
			  __PTR_RING_PEEK_CALL_flags);			\
	__PTR_RING_PEEK_CALL_ret = __PTR_RING_PEEK_CALL(r, f);		\
	spin_unlock_irqrestore(&(r)->consumer_lock,			\
			       __PTR_RING_PEEK_CALL_flags);		\
	__PTR_RING_PEEK_CALL_ret;					\
})
473
474 /* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
475 * documentation for vmalloc for which of them are legal.
476 */
__ptr_ring_init_queue_alloc_noprof(unsigned int size,gfp_t gfp)477 static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp)
478 {
479 if (size > KMALLOC_MAX_SIZE / sizeof(void *))
480 return NULL;
481 return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO);
482 }
483
__ptr_ring_set_size(struct ptr_ring * r,int size)484 static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
485 {
486 r->size = size;
487 r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
488 /* We need to set batch at least to 1 to make logic
489 * in __ptr_ring_discard_one work correctly.
490 * Batching too much (because ring is small) would cause a lot of
491 * burstiness. Needs tuning, for now disable batching.
492 */
493 if (r->batch > r->size / 2 || !r->batch)
494 r->batch = 1;
495 }
496
/*
 * Initialise @r as an empty ring of @size slots allocated with @gfp.
 * Returns 0 on success, or -ENOMEM when the slot array cannot be allocated;
 * in that case r->queue is left NULL and nothing else is initialised.
 */
static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp)
{
	void **slots = __ptr_ring_init_queue_alloc_noprof(size, gfp);

	r->queue = slots;
	if (!slots)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);

	/* All indices start at slot 0. */
	r->consumer_tail = 0;
	r->consumer_head = 0;
	r->producer = 0;

	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
#define ptr_ring_init(...) alloc_hooks(ptr_ring_init_noprof(__VA_ARGS__))
511
512 /*
513 * Return entries into ring. Destroy entries that don't fit.
514 *
515 * Note: this is expected to be a rare slow path operation.
516 *
517 * Note: producer lock is nested within consumer lock, so if you
518 * resize you must make sure all uses nest correctly.
519 * In particular if you consume ring in interrupt or BH context, you must
520 * disable interrupts/BH when doing so.
521 */
ptr_ring_unconsume(struct ptr_ring * r,void ** batch,int n,void (* destroy)(void *))522 static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
523 void (*destroy)(void *))
524 {
525 unsigned long flags;
526
527 spin_lock_irqsave(&r->consumer_lock, flags);
528 spin_lock(&r->producer_lock);
529
530 if (!r->size)
531 goto done;
532
533 /*
534 * Clean out buffered entries (for simplicity). This way following code
535 * can test entries for NULL and if not assume they are valid.
536 */
537 __ptr_ring_zero_tail(r, r->consumer_head);
538
539 /*
540 * Go over entries in batch, start moving head back and copy entries.
541 * Stop when we run into previously unconsumed entries.
542 */
543 while (n) {
544 int head = r->consumer_head - 1;
545 if (head < 0)
546 head = r->size - 1;
547 if (r->queue[head]) {
548 /* This batch entry will have to be destroyed. */
549 goto done;
550 }
551 r->queue[head] = batch[--n];
552 r->consumer_tail = head;
553 /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
554 WRITE_ONCE(r->consumer_head, head);
555 }
556
557 done:
558 /* Destroy all entries left in the batch. */
559 while (n)
560 destroy(batch[--n]);
561 spin_unlock(&r->producer_lock);
562 spin_unlock_irqrestore(&r->consumer_lock, flags);
563 }
564
/*
 * Move the ring's contents into the new slot array @queue of @size entries
 * and install it. Entries are drained with __ptr_ring_consume(); the first
 * @size of them are copied to @queue in order, the rest are passed to
 * @destroy when it is non-NULL. The ring indices are reset to match the new
 * array. @gfp is not used here.
 *
 * Returns the previous slot array; freeing it is left to the caller.
 */
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	void **prev_queue;
	void *entry;
	int copied = 0;

	for (entry = __ptr_ring_consume(r); entry;
	     entry = __ptr_ring_consume(r)) {
		if (copied < size) {
			queue[copied++] = entry;
			continue;
		}
		if (destroy)
			destroy(entry);
	}

	__ptr_ring_set_size(r, size);

	/* A completely filled new array wraps the producer back to slot 0. */
	r->producer = copied < size ? copied : 0;
	r->consumer_head = 0;
	r->consumer_tail = 0;

	prev_queue = r->queue;
	r->queue = queue;

	return prev_queue;
}
590
591 /*
592 * Note: producer lock is nested within consumer lock, so if you
593 * resize you must make sure all uses nest correctly.
594 * In particular if you consume ring in interrupt or BH context, you must
595 * disable interrupts/BH when doing so.
596 */
ptr_ring_resize_noprof(struct ptr_ring * r,int size,gfp_t gfp,void (* destroy)(void *))597 static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp,
598 void (*destroy)(void *))
599 {
600 unsigned long flags;
601 void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
602 void **old;
603
604 if (!queue)
605 return -ENOMEM;
606
607 spin_lock_irqsave(&(r)->consumer_lock, flags);
608 spin_lock(&(r)->producer_lock);
609
610 old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);
611
612 spin_unlock(&(r)->producer_lock);
613 spin_unlock_irqrestore(&(r)->consumer_lock, flags);
614
615 kvfree(old);
616
617 return 0;
618 }
619 #define ptr_ring_resize(...) alloc_hooks(ptr_ring_resize_noprof(__VA_ARGS__))
620
621 /*
622 * Note: producer lock is nested within consumer lock, so if you
623 * resize you must make sure all uses nest correctly.
624 * In particular if you consume ring in BH context, you must
625 * disable BH when doing so.
626 */
ptr_ring_resize_multiple_bh_noprof(struct ptr_ring ** rings,unsigned int nrings,int size,gfp_t gfp,void (* destroy)(void *))627 static inline int ptr_ring_resize_multiple_bh_noprof(struct ptr_ring **rings,
628 unsigned int nrings,
629 int size, gfp_t gfp,
630 void (*destroy)(void *))
631 {
632 void ***queues;
633 int i;
634
635 queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp);
636 if (!queues)
637 goto noqueues;
638
639 for (i = 0; i < nrings; ++i) {
640 queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp);
641 if (!queues[i])
642 goto nomem;
643 }
644
645 for (i = 0; i < nrings; ++i) {
646 spin_lock_bh(&(rings[i])->consumer_lock);
647 spin_lock(&(rings[i])->producer_lock);
648 queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
649 size, gfp, destroy);
650 spin_unlock(&(rings[i])->producer_lock);
651 spin_unlock_bh(&(rings[i])->consumer_lock);
652 }
653
654 for (i = 0; i < nrings; ++i)
655 kvfree(queues[i]);
656
657 kfree(queues);
658
659 return 0;
660
661 nomem:
662 while (--i >= 0)
663 kvfree(queues[i]);
664
665 kfree(queues);
666
667 noqueues:
668 return -ENOMEM;
669 }
670 #define ptr_ring_resize_multiple_bh(...) \
671 alloc_hooks(ptr_ring_resize_multiple_bh_noprof(__VA_ARGS__))
672
ptr_ring_cleanup(struct ptr_ring * r,void (* destroy)(void *))673 static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
674 {
675 void *ptr;
676
677 if (destroy)
678 while ((ptr = ptr_ring_consume(r)))
679 destroy(ptr);
680 kvfree(r->queue);
681 }
682
683 #endif /* _LINUX_PTR_RING_H */
684