#ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
#define JEMALLOC_INTERNAL_MPSC_QUEUE_H

#include "jemalloc/internal/atomic.h"

/*
 * A concurrent implementation of a multi-producer, single-consumer queue. It
 * supports three concurrent operations:
 * - Push
 * - Push batch
 * - Pop batch
 *
 * These operations are all lock-free.
 *
 * The implementation is the simple two-stack queue built on a Treiber stack.
 * It's not terribly efficient, but this isn't expected to go into anywhere with
 * hot code. In fact, we don't really even need queue semantics in any
 * anticipated use cases; we could get away with just the stack. But this way
 * lets us frame the API in terms of the existing list types, which is a nice
 * convenience. We can save on cache misses by introducing our own (parallel)
 * single-linked list type here, and dropping FIFO semantics, if we need this to
 * get faster. Since we're currently providing queue semantics though, we use
 * the prev field in the link rather than the next field for Treiber-stack
 * linkage, so that we can preserve order for batch-pushed lists (recall that
 * the two-stack trick reverses order in the lock-free first stack).
 */

/*
 * The queue itself: a single atomic pointer to the most recently pushed
 * element (the Treiber-stack top), or NULL when empty.
 */
#define mpsc_queue(a_type)						\
struct {								\
	atomic_p_t tail;						\
}

/*
 * Declares the queue operations.
 *   a_attr:       attributes for each function (e.g. static inline).
 *   a_prefix:     token pasted onto each function name.
 *   a_queue_type: the type produced by mpsc_queue(a_type).
 *   a_type:       the element type.
 *   a_list_type:  the ql(3)-style list type used for batch transfer.
 */
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type,	\
    a_list_type)							\
/* Initialize a queue. */						\
a_attr void								\
a_prefix##new(a_queue_type *queue);					\
/* Insert all items in src into the queue, clearing src. */		\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src);		\
/* Insert node into the queue. */					\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node);			\
/*									\
 * Pop all items in the queue into the list at dst. dst should already	\
 * be initialized (and may contain existing items, which then remain	\
 * in dst).								\
 */									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst);

/*
 * Generates the definitions matching mpsc_queue_proto. a_link is the
 * name of the ql link field within a_type.
 */
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type,		\
    a_list_type, a_link)						\
a_attr void								\
a_prefix##new(a_queue_type *queue) {					\
	atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED);		\
}									\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) {		\
	/*								\
	 * Reuse the ql list prev field as the Treiber stack next	\
	 * field (see the file comment for why prev, not next).		\
	 */								\
	a_type *first = ql_first(src);					\
	a_type *last = ql_last(src, a_link);				\
	void* cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	do {								\
		/*							\
		 * Note that this breaks the queue ring structure;	\
		 * it's not a ring any more!				\
		 */							\
		first->a_link.qre_prev = cur_tail;			\
		/*							\
		 * Note: the upcoming CAS doesn't need an atomic; every	\
		 * push only needs to synchronize with the next pop,	\
		 * which we get from the release sequence rules.	\
		 */							\
	} while (!atomic_compare_exchange_weak_p(&queue->tail,		\
	    &cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED));		\
	ql_new(src);							\
}									\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node) {			\
	/* A single push is just a batch push of a one-element list. */	\
	ql_elm_new(node, a_link);					\
	a_list_type list;						\
	ql_new(&list);							\
	ql_head_insert(&list, node, a_link);				\
	a_prefix##push_batch(queue, &list);				\
}									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) {		\
	a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	if (tail == NULL) {						\
		/*							\
		 * In the common special case where there are no	\
		 * pending elements, bail early without a costly RMW.	\
		 */							\
		return;							\
	}								\
	tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE);	\
	/*								\
	 * It's a single-consumer queue, so if tail started non-NULL,	\
	 * it'd better stay non-NULL.					\
	 */								\
	assert(tail != NULL);						\
	/*								\
	 * We iterate through the stack to fix up the link structure	\
	 * (stack insertion broke the list requirement that the list	\
	 * be circularly linked). It's just as efficient at this point	\
	 * to make the queue a "real" queue, so do that as well.	\
	 * If this ever gets to be a hot spot, we can omit this fixup	\
	 * and make the queue a bag (i.e. not necessarily ordered), but	\
	 * that would mean jettisoning the existing list API as the	\
	 * batch pushing/popping interface.				\
	 */								\
	a_list_type reversed;						\
	ql_new(&reversed);						\
	while (tail != NULL) {						\
		/*							\
		 * Pop an item off the stack, prepend it onto the list	\
		 * (reversing the order). Recall that we use the	\
		 * list prev field as the Treiber stack next field to	\
		 * preserve order of batch-pushed items when reversed.	\
		 */							\
		a_type *next = tail->a_link.qre_prev;			\
		ql_elm_new(tail, a_link);				\
		ql_head_insert(&reversed, tail, a_link);		\
		tail = next;						\
	}								\
	ql_concat(dst, &reversed, a_link);				\
}

#endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */