/*
 * qht.c - QEMU Hash Table, designed to scale for read-mostly workloads.
 *
 * Copyright (C) 2016, Emilio G. Cota <cota@braap.org>
 *
 * License: GNU GPL, version 2 or later.
 *   See the COPYING file in the top-level directory.
 *
 * Assumptions:
 * - NULL cannot be inserted/removed as a pointer value.
 * - Trying to insert an already-existing hash-pointer pair is OK. However,
 *   it is not OK to insert into the same hash table different hash-pointer
 *   pairs that have the same pointer value but different hashes.
 * - Lookups are performed under an RCU read-critical section; removals
 *   must wait for a grace period to elapse before freeing removed objects.
 *
 * Features:
 * - Reads (i.e. lookups and iterators) can be concurrent with other reads.
 *   Lookups that are concurrent with writes to the same bucket will retry
 *   via a seqlock; iterators acquire all bucket locks and therefore can be
 *   concurrent with lookups and are serialized wrt writers.
 * - Writes (i.e. insertions/removals) can be concurrent with writes to
 *   different buckets; writes to the same bucket are serialized through a lock.
 * - Optional auto-resizing: the hash table resizes up if the load surpasses
 *   a certain threshold. Resizing is done concurrently with readers; writes
 *   are serialized with the resize operation.
 *
 * The key structure is the bucket, which is cacheline-sized. Buckets
 * contain a few hash values and pointers; the u32 hash values are stored in
 * full so that resizing is fast. Having this structure instead of directly
 * chaining items has two advantages:
 * - Failed lookups fail fast, and touch a minimum number of cache lines.
 * - Resizing the hash table with concurrent lookups is easy.
 *
 * There are two types of buckets:
 * 1. "head" buckets are the ones allocated in the array of buckets in qht_map.
 * 2. all "non-head" buckets (i.e. all others) are members of a chain that
 *    starts from a head bucket.
 * Note that the seqlock and spinlock of a head bucket apply to all buckets
 * chained to it; these two fields are unused in non-head buckets.
 *
 * On removals, we move the last valid item in the chain to the position of the
 * just-removed entry. This makes lookups slightly faster, since the moment an
 * invalid entry is found, the (failed) lookup is over.
 *
 * Resizing is done by taking all bucket spinlocks (so that no other writers can
 * race with us) and then copying all entries into a new hash map. Then, the
 * ht->map pointer is set, and the old map is freed once no RCU readers can see
 * it anymore.
 *
 * Writers check for concurrent resizes by comparing ht->map before and after
 * acquiring their bucket lock. If they don't match, a resize has occurred
 * while the bucket spinlock was being acquired.
 *
 * Related Work:
 * - Idea of cacheline-sized buckets with full hashes taken from:
 *   David, Guerraoui & Trigonakis, "Asynchronized Concurrency:
 *   The Secret to Scaling Concurrent Search Data Structures", ASPLOS'15.
 * - Why not RCU-based hash tables? They would allow us to get rid of the
 *   seqlock, but resizing would take forever since RCU read critical
 *   sections in QEMU take quite a long time.
 *   More info on relativistic hash tables:
 *   + Triplett, McKenney & Walpole, "Resizable, Scalable, Concurrent Hash
 *     Tables via Relativistic Programming", USENIX ATC'11.
 *   + Corbet, "Relativistic hash tables, part 1: Algorithms", @ lwn.net, 2014.
 *     https://lwn.net/Articles/612021/
 */
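
/*
 * Example usage (an illustrative sketch only: "struct entry", entry_cmp(),
 * hash_of() and expected_nr_entries are placeholders, not part of the
 * QHT API):
 *
 *     struct entry {
 *         uint64_t id;
 *     };
 *
 *     static bool entry_cmp(const void *a, const void *b)
 *     {
 *         const struct entry *ea = a;
 *         const struct entry *eb = b;
 *
 *         return ea->id == eb->id;
 *     }
 *
 *     struct qht ht;
 *     struct entry key = { .id = 42 };
 *     struct entry *e;
 *     void *existing;
 *
 *     qht_init(&ht, entry_cmp, expected_nr_entries, QHT_MODE_AUTO_RESIZE);
 *
 *     // insert; if an equal entry is already present, qht_insert()
 *     // returns false and fills @existing with it
 *     e = g_new0(struct entry, 1);
 *     e->id = key.id;
 *     if (!qht_insert(&ht, e, hash_of(e), &existing)) {
 *         g_free(e);
 *         e = existing;
 *     }
 *
 *     // lookups must run inside an RCU read-side critical section
 *     rcu_read_lock();
 *     e = qht_lookup(&ht, &key, hash_of(&key));
 *     rcu_read_unlock();
 *
 *     // removal; free the object only after an RCU grace period has elapsed
 *     qht_remove(&ht, e, hash_of(e));
 */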
#include "qemu/osdep.h"
#include "qemu/qht.h"
#include "qemu/atomic.h"
#include "qemu/rcu.h"
#include "qemu/memalign.h"

//#define QHT_DEBUG

/*
 * We want to avoid false sharing of cache lines. Most systems have 64-byte
 * cache lines, so we go with it for simplicity.
 *
 * Note that systems with smaller cache lines will be fine (the struct is
 * almost 64 bytes); systems with larger cache lines might suffer from
 * some false sharing.
 */
#define QHT_BUCKET_ALIGN 64

/* define these to keep sizeof(qht_bucket) within QHT_BUCKET_ALIGN */
#if HOST_LONG_BITS == 32
#define QHT_BUCKET_ENTRIES 6
#else /* 64-bit */
#define QHT_BUCKET_ENTRIES 4
#endif

enum qht_iter_type {
    QHT_ITER_VOID,    /* do nothing; use retvoid */
    QHT_ITER_RM,      /* remove element if retbool returns true */
};

struct qht_iter {
    union {
        qht_iter_func_t retvoid;
        qht_iter_bool_func_t retbool;
    } f;
    enum qht_iter_type type;
};

/*
 * Do _not_ use qemu_mutex_[try]lock directly! Use these wrappers, otherwise
 * the profiler (QSP) will deadlock.
 */
static inline void qht_lock(struct qht *ht)
{
    if (ht->mode & QHT_MODE_RAW_MUTEXES) {
        qemu_mutex_lock__raw(&ht->lock);
    } else {
        qemu_mutex_lock(&ht->lock);
    }
}

static inline int qht_trylock(struct qht *ht)
{
    if (ht->mode & QHT_MODE_RAW_MUTEXES) {
        return qemu_mutex_trylock__raw(&(ht)->lock);
    }
    return qemu_mutex_trylock(&(ht)->lock);
}

/* this inline is not really necessary, but it helps keep code consistent */
static inline void qht_unlock(struct qht *ht)
{
    qemu_mutex_unlock(&ht->lock);
}

/*
 * Note: reading partially-updated pointers in @pointers could lead to
 * segfaults. We thus access them with qatomic_read/set; this guarantees
 * that the compiler makes all those accesses atomic. We also need the
 * volatile-like behavior in qatomic_read, since otherwise the compiler
 * might refetch the pointer.
 * qatomic_read's are of course not necessary when the bucket lock is held.
 *
 * If both ht->lock and b->lock are grabbed, ht->lock should always
 * be grabbed first.
 */
struct qht_bucket {
    QemuSpin lock;
    QemuSeqLock sequence;
    uint32_t hashes[QHT_BUCKET_ENTRIES];
    void *pointers[QHT_BUCKET_ENTRIES];
    struct qht_bucket *next;
} QEMU_ALIGNED(QHT_BUCKET_ALIGN);

QEMU_BUILD_BUG_ON(sizeof(struct qht_bucket) > QHT_BUCKET_ALIGN);

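/*
 * Rough size accounting behind QHT_BUCKET_ENTRIES (assuming a 4-byte
 * QemuSpin and a 4-byte QemuSeqLock, which holds for the usual builds):
 * - 64-bit hosts: 4 hashes (16) + 4 pointers (32) + next (8) = 56 bytes,
 *   leaving 8 bytes for the lock and the sequence counter.
 * - 32-bit hosts: 6 hashes (24) + 6 pointers (24) + next (4) = 52 bytes,
 *   leaving 12 bytes for the lock and the sequence counter.
 * Either way the bucket fits in QHT_BUCKET_ALIGN, which the build-time
 * assertion above enforces.
 */
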
/*
 * Under TSAN, we use striped locks instead of one lock per bucket chain.
 * This avoids crashing under TSAN, since TSAN aborts the program if more than
 * 64 locks are held (this is a hardcoded limit in TSAN).
 * When resizing a QHT we grab all the buckets' locks, which can easily
 * go over TSAN's limit. By using striped locks, we avoid this problem.
 *
 * Note: this number must be a power of two for easy index computation.
 */
#define QHT_TSAN_BUCKET_LOCKS_BITS 4
#define QHT_TSAN_BUCKET_LOCKS (1 << QHT_TSAN_BUCKET_LOCKS_BITS)

struct qht_tsan_lock {
    QemuSpin lock;
} QEMU_ALIGNED(QHT_BUCKET_ALIGN);

/**
 * struct qht_map - structure to track an array of buckets
 * @rcu: used by RCU. Keep it as the top field in the struct to help valgrind
 *       find the whole struct.
 * @buckets: array of head buckets. It is constant once the map is created.
 * @n_buckets: number of head buckets. It is constant once the map is created.
 * @n_added_buckets: number of added (i.e. "non-head") buckets
 * @n_added_buckets_threshold: threshold to trigger an upward resize once the
 *                             number of added buckets surpasses it.
 * @tsan_bucket_locks: Array of striped locks to be used only under TSAN.
 *
 * Buckets are tracked in what we call a "map", i.e. this structure.
 */
struct qht_map {
    struct rcu_head rcu;
    struct qht_bucket *buckets;
    size_t n_buckets;
    size_t n_added_buckets;
    size_t n_added_buckets_threshold;
#ifdef CONFIG_TSAN
    struct qht_tsan_lock tsan_bucket_locks[QHT_TSAN_BUCKET_LOCKS];
#endif
};

/* trigger a resize when n_added_buckets > n_buckets / div */
#define QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV 8

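/*
 * For example, a map with 1024 head buckets gets a threshold of 128:
 * once the 129th non-head bucket has been chained in, writers flag the
 * map for growth and (with QHT_MODE_AUTO_RESIZE) qht_grow_maybe()
 * doubles it to 2048 head buckets.
 */
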
static void qht_do_resize_reset(struct qht *ht, struct qht_map *new,
                                bool reset);
static void qht_grow_maybe(struct qht *ht);

#ifdef QHT_DEBUG

#define qht_debug_assert(X) do { assert(X); } while (0)

static void qht_bucket_debug__locked(struct qht_bucket *b)
{
    bool seen_empty = false;
    bool corrupt = false;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i] == NULL) {
                seen_empty = true;
                continue;
            }
            if (seen_empty) {
                fprintf(stderr, "%s: b: %p, pos: %i, hash: 0x%x, p: %p\n",
                        __func__, b, i, b->hashes[i], b->pointers[i]);
                corrupt = true;
            }
        }
        b = b->next;
    } while (b);
    qht_debug_assert(!corrupt);
}

static void qht_map_debug__all_locked(struct qht_map *map)
{
    int i;

    for (i = 0; i < map->n_buckets; i++) {
        qht_bucket_debug__locked(&map->buckets[i]);
    }
}
#else

#define qht_debug_assert(X) do { (void)(X); } while (0)

static inline void qht_bucket_debug__locked(struct qht_bucket *b)
{ }

static inline void qht_map_debug__all_locked(struct qht_map *map)
{ }
#endif /* QHT_DEBUG */

static inline size_t qht_elems_to_buckets(size_t n_elems)
{
    return pow2ceil(n_elems / QHT_BUCKET_ENTRIES);
}

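/*
 * e.g. qht_init(ht, cmp, 1000, ...) on a 64-bit host asks for 1000 / 4 = 250
 * buckets, rounded up to the next power of two: 256 head buckets. The power
 * of two is what lets qht_map_to_bucket() use a simple mask.
 */
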
/*
 * When using striped locks (i.e. under TSAN), we have to be careful not
 * to operate on the same lock twice (e.g. when iterating through all buckets).
 * We achieve this by operating only on each stripe's first matching lock.
 */
static inline void qht_do_if_first_in_stripe(struct qht_map *map,
                                             struct qht_bucket *b,
                                             void (*func)(QemuSpin *spin))
{
#ifdef CONFIG_TSAN
    unsigned long bucket_idx = b - map->buckets;
    bool is_first_in_stripe = (bucket_idx >> QHT_TSAN_BUCKET_LOCKS_BITS) == 0;
    if (is_first_in_stripe) {
        unsigned long lock_idx = bucket_idx & (QHT_TSAN_BUCKET_LOCKS - 1);
        func(&map->tsan_bucket_locks[lock_idx].lock);
    }
#else
    func(&b->lock);
#endif
}

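/*
 * e.g. with QHT_TSAN_BUCKET_LOCKS == 16, head buckets 5, 21, 37, ... all map
 * to striped lock 5 (bucket_idx & 15), but only bucket 5 counts as "first in
 * stripe", so each striped lock is initialized and destroyed exactly once.
 */
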
static inline void qht_bucket_lock_do(struct qht_map *map,
                                      struct qht_bucket *b,
                                      void (*func)(QemuSpin *lock))
{
#ifdef CONFIG_TSAN
    unsigned long bucket_idx = b - map->buckets;
    unsigned long lock_idx = bucket_idx & (QHT_TSAN_BUCKET_LOCKS - 1);
    func(&map->tsan_bucket_locks[lock_idx].lock);
#else
    func(&b->lock);
#endif
}

static inline void qht_bucket_lock(struct qht_map *map,
                                   struct qht_bucket *b)
{
    qht_bucket_lock_do(map, b, qemu_spin_lock);
}

static inline void qht_bucket_unlock(struct qht_map *map,
                                     struct qht_bucket *b)
{
    qht_bucket_lock_do(map, b, qemu_spin_unlock);
}

static inline void qht_head_init(struct qht_map *map, struct qht_bucket *b)
{
    memset(b, 0, sizeof(*b));
    qht_do_if_first_in_stripe(map, b, qemu_spin_init);
    seqlock_init(&b->sequence);
}

static inline
struct qht_bucket *qht_map_to_bucket(const struct qht_map *map, uint32_t hash)
{
    return &map->buckets[hash & (map->n_buckets - 1)];
}

/* acquire all bucket locks from a map */
static void qht_map_lock_buckets(struct qht_map *map)
{
    size_t i;

    for (i = 0; i < map->n_buckets; i++) {
        struct qht_bucket *b = &map->buckets[i];

        qht_do_if_first_in_stripe(map, b, qemu_spin_lock);
    }
}

static void qht_map_unlock_buckets(struct qht_map *map)
{
    size_t i;

    for (i = 0; i < map->n_buckets; i++) {
        struct qht_bucket *b = &map->buckets[i];

        qht_do_if_first_in_stripe(map, b, qemu_spin_unlock);
    }
}

/*
 * Call with at least a bucket lock held.
 * @map should be the value read before acquiring the lock (or locks).
 */
static inline bool qht_map_is_stale__locked(const struct qht *ht,
                                            const struct qht_map *map)
{
    return map != ht->map;
}

/*
 * Grab all bucket locks, and set @pmap after making sure the map isn't stale.
 *
 * Pairs with qht_map_unlock_buckets(), hence the pass-by-reference.
 *
 * Note: callers cannot have ht->lock held.
 */
static inline
void qht_map_lock_buckets__no_stale(struct qht *ht, struct qht_map **pmap)
{
    struct qht_map *map;

    map = qatomic_rcu_read(&ht->map);
    qht_map_lock_buckets(map);
    if (likely(!qht_map_is_stale__locked(ht, map))) {
        *pmap = map;
        return;
    }
    qht_map_unlock_buckets(map);

    /* we raced with a resize; acquire ht->lock to see the updated ht->map */
    qht_lock(ht);
    map = ht->map;
    qht_map_lock_buckets(map);
    qht_unlock(ht);
    *pmap = map;
}

/*
 * Get a head bucket and lock it, making sure its parent map is not stale.
 * @pmap is filled with a pointer to the bucket's parent map.
 *
 * Unlock with qht_bucket_unlock.
 *
 * Note: callers cannot have ht->lock held.
 */
static inline
struct qht_bucket *qht_bucket_lock__no_stale(struct qht *ht, uint32_t hash,
                                             struct qht_map **pmap)
{
    struct qht_bucket *b;
    struct qht_map *map;

    map = qatomic_rcu_read(&ht->map);
    b = qht_map_to_bucket(map, hash);

    qht_bucket_lock(map, b);
    if (likely(!qht_map_is_stale__locked(ht, map))) {
        *pmap = map;
        return b;
    }
    qht_bucket_unlock(map, b);

    /* we raced with a resize; acquire ht->lock to see the updated ht->map */
    qht_lock(ht);
    map = ht->map;
    b = qht_map_to_bucket(map, hash);
    qht_bucket_lock(map, b);
    qht_unlock(ht);
    *pmap = map;
    return b;
}

static inline bool qht_map_needs_resize(const struct qht_map *map)
{
    return qatomic_read(&map->n_added_buckets) >
           map->n_added_buckets_threshold;
}

static inline void qht_chain_destroy(struct qht_map *map,
                                     struct qht_bucket *head)
{
    struct qht_bucket *curr = head->next;
    struct qht_bucket *prev;

    qht_do_if_first_in_stripe(map, head, qemu_spin_destroy);
    while (curr) {
        prev = curr;
        curr = curr->next;
        qemu_vfree(prev);
    }
}

/* pass only an orphan map */
static void qht_map_destroy(struct qht_map *map)
{
    size_t i;

    for (i = 0; i < map->n_buckets; i++) {
        qht_chain_destroy(map, &map->buckets[i]);
    }
    qemu_vfree(map->buckets);
    g_free(map);
}

static struct qht_map *qht_map_create(size_t n_buckets)
{
    struct qht_map *map;
    size_t i;

    map = g_malloc(sizeof(*map));
    map->n_buckets = n_buckets;

    map->n_added_buckets = 0;
    map->n_added_buckets_threshold = n_buckets /
        QHT_NR_ADDED_BUCKETS_THRESHOLD_DIV;

    /* let tiny hash tables add at least one non-head bucket */
    if (unlikely(map->n_added_buckets_threshold == 0)) {
        map->n_added_buckets_threshold = 1;
    }

    map->buckets = qemu_memalign(QHT_BUCKET_ALIGN,
                                 sizeof(*map->buckets) * n_buckets);
    for (i = 0; i < n_buckets; i++) {
        qht_head_init(map, &map->buckets[i]);
    }
    return map;
}

void qht_init(struct qht *ht, qht_cmp_func_t cmp, size_t n_elems,
              unsigned int mode)
{
    struct qht_map *map;
    size_t n_buckets = qht_elems_to_buckets(n_elems);

    g_assert(cmp);
    ht->cmp = cmp;
    ht->mode = mode;
    qemu_mutex_init(&ht->lock);
    map = qht_map_create(n_buckets);
    qatomic_rcu_set(&ht->map, map);
}

/* call only when there are no readers/writers left */
void qht_destroy(struct qht *ht)
{
    qht_map_destroy(ht->map);
    memset(ht, 0, sizeof(*ht));
}

static void qht_bucket_reset__locked(struct qht_bucket *head)
{
    struct qht_bucket *b = head;
    int i;

    seqlock_write_begin(&head->sequence);
    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i] == NULL) {
                goto done;
            }
            qatomic_set(&b->hashes[i], 0);
            qatomic_set(&b->pointers[i], NULL);
        }
        b = b->next;
    } while (b);
 done:
    seqlock_write_end(&head->sequence);
}

/* call with all bucket locks held */
static void qht_map_reset__all_locked(struct qht_map *map)
{
    size_t i;

    for (i = 0; i < map->n_buckets; i++) {
        qht_bucket_reset__locked(&map->buckets[i]);
    }
    qht_map_debug__all_locked(map);
}

void qht_reset(struct qht *ht)
{
    struct qht_map *map;

    qht_map_lock_buckets__no_stale(ht, &map);
    qht_map_reset__all_locked(map);
    qht_map_unlock_buckets(map);
}

static inline void qht_do_resize(struct qht *ht, struct qht_map *new)
{
    qht_do_resize_reset(ht, new, false);
}

static inline void qht_do_resize_and_reset(struct qht *ht, struct qht_map *new)
{
    qht_do_resize_reset(ht, new, true);
}

bool qht_reset_size(struct qht *ht, size_t n_elems)
{
    struct qht_map *new = NULL;
    struct qht_map *map;
    size_t n_buckets;

    n_buckets = qht_elems_to_buckets(n_elems);

    qht_lock(ht);
    map = ht->map;
    if (n_buckets != map->n_buckets) {
        new = qht_map_create(n_buckets);
    }
    qht_do_resize_and_reset(ht, new);
    qht_unlock(ht);

    return !!new;
}

static inline
void *qht_do_lookup(const struct qht_bucket *head, qht_lookup_func_t func,
                    const void *userp, uint32_t hash)
{
    const struct qht_bucket *b = head;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (qatomic_read(&b->hashes[i]) == hash) {
                /* The pointer is dereferenced before seqlock_read_retry,
                 * so (unlike qht_insert__locked) we need to use
                 * qatomic_rcu_read here.
                 */
                void *p = qatomic_rcu_read(&b->pointers[i]);

                if (likely(p) && likely(func(p, userp))) {
                    return p;
                }
            }
        }
        b = qatomic_rcu_read(&b->next);
    } while (b);

    return NULL;
}

static __attribute__((noinline))
void *qht_lookup__slowpath(const struct qht_bucket *b, qht_lookup_func_t func,
                           const void *userp, uint32_t hash)
{
    unsigned int version;
    void *ret;

    do {
        version = seqlock_read_begin(&b->sequence);
        ret = qht_do_lookup(b, func, userp, hash);
    } while (seqlock_read_retry(&b->sequence, version));
    return ret;
}

void *qht_lookup_custom(const struct qht *ht, const void *userp, uint32_t hash,
                        qht_lookup_func_t func)
{
    const struct qht_bucket *b;
    const struct qht_map *map;
    unsigned int version;
    void *ret;

    map = qatomic_rcu_read(&ht->map);
    b = qht_map_to_bucket(map, hash);

    version = seqlock_read_begin(&b->sequence);
    ret = qht_do_lookup(b, func, userp, hash);
    if (likely(!seqlock_read_retry(&b->sequence, version))) {
        return ret;
    }
    /*
     * Removing the do/while from the fastpath gives a 4% perf. increase when
     * running a 100%-lookup microbenchmark.
     */
    return qht_lookup__slowpath(b, func, userp, hash);
}

void *qht_lookup(const struct qht *ht, const void *userp, uint32_t hash)
{
    return qht_lookup_custom(ht, userp, hash, ht->cmp);
}

/*
 * call with head->lock held
 * @ht is const since it is only used for ht->cmp()
 */
static void *qht_insert__locked(const struct qht *ht, struct qht_map *map,
                                struct qht_bucket *head, void *p, uint32_t hash,
                                bool *needs_resize)
{
    struct qht_bucket *b = head;
    struct qht_bucket *prev = NULL;
    struct qht_bucket *new = NULL;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i]) {
                if (unlikely(b->hashes[i] == hash &&
                             ht->cmp(b->pointers[i], p))) {
                    return b->pointers[i];
                }
            } else {
                goto found;
            }
        }
        prev = b;
        b = b->next;
    } while (b);

    b = qemu_memalign(QHT_BUCKET_ALIGN, sizeof(*b));
    memset(b, 0, sizeof(*b));
    new = b;
    i = 0;
    qatomic_inc(&map->n_added_buckets);
    if (unlikely(qht_map_needs_resize(map)) && needs_resize) {
        *needs_resize = true;
    }

 found:
    /* found an empty slot: acquire the seqlock and write */
    seqlock_write_begin(&head->sequence);
    if (new) {
        qatomic_rcu_set(&prev->next, b);
    }
    /* smp_wmb() implicit in seqlock_write_begin.  */
    qatomic_set(&b->hashes[i], hash);
    qatomic_set(&b->pointers[i], p);
    seqlock_write_end(&head->sequence);
    return NULL;
}

static __attribute__((noinline)) void qht_grow_maybe(struct qht *ht)
{
    struct qht_map *map;

    /*
     * If the lock is taken it probably means there's an ongoing resize,
     * so bail out.
     */
    if (qht_trylock(ht)) {
        return;
    }
    map = ht->map;
    /* another thread might have just performed the resize we were after */
    if (qht_map_needs_resize(map)) {
        struct qht_map *new = qht_map_create(map->n_buckets * 2);

        qht_do_resize(ht, new);
    }
    qht_unlock(ht);
}

bool qht_insert(struct qht *ht, void *p, uint32_t hash, void **existing)
{
    struct qht_bucket *b;
    struct qht_map *map;
    bool needs_resize = false;
    void *prev;

    /* NULL pointers are not supported */
    qht_debug_assert(p);

    b = qht_bucket_lock__no_stale(ht, hash, &map);
    prev = qht_insert__locked(ht, map, b, p, hash, &needs_resize);
    qht_bucket_debug__locked(b);
    qht_bucket_unlock(map, b);

    if (unlikely(needs_resize) && ht->mode & QHT_MODE_AUTO_RESIZE) {
        qht_grow_maybe(ht);
    }
    if (likely(prev == NULL)) {
        return true;
    }
    if (existing) {
        *existing = prev;
    }
    return false;
}

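/*
 * A sketch of the "insert or reuse" pattern built on the @existing
 * out-argument (alloc_entry(), free_entry() and hash_of() are placeholders,
 * not part of the QHT API):
 *
 *     struct entry *e = alloc_entry(key);
 *     void *existing;
 *
 *     if (!qht_insert(&ht, e, hash_of(e), &existing)) {
 *         // an equal entry beat us to it; use that one instead
 *         free_entry(e);
 *         e = existing;
 *     }
 */
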
static inline bool qht_entry_is_last(const struct qht_bucket *b, int pos)
{
    if (pos == QHT_BUCKET_ENTRIES - 1) {
        if (b->next == NULL) {
            return true;
        }
        return b->next->pointers[0] == NULL;
    }
    return b->pointers[pos + 1] == NULL;
}

static void
qht_entry_move(struct qht_bucket *to, int i, struct qht_bucket *from, int j)
{
    qht_debug_assert(!(to == from && i == j));
    qht_debug_assert(to->pointers[i]);
    qht_debug_assert(from->pointers[j]);

    qatomic_set(&to->hashes[i], from->hashes[j]);
    qatomic_set(&to->pointers[i], from->pointers[j]);

    qatomic_set(&from->hashes[j], 0);
    qatomic_set(&from->pointers[j], NULL);
}

/*
 * Find the last valid entry in @orig, and swap it with @orig[pos], which has
 * just been invalidated.
 */
static inline void qht_bucket_remove_entry(struct qht_bucket *orig, int pos)
{
    struct qht_bucket *b = orig;
    struct qht_bucket *prev = NULL;
    int i;

    if (qht_entry_is_last(orig, pos)) {
        qatomic_set(&orig->hashes[pos], 0);
        qatomic_set(&orig->pointers[pos], NULL);
        return;
    }
    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i]) {
                continue;
            }
            if (i > 0) {
                return qht_entry_move(orig, pos, b, i - 1);
            }
            qht_debug_assert(prev);
            return qht_entry_move(orig, pos, prev, QHT_BUCKET_ENTRIES - 1);
        }
        prev = b;
        b = b->next;
    } while (b);
    /* no free entries other than orig[pos], so swap it with the last one */
    qht_entry_move(orig, pos, prev, QHT_BUCKET_ENTRIES - 1);
}

/* call with b->lock held */
static inline
bool qht_remove__locked(struct qht_bucket *head, const void *p, uint32_t hash)
{
    struct qht_bucket *b = head;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            void *q = b->pointers[i];

            if (unlikely(q == NULL)) {
                return false;
            }
            if (q == p) {
                qht_debug_assert(b->hashes[i] == hash);
                seqlock_write_begin(&head->sequence);
                qht_bucket_remove_entry(b, i);
                seqlock_write_end(&head->sequence);
                return true;
            }
        }
        b = b->next;
    } while (b);
    return false;
}

bool qht_remove(struct qht *ht, const void *p, uint32_t hash)
{
    struct qht_bucket *b;
    struct qht_map *map;
    bool ret;

    /* NULL pointers are not supported */
    qht_debug_assert(p);

    b = qht_bucket_lock__no_stale(ht, hash, &map);
    ret = qht_remove__locked(b, p, hash);
    qht_bucket_debug__locked(b);
    qht_bucket_unlock(map, b);
    return ret;
}

static inline void qht_bucket_iter(struct qht_bucket *head,
                                   const struct qht_iter *iter, void *userp)
{
    struct qht_bucket *b = head;
    int i;

    do {
        for (i = 0; i < QHT_BUCKET_ENTRIES; i++) {
            if (b->pointers[i] == NULL) {
                return;
            }
            switch (iter->type) {
            case QHT_ITER_VOID:
                iter->f.retvoid(b->pointers[i], b->hashes[i], userp);
                break;
            case QHT_ITER_RM:
                if (iter->f.retbool(b->pointers[i], b->hashes[i], userp)) {
                    /* replace i with the last valid element in the bucket */
                    seqlock_write_begin(&head->sequence);
                    qht_bucket_remove_entry(b, i);
                    seqlock_write_end(&head->sequence);
                    qht_bucket_debug__locked(b);
                    /* reevaluate i, since it just got replaced */
                    i--;
                    continue;
                }
                break;
            default:
                g_assert_not_reached();
            }
        }
        b = b->next;
    } while (b);
}

/* call with all of the map's locks held */
static inline void qht_map_iter__all_locked(struct qht_map *map,
                                            const struct qht_iter *iter,
                                            void *userp)
{
    size_t i;

    for (i = 0; i < map->n_buckets; i++) {
        qht_bucket_iter(&map->buckets[i], iter, userp);
    }
}

static inline void
do_qht_iter(struct qht *ht, const struct qht_iter *iter, void *userp)
{
    struct qht_map *map;

    map = qatomic_rcu_read(&ht->map);
    qht_map_lock_buckets(map);
    qht_map_iter__all_locked(map, iter, userp);
    qht_map_unlock_buckets(map);
}

void qht_iter(struct qht *ht, qht_iter_func_t func, void *userp)
{
    const struct qht_iter iter = {
        .f.retvoid = func,
        .type = QHT_ITER_VOID,
    };

    do_qht_iter(ht, &iter, userp);
}

void qht_iter_remove(struct qht *ht, qht_iter_bool_func_t func, void *userp)
{
    const struct qht_iter iter = {
        .f.retbool = func,
        .type = QHT_ITER_RM,
    };

    do_qht_iter(ht, &iter, userp);
}

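/*
 * A sketch of pruning entries with qht_iter_remove(); "struct entry" and
 * entry_is_stale() are placeholders, not part of the QHT API:
 *
 *     static bool prune_one(void *p, uint32_t hash, void *userp)
 *     {
 *         struct entry *e = p;
 *
 *         if (!entry_is_stale(e)) {
 *             return false;   // keep this entry
 *         }
 *         // returning true removes it from the table; only free @e after
 *         // an RCU grace period, e.g. via call_rcu()
 *         return true;
 *     }
 *
 *     qht_iter_remove(&ht, prune_one, NULL);
 */
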
struct qht_map_copy_data {
    struct qht *ht;
    struct qht_map *new;
};

static void qht_map_copy(void *p, uint32_t hash, void *userp)
{
    struct qht_map_copy_data *data = userp;
    struct qht *ht = data->ht;
    struct qht_map *new = data->new;
    struct qht_bucket *b = qht_map_to_bucket(new, hash);

    /* no need to acquire b->lock because no thread has seen this map yet */
    qht_insert__locked(ht, new, b, p, hash, NULL);
}

/*
 * Atomically perform a resize and/or reset.
 * Call with ht->lock held.
 */
static void qht_do_resize_reset(struct qht *ht, struct qht_map *new, bool reset)
{
    struct qht_map *old;
    const struct qht_iter iter = {
        .f.retvoid = qht_map_copy,
        .type = QHT_ITER_VOID,
    };
    struct qht_map_copy_data data;

    old = ht->map;
    qht_map_lock_buckets(old);

    if (reset) {
        qht_map_reset__all_locked(old);
    }

    if (new == NULL) {
        qht_map_unlock_buckets(old);
        return;
    }

    g_assert(new->n_buckets != old->n_buckets);
    data.ht = ht;
    data.new = new;
    qht_map_iter__all_locked(old, &iter, &data);
    qht_map_debug__all_locked(new);

    qatomic_rcu_set(&ht->map, new);
    qht_map_unlock_buckets(old);
    call_rcu(old, qht_map_destroy, rcu);
}

bool qht_resize(struct qht *ht, size_t n_elems)
{
    size_t n_buckets = qht_elems_to_buckets(n_elems);
    bool ret = false;

    qht_lock(ht);
    if (n_buckets != ht->map->n_buckets) {
        struct qht_map *new;

        new = qht_map_create(n_buckets);
        qht_do_resize(ht, new);
        ret = true;
    }
    qht_unlock(ht);

    return ret;
}

/* pass @stats to qht_statistics_destroy() when done */
void qht_statistics_init(const struct qht *ht, struct qht_stats *stats)
{
    const struct qht_map *map;
    int i;

    map = qatomic_rcu_read(&ht->map);

    stats->used_head_buckets = 0;
    stats->entries = 0;
    qdist_init(&stats->chain);
    qdist_init(&stats->occupancy);
    /* bail out if the qht has not yet been initialized */
    if (unlikely(map == NULL)) {
        stats->head_buckets = 0;
        return;
    }
    stats->head_buckets = map->n_buckets;

    for (i = 0; i < map->n_buckets; i++) {
        const struct qht_bucket *head = &map->buckets[i];
        const struct qht_bucket *b;
        unsigned int version;
        size_t buckets;
        size_t entries;
        int j;

        do {
            version = seqlock_read_begin(&head->sequence);
            buckets = 0;
            entries = 0;
            b = head;
            do {
                for (j = 0; j < QHT_BUCKET_ENTRIES; j++) {
                    if (qatomic_read(&b->pointers[j]) == NULL) {
                        break;
                    }
                    entries++;
                }
                buckets++;
                b = qatomic_rcu_read(&b->next);
            } while (b);
        } while (seqlock_read_retry(&head->sequence, version));

        if (entries) {
            qdist_inc(&stats->chain, buckets);
            qdist_inc(&stats->occupancy,
                      (double)entries / QHT_BUCKET_ENTRIES / buckets);
            stats->used_head_buckets++;
            stats->entries += entries;
        } else {
            qdist_inc(&stats->occupancy, 0);
        }
    }
}

void qht_statistics_destroy(struct qht_stats *stats)
{
    qdist_destroy(&stats->occupancy);
    qdist_destroy(&stats->chain);
}
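
/*
 * A sketch of consuming the stats (illustrative only; it assumes the
 * qdist_avg() helper from "qemu/qdist.h"):
 *
 *     struct qht_stats stats;
 *
 *     qht_statistics_init(&ht, &stats);
 *     printf("entries %zu, used buckets %zu/%zu, avg occupancy %.2f\n",
 *            stats.entries, stats.used_head_buckets, stats.head_buckets,
 *            qdist_avg(&stats.occupancy));
 *     qht_statistics_destroy(&stats);
 */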
1031