/* SPDX-License-Identifier: GPL-2.0-or-later */
/*
 * Definitions for the 'struct ptr_ring' datastructure.
 *
 * Author:
 *	Michael S. Tsirkin <mst@redhat.com>
 *
 * Copyright (C) 2016 Red Hat, Inc.
 *
 * This is a limited-size FIFO maintaining pointers in FIFO order, with
 * one CPU producing entries and another consuming entries from a FIFO.
 *
 * This implementation tries to minimize cache-contention when there is a
 * single producer and a single consumer CPU.
 */

#ifndef _LINUX_PTR_RING_H
#define _LINUX_PTR_RING_H 1

#ifdef __KERNEL__
#include <linux/spinlock.h>
#include <linux/cache.h>
#include <linux/types.h>
#include <linux/compiler.h>
#include <linux/slab.h>
#include <linux/mm.h>
#include <asm/errno.h>
#endif

struct ptr_ring {
	/* Producer-side state; cacheline-aligned so the producer does not
	 * bounce cache lines with the consumer fields below.
	 */
	int producer ____cacheline_aligned_in_smp;
	spinlock_t producer_lock;
	/* Consumer-side state; head/tail differ so slot invalidation can be
	 * deferred and batched (see __ptr_ring_discard_one).
	 */
	int consumer_head ____cacheline_aligned_in_smp; /* next valid entry */
	int consumer_tail; /* next entry to invalidate */
	spinlock_t consumer_lock;
	/* Shared consumer/producer data */
	/* Read-only by both the producer and the consumer */
	int size ____cacheline_aligned_in_smp; /* max entries in queue */
	int batch; /* number of entries to consume in a batch */
	/* Array of size pointer slots; a NULL slot is free, non-NULL is a
	 * queued entry.  This doubles as the full/empty signal.
	 */
	void **queue;
};

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 *
 * NB: this is unlike __ptr_ring_empty in that callers must hold producer_lock:
 * see e.g. ptr_ring_full.
 */
static inline bool __ptr_ring_full(struct ptr_ring *r)
{
	/* Full iff the slot the producer would write next is still occupied.
	 * data_race(): racy reads of queue slots are intentional here.
	 */
	return data_race(r->queue[r->producer]);
}

/* Full test under the producer lock. */
static inline bool ptr_ring_full(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock(&r->producer_lock);

	return ret;
}

/* As ptr_ring_full, but disables interrupts while the lock is held. */
static inline bool ptr_ring_full_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

/* As ptr_ring_full, but callable from any context (saves/restores irqs). */
static inline bool ptr_ring_full_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_full(r);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

/* As ptr_ring_full, but disables bottom halves while the lock is held. */
static inline bool ptr_ring_full_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_full(r);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax(). Callers must hold producer_lock.
 * Callers are responsible for making sure the pointer that is being queued
 * points to valid data.
 *
 * Returns 0 on success, -ENOSPC if the ring is full (or has zero size).
 */
static inline int __ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	if (unlikely(!r->size) || data_race(r->queue[r->producer]))
		return -ENOSPC;

	/* Make sure the pointer we are storing points to valid data. */
	/* Pairs with the dependency ordering in __ptr_ring_consume. */
	smp_wmb();

	WRITE_ONCE(r->queue[r->producer++], ptr);
	if (unlikely(r->producer >= r->size))
		r->producer = 0;
	return 0;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * consume in interrupt or BH context, you must disable interrupts/BH when
 * calling this.
 */
static inline int ptr_ring_produce(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock(&r->producer_lock);

	return ret;
}

/* As ptr_ring_produce, but disables interrupts while the lock is held. */
static inline int ptr_ring_produce_irq(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_irq(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irq(&r->producer_lock);

	return ret;
}

/* As ptr_ring_produce, but callable from any context. */
static inline int ptr_ring_produce_any(struct ptr_ring *r, void *ptr)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->producer_lock, flags);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_irqrestore(&r->producer_lock, flags);

	return ret;
}

