xref: /linux/drivers/md/dm-vdo/vdo.c (revision a5f998094fa344cdd1342164948abb4d7c6101ce)
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 /*
7  * This file contains the main entry points for normal operations on a vdo as well as functions for
8  * constructing and destroying vdo instances (in memory).
9  */
10 
11 /**
12  * DOC:
13  *
14  * A read_only_notifier has a single completion which is used to perform read-only notifications,
15  * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
16  * by a spinlock, are used to control the read-only mode entry process. The first field holds the
17  * read-only error. The second is the state field, which may hold any of the four special values
18  * enumerated here.
19  *
20  * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
21  * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
22  * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
23  * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
24  * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
25  * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
 * nothing more will be done until the vdo is resumed, at which point the notification will be
27  * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
28  */
29 
30 #include "vdo.h"
31 
32 #include <linux/completion.h>
33 #include <linux/device-mapper.h>
34 #include <linux/lz4.h>
35 #include <linux/mutex.h>
36 #include <linux/spinlock.h>
37 #include <linux/string.h>
38 #include <linux/types.h>
39 #include <linux/uuid.h>
40 
41 #include "logger.h"
42 #include "memory-alloc.h"
43 #include "permassert.h"
44 #include "string-utils.h"
45 
46 #include "block-map.h"
47 #include "completion.h"
48 #include "data-vio.h"
49 #include "dedupe.h"
50 #include "encodings.h"
51 #include "funnel-workqueue.h"
52 #include "io-submitter.h"
53 #include "logical-zone.h"
54 #include "packer.h"
55 #include "physical-zone.h"
56 #include "recovery-journal.h"
57 #include "slab-depot.h"
58 #include "statistics.h"
59 #include "status-codes.h"
60 #include "time-utils.h"
61 #include "vio.h"
62 
/* Compile-time switch for extra thread-consistency checking (0 = disabled). */
#define PARANOID_THREAD_CONSISTENCY_CHECKS 0

/*
 * Pairs a vdo_completion with a kernel completion so that vdo work can be
 * waited on synchronously from a non-vdo thread.
 */
struct sync_completion {
	struct vdo_completion vdo_completion;
	struct completion completion;
};

/* A linked list is adequate for the small number of entries we expect. */
struct device_registry {
	/* Head of the list of all registered vdo instances. */
	struct list_head links;
	/* TODO: Convert to rcu per kernel recommendation. */
	rwlock_t lock;
};

/* The module-wide registry of all vdo devices. */
static struct device_registry registry;
78 
79 /**
80  * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
81  *                                         registry.
82  */
vdo_initialize_device_registry_once(void)83 void vdo_initialize_device_registry_once(void)
84 {
85 	INIT_LIST_HEAD(&registry.links);
86 	rwlock_init(&registry.lock);
87 }
88 
/**
 * vdo_is_equal() - Implements vdo_filter_fn.
 *
 * Matches a vdo by simple pointer identity with the supplied context.
 */
static bool vdo_is_equal(struct vdo *vdo, const void *context)
{
	return (context == vdo);
}
94 
/**
 * filter_vdos_locked() - Find a vdo in the registry if it exists there.
 * @filter: The filter function to apply to devices.
 * @context: A bit of context to provide the filter.
 *
 * Context: Must be called holding the registry lock (read or write).
 *
 * Return: The first registered vdo accepted by the filter, or NULL if none matches.
 */
static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
						    const void *context)
{
	struct vdo *vdo;

	/* Linear scan; the registry is expected to hold only a few entries. */
	list_for_each_entry(vdo, &registry.links, registration) {
		if (filter(vdo, context))
			return vdo;
	}

	return NULL;
}
116 
117 /**
118  * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
119  * @filter: The filter function to apply to vdos.
120  * @context: A bit of context to provide the filter.
121  */
vdo_find_matching(vdo_filter_fn filter,const void * context)122 struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
123 {
124 	struct vdo *vdo;
125 
126 	read_lock(&registry.lock);
127 	vdo = filter_vdos_locked(filter, context);
128 	read_unlock(&registry.lock);
129 
130 	return vdo;
131 }
132 
/*
 * Work queue start hook: register the current worker thread with the allocating-thread
 * machinery so that the vdo's allocations_allowed flag governs allocation on it.
 * @ptr: The queue context pointer (unused; the owner is found via the current queue).
 */
static void start_vdo_request_queue(void *ptr)
{
	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());

	vdo_register_allocating_thread(&thread->allocating_thread,
				       &thread->vdo->allocations_allowed);
}
140 
/* Work queue finish hook: undo the registration done by start_vdo_request_queue(). */
static void finish_vdo_request_queue(void *ptr)
{
	vdo_unregister_allocating_thread();
}
145 
/* Queue type for general-purpose vdo threads; hooks allocation tracking on start/finish. */
static const struct vdo_work_queue_type default_queue_type = {
	.start = start_vdo_request_queue,
	.finish = finish_vdo_request_queue,
	.max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
	.default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
};

/* Queue type for bio-acknowledgement threads; no start/finish hooks. */
static const struct vdo_work_queue_type bio_ack_q_type = {
	.start = NULL,
	.finish = NULL,
	.max_priority = BIO_ACK_Q_MAX_PRIORITY,
	.default_priority = BIO_ACK_Q_ACK_PRIORITY,
};

/* Queue type for CPU-bound work threads; no start/finish hooks. */
static const struct vdo_work_queue_type cpu_q_type = {
	.start = NULL,
	.finish = NULL,
	.max_priority = CPU_Q_MAX_PRIORITY,
	.default_priority = CPU_Q_MAX_PRIORITY,
};
166 
/**
 * uninitialize_thread_config() - Free the thread id arrays of a thread_config and zero it.
 * @config: The config to tear down.
 *
 * Callers rely on this being safe for a partially initialized config (unallocated
 * arrays are NULL in the zeroed struct).
 */
static void uninitialize_thread_config(struct thread_config *config)
{
	vdo_free(vdo_forget(config->logical_threads));
	vdo_free(vdo_forget(config->physical_threads));
	vdo_free(vdo_forget(config->hash_zone_threads));
	vdo_free(vdo_forget(config->bio_threads));
	/* Reset everything so a later initialization starts from a clean state. */
	memset(config, 0, sizeof(struct thread_config));
}
175 
/**
 * assign_thread_ids() - Hand out the next @count sequential thread ids.
 * @config: The thread_config whose running thread_count supplies the ids.
 * @thread_ids: The array to fill with the newly assigned ids.
 * @count: The number of ids to assign.
 */
static void assign_thread_ids(struct thread_config *config,
			      thread_id_t thread_ids[], zone_count_t count)
{
	zone_count_t i = 0;

	while (i < count)
		thread_ids[i++] = config->thread_count++;
}
184 
185 /**
186  * initialize_thread_config() - Initialize the thread mapping
187  * @counts: The number and types of threads to create.
188  * @config: The thread_config to initialize.
189  *
190  * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
191  * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
192  * and each will have its own thread, as will the packer and recovery journal.
193  *
194  * Return: VDO_SUCCESS or an error.
195  */
initialize_thread_config(struct thread_count_config counts,struct thread_config * config)196 static int __must_check initialize_thread_config(struct thread_count_config counts,
197 						 struct thread_config *config)
198 {
199 	int result;
200 	bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);
201 
202 	config->bio_thread_count = counts.bio_threads;
203 	if (single) {
204 		config->logical_zone_count = 1;
205 		config->physical_zone_count = 1;
206 		config->hash_zone_count = 1;
207 	} else {
208 		config->logical_zone_count = counts.logical_zones;
209 		config->physical_zone_count = counts.physical_zones;
210 		config->hash_zone_count = counts.hash_zones;
211 	}
212 
213 	result = vdo_allocate(config->logical_zone_count, "logical thread array",
214 			      &config->logical_threads);
215 	if (result != VDO_SUCCESS) {
216 		uninitialize_thread_config(config);
217 		return result;
218 	}
219 
220 	result = vdo_allocate(config->physical_zone_count, "physical thread array",
221 			      &config->physical_threads);
222 	if (result != VDO_SUCCESS) {
223 		uninitialize_thread_config(config);
224 		return result;
225 	}
226 
227 	result = vdo_allocate(config->hash_zone_count, "hash thread array",
228 			      &config->hash_zone_threads);
229 	if (result != VDO_SUCCESS) {
230 		uninitialize_thread_config(config);
231 		return result;
232 	}
233 
234 	result = vdo_allocate(config->bio_thread_count, "bio thread array", &config->bio_threads);
235 	if (result != VDO_SUCCESS) {
236 		uninitialize_thread_config(config);
237 		return result;
238 	}
239 
240 	if (single) {
241 		config->logical_threads[0] = config->thread_count;
242 		config->physical_threads[0] = config->thread_count;
243 		config->hash_zone_threads[0] = config->thread_count++;
244 	} else {
245 		config->admin_thread = config->thread_count;
246 		config->journal_thread = config->thread_count++;
247 		config->packer_thread = config->thread_count++;
248 		assign_thread_ids(config, config->logical_threads, counts.logical_zones);
249 		assign_thread_ids(config, config->physical_threads, counts.physical_zones);
250 		assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
251 	}
252 
253 	config->dedupe_thread = config->thread_count++;
254 	config->bio_ack_thread =
255 		((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
256 	config->cpu_thread = config->thread_count++;
257 	assign_thread_ids(config, config->bio_threads, counts.bio_threads);
258 	return VDO_SUCCESS;
259 }
260 
initialize_geometry_block(struct vdo * vdo,struct vdo_geometry_block * geometry_block)261 static int initialize_geometry_block(struct vdo *vdo,
262 				     struct vdo_geometry_block *geometry_block)
263 {
264 	int result;
265 
266 	result = vdo_allocate(VDO_BLOCK_SIZE, "encoded geometry block",
267 			      (char **) &vdo->geometry_block.buffer);
268 	if (result != VDO_SUCCESS)
269 		return result;
270 
271 	return allocate_vio_components(vdo, VIO_TYPE_GEOMETRY,
272 				       VIO_PRIORITY_METADATA, NULL, 1,
273 				       (char *) geometry_block->buffer,
274 				       &vdo->geometry_block.vio);
275 }
276 
initialize_super_block(struct vdo * vdo,struct vdo_super_block * super_block)277 static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
278 {
279 	int result;
280 
281 	result = vdo_allocate(VDO_BLOCK_SIZE, "encoded super block",
282 			      (char **) &vdo->super_block.buffer);
283 	if (result != VDO_SUCCESS)
284 		return result;
285 
286 	return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
287 				       VIO_PRIORITY_METADATA, NULL, 1,
288 				       (char *) super_block->buffer,
289 				       &vdo->super_block.vio);
290 }
291 
/**
 * get_zone_thread_name() - Format a zone thread name if @id falls within a zone type's range.
 * @thread_ids: The id array for this zone type; thread_ids[0] is the first (lowest) id.
 * @count: The number of threads of this zone type.
 * @id: The thread id to name.
 * @prefix: The name prefix for this zone type (e.g. "logQ").
 * @buffer: Where to write the formatted name.
 * @buffer_length: The size of @buffer.
 *
 * Return: true if @id belongs to this zone type and a name was written.
 */
static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
				 thread_id_t id, const char *prefix,
				 char *buffer, size_t buffer_length)
{
	thread_id_t offset;

	/* Ids for a zone type are consecutive, starting at thread_ids[0]. */
	if (id < thread_ids[0])
		return false;

	offset = id - thread_ids[0];
	if (offset >= count)
		return false;

	snprintf(buffer, buffer_length, "%s%d", prefix, offset);
	return true;
}
307 
/**
 * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
 * @thread_config: The thread configuration.
 * @thread_id: The thread id.
 * @buffer: Where to put the formatted name.
 * @buffer_length: Size of the output buffer.
 *
 * The physical layer may add a prefix identifying the product; the output from this function
 * should just identify the thread.
 *
 * The journal check must come first: in the single-thread config the journal, packer, and
 * zone ids coincide, and that shared thread is reported as "reqQ".
 */
static void get_thread_name(const struct thread_config *thread_config,
			    thread_id_t thread_id, char *buffer, size_t buffer_length)
{
	if (thread_id == thread_config->journal_thread) {
		if (thread_config->packer_thread == thread_id) {
			/*
			 * This is the "single thread" config where one thread is used for the
			 * journal, packer, logical, physical, and hash zones. In that case, it is
			 * known as the "request queue."
			 */
			snprintf(buffer, buffer_length, "reqQ");
			return;
		}

		snprintf(buffer, buffer_length, "journalQ");
		return;
	} else if (thread_id == thread_config->admin_thread) {
		/* Theoretically this could be different from the journal thread. */
		snprintf(buffer, buffer_length, "adminQ");
		return;
	} else if (thread_id == thread_config->packer_thread) {
		snprintf(buffer, buffer_length, "packerQ");
		return;
	} else if (thread_id == thread_config->dedupe_thread) {
		snprintf(buffer, buffer_length, "dedupeQ");
		return;
	} else if (thread_id == thread_config->bio_ack_thread) {
		snprintf(buffer, buffer_length, "ackQ");
		return;
	} else if (thread_id == thread_config->cpu_thread) {
		snprintf(buffer, buffer_length, "cpuQ");
		return;
	}

	/* Not one of the singleton threads; try each zone type's id range in turn. */
	if (get_zone_thread_name(thread_config->logical_threads,
				 thread_config->logical_zone_count,
				 thread_id, "logQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->physical_threads,
				 thread_config->physical_zone_count,
				 thread_id, "physQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->hash_zone_threads,
				 thread_config->hash_zone_count,
				 thread_id, "hashQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->bio_threads,
				 thread_config->bio_thread_count,
				 thread_id, "bioQ", buffer, buffer_length))
		return;

	/* Some sort of misconfiguration? */
	snprintf(buffer, buffer_length, "reqQ%d", thread_id);
}
375 
/**
 * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
 *                     round-robin queues).
 * @vdo: The vdo which owns the thread.
 * @thread_id: The id of the thread to create (as determined by the thread_config).
 * @type: The description of the work queue for this thread; NULL selects the default type.
 * @queue_count: The number of actual threads/queues contained in the "thread".
 * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
 *
 * Each "thread" constructed by this method is represented by a unique thread id in the thread
 * config, and completions can be enqueued to the queue and run on the threads comprising this
 * entity.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
		    const struct vdo_work_queue_type *type,
		    unsigned int queue_count, void *contexts[])
{
	struct vdo_thread *thread = &vdo->threads[thread_id];
	char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];

	if (type == NULL)
		type = &default_queue_type;

	/* Creating the same thread twice is allowed only if the type matches. */
	if (thread->queue != NULL) {
		return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
				  "already constructed vdo thread %u is of the correct type",
				  thread_id);
	}

	thread->vdo = vdo;
	thread->thread_id = thread_id;
	get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
	return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
				   type, queue_count, contexts, &thread->queue);
}
413 
/**
 * register_vdo() - Register a VDO; it must not already be registered.
 * @vdo: The vdo to register.
 *
 * Adds the vdo to the module-wide device registry under the write lock.
 *
 * Return: VDO_SUCCESS, or an assertion failure if the vdo was already registered.
 */
static int register_vdo(struct vdo *vdo)
{
	int result;

	write_lock(&registry.lock);
	result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
			    "VDO not already registered");
	if (result == VDO_SUCCESS) {
		INIT_LIST_HEAD(&vdo->registration);
		list_add_tail(&vdo->registration, &registry.links);
	}
	write_unlock(&registry.lock);

	return result;
}
435 
/**
 * vdo_format() - Format a block device to function as a new VDO.
 * @vdo:       The vdo to format.
 * @error_ptr: The reason for any failure during this call.
 *
 * This function must be called on a device before a VDO can be loaded for the first time.
 * Once a device has been formatted, the VDO can be loaded and shut down repeatedly.
 * If a new VDO is desired, this function should be called again.
 *
 * Return: VDO_SUCCESS or an error
 */
