xref: /src/contrib/jemalloc/include/jemalloc/internal/mpsc_queue.h (revision c43cad87172039ccf38172129c79755ea79e6102)
1 #ifndef JEMALLOC_INTERNAL_MPSC_QUEUE_H
2 #define JEMALLOC_INTERNAL_MPSC_QUEUE_H
3 
4 #include "jemalloc/internal/atomic.h"
5 
6 /*
7  * A concurrent implementation of a multi-producer, single-consumer queue.  It
8  * supports three concurrent operations:
9  * - Push
10  * - Push batch
11  * - Pop batch
12  *
13  * These operations are all lock-free.
14  *
15  * The implementation is the simple two-stack queue built on a Treiber stack.
16  * It's not terribly efficient, but this isn't expected to go into anywhere with
17  * hot code.  In fact, we don't really even need queue semantics in any
18  * anticipated use cases; we could get away with just the stack.  But this way
19  * lets us frame the API in terms of the existing list types, which is a nice
20  * convenience.  We can save on cache misses by introducing our own (parallel)
21  * single-linked list type here, and dropping FIFO semantics, if we need this to
22  * get faster.  Since we're currently providing queue semantics though, we use
23  * the prev field in the link rather than the next field for Treiber-stack
 * linkage, so that we can preserve order for batch-pushed lists (recall that
 * the two-stack trick reverses order in the lock-free first stack).
26  */
27 
#define mpsc_queue(a_type)						\
struct {								\
	/*								\
	 * Tail of the internal Treiber stack (the most recently	\
	 * pushed element); NULL when the queue is empty.		\
	 */								\
	atomic_p_t tail;						\
}
32 
#define mpsc_queue_proto(a_attr, a_prefix, a_queue_type, a_type,	\
    a_list_type)							\
/* Initialize a queue to the empty state. */				\
a_attr void								\
a_prefix##new(a_queue_type *queue);					\
/*									\
 * Insert all items in src into the queue, clearing src.  May be	\
 * called concurrently with the other operations (multi-producer).	\
 */									\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src);		\
/* Insert node into the queue. */					\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node);			\
/*									\
 * Pop all items in the queue into the list at dst.  dst should already	\
 * be initialized (and may contain existing items, which then remain	\
 * in dst).  Single-consumer: only one pop_batch may run at a time.	\
 */									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst);
51 
/*
 * Generate the implementations of the operations declared by
 * mpsc_queue_proto.  a_link is the name of the ql linkage field within
 * a_type.
 */
#define mpsc_queue_gen(a_attr, a_prefix, a_queue_type, a_type,		\
    a_list_type, a_link)						\
a_attr void								\
a_prefix##new(a_queue_type *queue) {					\
	atomic_store_p(&queue->tail, NULL, ATOMIC_RELAXED);		\
}									\
a_attr void								\
a_prefix##push_batch(a_queue_type *queue, a_list_type *src) {		\
	/*								\
	 * Reuse the ql list prev field as the Treiber stack next	\
	 * field (see the file-level comment for why prev rather than	\
	 * next is used).						\
	 */								\
	a_type *first = ql_first(src);					\
	a_type *last = ql_last(src, a_link);				\
	void* cur_tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	do {								\
		/*							\
		 * Note that this breaks the queue ring structure;	\
		 * it's not a ring any more!				\
		 */							\
		first->a_link.qre_prev = cur_tail;			\
		/*							\
		 * Note: the upcoming CAS doesn't need acquire		\
		 * semantics; every push only needs to synchronize	\
		 * with the next pop, which we get from the release	\
		 * sequence rules.					\
		 */							\
	} while (!atomic_compare_exchange_weak_p(&queue->tail,		\
	    &cur_tail, last, ATOMIC_RELEASE, ATOMIC_RELAXED));		\
	ql_new(src);							\
}									\
a_attr void								\
a_prefix##push(a_queue_type *queue, a_type *node) {			\
	/* A single push is a batch push of a one-element list. */	\
	ql_elm_new(node, a_link);					\
	a_list_type list;						\
	ql_new(&list);							\
	ql_head_insert(&list, node, a_link);				\
	a_prefix##push_batch(queue, &list);				\
}									\
a_attr void								\
a_prefix##pop_batch(a_queue_type *queue, a_list_type *dst) {		\
	a_type *tail = atomic_load_p(&queue->tail, ATOMIC_RELAXED);	\
	if (tail == NULL) {						\
		/*							\
		 * In the common special case where there are no	\
		 * pending elements, bail early without a costly RMW.	\
		 */							\
		return;							\
	}								\
	tail = atomic_exchange_p(&queue->tail, NULL, ATOMIC_ACQUIRE);	\
	/*								\
	 * It's a single-consumer queue, so if tail started non-NULL,	\
	 * it'd better stay non-NULL.					\
	 */								\
	assert(tail != NULL);						\
	/*								\
	 * We iterate through the stack, fixing up the link structure	\
	 * as we go (stack insertion broke the list requirement that	\
	 * the list be circularly linked).  It's just as efficient at	\
	 * this point to make the queue a "real" queue, so do that as	\
	 * well.							\
	 * If this ever gets to be a hot spot, we can omit this fixup	\
	 * and make the queue a bag (i.e. not necessarily ordered), but	\
	 * that would mean jettisoning the existing list API as the	\
	 * batch pushing/popping interface.				\
	 */								\
	a_list_type reversed;						\
	ql_new(&reversed);						\
	while (tail != NULL) {						\
		/*							\
		 * Pop an item off the stack, prepend it onto the list	\
		 * (reversing the order).  Recall that we use the	\
		 * list prev field as the Treiber stack next field to	\
		 * preserve order of batch-pushed items when reversed.	\
		 */							\
		a_type *next = tail->a_link.qre_prev;			\
		ql_elm_new(tail, a_link);				\
		ql_head_insert(&reversed, tail, a_link);		\
		tail = next;						\
	}								\
	ql_concat(dst, &reversed, a_link);				\
}
133 
134 #endif /* JEMALLOC_INTERNAL_MPSC_QUEUE_H */
135