/* As ptr_ring_produce, but disables bottom halves while the lock is held. */
static inline int ptr_ring_produce_bh(struct ptr_ring *r, void *ptr)
{
	int ret;

	spin_lock_bh(&r->producer_lock);
	ret = __ptr_ring_produce(r, ptr);
	spin_unlock_bh(&r->producer_lock);

	return ret;
}

/* Return the oldest entry without consuming it, or NULL if the ring is
 * empty (or zero-sized).  Callers must hold consumer_lock.
 */
static inline void *__ptr_ring_peek(struct ptr_ring *r)
{
	if (likely(r->size))
		return READ_ONCE(r->queue[r->consumer_head]);
	return NULL;
}

/*
 * Test ring empty status without taking any locks.
 *
 * NB: This is only safe to call if ring is never resized.
 *
 * However, if some other CPU consumes ring entries at the same time, the value
 * returned is not guaranteed to be correct.
 *
 * In this case - to avoid incorrectly detecting the ring
 * as empty - the CPU consuming the ring entries is responsible
 * for either consuming all ring entries until the ring is empty,
 * or synchronizing with some other CPU and causing it to
 * re-test __ptr_ring_empty and/or consume the ring entries
 * after the synchronization point.
 *
 * Note: callers invoking this in a loop must use a compiler barrier,
 * for example cpu_relax().
 */
static inline bool __ptr_ring_empty(struct ptr_ring *r)
{
	/* READ_ONCE of consumer_head pairs with the WRITE_ONCE in
	 * __ptr_ring_discard_one (and ptr_ring_unconsume); an empty slot
	 * reads as NULL.
	 */
	if (likely(r->size))
		return !data_race(r->queue[READ_ONCE(r->consumer_head)]);
	return true;
}

/* Empty test under the consumer lock. */
static inline bool ptr_ring_empty(struct ptr_ring *r)
{
	bool ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock(&r->consumer_lock);

	return ret;
}

/* As ptr_ring_empty, but disables interrupts while the lock is held. */
static inline bool ptr_ring_empty_irq(struct ptr_ring *r)
{
	bool ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

/* As ptr_ring_empty, but callable from any context. */
static inline bool ptr_ring_empty_any(struct ptr_ring *r)
{
	unsigned long flags;
	bool ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_empty(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

/* As ptr_ring_empty, but disables bottom halves while the lock is held. */
static inline bool ptr_ring_empty_bh(struct ptr_ring *r)
{
	bool ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_empty(r);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Zero entries from tail to specified head.
 * NB: if consumer_head can be >= r->size need to fixup tail later.
 */
static inline void __ptr_ring_zero_tail(struct ptr_ring *r, int consumer_head)
{
	int head = consumer_head;

	/* Zero out entries in the reverse order: this way we touch the
	 * cache line that producer might currently be reading the last;
	 * producer won't make progress and touch other cache lines
	 * besides the first one until we write out all entries.
	 */
	while (likely(head > r->consumer_tail))
		data_race(r->queue[--head] = NULL);

	r->consumer_tail = consumer_head;
}

/* Must only be called after __ptr_ring_peek returned !NULL */
static inline void __ptr_ring_discard_one(struct ptr_ring *r)
{
	/* Fundamentally, what we want to do is update consumer
	 * index and zero out the entry so producer can reuse it.
	 * Doing it naively at each consume would be as simple as:
	 *       consumer = r->consumer;
	 *       r->queue[consumer++] = NULL;
	 *       if (unlikely(consumer >= r->size))
	 *               consumer = 0;
	 *       r->consumer = consumer;
	 * but that is suboptimal when the ring is full as producer is writing
	 * out new entries in the same cache line.  Defer these updates until a
	 * batch of entries has been consumed.
	 */
	/* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
	 * to work correctly.
	 */
	int consumer_head = r->consumer_head + 1;

	/* Once we have processed enough entries invalidate them in
	 * the ring all at once so producer can reuse their space in the ring.
	 * We also do this when we reach end of the ring - not mandatory
	 * but helps keep the implementation simple.
	 */
	if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
		     consumer_head >= r->size))
		__ptr_ring_zero_tail(r, consumer_head);

	if (unlikely(consumer_head >= r->size)) {
		consumer_head = 0;
		r->consumer_tail = 0;
	}
	/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
	WRITE_ONCE(r->consumer_head, consumer_head);
}

/* Remove and return the oldest entry, or NULL if the ring is empty.
 * Callers must hold consumer_lock.
 */
static inline void *__ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	/* The READ_ONCE in __ptr_ring_peek guarantees that anyone
	 * accessing data through the pointer is up to date. Pairs
	 * with smp_wmb in __ptr_ring_produce.
	 */
	ptr = __ptr_ring_peek(r);
	if (ptr)
		__ptr_ring_discard_one(r);

	return ptr;
}

/* Consume up to @n entries into @array; returns the number actually
 * consumed (stops early when the ring drains).  Callers must hold
 * consumer_lock.
 */
static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
					     void **array, int n)
{
	void *ptr;
	int i;

	for (i = 0; i < n; i++) {
		ptr = __ptr_ring_consume(r);
		if (!ptr)
			break;
		array[i] = ptr;
	}

	return i;
}

/*
 * Note: resize (below) nests producer lock within consumer lock, so if you
 * call this in interrupt or BH context, you must disable interrupts/BH when
 * producing.
 */
static inline void *ptr_ring_consume(struct ptr_ring *r)
{
	void *ptr;

	spin_lock(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock(&r->consumer_lock);

	return ptr;
}

/* As ptr_ring_consume, but disables interrupts while the lock is held. */
static inline void *ptr_ring_consume_irq(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_irq(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irq(&r->consumer_lock);

	return ptr;
}

/* As ptr_ring_consume, but callable from any context. */
static inline void *ptr_ring_consume_any(struct ptr_ring *r)
{
	unsigned long flags;
	void *ptr;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ptr = __ptr_ring_consume(r);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ptr;
}

/* As ptr_ring_consume, but disables bottom halves while the lock is held. */
static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
{
	void *ptr;

	spin_lock_bh(&r->consumer_lock);
	ptr = __ptr_ring_consume(r);
	spin_unlock_bh(&r->consumer_lock);

	return ptr;
}

/* Batched consume under the consumer lock; see __ptr_ring_consume_batched. */
static inline int ptr_ring_consume_batched(struct ptr_ring *r,
					   void **array, int n)
{
	int ret;

	spin_lock(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock(&r->consumer_lock);

	return ret;
}

/* As ptr_ring_consume_batched, but disables interrupts while locked. */
static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
					       void **array, int n)
{
	int ret;

	spin_lock_irq(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r,
					 array, n);
	spin_unlock_irq(&r->consumer_lock);

	return ret;
}

/* As ptr_ring_consume_batched, but callable from any context. */
static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
					       void **array, int n)
{
	unsigned long flags;
	int ret;

	spin_lock_irqsave(&r->consumer_lock, flags);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_irqrestore(&r->consumer_lock, flags);

	return ret;
}

/* As ptr_ring_consume_batched, but disables bottom halves while locked. */
static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
					      void **array, int n)
{
	int ret;

	spin_lock_bh(&r->consumer_lock);
	ret = __ptr_ring_consume_batched(r, array, n);
	spin_unlock_bh(&r->consumer_lock);

	return ret;
}

/* Cast to structure type and call a function without discarding from FIFO.
 * Function must return a value.
 * Callers must take consumer_lock.
 */
#define __PTR_RING_PEEK_CALL(r, f) ((f)(__ptr_ring_peek(r)))

/* Peek-and-call under the consumer lock; evaluates to f's return value. */
#define PTR_RING_PEEK_CALL(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

/* As PTR_RING_PEEK_CALL, but disables interrupts while locked. */
#define PTR_RING_PEEK_CALL_IRQ(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irq(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

/* As PTR_RING_PEEK_CALL, but disables bottom halves while locked. */
#define PTR_RING_PEEK_CALL_BH(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	\
	spin_lock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_bh(&(r)->consumer_lock); \
	__PTR_RING_PEEK_CALL_v; \
})

/* As PTR_RING_PEEK_CALL, but callable from any context. */
#define PTR_RING_PEEK_CALL_ANY(r, f) ({ \
	typeof((f)(NULL)) __PTR_RING_PEEK_CALL_v; \
	unsigned long __PTR_RING_PEEK_CALL_f;\
	\
	spin_lock_irqsave(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v = __PTR_RING_PEEK_CALL(r, f); \
	spin_unlock_irqrestore(&(r)->consumer_lock, __PTR_RING_PEEK_CALL_f); \
	__PTR_RING_PEEK_CALL_v; \
})

/* Not all gfp_t flags (besides GFP_KERNEL) are allowed. See
 * documentation for vmalloc for which of them are legal.
 */
static inline void **__ptr_ring_init_queue_alloc_noprof(unsigned int size, gfp_t gfp)
{
	/* Reject sizes whose byte count would exceed KMALLOC_MAX_SIZE
	 * (also guards the multiplication against overflow).
	 */
	if (size > KMALLOC_MAX_SIZE / sizeof(void *))
		return NULL;
	return kvmalloc_array_noprof(size, sizeof(void *), gfp | __GFP_ZERO);
}

/* Set ring size and derive the consumer invalidation batch from it. */
static inline void __ptr_ring_set_size(struct ptr_ring *r, int size)
{
	r->size = size;
	r->batch = SMP_CACHE_BYTES * 2 / sizeof(*(r->queue));
	/* We need to set batch at least to 1 to make logic
	 * in __ptr_ring_discard_one work correctly.
	 * Batching too much (because ring is small) would cause a lot of
	 * burstiness. Needs tuning, for now disable batching.
	 */
	if (r->batch > r->size / 2 || !r->batch)
		r->batch = 1;
}

/* Initialize @r with a freshly allocated queue of @size slots.
 * Returns 0 on success, -ENOMEM if the queue allocation fails.
 */
static inline int ptr_ring_init_noprof(struct ptr_ring *r, int size, gfp_t gfp)
{
	r->queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	if (!r->queue)
		return -ENOMEM;

	__ptr_ring_set_size(r, size);
	r->producer = r->consumer_head = r->consumer_tail = 0;
	spin_lock_init(&r->producer_lock);
	spin_lock_init(&r->consumer_lock);

	return 0;
}
#define ptr_ring_init(...)	alloc_hooks(ptr_ring_init_noprof(__VA_ARGS__))

/*
 * Return entries into ring. Destroy entries that don't fit.
 *
 * Note: this is expected to be a rare slow path operation.
 *
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline void ptr_ring_unconsume(struct ptr_ring *r, void **batch, int n,
				      void (*destroy)(void *))
{
	unsigned long flags;

	spin_lock_irqsave(&r->consumer_lock, flags);
	spin_lock(&r->producer_lock);

	if (!r->size)
		goto done;

	/*
	 * Clean out buffered entries (for simplicity). This way following code
	 * can test entries for NULL and if not assume they are valid.
	 */
	__ptr_ring_zero_tail(r, r->consumer_head);

	/*
	 * Go over entries in batch, start moving head back and copy entries.
	 * Stop when we run into previously unconsumed entries.
	 */
	while (n) {
		int head = r->consumer_head - 1;
		if (head < 0)
			head = r->size - 1;
		if (r->queue[head]) {
			/* This batch entry will have to be destroyed. */
			goto done;
		}
		r->queue[head] = batch[--n];
		r->consumer_tail = head;
		/* matching READ_ONCE in __ptr_ring_empty for lockless tests */
		WRITE_ONCE(r->consumer_head, head);
	}

done:
	/* Destroy all entries left in the batch. */
	while (n)
		destroy(batch[--n]);
	spin_unlock(&r->producer_lock);
	spin_unlock_irqrestore(&r->consumer_lock, flags);
}