static int __must_check vdo_format(struct vdo *vdo, char **error_ptr)
{
	int result;
	uuid_t uuid;
	/* The current time serves as the volume nonce. */
	nonce_t nonce = current_time_us();
	struct device_config *config = vdo->device_config;

	struct index_config index_config = {
		.mem    = config->index_memory,
		.sparse = config->index_sparse,
	};

	struct vdo_config vdo_config = {
		.logical_blocks        = config->logical_blocks,
		.physical_blocks       = config->physical_blocks,
		.slab_size             = config->slab_blocks,
		.slab_journal_blocks   = DEFAULT_VDO_SLAB_JOURNAL_SIZE,
		.recovery_journal_size = DEFAULT_VDO_RECOVERY_JOURNAL_SIZE,
	};

	uuid_gen(&uuid);
	result = vdo_initialize_volume_geometry(nonce, &uuid, &index_config, &vdo->geometry);
	if (result != VDO_SUCCESS) {
		*error_ptr = "Could not initialize volume geometry during format";
		return result;
	}

	result = vdo_initialize_component_states(&vdo_config, &vdo->geometry, nonce, &vdo->states);
	if (result == VDO_NO_SPACE) {
		/* The device is too small; report the minimum size needed. */
		block_count_t slab_blocks = config->slab_blocks;
		/* 1 is counting geometry block */
		block_count_t fixed_layout_size = 1 +
			vdo->geometry.regions[VDO_DATA_REGION].start_block +
			DEFAULT_VDO_BLOCK_MAP_TREE_ROOT_COUNT +
			DEFAULT_VDO_RECOVERY_JOURNAL_SIZE + VDO_SLAB_SUMMARY_BLOCKS;
		block_count_t necessary_size = fixed_layout_size + slab_blocks;

		vdo_log_error("Minimum required size for VDO volume: %llu bytes",
			      (unsigned long long) necessary_size * VDO_BLOCK_SIZE);
		*error_ptr = "Could not allocate enough space for VDO during format";
		return result;
	}
	if (result != VDO_SUCCESS) {
		*error_ptr = "Could not initialize data layout during format";
		return result;
	}

	/* The new on-disk state will actually be written out later in the load process. */
	vdo->needs_formatting = true;

	return VDO_SUCCESS;
}
498 
/**
 * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
 *                    error.
 * @vdo: The vdo being initialized
 * @config: The configuration of the vdo
 * @instance: The instance number of the vdo
 * @reason: The buffer to hold the failure reason on error
 *
 * Sets up the basic fields, reads (or formats) the geometry, builds the thread
 * configuration, allocates compression contexts, and registers the vdo. On any
 * failure, vdo_destroy() can safely tear down whatever was set up.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_vdo(struct vdo *vdo, struct device_config *config,
			  unsigned int instance, char **reason)
{
	int result;
	zone_count_t i;

	vdo->device_config = config;
	vdo->starting_sector_offset = config->owning_target->begin;
	vdo->instance = instance;
	/* Allocation is permitted freely during initialization. */
	vdo->allocations_allowed = true;
	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
	INIT_LIST_HEAD(&vdo->device_config_list);
	vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
	init_completion(&vdo->admin.callback_sync);
	mutex_init(&vdo->stats_mutex);

	result = initialize_geometry_block(vdo, &vdo->geometry_block);
	if (result != VDO_SUCCESS) {
		*reason = "Could not initialize geometry block";
		return result;
	}

	result = initialize_super_block(vdo, &vdo->super_block);
	if (result != VDO_SUCCESS) {
		*reason = "Could not initialize super block";
		return result;
	}

	result = vdo_submit_metadata_vio_wait(&vdo->geometry_block.vio,
					      VDO_GEOMETRY_BLOCK_LOCATION, REQ_OP_READ);
	if (result != VDO_SUCCESS) {
		*reason = "Could not load geometry block";
		return result;
	}

	/* An all-zero geometry block means the device has never been formatted. */
	if (mem_is_zero(vdo->geometry_block.vio.data, VDO_BLOCK_SIZE)) {
		result = vdo_format(vdo, reason);
		if (result != VDO_SUCCESS)
			return result;
	} else {
		result = vdo_parse_geometry_block(vdo->geometry_block.buffer,
						  &vdo->geometry);
		if (result != VDO_SUCCESS) {
			*reason = "Could not parse geometry block";
			return result;
		}
	}

	result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot create thread configuration";
		return result;
	}

	vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
		     config->thread_counts.logical_zones,
		     config->thread_counts.physical_zones,
		     config->thread_counts.hash_zones, vdo->thread_config.thread_count);

	/* Compression context storage: one LZ4 scratch buffer per cpu thread. */
	result = vdo_allocate(config->thread_counts.cpu_threads, "LZ4 context",
			      &vdo->compression_context);
	if (result != VDO_SUCCESS) {
		*reason = "cannot allocate LZ4 context";
		return result;
	}

	for (i = 0; i < config->thread_counts.cpu_threads; i++) {
		result = vdo_allocate(LZ4_MEM_COMPRESS, "LZ4 context",
				      &vdo->compression_context[i]);
		if (result != VDO_SUCCESS) {
			*reason = "cannot allocate LZ4 context";
			return result;
		}
	}

	result = register_vdo(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot add VDO to device registry";
		return result;
	}

	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
	return result;
}
592 
/**
 * vdo_make() - Allocate and initialize a vdo.
 * @instance: Device instantiation counter.
 * @config: The device configuration.
 * @reason: The reason for any failure during this call.
 * @vdo_ptr: A pointer to hold the created vdo.
 *
 * Once *vdo_ptr has been set, the caller owns the vdo and is responsible for
 * destroying it on any subsequent error.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make(unsigned int instance, struct device_config *config, char **reason,
	     struct vdo **vdo_ptr)
{
	int result;
	struct vdo *vdo;

	/* Initialize with a generic failure reason to prevent returning garbage. */
	*reason = "Unspecified error";

	result = vdo_allocate(1, __func__, &vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate VDO";
		return result;
	}

	result = initialize_vdo(vdo, config, instance, reason);
	if (result != VDO_SUCCESS) {
		vdo_destroy(vdo);
		return result;
	}

	/* From here on, the caller will clean up if there is an error. */
	*vdo_ptr = vdo;

	snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
		 "vdo%u", instance);
	result = vdo_allocate(vdo->thread_config.thread_count, __func__, &vdo->threads);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate thread structures";
		return result;
	}

	result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
				 &default_queue_type, 1, NULL);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make admin thread";
		return result;
	}

	result = vdo_make_flusher(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make flusher zones";
		return result;
	}

	result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make packer zones";
		return result;
	}

	/* These invariants must hold before data vios can be constructed. */
	BUG_ON(vdo->device_config->logical_block_size <= 0);
	BUG_ON(vdo->device_config->owned_device == NULL);
	result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
				    MAXIMUM_VDO_USER_VIOS * 3 / 4,
				    &vdo->data_vio_pool);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate data_vio pool";
		return result;
	}

	result = vdo_make_io_submitter(config->thread_counts.bio_threads,
				       config->thread_counts.bio_rotation_interval,
				       get_data_vio_pool_request_limit(vdo->data_vio_pool),
				       vdo, &vdo->io_submitter);
	if (result != VDO_SUCCESS) {
		*reason = "bio submission initialization failed";
		return result;
	}

	/* The bio-ack queue only exists when acknowledgements are offloaded. */
	if (vdo_uses_bio_ack_queue(vdo)) {
		result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
					 &bio_ack_q_type,
					 config->thread_counts.bio_ack_threads, NULL);
		if (result != VDO_SUCCESS) {
			*reason = "bio ack queue initialization failed";
			return result;
		}
	}

	result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
				 config->thread_counts.cpu_threads,
				 (void **) vdo->compression_context);
	if (result != VDO_SUCCESS) {
		*reason = "CPU queue initialization failed";
		return result;
	}

	return VDO_SUCCESS;
}
692 
/*
 * finish_vdo() - Stop vdo activity prior to teardown: clean up the io submitter, finish the
 * dedupe index, and drain every work queue. Does nothing if the threads were never allocated.
 */
