xref: /linux/drivers/md/dm-vdo/block-map.c (revision a5f998094fa344cdd1342164948abb4d7c6101ce)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "block-map.h"
7 
8 #include <linux/bio.h>
9 #include <linux/ratelimit.h>
10 
11 #include "errors.h"
12 #include "logger.h"
13 #include "memory-alloc.h"
14 #include "permassert.h"
15 
16 #include "action-manager.h"
17 #include "admin-state.h"
18 #include "completion.h"
19 #include "constants.h"
20 #include "data-vio.h"
21 #include "encodings.h"
22 #include "io-submitter.h"
23 #include "physical-zone.h"
24 #include "recovery-journal.h"
25 #include "slab-depot.h"
26 #include "status-codes.h"
27 #include "types.h"
28 #include "vdo.h"
29 #include "vio.h"
30 #include "wait-queue.h"
31 
32 /**
33  * DOC: Block map eras
34  *
35  * The block map era, or maximum age, is used as follows:
36  *
37  * Each block map page, when dirty, records the earliest recovery journal block sequence number of
38  * the changes reflected in that dirty block. Sequence numbers are classified into eras: every
39  * @maximum_age sequence numbers, we switch to a new era. Block map pages are assigned to eras
40  * according to the sequence number they record.
41  *
42  * In the current (newest) era, block map pages are not written unless there is cache pressure. In
43  * the next oldest era, each time a new journal block is written 1/@maximum_age of the pages in
44  * this era are issued for write. In all older eras, pages are issued for write immediately.
45  */
46 
/*
 * Identifies a block map tree page: which root (tree), at what height, which page at that
 * height, and which slot within the page. Packed so the whole descriptor can also be viewed
 * as a single u64 key via union page_key below.
 */
struct page_descriptor {
	root_count_t root_index;
	height_t height;
	page_number_t page_index;
	slot_number_t slot;
} __packed;
53 
/* Allows a page_descriptor to be used as a single 64-bit key. */
union page_key {
	struct page_descriptor descriptor;
	u64 key;
};
58 
/*
 * Context used when writing out pages of a given dirty generation.
 * NOTE(review): the consumer of this struct is later in the file (not shown here);
 * presumably @generation selects which dirty pages to write — confirm against callers.
 */
struct write_if_not_dirtied_context {
	struct block_map_zone *zone;
	u8 generation;
};
63 
/* One tree page per level of the tree, for a single segment of a block map tree. */
struct block_map_tree_segment {
	struct tree_page *levels[VDO_BLOCK_MAP_TREE_HEIGHT];
};
67 
/* A block map tree is an array of segments. */
struct block_map_tree {
	struct block_map_tree_segment *segments;
};
71 
/* The collection of all block map trees (one per root), plus their page storage. */
struct forest {
	struct block_map *map;
	size_t segments;
	struct boundary *boundaries;
	struct tree_page **pages;
	/* Flexible array: one tree per root. */
	struct block_map_tree trees[];
};
79 
/* The position of a traversal cursor at one level of a tree. */
struct cursor_level {
	page_number_t page_index;
	slot_number_t slot;
};
84 
struct cursors;

/* A cursor for traversing one block map tree, with a position per tree level. */
struct cursor {
	/* Allows the cursor to wait for a vio from the pool. */
	struct vdo_waiter waiter;
	struct block_map_tree *tree;
	height_t height;
	struct cursors *parent;
	struct boundary boundary;
	struct cursor_level levels[VDO_BLOCK_MAP_TREE_HEIGHT];
	struct pooled_vio *vio;
};
96 
/* The group of cursors traversing all the trees of a zone, one cursor per root. */
struct cursors {
	struct block_map_zone *zone;
	struct vio_pool *pool;
	/* Invoked for each tree entry visited. */
	vdo_entry_callback_fn entry_callback;
	/* Completed when all cursors have finished (active_roots reaches zero). */
	struct vdo_completion *completion;
	root_count_t active_roots;
	struct cursor cursors[];
};
105 
/* Sentinel pbn meaning a page_info currently holds no page. */
static const physical_block_number_t NO_PAGE = 0xFFFFFFFFFFFFFFFF;

/* Used to indicate that the page holding the location of a tree root has been "loaded". */
static const physical_block_number_t VDO_INVALID_PBN = 0xFFFFFFFFFFFFFFFF;

/* The on-disk encoding of an unmapped entry: the zero block in the unmapped mapping state. */
const struct block_map_entry UNMAPPED_BLOCK_MAP_ENTRY = {
	.mapping_state = VDO_MAPPING_STATE_UNMAPPED & 0x0F,
	.pbn_high_nibble = 0,
	.pbn_low_word = __cpu_to_le32(VDO_ZERO_BLOCK & UINT_MAX),
};

/* Log cache pressure every LOG_INTERVAL reports; reset the report count at DISPLAY_INTERVAL. */
#define LOG_INTERVAL 4000
#define DISPLAY_INTERVAL 100000

/*
 * For adjusting VDO page cache statistic fields which are only mutated on the logical zone thread.
 * Prevents any compiler shenanigans from affecting other threads reading those stats.
 */
#define ADD_ONCE(value, delta) WRITE_ONCE(value, (value) + (delta))
125 
is_dirty(const struct page_info * info)126 static inline bool is_dirty(const struct page_info *info)
127 {
128 	return info->state == PS_DIRTY;
129 }
130 
is_present(const struct page_info * info)131 static inline bool is_present(const struct page_info *info)
132 {
133 	return (info->state == PS_RESIDENT) || (info->state == PS_DIRTY);
134 }
135 
is_in_flight(const struct page_info * info)136 static inline bool is_in_flight(const struct page_info *info)
137 {
138 	return (info->state == PS_INCOMING) || (info->state == PS_OUTGOING);
139 }
140 
is_incoming(const struct page_info * info)141 static inline bool is_incoming(const struct page_info *info)
142 {
143 	return info->state == PS_INCOMING;
144 }
145 
is_outgoing(const struct page_info * info)146 static inline bool is_outgoing(const struct page_info *info)
147 {
148 	return info->state == PS_OUTGOING;
149 }
150 
is_valid(const struct page_info * info)151 static inline bool is_valid(const struct page_info *info)
152 {
153 	return is_present(info) || is_outgoing(info);
154 }
155 
get_page_buffer(struct page_info * info)156 static char *get_page_buffer(struct page_info *info)
157 {
158 	struct vdo_page_cache *cache = info->cache;
159 
160 	return &cache->pages[(info - cache->infos) * VDO_BLOCK_SIZE];
161 }
162 
/**
 * page_completion_from_waiter() - Convert a waiter back to the page completion that contains it.
 * @waiter: The waiter to convert (may be NULL).
 *
 * Asserts that the enclosing completion really is a VDO_PAGE_COMPLETION.
 *
 * Return: The enclosing vdo_page_completion, or NULL if @waiter is NULL.
 */
static inline struct vdo_page_completion *page_completion_from_waiter(struct vdo_waiter *waiter)
{
	struct vdo_page_completion *completion;

	if (waiter == NULL)
		return NULL;

	completion = container_of(waiter, struct vdo_page_completion, waiter);
	vdo_assert_completion_type(&completion->completion, VDO_PAGE_COMPLETION);
	return completion;
}
174 
/**
 * initialize_info() - Initialize all page info structures and put them on the free list.
 * @cache: The page cache.
 *
 * Also creates the metadata vio each info will use for its page I/O.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_info(struct vdo_page_cache *cache)
{
	struct page_info *info;

	INIT_LIST_HEAD(&cache->free_list);
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		int result;

		info->cache = cache;
		info->state = PS_FREE;
		info->pbn = NO_PAGE;

		result = create_metadata_vio(cache->vdo, VIO_TYPE_BLOCK_MAP,
					     VIO_PRIORITY_METADATA, info,
					     get_page_buffer(info), &info->vio);
		if (result != VDO_SUCCESS)
			return result;

		/* The thread ID should never change. */
		info->vio->completion.callback_thread_id = cache->zone->thread_id;

		INIT_LIST_HEAD(&info->state_entry);
		list_add_tail(&info->state_entry, &cache->free_list);
		INIT_LIST_HEAD(&info->lru_entry);
	}

	return VDO_SUCCESS;
}
209 
/**
 * allocate_cache_components() - Allocate components of the cache which require their own
 *                               allocation.
 * @cache: The page cache.
 *
 * The caller is responsible for all clean up on errors; partially-allocated state is left
 * in place for the caller's teardown path to release.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check allocate_cache_components(struct vdo_page_cache *cache)
{
	u64 size = cache->page_count * (u64) VDO_BLOCK_SIZE;
	int result;

	result = vdo_allocate(cache->page_count, "page infos", &cache->infos);
	if (result != VDO_SUCCESS)
		return result;

	/* Page data is one contiguous, block-aligned slab; see get_page_buffer(). */
	result = vdo_allocate_memory(size, VDO_BLOCK_SIZE, "cache pages", &cache->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_int_map_create(cache->page_count, &cache->page_map);
	if (result != VDO_SUCCESS)
		return result;

	return initialize_info(cache);
}
238 
/**
 * assert_on_cache_thread() - Assert that a function has been called on the VDO page cache's
 *                            thread.
 * @cache: The page cache.
 * @function_name: The function name to report if the assertion fails.
 */
static inline void assert_on_cache_thread(struct vdo_page_cache *cache,
					  const char *function_name)
{
	thread_id_t thread_id = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((thread_id == cache->zone->thread_id),
			    "%s() must only be called on cache thread %d, not thread %d",
			    function_name, cache->zone->thread_id, thread_id);
}
254 
/** assert_io_allowed() - Assert that a page cache may issue I/O (i.e. its zone is not quiescent). */
static inline void assert_io_allowed(struct vdo_page_cache *cache)
{
	VDO_ASSERT_LOG_ONLY(!vdo_is_state_quiescent(&cache->zone->state),
			    "VDO page cache may issue I/O");
}
261 
/**
 * report_cache_pressure() - Log and, if enabled, report cache pressure.
 * @cache: The page cache.
 *
 * Logging is rate-limited to every LOG_INTERVAL occurrences, and the counter wraps at
 * DISPLAY_INTERVAL, but only while there are more waiters than pages.
 */
static void report_cache_pressure(struct vdo_page_cache *cache)
{
	ADD_ONCE(cache->stats.cache_pressure, 1);
	if (cache->waiter_count > cache->page_count) {
		if ((cache->pressure_report % LOG_INTERVAL) == 0)
			vdo_log_info("page cache pressure %u", cache->stats.cache_pressure);

		if (++cache->pressure_report >= DISPLAY_INTERVAL)
			cache->pressure_report = 0;
	}
}
274 
/**
 * get_page_state_name() - Return the name of a page state.
 * @state: The page state to describe.
 *
 * If the page state is invalid a static string is returned and the invalid state is logged.
 *
 * Return: A pointer to a static page state name.
 */
static const char * __must_check get_page_state_name(enum vdo_page_buffer_state state)
{
	int result;
	static const char * const state_names[] = {
		"FREE", "INCOMING", "FAILED", "RESIDENT", "DIRTY", "OUTGOING"
	};

	/* Catch at compile time any new state added without a matching name. */
	BUILD_BUG_ON(ARRAY_SIZE(state_names) != PAGE_STATE_COUNT);

	result = VDO_ASSERT(state < ARRAY_SIZE(state_names),
			    "Unknown page_state value %d", state);
	if (result != VDO_SUCCESS)
		return "[UNKNOWN PAGE STATE]";

	return state_names[state];
}
299 
/**
 * update_counter() - Update the per-state statistics counter associated with a page's state.
 * @info: The page info to count.
 * @delta: The delta to apply to the counter (typically -1 when leaving a state, +1 on entry).
 */
static void update_counter(struct page_info *info, s32 delta)
{
	struct block_map_statistics *stats = &info->cache->stats;

	switch (info->state) {
	case PS_FREE:
		ADD_ONCE(stats->free_pages, delta);
		return;

	case PS_INCOMING:
		ADD_ONCE(stats->incoming_pages, delta);
		return;

	case PS_OUTGOING:
		ADD_ONCE(stats->outgoing_pages, delta);
		return;

	case PS_FAILED:
		ADD_ONCE(stats->failed_pages, delta);
		return;

	case PS_RESIDENT:
		ADD_ONCE(stats->clean_pages, delta);
		return;

	case PS_DIRTY:
		ADD_ONCE(stats->dirty_pages, delta);
		return;

	default:
		/* Unknown states are not counted anywhere. */
		return;
	}
}
338 
339 /** update_lru() - Update the lru information for an active page. */
update_lru(struct page_info * info)340 static void update_lru(struct page_info *info)
341 {
342 	if (info->cache->lru_list.prev != &info->lru_entry)
343 		list_move_tail(&info->lru_entry, &info->cache->lru_list);
344 }
345 
/**
 * set_info_state() - Set the state of a page_info and put it on the right list, adjusting
 *                    counters.
 * @info: The page info to update.
 * @new_state: The new state to set.
 */
static void set_info_state(struct page_info *info, enum vdo_page_buffer_state new_state)
{
	if (new_state == info->state)
		return;

	/* Move the statistics count from the old state's counter to the new one's. */
	update_counter(info, -1);
	info->state = new_state;
	update_counter(info, 1);

	switch (info->state) {
	case PS_FREE:
	case PS_FAILED:
		list_move_tail(&info->state_entry, &info->cache->free_list);
		return;

	case PS_OUTGOING:
		list_move_tail(&info->state_entry, &info->cache->outgoing_list);
		return;

	case PS_DIRTY:
		/* Dirty pages stay on whatever list they are already on. */
		return;

	default:
		/* Other states (incoming, resident) are not kept on any state list. */
		list_del_init(&info->state_entry);
	}
}
378 
/**
 * set_info_pbn() - Set the pbn for an info, updating the pbn-to-info map as needed.
 * @info: The page info to update.
 * @pbn: The new pbn, or NO_PAGE to disassociate the info from any page.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int __must_check set_info_pbn(struct page_info *info, physical_block_number_t pbn)
{
	struct vdo_page_cache *cache = info->cache;

	/* Either the new or the old page number must be NO_PAGE. */
	int result = VDO_ASSERT((pbn == NO_PAGE) || (info->pbn == NO_PAGE),
				"Must free a page before reusing it.");
	if (result != VDO_SUCCESS)
		return result;

	if (info->pbn != NO_PAGE)
		vdo_int_map_remove(cache->page_map, info->pbn);

	info->pbn = pbn;

	if (pbn != NO_PAGE) {
		result = vdo_int_map_put(cache->page_map, pbn, info, true, NULL);
		if (result != VDO_SUCCESS)
			return result;
	}
	return VDO_SUCCESS;
}
402 
/**
 * reset_page_info() - Reset page info to represent an unallocated page.
 * @info: The page info to reset; must not be busy or have waiters.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int reset_page_info(struct page_info *info)
{
	int result;

	result = VDO_ASSERT(info->busy == 0, "VDO Page must not be busy");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(!vdo_waitq_has_waiters(&info->waiting),
			    "VDO Page must not have waiters");
	if (result != VDO_SUCCESS)
		return result;

	/* Reset state and lru position even if clearing the pbn failed. */
	result = set_info_pbn(info, NO_PAGE);
	set_info_state(info, PS_FREE);
	list_del_init(&info->lru_entry);
	return result;
}
422 
423 /**
424  * find_free_page() - Find a free page.
425  * @cache: The page cache.
426  *
427  * Return: A pointer to the page info structure (if found), NULL otherwise.
428  */
find_free_page(struct vdo_page_cache * cache)429 static struct page_info * __must_check find_free_page(struct vdo_page_cache *cache)
430 {
431 	struct page_info *info;
432 
433 	info = list_first_entry_or_null(&cache->free_list, struct page_info,
434 					state_entry);
435 	if (info != NULL)
436 		list_del_init(&info->state_entry);
437 
438 	return info;
439 }
440 
/**
 * find_page() - Find the page info (if any) associated with a given pbn.
 * @cache: The page cache.
 * @pbn: The absolute physical block number of the page.
 *
 * Maintains a one-entry lookup cache (last_found) in front of the int map.
 *
 * Return: The page info for the page if available, or NULL if not.
 */