/* Drain all entries into @queue (capacity @size), destroying overflow via
 * @destroy if non-NULL, then install @queue as the new ring buffer.
 * Returns the old buffer for the caller to free.  Callers must hold both
 * the consumer and the producer lock.
 */
static inline void **__ptr_ring_swap_queue(struct ptr_ring *r, void **queue,
					   int size, gfp_t gfp,
					   void (*destroy)(void *))
{
	int producer = 0;
	void **old;
	void *ptr;

	while ((ptr = __ptr_ring_consume(r)))
		if (producer < size)
			queue[producer++] = ptr;
		else if (destroy)
			destroy(ptr);

	/* The new ring is full exactly when producer wrapped to size. */
	if (producer >= size)
		producer = 0;
	__ptr_ring_set_size(r, size);
	r->producer = producer;
	r->consumer_head = 0;
	r->consumer_tail = 0;
	old = r->queue;
	r->queue = queue;

	return old;
}

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in interrupt or BH context, you must
 * disable interrupts/BH when doing so.
 */
static inline int ptr_ring_resize_noprof(struct ptr_ring *r, int size, gfp_t gfp,
					 void (*destroy)(void *))
{
	unsigned long flags;
	void **queue = __ptr_ring_init_queue_alloc_noprof(size, gfp);
	void **old;

	if (!queue)
		return -ENOMEM;

	spin_lock_irqsave(&(r)->consumer_lock, flags);
	spin_lock(&(r)->producer_lock);

	old = __ptr_ring_swap_queue(r, queue, size, gfp, destroy);

	spin_unlock(&(r)->producer_lock);
	spin_unlock_irqrestore(&(r)->consumer_lock, flags);

	kvfree(old);

	return 0;
}
#define ptr_ring_resize(...)	alloc_hooks(ptr_ring_resize_noprof(__VA_ARGS__))

/*
 * Note: producer lock is nested within consumer lock, so if you
 * resize you must make sure all uses nest correctly.
 * In particular if you consume ring in BH context, you must
 * disable BH when doing so.
 */
static inline int ptr_ring_resize_multiple_bh_noprof(struct ptr_ring **rings,
						     unsigned int nrings,
						     int size, gfp_t gfp,
						     void (*destroy)(void *))
{
	void ***queues;
	int i;

	queues = kmalloc_array_noprof(nrings, sizeof(*queues), gfp);
	if (!queues)
		goto noqueues;

	/* Allocate all replacement buffers up front so we either resize
	 * every ring or fail without touching any of them.
	 */
	for (i = 0; i < nrings; ++i) {
		queues[i] = __ptr_ring_init_queue_alloc_noprof(size, gfp);
		if (!queues[i])
			goto nomem;
	}

	for (i = 0; i < nrings; ++i) {
		spin_lock_bh(&(rings[i])->consumer_lock);
		spin_lock(&(rings[i])->producer_lock);
		queues[i] = __ptr_ring_swap_queue(rings[i], queues[i],
						  size, gfp, destroy);
		spin_unlock(&(rings[i])->producer_lock);
		spin_unlock_bh(&(rings[i])->consumer_lock);
	}

	/* queues[] now holds the old buffers returned by swap; free them. */
	for (i = 0; i < nrings; ++i)
		kvfree(queues[i]);

	kfree(queues);

	return 0;

nomem:
	while (--i >= 0)
		kvfree(queues[i]);

	kfree(queues);

noqueues:
	return -ENOMEM;
}
#define ptr_ring_resize_multiple_bh(...) \
	alloc_hooks(ptr_ring_resize_multiple_bh_noprof(__VA_ARGS__))

/* Drain the ring (destroying each entry if @destroy is non-NULL) and free
 * the queue array.  No concurrent producers/consumers may be active.
 */
static inline void ptr_ring_cleanup(struct ptr_ring *r, void (*destroy)(void *))
{
	void *ptr;

	if (destroy)
		while ((ptr = ptr_ring_consume(r)))
			destroy(ptr);
	kvfree(r->queue);
}

#endif /* _LINUX_PTR_RING_H */