static void finish_vdo(struct vdo *vdo)
{
	int i;

	if (vdo->threads == NULL)
		return;

	vdo_cleanup_io_submitter(vdo->io_submitter);
	vdo_finish_dedupe_index(vdo->hash_zones);

	for (i = 0; i < vdo->thread_config.thread_count; i++)
		vdo_finish_work_queue(vdo->threads[i].queue);
}
706 
707 /**
708  * free_listeners() - Free the list of read-only listeners associated with a thread.
709  * @thread: The thread holding the list to free.
710  */
free_listeners(struct vdo_thread * thread)711 static void free_listeners(struct vdo_thread *thread)
712 {
713 	struct read_only_listener *listener, *next;
714 
715 	for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
716 		next = vdo_forget(listener->next);
717 		vdo_free(listener);
718 	}
719 }
720 
/* Release the geometry block's vio resources, then its backing buffer. */
static void uninitialize_geometry_block(struct vdo_geometry_block *geometry_block)
{
	free_vio_components(&geometry_block->vio);
	vdo_free(geometry_block->buffer);
}
726 
/* Release the super block's vio resources, then its backing buffer. */
static void uninitialize_super_block(struct vdo_super_block *super_block)
{
	free_vio_components(&super_block->vio);
	vdo_free(super_block->buffer);
}
732 
/**
 * unregister_vdo() - Remove a vdo from the device registry.
 * @vdo: The vdo to remove.
 *
 * Safe to call even if the vdo was never registered; the list entry is only
 * unlinked when the registry actually contains this vdo.
 */
static void unregister_vdo(struct vdo *vdo)
{
	write_lock(&registry.lock);
	if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
		list_del_init(&vdo->registration);

	write_unlock(&registry.lock);
}
745 
/**
 * vdo_destroy() - Destroy a vdo instance.
 * @vdo: The vdo to destroy (may be NULL).
 *
 * Quiesces all activity, unregisters the vdo, and frees every component in
 * dependency order. Safe to call on a partially constructed vdo.
 */
void vdo_destroy(struct vdo *vdo)
{
	unsigned int i;

	if (vdo == NULL)
		return;

	/* A running VDO should never be destroyed without suspending first. */
	BUG_ON(vdo_get_admin_state(vdo)->normal);

	/* Allocation during teardown is always permitted. */
	vdo->allocations_allowed = true;

	finish_vdo(vdo);
	unregister_vdo(vdo);
	free_data_vio_pool(vdo->data_vio_pool);
	vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
	vdo_free_flusher(vdo_forget(vdo->flusher));
	vdo_free_packer(vdo_forget(vdo->packer));
	vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
	vdo_free_slab_depot(vdo_forget(vdo->depot));
	vdo_uninitialize_layout(&vdo->layout);
	vdo_uninitialize_layout(&vdo->next_layout);
	if (vdo->partition_copier)
		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
	uninitialize_geometry_block(&vdo->geometry_block);
	uninitialize_super_block(&vdo->super_block);
	vdo_free_block_map(vdo_forget(vdo->block_map));
	vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
	vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
	vdo_free_logical_zones(vdo_forget(vdo->logical_zones));

	/* Work queues and their listeners are torn down after everything that uses them. */
	if (vdo->threads != NULL) {
		for (i = 0; i < vdo->thread_config.thread_count; i++) {
			free_listeners(&vdo->threads[i]);
			vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
		}
		vdo_free(vdo_forget(vdo->threads));
	}

	uninitialize_thread_config(&vdo->thread_config);

	if (vdo->compression_context != NULL) {
		for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
			vdo_free(vdo_forget(vdo->compression_context[i]));

		vdo_free(vdo_forget(vdo->compression_context));
	}
	vdo_free(vdo);
}
799 
/**
 * finish_reading_super_block() - Continue after loading the super block.
 * @completion: The super block vio.
 *
 * Decodes the freshly read super block buffer and passes the result (and any
 * decode error) on to the parent completion. This callback is registered in
 * vdo_load_super_block().
 */
static void finish_reading_super_block(struct vdo_completion *completion)
{
	struct vdo_super_block *super_block =
		container_of(as_vio(completion), struct vdo_super_block, vio);

	vdo_continue_completion(vdo_forget(completion->parent),
				vdo_decode_super_block(super_block->buffer));
}
814 
/**
 * handle_super_block_read_error() - Handle an error reading the super block.
 * @completion: The super block vio.
 *
 * Records the metadata I/O error, then proceeds as in the success path so the
 * parent completion is continued with the error. This error handler is
 * registered in vdo_load_super_block().
 */
static void handle_super_block_read_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	finish_reading_super_block(completion);
}
826 
/* bio endio handler: bounce back to the parent's thread to finish the super block read. */
static void read_super_block_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo_completion *parent = vio->completion.parent;

	continue_vio_after_io(vio, finish_reading_super_block,
			      parent->callback_thread_id);
}
835 
/**
 * vdo_load_super_block() - Allocate a super block and read its contents from storage.
 * @vdo: The vdo containing the super block on disk.
 * @parent: The completion to notify after loading the super block.
 *
 * The read completes asynchronously; @parent is continued (with any error) from
 * finish_reading_super_block().
 */
void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
{
	vdo->super_block.vio.completion.parent = parent;
	vdo_submit_metadata_vio(&vdo->super_block.vio,
				vdo_get_data_region_start(vdo->geometry),
				read_super_block_endio,
				handle_super_block_read_error,
				REQ_OP_READ);
}
850 
/**
 * vdo_get_backing_device() - Get the block device object underlying a vdo.
 * @vdo: The vdo.
 *
 * Return: The vdo's current block device.
 */
struct block_device *vdo_get_backing_device(const struct vdo *vdo)
{
	return vdo->device_config->owned_device->bdev;
}
861 
/**
 * vdo_get_device_name() - Get the device name associated with the vdo target.
 * @target: The target device interface.
 *
 * Return: The block device name (as reported by device-mapper).
 */