static struct page_info * __must_check find_page(struct vdo_page_cache *cache,
						 physical_block_number_t pbn)
{
	if ((cache->last_found != NULL) && (cache->last_found->pbn == pbn))
		return cache->last_found;

	cache->last_found = vdo_int_map_get(cache->page_map, pbn);
	return cache->last_found;
}
457 
/**
 * select_lru_page() - Determine which page is least recently used.
 * @cache: The page cache.
 *
 * Picks the least recently used from among the non-busy entries at the front of each of the lru
 * list. Since whenever we mark a page busy we also put it to the end of the list it is unlikely
 * that the entries at the front are busy unless the queue is very short, but not impossible.
 *
 * Return: A pointer to the info structure for a relevant page, or NULL if no such page can be
 *         found. The page can be dirty or resident.
 */
static struct page_info * __must_check select_lru_page(struct vdo_page_cache *cache)
{
	struct page_info *info;

	/* Oldest first: the list head's next entry is the least recently used. */
	list_for_each_entry(info, &cache->lru_list, lru_entry)
		if ((info->busy == 0) && !is_in_flight(info))
			return info;

	return NULL;
}
479 
480 /* ASYNCHRONOUS INTERFACE BEYOND THIS POINT */
481 
/**
 * complete_with_page() - Helper to complete the VDO Page Completion request successfully.
 * @info: The page info representing the result page.
 * @vdo_page_comp: The VDO page completion to complete.
 *
 * A writable request needs the page present in memory; a read-only request also accepts a
 * page that is being written out. Otherwise the completion fails with VDO_BAD_PAGE.
 */
static void complete_with_page(struct page_info *info,
			       struct vdo_page_completion *vdo_page_comp)
{
	bool available = vdo_page_comp->writable ? is_present(info) : is_valid(info);

	if (!available) {
		vdo_log_error_strerror(VDO_BAD_PAGE,
				       "Requested cache page %llu in state %s is not %s",
				       (unsigned long long) info->pbn,
				       get_page_state_name(info->state),
				       vdo_page_comp->writable ? "present" : "valid");
		vdo_fail_completion(&vdo_page_comp->completion, VDO_BAD_PAGE);
		return;
	}

	vdo_page_comp->info = info;
	vdo_page_comp->ready = true;
	vdo_finish_completion(&vdo_page_comp->completion);
}
506 
507 /**
508  * complete_waiter_with_error() - Complete a page completion with an error code.
509  * @waiter: The page completion, as a waiter.
510  * @result_ptr: A pointer to the error code.
511  *
512  * Implements waiter_callback_fn.
513  */
complete_waiter_with_error(struct vdo_waiter * waiter,void * result_ptr)514 static void complete_waiter_with_error(struct vdo_waiter *waiter, void *result_ptr)
515 {
516 	int *result = result_ptr;
517 
518 	vdo_fail_completion(&page_completion_from_waiter(waiter)->completion, *result);
519 }
520 
/**
 * complete_waiter_with_page() - Complete a page completion with a page.
 * @waiter: The page completion, as a waiter.
 * @page_info: The page info to complete with.
 *
 * Implements waiter_callback_fn.
 */
static void complete_waiter_with_page(struct vdo_waiter *waiter, void *page_info)
{
	struct vdo_page_completion *completion = page_completion_from_waiter(waiter);

	complete_with_page(page_info, completion);
}
532 
/**
 * distribute_page_over_waitq() - Complete a waitq of VDO page completions with a page result.
 * @info: The loaded page info.
 * @waitq: The queue of waiting page completions.
 *
 * Upon completion the waitq will be empty.
 *
 * Return: The number of completions notified (i.e. the amount added to the busy count).
 */
static unsigned int distribute_page_over_waitq(struct page_info *info,
					       struct vdo_wait_queue *waitq)
{
	size_t num_pages;

	update_lru(info);
	num_pages = vdo_waitq_num_waiters(waitq);

	/*
	 * Increment the busy count once for each pending completion so that this page does not
	 * stop being busy until all completions have been processed.
	 */
	info->busy += num_pages;

	vdo_waitq_notify_all_waiters(waitq, complete_waiter_with_page, info);
	return num_pages;
}
559 
/**
 * set_persistent_error() - Set a persistent error which all requests will receive in the future.
 * @cache: The page cache.
 * @context: A string describing what triggered the error.
 * @result: The error result to set on the cache.
 *
 * Once triggered, all enqueued completions will get this error. Any future requests will result in
 * this error as well. Also puts the whole vdo into read-only mode.
 */
static void set_persistent_error(struct vdo_page_cache *cache, const char *context,
				 int result)
{
	struct page_info *info;
	/* If we're already read-only, there's no need to log. */
	struct vdo *vdo = cache->vdo;

	if ((result != VDO_READ_ONLY) && !vdo_is_read_only(vdo)) {
		vdo_log_error_strerror(result, "VDO Page Cache persistent error: %s",
				       context);
		vdo_enter_read_only_mode(vdo, result);
	}

	assert_on_cache_thread(cache, __func__);

	/* Fail everyone waiting for a free page... */
	vdo_waitq_notify_all_waiters(&cache->free_waiters,
				     complete_waiter_with_error, &result);
	cache->waiter_count = 0;

	/* ...and everyone waiting on any individual page. */
	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}
593 
/**
 * validate_completed_page() - Check that a page completion which is being freed to the cache
 *                             referred to a valid page and is in a valid state.
 * @completion: The page completion to check.
 * @writable: Whether a writable page is required.
 *
 * Return: VDO_SUCCESS if the page was valid, otherwise an error.
 */
static int __must_check validate_completed_page(struct vdo_page_completion *completion,
						bool writable)
{
	int result;

	result = VDO_ASSERT(completion->ready, "VDO Page completion not ready");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info != NULL,
			    "VDO Page Completion must be complete");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(completion->info->pbn == completion->pbn,
			    "VDO Page Completion pbn must be consistent");
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT(is_valid(completion->info),
			    "VDO Page Completion page must be valid");
	if (result != VDO_SUCCESS)
		return result;

	if (writable) {
		result = VDO_ASSERT(completion->writable,
				    "VDO Page Completion must be writable");
		if (result != VDO_SUCCESS)
			return result;
	}

	return VDO_SUCCESS;
}
635 
/**
 * check_for_drain_complete() - Finish a zone drain if all of the zone's outstanding work
 *                              (lookups, flush waiters, pool vios, and page cache I/O) is done.
 * @zone: The zone to check.
 */
static void check_for_drain_complete(struct block_map_zone *zone)
{
	if (vdo_is_state_draining(&zone->state) &&
	    (zone->active_lookups == 0) &&
	    !vdo_waitq_has_waiters(&zone->flush_waiters) &&
	    !is_vio_pool_busy(zone->vio_pool) &&
	    (zone->page_cache.outstanding_reads == 0) &&
	    (zone->page_cache.outstanding_writes == 0)) {
		vdo_finish_draining_with_result(&zone->state,
						(vdo_is_read_only(zone->block_map->vdo) ?
						 VDO_READ_ONLY : VDO_SUCCESS));
	}
}
649 
/**
 * enter_zone_read_only_mode() - Put the vdo in read-only mode on behalf of a zone and release
 *                               the zone's flush waiters so any drain can finish.
 * @zone: The zone which encountered the error.
 * @result: The error which triggered read-only mode.
 */
static void enter_zone_read_only_mode(struct block_map_zone *zone, int result)
{
	vdo_enter_read_only_mode(zone->block_map->vdo, result);

	/*
	 * We are in read-only mode, so we won't ever write any page out.
	 * Just take all waiters off the waitq so the zone can drain.
	 */
	vdo_waitq_init(&zone->flush_waiters);
	check_for_drain_complete(zone);
}
661 
/**
 * validate_completed_page_or_enter_read_only_mode() - Like validate_completed_page(), but on
 *                                                     failure puts the zone's vdo into
 *                                                     read-only mode.
 * @completion: The page completion to check.
 * @writable: Whether a writable page is required.
 *
 * Return: true if the completion is valid.
 */
static bool __must_check
validate_completed_page_or_enter_read_only_mode(struct vdo_page_completion *completion,
						bool writable)
{
	int result = validate_completed_page(completion, writable);

	if (result == VDO_SUCCESS)
		return true;

	enter_zone_read_only_mode(completion->info->cache->zone, result);
	return false;
}
674 
/**
 * handle_load_error() - Handle page load errors.
 * @completion: The page read vio; its parent is the page_info.
 *
 * A failed load is fatal: the vdo enters read-only mode, all waiters for the page are failed,
 * and the page info is recycled.
 */
static void handle_load_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);
	vio_record_metadata_io_error(as_vio(completion));
	vdo_enter_read_only_mode(cache->zone->block_map->vdo, result);
	ADD_ONCE(cache->stats.failed_reads, 1);
	set_info_state(info, PS_FAILED);
	vdo_waitq_notify_all_waiters(&info->waiting, complete_waiter_with_error, &result);
	reset_page_info(info);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}
700 
/**
 * page_is_loaded() - Callback used when a page has been loaded.
 * @completion: The vio which has loaded the page. Its parent is the page_info.
 *
 * Validates the loaded page against the expected pbn and nonce. A bad page (one claiming a
 * different pbn) fails the completion; an invalid (uninitialized) page is formatted fresh.
 */
static void page_is_loaded(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	nonce_t nonce = info->cache->zone->block_map->nonce;
	struct block_map_page *page;
	enum block_map_page_validity validity;

	assert_on_cache_thread(cache, __func__);

	page = (struct block_map_page *) get_page_buffer(info);
	validity = vdo_validate_block_map_page(page, nonce, info->pbn);
	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
		physical_block_number_t pbn = vdo_get_block_map_page_pbn(page);
		int result = vdo_log_error_strerror(VDO_BAD_PAGE,
						    "Expected page %llu but got page %llu instead",
						    (unsigned long long) info->pbn,
						    (unsigned long long) pbn);

		vdo_continue_completion(completion, result);
		return;
	}

	if (validity == VDO_BLOCK_MAP_PAGE_INVALID)
		vdo_format_block_map_page(page, nonce, info->pbn, false);

	info->recovery_lock = 0;
	set_info_state(info, PS_RESIDENT);
	distribute_page_over_waitq(info, &info->waiting);

	/*
	 * Don't decrement until right before calling check_for_drain_complete() to
	 * ensure that the above work can't cause the page cache to be freed out from under us.
	 */
	cache->outstanding_reads--;
	check_for_drain_complete(cache->zone);
}
742 
/**
 * handle_rebuild_read_error() - Handle a read error during a read-only rebuild.
 * @completion: The page load completion.
 *
 * The zeroed buffer will be treated as an uninitialized page by page_is_loaded() and
 * formatted fresh.
 */
static void handle_rebuild_read_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	/*
	 * We are doing a read-only rebuild, so treat this as a successful read
	 * of an uninitialized page.
	 */
	vio_record_metadata_io_error(as_vio(completion));
	ADD_ONCE(cache->stats.failed_reads, 1);
	memset(get_page_buffer(info), 0, VDO_BLOCK_SIZE);
	vdo_reset_completion(completion);
	page_is_loaded(completion);
}
764 
load_cache_page_endio(struct bio * bio)765 static void load_cache_page_endio(struct bio *bio)
766 {
767 	struct vio *vio = bio->bi_private;
768 	struct page_info *info = vio->completion.parent;
769 
770 	continue_vio_after_io(vio, page_is_loaded, info->cache->zone->thread_id);
771 }
772 
/**
 * launch_page_load() - Begin the process of loading a page.
 * @info: The page info to launch (must not be busy).
 * @pbn: The absolute physical block number of the page to load.
 *
 * During rebuild, read errors are treated as reads of uninitialized pages rather than
 * as fatal.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check launch_page_load(struct page_info *info,
					 physical_block_number_t pbn)
{
	int result;
	vdo_action_fn callback;
	struct vdo_page_cache *cache = info->cache;

	assert_io_allowed(cache);

	result = set_info_pbn(info, pbn);
	if (result != VDO_SUCCESS)
		return result;

	result = VDO_ASSERT((info->busy == 0), "Page is not busy before loading.");
	if (result != VDO_SUCCESS)
		return result;

	set_info_state(info, PS_INCOMING);
	cache->outstanding_reads++;
	ADD_ONCE(cache->stats.pages_loaded, 1);
	callback = (cache->rebuilding ? handle_rebuild_read_error : handle_load_error);
	vdo_submit_metadata_vio(info->vio, pbn, load_cache_page_endio,
				callback, REQ_OP_READ | REQ_PRIO);
	return VDO_SUCCESS;
}
805 
806 static void write_pages(struct vdo_completion *completion);
807 
/**
 * handle_flush_error() - Handle errors flushing the layer.
 * @completion: The flush vio; its parent is the page_info.
 *
 * Sets a persistent error on the cache, then proceeds to write_pages() anyway so the
 * outgoing batch is still unwound.
 */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct page_info *info = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	set_persistent_error(info->cache, "flush failed", completion->result);
	write_pages(completion);
}
817 
flush_endio(struct bio * bio)818 static void flush_endio(struct bio *bio)
819 {
820 	struct vio *vio = bio->bi_private;
821 	struct page_info *info = vio->completion.parent;
822 
823 	continue_vio_after_io(vio, write_pages, info->cache->zone->thread_id);
824 }
825 
/**
 * save_pages() - Attempt to save the outgoing pages by first flushing the layer.
 * @cache: The page cache.
 *
 * No-op if a flush is already in progress or there is nothing to flush; the current batch's
 * completion will restart the process.
 */
static void save_pages(struct vdo_page_cache *cache)
{
	struct page_info *info;
	struct vio *vio;

	if ((cache->pages_in_flush > 0) || (cache->pages_to_flush == 0))
		return;

	assert_io_allowed(cache);

	/* Borrow the first outgoing page's vio to issue the flush. */
	info = list_first_entry(&cache->outgoing_list, struct page_info, state_entry);

	cache->pages_in_flush = cache->pages_to_flush;
	cache->pages_to_flush = 0;
	ADD_ONCE(cache->stats.flush_count, 1);

	vio = info->vio;

	/*
	 * We must make sure that the recovery journal entries that changed these pages were
	 * successfully persisted, and thus must issue a flush before each batch of pages is
	 * written to ensure this.
	 */
	vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
}
852 
853 /**
854  * schedule_page_save() - Add a page to the outgoing list of pages waiting to be saved.
855  * @info: The page info to save.
856  *
857  * Once in the list, a page may not be used until it has been written out.
858  */
schedule_page_save(struct page_info * info)859 static void schedule_page_save(struct page_info *info)
860 {
861 	if (info->busy > 0) {
862 		info->write_status = WRITE_STATUS_DEFERRED;
863 		return;
864 	}
865 
866 	info->cache->pages_to_flush++;
867 	info->cache->outstanding_writes++;
868 	set_info_state(info, PS_OUTGOING);
869 }
870 
871 /**
872  * launch_page_save() - Add a page to outgoing pages waiting to be saved, and then start saving
873  * pages if another save is not in progress.
874  * @info: The page info to save.
875  */
launch_page_save(struct page_info * info)876 static void launch_page_save(struct page_info *info)
877 {
878 	schedule_page_save(info);
879 	save_pages(info->cache);
880 }
881 
882 /**
883  * completion_needs_page() - Determine whether a given vdo_page_completion (as a waiter) is
884  *                           requesting a given page number.
885  * @waiter: The page completion waiter to check.
886  * @context: A pointer to the pbn of the desired page.
887  *
888  * Implements waiter_match_fn.
889  *
890  * Return: true if the page completion is for the desired page number.
891  */
completion_needs_page(struct vdo_waiter * waiter,void * context)892 static bool completion_needs_page(struct vdo_waiter *waiter, void *context)
893 {
894 	physical_block_number_t *pbn = context;
895 
896 	return (page_completion_from_waiter(waiter)->pbn == *pbn);
897 }
898 
/**
 * allocate_free_page() - Allocate a free page to the first completion in the waiting queue, and
 *                        any other completions that match it in page number.
 * @info: The page info to allocate a page for.
 *
 * If there are no waiters, this only clears any lingering cache-pressure statistic.
 */
static void allocate_free_page(struct page_info *info)
{
	int result;
	struct vdo_waiter *oldest_waiter;
	physical_block_number_t pbn;
	struct vdo_page_cache *cache = info->cache;

	assert_on_cache_thread(cache, __func__);

	if (!vdo_waitq_has_waiters(&cache->free_waiters)) {
		if (cache->stats.cache_pressure > 0) {
			vdo_log_info("page cache pressure relieved");
			WRITE_ONCE(cache->stats.cache_pressure, 0);
		}

		return;
	}

	result = reset_page_info(info);
	if (result != VDO_SUCCESS) {
		set_persistent_error(cache, "cannot reset page info", result);
		return;
	}

	oldest_waiter = vdo_waitq_get_first_waiter(&cache->free_waiters);
	pbn = page_completion_from_waiter(oldest_waiter)->pbn;

	/*
	 * Remove all entries which match the page number in question and push them onto the page
	 * info's waitq.
	 */
	vdo_waitq_dequeue_matching_waiters(&cache->free_waiters, completion_needs_page,
					   &pbn, &info->waiting);
	cache->waiter_count -= vdo_waitq_num_waiters(&info->waiting);

	result = launch_page_load(info, pbn);
	if (result != VDO_SUCCESS) {
		vdo_waitq_notify_all_waiters(&info->waiting,
					     complete_waiter_with_error, &result);
	}
}
945 
946 /**
947  * discard_a_page() - Begin the process of discarding a page.
948  * @cache: The page cache.
949  *
950  * If no page is discardable, increments a count of deferred frees so that the next release of a
951  * page which is no longer busy will kick off another discard cycle. This is an indication that the
952  * cache is not big enough.
953  *
954  * If the selected page is not dirty, immediately allocates the page to the oldest completion
955  * waiting for a free page.
956  */
discard_a_page(struct vdo_page_cache * cache)957 static void discard_a_page(struct vdo_page_cache *cache)
958 {
959 	struct page_info *info = select_lru_page(cache);
960 
961 	if (info == NULL) {
962 		report_cache_pressure(cache);
963 		return;
964 	}
965 
966 	if (!is_dirty(info)) {
967 		allocate_free_page(info);
968 		return;
969 	}
970 
971 	VDO_ASSERT_LOG_ONLY(!is_in_flight(info),
972 			    "page selected for discard is not in flight");
973 
974 	cache->discard_count++;
975 	info->write_status = WRITE_STATUS_DISCARD;
976 	launch_page_save(info);
977 }
978 
discard_page_for_completion(struct vdo_page_completion * vdo_page_comp)979 static void discard_page_for_completion(struct vdo_page_completion *vdo_page_comp)
980 {
981 	struct vdo_page_cache *cache = vdo_page_comp->cache;
982 
983 	cache->waiter_count++;
984 	vdo_waitq_enqueue_waiter(&cache->free_waiters, &vdo_page_comp->waiter);
985 	discard_a_page(cache);
986 }
987 
988 /**
989  * discard_page_if_needed() - Helper used to trigger a discard if the cache needs another free
990  *                            page.
991  * @cache: The page cache.
992  */
discard_page_if_needed(struct vdo_page_cache * cache)993 static void discard_page_if_needed(struct vdo_page_cache *cache)
994 {
995 	if (cache->waiter_count > cache->discard_count)
996 		discard_a_page(cache);
997 }
998 
999 /**
1000  * write_has_finished() - Inform the cache that a write has finished (possibly with an error).
1001  * @info: The info structure for the page whose write just completed.
1002  *
1003  * Return: true if the page write was a discard.
1004  */
write_has_finished(struct page_info * info)1005 static bool write_has_finished(struct page_info *info)
1006 {
1007 	bool was_discard = (info->write_status == WRITE_STATUS_DISCARD);
1008 
1009 	assert_on_cache_thread(info->cache, __func__);
1010 	info->cache->outstanding_writes--;
1011 
1012 	info->write_status = WRITE_STATUS_NORMAL;
1013 	return was_discard;
1014 }
1015 
1016 /**
1017  * handle_page_write_error() - Handler for page write errors.
1018  * @completion: The page write vio.
1019  */
static void handle_page_write_error(struct vdo_completion *completion)
{
	int result = completion->result;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;

	vio_record_metadata_io_error(as_vio(completion));

	/* If we're already read-only, write failures are to be expected. */
	if (result != VDO_READ_ONLY) {
		vdo_log_ratelimit(vdo_log_error,
				  "failed to write block map page %llu",
				  (unsigned long long) info->pbn);
	}

	/* The page was not persisted, so it is still dirty. */
	set_info_state(info, PS_DIRTY);
	ADD_ONCE(cache->stats.failed_writes, 1);
	set_persistent_error(cache, "cannot write page", result);

	/* If this write was not itself a discard, see whether one is now needed. */
	if (!write_has_finished(info))
		discard_page_if_needed(cache);

	check_for_drain_complete(cache->zone);
}
1044 
1045 static void page_is_written_out(struct vdo_completion *completion);
1046 
write_cache_page_endio(struct bio * bio)1047 static void write_cache_page_endio(struct bio *bio)
1048 {
1049 	struct vio *vio = bio->bi_private;
1050 	struct page_info *info = vio->completion.parent;
1051 
1052 	continue_vio_after_io(vio, page_is_written_out, info->cache->zone->thread_id);
1053 }
1054 
1055 /**
1056  * page_is_written_out() - Callback used when a page has been written out.
1057  * @completion: The vio which wrote the page. Its parent is a page_info.
1058  */
static void page_is_written_out(struct vdo_completion *completion)
{
	bool was_discard, reclaimed;
	u32 reclamations;
	struct page_info *info = completion->parent;
	struct vdo_page_cache *cache = info->cache;
	struct block_map_page *page = (struct block_map_page *) get_page_buffer(info);

	if (!page->header.initialized) {
		/*
		 * First write of a never-before-written page: rewrite it marked as
		 * initialized, preceded by a flush (torn write protection).
		 */
		page->header.initialized = true;
		vdo_submit_metadata_vio(info->vio, info->pbn,
					write_cache_page_endio,
					handle_page_write_error,
					REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH);
		return;
	}

	/* Handle journal updates and torn write protection. */
	vdo_release_recovery_journal_block_reference(cache->zone->block_map->journal,
						     info->recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     cache->zone->zone_number);
	info->recovery_lock = 0;
	was_discard = write_has_finished(info);
	/* A discarded page is reclaimed unless it is still in use or awaited. */
	reclaimed = (!was_discard || (info->busy > 0) || vdo_waitq_has_waiters(&info->waiting));

	set_info_state(info, PS_RESIDENT);

	reclamations = distribute_page_over_waitq(info, &info->waiting);
	ADD_ONCE(cache->stats.reclaimed, reclamations);

	if (was_discard)
		cache->discard_count--;

	if (reclaimed)
		discard_page_if_needed(cache);
	else
		allocate_free_page(info);

	check_for_drain_complete(cache->zone);
}
1100 
1101 /**
1102  * write_pages() - Write the batch of pages which were covered by the layer flush which just
1103  *                 completed.
1104  * @flush_completion: The flush vio.
1105  *
1106  * This callback is registered in save_pages().
1107  */
static void write_pages(struct vdo_completion *flush_completion)
{
	struct vdo_page_cache *cache = ((struct page_info *) flush_completion->parent)->cache;

	/*
	 * We need to cache these two values on the stack since it is possible for the last
	 * page info to cause the page cache to get freed. Hence once we launch the last page,
	 * it may be unsafe to dereference the cache.
	 */
	bool has_unflushed_pages = (cache->pages_to_flush > 0);
	page_count_t pages_in_flush = cache->pages_in_flush;

	cache->pages_in_flush = 0;
	while (pages_in_flush-- > 0) {
		struct page_info *info =
			list_first_entry(&cache->outgoing_list, struct page_info,
					 state_entry);

		list_del_init(&info->state_entry);
		if (vdo_is_read_only(info->cache->vdo)) {
			/* In read-only mode, fail the write without doing any I/O. */
			struct vdo_completion *completion = &info->vio->completion;

			vdo_reset_completion(completion);
			completion->callback = page_is_written_out;
			completion->error_handler = handle_page_write_error;
			vdo_fail_completion(completion, VDO_READ_ONLY);
			continue;
		}
		ADD_ONCE(info->cache->stats.pages_saved, 1);
		vdo_submit_metadata_vio(info->vio, info->pbn, write_cache_page_endio,
					handle_page_write_error, REQ_OP_WRITE | REQ_PRIO);
	}

	if (has_unflushed_pages) {
		/*
		 * If there are unflushed pages, the cache can't have been freed, so this call is
		 * safe.
		 */
		save_pages(cache);
	}
}
1149 
1150 /**
1151  * vdo_release_page_completion() - Release a VDO Page Completion.
1152  * @completion: The page completion to release.
1153  *
1154  * The page referenced by this completion (if any) will no longer be held busy by this completion.
1155  * If a page becomes discardable and there are completions awaiting free pages then a new round of
1156  * page discarding is started.
1157  */
void vdo_release_page_completion(struct vdo_completion *completion)
{
	struct page_info *discard_info = NULL;
	struct vdo_page_completion *page_completion = as_vdo_page_completion(completion);
	struct vdo_page_cache *cache;

	if (completion->result == VDO_SUCCESS) {
		if (!validate_completed_page_or_enter_read_only_mode(page_completion, false))
			return;

		/* Dropping the last busy reference makes the page a discard candidate. */
		if (--page_completion->info->busy == 0)
			discard_info = page_completion->info;
	}

	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "Page being released after leaving all queues");

	page_completion->info = NULL;
	cache = page_completion->cache;
	assert_on_cache_thread(cache, __func__);

	if (discard_info != NULL) {
		/* A save that was deferred while the page was busy can go out now. */
		if (discard_info->write_status == WRITE_STATUS_DEFERRED) {
			discard_info->write_status = WRITE_STATUS_NORMAL;
			launch_page_save(discard_info);
		}

		/*
		 * if there are excess requests for pages (that have not already started discards)
		 * we need to discard some page (which may be this one)
		 */
		discard_page_if_needed(cache);
	}
}
1192 
load_page_for_completion(struct page_info * info,struct vdo_page_completion * vdo_page_comp)1193 static void load_page_for_completion(struct page_info *info,
1194 				     struct vdo_page_completion *vdo_page_comp)
1195 {
1196 	int result;
1197 
1198 	vdo_waitq_enqueue_waiter(&info->waiting, &vdo_page_comp->waiter);
1199 	result = launch_page_load(info, vdo_page_comp->pbn);
1200 	if (result != VDO_SUCCESS) {
1201 		vdo_waitq_notify_all_waiters(&info->waiting,
1202 					     complete_waiter_with_error, &result);
1203 	}
1204 }
1205 
1206 /**
1207  * vdo_get_page() - Initialize a page completion and get a block map page.
1208  * @page_completion: The vdo_page_completion to initialize.
1209  * @zone: The block map zone of the desired page.
1210  * @pbn: The absolute physical block of the desired page.
1211  * @writable: Whether the page can be modified.
1212  * @parent: The object to notify when the fetch is complete.
1213  * @callback: The notification callback.
1214  * @error_handler: The handler for fetch errors.
1215  * @requeue: Whether we must requeue when notifying the parent.
1216  *
1217  * May cause another page to be discarded (potentially writing a dirty page) and the one nominated
1218  * by the completion to be loaded from disk. When the callback is invoked, the page will be
1219  * resident in the cache and marked busy. All callers must call vdo_release_page_completion()
1220  * when they are done with the page to clear the busy mark.
1221  */
void vdo_get_page(struct vdo_page_completion *page_completion,
		  struct block_map_zone *zone, physical_block_number_t pbn,
		  bool writable, void *parent, vdo_action_fn callback,
		  vdo_action_fn error_handler, bool requeue)
{
	struct vdo_page_cache *cache = &zone->page_cache;
	struct vdo_completion *completion = &page_completion->completion;
	struct page_info *info;

	assert_on_cache_thread(cache, __func__);
	VDO_ASSERT_LOG_ONLY((page_completion->waiter.next_waiter == NULL),
			    "New page completion was not already on a wait queue");

	*page_completion = (struct vdo_page_completion) {
		.pbn = pbn,
		.writable = writable,
		.cache = cache,
	};

	vdo_initialize_completion(completion, cache->vdo, VDO_PAGE_COMPLETION);
	vdo_prepare_completion(completion, callback, error_handler,
			       cache->zone->thread_id, parent);
	completion->requeue = requeue;

	/* Writable requests cannot be satisfied once the vdo is read-only. */
	if (page_completion->writable && vdo_is_read_only(cache->vdo)) {
		vdo_fail_completion(completion, VDO_READ_ONLY);
		return;
	}

	if (page_completion->writable)
		ADD_ONCE(cache->stats.write_count, 1);
	else
		ADD_ONCE(cache->stats.read_count, 1);

	info = find_page(cache, page_completion->pbn);
	if (info != NULL) {
		/* The page is in the cache already. */
		if ((info->write_status == WRITE_STATUS_DEFERRED) ||
		    is_incoming(info) ||
		    (is_outgoing(info) && page_completion->writable)) {
			/* The page is unusable until it has finished I/O. */
			ADD_ONCE(cache->stats.wait_for_page, 1);
			vdo_waitq_enqueue_waiter(&info->waiting, &page_completion->waiter);
			return;
		}

		if (is_valid(info)) {
			/* The page is usable. */
			ADD_ONCE(cache->stats.found_in_cache, 1);
			if (!is_present(info))
				ADD_ONCE(cache->stats.read_outgoing, 1);
			update_lru(info);
			info->busy++;
			complete_with_page(info, page_completion);
			return;
		}

		/* Something horrible has gone wrong. */
		VDO_ASSERT_LOG_ONLY(false, "Info found in a usable state.");
	}

	/* The page must be fetched. */
	info = find_free_page(cache);
	if (info != NULL) {
		ADD_ONCE(cache->stats.fetch_required, 1);
		load_page_for_completion(info, page_completion);
		return;
	}

	/* The page must wait for a page to be discarded. */
	ADD_ONCE(cache->stats.discard_required, 1);
	discard_page_for_completion(page_completion);
}
1295 
1296 /**
1297  * vdo_request_page_write() - Request that a VDO page be written out as soon as it is not busy.
1298  * @completion: The vdo_page_completion containing the page.
1299  */
vdo_request_page_write(struct vdo_completion * completion)1300 void vdo_request_page_write(struct vdo_completion *completion)
1301 {
1302 	struct page_info *info;
1303 	struct vdo_page_completion *vdo_page_comp = as_vdo_page_completion(completion);
1304 
1305 	if (!validate_completed_page_or_enter_read_only_mode(vdo_page_comp, true))
1306 		return;
1307 
1308 	info = vdo_page_comp->info;
1309 	set_info_state(info, PS_DIRTY);
1310 	launch_page_save(info);
1311 }
1312 
1313 /**
1314  * vdo_get_cached_page() - Get the block map page from a page completion.
1315  * @completion: A vdo page completion whose callback has been called.
1316  * @page_ptr: A pointer to hold the page
1317  *
1318  * Return: VDO_SUCCESS or an error
1319  */
vdo_get_cached_page(struct vdo_completion * completion,struct block_map_page ** page_ptr)1320 int vdo_get_cached_page(struct vdo_completion *completion,
1321 			struct block_map_page **page_ptr)
1322 {
1323 	int result;
1324 	struct vdo_page_completion *vpc;
1325 
1326 	vpc = as_vdo_page_completion(completion);
1327 	result = validate_completed_page(vpc, true);
1328 	if (result == VDO_SUCCESS)
1329 		*page_ptr = (struct block_map_page *) get_page_buffer(vpc->info);
1330 
1331 	return result;
1332 }
1333 
1334 /**
1335  * vdo_invalidate_page_cache() - Invalidate all entries in the VDO page cache.
1336  * @cache: The page cache.
1337  *
1338  * There must not be any dirty pages in the cache.
1339  *
1340  * Return: A success or error code.
1341  */
vdo_invalidate_page_cache(struct vdo_page_cache * cache)1342 int vdo_invalidate_page_cache(struct vdo_page_cache *cache)
1343 {
1344 	struct page_info *info;
1345 
1346 	assert_on_cache_thread(cache, __func__);
1347 
1348 	/* Make sure we don't throw away any dirty pages. */
1349 	for (info = cache->infos; info < cache->infos + cache->page_count; info++) {
1350 		int result = VDO_ASSERT(!is_dirty(info), "cache must have no dirty pages");
1351 
1352 		if (result != VDO_SUCCESS)
1353 			return result;
1354 	}
1355 
1356 	/* Reset the page map by re-allocating it. */
1357 	vdo_int_map_free(vdo_forget(cache->page_map));
1358 	return vdo_int_map_create(cache->page_count, &cache->page_map);
1359 }
1360 
1361 /**
1362  * get_tree_page_by_index() - Get the tree page for a given height and page index.
1363  * @forest: The block map forest.
1364  * @root_index: The root index of the tree to search.
1365  * @height: The height in the tree.
1366  * @page_index: The page index.
1367  *
1368  * Return: The requested page.
1369  */
get_tree_page_by_index(struct forest * forest,root_count_t root_index,height_t height,page_number_t page_index)1370 static struct tree_page * __must_check get_tree_page_by_index(struct forest *forest,
1371 							      root_count_t root_index,
1372 							      height_t height,
1373 							      page_number_t page_index)
1374 {
1375 	page_number_t offset = 0;
1376 	size_t segment;
1377 
1378 	for (segment = 0; segment < forest->segments; segment++) {
1379 		page_number_t border = forest->boundaries[segment].levels[height - 1];
1380 
1381 		if (page_index < border) {
1382 			struct block_map_tree *tree = &forest->trees[root_index];
1383 
1384 			return &(tree->segments[segment].levels[height - 1][page_index - offset]);
1385 		}
1386 
1387 		offset = border;
1388 	}
1389 
1390 	return NULL;
1391 }
1392 
1393 /* Get the page referred to by the lock's tree slot at its current height. */
get_tree_page(const struct block_map_zone * zone,const struct tree_lock * lock)1394 static inline struct tree_page *get_tree_page(const struct block_map_zone *zone,
1395 					      const struct tree_lock *lock)
1396 {
1397 	return get_tree_page_by_index(zone->block_map->forest, lock->root_index,
1398 				      lock->height,
1399 				      lock->tree_slots[lock->height].page_index);
1400 }
1401 
1402 /** vdo_copy_valid_page() - Validate and copy a buffer to a page. */
vdo_copy_valid_page(char * buffer,nonce_t nonce,physical_block_number_t pbn,struct block_map_page * page)1403 bool vdo_copy_valid_page(char *buffer, nonce_t nonce,
1404 			 physical_block_number_t pbn,
1405 			 struct block_map_page *page)
1406 {
1407 	struct block_map_page *loaded = (struct block_map_page *) buffer;
1408 	enum block_map_page_validity validity =
1409 		vdo_validate_block_map_page(loaded, nonce, pbn);
1410 
1411 	if (validity == VDO_BLOCK_MAP_PAGE_VALID) {
1412 		memcpy(page, loaded, VDO_BLOCK_SIZE);
1413 		return true;
1414 	}
1415 
1416 	if (validity == VDO_BLOCK_MAP_PAGE_BAD) {
1417 		vdo_log_error_strerror(VDO_BAD_PAGE,
1418 				       "Expected page %llu but got page %llu instead",
1419 				       (unsigned long long) pbn,
1420 				       (unsigned long long) vdo_get_block_map_page_pbn(loaded));
1421 	}
1422 
1423 	return false;
1424 }
1425 
1426 /**
1427  * in_cyclic_range() - Check whether the given value is between the lower and upper bounds, within
1428  *                     a cyclic range of values from 0 to (modulus - 1).
1429  * @lower: The lowest value to accept.
1430  * @value: The value to check.
1431  * @upper: The highest value to accept.
1432  * @modulus: The size of the cyclic space, no more than 2^15.
1433  *
1434  * The value and both bounds must be smaller than the modulus.
1435  *
1436  * Return: true if the value is in range.
1437  */
static bool in_cyclic_range(u16 lower, u16 value, u16 upper, u16 modulus)
{
	/*
	 * Unwrap the cycle at @lower: anything below it belongs to the next
	 * revolution. Sums cannot overflow since all inputs are < 2^15.
	 */
	u16 shifted_value = (value < lower) ? (value + modulus) : value;
	u16 shifted_upper = (upper < lower) ? (upper + modulus) : upper;

	return (shifted_value <= shifted_upper);
}
1446 
1447 /**
1448  * is_not_older() - Check whether a generation is strictly older than some other generation in the
1449  *                  context of a zone's current generation range.
1450  * @zone: The zone in which to do the comparison.
1451  * @a: The generation in question.
1452  * @b: The generation to compare to.
1453  *
1454  * Return: true if generation @a is not strictly older than generation @b in the context of @zone
1455  */
static bool __must_check is_not_older(struct block_map_zone *zone, u8 a, u8 b)
{
	int result;

	/* Both generations must lie inside the zone's active cyclic window. */
	result = VDO_ASSERT((in_cyclic_range(zone->oldest_generation, a, zone->generation, 1 << 8) &&
			     in_cyclic_range(zone->oldest_generation, b, zone->generation, 1 << 8)),
			    "generation(s) %u, %u are out of range [%u, %u]",
			    a, b, zone->oldest_generation, zone->generation);
	if (result != VDO_SUCCESS) {
		/* Answer true on error so callers don't act on a bogus generation. */
		enter_zone_read_only_mode(zone, result);
		return true;
	}

	/* @a is not older than @b iff @a lies in [b, current generation]. */
	return in_cyclic_range(b, a, zone->generation, 1 << 8);
}
1471 
/* Drop one dirty-page count from a generation, advancing the oldest if drained. */
static void release_generation(struct block_map_zone *zone, u8 generation)
{
	int result = VDO_ASSERT((zone->dirty_page_counts[generation] > 0),
				"dirty page count underflow for generation %u",
				generation);

	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	zone->dirty_page_counts[generation]--;
	/* Advance the oldest generation past any that are now fully clean. */
	while ((zone->oldest_generation != zone->generation) &&
	       (zone->dirty_page_counts[zone->oldest_generation] == 0))
		zone->oldest_generation++;
}
1488 
static void set_generation(struct block_map_zone *zone, struct tree_page *page,
			   u8 new_generation)
{
	u32 new_count;
	int result;
	/* A page already queued for write holds a count in its old generation. */
	bool decrement_old = vdo_waiter_is_waiting(&page->waiter);
	u8 old_generation = page->generation;

	if (decrement_old && (old_generation == new_generation))
		return;

	page->generation = new_generation;
	new_count = ++zone->dirty_page_counts[new_generation];
	result = VDO_ASSERT((new_count != 0), "dirty page count overflow for generation %u",
			    new_generation);
	if (result != VDO_SUCCESS) {
		enter_zone_read_only_mode(zone, result);
		return;
	}

	/* Now release the count held in the generation the page is leaving. */
	if (decrement_old)
		release_generation(zone, old_generation);
}
1512 
1513 static void write_page(struct tree_page *tree_page, struct pooled_vio *vio);
1514 
1515 /* Implements waiter_callback_fn */
write_page_callback(struct vdo_waiter * waiter,void * context)1516 static void write_page_callback(struct vdo_waiter *waiter, void *context)
1517 {
1518 	write_page(container_of(waiter, struct tree_page, waiter), context);
1519 }
1520 
acquire_vio(struct vdo_waiter * waiter,struct block_map_zone * zone)1521 static void acquire_vio(struct vdo_waiter *waiter, struct block_map_zone *zone)
1522 {
1523 	waiter->callback = write_page_callback;
1524 	acquire_vio_from_pool(zone->vio_pool, waiter);
1525 }
1526 
1527 /* Return: true if all possible generations were not already active */
attempt_increment(struct block_map_zone * zone)1528 static bool attempt_increment(struct block_map_zone *zone)
1529 {
1530 	u8 generation = zone->generation + 1;
1531 
1532 	if (zone->oldest_generation == generation)
1533 		return false;
1534 
1535 	zone->generation = generation;
1536 	return true;
1537 }
1538 
1539 /* Launches a flush if one is not already in progress. */
enqueue_page(struct tree_page * page,struct block_map_zone * zone)1540 static void enqueue_page(struct tree_page *page, struct block_map_zone *zone)
1541 {
1542 	if ((zone->flusher == NULL) && attempt_increment(zone)) {
1543 		zone->flusher = page;
1544 		acquire_vio(&page->waiter, zone);
1545 		return;
1546 	}
1547 
1548 	vdo_waitq_enqueue_waiter(&zone->flush_waiters, &page->waiter);
1549 }
1550 
write_page_if_not_dirtied(struct vdo_waiter * waiter,void * context)1551 static void write_page_if_not_dirtied(struct vdo_waiter *waiter, void *context)
1552 {
1553 	struct tree_page *page = container_of(waiter, struct tree_page, waiter);
1554 	struct write_if_not_dirtied_context *write_context = context;
1555 
1556 	if (page->generation == write_context->generation) {
1557 		acquire_vio(waiter, write_context->zone);
1558 		return;
1559 	}
1560 
1561 	enqueue_page(page, write_context->zone);
1562 }
1563 
/* Return a pooled vio and check whether this completes a drain of the zone. */
static void return_to_pool(struct block_map_zone *zone, struct pooled_vio *vio)
{
	return_vio_to_pool(vio);
	check_for_drain_complete(zone);
}
1569 
1570 /* This callback is registered in write_initialized_page(). */
static void finish_page_write(struct vdo_completion *completion)
{
	bool dirty;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct tree_page *page = completion->parent;
	struct block_map_zone *zone = pooled->context;

	/* The data is durable now, so release its recovery journal reference. */
	vdo_release_recovery_journal_block_reference(zone->block_map->journal,
						     page->writing_recovery_lock,
						     VDO_ZONE_TYPE_LOGICAL,
						     zone->zone_number);

	/* The page is dirty again if it was modified while the write was in flight. */
	dirty = (page->writing_generation != page->generation);
	release_generation(zone, page->writing_generation);
	page->writing = false;

	if (zone->flusher == page) {
		struct write_if_not_dirtied_context context = {
			.zone = zone,
			.generation = page->writing_generation,
		};

		/* The flush covered this generation; its waiters can write directly. */
		vdo_waitq_notify_all_waiters(&zone->flush_waiters,
					     write_page_if_not_dirtied, &context);
		if (dirty && attempt_increment(zone)) {
			/* Reuse this vio to rewrite the re-dirtied flusher page. */
			write_page(page, pooled);
			return;
		}

		zone->flusher = NULL;
	}

	if (dirty) {
		enqueue_page(page, zone);
	} else if ((zone->flusher == NULL) && vdo_waitq_has_waiters(&zone->flush_waiters) &&
		   attempt_increment(zone)) {
		/* Promote the next waiter to flusher, reusing this vio. */
		zone->flusher = container_of(vdo_waitq_dequeue_waiter(&zone->flush_waiters),
					     struct tree_page, waiter);
		write_page(zone->flusher, pooled);
		return;
	}

	return_to_pool(zone, pooled);
}
1616 
handle_write_error(struct vdo_completion * completion)1617 static void handle_write_error(struct vdo_completion *completion)
1618 {
1619 	int result = completion->result;
1620 	struct vio *vio = as_vio(completion);
1621 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1622 	struct block_map_zone *zone = pooled->context;
1623 
1624 	vio_record_metadata_io_error(vio);
1625 	enter_zone_read_only_mode(zone, result);
1626 	return_to_pool(zone, pooled);
1627 }
1628 
1629 static void write_page_endio(struct bio *bio);
1630 
static void write_initialized_page(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
	struct block_map_zone *zone = pooled->context;
	struct tree_page *tree_page = completion->parent;
	struct block_map_page *page = (struct block_map_page *) vio->data;
	blk_opf_t operation = REQ_OP_WRITE | REQ_PRIO;

	/*
	 * Now that we know the page has been written at least once, mark the copy we are writing
	 * as initialized.
	 */
	page->header.initialized = true;

	/* The designated flusher page carries the preflush for its cohort. */
	if (zone->flusher == tree_page)
		operation |= REQ_PREFLUSH;

	vdo_submit_metadata_vio(vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				operation);
}
1653 
write_page_endio(struct bio * bio)1654 static void write_page_endio(struct bio *bio)
1655 {
1656 	struct pooled_vio *vio = bio->bi_private;
1657 	struct block_map_zone *zone = vio->context;
1658 	struct block_map_page *page = (struct block_map_page *) vio->vio.data;
1659 
1660 	continue_vio_after_io(&vio->vio,
1661 			      (page->header.initialized ?
1662 			       finish_page_write : write_initialized_page),
1663 			      zone->thread_id);
1664 }
1665 
/* Write a tree page with a vio obtained from the zone's pool. */
static void write_page(struct tree_page *tree_page, struct pooled_vio *vio)
{
	struct vdo_completion *completion = &vio->vio.completion;
	struct block_map_zone *zone = vio->context;
	struct block_map_page *page = vdo_as_block_map_page(tree_page);

	if ((zone->flusher != tree_page) &&
	    is_not_older(zone, tree_page->generation, zone->generation)) {
		/*
		 * This page was re-dirtied after the last flush was issued, hence we need to do
		 * another flush.
		 */
		enqueue_page(tree_page, zone);
		return_to_pool(zone, vio);
		return;
	}

	completion->parent = tree_page;
	memcpy(vio->vio.data, tree_page->page_buffer, VDO_BLOCK_SIZE);
	completion->callback_thread_id = zone->thread_id;

	tree_page->writing = true;
	tree_page->writing_generation = tree_page->generation;
	tree_page->writing_recovery_lock = tree_page->recovery_lock;

	/* Clear this now so that we know this page is not on any dirty list. */
	tree_page->recovery_lock = 0;

	/*
	 * We've already copied the page into the vio which will write it, so if it was not yet
	 * initialized, the first write will indicate that (for torn write protection). It is now
	 * safe to mark it as initialized in memory since if the write fails, the in memory state
	 * will become irrelevant.
	 */
	if (page->header.initialized) {
		write_initialized_page(completion);
		return;
	}

	page->header.initialized = true;
	vdo_submit_metadata_vio(&vio->vio, vdo_get_block_map_page_pbn(page),
				write_page_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO);
}
1710 
1711 /* Release a lock on a page which was being loaded or allocated. */
release_page_lock(struct data_vio * data_vio,char * what)1712 static void release_page_lock(struct data_vio *data_vio, char *what)
1713 {
1714 	struct block_map_zone *zone;
1715 	struct tree_lock *lock_holder;
1716 	struct tree_lock *lock = &data_vio->tree_lock;
1717 
1718 	VDO_ASSERT_LOG_ONLY(lock->locked,
1719 			    "release of unlocked block map page %s for key %llu in tree %u",
1720 			    what, (unsigned long long) lock->key, lock->root_index);
1721 
1722 	zone = data_vio->logical.zone->block_map_zone;
1723 	lock_holder = vdo_int_map_remove(zone->loading_pages, lock->key);
1724 	VDO_ASSERT_LOG_ONLY((lock_holder == lock),
1725 			    "block map page %s mismatch for key %llu in tree %u",
1726 			    what, (unsigned long long) lock->key, lock->root_index);
1727 	lock->locked = false;
1728 }
1729 
finish_lookup(struct data_vio * data_vio,int result)1730 static void finish_lookup(struct data_vio *data_vio, int result)
1731 {
1732 	data_vio->tree_lock.height = 0;
1733 
1734 	--data_vio->logical.zone->block_map_zone->active_lookups;
1735 
1736 	set_data_vio_logical_callback(data_vio, continue_data_vio_with_block_map_slot);
1737 	data_vio->vio.completion.error_handler = handle_data_vio_error;
1738 	continue_data_vio_with_error(data_vio, result);
1739 }
1740 
abort_lookup_for_waiter(struct vdo_waiter * waiter,void * context)1741 static void abort_lookup_for_waiter(struct vdo_waiter *waiter, void *context)
1742 {
1743 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1744 	int result = *((int *) context);
1745 
1746 	if (!data_vio->write) {
1747 		if (result == VDO_NO_SPACE)
1748 			result = VDO_SUCCESS;
1749 	} else if (result != VDO_NO_SPACE) {
1750 		result = VDO_READ_ONLY;
1751 	}
1752 
1753 	finish_lookup(data_vio, result);
1754 }
1755 
abort_lookup(struct data_vio * data_vio,int result,char * what)1756 static void abort_lookup(struct data_vio *data_vio, int result, char *what)
1757 {
1758 	if (result != VDO_NO_SPACE)
1759 		enter_zone_read_only_mode(data_vio->logical.zone->block_map_zone, result);
1760 
1761 	if (data_vio->tree_lock.locked) {
1762 		release_page_lock(data_vio, what);
1763 		vdo_waitq_notify_all_waiters(&data_vio->tree_lock.waiters,
1764 					     abort_lookup_for_waiter,
1765 					     &result);
1766 	}
1767 
1768 	finish_lookup(data_vio, result);
1769 }
1770 
/* Abort a lookup whose page load failed, with @result as the cause. */
static void abort_load(struct data_vio *data_vio, int result)
{
	abort_lookup(data_vio, result, "load");
}
1775 
is_invalid_tree_entry(const struct vdo * vdo,const struct data_location * mapping,height_t height)1776 static bool __must_check is_invalid_tree_entry(const struct vdo *vdo,
1777 					       const struct data_location *mapping,
1778 					       height_t height)
1779 {
1780 	if (!vdo_is_valid_location(mapping) ||
1781 	    vdo_is_state_compressed(mapping->state) ||
1782 	    (vdo_is_mapped_location(mapping) && (mapping->pbn == VDO_ZERO_BLOCK)))
1783 		return true;
1784 
1785 	/* Roots aren't physical data blocks, so we can't check their PBNs. */
1786 	if (height == VDO_BLOCK_MAP_TREE_HEIGHT)
1787 		return false;
1788 
1789 	return !vdo_is_physical_data_block(vdo->depot, mapping->pbn);
1790 }
1791 
1792 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio);
1793 static void allocate_block_map_page(struct block_map_zone *zone,
1794 				    struct data_vio *data_vio);
1795 
/*
 * Continue a lookup now that the tree page for the current level is in memory. Validates the
 * entry for the next level down, then either finishes the lookup, allocates the missing page, or
 * descends by loading the next page.
 */
static void continue_with_loaded_page(struct data_vio *data_vio,
				      struct block_map_page *page)
{
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_tree_slot slot = lock->tree_slots[lock->height];
	struct data_location mapping =
		vdo_unpack_block_map_entry(&page->entries[slot.block_map_slot.slot]);

	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		/* A corrupt entry is fatal for this lookup. */
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				       (unsigned long long) mapping.pbn, mapping.state,
				       lock->tree_slots[lock->height - 1].page_index,
				       lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we need is unallocated */
		allocate_block_map_page(data_vio->logical.zone->block_map_zone,
					data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		/* The entry names the leaf page, so the lookup is complete. */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load next */
	load_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
}
1830 
continue_load_for_waiter(struct vdo_waiter * waiter,void * context)1831 static void continue_load_for_waiter(struct vdo_waiter *waiter, void *context)
1832 {
1833 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1834 
1835 	data_vio->tree_lock.height--;
1836 	continue_with_loaded_page(data_vio, context);
1837 }
1838 
/*
 * Completion callback for a tree page read: install the page contents into the in-memory tree,
 * return the metadata vio, and continue the lookup for this data_vio and all waiters.
 */
static void finish_block_map_page_load(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	nonce_t nonce;
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct data_vio *data_vio = completion->parent;
	struct block_map_zone *zone = pooled->context;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	/* Descend before computing the slot: the loaded page is for the next level down. */
	tree_lock->height--;
	pbn = tree_lock->tree_slots[tree_lock->height].block_map_slot.pbn;
	tree_page = get_tree_page(zone, tree_lock);
	page = (struct block_map_page *) tree_page->page_buffer;
	nonce = zone->block_map->nonce;

	/* If the on-disk page doesn't validate, start from a freshly formatted page. */
	if (!vdo_copy_valid_page(vio->data, nonce, pbn, page))
		vdo_format_block_map_page(page, nonce, pbn, false);
	return_vio_to_pool(pooled);

	/* Release our claim to the load and wake any waiters */
	release_page_lock(data_vio, "load");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters, continue_load_for_waiter, page);
	continue_with_loaded_page(data_vio, page);
}
1866 
handle_io_error(struct vdo_completion * completion)1867 static void handle_io_error(struct vdo_completion *completion)
1868 {
1869 	int result = completion->result;
1870 	struct vio *vio = as_vio(completion);
1871 	struct pooled_vio *pooled = container_of(vio, struct pooled_vio, vio);
1872 	struct data_vio *data_vio = completion->parent;
1873 
1874 	vio_record_metadata_io_error(vio);
1875 	return_vio_to_pool(pooled);
1876 	abort_load(data_vio, result);
1877 }
1878 
/* Bio endio for a tree page read: continue on the data_vio's logical zone thread. */
static void load_page_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct data_vio *data_vio = vio->completion.parent;

	continue_vio_after_io(vio, finish_block_map_page_load,
			      data_vio->logical.zone->thread_id);
}
1887 
load_page(struct vdo_waiter * waiter,void * context)1888 static void load_page(struct vdo_waiter *waiter, void *context)
1889 {
1890 	struct pooled_vio *pooled = context;
1891 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1892 	struct tree_lock *lock = &data_vio->tree_lock;
1893 	physical_block_number_t pbn = lock->tree_slots[lock->height - 1].block_map_slot.pbn;
1894 
1895 	pooled->vio.completion.parent = data_vio;
1896 	vdo_submit_metadata_vio(&pooled->vio, pbn, load_page_endio,
1897 				handle_io_error, REQ_OP_READ | REQ_PRIO);
1898 }
1899 
/*
 * If the page is already locked, queue up to wait for the lock to be released. If the lock is
 * acquired, @data_vio->tree_lock.locked will be true.
 */
static int attempt_page_lock(struct block_map_zone *zone, struct data_vio *data_vio)
{
	int result;
	struct tree_lock *lock_holder;
	struct tree_lock *lock = &data_vio->tree_lock;
	height_t height = lock->height;
	struct block_map_tree_slot tree_slot = lock->tree_slots[height];
	union page_key key;

	/* Pack the page's coordinates into a single u64 key for the lock table. */
	key.descriptor = (struct page_descriptor) {
		.root_index = lock->root_index,
		.height = height,
		.page_index = tree_slot.page_index,
		.slot = tree_slot.block_map_slot.slot,
	};
	lock->key = key.key;

	/* Insert without replacing, so an existing holder stays in the map. */
	result = vdo_int_map_put(zone->loading_pages, lock->key,
				 lock, false, (void **) &lock_holder);
	if (result != VDO_SUCCESS)
		return result;

	if (lock_holder == NULL) {
		/* We got the lock */
		data_vio->tree_lock.locked = true;
		return VDO_SUCCESS;
	}

	/* Someone else is loading or allocating the page we need */
	vdo_waitq_enqueue_waiter(&lock_holder->waiters, &data_vio->waiter);
	return VDO_SUCCESS;
}
1936 
1937 /* Load a block map tree page from disk, for the next level in the data vio tree lock. */
load_block_map_page(struct block_map_zone * zone,struct data_vio * data_vio)1938 static void load_block_map_page(struct block_map_zone *zone, struct data_vio *data_vio)
1939 {
1940 	int result;
1941 
1942 	result = attempt_page_lock(zone, data_vio);
1943 	if (result != VDO_SUCCESS) {
1944 		abort_load(data_vio, result);
1945 		return;
1946 	}
1947 
1948 	if (data_vio->tree_lock.locked) {
1949 		data_vio->waiter.callback = load_page;
1950 		acquire_vio_from_pool(zone->vio_pool, &data_vio->waiter);
1951 	}
1952 }
1953 
allocation_failure(struct vdo_completion * completion)1954 static void allocation_failure(struct vdo_completion *completion)
1955 {
1956 	struct data_vio *data_vio = as_data_vio(completion);
1957 
1958 	if (vdo_requeue_completion_if_needed(completion,
1959 					     data_vio->logical.zone->thread_id))
1960 		return;
1961 
1962 	abort_lookup(data_vio, completion->result, "allocation");
1963 }
1964 
continue_allocation_for_waiter(struct vdo_waiter * waiter,void * context)1965 static void continue_allocation_for_waiter(struct vdo_waiter *waiter, void *context)
1966 {
1967 	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
1968 	struct tree_lock *tree_lock = &data_vio->tree_lock;
1969 	physical_block_number_t pbn = *((physical_block_number_t *) context);
1970 
1971 	tree_lock->height--;
1972 	data_vio->tree_lock.tree_slots[tree_lock->height].block_map_slot.pbn = pbn;
1973 
1974 	if (tree_lock->height == 0) {
1975 		finish_lookup(data_vio, VDO_SUCCESS);
1976 		return;
1977 	}
1978 
1979 	allocate_block_map_page(data_vio->logical.zone->block_map_zone, data_vio);
1980 }
1981 
1982 /** expire_oldest_list() - Expire the oldest list. */
expire_oldest_list(struct dirty_lists * dirty_lists)1983 static void expire_oldest_list(struct dirty_lists *dirty_lists)
1984 {
1985 	block_count_t i = dirty_lists->offset++;
1986 
1987 	dirty_lists->oldest_period++;
1988 	if (!list_empty(&dirty_lists->eras[i][VDO_TREE_PAGE])) {
1989 		list_splice_tail_init(&dirty_lists->eras[i][VDO_TREE_PAGE],
1990 				      &dirty_lists->expired[VDO_TREE_PAGE]);
1991 	}
1992 
1993 	if (!list_empty(&dirty_lists->eras[i][VDO_CACHE_PAGE])) {
1994 		list_splice_tail_init(&dirty_lists->eras[i][VDO_CACHE_PAGE],
1995 				      &dirty_lists->expired[VDO_CACHE_PAGE]);
1996 	}
1997 
1998 	if (dirty_lists->offset == dirty_lists->maximum_age)
1999 		dirty_lists->offset = 0;
2000 }
2001 
2002 
2003 /** update_period() - Update the dirty_lists period if necessary. */
update_period(struct dirty_lists * dirty,sequence_number_t period)2004 static void update_period(struct dirty_lists *dirty, sequence_number_t period)
2005 {
2006 	while (dirty->next_period <= period) {
2007 		if ((dirty->next_period - dirty->oldest_period) == dirty->maximum_age)
2008 			expire_oldest_list(dirty);
2009 		dirty->next_period++;
2010 	}
2011 }
2012 
/** write_expired_elements() - Write out the expired list. */
static void write_expired_elements(struct block_map_zone *zone)
{
	struct tree_page *page, *ttmp;
	struct page_info *info, *ptmp;
	struct list_head *expired;
	u8 generation = zone->generation;

	/* First issue writes for all expired tree pages. */
	expired = &zone->dirty_lists->expired[VDO_TREE_PAGE];
	list_for_each_entry_safe(page, ttmp, expired, entry) {
		int result;

		list_del_init(&page->entry);

		result = VDO_ASSERT(!vdo_waiter_is_waiting(&page->waiter),
				    "Newly expired page not already waiting to write");
		if (result != VDO_SUCCESS) {
			/* An inconsistent page state is unrecoverable for this zone. */
			enter_zone_read_only_mode(zone, result);
			continue;
		}

		set_generation(zone, page, generation);
		if (!page->writing)
			enqueue_page(page, zone);
	}

	/* Then schedule saves for all expired cache pages. */
	expired = &zone->dirty_lists->expired[VDO_CACHE_PAGE];
	list_for_each_entry_safe(info, ptmp, expired, state_entry) {
		list_del_init(&info->state_entry);
		schedule_page_save(info);
	}

	save_pages(&zone->page_cache);
}
2047 
/**
 * add_to_dirty_lists() - Add an element to the dirty lists.
 * @zone: The zone in which we are operating.
 * @entry: The list entry of the element to add.
 * @type: The type of page.
 * @old_period: The period in which the element was previously dirtied, or 0 if it was not dirty.
 * @new_period: The period in which the element has now been dirtied, or 0 if it does not hold a
 *              lock.
 */
static void add_to_dirty_lists(struct block_map_zone *zone,
			       struct list_head *entry,
			       enum block_map_page_type type,
			       sequence_number_t old_period,
			       sequence_number_t new_period)
{
	struct dirty_lists *dirty_lists = zone->dirty_lists;

	/*
	 * Nothing to do if the period is unchanged, or if the element was already dirty in an
	 * earlier period (it stays on the list for its oldest recovery lock).
	 */
	if ((old_period == new_period) || ((old_period != 0) && (old_period < new_period)))
		return;

	if (new_period < dirty_lists->oldest_period) {
		/* The period has already aged out of the era window; expire immediately. */
		list_move_tail(entry, &dirty_lists->expired[type]);
	} else {
		update_period(dirty_lists, new_period);
		list_move_tail(entry,
			       &dirty_lists->eras[new_period % dirty_lists->maximum_age][type]);
	}

	write_expired_elements(zone);
}
2078 
/*
 * Record the allocation in the tree and wake any waiters now that the write lock has been
 * released.
 */
static void finish_block_map_allocation(struct vdo_completion *completion)
{
	physical_block_number_t pbn;
	struct tree_page *tree_page;
	struct block_map_page *page;
	sequence_number_t old_lock;
	struct data_vio *data_vio = as_data_vio(completion);
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct tree_lock *tree_lock = &data_vio->tree_lock;
	height_t height = tree_lock->height;

	assert_data_vio_in_logical_zone(data_vio);

	tree_page = get_tree_page(zone, tree_lock);
	pbn = tree_lock->tree_slots[height - 1].block_map_slot.pbn;

	/* Record the allocation. */
	page = (struct block_map_page *) tree_page->page_buffer;
	old_lock = tree_page->recovery_lock;
	vdo_update_block_map_page(page, data_vio, pbn,
				  VDO_MAPPING_STATE_UNCOMPRESSED,
				  &tree_page->recovery_lock);

	if (vdo_waiter_is_waiting(&tree_page->waiter)) {
		/* This page is waiting to be written out. */
		if (zone->flusher != tree_page) {
			/*
			 * The outstanding flush won't cover the update we just made,
			 * so mark the page as needing another flush.
			 */
			set_generation(zone, tree_page, zone->generation);
		}
	} else {
		/* Put the page on a dirty list */
		if (old_lock == 0)
			INIT_LIST_HEAD(&tree_page->entry);
		add_to_dirty_lists(zone, &tree_page->entry, VDO_TREE_PAGE,
				   old_lock, tree_page->recovery_lock);
	}

	tree_lock->height--;
	if (height > 1) {
		/* Format the interior node we just allocated (in memory). */
		tree_page = get_tree_page(zone, tree_lock);
		vdo_format_block_map_page(tree_page->page_buffer,
					  zone->block_map->nonce,
					  pbn, false);
	}

	/* Release our claim to the allocation and wake any waiters */
	release_page_lock(data_vio, "allocation");
	vdo_waitq_notify_all_waiters(&tree_lock->waiters,
				     continue_allocation_for_waiter, &pbn);
	if (tree_lock->height == 0) {
		/* The leaf page has been reached, so the lookup is complete. */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* Continue allocating the next level down. */
	allocate_block_map_page(zone, data_vio);
}
2143 
/* Release the write lock on the newly allocated page, then record the allocation in the tree. */
static void release_block_map_write_lock(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	release_data_vio_allocation_lock(data_vio, true);
	launch_data_vio_logical_callback(data_vio, finish_block_map_allocation);
}
2153 
/*
 * Newly allocated block map pages are set to have MAXIMUM_REFERENCES after they are journaled,
 * to prevent deduplication against the block after we release the write lock on it, but before we
 * write out the page.
 */
static void set_block_map_page_reference_count(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_allocated_zone(data_vio);

	/* After the reference count update, release the write lock. */
	completion->callback = release_block_map_write_lock;
	vdo_modify_reference_count(completion, &data_vio->increment_updater);
}
2168 
/* Journal the block map page allocation before updating its reference count. */
static void journal_block_map_allocation(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);

	assert_data_vio_in_journal_zone(data_vio);

	set_data_vio_allocated_zone_callback(data_vio,
					     set_block_map_page_reference_count);
	vdo_add_recovery_journal_entry(completion->vdo->recovery_journal, data_vio);
}
2179 
/* Allocate a physical block to hold the new block map page, then journal the allocation. */
static void allocate_block(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct tree_lock *lock = &data_vio->tree_lock;
	physical_block_number_t pbn;

	assert_data_vio_in_allocated_zone(data_vio);

	/*
	 * NOTE(review): when this returns false no block was allocated here; presumably
	 * vdo_allocate_block_in_zone() handles the continuation — confirm in its definition.
	 */
	if (!vdo_allocate_block_in_zone(data_vio))
		return;

	pbn = data_vio->allocation.pbn;
	lock->tree_slots[lock->height - 1].block_map_slot.pbn = pbn;
	/* Describe the reference count increment to be journaled for the new page. */
	data_vio->increment_updater = (struct reference_updater) {
		.operation = VDO_JOURNAL_BLOCK_MAP_REMAPPING,
		.increment = true,
		.zpbn = {
			.pbn = pbn,
			.state = VDO_MAPPING_STATE_UNCOMPRESSED,
		},
		.lock = data_vio->allocation.lock,
	};

	launch_data_vio_journal_callback(data_vio, journal_block_map_allocation);
}
2205 
allocate_block_map_page(struct block_map_zone * zone,struct data_vio * data_vio)2206 static void allocate_block_map_page(struct block_map_zone *zone,
2207 				    struct data_vio *data_vio)
2208 {
2209 	int result;
2210 
2211 	if (!data_vio->write || data_vio->is_discard) {
2212 		/* This is a pure read or a discard, so there's nothing left to do here. */
2213 		finish_lookup(data_vio, VDO_SUCCESS);
2214 		return;
2215 	}
2216 
2217 	result = attempt_page_lock(zone, data_vio);
2218 	if (result != VDO_SUCCESS) {
2219 		abort_lookup(data_vio, result, "allocation");
2220 		return;
2221 	}
2222 
2223 	if (!data_vio->tree_lock.locked)
2224 		return;
2225 
2226 	data_vio_allocate_data_block(data_vio, VIO_BLOCK_MAP_WRITE_LOCK,
2227 				     allocate_block, allocation_failure);
2228 }
2229 
/**
 * vdo_find_block_map_slot() - Find the block map slot in which the block map entry for a data_vio
 *                             resides and cache that result in the data_vio.
 * @data_vio: The data vio.
 *
 * All ancestors in the tree will be allocated or loaded, as needed.
 */
void vdo_find_block_map_slot(struct data_vio *data_vio)
{
	page_number_t page_index;
	struct block_map_tree_slot tree_slot;
	struct data_location mapping;
	struct block_map_page *page = NULL;
	struct tree_lock *lock = &data_vio->tree_lock;
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;

	zone->active_lookups++;
	if (vdo_is_state_draining(&zone->state)) {
		/* No new lookups while the zone is shutting down. */
		finish_lookup(data_vio, VDO_SHUTTING_DOWN);
		return;
	}

	/* NOTE(review): tree_slots[0].page_index is assumed set by the caller — confirm. */
	lock->tree_slots[0].block_map_slot.slot =
		data_vio->logical.lbn % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	page_index = (lock->tree_slots[0].page_index / zone->block_map->root_count);
	tree_slot = (struct block_map_tree_slot) {
		.page_index = page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		.block_map_slot = {
			.pbn = 0,
			.slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE,
		},
	};

	/* Walk up the tree until we find a level whose page is already in memory. */
	for (lock->height = 1; lock->height <= VDO_BLOCK_MAP_TREE_HEIGHT; lock->height++) {
		physical_block_number_t pbn;

		lock->tree_slots[lock->height] = tree_slot;
		page = (struct block_map_page *) (get_tree_page(zone, lock)->page_buffer);
		pbn = vdo_get_block_map_page_pbn(page);
		if (pbn != VDO_ZERO_BLOCK) {
			lock->tree_slots[lock->height].block_map_slot.pbn = pbn;
			break;
		}

		/* Calculate the index and slot for the next level. */
		tree_slot.block_map_slot.slot =
			tree_slot.page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
		tree_slot.page_index = tree_slot.page_index / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
	}

	/* The page at this height has been allocated and loaded. */
	mapping = vdo_unpack_block_map_entry(&page->entries[tree_slot.block_map_slot.slot]);
	if (is_invalid_tree_entry(vdo_from_data_vio(data_vio), &mapping, lock->height)) {
		vdo_log_error_strerror(VDO_BAD_MAPPING,
				       "Invalid block map tree PBN: %llu with state %u for page index %u at height %u",
				       (unsigned long long) mapping.pbn, mapping.state,
				       lock->tree_slots[lock->height - 1].page_index,
				       lock->height - 1);
		abort_load(data_vio, VDO_BAD_MAPPING);
		return;
	}

	if (!vdo_is_mapped_location(&mapping)) {
		/* The page we want one level down has not been allocated, so allocate it. */
		allocate_block_map_page(zone, data_vio);
		return;
	}

	lock->tree_slots[lock->height - 1].block_map_slot.pbn = mapping.pbn;
	if (lock->height == 1) {
		/* This is the ultimate block map page, so we're done */
		finish_lookup(data_vio, VDO_SUCCESS);
		return;
	}

	/* We know what page we need to load. */
	load_block_map_page(zone, data_vio);
}
2308 
2309 /*
2310  * Find the PBN of a leaf block map page. This method may only be used after all allocated tree
2311  * pages have been loaded, otherwise, it may give the wrong answer (0).
2312  */
vdo_find_block_map_page_pbn(struct block_map * map,page_number_t page_number)2313 physical_block_number_t vdo_find_block_map_page_pbn(struct block_map *map,
2314 						    page_number_t page_number)
2315 {
2316 	struct data_location mapping;
2317 	struct tree_page *tree_page;
2318 	struct block_map_page *page;
2319 	root_count_t root_index = page_number % map->root_count;
2320 	page_number_t page_index = page_number / map->root_count;
2321 	slot_number_t slot = page_index % VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2322 
2323 	page_index /= VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2324 
2325 	tree_page = get_tree_page_by_index(map->forest, root_index, 1, page_index);
2326 	page = (struct block_map_page *) tree_page->page_buffer;
2327 	if (!page->header.initialized)
2328 		return VDO_ZERO_BLOCK;
2329 
2330 	mapping = vdo_unpack_block_map_entry(&page->entries[slot]);
2331 	if (!vdo_is_valid_location(&mapping) || vdo_is_state_compressed(mapping.state))
2332 		return VDO_ZERO_BLOCK;
2333 	return mapping.pbn;
2334 }
2335 
2336 /*
2337  * Write a tree page or indicate that it has been re-dirtied if it is already being written. This
2338  * method is used when correcting errors in the tree during read-only rebuild.
2339  */
vdo_write_tree_page(struct tree_page * page,struct block_map_zone * zone)2340 void vdo_write_tree_page(struct tree_page *page, struct block_map_zone *zone)
2341 {
2342 	bool waiting = vdo_waiter_is_waiting(&page->waiter);
2343 
2344 	if (waiting && (zone->flusher == page))
2345 		return;
2346 
2347 	set_generation(zone, page, zone->generation);
2348 	if (waiting || page->writing)
2349 		return;
2350 
2351 	enqueue_page(page, zone);
2352 }
2353 
/*
 * Build one new segment of a forest: allocate the boundary, page-pointer, and page arrays, copy
 * the bookkeeping from @old_forest (if any), and carve the new pages into per-tree, per-height
 * runs, formatting new root pages as they are placed.
 *
 * On failure the partially built @forest is left for the caller to free (see deforest()).
 */
static int make_segment(struct forest *old_forest, block_count_t new_pages,
			struct boundary *new_boundary, struct forest *forest)
{
	size_t index = (old_forest == NULL) ? 0 : old_forest->segments;
	struct tree_page *page_ptr;
	page_count_t segment_sizes[VDO_BLOCK_MAP_TREE_HEIGHT];
	height_t height;
	root_count_t root;
	int result;

	forest->segments = index + 1;

	result = vdo_allocate(forest->segments, "forest boundary array", &forest->boundaries);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(forest->segments, "forest page pointers", &forest->pages);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(new_pages, "new forest pages", &forest->pages[index]);
	if (result != VDO_SUCCESS)
		return result;

	/* Carry over the bookkeeping for all pre-existing segments. */
	if (index > 0) {
		memcpy(forest->boundaries, old_forest->boundaries,
		       index * sizeof(struct boundary));
		memcpy(forest->pages, old_forest->pages,
		       index * sizeof(struct tree_page *));
	}

	memcpy(&(forest->boundaries[index]), new_boundary, sizeof(struct boundary));

	/* The new segment's size at each height is the growth beyond the old boundary. */
	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
		segment_sizes[height] = new_boundary->levels[height];
		if (index > 0)
			segment_sizes[height] -= old_forest->boundaries[index - 1].levels[height];
	}

	/* Distribute the freshly allocated pages among the trees, height by height. */
	page_ptr = forest->pages[index];
	for (root = 0; root < forest->map->root_count; root++) {
		struct block_map_tree_segment *segment;
		struct block_map_tree *tree = &(forest->trees[root]);
		height_t height;

		result = vdo_allocate(forest->segments, "tree root segments", &tree->segments);
		if (result != VDO_SUCCESS)
			return result;

		if (index > 0) {
			memcpy(tree->segments, old_forest->trees[root].segments,
			       index * sizeof(struct block_map_tree_segment));
		}

		segment = &(tree->segments[index]);
		for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT; height++) {
			if (segment_sizes[height] == 0)
				continue;

			segment->levels[height] = page_ptr;
			if (height == (VDO_BLOCK_MAP_TREE_HEIGHT - 1)) {
				/* Record the root. */
				struct block_map_page *page =
					vdo_format_block_map_page(page_ptr->page_buffer,
								  forest->map->nonce,
								  VDO_INVALID_PBN, true);
				page->entries[0] =
					vdo_pack_block_map_entry(forest->map->root_origin + root,
								 VDO_MAPPING_STATE_UNCOMPRESSED);
			}
			page_ptr += segment_sizes[height];
		}
	}

	return VDO_SUCCESS;
}
2430 
/* Free a forest, including every page segment from @first_page_segment onward. */
static void deforest(struct forest *forest, size_t first_page_segment)
{
	root_count_t tree;
	size_t i;

	if (forest->pages != NULL) {
		/* Only the segments owned by this forest are freed. */
		for (i = first_page_segment; i < forest->segments; i++)
			vdo_free(forest->pages[i]);

		vdo_free(forest->pages);
	}

	for (tree = 0; tree < forest->map->root_count; tree++)
		vdo_free(forest->trees[tree].segments);

	vdo_free(forest->boundaries);
	vdo_free(forest);
}
2449 
/**
 * make_forest() - Make a collection of trees for a block_map, expanding the existing forest if
 *                 there is one.
 * @map: The block map.
 * @entries: The number of entries the block map will hold.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int make_forest(struct block_map *map, block_count_t entries)
{
	struct forest *forest, *old_forest = map->forest;
	struct boundary new_boundary, *old_boundary = NULL;
	block_count_t new_pages;
	int result;

	if (old_forest != NULL)
		old_boundary = &(old_forest->boundaries[old_forest->segments - 1]);

	new_pages = vdo_compute_new_forest_pages(map->root_count, old_boundary,
						 entries, &new_boundary);
	if (new_pages == 0) {
		/* The existing forest already covers the requested size. */
		map->next_entry_count = entries;
		return VDO_SUCCESS;
	}

	result = vdo_allocate_extended(map->root_count, trees, __func__, &forest);
	if (result != VDO_SUCCESS)
		return result;

	forest->map = map;
	result = make_segment(old_forest, new_pages, &new_boundary, forest);
	if (result != VDO_SUCCESS) {
		/* Only the newest page segment belongs to this forest; earlier ones are shared. */
		deforest(forest, forest->segments - 1);
		return result;
	}

	/* The new forest is installed later by replace_forest(). */
	map->next_forest = forest;
	map->next_entry_count = entries;
	return VDO_SUCCESS;
}
2490 
2491 /**
2492  * replace_forest() - Replace a block_map's forest with the already-prepared larger forest.
2493  * @map: The block map.
2494  */
replace_forest(struct block_map * map)2495 static void replace_forest(struct block_map *map)
2496 {
2497 	if (map->next_forest != NULL) {
2498 		if (map->forest != NULL)
2499 			deforest(map->forest, map->forest->segments);
2500 		map->forest = vdo_forget(map->next_forest);
2501 	}
2502 
2503 	map->entry_count = map->next_entry_count;
2504 	map->next_entry_count = 0;
2505 }
2506 
2507 /**
2508  * finish_cursor() - Finish the traversal of a single tree. If it was the last cursor, finish the
2509  *                   traversal.
2510  * @cursor: The cursor to complete.
2511  */
finish_cursor(struct cursor * cursor)2512 static void finish_cursor(struct cursor *cursor)
2513 {
2514 	struct cursors *cursors = cursor->parent;
2515 	struct vdo_completion *completion = cursors->completion;
2516 
2517 	return_vio_to_pool(vdo_forget(cursor->vio));
2518 	if (--cursors->active_roots > 0)
2519 		return;
2520 
2521 	vdo_free(cursors);
2522 
2523 	vdo_finish_completion(completion);
2524 }
2525 
2526 static void traverse(struct cursor *cursor);
2527 
/**
 * continue_traversal() - Continue traversing a block map tree.
 * @completion: The VIO doing a read or write.
 *
 * Records any metadata I/O error before resuming, rather than aborting the traversal.
 */
static void continue_traversal(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	traverse(completion->parent);
}
2537 
/**
 * finish_traversal_load() - Continue traversing a block map tree now that a page has been loaded.
 * @completion: The VIO doing the read.
 */
static void finish_traversal_load(struct vdo_completion *completion)
{
	struct cursor *cursor = completion->parent;
	height_t height = cursor->height;
	struct cursor_level *level = &cursor->levels[height];
	struct tree_page *tree_page =
		&(cursor->tree->segments[0].levels[height][level->page_index]);
	struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;

	/* Install the loaded data into the in-memory tree page, then resume the walk. */
	vdo_copy_valid_page(cursor->vio->vio.data,
			    cursor->parent->zone->block_map->nonce,
			    pbn_from_vio_bio(cursor->vio->vio.bio), page);
	traverse(cursor);
}
2556 
/* Bio endio for a traversal read: continue on the cursors' zone thread. */
static void traversal_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct cursor *cursor = vio->completion.parent;

	continue_vio_after_io(vio, finish_traversal_load,
			      cursor->parent->zone->thread_id);
}
2565 
/**
 * traverse() - Traverse a single block map tree.
 * @cursor: A cursor tracking traversal progress.
 *
 * This is the recursive heart of the traversal process. Although written iteratively,
 * each page read effectively recurses: the cursor descends one level, submits a read,
 * and this function returns; the read's completion calls back into traverse().
 */
static void traverse(struct cursor *cursor)
{
	/* The outer loop ascends back toward the root as each level's slots are exhausted. */
	for (; cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT; cursor->height++) {
		height_t height = cursor->height;
		struct cursor_level *level = &cursor->levels[height];
		struct tree_page *tree_page =
			&(cursor->tree->segments[0].levels[height][level->page_index]);
		struct block_map_page *page = (struct block_map_page *) tree_page->page_buffer;

		/* An uninitialized page has no entries worth examining. */
		if (!page->header.initialized)
			continue;

		for (; level->slot < VDO_BLOCK_MAP_ENTRIES_PER_PAGE; level->slot++) {
			struct cursor_level *next_level;
			page_number_t entry_index =
				(VDO_BLOCK_MAP_ENTRIES_PER_PAGE * level->page_index) + level->slot;
			struct data_location location =
				vdo_unpack_block_map_entry(&page->entries[level->slot]);

			if (!vdo_is_valid_location(&location)) {
				/* This entry is invalid, so remove it from the page. */
				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
				vdo_write_tree_page(tree_page, cursor->parent->zone);
				continue;
			}

			if (!vdo_is_mapped_location(&location))
				continue;

			/* Erase mapped entries past the end of the logical space. */
			if (entry_index >= cursor->boundary.levels[height]) {
				page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
				vdo_write_tree_page(tree_page, cursor->parent->zone);
				continue;
			}

			/*
			 * Report interior-node PBNs to the callback; if the callback
			 * rejects one, unmap the entry and persist the repaired page.
			 */
			if (cursor->height < VDO_BLOCK_MAP_TREE_HEIGHT - 1) {
				int result = cursor->parent->entry_callback(location.pbn,
									    cursor->parent->completion);
				if (result != VDO_SUCCESS) {
					page->entries[level->slot] = UNMAPPED_BLOCK_MAP_ENTRY;
					vdo_write_tree_page(tree_page, cursor->parent->zone);
					continue;
				}
			}

			/* Leaf entries have no children to descend into. */
			if (cursor->height == 0)
				continue;

			/*
			 * Descend one level: aim the cursor at the start of the child
			 * page, and bump this level's slot now so that traversal resumes
			 * past this entry when the child has been fully processed.
			 */
			cursor->height--;
			next_level = &cursor->levels[cursor->height];
			next_level->page_index = entry_index;
			next_level->slot = 0;
			level->slot++;
			vdo_submit_metadata_vio(&cursor->vio->vio, location.pbn,
						traversal_endio, continue_traversal,
						REQ_OP_READ | REQ_PRIO);
			return;
		}
	}

	/* Every level of this tree has been exhausted. */
	finish_cursor(cursor);
}
2635 
2636 /**
2637  * launch_cursor() - Start traversing a single block map tree now that the cursor has a VIO with
2638  *                   which to load pages.
2639  * @waiter: The parent of the cursor to launch.
2640  * @context: The pooled_vio just acquired.
2641  *
2642  * Implements waiter_callback_fn.
2643  */
launch_cursor(struct vdo_waiter * waiter,void * context)2644 static void launch_cursor(struct vdo_waiter *waiter, void *context)
2645 {
2646 	struct cursor *cursor = container_of(waiter, struct cursor, waiter);
2647 	struct pooled_vio *pooled = context;
2648 
2649 	cursor->vio = pooled;
2650 	pooled->vio.completion.parent = cursor;
2651 	pooled->vio.completion.callback_thread_id = cursor->parent->zone->thread_id;
2652 	traverse(cursor);
2653 }
2654 
2655 /**
2656  * compute_boundary() - Compute the number of pages used at each level of the given root's tree.
2657  * @map: The block map.
2658  * @root_index: The tree root index.
2659  *
2660  * Return: The list of page counts as a boundary structure.
2661  */
compute_boundary(struct block_map * map,root_count_t root_index)2662 static struct boundary compute_boundary(struct block_map *map, root_count_t root_index)
2663 {
2664 	struct boundary boundary;
2665 	height_t height;
2666 	page_count_t leaf_pages = vdo_compute_block_map_page_count(map->entry_count);
2667 	/*
2668 	 * Compute the leaf pages for this root. If the number of leaf pages does not distribute
2669 	 * evenly, we must determine if this root gets an extra page. Extra pages are assigned to
2670 	 * roots starting from tree 0.
2671 	 */
2672 	page_count_t last_tree_root = (leaf_pages - 1) % map->root_count;
2673 	page_count_t level_pages = leaf_pages / map->root_count;
2674 
2675 	if (root_index <= last_tree_root)
2676 		level_pages++;
2677 
2678 	for (height = 0; height < VDO_BLOCK_MAP_TREE_HEIGHT - 1; height++) {
2679 		boundary.levels[height] = level_pages;
2680 		level_pages = DIV_ROUND_UP(level_pages, VDO_BLOCK_MAP_ENTRIES_PER_PAGE);
2681 	}
2682 
2683 	/* The root node always exists, even if the root is otherwise unused. */
2684 	boundary.levels[VDO_BLOCK_MAP_TREE_HEIGHT - 1] = 1;
2685 
2686 	return boundary;
2687 }
2688 
2689 /**
2690  * vdo_traverse_forest() - Walk the entire forest of a block map.
2691  * @map: The block map.
2692  * @callback: A function to call with the pbn of each allocated node in the forest.
2693  * @completion: The completion to notify on each traversed PBN, and when traversal completes.
2694  */
vdo_traverse_forest(struct block_map * map,vdo_entry_callback_fn callback,struct vdo_completion * completion)2695 void vdo_traverse_forest(struct block_map *map, vdo_entry_callback_fn callback,
2696 			 struct vdo_completion *completion)
2697 {
2698 	root_count_t root;
2699 	struct cursors *cursors;
2700 	int result;
2701 
2702 	result = vdo_allocate_extended(map->root_count, cursors, __func__, &cursors);
2703 	if (result != VDO_SUCCESS) {
2704 		vdo_fail_completion(completion, result);
2705 		return;
2706 	}
2707 
2708 	cursors->zone = &map->zones[0];
2709 	cursors->pool = cursors->zone->vio_pool;
2710 	cursors->entry_callback = callback;
2711 	cursors->completion = completion;
2712 	cursors->active_roots = map->root_count;
2713 	for (root = 0; root < map->root_count; root++) {
2714 		struct cursor *cursor = &cursors->cursors[root];
2715 
2716 		*cursor = (struct cursor) {
2717 			.tree = &map->forest->trees[root],
2718 			.height = VDO_BLOCK_MAP_TREE_HEIGHT - 1,
2719 			.parent = cursors,
2720 			.boundary = compute_boundary(map, root),
2721 		};
2722 
2723 		cursor->waiter.callback = launch_cursor;
2724 		acquire_vio_from_pool(cursors->pool, &cursor->waiter);
2725 	}
2726 }
2727 
/**
 * initialize_block_map_zone() - Initialize the per-zone portions of the block map.
 * @map: The block map.
 * @zone_number: The zone to initialize.
 * @cache_size: The total block map cache size.
 * @maximum_age: The number of journal blocks before a dirtied page is considered old and must be
 *               written out.
 *
 * Return: VDO_SUCCESS or an error code. On failure this returns without unwinding; the
 * caller (see vdo_decode_block_map()) frees the partially-initialized map via
 * vdo_free_block_map(), which releases whatever was allocated here.
 */
static int __must_check initialize_block_map_zone(struct block_map *map,
						  zone_count_t zone_number,
						  page_count_t cache_size,
						  block_count_t maximum_age)
{
	int result;
	block_count_t i;
	struct vdo *vdo = map->vdo;
	struct block_map_zone *zone = &map->zones[zone_number];

	/* The dirty-list keying scheme packs a page_descriptor into a single u64. */
	BUILD_BUG_ON(sizeof(struct page_descriptor) != sizeof(u64));

	zone->zone_number = zone_number;
	zone->thread_id = vdo->thread_config.logical_threads[zone_number];
	zone->block_map = map;

	/* One era of dirty lists per journal block of allowed age. */
	result = vdo_allocate_extended(maximum_age, eras, __func__, &zone->dirty_lists);
	if (result != VDO_SUCCESS)
		return result;

	zone->dirty_lists->maximum_age = maximum_age;
	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_TREE_PAGE]);
	INIT_LIST_HEAD(&zone->dirty_lists->expired[VDO_CACHE_PAGE]);

	/* Each era tracks tree pages and cache pages separately. */
	for (i = 0; i < maximum_age; i++) {
		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_TREE_PAGE]);
		INIT_LIST_HEAD(&zone->dirty_lists->eras[i][VDO_CACHE_PAGE]);
	}

	result = vdo_int_map_create(VDO_LOCK_MAP_CAPACITY, &zone->loading_pages);
	if (result != VDO_SUCCESS)
		return result;

	result = make_vio_pool(vdo, BLOCK_MAP_VIO_POOL_SIZE, 1,
			       zone->thread_id, VIO_TYPE_BLOCK_MAP_INTERIOR,
			       VIO_PRIORITY_METADATA, zone, &zone->vio_pool);
	if (result != VDO_SUCCESS)
		return result;

	vdo_set_admin_state_code(&zone->state, VDO_ADMIN_STATE_NORMAL_OPERATION);

	/* The total cache is divided evenly among the logical zones. */
	zone->page_cache.zone = zone;
	zone->page_cache.vdo = vdo;
	zone->page_cache.page_count = cache_size / map->zone_count;
	zone->page_cache.stats.free_pages = zone->page_cache.page_count;

	result = allocate_cache_components(&zone->page_cache);
	if (result != VDO_SUCCESS)
		return result;

	/* initialize empty circular queues */
	INIT_LIST_HEAD(&zone->page_cache.lru_list);
	INIT_LIST_HEAD(&zone->page_cache.outgoing_list);

	return VDO_SUCCESS;
}
2792 
2793 /* Implements vdo_zone_thread_getter_fn */
get_block_map_zone_thread_id(void * context,zone_count_t zone_number)2794 static thread_id_t get_block_map_zone_thread_id(void *context, zone_count_t zone_number)
2795 {
2796 	struct block_map *map = context;
2797 
2798 	return map->zones[zone_number].thread_id;
2799 }
2800 
2801 /* Implements vdo_action_preamble_fn */
prepare_for_era_advance(void * context,struct vdo_completion * parent)2802 static void prepare_for_era_advance(void *context, struct vdo_completion *parent)
2803 {
2804 	struct block_map *map = context;
2805 
2806 	map->current_era_point = map->pending_era_point;
2807 	vdo_finish_completion(parent);
2808 }
2809 
2810 /* Implements vdo_zone_action_fn */
advance_block_map_zone_era(void * context,zone_count_t zone_number,struct vdo_completion * parent)2811 static void advance_block_map_zone_era(void *context, zone_count_t zone_number,
2812 				       struct vdo_completion *parent)
2813 {
2814 	struct block_map *map = context;
2815 	struct block_map_zone *zone = &map->zones[zone_number];
2816 
2817 	update_period(zone->dirty_lists, map->current_era_point);
2818 	write_expired_elements(zone);
2819 	vdo_finish_completion(parent);
2820 }
2821 
2822 /*
2823  * Schedule an era advance if necessary. This method should not be called directly. Rather, call
2824  * vdo_schedule_default_action() on the block map's action manager.
2825  *
2826  * Implements vdo_action_scheduler_fn.
2827  */
schedule_era_advance(void * context)2828 static bool schedule_era_advance(void *context)
2829 {
2830 	struct block_map *map = context;
2831 
2832 	if (map->current_era_point == map->pending_era_point)
2833 		return false;
2834 
2835 	return vdo_schedule_action(map->action_manager, prepare_for_era_advance,
2836 				   advance_block_map_zone_era, NULL, NULL);
2837 }
2838 
uninitialize_block_map_zone(struct block_map_zone * zone)2839 static void uninitialize_block_map_zone(struct block_map_zone *zone)
2840 {
2841 	struct vdo_page_cache *cache = &zone->page_cache;
2842 
2843 	vdo_free(vdo_forget(zone->dirty_lists));
2844 	free_vio_pool(vdo_forget(zone->vio_pool));
2845 	vdo_int_map_free(vdo_forget(zone->loading_pages));
2846 	if (cache->infos != NULL) {
2847 		struct page_info *info;
2848 
2849 		for (info = cache->infos; info < cache->infos + cache->page_count; info++)
2850 			free_vio(vdo_forget(info->vio));
2851 	}
2852 
2853 	vdo_int_map_free(vdo_forget(cache->page_map));
2854 	vdo_free(vdo_forget(cache->infos));
2855 	vdo_free(vdo_forget(cache->pages));
2856 }
2857 
vdo_free_block_map(struct block_map * map)2858 void vdo_free_block_map(struct block_map *map)
2859 {
2860 	zone_count_t zone;
2861 
2862 	if (map == NULL)
2863 		return;
2864 
2865 	for (zone = 0; zone < map->zone_count; zone++)
2866 		uninitialize_block_map_zone(&map->zones[zone]);
2867 
2868 	vdo_abandon_block_map_growth(map);
2869 	if (map->forest != NULL)
2870 		deforest(vdo_forget(map->forest), 0);
2871 	vdo_free(vdo_forget(map->action_manager));
2872 	vdo_free(map);
2873 }
2874 
2875 /* @journal may be NULL. */
vdo_decode_block_map(struct block_map_state_2_0 state,block_count_t logical_blocks,struct vdo * vdo,struct recovery_journal * journal,nonce_t nonce,page_count_t cache_size,block_count_t maximum_age,struct block_map ** map_ptr)2876 int vdo_decode_block_map(struct block_map_state_2_0 state, block_count_t logical_blocks,
2877 			 struct vdo *vdo, struct recovery_journal *journal,
2878 			 nonce_t nonce, page_count_t cache_size, block_count_t maximum_age,
2879 			 struct block_map **map_ptr)
2880 {
2881 	struct block_map *map;
2882 	int result;
2883 	zone_count_t zone = 0;
2884 
2885 	BUILD_BUG_ON(VDO_BLOCK_MAP_ENTRIES_PER_PAGE !=
2886 		     ((VDO_BLOCK_SIZE - sizeof(struct block_map_page)) /
2887 		      sizeof(struct block_map_entry)));
2888 	result = VDO_ASSERT(cache_size > 0, "block map cache size is specified");
2889 	if (result != VDO_SUCCESS)
2890 		return result;
2891 
2892 	result = vdo_allocate_extended(vdo->thread_config.logical_zone_count,
2893 				       zones, __func__, &map);
2894 	if (result != VDO_SUCCESS)
2895 		return result;
2896 
2897 	map->vdo = vdo;
2898 	map->root_origin = state.root_origin;
2899 	map->root_count = state.root_count;
2900 	map->entry_count = logical_blocks;
2901 	map->journal = journal;
2902 	map->nonce = nonce;
2903 
2904 	result = make_forest(map, map->entry_count);
2905 	if (result != VDO_SUCCESS) {
2906 		vdo_free_block_map(map);
2907 		return result;
2908 	}
2909 
2910 	replace_forest(map);
2911 
2912 	map->zone_count = vdo->thread_config.logical_zone_count;
2913 	for (zone = 0; zone < map->zone_count; zone++) {
2914 		result = initialize_block_map_zone(map, zone, cache_size, maximum_age);
2915 		if (result != VDO_SUCCESS) {
2916 			vdo_free_block_map(map);
2917 			return result;
2918 		}
2919 	}
2920 
2921 	result = vdo_make_action_manager(map->zone_count, get_block_map_zone_thread_id,
2922 					 vdo_get_recovery_journal_thread_id(journal),
2923 					 map, schedule_era_advance, vdo,
2924 					 &map->action_manager);
2925 	if (result != VDO_SUCCESS) {
2926 		vdo_free_block_map(map);
2927 		return result;
2928 	}
2929 
2930 	*map_ptr = map;
2931 	return VDO_SUCCESS;
2932 }
2933 
vdo_record_block_map(const struct block_map * map)2934 struct block_map_state_2_0 vdo_record_block_map(const struct block_map *map)
2935 {
2936 	return (struct block_map_state_2_0) {
2937 		.flat_page_origin = VDO_BLOCK_MAP_FLAT_PAGE_ORIGIN,
2938 		/* This is the flat page count, which has turned out to always be 0. */
2939 		.flat_page_count = 0,
2940 		.root_origin = map->root_origin,
2941 		.root_count = map->root_count,
2942 	};
2943 }
2944 
2945 /* The block map needs to know the journals' sequence number to initialize the eras. */
vdo_initialize_block_map_from_journal(struct block_map * map,struct recovery_journal * journal)2946 void vdo_initialize_block_map_from_journal(struct block_map *map,
2947 					   struct recovery_journal *journal)
2948 {
2949 	zone_count_t z = 0;
2950 
2951 	map->current_era_point = vdo_get_recovery_journal_current_sequence_number(journal);
2952 	map->pending_era_point = map->current_era_point;
2953 
2954 	for (z = 0; z < map->zone_count; z++) {
2955 		struct dirty_lists *dirty_lists = map->zones[z].dirty_lists;
2956 
2957 		VDO_ASSERT_LOG_ONLY(dirty_lists->next_period == 0, "current period not set");
2958 		dirty_lists->oldest_period = map->current_era_point;
2959 		dirty_lists->next_period = map->current_era_point + 1;
2960 		dirty_lists->offset = map->current_era_point % dirty_lists->maximum_age;
2961 	}
2962 }
2963 
2964 /* Compute the logical zone for the LBN of a data vio. */
vdo_compute_logical_zone(struct data_vio * data_vio)2965 zone_count_t vdo_compute_logical_zone(struct data_vio *data_vio)
2966 {
2967 	struct block_map *map = vdo_from_data_vio(data_vio)->block_map;
2968 	struct tree_lock *tree_lock = &data_vio->tree_lock;
2969 	page_number_t page_number = data_vio->logical.lbn / VDO_BLOCK_MAP_ENTRIES_PER_PAGE;
2970 
2971 	tree_lock->tree_slots[0].page_index = page_number;
2972 	tree_lock->root_index = page_number % map->root_count;
2973 	return (tree_lock->root_index % map->zone_count);
2974 }
2975 
/* Note a new journal block and let the action manager advance the era when convenient. */
void vdo_advance_block_map_era(struct block_map *map,
			       sequence_number_t recovery_block_number)
{
	if (map == NULL)
		return;

	map->pending_era_point = recovery_block_number;
	vdo_schedule_default_action(map->action_manager);
}
2985 
2986 /* Implements vdo_admin_initiator_fn */
initiate_drain(struct admin_state * state)2987 static void initiate_drain(struct admin_state *state)
2988 {
2989 	struct block_map_zone *zone = container_of(state, struct block_map_zone, state);
2990 
2991 	VDO_ASSERT_LOG_ONLY((zone->active_lookups == 0),
2992 			    "%s() called with no active lookups", __func__);
2993 
2994 	if (!vdo_is_state_suspending(state)) {
2995 		while (zone->dirty_lists->oldest_period < zone->dirty_lists->next_period)
2996 			expire_oldest_list(zone->dirty_lists);
2997 		write_expired_elements(zone);
2998 	}
2999 
3000 	check_for_drain_complete(zone);
3001 }
3002 
3003 /* Implements vdo_zone_action_fn. */
drain_zone(void * context,zone_count_t zone_number,struct vdo_completion * parent)3004 static void drain_zone(void *context, zone_count_t zone_number,
3005 		       struct vdo_completion *parent)
3006 {
3007 	struct block_map *map = context;
3008 	struct block_map_zone *zone = &map->zones[zone_number];
3009 
3010 	vdo_start_draining(&zone->state,
3011 			   vdo_get_current_manager_operation(map->action_manager),
3012 			   parent, initiate_drain);
3013 }
3014 
vdo_drain_block_map(struct block_map * map,const struct admin_state_code * operation,struct vdo_completion * parent)3015 void vdo_drain_block_map(struct block_map *map, const struct admin_state_code *operation,
3016 			 struct vdo_completion *parent)
3017 {
3018 	vdo_schedule_operation(map->action_manager, operation, NULL, drain_zone, NULL,
3019 			       parent);
3020 }
3021 
3022 /* Implements vdo_zone_action_fn. */
resume_block_map_zone(void * context,zone_count_t zone_number,struct vdo_completion * parent)3023 static void resume_block_map_zone(void *context, zone_count_t zone_number,
3024 				  struct vdo_completion *parent)
3025 {
3026 	struct block_map *map = context;
3027 	struct block_map_zone *zone = &map->zones[zone_number];
3028 
3029 	vdo_fail_completion(parent, vdo_resume_if_quiescent(&zone->state));
3030 }
3031 
vdo_resume_block_map(struct block_map * map,struct vdo_completion * parent)3032 void vdo_resume_block_map(struct block_map *map, struct vdo_completion *parent)
3033 {
3034 	vdo_schedule_operation(map->action_manager, VDO_ADMIN_STATE_RESUMING,
3035 			       NULL, resume_block_map_zone, NULL, parent);
3036 }
3037 
3038 /* Allocate an expanded collection of trees, for a future growth. */
vdo_prepare_to_grow_block_map(struct block_map * map,block_count_t new_logical_blocks)3039 int vdo_prepare_to_grow_block_map(struct block_map *map,
3040 				  block_count_t new_logical_blocks)
3041 {
3042 	if (map->next_entry_count == new_logical_blocks)
3043 		return VDO_SUCCESS;
3044 
3045 	if (map->next_entry_count > 0)
3046 		vdo_abandon_block_map_growth(map);
3047 
3048 	if (new_logical_blocks < map->entry_count) {
3049 		map->next_entry_count = map->entry_count;
3050 		return VDO_SUCCESS;
3051 	}
3052 
3053 	return make_forest(map, new_logical_blocks);
3054 }
3055 
/* Implements vdo_action_preamble_fn */
static void grow_forest(void *context, struct vdo_completion *completion)
{
	struct block_map *map = context;

	/* Swap the prepared larger forest into place. */
	replace_forest(map);
	vdo_finish_completion(completion);
}
3062 
3063 /* Requires vdo_prepare_to_grow_block_map() to have been previously called. */
vdo_grow_block_map(struct block_map * map,struct vdo_completion * parent)3064 void vdo_grow_block_map(struct block_map *map, struct vdo_completion *parent)
3065 {
3066 	vdo_schedule_operation(map->action_manager,
3067 			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
3068 			       grow_forest, NULL, NULL, parent);
3069 }
3070 
vdo_abandon_block_map_growth(struct block_map * map)3071 void vdo_abandon_block_map_growth(struct block_map *map)
3072 {
3073 	struct forest *forest = vdo_forget(map->next_forest);
3074 
3075 	if (forest != NULL)
3076 		deforest(forest, forest->segments - 1);
3077 
3078 	map->next_entry_count = 0;
3079 }
3080 
3081 /* Release the page completion and then continue the requester. */
finish_processing_page(struct vdo_completion * completion,int result)3082 static inline void finish_processing_page(struct vdo_completion *completion, int result)
3083 {
3084 	struct vdo_completion *parent = completion->parent;
3085 
3086 	vdo_release_page_completion(completion);
3087 	vdo_continue_completion(parent, result);
3088 }
3089 
handle_page_error(struct vdo_completion * completion)3090 static void handle_page_error(struct vdo_completion *completion)
3091 {
3092 	finish_processing_page(completion, completion->result);
3093 }
3094 
3095 /* Fetch the mapping page for a block map update, and call the provided handler when fetched. */
fetch_mapping_page(struct data_vio * data_vio,bool modifiable,vdo_action_fn action)3096 static void fetch_mapping_page(struct data_vio *data_vio, bool modifiable,
3097 			       vdo_action_fn action)
3098 {
3099 	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
3100 
3101 	if (vdo_is_state_draining(&zone->state)) {
3102 		continue_data_vio_with_error(data_vio, VDO_SHUTTING_DOWN);
3103 		return;
3104 	}
3105 
3106 	vdo_get_page(&data_vio->page_completion, zone,
3107 		     data_vio->tree_lock.tree_slots[0].block_map_slot.pbn,
3108 		     modifiable, &data_vio->vio.completion,
3109 		     action, handle_page_error, false);
3110 }
3111 
3112 /**
3113  * clear_mapped_location() - Clear a data_vio's mapped block location, setting it to be unmapped.
3114  * @data_vio: The data vio.
3115  *
3116  * This indicates the block map entry for the logical block is either unmapped or corrupted.
3117  */
clear_mapped_location(struct data_vio * data_vio)3118 static void clear_mapped_location(struct data_vio *data_vio)
3119 {
3120 	data_vio->mapped = (struct zoned_pbn) {
3121 		.state = VDO_MAPPING_STATE_UNMAPPED,
3122 	};
3123 }
3124 
3125 /**
3126  * set_mapped_location() - Decode and validate a block map entry, and set the mapped location of a
3127  *                         data_vio.
3128  * @data_vio: The data vio.
3129  * @entry: The new mapped entry to set.
3130  *
3131  * Return: VDO_SUCCESS or VDO_BAD_MAPPING if the map entry is invalid or an error code for any
3132  *         other failure
3133  */
set_mapped_location(struct data_vio * data_vio,const struct block_map_entry * entry)3134 static int __must_check set_mapped_location(struct data_vio *data_vio,
3135 					    const struct block_map_entry *entry)
3136 {
3137 	/* Unpack the PBN for logging purposes even if the entry is invalid. */
3138 	struct data_location mapped = vdo_unpack_block_map_entry(entry);
3139 
3140 	if (vdo_is_valid_location(&mapped)) {
3141 		int result;
3142 
3143 		result = vdo_get_physical_zone(vdo_from_data_vio(data_vio),
3144 					       mapped.pbn, &data_vio->mapped.zone);
3145 		if (result == VDO_SUCCESS) {
3146 			data_vio->mapped.pbn = mapped.pbn;
3147 			data_vio->mapped.state = mapped.state;
3148 			return VDO_SUCCESS;
3149 		}
3150 
3151 		/*
3152 		 * Return all errors not specifically known to be errors from validating the
3153 		 * location.
3154 		 */
3155 		if ((result != VDO_OUT_OF_RANGE) && (result != VDO_BAD_MAPPING))
3156 			return result;
3157 	}
3158 
3159 	/*
3160 	 * Log the corruption even if we wind up ignoring it for write VIOs, converting all cases
3161 	 * to VDO_BAD_MAPPING.
3162 	 */
3163 	vdo_log_error_strerror(VDO_BAD_MAPPING,
3164 			       "PBN %llu with state %u read from the block map was invalid",
3165 			       (unsigned long long) mapped.pbn, mapped.state);
3166 
3167 	/*
3168 	 * A read VIO has no option but to report the bad mapping--reading zeros would be hiding
3169 	 * known data loss.
3170 	 */
3171 	if (!data_vio->write)
3172 		return VDO_BAD_MAPPING;
3173 
3174 	/*
3175 	 * A write VIO only reads this mapping to decref the old block. Treat this as an unmapped
3176 	 * entry rather than fail the write.
3177 	 */
3178 	clear_mapped_location(data_vio);
3179 	return VDO_SUCCESS;
3180 }
3181 
3182 /* This callback is registered in vdo_get_mapped_block(). */
get_mapping_from_fetched_page(struct vdo_completion * completion)3183 static void get_mapping_from_fetched_page(struct vdo_completion *completion)
3184 {
3185 	int result;
3186 	struct vdo_page_completion *vpc = as_vdo_page_completion(completion);
3187 	const struct block_map_page *page;
3188 	const struct block_map_entry *entry;
3189 	struct data_vio *data_vio = as_data_vio(completion->parent);
3190 	struct block_map_tree_slot *tree_slot;
3191 
3192 	if (completion->result != VDO_SUCCESS) {
3193 		finish_processing_page(completion, completion->result);
3194 		return;
3195 	}
3196 
3197 	result = validate_completed_page(vpc, false);
3198 	if (result != VDO_SUCCESS) {
3199 		finish_processing_page(completion, result);
3200 		return;
3201 	}
3202 
3203 	page = (const struct block_map_page *) get_page_buffer(vpc->info);
3204 	tree_slot = &data_vio->tree_lock.tree_slots[0];
3205 	entry = &page->entries[tree_slot->block_map_slot.slot];
3206 
3207 	result = set_mapped_location(data_vio, entry);
3208 	finish_processing_page(completion, result);
3209 }
3210 
/**
 * vdo_update_block_map_page() - Encode a data_vio's new mapping into a block map page and
 *                               transfer the data_vio's recovery journal lock to the page.
 * @page: The block map page to update.
 * @data_vio: The data_vio whose mapping is being recorded.
 * @pbn: The physical block number of the new mapping.
 * @mapping_state: The state of the new mapping.
 * @recovery_lock: A pointer to the page's recovery journal lock, updated in place.
 */
void vdo_update_block_map_page(struct block_map_page *page, struct data_vio *data_vio,
			       physical_block_number_t pbn,
			       enum block_mapping_state mapping_state,
			       sequence_number_t *recovery_lock)
{
	struct block_map_zone *zone = data_vio->logical.zone->block_map_zone;
	struct block_map *block_map = zone->block_map;
	struct recovery_journal *journal = block_map->journal;
	sequence_number_t old_locked, new_locked;
	struct tree_lock *tree_lock = &data_vio->tree_lock;

	/* Encode the new mapping. */
	page->entries[tree_lock->tree_slots[tree_lock->height].block_map_slot.slot] =
		vdo_pack_block_map_entry(pbn, mapping_state);

	/* Adjust references on the recovery journal blocks. */
	old_locked = *recovery_lock;
	new_locked = data_vio->recovery_sequence_number;

	/*
	 * Move the page's lock only when it holds none, or when the data_vio's journal
	 * block is older than the one currently locked (the page must keep a lock on
	 * the earliest journal block whose changes it reflects).
	 */
	if ((old_locked == 0) || (old_locked > new_locked)) {
		vdo_acquire_recovery_journal_block_reference(journal, new_locked,
							     VDO_ZONE_TYPE_LOGICAL,
							     zone->zone_number);

		/* Drop the reference on the previously locked block, if any. */
		if (old_locked > 0) {
			vdo_release_recovery_journal_block_reference(journal, old_locked,
								     VDO_ZONE_TYPE_LOGICAL,
								     zone->zone_number);
		}

		*recovery_lock = new_locked;
	}

	/*
	 * FIXME: explain this more
	 * Release the transferred lock from the data_vio.
	 */
	vdo_release_journal_entry_lock(journal, new_locked);
	data_vio->recovery_sequence_number = 0;
}
3251 
/*
 * Record a data_vio's new mapping in the fetched (writable) page, mark the page dirty,
 * and file it in the appropriate dirty era list. This callback is registered in
 * vdo_put_mapped_block().
 */
static void put_mapping_in_fetched_page(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion->parent);
	sequence_number_t old_lock;
	struct vdo_page_completion *vpc;
	struct page_info *info;
	int result;

	if (completion->result != VDO_SUCCESS) {
		finish_processing_page(completion, completion->result);
		return;
	}

	vpc = as_vdo_page_completion(completion);
	result = validate_completed_page(vpc, true);
	if (result != VDO_SUCCESS) {
		finish_processing_page(completion, result);
		return;
	}

	info = vpc->info;
	/* Capture the page's lock before the update may replace it. */
	old_lock = info->recovery_lock;
	vdo_update_block_map_page((struct block_map_page *) get_page_buffer(info),
				  data_vio, data_vio->new_mapped.pbn,
				  data_vio->new_mapped.state, &info->recovery_lock);
	set_info_state(info, PS_DIRTY);
	/* Re-file the page according to its old and (possibly changed) new lock. */
	add_to_dirty_lists(info->cache->zone, &info->state_entry,
			   VDO_CACHE_PAGE, old_lock, info->recovery_lock);
	finish_processing_page(completion, VDO_SUCCESS);
}
3282 
3283 /* Read a stored block mapping into a data_vio. */
vdo_get_mapped_block(struct data_vio * data_vio)3284 void vdo_get_mapped_block(struct data_vio *data_vio)
3285 {
3286 	if (data_vio->tree_lock.tree_slots[0].block_map_slot.pbn == VDO_ZERO_BLOCK) {
3287 		/*
3288 		 * We know that the block map page for this LBN has not been allocated, so the
3289 		 * block must be unmapped.
3290 		 */
3291 		clear_mapped_location(data_vio);
3292 		continue_data_vio(data_vio);
3293 		return;
3294 	}
3295 
3296 	fetch_mapping_page(data_vio, false, get_mapping_from_fetched_page);
3297 }
3298 
3299 /* Update a stored block mapping to reflect a data_vio's new mapping. */
vdo_put_mapped_block(struct data_vio * data_vio)3300 void vdo_put_mapped_block(struct data_vio *data_vio)
3301 {
3302 	fetch_mapping_page(data_vio, true, put_mapping_in_fetched_page);
3303 }
3304 
vdo_get_block_map_statistics(struct block_map * map)3305 struct block_map_statistics vdo_get_block_map_statistics(struct block_map *map)
3306 {
3307 	zone_count_t zone = 0;
3308 	struct block_map_statistics totals;
3309 
3310 	memset(&totals, 0, sizeof(struct block_map_statistics));
3311 	for (zone = 0; zone < map->zone_count; zone++) {
3312 		const struct block_map_statistics *stats =
3313 			&(map->zones[zone].page_cache.stats);
3314 
3315 		totals.dirty_pages += READ_ONCE(stats->dirty_pages);
3316 		totals.clean_pages += READ_ONCE(stats->clean_pages);
3317 		totals.free_pages += READ_ONCE(stats->free_pages);
3318 		totals.failed_pages += READ_ONCE(stats->failed_pages);
3319 		totals.incoming_pages += READ_ONCE(stats->incoming_pages);
3320 		totals.outgoing_pages += READ_ONCE(stats->outgoing_pages);
3321 		totals.cache_pressure += READ_ONCE(stats->cache_pressure);
3322 		totals.read_count += READ_ONCE(stats->read_count);
3323 		totals.write_count += READ_ONCE(stats->write_count);
3324 		totals.failed_reads += READ_ONCE(stats->failed_reads);
3325 		totals.failed_writes += READ_ONCE(stats->failed_writes);
3326 		totals.reclaimed += READ_ONCE(stats->reclaimed);
3327 		totals.read_outgoing += READ_ONCE(stats->read_outgoing);
3328 		totals.found_in_cache += READ_ONCE(stats->found_in_cache);
3329 		totals.discard_required += READ_ONCE(stats->discard_required);
3330 		totals.wait_for_page += READ_ONCE(stats->wait_for_page);
3331 		totals.fetch_required += READ_ONCE(stats->fetch_required);
3332 		totals.pages_loaded += READ_ONCE(stats->pages_loaded);
3333 		totals.pages_saved += READ_ONCE(stats->pages_saved);
3334 		totals.flush_count += READ_ONCE(stats->flush_count);
3335 	}
3336 
3337 	return totals;
3338 }
3339