xref: /linux/drivers/md/dm-vdo/indexer/volume.c (revision ab93e0dd72c37d378dd936f031ffb83ff2bd87ce)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "volume.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/dm-bufio.h>
10 #include <linux/err.h>
11 
12 #include "errors.h"
13 #include "logger.h"
14 #include "memory-alloc.h"
15 #include "permassert.h"
16 #include "string-utils.h"
17 #include "thread-utils.h"
18 
19 #include "chapter-index.h"
20 #include "config.h"
21 #include "geometry.h"
22 #include "hash-utils.h"
23 #include "index.h"
24 #include "sparse-cache.h"
25 
26 /*
27  * The first block of the volume layout is reserved for the volume header, which is no longer used.
28  * The remainder of the volume is divided into chapters consisting of several pages of records, and
29  * several pages of a static index used to find those records. The index pages are recorded first,
30  * followed by the record pages. The chapters are written in order as they are filled, so the
31  * volume storage acts as a circular log of the most recent chapters, with each new chapter
32  * overwriting the oldest saved one.
33  *
34  * When a new chapter is filled and closed, the records from that chapter are sorted and
35  * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
36  * index is generated to store which record page contains each record. The in-memory index page map
37  * is also updated to indicate which delta lists fall on each chapter index page. This means that
38  * when a record is read, the volume only has to load a single index page and a single record page,
39  * rather than search the entire chapter. These index and record pages are written to storage, and
40  * the index pages are transferred to the page cache under the theory that the most recently
41  * written chapter is likely to be accessed again soon.
42  *
43  * When reading a record, the volume index will indicate which chapter should contain it. The
44  * volume uses the index page map to determine which chapter index page needs to be loaded, and
45  * then reads the relevant record page number from the chapter index. Both index and record pages
46  * are stored in a page cache when read for the common case that subsequent records need the same
47  * pages. The page cache evicts the least recently accessed entries when caching new pages. In
48  * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
49  * additional caching depending on available system resources.
50  *
51  * Record requests are handled from cached pages when possible. If a page needs to be read, it is
52  * placed on a queue along with the request that wants to read it. Any requests for the same page
53  * that arrive while the read is pending are added to the queue entry. A separate reader thread
54  * handles the queued reads, adding the page to the cache and updating any requests queued with it
55  * so they can continue processing. This allows the index zone threads to continue processing new
56  * requests rather than wait for the storage reads.
57  *
58  * When an index rebuild is necessary, the volume reads each stored chapter to determine which
59  * range of chapters contain valid records, so that those records can be used to reconstruct the
60  * in-memory volume index.
61  */
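
/*
 * As a worked example (using hypothetical numbers rather than any particular
 * default geometry): with 10 index pages and 246 record pages per chapter,
 * each chapter spans 256 physical pages. Chapter 2 would then occupy physical
 * pages 1 + (2 * 256) = 513 through 768, with pages 513 to 522 holding the
 * chapter index pages and pages 523 through 768 holding the record pages.
 */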
62 
63 /* The maximum allowable number of contiguous bad chapters */
64 #define MAX_BAD_CHAPTERS 100
65 #define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
66 #define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
67 #define VOLUME_CACHE_MAX_QUEUED_READS 4096
68 
69 static const u64 BAD_CHAPTER = U64_MAX;
70 
71 /*
72  * The invalidate counter is two 32-bit fields stored together atomically. The low-order 32 bits
73  * are the physical page number of the cached page being read. The high-order 32 bits are a
74  * sequence number. This value is written when the zone that owns it begins or completes a cache
75  * search. Any other thread will only read the counter in wait_for_pending_searches() while waiting
76  * to update the cache contents.
77  */
78 union invalidate_counter {
79 	u64 value;
80 	struct {
81 		u32 page;
82 		u32 counter;
83 	};
84 };
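
/*
 * The low-order bit of the counter field acts as the "search in progress"
 * flag: begin_pending_search() increments the counter to an odd value and
 * end_pending_search() increments it back to an even one. A waiter in
 * wait_for_pending_searches() therefore only needs to watch for the 64-bit
 * value to change in order to know that the search it observed has finished.
 */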
85 
86 static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
87 {
88 	return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
89 }
90 
91 static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
92 {
93 	return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
94 }
95 
96 static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
97 {
98 	return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
99 }
100 
101 static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
102 {
103 	/* Page zero is the header page, so the first chapter index page is page one. */
104 	return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
105 }
106 
107 static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
108 							      unsigned int zone_number)
109 {
110 	return (union invalidate_counter) {
111 		.value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
112 	};
113 }
114 
115 static inline void set_invalidate_counter(struct page_cache *cache,
116 					  unsigned int zone_number,
117 					  union invalidate_counter invalidate_counter)
118 {
119 	WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
120 		   invalidate_counter.value);
121 }
122 
123 static inline bool search_pending(union invalidate_counter invalidate_counter)
124 {
125 	return (invalidate_counter.counter & 1) != 0;
126 }
127 
128 /* Lock the cache for a zone in order to search for a page. */
129 static void begin_pending_search(struct page_cache *cache, u32 physical_page,
130 				 unsigned int zone_number)
131 {
132 	union invalidate_counter invalidate_counter =
133 		get_invalidate_counter(cache, zone_number);
134 
135 	invalidate_counter.page = physical_page;
136 	invalidate_counter.counter++;
137 	set_invalidate_counter(cache, zone_number, invalidate_counter);
138 	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
139 			    "Search is pending for zone %u", zone_number);
140 	/*
141 	 * This memory barrier ensures that the write to the invalidate counter is seen by other
142 	 * threads before this thread accesses the cached page. The corresponding read memory
143 	 * barrier is in wait_for_pending_searches().
144 	 */
145 	smp_mb();
146 }
147 
148 /* Unlock the cache for a zone by clearing its invalidate counter. */
149 static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
150 {
151 	union invalidate_counter invalidate_counter;
152 
153 	/*
154 	 * This memory barrier ensures that this thread completes reads of the
155 	 * cached page before other threads see the write to the invalidate
156 	 * counter.
157 	 */
158 	smp_mb();
159 
160 	invalidate_counter = get_invalidate_counter(cache, zone_number);
161 	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
162 			    "Search is pending for zone %u", zone_number);
163 	invalidate_counter.counter++;
164 	set_invalidate_counter(cache, zone_number, invalidate_counter);
165 }
166 
167 static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
168 {
169 	union invalidate_counter initial_counters[MAX_ZONES];
170 	unsigned int i;
171 
172 	/*
173 	 * We hold the read_threads_mutex. We are waiting for threads that do not hold the
174 	 * read_threads_mutex. Those threads have "locked" their targeted page by setting the
175 	 * search_pending_counter. The corresponding write memory barrier is in
176 	 * begin_pending_search().
177 	 */
178 	smp_mb();
179 
180 	for (i = 0; i < cache->zone_count; i++)
181 		initial_counters[i] = get_invalidate_counter(cache, i);
182 	for (i = 0; i < cache->zone_count; i++) {
183 		if (search_pending(initial_counters[i]) &&
184 		    (initial_counters[i].page == physical_page)) {
185 			/*
186 			 * There is an active search using the physical page. We need to wait for
187 			 * the search to finish.
188 			 *
189 			 * FIXME: Investigate using wait_event() to wait for the search to finish.
190 			 */
191 			while (initial_counters[i].value ==
192 			       get_invalidate_counter(cache, i).value)
193 				cond_resched();
194 		}
195 	}
196 }
197 
198 static void release_page_buffer(struct cached_page *page)
199 {
200 	if (page->buffer != NULL)
201 		dm_bufio_release(vdo_forget(page->buffer));
202 }
203 
204 static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
205 {
206 	/* Do not clear read_pending because the read queue relies on it. */
207 	release_page_buffer(page);
208 	page->physical_page = cache->indexable_pages;
209 	WRITE_ONCE(page->last_used, 0);
210 }
211 
212 static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
213 {
214 	/*
215 	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
216 	 * thread holding the read_threads_mutex.
217 	 */
218 	if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
219 		WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
220 }
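
/*
 * The cache clock is a monotonically increasing counter rather than a
 * wall-clock time, so last_used simply orders pages by how recently they were
 * accessed; select_victim_in_cache() below evicts the page with the smallest
 * value among those without a pending read.
 */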
221 
222 /* Select a page to remove from the cache to make space for a new entry. */
223 static struct cached_page *select_victim_in_cache(struct page_cache *cache)
224 {
225 	struct cached_page *page;
226 	int oldest_index = 0;
227 	s64 oldest_time = S64_MAX;
228 	s64 last_used;
229 	u16 i;
230 
231 	/* Find the oldest unclaimed page. We hold the read_threads_mutex. */
232 	for (i = 0; i < cache->cache_slots; i++) {
233 		/* A page with a pending read must not be replaced. */
234 		if (cache->cache[i].read_pending)
235 			continue;
236 
237 		last_used = READ_ONCE(cache->cache[i].last_used);
238 		if (last_used <= oldest_time) {
239 			oldest_time = last_used;
240 			oldest_index = i;
241 		}
242 	}
243 
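	/*
	 * If the chosen slot currently holds a page, unhook it from the index
	 * first so that no new search can find it; wait_for_pending_searches()
	 * then only has to wait out searches that were already in flight.
	 */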
244 	page = &cache->cache[oldest_index];
245 	if (page->physical_page != cache->indexable_pages) {
246 		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
247 		wait_for_pending_searches(cache, page->physical_page);
248 	}
249 
250 	page->read_pending = true;
251 	clear_cache_page(cache, page);
252 	return page;
253 }
254 
255 /* Make a newly filled cache entry available to other threads. */
256 static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
257 			     struct cached_page *page)
258 {
259 	int result;
260 
261 	/* We hold the read_threads_mutex. */
262 	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
263 	if (result != VDO_SUCCESS)
264 		return result;
265 
266 	page->physical_page = physical_page;
267 	make_page_most_recent(cache, page);
268 	page->read_pending = false;
269 
270 	/*
271 	 * We hold the read_threads_mutex, but we must have a write memory barrier before making
272 	 * the cached_page available to the readers that do not hold the mutex. The corresponding
273 	 * read memory barrier is in get_page_and_index().
274 	 */
275 	smp_wmb();
276 
277 	/* This assignment also clears the queued flag. */
278 	WRITE_ONCE(cache->index[physical_page], page - cache->cache);
279 	return UDS_SUCCESS;
280 }
281 
282 static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
283 				 struct cached_page *page)
284 {
285 	int result;
286 
287 	/* We hold the read_threads_mutex. */
288 	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
289 	if (result != VDO_SUCCESS)
290 		return;
291 
292 	clear_cache_page(cache, page);
293 	page->read_pending = false;
294 
295 	/* Clear the mapping and the queued flag for the new page. */
296 	WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
297 }
298 
299 static inline u16 next_queue_position(u16 position)
300 {
301 	return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
302 }
303 
304 static inline void advance_queue_position(u16 *position)
305 {
306 	*position = next_queue_position(*position);
307 }
308 
309 static inline bool read_queue_is_full(struct page_cache *cache)
310 {
311 	return cache->read_queue_first == next_queue_position(cache->read_queue_last);
312 }
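
/*
 * The read queue is a ring buffer of VOLUME_CACHE_MAX_QUEUED_READS entries
 * tracked by read_queue_first, read_queue_next_read, and read_queue_last. As
 * with any ring buffer that distinguishes full from empty by comparing
 * positions, one slot always remains unused: the queue is full when advancing
 * read_queue_last would make it equal to read_queue_first.
 */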
313 
314 static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
315 			 u32 physical_page)
316 {
317 	struct queued_read *queue_entry;
318 	u16 last = cache->read_queue_last;
319 	u16 read_queue_index;
320 
321 	/* We hold the read_threads_mutex. */
322 	if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
323 		/* This page has no existing entry in the queue. */
324 		if (read_queue_is_full(cache))
325 			return false;
326 
327 		/* Fill in the read queue entry. */
328 		cache->read_queue[last].physical_page = physical_page;
329 		cache->read_queue[last].invalid = false;
330 		cache->read_queue[last].first_request = NULL;
331 		cache->read_queue[last].last_request = NULL;
332 
333 		/* Point the cache index to the read queue entry. */
334 		read_queue_index = last;
335 		WRITE_ONCE(cache->index[physical_page],
336 			   read_queue_index | VOLUME_CACHE_QUEUED_FLAG);
337 
338 		advance_queue_position(&cache->read_queue_last);
339 	} else {
340 		/* It's already queued, so add this request to the existing entry. */
341 		read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
342 	}
343 
344 	request->next_request = NULL;
345 	queue_entry = &cache->read_queue[read_queue_index];
346 	if (queue_entry->first_request == NULL)
347 		queue_entry->first_request = request;
348 	else
349 		queue_entry->last_request->next_request = request;
350 	queue_entry->last_request = request;
351 
352 	return true;
353 }
354 
355 static void enqueue_page_read(struct volume *volume, struct uds_request *request,
356 			      u32 physical_page)
357 {
358 	/* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
359 	while (!enqueue_read(&volume->page_cache, request, physical_page)) {
360 		vdo_log_debug("Read queue full, waiting for reads to finish");
361 		uds_wait_cond(&volume->read_threads_read_done_cond,
362 			      &volume->read_threads_mutex);
363 	}
364 
365 	uds_signal_cond(&volume->read_threads_cond);
366 }
367 
368 /*
369  * Reserve the next read queue entry for processing, but do not actually remove it from the queue.
370  * Must be followed by release_queued_requests().
371  */
372 static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
373 {
374 	/* We hold the read_threads_mutex. */
375 	struct queued_read *entry;
376 	u16 index_value;
377 	bool queued;
378 
379 	/* No items to dequeue */
380 	if (cache->read_queue_next_read == cache->read_queue_last)
381 		return NULL;
382 
383 	entry = &cache->read_queue[cache->read_queue_next_read];
384 	index_value = cache->index[entry->physical_page];
385 	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
386 	/* Check to see if it's still queued before resetting. */
387 	if (entry->invalid && queued)
388 		WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);
389 
390 	/*
391 	 * If a synchronous read has taken this page, set invalid to true so it doesn't get
392 	 * overwritten. Requests will just be requeued.
393 	 */
394 	if (!queued)
395 		entry->invalid = true;
396 
397 	entry->reserved = true;
398 	advance_queue_position(&cache->read_queue_next_read);
399 	return entry;
400 }
401 
402 static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
403 {
404 	struct queued_read *queue_entry = NULL;
405 
406 	while (!volume->read_threads_exiting) {
407 		queue_entry = reserve_read_queue_entry(&volume->page_cache);
408 		if (queue_entry != NULL)
409 			break;
410 
411 		uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
412 	}
413 
414 	return queue_entry;
415 }
416 
417 static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
418 				   u32 chapter, u32 index_page_number,
419 				   struct delta_index_page *chapter_index_page)
420 {
421 	u64 ci_virtual;
422 	u32 ci_chapter;
423 	u32 lowest_list;
424 	u32 highest_list;
425 	struct index_geometry *geometry = volume->geometry;
426 	int result;
427 
428 	result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
429 						   index_page, volume->nonce);
430 	if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
431 		return result;
432 
433 	if (result != UDS_SUCCESS) {
434 		return vdo_log_error_strerror(result,
435 					      "Reading chapter index page for chapter %u page %u",
436 					      chapter, index_page_number);
437 	}
438 
439 	uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
440 				   &lowest_list, &highest_list);
441 	ci_virtual = chapter_index_page->virtual_chapter_number;
442 	ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
443 	if ((chapter == ci_chapter) &&
444 	    (lowest_list == chapter_index_page->lowest_list_number) &&
445 	    (highest_list == chapter_index_page->highest_list_number))
446 		return UDS_SUCCESS;
447 
448 	vdo_log_warning("Index page map updated to %llu",
449 			(unsigned long long) volume->index_page_map->last_update);
450 	vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
451 			chapter, index_page_number, lowest_list, highest_list,
452 			(unsigned long long) ci_virtual,
453 			chapter_index_page->lowest_list_number,
454 			chapter_index_page->highest_list_number);
455 	return vdo_log_error_strerror(UDS_CORRUPT_DATA,
456 				      "index page map mismatch with chapter index");
457 }
458 
459 static int initialize_index_page(const struct volume *volume, u32 physical_page,
460 				 struct cached_page *page)
461 {
462 	u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
463 	u32 index_page_number = map_to_page_number(volume->geometry, physical_page);
464 
465 	return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
466 				       chapter, index_page_number, &page->index_page);
467 }
468 
469 static bool search_record_page(const u8 record_page[],
470 			       const struct uds_record_name *name,
471 			       const struct index_geometry *geometry,
472 			       struct uds_record_data *metadata)
473 {
474 	/*
475 	 * The array of records is sorted by name and stored as a binary tree in heap order, so the
476 	 * root of the tree is the first array element.
477 	 */
478 	u32 node = 0;
479 	const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;
480 
481 	while (node < geometry->records_per_page) {
482 		int result;
483 		const struct uds_volume_record *record = &records[node];
484 
485 		result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
486 		if (result == 0) {
487 			if (metadata != NULL)
488 				*metadata = record->data;
489 			return true;
490 		}
491 
492 		/* The children of node N are at indexes 2N+1 and 2N+2. */
493 		node = ((2 * node) + ((result < 0) ? 1 : 2));
494 	}
495 
496 	return false;
497 }
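
/*
 * For example (purely illustrative), seven sorted records A < B < ... < G are
 * stored in heap order as [D, B, F, A, C, E, G]: node 0 holds the median D,
 * its children hold B and F, and so on. A search for E compares against D
 * (greater, so move to node 2), then against F (less, so move to node 5),
 * where it finds E without scanning the whole page.
 */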
498 
499 /*
500  * If we've read in a record page, we're going to do an immediate search, to speed up processing by
501  * avoiding get_record_from_zone(), and to ensure that requests make progress even when queued. If
502  * we've read in an index page, we save the record page number so we don't have to resolve the
503  * index page again. We use the location, virtual_chapter, and old_metadata fields in the request
504  * to allow the index code to know where to begin processing the request again.
505  */
506 static int search_page(struct cached_page *page, const struct volume *volume,
507 		       struct uds_request *request, u32 physical_page)
508 {
509 	int result;
510 	enum uds_index_region location;
511 	u16 record_page_number;
512 
513 	if (is_record_page(volume->geometry, physical_page)) {
514 		if (search_record_page(dm_bufio_get_block_data(page->buffer),
515 				       &request->record_name, volume->geometry,
516 				       &request->old_metadata))
517 			location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
518 		else
519 			location = UDS_LOCATION_UNAVAILABLE;
520 	} else {
521 		result = uds_search_chapter_index_page(&page->index_page,
522 						       volume->geometry,
523 						       &request->record_name,
524 						       &record_page_number);
525 		if (result != UDS_SUCCESS)
526 			return result;
527 
528 		if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
529 			location = UDS_LOCATION_UNAVAILABLE;
530 		} else {
531 			location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
532 			*((u16 *) &request->old_metadata) = record_page_number;
533 		}
534 	}
535 
536 	request->location = location;
537 	request->found = false;
538 	return UDS_SUCCESS;
539 }
540 
541 static int process_entry(struct volume *volume, struct queued_read *entry)
542 {
543 	u32 page_number = entry->physical_page;
544 	struct uds_request *request;
545 	struct cached_page *page = NULL;
546 	u8 *page_data;
547 	int result;
548 
549 	if (entry->invalid) {
550 		vdo_log_debug("Requeuing requests for invalid page");
551 		return UDS_SUCCESS;
552 	}
553 
554 	page = select_victim_in_cache(&volume->page_cache);
555 
556 	mutex_unlock(&volume->read_threads_mutex);
557 	page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
558 	mutex_lock(&volume->read_threads_mutex);
559 	if (IS_ERR(page_data)) {
560 		result = -PTR_ERR(page_data);
561 		vdo_log_warning_strerror(result,
562 					 "error reading physical page %u from volume",
563 					 page_number);
564 		cancel_page_in_cache(&volume->page_cache, page_number, page);
565 		return result;
566 	}
567 
568 	if (entry->invalid) {
569 		vdo_log_warning("Page %u invalidated after read", page_number);
570 		cancel_page_in_cache(&volume->page_cache, page_number, page);
571 		return UDS_SUCCESS;
572 	}
573 
574 	if (!is_record_page(volume->geometry, page_number)) {
575 		result = initialize_index_page(volume, page_number, page);
576 		if (result != UDS_SUCCESS) {
577 			vdo_log_warning("Error initializing chapter index page");
578 			cancel_page_in_cache(&volume->page_cache, page_number, page);
579 			return result;
580 		}
581 	}
582 
583 	result = put_page_in_cache(&volume->page_cache, page_number, page);
584 	if (result != UDS_SUCCESS) {
585 		vdo_log_warning("Error putting page %u in cache", page_number);
586 		cancel_page_in_cache(&volume->page_cache, page_number, page);
587 		return result;
588 	}
589 
590 	request = entry->first_request;
591 	while ((request != NULL) && (result == UDS_SUCCESS)) {
592 		result = search_page(page, volume, request, page_number);
593 		request = request->next_request;
594 	}
595 
596 	return result;
597 }
598 
599 static void release_queued_requests(struct volume *volume, struct queued_read *entry,
600 				    int result)
601 {
602 	struct page_cache *cache = &volume->page_cache;
603 	u16 next_read = cache->read_queue_next_read;
604 	struct uds_request *request;
605 	struct uds_request *next;
606 
607 	for (request = entry->first_request; request != NULL; request = next) {
608 		next = request->next_request;
609 		request->status = result;
610 		request->requeued = true;
611 		uds_enqueue_request(request, STAGE_INDEX);
612 	}
613 
614 	entry->reserved = false;
615 
616 	/* Move the read_queue_first pointer as far as we can. */
617 	while ((cache->read_queue_first != next_read) &&
618 	       (!cache->read_queue[cache->read_queue_first].reserved))
619 		advance_queue_position(&cache->read_queue_first);
620 	uds_broadcast_cond(&volume->read_threads_read_done_cond);
621 }
622 
623 static void read_thread_function(void *arg)
624 {
625 	struct volume *volume = arg;
626 
627 	vdo_log_debug("reader starting");
628 	mutex_lock(&volume->read_threads_mutex);
629 	while (true) {
630 		struct queued_read *queue_entry;
631 		int result;
632 
633 		queue_entry = wait_to_reserve_read_queue_entry(volume);
634 		if (volume->read_threads_exiting)
635 			break;
636 
637 		result = process_entry(volume, queue_entry);
638 		release_queued_requests(volume, queue_entry, result);
639 	}
640 	mutex_unlock(&volume->read_threads_mutex);
641 	vdo_log_debug("reader done");
642 }
643 
644 static void get_page_and_index(struct page_cache *cache, u32 physical_page,
645 			       int *queue_index, struct cached_page **page_ptr)
646 {
647 	u16 index_value;
648 	u16 index;
649 	bool queued;
650 
651 	/*
652 	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
653 	 * thread holding the read_threads_mutex.
654 	 *
655 	 * Holding only a search_pending_counter is the most frequent case.
656 	 */
657 	/*
658 	 * It would be unlikely for the compiler to turn the usage of index_value into two reads of
659 	 * cache->index, but it would be possible and very bad if those reads did not return the
660 	 * same bits.
661 	 */
662 	index_value = READ_ONCE(cache->index[physical_page]);
663 	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
664 	index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;
665 
666 	if (!queued && (index < cache->cache_slots)) {
667 		*page_ptr = &cache->cache[index];
668 		/*
669 		 * We have acquired access to the cached page, but unless we hold the
670 		 * read_threads_mutex, we need a read memory barrier now. The corresponding write
671 		 * memory barrier is in put_page_in_cache().
672 		 */
673 		smp_rmb();
674 	} else {
675 		*page_ptr = NULL;
676 	}
677 
678 	*queue_index = queued ? index : -1;
679 }
680 
681 static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
682 				struct cached_page **page)
683 {
684 	/*
685 	 * ASSERTION: We are in a zone thread.
686 	 * ASSERTION: We are holding a search_pending_counter or the read_threads_mutex.
687 	 */
688 	int queue_index = -1;
689 
690 	get_page_and_index(cache, physical_page, &queue_index, page);
691 }
692 
693 static int read_page_locked(struct volume *volume, u32 physical_page,
694 			    struct cached_page **page_ptr)
695 {
696 	int result = UDS_SUCCESS;
697 	struct cached_page *page = NULL;
698 	u8 *page_data;
699 
700 	page = select_victim_in_cache(&volume->page_cache);
701 	page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
702 	if (IS_ERR(page_data)) {
703 		result = -PTR_ERR(page_data);
704 		vdo_log_warning_strerror(result,
705 					 "error reading physical page %u from volume",
706 					 physical_page);
707 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
708 		return result;
709 	}
710 
711 	if (!is_record_page(volume->geometry, physical_page)) {
712 		result = initialize_index_page(volume, physical_page, page);
713 		if (result != UDS_SUCCESS) {
714 			if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
715 				vdo_log_warning("Corrupt index page %u", physical_page);
716 			cancel_page_in_cache(&volume->page_cache, physical_page, page);
717 			return result;
718 		}
719 	}
720 
721 	result = put_page_in_cache(&volume->page_cache, physical_page, page);
722 	if (result != UDS_SUCCESS) {
723 		vdo_log_warning("Error putting page %u in cache", physical_page);
724 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
725 		return result;
726 	}
727 
728 	*page_ptr = page;
729 	return UDS_SUCCESS;
730 }
731 
732 /* Retrieve a page from the cache while holding the read threads mutex. */
733 static int get_volume_page_locked(struct volume *volume, u32 physical_page,
734 				  struct cached_page **page_ptr)
735 {
736 	int result;
737 	struct cached_page *page = NULL;
738 
739 	get_page_from_cache(&volume->page_cache, physical_page, &page);
740 	if (page == NULL) {
741 		result = read_page_locked(volume, physical_page, &page);
742 		if (result != UDS_SUCCESS)
743 			return result;
744 	} else {
745 		make_page_most_recent(&volume->page_cache, page);
746 	}
747 
748 	*page_ptr = page;
749 	return UDS_SUCCESS;
750 }
751 
752 /* Retrieve a page from the cache while holding a search_pending lock. */
753 static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
754 				     u32 physical_page, struct cached_page **page_ptr)
755 {
756 	struct cached_page *page;
757 	unsigned int zone_number = request->zone_number;
758 
759 	get_page_from_cache(&volume->page_cache, physical_page, &page);
760 	if (page != NULL) {
761 		if (zone_number == 0) {
762 			/* Only one zone is allowed to update the LRU. */
763 			make_page_most_recent(&volume->page_cache, page);
764 		}
765 
766 		*page_ptr = page;
767 		return UDS_SUCCESS;
768 	}
769 
770 	/* Prepare to enqueue a read for the page. */
771 	end_pending_search(&volume->page_cache, zone_number);
772 	mutex_lock(&volume->read_threads_mutex);
773 
774 	/*
775 	 * Do the lookup again while holding the read mutex (no longer the fast case so this should
776 	 * be fine to repeat). We need to do this because a page may have been added to the cache
777 	 * by a reader thread between the time we searched above and the time we went to actually
778 	 * try to enqueue it below. This could result in us enqueuing another read for a page which
779 	 * is already in the cache, which would mean we end up with two entries in the cache for
780 	 * the same page.
781 	 */
782 	get_page_from_cache(&volume->page_cache, physical_page, &page);
783 	if (page == NULL) {
784 		enqueue_page_read(volume, request, physical_page);
785 		/*
786 		 * The performance gain from unlocking first, while "search pending" mode is off,
787 		 * turns out to be significant in some cases. The page is not available yet so
788 		 * the order does not matter for correctness as it does below.
789 		 */
790 		mutex_unlock(&volume->read_threads_mutex);
791 		begin_pending_search(&volume->page_cache, physical_page, zone_number);
792 		return UDS_QUEUED;
793 	}
794 
795 	/*
796 	 * Now that the page is loaded, the volume needs to switch to "reader thread unlocked" and
797 	 * "search pending" state in careful order so no other thread can mess with the data before
798 	 * the caller gets to look at it.
799 	 */
800 	begin_pending_search(&volume->page_cache, physical_page, zone_number);
801 	mutex_unlock(&volume->read_threads_mutex);
802 	*page_ptr = page;
803 	return UDS_SUCCESS;
804 }
805 
806 static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
807 			   struct cached_page **page_ptr)
808 {
809 	int result;
810 	u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
811 
812 	mutex_lock(&volume->read_threads_mutex);
813 	result = get_volume_page_locked(volume, physical_page, page_ptr);
814 	mutex_unlock(&volume->read_threads_mutex);
815 	return result;
816 }
817 
818 int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
819 			       u8 **data_ptr)
820 {
821 	int result;
822 	struct cached_page *page = NULL;
823 
824 	result = get_volume_page(volume, chapter, page_number, &page);
825 	if (result == UDS_SUCCESS)
826 		*data_ptr = dm_bufio_get_block_data(page->buffer);
827 	return result;
828 }
829 
830 int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
831 			      struct delta_index_page **index_page_ptr)
832 {
833 	int result;
834 	struct cached_page *page = NULL;
835 
836 	result = get_volume_page(volume, chapter, page_number, &page);
837 	if (result == UDS_SUCCESS)
838 		*index_page_ptr = &page->index_page;
839 	return result;
840 }
841 
842 /*
843  * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
844  * if the page in question must be read from storage.
845  */
846 static int search_cached_index_page(struct volume *volume, struct uds_request *request,
847 				    u32 chapter, u32 index_page_number,
848 				    u16 *record_page_number)
849 {
850 	int result;
851 	struct cached_page *page = NULL;
852 	unsigned int zone_number = request->zone_number;
853 	u32 physical_page = map_to_physical_page(volume->geometry, chapter,
854 						 index_page_number);
855 
856 	/*
857 	 * Make sure the invalidate counter is updated before we try and read the mapping. This
858 	 * prevents this thread from reading a page in the cache which has already been marked for
859 	 * invalidation by the reader thread, before the reader thread has noticed that the
860 	 * invalidate_counter has been incremented.
861 	 */
862 	begin_pending_search(&volume->page_cache, physical_page, zone_number);
863 
864 	result = get_volume_page_protected(volume, request, physical_page, &page);
865 	if (result != UDS_SUCCESS) {
866 		end_pending_search(&volume->page_cache, zone_number);
867 		return result;
868 	}
869 
870 	result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
871 					       &request->record_name,
872 					       record_page_number);
873 	end_pending_search(&volume->page_cache, zone_number);
874 	return result;
875 }
876 
877 /*
878  * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED if
879  * the page in question must be read from storage.
880  */
881 int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
882 				  u32 chapter, u16 record_page_number, bool *found)
883 {
884 	struct cached_page *record_page;
885 	struct index_geometry *geometry = volume->geometry;
886 	unsigned int zone_number = request->zone_number;
887 	int result;
888 	u32 physical_page, page_number;
889 
890 	*found = false;
891 	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
892 		return UDS_SUCCESS;
893 
894 	result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
895 			    "0 <= %d < %u", record_page_number,
896 			    geometry->record_pages_per_chapter);
897 	if (result != VDO_SUCCESS)
898 		return result;
899 
900 	page_number = geometry->index_pages_per_chapter + record_page_number;
901 
902 	physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
903 
904 	/*
905 	 * Make sure the invalidate counter is updated before we try and read the mapping. This
906 	 * prevents this thread from reading a page in the cache which has already been marked for
907 	 * invalidation by the reader thread, before the reader thread has noticed that the
908 	 * invalidate_counter has been incremented.
909 	 */
910 	begin_pending_search(&volume->page_cache, physical_page, zone_number);
911 
912 	result = get_volume_page_protected(volume, request, physical_page, &record_page);
913 	if (result != UDS_SUCCESS) {
914 		end_pending_search(&volume->page_cache, zone_number);
915 		return result;
916 	}
917 
918 	if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
919 			       &request->record_name, geometry, &request->old_metadata))
920 		*found = true;
921 
922 	end_pending_search(&volume->page_cache, zone_number);
923 	return UDS_SUCCESS;
924 }
925 
926 void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
927 {
928 	const struct index_geometry *geometry = volume->geometry;
929 	u32 physical_page = map_to_physical_page(geometry, chapter, 0);
930 
931 	dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
932 }
933 
934 int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
935 				       struct dm_buffer *volume_buffers[],
936 				       struct delta_index_page index_pages[])
937 {
938 	int result;
939 	u32 i;
940 	const struct index_geometry *geometry = volume->geometry;
941 	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
942 	u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);
943 
944 	dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
945 	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
946 		u8 *index_page;
947 
948 		index_page = dm_bufio_read(volume->client, physical_page + i,
949 					   &volume_buffers[i]);
950 		if (IS_ERR(index_page)) {
951 			result = -PTR_ERR(index_page);
952 			vdo_log_warning_strerror(result,
953 						 "error reading physical page %u",
954 						 physical_page);
955 			return result;
956 		}
957 
958 		result = init_chapter_index_page(volume, index_page, physical_chapter, i,
959 						 &index_pages[i]);
960 		if (result != UDS_SUCCESS)
961 			return result;
962 	}
963 
964 	return UDS_SUCCESS;
965 }
966 
967 int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
968 				 bool *found)
969 {
970 	int result;
971 	u32 physical_chapter =
972 		uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
973 	u32 index_page_number;
974 	u16 record_page_number;
975 
976 	index_page_number = uds_find_index_page_number(volume->index_page_map,
977 						       &request->record_name,
978 						       physical_chapter);
979 
980 	if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
981 		record_page_number = *((u16 *) &request->old_metadata);
982 	} else {
983 		result = search_cached_index_page(volume, request, physical_chapter,
984 						  index_page_number,
985 						  &record_page_number);
986 		if (result != UDS_SUCCESS)
987 			return result;
988 	}
989 
990 	return uds_search_cached_record_page(volume, request, physical_chapter,
991 					     record_page_number, found);
992 }
993 
994 int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
995 					     const struct uds_record_name *name,
996 					     u64 virtual_chapter, bool *found)
997 {
998 	int result;
999 	struct index_geometry *geometry = volume->geometry;
1000 	struct cached_page *page;
1001 	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
1002 	u32 index_page_number;
1003 	u16 record_page_number;
1004 	u32 page_number;
1005 
1006 	*found = false;
1007 	index_page_number =
1008 		uds_find_index_page_number(volume->index_page_map, name,
1009 					   physical_chapter);
1010 	result = get_volume_page(volume, physical_chapter, index_page_number, &page);
1011 	if (result != UDS_SUCCESS)
1012 		return result;
1013 
1014 	result = uds_search_chapter_index_page(&page->index_page, geometry, name,
1015 					       &record_page_number);
1016 	if (result != UDS_SUCCESS)
1017 		return result;
1018 
1019 	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
1020 		return UDS_SUCCESS;
1021 
1022 	page_number = geometry->index_pages_per_chapter + record_page_number;
1023 	result = get_volume_page(volume, physical_chapter, page_number, &page);
1024 	if (result != UDS_SUCCESS)
1025 		return result;
1026 
1027 	*found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
1028 				    geometry, NULL);
1029 	return UDS_SUCCESS;
1030 }
1031 
1032 static void invalidate_page(struct page_cache *cache, u32 physical_page)
1033 {
1034 	struct cached_page *page;
1035 	int queue_index = -1;
1036 
1037 	/* We hold the read_threads_mutex. */
1038 	get_page_and_index(cache, physical_page, &queue_index, &page);
1039 	if (page != NULL) {
1040 		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
1041 		wait_for_pending_searches(cache, page->physical_page);
1042 		clear_cache_page(cache, page);
1043 	} else if (queue_index > -1) {
1044 		vdo_log_debug("setting pending read to invalid");
1045 		cache->read_queue[queue_index].invalid = true;
1046 	}
1047 }
1048 
1049 void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
1050 {
1051 	u32 physical_chapter =
1052 		uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
1053 	u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
1054 	u32 i;
1055 
1056 	vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
1057 	mutex_lock(&volume->read_threads_mutex);
1058 	for (i = 0; i < volume->geometry->pages_per_chapter; i++)
1059 		invalidate_page(&volume->page_cache, first_page + i);
1060 	mutex_unlock(&volume->read_threads_mutex);
1061 }
1062 
1063 /*
1064  * Donate an index page from a newly written chapter to the page cache since it is likely to be
1065  * used again soon. The caller must already hold the reader thread mutex.
1066  */
1067 static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
1068 				    u32 index_page_number, struct dm_buffer *page_buffer)
1069 {
1070 	int result;
1071 	struct cached_page *page = NULL;
1072 	u32 physical_page =
1073 		map_to_physical_page(volume->geometry, physical_chapter,
1074 				     index_page_number);
1075 
1076 	page = select_victim_in_cache(&volume->page_cache);
1077 	page->buffer = page_buffer;
1078 	result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
1079 					 physical_chapter, index_page_number,
1080 					 &page->index_page);
1081 	if (result != UDS_SUCCESS) {
1082 		vdo_log_warning("Error initializing chapter index page");
1083 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
1084 		return result;
1085 	}
1086 
1087 	result = put_page_in_cache(&volume->page_cache, physical_page, page);
1088 	if (result != UDS_SUCCESS) {
1089 		vdo_log_warning("Error putting page %u in cache", physical_page);
1090 		cancel_page_in_cache(&volume->page_cache, physical_page, page);
1091 		return result;
1092 	}
1093 
1094 	return UDS_SUCCESS;
1095 }
1096 
1097 static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
1098 			     struct open_chapter_index *chapter_index)
1099 {
1100 	struct index_geometry *geometry = volume->geometry;
1101 	struct dm_buffer *page_buffer;
1102 	u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
1103 	u32 delta_list_number = 0;
1104 	u32 index_page_number;
1105 
1106 	for (index_page_number = 0;
1107 	     index_page_number < geometry->index_pages_per_chapter;
1108 	     index_page_number++) {
1109 		u8 *page_data;
1110 		u32 physical_page = first_index_page + index_page_number;
1111 		u32 lists_packed;
1112 		bool last_page;
1113 		int result;
1114 
1115 		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
1116 		if (IS_ERR(page_data)) {
1117 			return vdo_log_warning_strerror(-PTR_ERR(page_data),
1118 							"failed to prepare index page");
1119 		}
1120 
1121 		last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
1122 		result = uds_pack_open_chapter_index_page(chapter_index, page_data,
1123 							  delta_list_number, last_page,
1124 							  &lists_packed);
1125 		if (result != UDS_SUCCESS) {
1126 			dm_bufio_release(page_buffer);
1127 			return vdo_log_warning_strerror(result,
1128 							"failed to pack index page");
1129 		}
1130 
1131 		dm_bufio_mark_buffer_dirty(page_buffer);
1132 
1133 		if (lists_packed == 0) {
1134 			vdo_log_debug("no delta lists packed on chapter %u page %u",
1135 				      physical_chapter_number, index_page_number);
1136 		} else {
1137 			delta_list_number += lists_packed;
1138 		}
1139 
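		/*
		 * At this point delta_list_number is the first delta list that
		 * has not yet been packed, so delta_list_number - 1 is the
		 * highest list stored on this index page; that upper bound is
		 * what the index page map records for later lookups.
		 */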
1140 		uds_update_index_page_map(volume->index_page_map,
1141 					  chapter_index->virtual_chapter_number,
1142 					  physical_chapter_number, index_page_number,
1143 					  delta_list_number - 1);
1144 
1145 		mutex_lock(&volume->read_threads_mutex);
1146 		result = donate_index_page_locked(volume, physical_chapter_number,
1147 						  index_page_number, page_buffer);
1148 		mutex_unlock(&volume->read_threads_mutex);
1149 		if (result != UDS_SUCCESS) {
1150 			dm_bufio_release(page_buffer);
1151 			return result;
1152 		}
1153 	}
1154 
1155 	return UDS_SUCCESS;
1156 }
1157 
1158 static u32 encode_tree(u8 record_page[],
1159 		       const struct uds_volume_record *sorted_pointers[],
1160 		       u32 next_record, u32 node, u32 node_count)
1161 {
1162 	if (node < node_count) {
1163 		u32 child = (2 * node) + 1;
1164 
1165 		next_record = encode_tree(record_page, sorted_pointers, next_record,
1166 					  child, node_count);
1167 
1168 		/*
1169 		 * In-order traversal: copy the contents of the next record into the page at the
1170 		 * node offset.
1171 		 */
1172 		memcpy(&record_page[node * BYTES_PER_RECORD],
1173 		       sorted_pointers[next_record++], BYTES_PER_RECORD);
1174 
1175 		next_record = encode_tree(record_page, sorted_pointers, next_record,
1176 					  child + 1, node_count);
1177 	}
1178 
1179 	return next_record;
1180 }
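
/*
 * Continuing the earlier illustration of seven sorted records A through G:
 * encode_tree() performs an in-order traversal of node indexes 0 through 6,
 * visiting nodes 3, 1, 4, 0, 5, 2, 6 and copying A, B, C, D, E, F, G into
 * those slots respectively, producing exactly the heap-ordered layout
 * [D, B, F, A, C, E, G] that search_record_page() expects.
 */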
1181 
1182 static int encode_record_page(const struct volume *volume,
1183 			      const struct uds_volume_record records[], u8 record_page[])
1184 {
1185 	int result;
1186 	u32 i;
1187 	u32 records_per_page = volume->geometry->records_per_page;
1188 	const struct uds_volume_record **record_pointers = volume->record_pointers;
1189 
1190 	for (i = 0; i < records_per_page; i++)
1191 		record_pointers[i] = &records[i];
1192 
1193 	/*
1194 	 * Sort the record pointers by using just the names in the records, which is less work than
1195 	 * sorting the entire record values.
1196 	 */
1197 	BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
1198 	result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
1199 				records_per_page, UDS_RECORD_NAME_SIZE);
1200 	if (result != UDS_SUCCESS)
1201 		return result;
1202 
1203 	encode_tree(record_page, record_pointers, 0, 0, records_per_page);
1204 	return UDS_SUCCESS;
1205 }
1206 
1207 static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
1208 			      const struct uds_volume_record *records)
1209 {
1210 	u32 record_page_number;
1211 	struct index_geometry *geometry = volume->geometry;
1212 	struct dm_buffer *page_buffer;
1213 	const struct uds_volume_record *next_record = records;
1214 	u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
1215 						     geometry->index_pages_per_chapter);
1216 
1217 	for (record_page_number = 0;
1218 	     record_page_number < geometry->record_pages_per_chapter;
1219 	     record_page_number++) {
1220 		u8 *page_data;
1221 		u32 physical_page = first_record_page + record_page_number;
1222 		int result;
1223 
1224 		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
1225 		if (IS_ERR(page_data)) {
1226 			return vdo_log_warning_strerror(-PTR_ERR(page_data),
1227 							"failed to prepare record page");
1228 		}
1229 
1230 		result = encode_record_page(volume, next_record, page_data);
1231 		if (result != UDS_SUCCESS) {
1232 			dm_bufio_release(page_buffer);
1233 			return vdo_log_warning_strerror(result,
1234 							"failed to encode record page %u",
1235 							record_page_number);
1236 		}
1237 
1238 		next_record += geometry->records_per_page;
1239 		dm_bufio_mark_buffer_dirty(page_buffer);
1240 		dm_bufio_release(page_buffer);
1241 	}
1242 
1243 	return UDS_SUCCESS;
1244 }
1245 
1246 int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
1247 		      const struct uds_volume_record *records)
1248 {
1249 	int result;
1250 	u32 physical_chapter_number =
1251 		uds_map_to_physical_chapter(volume->geometry,
1252 					    chapter_index->virtual_chapter_number);
1253 
1254 	result = write_index_pages(volume, physical_chapter_number, chapter_index);
1255 	if (result != UDS_SUCCESS)
1256 		return result;
1257 
1258 	result = write_record_pages(volume, physical_chapter_number, records);
1259 	if (result != UDS_SUCCESS)
1260 		return result;
1261 
1262 	result = -dm_bufio_write_dirty_buffers(volume->client);
1263 	if (result != UDS_SUCCESS)
1264 		vdo_log_error_strerror(result, "cannot sync chapter to volume");
1265 
1266 	return result;
1267 }
1268 
1269 static void probe_chapter(struct volume *volume, u32 chapter_number,
1270 			  u64 *virtual_chapter_number)
1271 {
1272 	const struct index_geometry *geometry = volume->geometry;
1273 	u32 expected_list_number = 0;
1274 	u32 i;
1275 	u64 vcn = BAD_CHAPTER;
1276 
1277 	*virtual_chapter_number = BAD_CHAPTER;
1278 	dm_bufio_prefetch(volume->client,
1279 			  map_to_physical_page(geometry, chapter_number, 0),
1280 			  geometry->index_pages_per_chapter);
1281 
1282 	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
1283 		struct delta_index_page *page;
1284 		int result;
1285 
1286 		result = uds_get_volume_index_page(volume, chapter_number, i, &page);
1287 		if (result != UDS_SUCCESS)
1288 			return;
1289 
1290 		if (page->virtual_chapter_number == BAD_CHAPTER) {
1291 			vdo_log_error("corrupt index page in chapter %u",
1292 				      chapter_number);
1293 			return;
1294 		}
1295 
1296 		if (vcn == BAD_CHAPTER) {
1297 			vcn = page->virtual_chapter_number;
1298 		} else if (page->virtual_chapter_number != vcn) {
1299 			vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
1300 				      chapter_number, i, (unsigned long long) vcn,
1301 				      (unsigned long long) page->virtual_chapter_number);
1302 			return;
1303 		}
1304 
1305 		if (expected_list_number != page->lowest_list_number) {
1306 			vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
1307 				      chapter_number, i, expected_list_number,
1308 				      page->lowest_list_number);
1309 			return;
1310 		}
1311 		expected_list_number = page->highest_list_number + 1;
1312 
1313 		result = uds_validate_chapter_index_page(page, geometry);
1314 		if (result != UDS_SUCCESS)
1315 			return;
1316 	}
1317 
1318 	if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
1319 		vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
1320 			      (unsigned long long) vcn, geometry->chapters_per_volume);
1321 		return;
1322 	}
1323 
1324 	*virtual_chapter_number = vcn;
1325 }
1326 
1327 /* Find the last valid physical chapter in the volume. */
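/*
 * This is a backward exponential probe: starting just below the limit, it
 * steps back in strides that grow geometrically while the probes keep hitting
 * bad chapters, halves the stride each time a valid chapter is found, and
 * stops once the chapter immediately below the current limit is valid (or the
 * limit reaches zero).
 */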
1328 static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
1329 {
1330 	u32 span = 1;
1331 	u32 tries = 0;
1332 
1333 	while (limit > 0) {
1334 		u32 chapter = (span > limit) ? 0 : limit - span;
1335 		u64 vcn = 0;
1336 
1337 		probe_chapter(volume, chapter, &vcn);
1338 		if (vcn == BAD_CHAPTER) {
1339 			limit = chapter;
1340 			if (++tries > 1)
1341 				span *= 2;
1342 		} else {
1343 			if (span == 1)
1344 				break;
1345 			span /= 2;
1346 			tries = 0;
1347 		}
1348 	}
1349 
1350 	*limit_ptr = limit;
1351 }
1352 
1353 static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
1354 			       u64 *highest_vcn)
1355 {
1356 	struct index_geometry *geometry = volume->geometry;
1357 	u64 zero_vcn;
1358 	u64 lowest = BAD_CHAPTER;
1359 	u64 highest = BAD_CHAPTER;
1360 	u64 moved_chapter = BAD_CHAPTER;
1361 	u32 left_chapter = 0;
1362 	u32 right_chapter = 0;
1363 	u32 bad_chapters = 0;
1364 
1365 	/*
1366 	 * This method assumes there is at most one run of contiguous bad chapters caused by
1367 	 * unflushed writes. Either the bad spot wraps around from the end of the volume to the
1368 	 * beginning, or it lies somewhere in the middle. Wherever it is, the highest and lowest VCNs
1369 	 * are adjacent to it. Otherwise the volume is cleanly saved and somewhere in the middle of it
1370 	 * the highest VCN immediately precedes the lowest one.
1371 	 */
1372 
1373 	/* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
1374 	probe_chapter(volume, 0, &zero_vcn);
1375 
1376 	/*
1377 	 * Binary search for end of the discontinuity in the monotonically increasing virtual
1378 	 * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect we're
1379 	 * searching for the index of the smallest value less than zero_vcn. If the search goes off
1380 	 * the end, it means that chapter 0 has the lowest vcn.
1381 	 *
1382 	 * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
1383 	 * skip over the moved chapter when searching, adding it to the range at the end if
1384 	 * necessary.
1385 	 */
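
	/*
	 * As an illustration (hypothetical 8-chapter volume, no remapped
	 * chapter): if the chapters hold virtual chapter numbers
	 * [1024, 1025, 1026, 1027, BAD, 1021, 1022, 1023], then zero_vcn is
	 * 1024, the binary search settles on left_chapter = 5 (the lowest VCN,
	 * 1021), and the backward scan skips the bad chapter 4 and stops at
	 * chapter 3, which holds the highest VCN, 1027.
	 */
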
1386 	if (geometry->remapped_physical > 0) {
1387 		u64 remapped_vcn;
1388 
1389 		probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
1390 		if (remapped_vcn == geometry->remapped_virtual)
1391 			moved_chapter = geometry->remapped_physical;
1392 	}
1393 
1394 	left_chapter = 0;
1395 	right_chapter = chapter_limit;
1396 
1397 	while (left_chapter < right_chapter) {
1398 		u64 probe_vcn;
1399 		u32 chapter = (left_chapter + right_chapter) / 2;
1400 
1401 		if (chapter == moved_chapter)
1402 			chapter--;
1403 
1404 		probe_chapter(volume, chapter, &probe_vcn);
1405 		if (zero_vcn <= probe_vcn) {
1406 			left_chapter = chapter + 1;
1407 			if (left_chapter == moved_chapter)
1408 				left_chapter++;
1409 		} else {
1410 			right_chapter = chapter;
1411 		}
1412 	}
1413 
1414 	/* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number. */
1415 	if (left_chapter >= chapter_limit)
1416 		left_chapter = 0;
1417 
1418 	/* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
1419 	probe_chapter(volume, left_chapter, &lowest);
1420 
1421 	/* The moved chapter might be the lowest in the range. */
1422 	if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
1423 		lowest = geometry->remapped_virtual;
1424 
1425 	/*
1426 	 * Circularly scan backwards, moving over any bad chapters until encountering a good one,
1427 	 * which is the chapter with the highest vcn.
1428 	 */
1429 	while (highest == BAD_CHAPTER) {
1430 		right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
1431 		if (right_chapter == moved_chapter)
1432 			continue;
1433 
1434 		probe_chapter(volume, right_chapter, &highest);
1435 		if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
1436 			vdo_log_error("too many bad chapters in volume: %u",
1437 				      bad_chapters);
1438 			return UDS_CORRUPT_DATA;
1439 		}
1440 	}
1441 
1442 	*lowest_vcn = lowest;
1443 	*highest_vcn = highest;
1444 	return UDS_SUCCESS;
1445 }
1446 
1447 /*
1448  * Find the highest and lowest contiguous chapters present in the volume and determine their
1449  * virtual chapter numbers. This is used by rebuild.
1450  */
1451 int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
1452 				       u64 *highest_vcn, bool *is_empty)
1453 {
1454 	u32 chapter_limit = volume->geometry->chapters_per_volume;
1455 
1456 	find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
1457 	if (chapter_limit == 0) {
1458 		*lowest_vcn = 0;
1459 		*highest_vcn = 0;
1460 		*is_empty = true;
1461 		return UDS_SUCCESS;
1462 	}
1463 
1464 	*is_empty = false;
1465 	return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
1466 }
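/*
 * A minimal sketch of a hypothetical caller, for illustration only; the actual rebuild code
 * lives in the index layer and handles more cases, and replay_chapter_into_volume_index() is a
 * made-up placeholder rather than a real function. A rebuild first asks for the surviving
 * chapter boundaries and then replays those chapters in virtual order:
 *
 *	u64 lowest, highest;
 *	bool is_empty;
 *	int result;
 *
 *	result = uds_find_volume_chapter_boundaries(volume, &lowest, &highest, &is_empty);
 *	if ((result == UDS_SUCCESS) && !is_empty) {
 *		u64 vcn;
 *
 *		for (vcn = lowest; vcn <= highest; vcn++)
 *			replay_chapter_into_volume_index(volume, vcn);
 *	}
 */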
1467 
1468 int __must_check uds_replace_volume_storage(struct volume *volume,
1469 					    struct index_layout *layout,
1470 					    struct block_device *bdev)
1471 {
1472 	int result;
1473 	u32 i;
1474 
1475 	result = uds_replace_index_layout_storage(layout, bdev);
1476 	if (result != UDS_SUCCESS)
1477 		return result;
1478 
1479 	/* Release all outstanding dm_bufio objects */
1480 	for (i = 0; i < volume->page_cache.indexable_pages; i++)
1481 		volume->page_cache.index[i] = volume->page_cache.cache_slots;
1482 	for (i = 0; i < volume->page_cache.cache_slots; i++)
1483 		clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
1484 	if (volume->sparse_cache != NULL)
1485 		uds_invalidate_sparse_cache(volume->sparse_cache);
1486 	if (volume->client != NULL)
1487 		dm_bufio_client_destroy(vdo_forget(volume->client));
1488 
1489 	return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
1490 				     volume->reserved_buffers, &volume->client);
1491 }
1492 
1493 static int __must_check initialize_page_cache(struct page_cache *cache,
1494 					      const struct index_geometry *geometry,
1495 					      u32 chapters_in_cache,
1496 					      unsigned int zone_count)
1497 {
1498 	int result;
1499 	u32 i;
1500 
1501 	cache->indexable_pages = geometry->pages_per_volume + 1;
1502 	cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
1503 	cache->zone_count = zone_count;
1504 	atomic64_set(&cache->clock, 1);
1505 
1506 	result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
1507 			    "requested cache size, %u, within limit %u",
1508 			    cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
1509 	if (result != VDO_SUCCESS)
1510 		return result;
1511 
1512 	result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
1513 			      "volume read queue", &cache->read_queue);
1514 	if (result != VDO_SUCCESS)
1515 		return result;
1516 
1517 	result = vdo_allocate(cache->zone_count, struct search_pending_counter,
1518 			      "Volume Cache Zones", &cache->search_pending_counters);
1519 	if (result != VDO_SUCCESS)
1520 		return result;
1521 
1522 	result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
1523 			      &cache->index);
1524 	if (result != VDO_SUCCESS)
1525 		return result;
1526 
1527 	result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
1528 			      &cache->cache);
1529 	if (result != VDO_SUCCESS)
1530 		return result;
1531 
1532 	/* Initialize all cache index entries to an invalid value. */
1533 	for (i = 0; i < cache->indexable_pages; i++)
1534 		cache->index[i] = cache->cache_slots;
1535 
1536 	for (i = 0; i < cache->cache_slots; i++)
1537 		clear_cache_page(cache, &cache->cache[i]);
1538 
1539 	return UDS_SUCCESS;
1540 }
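/*
 * For illustration only (hypothetical numbers): a geometry with pages_per_volume = 262144 gives
 * indexable_pages = 262145, and caching 10 chapters of 256 record pages each gives
 * cache_slots = 2560. The index[] array maps a physical page number to its cache slot, with the
 * value cache_slots (2560 here) serving as the "not cached" sentinel, which is why every index
 * entry starts out equal to cache_slots and every cached page is cleared.
 */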
1541 
1542 int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
1543 		    struct volume **new_volume)
1544 {
1545 	unsigned int i;
1546 	struct volume *volume = NULL;
1547 	struct index_geometry *geometry;
1548 	unsigned int reserved_buffers;
1549 	int result;
1550 
1551 	result = vdo_allocate(1, struct volume, "volume", &volume);
1552 	if (result != VDO_SUCCESS)
1553 		return result;
1554 
1555 	volume->nonce = uds_get_volume_nonce(layout);
1556 
1557 	result = uds_copy_index_geometry(config->geometry, &volume->geometry);
1558 	if (result != UDS_SUCCESS) {
1559 		uds_free_volume(volume);
1560 		return vdo_log_warning_strerror(result,
1561 						"failed to allocate geometry: error");
1562 	}
1563 	geometry = volume->geometry;
1564 
1565 	/*
1566 	 * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
1567 	 * for each entry in the sparse cache.
1568 	 */
1569 	reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
1570 	reserved_buffers += 1;
1571 	if (uds_is_sparse_index_geometry(geometry))
1572 		reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
1573 	volume->reserved_buffers = reserved_buffers;
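	/*
	 * For illustration only (hypothetical values, not defaults): with cache_chapters = 10,
	 * record_pages_per_chapter = 256, and index_pages_per_chapter = 6 on a sparse geometry,
	 * this reserves 10 * 256 + 1 + 10 * 6 = 2621 dm-bufio buffers.
	 */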
1574 	result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
1575 				       volume->reserved_buffers, &volume->client);
1576 	if (result != UDS_SUCCESS) {
1577 		uds_free_volume(volume);
1578 		return result;
1579 	}
1580 
1581 	result = uds_make_radix_sorter(geometry->records_per_page,
1582 				       &volume->radix_sorter);
1583 	if (result != UDS_SUCCESS) {
1584 		uds_free_volume(volume);
1585 		return result;
1586 	}
1587 
1588 	result = vdo_allocate(geometry->records_per_page,
1589 			      const struct uds_volume_record *, "record pointers",
1590 			      &volume->record_pointers);
1591 	if (result != VDO_SUCCESS) {
1592 		uds_free_volume(volume);
1593 		return result;
1594 	}
1595 
1596 	if (uds_is_sparse_index_geometry(geometry)) {
1597 		size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;
1598 
1599 		result = uds_make_sparse_cache(geometry, config->cache_chapters,
1600 					       config->zone_count,
1601 					       &volume->sparse_cache);
1602 		if (result != UDS_SUCCESS) {
1603 			uds_free_volume(volume);
1604 			return result;
1605 		}
1606 
1607 		volume->cache_size =
1608 			page_size * geometry->index_pages_per_chapter * config->cache_chapters;
1609 	}
1610 
1611 	result = initialize_page_cache(&volume->page_cache, geometry,
1612 				       config->cache_chapters, config->zone_count);
1613 	if (result != UDS_SUCCESS) {
1614 		uds_free_volume(volume);
1615 		return result;
1616 	}
1617 
1618 	volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
1619 	result = uds_make_index_page_map(geometry, &volume->index_page_map);
1620 	if (result != UDS_SUCCESS) {
1621 		uds_free_volume(volume);
1622 		return result;
1623 	}
1624 
1625 	mutex_init(&volume->read_threads_mutex);
1626 	uds_init_cond(&volume->read_threads_read_done_cond);
1627 	uds_init_cond(&volume->read_threads_cond);
1628 
1629 	result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
1630 			      &volume->reader_threads);
1631 	if (result != VDO_SUCCESS) {
1632 		uds_free_volume(volume);
1633 		return result;
1634 	}
1635 
1636 	for (i = 0; i < config->read_threads; i++) {
1637 		result = vdo_create_thread(read_thread_function, (void *) volume,
1638 					   "reader", &volume->reader_threads[i]);
1639 		if (result != VDO_SUCCESS) {
1640 			uds_free_volume(volume);
1641 			return result;
1642 		}
1643 
1644 		volume->read_thread_count = i + 1;
1645 	}
1646 
1647 	*new_volume = volume;
1648 	return UDS_SUCCESS;
1649 }
1650 
1651 static void uninitialize_page_cache(struct page_cache *cache)
1652 {
1653 	u16 i;
1654 
1655 	if (cache->cache != NULL) {
1656 		for (i = 0; i < cache->cache_slots; i++)
1657 			release_page_buffer(&cache->cache[i]);
1658 	}
1659 	vdo_free(cache->index);
1660 	vdo_free(cache->cache);
1661 	vdo_free(cache->search_pending_counters);
1662 	vdo_free(cache->read_queue);
1663 }
1664 
1665 void uds_free_volume(struct volume *volume)
1666 {
1667 	if (volume == NULL)
1668 		return;
1669 
1670 	if (volume->reader_threads != NULL) {
1671 		unsigned int i;
1672 
1673 		/* This works even if some threads weren't started. */
1674 		mutex_lock(&volume->read_threads_mutex);
1675 		volume->read_threads_exiting = true;
1676 		uds_broadcast_cond(&volume->read_threads_cond);
1677 		mutex_unlock(&volume->read_threads_mutex);
1678 		for (i = 0; i < volume->read_thread_count; i++)
1679 			vdo_join_threads(volume->reader_threads[i]);
1680 		vdo_free(volume->reader_threads);
1681 		volume->reader_threads = NULL;
1682 	}
1683 
1684 	/* Must destroy the client AFTER freeing the cached pages. */
1685 	uninitialize_page_cache(&volume->page_cache);
1686 	uds_free_sparse_cache(volume->sparse_cache);
1687 	if (volume->client != NULL)
1688 		dm_bufio_client_destroy(vdo_forget(volume->client));
1689 
1690 	uds_free_index_page_map(volume->index_page_map);
1691 	uds_free_radix_sorter(volume->radix_sorter);
1692 	vdo_free(volume->geometry);
1693 	vdo_free(volume->record_pointers);
1694 	vdo_free(volume);
1695 }
1696