const char *vdo_get_device_name(const struct dm_target *target)
{
	return dm_device_name(dm_table_get_md(target->table));
}
872 
873 /**
874  * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
875  * @vdo: The vdo.
876  *
877  * Return: VDO_SUCCESS or an error.
878  */
vdo_synchronous_flush(struct vdo * vdo)879 int vdo_synchronous_flush(struct vdo *vdo)
880 {
881 	int result;
882 	struct bio bio;
883 
884 	bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
885 		 REQ_OP_WRITE | REQ_PREFLUSH);
886 	submit_bio_wait(&bio);
887 	result = blk_status_to_errno(bio.bi_status);
888 
889 	atomic64_inc(&vdo->stats.flush_out);
890 	if (result != 0) {
891 		vdo_log_error_strerror(result, "synchronous flush failed");
892 		result = -EIO;
893 	}
894 
895 	bio_uninit(&bio);
896 	return result;
897 }
898 
899 /**
900  * vdo_get_state() - Get the current state of the vdo.
901  * @vdo: The vdo.
902  *
903  * Context: This method may be called from any thread.
904  *
905  * Return: The current state of the vdo.
906  */
vdo_get_state(const struct vdo * vdo)907 enum vdo_state vdo_get_state(const struct vdo *vdo)
908 {
909 	enum vdo_state state = atomic_read(&vdo->state);
910 
911 	/* pairs with barriers where state field is changed */
912 	smp_rmb();
913 	return state;
914 }
915 
916 /**
917  * vdo_set_state() - Set the current state of the vdo.
918  * @vdo: The vdo whose state is to be set.
919  * @state: The new state of the vdo.
920  *
921  * Context: This method may be called from any thread.
922  */
vdo_set_state(struct vdo * vdo,enum vdo_state state)923 void vdo_set_state(struct vdo *vdo, enum vdo_state state)
924 {
925 	/* pairs with barrier in vdo_get_state */
926 	smp_wmb();
927 	atomic_set(&vdo->state, state);
928 }
929 
930 /**
931  * vdo_get_admin_state() - Get the admin state of the vdo.
932  * @vdo: The vdo.
933  *
934  * Return: The code for the vdo's current admin state.
935  */
vdo_get_admin_state(const struct vdo * vdo)936 const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
937 {
938 	return vdo_get_admin_state_code(&vdo->admin.state);
939 }
940 
941 /**
942  * record_vdo() - Record the state of the VDO for encoding in the super block.
943  * @vdo: The vdo.
944  */
record_vdo(struct vdo * vdo)945 static void record_vdo(struct vdo *vdo)
946 {
947 	/* This is for backwards compatibility. */
948 	vdo->states.unused = vdo->geometry.unused;
949 	vdo->states.vdo.state = vdo_get_state(vdo);
950 	vdo->states.block_map = vdo_record_block_map(vdo->block_map);
951 	vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
952 	vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
953 	vdo->states.layout = vdo->layout;
954 }
955 
clear_partition(struct vdo * vdo,enum partition_id id)956 static int __must_check clear_partition(struct vdo *vdo, enum partition_id id)
957 {
958 	struct partition *partition;
959 	int result;
960 
961 	result = vdo_get_partition(&vdo->states.layout, id, &partition);
962 	if (result != VDO_SUCCESS)
963 		return result;
964 
965 	return blkdev_issue_zeroout(vdo_get_backing_device(vdo),
966 				    partition->offset * VDO_SECTORS_PER_BLOCK,
967 				    partition->count * VDO_SECTORS_PER_BLOCK,
968 				    GFP_NOWAIT, 0);
969 }
970 
vdo_clear_layout(struct vdo * vdo)971 int vdo_clear_layout(struct vdo *vdo)
972 {
973 	int result;
974 
975 	/* Zero out the uds index's first block. */
976 	result = blkdev_issue_zeroout(vdo_get_backing_device(vdo),
977 				      VDO_SECTORS_PER_BLOCK,
978 				      VDO_SECTORS_PER_BLOCK,
979 				      GFP_NOWAIT, 0);
980 	if (result != VDO_SUCCESS)
981 		return result;
982 
983 	result = clear_partition(vdo, VDO_BLOCK_MAP_PARTITION);
984 	if (result != VDO_SUCCESS)
985 		return result;
986 
987 	return clear_partition(vdo, VDO_RECOVERY_JOURNAL_PARTITION);
988 }
989 
990 /**
991  * continue_parent() - Continue the parent of a save operation.
992  * @completion: The completion to continue.
993  *
994  */
continue_parent(struct vdo_completion * completion)995 static void continue_parent(struct vdo_completion *completion)
996 {
997 	vdo_continue_completion(vdo_forget(completion->parent), completion->result);
998 }
999 
handle_write_endio(struct bio * bio)1000 static void handle_write_endio(struct bio *bio)
1001 {
1002 	struct vio *vio = bio->bi_private;
1003 	struct vdo_completion *parent = vio->completion.parent;
1004 
1005 	continue_vio_after_io(vio, continue_parent,
1006 			      parent->callback_thread_id);
1007 }
1008 
1009 /**
1010  * handle_geometry_block_save_error() - Log a geometry block save error.
1011  * @completion: The super block vio.
1012  *
1013  * This error handler is registered in vdo_save_geometry_block().
1014  */
handle_geometry_block_save_error(struct vdo_completion * completion)1015 static void handle_geometry_block_save_error(struct vdo_completion *completion)
1016 {
1017 	struct vdo_geometry_block *geometry_block =
1018 		container_of(as_vio(completion), struct vdo_geometry_block, vio);
1019 
1020 	vio_record_metadata_io_error(&geometry_block->vio);
1021 	vdo_log_error_strerror(completion->result, "geometry block save failed");
1022 	completion->callback(completion);
1023 }
1024 
1025 /**
1026  * vdo_save_geometry_block() - Encode the vdo and save the geometry block asynchronously.
1027  * @vdo: The vdo whose state is being saved.
1028  * @parent: The completion to notify when the save is complete.
1029  */
vdo_save_geometry_block(struct vdo * vdo,struct vdo_completion * parent)1030 void vdo_save_geometry_block(struct vdo *vdo, struct vdo_completion *parent)
1031 {
1032 	struct vdo_geometry_block *geometry_block = &vdo->geometry_block;
1033 
1034 	vdo_encode_volume_geometry(geometry_block->buffer, &vdo->geometry,
1035 				   VDO_DEFAULT_GEOMETRY_BLOCK_VERSION);
1036 	geometry_block->vio.completion.parent = parent;
1037 	geometry_block->vio.completion.callback_thread_id = parent->callback_thread_id;
1038 	vdo_submit_metadata_vio(&geometry_block->vio,
1039 				VDO_GEOMETRY_BLOCK_LOCATION,
1040 				handle_write_endio, handle_geometry_block_save_error,
1041 				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
1042 }
1043 
1044 /**
1045  * handle_super_block_save_error() - Log a super block save error.
1046  * @completion: The super block vio.
1047  *
1048  * This error handler is registered in vdo_save_components().
1049  */
handle_super_block_save_error(struct vdo_completion * completion)1050 static void handle_super_block_save_error(struct vdo_completion *completion)
1051 {
1052 	struct vdo_super_block *super_block =
1053 		container_of(as_vio(completion), struct vdo_super_block, vio);
1054 
1055 	vio_record_metadata_io_error(&super_block->vio);
1056 	vdo_log_error_strerror(completion->result, "super block save failed");
1057 	/*
1058 	 * Mark the super block as unwritable so that we won't attempt to write it again. This
1059 	 * avoids the case where a growth attempt fails writing the super block with the new size,
1060 	 * but the subsequent attempt to write out the read-only state succeeds. In this case,
1061 	 * writes which happened just before the suspend would not be visible if the VDO is
1062 	 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
1063 	 * writes would reappear.
1064 	 */
1065 	super_block->unwritable = true;
1066 	completion->callback(completion);
1067 }
1068 
1069 /**
1070  * vdo_save_super_block() - Save the component states to the super block asynchronously.
1071  * @vdo: The vdo whose state is being saved.
1072  * @parent: The completion to notify when the save is complete.
1073  */
vdo_save_super_block(struct vdo * vdo,struct vdo_completion * parent)1074 void vdo_save_super_block(struct vdo *vdo, struct vdo_completion *parent)
1075 {
1076 	struct vdo_super_block *super_block = &vdo->super_block;
1077 
1078 	vdo_encode_super_block(super_block->buffer, &vdo->states);
1079 	super_block->vio.completion.parent = parent;
1080 	super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
1081 	vdo_submit_metadata_vio(&super_block->vio,
1082 				vdo_get_data_region_start(vdo->geometry),
1083 				handle_write_endio, handle_super_block_save_error,
1084 				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
1085 }
1086 
1087 /**
1088  * vdo_save_components() - Copy the current state of the VDO to the states struct and save
1089  *                         it to the super block asynchronously.
1090  * @vdo: The vdo whose state is being saved.
1091  * @parent: The completion to notify when the save is complete.
1092  */
vdo_save_components(struct vdo * vdo,struct vdo_completion * parent)1093 void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
1094 {
1095 	struct vdo_super_block *super_block = &vdo->super_block;
1096 
1097 	if (super_block->unwritable) {
1098 		vdo_continue_completion(parent, VDO_READ_ONLY);
1099 		return;
1100 	}
1101 
1102 	if (super_block->vio.completion.parent != NULL) {
1103 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1104 		return;
1105 	}
1106 
1107 	record_vdo(vdo);
1108 	vdo_save_super_block(vdo, parent);
1109 }
1110 
1111 /**
1112  * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
1113  *                                     read-only.
1114  * @vdo: The vdo to register with.
1115  * @listener: The object to notify.
1116  * @notification: The function to call to send the notification.
1117  * @thread_id: The id of the thread on which to send the notification.
1118  *
1119  * Return: VDO_SUCCESS or an error.
1120  */
vdo_register_read_only_listener(struct vdo * vdo,void * listener,vdo_read_only_notification_fn notification,thread_id_t thread_id)1121 int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
1122 				    vdo_read_only_notification_fn notification,
1123 				    thread_id_t thread_id)
1124 {
1125 	struct vdo_thread *thread = &vdo->threads[thread_id];
1126 	struct read_only_listener *read_only_listener;
1127 	int result;
1128 
1129 	result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
1130 			    "read only listener not registered on dedupe thread");
1131 	if (result != VDO_SUCCESS)
1132 		return result;
1133 
1134 	result = vdo_allocate(1, __func__, &read_only_listener);
1135 	if (result != VDO_SUCCESS)
1136 		return result;
1137 
1138 	*read_only_listener = (struct read_only_listener) {
1139 		.listener = listener,
1140 		.notify = notification,
1141 		.next = thread->listeners,
1142 	};
1143 
1144 	thread->listeners = read_only_listener;
1145 	return VDO_SUCCESS;
1146 }
1147 
1148 /**
1149  * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
1150  * @listener: The vdo.
1151  * @parent: The completion to notify in order to acknowledge the notification.
1152  *
1153  * This will save the read-only state to the super block.
1154  *
1155  * Implements vdo_read_only_notification_fn.
1156  */
notify_vdo_of_read_only_mode(void * listener,struct vdo_completion * parent)1157 static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
1158 {
1159 	struct vdo *vdo = listener;
1160 
1161 	if (vdo_in_read_only_mode(vdo))
1162 		vdo_finish_completion(parent);
1163 
1164 	vdo_set_state(vdo, VDO_READ_ONLY_MODE);
1165 	vdo_save_components(vdo, parent);
1166 }
1167 
1168 /**
1169  * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
1170  * @vdo: The vdo to enable.
1171  *
1172  * Return: VDO_SUCCESS or an error.
1173  */
vdo_enable_read_only_entry(struct vdo * vdo)1174 int vdo_enable_read_only_entry(struct vdo *vdo)
1175 {
1176 	thread_id_t id;
1177 	bool is_read_only = vdo_in_read_only_mode(vdo);
1178 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1179 
1180 	if (is_read_only) {
1181 		notifier->read_only_error = VDO_READ_ONLY;
1182 		notifier->state = NOTIFIED;
1183 	} else {
1184 		notifier->state = MAY_NOT_NOTIFY;
1185 	}
1186 
1187 	spin_lock_init(&notifier->lock);
1188 	vdo_initialize_completion(&notifier->completion, vdo,
1189 				  VDO_READ_ONLY_MODE_COMPLETION);
1190 
1191 	for (id = 0; id < vdo->thread_config.thread_count; id++)
1192 		vdo->threads[id].is_read_only = is_read_only;
1193 
1194 	return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
1195 					       vdo->thread_config.admin_thread);
1196 }
1197 
1198 /**
1199  * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
1200  *                                                progress and prevent any subsequent
1201  *                                                notifications.
1202  * @parent: The completion to notify when no threads are entering read-only mode.
1203  *
1204  * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
1205  */
vdo_wait_until_not_entering_read_only_mode(struct vdo_completion * parent)1206 void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
1207 {
1208 	struct vdo *vdo = parent->vdo;
1209 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1210 
1211 	vdo_assert_on_admin_thread(vdo, __func__);
1212 
1213 	if (notifier->waiter != NULL) {
1214 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1215 		return;
1216 	}
1217 
1218 	spin_lock(&notifier->lock);
1219 	if (notifier->state == NOTIFYING)
1220 		notifier->waiter = parent;
1221 	else if (notifier->state == MAY_NOTIFY)
1222 		notifier->state = MAY_NOT_NOTIFY;
1223 	spin_unlock(&notifier->lock);
1224 
1225 	if (notifier->waiter == NULL) {
1226 		/*
1227 		 * A notification was not in progress, and now they are
1228 		 * disallowed.
1229 		 */
1230 		vdo_launch_completion(parent);
1231 		return;
1232 	}
1233 }
1234 
1235 /**
1236  * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
1237  * @completion: The completion to convert.
1238  *
1239  * Return: The completion as a read_only_notifier.
1240  */
as_notifier(struct vdo_completion * completion)1241 static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
1242 {
1243 	vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
1244 	return container_of(completion, struct read_only_notifier, completion);
1245 }
1246 
1247 /**
1248  * finish_entering_read_only_mode() - Complete the process of entering read only mode.
1249  * @completion: The read-only mode completion.
1250  */
finish_entering_read_only_mode(struct vdo_completion * completion)1251 static void finish_entering_read_only_mode(struct vdo_completion *completion)
1252 {
1253 	struct read_only_notifier *notifier = as_notifier(completion);
1254 
1255 	vdo_assert_on_admin_thread(completion->vdo, __func__);
1256 
1257 	spin_lock(&notifier->lock);
1258 	notifier->state = NOTIFIED;
1259 	spin_unlock(&notifier->lock);
1260 
1261 	if (notifier->waiter != NULL)
1262 		vdo_continue_completion(vdo_forget(notifier->waiter),
1263 					completion->result);
1264 }
1265 
1266 /**
1267  * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
1268  * @completion: The read-only mode completion.
1269  */
make_thread_read_only(struct vdo_completion * completion)1270 static void make_thread_read_only(struct vdo_completion *completion)
1271 {
1272 	struct vdo *vdo = completion->vdo;
1273 	thread_id_t thread_id = completion->callback_thread_id;
1274 	struct read_only_notifier *notifier = as_notifier(completion);
1275 	struct read_only_listener *listener = completion->parent;
1276 
1277 	if (listener == NULL) {
1278 		/* This is the first call on this thread */
1279 		struct vdo_thread *thread = &vdo->threads[thread_id];
1280 
1281 		thread->is_read_only = true;
1282 		listener = thread->listeners;
1283 		if (thread_id == 0)
1284 			vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
1285 					       "Unrecoverable error, entering read-only mode");
1286 	} else {
1287 		/* We've just finished notifying a listener */
1288 		listener = listener->next;
1289 	}
1290 
1291 	if (listener != NULL) {
1292 		/* We have a listener to notify */
1293 		vdo_prepare_completion(completion, make_thread_read_only,
1294 				       make_thread_read_only, thread_id,
1295 				       listener);
1296 		listener->notify(listener->listener, completion);
1297 		return;
1298 	}
1299 
1300 	/* We're done with this thread */
1301 	if (++thread_id == vdo->thread_config.dedupe_thread) {
1302 		/*
1303 		 * We don't want to notify the dedupe thread since it may be
1304 		 * blocked rebuilding the index.
1305 		 */
1306 		thread_id++;
1307 	}
1308 
1309 	if (thread_id >= vdo->thread_config.thread_count) {
1310 		/* There are no more threads */
1311 		vdo_prepare_completion(completion, finish_entering_read_only_mode,
1312 				       finish_entering_read_only_mode,
1313 				       vdo->thread_config.admin_thread, NULL);
1314 	} else {
1315 		vdo_prepare_completion(completion, make_thread_read_only,
1316 				       make_thread_read_only, thread_id, NULL);
1317 	}
1318 
1319 	vdo_launch_completion(completion);
1320 }
1321 
1322 /**
1323  * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
1324  *                                    reversing the effects of
1325  *                                    vdo_wait_until_not_entering_read_only_mode().
1326  * @parent: The object to notify once the operation is complete.
1327  *
1328  * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
1329  * will be done when this method is called. If that happens, the parent will not be notified until
1330  * the vdo has actually entered read-only mode and attempted to save the super block.
1331  *
1332  * Context: This method may only be called from the admin thread.
1333  */
vdo_allow_read_only_mode_entry(struct vdo_completion * parent)1334 void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
1335 {
1336 	struct vdo *vdo = parent->vdo;
1337 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1338 
1339 	vdo_assert_on_admin_thread(vdo, __func__);
1340 
1341 	if (notifier->waiter != NULL) {
1342 		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
1343 		return;
1344 	}
1345 
1346 	spin_lock(&notifier->lock);
1347 	if (notifier->state == MAY_NOT_NOTIFY) {
1348 		if (notifier->read_only_error == VDO_SUCCESS) {
1349 			notifier->state = MAY_NOTIFY;
1350 		} else {
1351 			notifier->state = NOTIFYING;
1352 			notifier->waiter = parent;
1353 		}
1354 	}
1355 	spin_unlock(&notifier->lock);
1356 
1357 	if (notifier->waiter == NULL) {
1358 		/* We're done */
1359 		vdo_launch_completion(parent);
1360 		return;
1361 	}
1362 
1363 	/* Do the pending notification. */
1364 	make_thread_read_only(&notifier->completion);
1365 }
1366 
1367 /**
1368  * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
1369  *                              super block.
1370  * @vdo: The vdo.
1371  * @error_code: The error which caused the VDO to enter read-only mode.
1372  *
1373  * This method is a no-op if the VDO is already read-only.
1374  */
vdo_enter_read_only_mode(struct vdo * vdo,int error_code)1375 void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
1376 {
1377 	bool notify = false;
1378 	thread_id_t thread_id = vdo_get_callback_thread_id();
1379 	struct read_only_notifier *notifier = &vdo->read_only_notifier;
1380 	struct vdo_thread *thread;
1381 
1382 	if (thread_id != VDO_INVALID_THREAD_ID) {
1383 		thread = &vdo->threads[thread_id];
1384 		if (thread->is_read_only) {
1385 			/* This thread has already gone read-only. */
1386 			return;
1387 		}
1388 
1389 		/* Record for this thread that the VDO is read-only. */
1390 		thread->is_read_only = true;
1391 	}
1392 
1393 	spin_lock(&notifier->lock);
1394 	if (notifier->read_only_error == VDO_SUCCESS) {
1395 		WRITE_ONCE(notifier->read_only_error, error_code);
1396 		if (notifier->state == MAY_NOTIFY) {
1397 			notifier->state = NOTIFYING;
1398 			notify = true;
1399 		}
1400 	}
1401 	spin_unlock(&notifier->lock);
1402 
1403 	if (!notify) {
1404 		/* The notifier is already aware of a read-only error */
1405 		return;
1406 	}
1407 
1408 	/* Initiate a notification starting on the lowest numbered thread. */
1409 	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
1410 }
1411 
1412 /**
1413  * vdo_is_read_only() - Check whether the VDO is read-only.
1414  * @vdo: The vdo.
1415  *
1416  * Return: True if the vdo is read-only.
1417  *
1418  * This method may be called from any thread, as opposed to examining the VDO's state field which
1419  * is only safe to check from the admin thread.
1420  */
vdo_is_read_only(struct vdo * vdo)1421 bool vdo_is_read_only(struct vdo *vdo)
1422 {
1423 	return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
1424 }
1425 
1426 /**
1427  * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
1428  * @vdo: The vdo to query.
1429  *
1430  * Return: True if the vdo is in read-only mode.
1431  */
vdo_in_read_only_mode(const struct vdo * vdo)1432 bool vdo_in_read_only_mode(const struct vdo *vdo)
1433 {
1434 	return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
1435 }
1436 
1437 /**
1438  * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
1439  * @vdo: The vdo to query.
1440  *
1441  * Return: True if the vdo is in recovery mode.
1442  */
vdo_in_recovery_mode(const struct vdo * vdo)1443 bool vdo_in_recovery_mode(const struct vdo *vdo)
1444 {
1445 	return (vdo_get_state(vdo) == VDO_RECOVERING);
1446 }
1447 
1448 /**
1449  * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
1450  * @vdo: The vdo.
1451  */
vdo_enter_recovery_mode(struct vdo * vdo)1452 void vdo_enter_recovery_mode(struct vdo *vdo)
1453 {
1454 	vdo_assert_on_admin_thread(vdo, __func__);
1455 
1456 	if (vdo_in_read_only_mode(vdo))
1457 		return;
1458 
1459 	vdo_log_info("Entering recovery mode");
1460 	vdo_set_state(vdo, VDO_RECOVERING);
1461 }
1462 
1463 /**
1464  * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
1465  * @completion: The sync completion.
1466  */
complete_synchronous_action(struct vdo_completion * completion)1467 static void complete_synchronous_action(struct vdo_completion *completion)
1468 {
1469 	vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
1470 	complete(&(container_of(completion, struct sync_completion,
1471 				vdo_completion)->completion));
1472 }
1473 
1474 /**
1475  * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
1476  * @vdo: The vdo.
1477  * @action: The callback to launch.
1478  * @thread_id: The thread on which to run the action.
1479  * @parent: The parent of the sync completion (may be NULL).
1480  */
perform_synchronous_action(struct vdo * vdo,vdo_action_fn action,thread_id_t thread_id,void * parent)1481 static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
1482 				      thread_id_t thread_id, void *parent)
1483 {
1484 	struct sync_completion sync;
1485 
1486 	vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
1487 	init_completion(&sync.completion);
1488 	sync.vdo_completion.parent = parent;
1489 	vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
1490 	wait_for_completion(&sync.completion);
1491 	return sync.vdo_completion.result;
1492 }
1493 
1494 /**
1495  * set_compression_callback() - Callback to turn compression on or off.
1496  * @completion: The completion.
1497  */
set_compression_callback(struct vdo_completion * completion)1498 static void set_compression_callback(struct vdo_completion *completion)
1499 {
1500 	struct vdo *vdo = completion->vdo;
1501 	bool *enable = completion->parent;
1502 	bool was_enabled = vdo_get_compressing(vdo);
1503 
1504 	if (*enable != was_enabled) {
1505 		WRITE_ONCE(vdo->compressing, *enable);
1506 		if (was_enabled) {
1507 			/* Signal the packer to flush since compression has been disabled. */
1508 			vdo_flush_packer(vdo->packer);
1509 		}
1510 	}
1511 
1512 	vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
1513 	*enable = was_enabled;
1514 	complete_synchronous_action(completion);
1515 }
1516 
1517 /**
1518  * vdo_set_compressing() - Turn compression on or off.
1519  * @vdo: The vdo.
1520  * @enable: Whether to enable or disable compression.
1521  *
1522  * Return: Whether compression was previously on or off.
1523  */
vdo_set_compressing(struct vdo * vdo,bool enable)1524 bool vdo_set_compressing(struct vdo *vdo, bool enable)
1525 {
1526 	perform_synchronous_action(vdo, set_compression_callback,
1527 				   vdo->thread_config.packer_thread,
1528 				   &enable);
1529 	return enable;
1530 }
1531 
1532 /**
1533  * vdo_get_compressing() - Get whether compression is enabled in a vdo.
1534  * @vdo: The vdo.
1535  *
1536  * Return: State of compression.
1537  */
vdo_get_compressing(struct vdo * vdo)1538 bool vdo_get_compressing(struct vdo *vdo)
1539 {
1540 	return READ_ONCE(vdo->compressing);
1541 }
1542 
get_block_map_cache_size(const struct vdo * vdo)1543 static size_t get_block_map_cache_size(const struct vdo *vdo)
1544 {
1545 	return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
1546 }
1547 
get_vdo_error_statistics(const struct vdo * vdo)1548 static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
1549 {
1550 	/*
1551 	 * The error counts can be incremented from arbitrary threads and so must be incremented
1552 	 * atomically, but they are just statistics with no semantics that could rely on memory
1553 	 * order, so unfenced reads are sufficient.
1554 	 */
1555 	const struct atomic_statistics *atoms = &vdo->stats;
1556 
1557 	return (struct error_statistics) {
1558 		.invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
1559 		.no_space_error_count = atomic64_read(&atoms->no_space_error_count),
1560 		.read_only_error_count = atomic64_read(&atoms->read_only_error_count),
1561 	};
1562 }
1563 
copy_bio_stat(struct bio_stats * b,const struct atomic_bio_stats * a)1564 static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
1565 {
1566 	b->read = atomic64_read(&a->read);
1567 	b->write = atomic64_read(&a->write);
1568 	b->discard = atomic64_read(&a->discard);
1569 	b->flush = atomic64_read(&a->flush);
1570 	b->empty_flush = atomic64_read(&a->empty_flush);
1571 	b->fua = atomic64_read(&a->fua);
1572 }
1573 
subtract_bio_stats(struct bio_stats minuend,struct bio_stats subtrahend)1574 static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
1575 					   struct bio_stats subtrahend)
1576 {
1577 	return (struct bio_stats) {
1578 		.read = minuend.read - subtrahend.read,
1579 		.write = minuend.write - subtrahend.write,
1580 		.discard = minuend.discard - subtrahend.discard,
1581 		.flush = minuend.flush - subtrahend.flush,
1582 		.empty_flush = minuend.empty_flush - subtrahend.empty_flush,
1583 		.fua = minuend.fua - subtrahend.fua,
1584 	};
1585 }
1586 
1587 /**
1588  * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
1589  * @vdo: The vdo.
1590  *
1591  * Return: The number of blocks allocated for user data.
1592  */
vdo_get_physical_blocks_allocated(const struct vdo * vdo)1593 static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
1594 {
1595 	return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
1596 		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1597 }
1598 
1599 /**
1600  * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
1601  * @vdo: The vdo.
1602  *
1603  * Return: The number of overhead blocks.
1604  */
vdo_get_physical_blocks_overhead(const struct vdo * vdo)1605 static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
1606 {
1607 	/*
1608 	 * config.physical_blocks is mutated during resize and is in a packed structure,
1609 	 * but resize runs on admin thread.
1610 	 * TODO: Verify that this is always safe.
1611 	 */
1612 	return (vdo->states.vdo.config.physical_blocks -
1613 		vdo_get_slab_depot_data_blocks(vdo->depot) +
1614 		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
1615 }
1616 
vdo_describe_state(enum vdo_state state)1617 static const char *vdo_describe_state(enum vdo_state state)
1618 {
1619 	/* These strings should all fit in the 15 chars of VDOStatistics.mode. */
1620 	switch (state) {
1621 	case VDO_RECOVERING:
1622 		return "recovering";
1623 
1624 	case VDO_READ_ONLY_MODE:
1625 		return "read-only";
1626 
1627 	default:
1628 		return "normal";
1629 	}
1630 }
1631 
1632 /**
1633  * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
1634  * @vdo: The vdo.
1635  * @stats: The statistics structure to populate.
1636  */
get_vdo_statistics(const struct vdo * vdo,struct vdo_statistics * stats)1637 static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
1638 {
1639 	struct recovery_journal *journal = vdo->recovery_journal;
1640 	enum vdo_state state = vdo_get_state(vdo);
1641 
1642 	vdo_assert_on_admin_thread(vdo, __func__);
1643 
1644 	/* start with a clean slate */
1645 	memset(stats, 0, sizeof(struct vdo_statistics));
1646 
1647 	/*
1648 	 * These are immutable properties of the vdo object, so it is safe to query them from any
1649 	 * thread.
1650 	 */
1651 	stats->version = STATISTICS_VERSION;
1652 	stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
1653 	/*
1654 	 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
1655 	 * runs on the admin thread.
1656 	 * TODO: verify that this is always safe
1657 	 */
1658 	stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
1659 	stats->block_size = VDO_BLOCK_SIZE;
1660 	stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
1661 	stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
1662 	stats->block_map_cache_size = get_block_map_cache_size(vdo);
1663 
1664 	/* The callees are responsible for thread-safety. */
1665 	stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
1666 	stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
1667 	stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
1668 	vdo_get_slab_depot_statistics(vdo->depot, stats);
1669 	stats->journal = vdo_get_recovery_journal_statistics(journal);
1670 	stats->packer = vdo_get_packer_statistics(vdo->packer);
1671 	stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
1672 	vdo_get_dedupe_statistics(vdo->hash_zones, stats);
1673 	stats->errors = get_vdo_error_statistics(vdo);
1674 	stats->in_recovery_mode = (state == VDO_RECOVERING);
1675 	snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));
1676 
1677 	stats->instance = vdo->instance;
1678 	stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
1679 	stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);
1680 
1681 	stats->flush_out = atomic64_read(&vdo->stats.flush_out);
1682 	stats->logical_block_size = vdo->device_config->logical_block_size;
1683 	copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
1684 	copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
1685 	copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
1686 	copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
1687 	copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
1688 	copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
1689 	copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
1690 	copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
1691 	copy_bio_stat(&stats->bios_journal_completed,
1692 		      &vdo->stats.bios_journal_completed);
1693 	copy_bio_stat(&stats->bios_page_cache_completed,
1694 		      &vdo->stats.bios_page_cache_completed);
1695 	copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
1696 	copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
1697 	stats->bios_in_progress =
1698 		subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
1699 	vdo_get_memory_stats(&stats->memory_usage.bytes_used,
1700 			     &stats->memory_usage.peak_bytes_used);
1701 }
1702 
1703 /**
1704  * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
1705  *                                   structure on the admin thread.
1706  * @completion: The completion.
1707  *
1708  * This callback is registered in vdo_fetch_statistics().
1709  */
vdo_fetch_statistics_callback(struct vdo_completion * completion)1710 static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
1711 {
1712 	get_vdo_statistics(completion->vdo, completion->parent);
1713 	complete_synchronous_action(completion);
1714 }
1715 
1716 /**
1717  * vdo_fetch_statistics() - Fetch statistics on the correct thread.
1718  * @vdo: The vdo.
1719  * @stats: The vdo statistics are returned here.
1720  */
vdo_fetch_statistics(struct vdo * vdo,struct vdo_statistics * stats)1721 void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
1722 {
1723 	perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
1724 				   vdo->thread_config.admin_thread, stats);
1725 }
1726 
1727 /**
1728  * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
1729  *                                currently running.
1730  *
1731  * Return: The current thread ID, or -1 if no such thread.
1732  */
vdo_get_callback_thread_id(void)1733 thread_id_t vdo_get_callback_thread_id(void)
1734 {
1735 	struct vdo_work_queue *queue = vdo_get_current_work_queue();
1736 	struct vdo_thread *thread;
1737 	thread_id_t thread_id;
1738 
1739 	if (queue == NULL)
1740 		return VDO_INVALID_THREAD_ID;
1741 
1742 	thread = vdo_get_work_queue_owner(queue);
1743 	thread_id = thread->thread_id;
1744 
1745 	if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
1746 		BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
1747 		BUG_ON(thread != &thread->vdo->threads[thread_id]);
1748 	}
1749 
1750 	return thread_id;
1751 }
1752 
1753 /**
1754  * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
1755  * @vdo: The vdo to dump.
1756  */
vdo_dump_status(const struct vdo * vdo)1757 void vdo_dump_status(const struct vdo *vdo)
1758 {
1759 	zone_count_t zone;
1760 
1761 	vdo_dump_flusher(vdo->flusher);
1762 	vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
1763 	vdo_dump_packer(vdo->packer);
1764 	vdo_dump_slab_depot(vdo->depot);
1765 
1766 	for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
1767 		vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);
1768 
1769 	for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
1770 		vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);
1771 
1772 	vdo_dump_hash_zones(vdo->hash_zones);
1773 }
1774 
1775 /**
1776  * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
1777  * @vdo: The vdo.
1778  * @name: The name of the function which should be running on the admin thread (for logging).
1779  */
vdo_assert_on_admin_thread(const struct vdo * vdo,const char * name)1780 void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
1781 {
1782 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
1783 			    "%s called on admin thread", name);
1784 }
1785 
1786 /**
1787  * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
1788  *                                       logical zone thread.
1789  * @vdo: The vdo.
1790  * @logical_zone: The number of the logical zone.
1791  * @name: The name of the calling function.
1792  */
vdo_assert_on_logical_zone_thread(const struct vdo * vdo,zone_count_t logical_zone,const char * name)1793 void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
1794 				       const char *name)
1795 {
1796 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1797 			     vdo->thread_config.logical_threads[logical_zone]),
1798 			    "%s called on logical thread", name);
1799 }
1800 
1801 /**
1802  * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
1803  *                                        physical zone thread.
1804  * @vdo: The vdo.
1805  * @physical_zone: The number of the physical zone.
1806  * @name: The name of the calling function.
1807  */
vdo_assert_on_physical_zone_thread(const struct vdo * vdo,zone_count_t physical_zone,const char * name)1808 void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
1809 					zone_count_t physical_zone, const char *name)
1810 {
1811 	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
1812 			     vdo->thread_config.physical_threads[physical_zone]),
1813 			    "%s called on physical thread", name);
1814 }
1815 
1816 /**
1817  * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
1818  * @vdo: The vdo containing the physical zones.
1819  * @pbn: The PBN of the data block.
1820  * @zone_ptr: A pointer to return the physical zone.
1821  *
1822  * Gets the physical zone responsible for a given physical block number of a data block in this vdo
1823  * instance, or of the zero block (for which a NULL zone is returned). For any other block number
1824  * that is not in the range of valid data block numbers in any slab, an error will be returned.
1825  * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
1826  * mode.
1827  *
1828  * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any
1829  *         other failure.
1830  */
vdo_get_physical_zone(const struct vdo * vdo,physical_block_number_t pbn,struct physical_zone ** zone_ptr)1831 int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
1832 			  struct physical_zone **zone_ptr)
1833 {
1834 	struct vdo_slab *slab;
1835 	int result;
1836 
1837 	if (pbn == VDO_ZERO_BLOCK) {
1838 		*zone_ptr = NULL;
1839 		return VDO_SUCCESS;
1840 	}
1841 
1842 	/*
1843 	 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
1844 	 * because it won't trigger read-only mode on an invalid PBN.
1845 	 */
1846 	if (!vdo_is_physical_data_block(vdo->depot, pbn))
1847 		return VDO_OUT_OF_RANGE;
1848 
1849 	/* With the PBN already checked, we should always succeed in finding a slab. */
1850 	slab = vdo_get_slab(vdo->depot, pbn);
1851 	result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
1852 	if (result != VDO_SUCCESS)
1853 		return result;
1854 
1855 	*zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
1856 	return VDO_SUCCESS;
1857 }
1858