// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

/*
 * This file contains the main entry points for normal operations on a vdo as well as functions for
 * constructing and destroying vdo instances (in memory).
 */

/**
 * DOC:
 *
 * A read_only_notifier has a single completion which is used to perform read-only notifications,
 * however, vdo_enter_read_only_mode() may be called from any thread. A pair of fields, protected
 * by a spinlock, are used to control the read-only mode entry process. The first field holds the
 * read-only error. The second is the state field, which may hold any of the four special values
 * enumerated here.
 *
 * When vdo_enter_read_only_mode() is called from some vdo thread, if the read_only_error field
 * already contains an error (i.e. its value is not VDO_SUCCESS), then some other error has already
 * initiated the read-only process, and nothing more is done. Otherwise, the new error is stored in
 * the read_only_error field, and the state field is consulted. If the state is MAY_NOTIFY, it is
 * set to NOTIFYING, and the notification process begins. If the state is MAY_NOT_NOTIFY, then
 * notifications are currently disallowed, generally due to the vdo being suspended. In this case,
 * nothing more will be done until the vdo is resumed, at which point the notification will be
 * performed. In any other case, the vdo is already read-only, and there is nothing more to do.
 */

#include "vdo.h"

#include <linux/completion.h>
#include <linux/device-mapper.h>
#include <linux/lz4.h>
#include <linux/mutex.h>
#include <linux/spinlock.h>
#include <linux/types.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"

#include "block-map.h"
#include "completion.h"
#include "data-vio.h"
#include "dedupe.h"
#include "encodings.h"
#include "funnel-workqueue.h"
#include "io-submitter.h"
#include "logical-zone.h"
#include "packer.h"
#include "physical-zone.h"
#include "recovery-journal.h"
#include "slab-depot.h"
#include "statistics.h"
#include "status-codes.h"
#include "vio.h"

#define PARANOID_THREAD_CONSISTENCY_CHECKS 0

struct sync_completion {
	struct vdo_completion vdo_completion;
	struct completion completion;
};

/* A linked list is adequate for the small number of entries we expect. */
struct device_registry {
	struct list_head links;
	/* TODO: Convert to rcu per kernel recommendation. */
	rwlock_t lock;
};

static struct device_registry registry;

/**
 * vdo_initialize_device_registry_once() - Initialize the necessary structures for the device
 *                                         registry.
 */
void vdo_initialize_device_registry_once(void)
{
	INIT_LIST_HEAD(&registry.links);
	rwlock_init(&registry.lock);
}

/** vdo_is_equal() - Implements vdo_filter_fn. */
static bool vdo_is_equal(struct vdo *vdo, const void *context)
{
	return (vdo == context);
}

/**
 * filter_vdos_locked() - Find a vdo in the registry if it exists there.
 * @filter: The filter function to apply to devices.
 * @context: A bit of context to provide the filter.
 *
 * Context: Must be called holding the lock.
 *
 * Return: the vdo object found, if any.
 */
static struct vdo * __must_check filter_vdos_locked(vdo_filter_fn filter,
						    const void *context)
{
	struct vdo *vdo;

	list_for_each_entry(vdo, &registry.links, registration) {
		if (filter(vdo, context))
			return vdo;
	}

	return NULL;
}

/**
 * vdo_find_matching() - Find and return the first (if any) vdo matching a given filter function.
 * @filter: The filter function to apply to vdos.
 * @context: A bit of context to provide the filter.
 */
struct vdo *vdo_find_matching(vdo_filter_fn filter, const void *context)
{
	struct vdo *vdo;

	read_lock(&registry.lock);
	vdo = filter_vdos_locked(filter, context);
	read_unlock(&registry.lock);

	return vdo;
}
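/*
 * Example (illustrative only; not part of this file): a hypothetical caller
 * could locate a registered vdo by instance number with a custom filter:
 *
 *	static bool instance_matches(struct vdo *vdo, const void *context)
 *	{
 *		return (vdo->instance == *((const unsigned int *) context));
 *	}
 *
 *	unsigned int wanted = 0;
 *	struct vdo *found = vdo_find_matching(instance_matches, &wanted);
 *
 * The filter runs with the registry read lock held, so it must not sleep.
 */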
static void start_vdo_request_queue(void *ptr)
{
	struct vdo_thread *thread = vdo_get_work_queue_owner(vdo_get_current_work_queue());

	vdo_register_allocating_thread(&thread->allocating_thread,
				       &thread->vdo->allocations_allowed);
}

static void finish_vdo_request_queue(void *ptr)
{
	vdo_unregister_allocating_thread();
}

static const struct vdo_work_queue_type default_queue_type = {
	.start = start_vdo_request_queue,
	.finish = finish_vdo_request_queue,
	.max_priority = VDO_DEFAULT_Q_MAX_PRIORITY,
	.default_priority = VDO_DEFAULT_Q_COMPLETION_PRIORITY,
};

static const struct vdo_work_queue_type bio_ack_q_type = {
	.start = NULL,
	.finish = NULL,
	.max_priority = BIO_ACK_Q_MAX_PRIORITY,
	.default_priority = BIO_ACK_Q_ACK_PRIORITY,
};

static const struct vdo_work_queue_type cpu_q_type = {
	.start = NULL,
	.finish = NULL,
	.max_priority = CPU_Q_MAX_PRIORITY,
	.default_priority = CPU_Q_MAX_PRIORITY,
};

static void uninitialize_thread_config(struct thread_config *config)
{
	vdo_free(vdo_forget(config->logical_threads));
	vdo_free(vdo_forget(config->physical_threads));
	vdo_free(vdo_forget(config->hash_zone_threads));
	vdo_free(vdo_forget(config->bio_threads));
	memset(config, 0, sizeof(struct thread_config));
}

static void assign_thread_ids(struct thread_config *config,
			      thread_id_t thread_ids[], zone_count_t count)
{
	zone_count_t zone;

	for (zone = 0; zone < count; zone++)
		thread_ids[zone] = config->thread_count++;
}

/**
 * initialize_thread_config() - Initialize the thread mapping
 *
 * If the logical, physical, and hash zone counts are all 0, a single thread will be shared by all
 * three plus the packer and recovery journal. Otherwise, there must be at least one of each type,
 * and each will have its own thread, as will the packer and recovery journal.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int __must_check initialize_thread_config(struct thread_count_config counts,
						 struct thread_config *config)
{
	int result;
	bool single = ((counts.logical_zones + counts.physical_zones + counts.hash_zones) == 0);

	config->bio_thread_count = counts.bio_threads;
	if (single) {
		config->logical_zone_count = 1;
		config->physical_zone_count = 1;
		config->hash_zone_count = 1;
	} else {
		config->logical_zone_count = counts.logical_zones;
		config->physical_zone_count = counts.physical_zones;
		config->hash_zone_count = counts.hash_zones;
	}

	result = vdo_allocate(config->logical_zone_count, thread_id_t,
			      "logical thread array", &config->logical_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	result = vdo_allocate(config->physical_zone_count, thread_id_t,
			      "physical thread array", &config->physical_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	result = vdo_allocate(config->hash_zone_count, thread_id_t,
			      "hash thread array", &config->hash_zone_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	result = vdo_allocate(config->bio_thread_count, thread_id_t,
			      "bio thread array", &config->bio_threads);
	if (result != VDO_SUCCESS) {
		uninitialize_thread_config(config);
		return result;
	}

	if (single) {
		config->logical_threads[0] = config->thread_count;
		config->physical_threads[0] = config->thread_count;
		config->hash_zone_threads[0] = config->thread_count++;
	} else {
		config->admin_thread = config->thread_count;
		config->journal_thread = config->thread_count++;
		config->packer_thread = config->thread_count++;
		assign_thread_ids(config, config->logical_threads, counts.logical_zones);
		assign_thread_ids(config, config->physical_threads, counts.physical_zones);
		assign_thread_ids(config, config->hash_zone_threads, counts.hash_zones);
	}

	config->dedupe_thread = config->thread_count++;
	config->bio_ack_thread =
		((counts.bio_ack_threads > 0) ? config->thread_count++ : VDO_INVALID_THREAD_ID);
	config->cpu_thread = config->thread_count++;
	assign_thread_ids(config, config->bio_threads, counts.bio_threads);
	return VDO_SUCCESS;
}
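/*
 * Worked example (illustrative; the counts below are hypothetical): with a
 * thread_count_config of { logical_zones = 2, physical_zones = 2,
 * hash_zones = 1, bio_threads = 3, bio_ack_threads = 1 }, the non-"single"
 * branch above assigns thread ids in the order the fields are filled in:
 *
 *	admin/journal = 0, packer = 1, logical = 2..3, physical = 4..5,
 *	hash = 6, dedupe = 7, bio ack = 8, cpu = 9, bio = 10..12
 *
 * for a total thread_count of 13. get_thread_name() below maps these ids to
 * queue names such as "journalQ", "packerQ", "logQ0", "physQ1", and "bioQ2".
 */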
/**
 * read_geometry_block() - Synchronously read the geometry block from a vdo's underlying block
 *                         device.
 * @vdo: The vdo whose geometry is to be read.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check read_geometry_block(struct vdo *vdo)
{
	struct vio *vio;
	char *block;
	int result;

	result = vdo_allocate(VDO_BLOCK_SIZE, u8, __func__, &block);
	if (result != VDO_SUCCESS)
		return result;

	result = create_metadata_vio(vdo, VIO_TYPE_GEOMETRY, VIO_PRIORITY_HIGH, NULL,
				     block, &vio);
	if (result != VDO_SUCCESS) {
		vdo_free(block);
		return result;
	}

	/*
	 * This is only safe because, having not already loaded the geometry, the vdo's geometry's
	 * bio_offset field is 0, so the fact that vio_reset_bio() will subtract that offset from
	 * the supplied pbn is not a problem.
	 */
	result = vio_reset_bio(vio, block, NULL, REQ_OP_READ,
			       VDO_GEOMETRY_BLOCK_LOCATION);
	if (result != VDO_SUCCESS) {
		free_vio(vdo_forget(vio));
		vdo_free(block);
		return result;
	}

	bio_set_dev(vio->bio, vdo_get_backing_device(vdo));
	submit_bio_wait(vio->bio);
	result = blk_status_to_errno(vio->bio->bi_status);
	free_vio(vdo_forget(vio));
	if (result != 0) {
		vdo_log_error_strerror(result, "synchronous read failed");
		vdo_free(block);
		return -EIO;
	}

	result = vdo_parse_geometry_block((u8 *) block, &vdo->geometry);
	vdo_free(block);
	return result;
}

static bool get_zone_thread_name(const thread_id_t thread_ids[], zone_count_t count,
				 thread_id_t id, const char *prefix,
				 char *buffer, size_t buffer_length)
{
	if (id >= thread_ids[0]) {
		thread_id_t index = id - thread_ids[0];

		if (index < count) {
			snprintf(buffer, buffer_length, "%s%d", prefix, index);
			return true;
		}
	}

	return false;
}

/**
 * get_thread_name() - Format the name of the worker thread desired to support a given work queue.
 * @thread_config: The thread configuration.
 * @thread_id: The thread id.
 * @buffer: Where to put the formatted name.
 * @buffer_length: Size of the output buffer.
 *
 * The physical layer may add a prefix identifying the product; the output from this function
 * should just identify the thread.
 */
static void get_thread_name(const struct thread_config *thread_config,
			    thread_id_t thread_id, char *buffer, size_t buffer_length)
{
	if (thread_id == thread_config->journal_thread) {
		if (thread_config->packer_thread == thread_id) {
			/*
			 * This is the "single thread" config where one thread is used for the
			 * journal, packer, logical, physical, and hash zones. In that case, it is
			 * known as the "request queue."
			 */
			snprintf(buffer, buffer_length, "reqQ");
			return;
		}

		snprintf(buffer, buffer_length, "journalQ");
		return;
	} else if (thread_id == thread_config->admin_thread) {
		/* Theoretically this could be different from the journal thread. */
		snprintf(buffer, buffer_length, "adminQ");
		return;
	} else if (thread_id == thread_config->packer_thread) {
		snprintf(buffer, buffer_length, "packerQ");
		return;
	} else if (thread_id == thread_config->dedupe_thread) {
		snprintf(buffer, buffer_length, "dedupeQ");
		return;
	} else if (thread_id == thread_config->bio_ack_thread) {
		snprintf(buffer, buffer_length, "ackQ");
		return;
	} else if (thread_id == thread_config->cpu_thread) {
		snprintf(buffer, buffer_length, "cpuQ");
		return;
	}

	if (get_zone_thread_name(thread_config->logical_threads,
				 thread_config->logical_zone_count,
				 thread_id, "logQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->physical_threads,
				 thread_config->physical_zone_count,
				 thread_id, "physQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->hash_zone_threads,
				 thread_config->hash_zone_count,
				 thread_id, "hashQ", buffer, buffer_length))
		return;

	if (get_zone_thread_name(thread_config->bio_threads,
				 thread_config->bio_thread_count,
				 thread_id, "bioQ", buffer, buffer_length))
		return;

	/* Some sort of misconfiguration? */
	snprintf(buffer, buffer_length, "reqQ%d", thread_id);
}

/**
 * vdo_make_thread() - Construct a single vdo work_queue and its associated thread (or threads for
 *                     round-robin queues).
 * @vdo: The vdo which owns the thread.
 * @thread_id: The id of the thread to create (as determined by the thread_config).
 * @type: The description of the work queue for this thread.
 * @queue_count: The number of actual threads/queues contained in the "thread".
 * @contexts: An array of queue_count contexts, one for each individual queue; may be NULL.
 *
 * Each "thread" constructed by this method is represented by a unique thread id in the thread
 * config, and completions can be enqueued to the queue and run on the threads comprising this
 * entity.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make_thread(struct vdo *vdo, thread_id_t thread_id,
		    const struct vdo_work_queue_type *type,
		    unsigned int queue_count, void *contexts[])
{
	struct vdo_thread *thread = &vdo->threads[thread_id];
	char queue_name[MAX_VDO_WORK_QUEUE_NAME_LEN];

	if (type == NULL)
		type = &default_queue_type;

	if (thread->queue != NULL) {
		return VDO_ASSERT(vdo_work_queue_type_is(thread->queue, type),
				  "already constructed vdo thread %u is of the correct type",
				  thread_id);
	}

	thread->vdo = vdo;
	thread->thread_id = thread_id;
	get_thread_name(&vdo->thread_config, thread_id, queue_name, sizeof(queue_name));
	return vdo_make_work_queue(vdo->thread_name_prefix, queue_name, thread,
				   type, queue_count, contexts, &thread->queue);
}

/**
 * register_vdo() - Register a VDO; it must not already be registered.
 * @vdo: The vdo to register.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int register_vdo(struct vdo *vdo)
{
	int result;

	write_lock(&registry.lock);
	result = VDO_ASSERT(filter_vdos_locked(vdo_is_equal, vdo) == NULL,
			    "VDO not already registered");
	if (result == VDO_SUCCESS) {
		INIT_LIST_HEAD(&vdo->registration);
		list_add_tail(&vdo->registration, &registry.links);
	}
	write_unlock(&registry.lock);

	return result;
}

/**
 * initialize_vdo() - Do the portion of initializing a vdo which will clean up after itself on
 *                    error.
 * @vdo: The vdo being initialized
 * @config: The configuration of the vdo
 * @instance: The instance number of the vdo
 * @reason: The buffer to hold the failure reason on error
 */
static int initialize_vdo(struct vdo *vdo, struct device_config *config,
			  unsigned int instance, char **reason)
{
	int result;
	zone_count_t i;

	vdo->device_config = config;
	vdo->starting_sector_offset = config->owning_target->begin;
	vdo->instance = instance;
	vdo->allocations_allowed = true;
	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_NEW);
	INIT_LIST_HEAD(&vdo->device_config_list);
	vdo_initialize_completion(&vdo->admin.completion, vdo, VDO_ADMIN_COMPLETION);
	init_completion(&vdo->admin.callback_sync);
	mutex_init(&vdo->stats_mutex);
	result = read_geometry_block(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Could not load geometry block";
		return result;
	}

	result = initialize_thread_config(config->thread_counts, &vdo->thread_config);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot create thread configuration";
		return result;
	}

	vdo_log_info("zones: %d logical, %d physical, %d hash; total threads: %d",
		     config->thread_counts.logical_zones,
		     config->thread_counts.physical_zones,
		     config->thread_counts.hash_zones, vdo->thread_config.thread_count);

	/* Compression context storage */
	result = vdo_allocate(config->thread_counts.cpu_threads, char *, "LZ4 context",
			      &vdo->compression_context);
	if (result != VDO_SUCCESS) {
		*reason = "cannot allocate LZ4 context";
		return result;
	}

	for (i = 0; i < config->thread_counts.cpu_threads; i++) {
		result = vdo_allocate(LZ4_MEM_COMPRESS, char, "LZ4 context",
				      &vdo->compression_context[i]);
		if (result != VDO_SUCCESS) {
			*reason = "cannot allocate LZ4 context";
			return result;
		}
	}

	result = register_vdo(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot add VDO to device registry";
		return result;
	}

	vdo_set_admin_state_code(&vdo->admin.state, VDO_ADMIN_STATE_INITIALIZED);
	return result;
}

/**
 * vdo_make() - Allocate and initialize a vdo.
 * @instance: Device instantiation counter.
 * @config: The device configuration.
 * @reason: The reason for any failure during this call.
 * @vdo_ptr: A pointer to hold the created vdo.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_make(unsigned int instance, struct device_config *config, char **reason,
	     struct vdo **vdo_ptr)
{
	int result;
	struct vdo *vdo;

	/* Initialize with a generic failure reason to prevent returning garbage. */
	*reason = "Unspecified error";

	result = vdo_allocate(1, struct vdo, __func__, &vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate VDO";
		return result;
	}

	result = initialize_vdo(vdo, config, instance, reason);
	if (result != VDO_SUCCESS) {
		vdo_destroy(vdo);
		return result;
	}

	/* From here on, the caller will clean up if there is an error. */
	*vdo_ptr = vdo;

	snprintf(vdo->thread_name_prefix, sizeof(vdo->thread_name_prefix),
		 "vdo%u", instance);
	result = vdo_allocate(vdo->thread_config.thread_count,
			      struct vdo_thread, __func__, &vdo->threads);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate thread structures";
		return result;
	}

	result = vdo_make_thread(vdo, vdo->thread_config.admin_thread,
				 &default_queue_type, 1, NULL);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make admin thread";
		return result;
	}

	result = vdo_make_flusher(vdo);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make flusher zones";
		return result;
	}

	result = vdo_make_packer(vdo, DEFAULT_PACKER_BINS, &vdo->packer);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot make packer zones";
		return result;
	}

	BUG_ON(vdo->device_config->logical_block_size <= 0);
	BUG_ON(vdo->device_config->owned_device == NULL);
	result = make_data_vio_pool(vdo, MAXIMUM_VDO_USER_VIOS,
				    MAXIMUM_VDO_USER_VIOS * 3 / 4,
				    &vdo->data_vio_pool);
	if (result != VDO_SUCCESS) {
		*reason = "Cannot allocate data_vio pool";
		return result;
	}

	result = vdo_make_io_submitter(config->thread_counts.bio_threads,
				       config->thread_counts.bio_rotation_interval,
				       get_data_vio_pool_request_limit(vdo->data_vio_pool),
				       vdo, &vdo->io_submitter);
	if (result != VDO_SUCCESS) {
		*reason = "bio submission initialization failed";
		return result;
	}

	if (vdo_uses_bio_ack_queue(vdo)) {
		result = vdo_make_thread(vdo, vdo->thread_config.bio_ack_thread,
					 &bio_ack_q_type,
					 config->thread_counts.bio_ack_threads, NULL);
		if (result != VDO_SUCCESS) {
			*reason = "bio ack queue initialization failed";
			return result;
		}
	}

	result = vdo_make_thread(vdo, vdo->thread_config.cpu_thread, &cpu_q_type,
				 config->thread_counts.cpu_threads,
				 (void **) vdo->compression_context);
	if (result != VDO_SUCCESS) {
		*reason = "CPU queue initialization failed";
		return result;
	}

	return VDO_SUCCESS;
}

static void finish_vdo(struct vdo *vdo)
{
	int i;

	if (vdo->threads == NULL)
		return;

	vdo_cleanup_io_submitter(vdo->io_submitter);
	vdo_finish_dedupe_index(vdo->hash_zones);

	for (i = 0; i < vdo->thread_config.thread_count; i++)
		vdo_finish_work_queue(vdo->threads[i].queue);
}

/**
 * free_listeners() - Free the list of read-only listeners associated with a thread.
 * @thread: The thread holding the list to free.
 */
static void free_listeners(struct vdo_thread *thread)
{
	struct read_only_listener *listener, *next;

	for (listener = vdo_forget(thread->listeners); listener != NULL; listener = next) {
		next = vdo_forget(listener->next);
		vdo_free(listener);
	}
}

static void uninitialize_super_block(struct vdo_super_block *super_block)
{
	free_vio_components(&super_block->vio);
	vdo_free(super_block->buffer);
}

/**
 * unregister_vdo() - Remove a vdo from the device registry.
 * @vdo: The vdo to remove.
 */
static void unregister_vdo(struct vdo *vdo)
{
	write_lock(&registry.lock);
	if (filter_vdos_locked(vdo_is_equal, vdo) == vdo)
		list_del_init(&vdo->registration);

	write_unlock(&registry.lock);
}

/**
 * vdo_destroy() - Destroy a vdo instance.
 * @vdo: The vdo to destroy (may be NULL).
 */
void vdo_destroy(struct vdo *vdo)
{
	unsigned int i;

	if (vdo == NULL)
		return;

	/* A running VDO should never be destroyed without suspending first. */
	BUG_ON(vdo_get_admin_state(vdo)->normal);

	vdo->allocations_allowed = true;

	finish_vdo(vdo);
	unregister_vdo(vdo);
	free_data_vio_pool(vdo->data_vio_pool);
	vdo_free_io_submitter(vdo_forget(vdo->io_submitter));
	vdo_free_flusher(vdo_forget(vdo->flusher));
	vdo_free_packer(vdo_forget(vdo->packer));
	vdo_free_recovery_journal(vdo_forget(vdo->recovery_journal));
	vdo_free_slab_depot(vdo_forget(vdo->depot));
	vdo_uninitialize_layout(&vdo->layout);
	vdo_uninitialize_layout(&vdo->next_layout);
	if (vdo->partition_copier)
		dm_kcopyd_client_destroy(vdo_forget(vdo->partition_copier));
	uninitialize_super_block(&vdo->super_block);
	vdo_free_block_map(vdo_forget(vdo->block_map));
	vdo_free_hash_zones(vdo_forget(vdo->hash_zones));
	vdo_free_physical_zones(vdo_forget(vdo->physical_zones));
	vdo_free_logical_zones(vdo_forget(vdo->logical_zones));

	if (vdo->threads != NULL) {
		for (i = 0; i < vdo->thread_config.thread_count; i++) {
			free_listeners(&vdo->threads[i]);
			vdo_free_work_queue(vdo_forget(vdo->threads[i].queue));
		}
		vdo_free(vdo_forget(vdo->threads));
	}

	uninitialize_thread_config(&vdo->thread_config);

	if (vdo->compression_context != NULL) {
		for (i = 0; i < vdo->device_config->thread_counts.cpu_threads; i++)
			vdo_free(vdo_forget(vdo->compression_context[i]));

		vdo_free(vdo_forget(vdo->compression_context));
	}
	vdo_free(vdo);
}

static int initialize_super_block(struct vdo *vdo, struct vdo_super_block *super_block)
{
	int result;

	result = vdo_allocate(VDO_BLOCK_SIZE, char, "encoded super block",
			      (char **) &vdo->super_block.buffer);
	if (result != VDO_SUCCESS)
		return result;

	return allocate_vio_components(vdo, VIO_TYPE_SUPER_BLOCK,
				       VIO_PRIORITY_METADATA, NULL, 1,
				       (char *) super_block->buffer,
				       &vdo->super_block.vio);
}

/**
 * finish_reading_super_block() - Continue after loading the super block.
 * @completion: The super block vio.
 *
 * This callback is registered in vdo_load_super_block().
 */
static void finish_reading_super_block(struct vdo_completion *completion)
{
	struct vdo_super_block *super_block =
		container_of(as_vio(completion), struct vdo_super_block, vio);

	vdo_continue_completion(vdo_forget(completion->parent),
				vdo_decode_super_block(super_block->buffer));
}

/**
 * handle_super_block_read_error() - Handle an error reading the super block.
 * @completion: The super block vio.
 *
 * This error handler is registered in vdo_load_super_block().
 */
static void handle_super_block_read_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	finish_reading_super_block(completion);
}

static void read_super_block_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo_completion *parent = vio->completion.parent;

	continue_vio_after_io(vio, finish_reading_super_block,
			      parent->callback_thread_id);
}

/**
 * vdo_load_super_block() - Allocate a super block and read its contents from storage.
 * @vdo: The vdo containing the super block on disk.
 * @parent: The completion to notify after loading the super block.
 */
void vdo_load_super_block(struct vdo *vdo, struct vdo_completion *parent)
{
	int result;

	result = initialize_super_block(vdo, &vdo->super_block);
	if (result != VDO_SUCCESS) {
		vdo_continue_completion(parent, result);
		return;
	}

	vdo->super_block.vio.completion.parent = parent;
	vdo_submit_metadata_vio(&vdo->super_block.vio,
				vdo_get_data_region_start(vdo->geometry),
				read_super_block_endio,
				handle_super_block_read_error,
				REQ_OP_READ);
}

/**
 * vdo_get_backing_device() - Get the block device object underlying a vdo.
 * @vdo: The vdo.
 *
 * Return: The vdo's current block device.
 */
struct block_device *vdo_get_backing_device(const struct vdo *vdo)
{
	return vdo->device_config->owned_device->bdev;
}

/**
 * vdo_get_device_name() - Get the device name associated with the vdo target.
 * @target: The target device interface.
 *
 * Return: The block device name.
 */
const char *vdo_get_device_name(const struct dm_target *target)
{
	return dm_device_name(dm_table_get_md(target->table));
}

/**
 * vdo_synchronous_flush() - Issue a flush request and wait for it to complete.
 * @vdo: The vdo.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_synchronous_flush(struct vdo *vdo)
{
	int result;
	struct bio bio;

	bio_init(&bio, vdo_get_backing_device(vdo), NULL, 0,
		 REQ_OP_WRITE | REQ_PREFLUSH);
	submit_bio_wait(&bio);
	result = blk_status_to_errno(bio.bi_status);

	atomic64_inc(&vdo->stats.flush_out);
	if (result != 0) {
		vdo_log_error_strerror(result, "synchronous flush failed");
		result = -EIO;
	}

	bio_uninit(&bio);
	return result;
}

/**
 * vdo_get_state() - Get the current state of the vdo.
 * @vdo: The vdo.
 *
 * Context: This method may be called from any thread.
 *
 * Return: The current state of the vdo.
 */
enum vdo_state vdo_get_state(const struct vdo *vdo)
{
	enum vdo_state state = atomic_read(&vdo->state);

	/* pairs with barriers where state field is changed */
	smp_rmb();
	return state;
}

/**
 * vdo_set_state() - Set the current state of the vdo.
 * @vdo: The vdo whose state is to be set.
 * @state: The new state of the vdo.
 *
 * Context: This method may be called from any thread.
 */
void vdo_set_state(struct vdo *vdo, enum vdo_state state)
{
	/* pairs with barrier in vdo_get_state */
	smp_wmb();
	atomic_set(&vdo->state, state);
}

/**
 * vdo_get_admin_state() - Get the admin state of the vdo.
 * @vdo: The vdo.
 *
 * Return: The code for the vdo's current admin state.
 */
const struct admin_state_code *vdo_get_admin_state(const struct vdo *vdo)
{
	return vdo_get_admin_state_code(&vdo->admin.state);
}

/**
 * record_vdo() - Record the state of the VDO for encoding in the super block.
 */
static void record_vdo(struct vdo *vdo)
{
	/* This is for backwards compatibility. */
	vdo->states.unused = vdo->geometry.unused;
	vdo->states.vdo.state = vdo_get_state(vdo);
	vdo->states.block_map = vdo_record_block_map(vdo->block_map);
	vdo->states.recovery_journal = vdo_record_recovery_journal(vdo->recovery_journal);
	vdo->states.slab_depot = vdo_record_slab_depot(vdo->depot);
	vdo->states.layout = vdo->layout;
}

/**
 * continue_super_block_parent() - Continue the parent of a super block save operation.
 * @completion: The super block vio.
 *
 * This callback is registered in vdo_save_components().
 */
static void continue_super_block_parent(struct vdo_completion *completion)
{
	vdo_continue_completion(vdo_forget(completion->parent), completion->result);
}

/**
 * handle_save_error() - Log a super block save error.
 * @completion: The super block vio.
 *
 * This error handler is registered in vdo_save_components().
 */
static void handle_save_error(struct vdo_completion *completion)
{
	struct vdo_super_block *super_block =
		container_of(as_vio(completion), struct vdo_super_block, vio);

	vio_record_metadata_io_error(&super_block->vio);
	vdo_log_error_strerror(completion->result, "super block save failed");
	/*
	 * Mark the super block as unwritable so that we won't attempt to write it again. This
	 * avoids the case where a growth attempt fails writing the super block with the new size,
	 * but the subsequent attempt to write out the read-only state succeeds. In this case,
	 * writes which happened just before the suspend would not be visible if the VDO is
	 * restarted without rebuilding, but, after a read-only rebuild, the effects of those
	 * writes would reappear.
	 */
	super_block->unwritable = true;
	completion->callback(completion);
}

static void super_block_write_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo_completion *parent = vio->completion.parent;

	continue_vio_after_io(vio, continue_super_block_parent,
			      parent->callback_thread_id);
}

/**
 * vdo_save_components() - Encode the vdo and save the super block asynchronously.
 * @vdo: The vdo whose state is being saved.
 * @parent: The completion to notify when the save is complete.
 */
void vdo_save_components(struct vdo *vdo, struct vdo_completion *parent)
{
	struct vdo_super_block *super_block = &vdo->super_block;

	if (super_block->unwritable) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	if (super_block->vio.completion.parent != NULL) {
		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
		return;
	}

	record_vdo(vdo);

	vdo_encode_super_block(super_block->buffer, &vdo->states);
	super_block->vio.completion.parent = parent;
	super_block->vio.completion.callback_thread_id = parent->callback_thread_id;
	vdo_submit_metadata_vio(&super_block->vio,
				vdo_get_data_region_start(vdo->geometry),
				super_block_write_endio, handle_save_error,
				REQ_OP_WRITE | REQ_PREFLUSH | REQ_FUA);
}

/**
 * vdo_register_read_only_listener() - Register a listener to be notified when the VDO goes
 *                                     read-only.
 * @vdo: The vdo to register with.
 * @listener: The object to notify.
 * @notification: The function to call to send the notification.
 * @thread_id: The id of the thread on which to send the notification.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_register_read_only_listener(struct vdo *vdo, void *listener,
				    vdo_read_only_notification_fn notification,
				    thread_id_t thread_id)
{
	struct vdo_thread *thread = &vdo->threads[thread_id];
	struct read_only_listener *read_only_listener;
	int result;

	result = VDO_ASSERT(thread_id != vdo->thread_config.dedupe_thread,
			    "read only listener not registered on dedupe thread");
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(1, struct read_only_listener, __func__,
			      &read_only_listener);
	if (result != VDO_SUCCESS)
		return result;

	*read_only_listener = (struct read_only_listener) {
		.listener = listener,
		.notify = notification,
		.next = thread->listeners,
	};

	thread->listeners = read_only_listener;
	return VDO_SUCCESS;
}

/**
 * notify_vdo_of_read_only_mode() - Notify a vdo that it is going read-only.
 * @listener: The vdo.
 * @parent: The completion to notify in order to acknowledge the notification.
 *
 * This will save the read-only state to the super block.
 *
 * Implements vdo_read_only_notification_fn.
 */
static void notify_vdo_of_read_only_mode(void *listener, struct vdo_completion *parent)
{
	struct vdo *vdo = listener;

	if (vdo_in_read_only_mode(vdo))
		vdo_finish_completion(parent);

	vdo_set_state(vdo, VDO_READ_ONLY_MODE);
	vdo_save_components(vdo, parent);
}

/**
 * vdo_enable_read_only_entry() - Enable a vdo to enter read-only mode on errors.
 * @vdo: The vdo to enable.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_enable_read_only_entry(struct vdo *vdo)
{
	thread_id_t id;
	bool is_read_only = vdo_in_read_only_mode(vdo);
	struct read_only_notifier *notifier = &vdo->read_only_notifier;

	if (is_read_only) {
		notifier->read_only_error = VDO_READ_ONLY;
		notifier->state = NOTIFIED;
	} else {
		notifier->state = MAY_NOT_NOTIFY;
	}

	spin_lock_init(&notifier->lock);
	vdo_initialize_completion(&notifier->completion, vdo,
				  VDO_READ_ONLY_MODE_COMPLETION);

	for (id = 0; id < vdo->thread_config.thread_count; id++)
		vdo->threads[id].is_read_only = is_read_only;

	return vdo_register_read_only_listener(vdo, vdo, notify_vdo_of_read_only_mode,
					       vdo->thread_config.admin_thread);
}

/**
 * vdo_wait_until_not_entering_read_only_mode() - Wait until no read-only notifications are in
 *                                                progress and prevent any subsequent
 *                                                notifications.
 * @parent: The completion to notify when no threads are entering read-only mode.
 *
 * Notifications may be re-enabled by calling vdo_allow_read_only_mode_entry().
 */
void vdo_wait_until_not_entering_read_only_mode(struct vdo_completion *parent)
{
	struct vdo *vdo = parent->vdo;
	struct read_only_notifier *notifier = &vdo->read_only_notifier;

	vdo_assert_on_admin_thread(vdo, __func__);

	if (notifier->waiter != NULL) {
		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
		return;
	}

	spin_lock(&notifier->lock);
	if (notifier->state == NOTIFYING)
		notifier->waiter = parent;
	else if (notifier->state == MAY_NOTIFY)
		notifier->state = MAY_NOT_NOTIFY;
	spin_unlock(&notifier->lock);

	if (notifier->waiter == NULL) {
		/*
		 * A notification was not in progress, and now they are
		 * disallowed.
		 */
		vdo_launch_completion(parent);
		return;
	}
}

/**
 * as_notifier() - Convert a generic vdo_completion to a read_only_notifier.
 * @completion: The completion to convert.
 *
 * Return: The completion as a read_only_notifier.
 */
static inline struct read_only_notifier *as_notifier(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_READ_ONLY_MODE_COMPLETION);
	return container_of(completion, struct read_only_notifier, completion);
}

/**
 * finish_entering_read_only_mode() - Complete the process of entering read only mode.
 * @completion: The read-only mode completion.
 */
static void finish_entering_read_only_mode(struct vdo_completion *completion)
{
	struct read_only_notifier *notifier = as_notifier(completion);

	vdo_assert_on_admin_thread(completion->vdo, __func__);

	spin_lock(&notifier->lock);
	notifier->state = NOTIFIED;
	spin_unlock(&notifier->lock);

	if (notifier->waiter != NULL)
		vdo_continue_completion(vdo_forget(notifier->waiter),
					completion->result);
}

/**
 * make_thread_read_only() - Inform each thread that the VDO is in read-only mode.
 * @completion: The read-only mode completion.
 */
static void make_thread_read_only(struct vdo_completion *completion)
{
	struct vdo *vdo = completion->vdo;
	thread_id_t thread_id = completion->callback_thread_id;
	struct read_only_notifier *notifier = as_notifier(completion);
	struct read_only_listener *listener = completion->parent;

	if (listener == NULL) {
		/* This is the first call on this thread */
		struct vdo_thread *thread = &vdo->threads[thread_id];

		thread->is_read_only = true;
		listener = thread->listeners;
		if (thread_id == 0)
			vdo_log_error_strerror(READ_ONCE(notifier->read_only_error),
					       "Unrecoverable error, entering read-only mode");
	} else {
		/* We've just finished notifying a listener */
		listener = listener->next;
	}

	if (listener != NULL) {
		/* We have a listener to notify */
		vdo_prepare_completion(completion, make_thread_read_only,
				       make_thread_read_only, thread_id,
				       listener);
		listener->notify(listener->listener, completion);
		return;
	}

	/* We're done with this thread */
	if (++thread_id == vdo->thread_config.dedupe_thread) {
		/*
		 * We don't want to notify the dedupe thread since it may be
		 * blocked rebuilding the index.
		 */
		thread_id++;
	}

	if (thread_id >= vdo->thread_config.thread_count) {
		/* There are no more threads */
		vdo_prepare_completion(completion, finish_entering_read_only_mode,
				       finish_entering_read_only_mode,
				       vdo->thread_config.admin_thread, NULL);
	} else {
		vdo_prepare_completion(completion, make_thread_read_only,
				       make_thread_read_only, thread_id, NULL);
	}

	vdo_launch_completion(completion);
}

/**
 * vdo_allow_read_only_mode_entry() - Allow the notifier to put the VDO into read-only mode,
 *                                    reversing the effects of
 *                                    vdo_wait_until_not_entering_read_only_mode().
 * @parent: The object to notify once the operation is complete.
 *
 * If some thread tried to put the vdo into read-only mode while notifications were disallowed, it
 * will be done when this method is called. If that happens, the parent will not be notified until
 * the vdo has actually entered read-only mode and attempted to save the super block.
 *
 * Context: This method may only be called from the admin thread.
 */
void vdo_allow_read_only_mode_entry(struct vdo_completion *parent)
{
	struct vdo *vdo = parent->vdo;
	struct read_only_notifier *notifier = &vdo->read_only_notifier;

	vdo_assert_on_admin_thread(vdo, __func__);

	if (notifier->waiter != NULL) {
		vdo_continue_completion(parent, VDO_COMPONENT_BUSY);
		return;
	}

	spin_lock(&notifier->lock);
	if (notifier->state == MAY_NOT_NOTIFY) {
		if (notifier->read_only_error == VDO_SUCCESS) {
			notifier->state = MAY_NOTIFY;
		} else {
			notifier->state = NOTIFYING;
			notifier->waiter = parent;
		}
	}
	spin_unlock(&notifier->lock);

	if (notifier->waiter == NULL) {
		/* We're done */
		vdo_launch_completion(parent);
		return;
	}

	/* Do the pending notification. */
	make_thread_read_only(&notifier->completion);
}

/**
 * vdo_enter_read_only_mode() - Put a VDO into read-only mode and save the read-only state in the
 *                              super block.
 * @vdo: The vdo.
 * @error_code: The error which caused the VDO to enter read-only mode.
 *
 * This method is a no-op if the VDO is already read-only.
 */
void vdo_enter_read_only_mode(struct vdo *vdo, int error_code)
{
	bool notify = false;
	thread_id_t thread_id = vdo_get_callback_thread_id();
	struct read_only_notifier *notifier = &vdo->read_only_notifier;
	struct vdo_thread *thread;

	if (thread_id != VDO_INVALID_THREAD_ID) {
		thread = &vdo->threads[thread_id];
		if (thread->is_read_only) {
			/* This thread has already gone read-only. */
			return;
		}

		/* Record for this thread that the VDO is read-only. */
		thread->is_read_only = true;
	}

	spin_lock(&notifier->lock);
	if (notifier->read_only_error == VDO_SUCCESS) {
		WRITE_ONCE(notifier->read_only_error, error_code);
		if (notifier->state == MAY_NOTIFY) {
			notifier->state = NOTIFYING;
			notify = true;
		}
	}
	spin_unlock(&notifier->lock);

	if (!notify) {
		/* The notifier is already aware of a read-only error */
		return;
	}

	/* Initiate a notification starting on the lowest numbered thread. */
	vdo_launch_completion_callback(&notifier->completion, make_thread_read_only, 0);
}
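/*
 * Example (illustrative; the completion and error below are hypothetical): a
 * component which hits an unrecoverable metadata error would typically record
 * the error and push the whole device into read-only mode:
 *
 *	if (result != VDO_SUCCESS) {
 *		vdo_enter_read_only_mode(completion->vdo, result);
 *		vdo_continue_completion(completion, result);
 *		return;
 *	}
 *
 * The call is safe from any vdo thread and becomes a no-op once a read-only
 * error has already been recorded, as described in the DOC comment at the top
 * of this file.
 */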
/**
 * vdo_is_read_only() - Check whether the VDO is read-only.
 * @vdo: The vdo.
 *
 * Return: true if the vdo is read-only.
 *
 * This method may be called from any thread, as opposed to examining the VDO's state field which
 * is only safe to check from the admin thread.
 */
bool vdo_is_read_only(struct vdo *vdo)
{
	return vdo->threads[vdo_get_callback_thread_id()].is_read_only;
}

/**
 * vdo_in_read_only_mode() - Check whether a vdo is in read-only mode.
 * @vdo: The vdo to query.
 *
 * Return: true if the vdo is in read-only mode.
 */
bool vdo_in_read_only_mode(const struct vdo *vdo)
{
	return (vdo_get_state(vdo) == VDO_READ_ONLY_MODE);
}

/**
 * vdo_in_recovery_mode() - Check whether the vdo is in recovery mode.
 * @vdo: The vdo to query.
 *
 * Return: true if the vdo is in recovery mode.
 */
bool vdo_in_recovery_mode(const struct vdo *vdo)
{
	return (vdo_get_state(vdo) == VDO_RECOVERING);
}

/**
 * vdo_enter_recovery_mode() - Put the vdo into recovery mode.
 * @vdo: The vdo.
 */
void vdo_enter_recovery_mode(struct vdo *vdo)
{
	vdo_assert_on_admin_thread(vdo, __func__);

	if (vdo_in_read_only_mode(vdo))
		return;

	vdo_log_info("Entering recovery mode");
	vdo_set_state(vdo, VDO_RECOVERING);
}

/**
 * complete_synchronous_action() - Signal the waiting thread that a synchronous action is complete.
 * @completion: The sync completion.
 */
static void complete_synchronous_action(struct vdo_completion *completion)
{
	vdo_assert_completion_type(completion, VDO_SYNC_COMPLETION);
	complete(&(container_of(completion, struct sync_completion,
				vdo_completion)->completion));
}

/**
 * perform_synchronous_action() - Launch an action on a VDO thread and wait for it to complete.
 * @vdo: The vdo.
 * @action: The callback to launch.
 * @thread_id: The thread on which to run the action.
 * @parent: The parent of the sync completion (may be NULL).
 */
static int perform_synchronous_action(struct vdo *vdo, vdo_action_fn action,
				      thread_id_t thread_id, void *parent)
{
	struct sync_completion sync;

	vdo_initialize_completion(&sync.vdo_completion, vdo, VDO_SYNC_COMPLETION);
	init_completion(&sync.completion);
	sync.vdo_completion.parent = parent;
	vdo_launch_completion_callback(&sync.vdo_completion, action, thread_id);
	wait_for_completion(&sync.completion);
	return sync.vdo_completion.result;
}

/**
 * set_compression_callback() - Callback to turn compression on or off.
 * @completion: The completion.
 */
static void set_compression_callback(struct vdo_completion *completion)
{
	struct vdo *vdo = completion->vdo;
	bool *enable = completion->parent;
	bool was_enabled = vdo_get_compressing(vdo);

	if (*enable != was_enabled) {
		WRITE_ONCE(vdo->compressing, *enable);
		if (was_enabled) {
			/* Signal the packer to flush since compression has been disabled. */
			vdo_flush_packer(vdo->packer);
		}
	}

	vdo_log_info("compression is %s", (*enable ? "enabled" : "disabled"));
	*enable = was_enabled;
	complete_synchronous_action(completion);
}

/**
 * vdo_set_compressing() - Turn compression on or off.
 * @vdo: The vdo.
 * @enable: Whether to enable or disable compression.
 *
 * Return: Whether compression was previously on or off.
 */
bool vdo_set_compressing(struct vdo *vdo, bool enable)
{
	perform_synchronous_action(vdo, set_compression_callback,
				   vdo->thread_config.packer_thread,
				   &enable);
	return enable;
}
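/*
 * Example (illustrative): because set_compression_callback() writes the
 * previous setting back through the parent pointer, a caller can toggle
 * compression and learn the prior state in one synchronous call:
 *
 *	bool was_compressing = vdo_set_compressing(vdo, false);
 *
 * The change is applied on the packer thread, so it is ordered with respect
 * to all other packer operations.
 */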
/**
 * vdo_get_compressing() - Get whether compression is enabled in a vdo.
 * @vdo: The vdo.
 *
 * Return: State of compression.
 */
bool vdo_get_compressing(struct vdo *vdo)
{
	return READ_ONCE(vdo->compressing);
}

static size_t get_block_map_cache_size(const struct vdo *vdo)
{
	return ((size_t) vdo->device_config->cache_size) * VDO_BLOCK_SIZE;
}

static struct error_statistics __must_check get_vdo_error_statistics(const struct vdo *vdo)
{
	/*
	 * The error counts can be incremented from arbitrary threads and so must be incremented
	 * atomically, but they are just statistics with no semantics that could rely on memory
	 * order, so unfenced reads are sufficient.
	 */
	const struct atomic_statistics *atoms = &vdo->stats;

	return (struct error_statistics) {
		.invalid_advice_pbn_count = atomic64_read(&atoms->invalid_advice_pbn_count),
		.no_space_error_count = atomic64_read(&atoms->no_space_error_count),
		.read_only_error_count = atomic64_read(&atoms->read_only_error_count),
	};
}

static void copy_bio_stat(struct bio_stats *b, const struct atomic_bio_stats *a)
{
	b->read = atomic64_read(&a->read);
	b->write = atomic64_read(&a->write);
	b->discard = atomic64_read(&a->discard);
	b->flush = atomic64_read(&a->flush);
	b->empty_flush = atomic64_read(&a->empty_flush);
	b->fua = atomic64_read(&a->fua);
}

static struct bio_stats subtract_bio_stats(struct bio_stats minuend,
					   struct bio_stats subtrahend)
{
	return (struct bio_stats) {
		.read = minuend.read - subtrahend.read,
		.write = minuend.write - subtrahend.write,
		.discard = minuend.discard - subtrahend.discard,
		.flush = minuend.flush - subtrahend.flush,
		.empty_flush = minuend.empty_flush - subtrahend.empty_flush,
		.fua = minuend.fua - subtrahend.fua,
	};
}

/**
 * vdo_get_physical_blocks_allocated() - Get the number of physical blocks in use by user data.
 * @vdo: The vdo.
 *
 * Return: The number of blocks allocated for user data.
 */
static block_count_t __must_check vdo_get_physical_blocks_allocated(const struct vdo *vdo)
{
	return (vdo_get_slab_depot_allocated_blocks(vdo->depot) -
		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
}

/**
 * vdo_get_physical_blocks_overhead() - Get the number of physical blocks used by vdo metadata.
 * @vdo: The vdo.
 *
 * Return: The number of overhead blocks.
 */
static block_count_t __must_check vdo_get_physical_blocks_overhead(const struct vdo *vdo)
{
	/*
	 * config.physical_blocks is mutated during resize and is in a packed structure,
	 * but resize runs on admin thread.
	 * TODO: Verify that this is always safe.
	 */
	return (vdo->states.vdo.config.physical_blocks -
		vdo_get_slab_depot_data_blocks(vdo->depot) +
		vdo_get_journal_block_map_data_blocks_used(vdo->recovery_journal));
}

static const char *vdo_describe_state(enum vdo_state state)
{
	/* These strings should all fit in the 15 chars of VDOStatistics.mode. */
	switch (state) {
	case VDO_RECOVERING:
		return "recovering";

	case VDO_READ_ONLY_MODE:
		return "read-only";

	default:
		return "normal";
	}
}

/**
 * get_vdo_statistics() - Populate a vdo_statistics structure on the admin thread.
 * @vdo: The vdo.
 * @stats: The statistics structure to populate.
 */
static void get_vdo_statistics(const struct vdo *vdo, struct vdo_statistics *stats)
{
	struct recovery_journal *journal = vdo->recovery_journal;
	enum vdo_state state = vdo_get_state(vdo);

	vdo_assert_on_admin_thread(vdo, __func__);

	/* start with a clean slate */
	memset(stats, 0, sizeof(struct vdo_statistics));

	/*
	 * These are immutable properties of the vdo object, so it is safe to query them from any
	 * thread.
	 */
	stats->version = STATISTICS_VERSION;
	stats->logical_blocks = vdo->states.vdo.config.logical_blocks;
	/*
	 * config.physical_blocks is mutated during resize and is in a packed structure, but resize
	 * runs on the admin thread.
	 * TODO: verify that this is always safe
	 */
	stats->physical_blocks = vdo->states.vdo.config.physical_blocks;
	stats->block_size = VDO_BLOCK_SIZE;
	stats->complete_recoveries = vdo->states.vdo.complete_recoveries;
	stats->read_only_recoveries = vdo->states.vdo.read_only_recoveries;
	stats->block_map_cache_size = get_block_map_cache_size(vdo);

	/* The callees are responsible for thread-safety. */
	stats->data_blocks_used = vdo_get_physical_blocks_allocated(vdo);
	stats->overhead_blocks_used = vdo_get_physical_blocks_overhead(vdo);
	stats->logical_blocks_used = vdo_get_recovery_journal_logical_blocks_used(journal);
	vdo_get_slab_depot_statistics(vdo->depot, stats);
	stats->journal = vdo_get_recovery_journal_statistics(journal);
	stats->packer = vdo_get_packer_statistics(vdo->packer);
	stats->block_map = vdo_get_block_map_statistics(vdo->block_map);
	vdo_get_dedupe_statistics(vdo->hash_zones, stats);
	stats->errors = get_vdo_error_statistics(vdo);
	stats->in_recovery_mode = (state == VDO_RECOVERING);
	snprintf(stats->mode, sizeof(stats->mode), "%s", vdo_describe_state(state));

	stats->instance = vdo->instance;
	stats->current_vios_in_progress = get_data_vio_pool_active_requests(vdo->data_vio_pool);
	stats->max_vios = get_data_vio_pool_maximum_requests(vdo->data_vio_pool);

	stats->flush_out = atomic64_read(&vdo->stats.flush_out);
	stats->logical_block_size = vdo->device_config->logical_block_size;
	copy_bio_stat(&stats->bios_in, &vdo->stats.bios_in);
	copy_bio_stat(&stats->bios_in_partial, &vdo->stats.bios_in_partial);
	copy_bio_stat(&stats->bios_out, &vdo->stats.bios_out);
	copy_bio_stat(&stats->bios_meta, &vdo->stats.bios_meta);
	copy_bio_stat(&stats->bios_journal, &vdo->stats.bios_journal);
	copy_bio_stat(&stats->bios_page_cache, &vdo->stats.bios_page_cache);
	copy_bio_stat(&stats->bios_out_completed, &vdo->stats.bios_out_completed);
	copy_bio_stat(&stats->bios_meta_completed, &vdo->stats.bios_meta_completed);
	copy_bio_stat(&stats->bios_journal_completed,
		      &vdo->stats.bios_journal_completed);
	copy_bio_stat(&stats->bios_page_cache_completed,
		      &vdo->stats.bios_page_cache_completed);
	copy_bio_stat(&stats->bios_acknowledged, &vdo->stats.bios_acknowledged);
	copy_bio_stat(&stats->bios_acknowledged_partial, &vdo->stats.bios_acknowledged_partial);
	stats->bios_in_progress =
		subtract_bio_stats(stats->bios_in, stats->bios_acknowledged);
	vdo_get_memory_stats(&stats->memory_usage.bytes_used,
			     &stats->memory_usage.peak_bytes_used);
}

/**
 * vdo_fetch_statistics_callback() - Action to populate a vdo_statistics
 *                                   structure on the admin thread.
 * @completion: The completion.
 *
 * This callback is registered in vdo_fetch_statistics().
 */
static void vdo_fetch_statistics_callback(struct vdo_completion *completion)
{
	get_vdo_statistics(completion->vdo, completion->parent);
	complete_synchronous_action(completion);
}

/**
 * vdo_fetch_statistics() - Fetch statistics on the correct thread.
 * @vdo: The vdo.
 * @stats: The vdo statistics are returned here.
 */
void vdo_fetch_statistics(struct vdo *vdo, struct vdo_statistics *stats)
{
	perform_synchronous_action(vdo, vdo_fetch_statistics_callback,
				   vdo->thread_config.admin_thread, stats);
}

/**
 * vdo_get_callback_thread_id() - Get the id of the callback thread on which a completion is
 *                                currently running.
 *
 * Return: The current thread ID, or -1 if no such thread.
 */
thread_id_t vdo_get_callback_thread_id(void)
{
	struct vdo_work_queue *queue = vdo_get_current_work_queue();
	struct vdo_thread *thread;
	thread_id_t thread_id;

	if (queue == NULL)
		return VDO_INVALID_THREAD_ID;

	thread = vdo_get_work_queue_owner(queue);
	thread_id = thread->thread_id;

	if (PARANOID_THREAD_CONSISTENCY_CHECKS) {
		BUG_ON(thread_id >= thread->vdo->thread_config.thread_count);
		BUG_ON(thread != &thread->vdo->threads[thread_id]);
	}

	return thread_id;
}

/**
 * vdo_dump_status() - Dump status information about a vdo to the log for debugging.
 * @vdo: The vdo to dump.
 */
void vdo_dump_status(const struct vdo *vdo)
{
	zone_count_t zone;

	vdo_dump_flusher(vdo->flusher);
	vdo_dump_recovery_journal_statistics(vdo->recovery_journal);
	vdo_dump_packer(vdo->packer);
	vdo_dump_slab_depot(vdo->depot);

	for (zone = 0; zone < vdo->thread_config.logical_zone_count; zone++)
		vdo_dump_logical_zone(&vdo->logical_zones->zones[zone]);

	for (zone = 0; zone < vdo->thread_config.physical_zone_count; zone++)
		vdo_dump_physical_zone(&vdo->physical_zones->zones[zone]);

	vdo_dump_hash_zones(vdo->hash_zones);
}

/**
 * vdo_assert_on_admin_thread() - Assert that we are running on the admin thread.
 * @vdo: The vdo.
 * @name: The name of the function which should be running on the admin thread (for logging).
 */
void vdo_assert_on_admin_thread(const struct vdo *vdo, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == vdo->thread_config.admin_thread),
			    "%s called on admin thread", name);
}

/**
 * vdo_assert_on_logical_zone_thread() - Assert that this function was called on the specified
 *                                       logical zone thread.
 * @vdo: The vdo.
 * @logical_zone: The number of the logical zone.
 * @name: The name of the calling function.
 */
void vdo_assert_on_logical_zone_thread(const struct vdo *vdo, zone_count_t logical_zone,
				       const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
			     vdo->thread_config.logical_threads[logical_zone]),
			    "%s called on logical thread", name);
}

/**
 * vdo_assert_on_physical_zone_thread() - Assert that this function was called on the specified
 *                                        physical zone thread.
 * @vdo: The vdo.
 * @physical_zone: The number of the physical zone.
 * @name: The name of the calling function.
 */
void vdo_assert_on_physical_zone_thread(const struct vdo *vdo,
					zone_count_t physical_zone, const char *name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() ==
			     vdo->thread_config.physical_threads[physical_zone]),
			    "%s called on physical thread", name);
}

/**
 * vdo_get_physical_zone() - Get the physical zone responsible for a given physical block number.
 * @vdo: The vdo containing the physical zones.
 * @pbn: The PBN of the data block.
 * @zone_ptr: A pointer to return the physical zone.
 *
 * Gets the physical zone responsible for a given physical block number of a data block in this vdo
 * instance, or of the zero block (for which a NULL zone is returned). For any other block number
 * that is not in the range of valid data block numbers in any slab, an error will be returned.
 * This function is safe to call on invalid block numbers; it will not put the vdo into read-only
 * mode.
 *
 * Return: VDO_SUCCESS or VDO_OUT_OF_RANGE if the block number is invalid or an error code for any
 *         other failure.
 */
int vdo_get_physical_zone(const struct vdo *vdo, physical_block_number_t pbn,
			  struct physical_zone **zone_ptr)
{
	struct vdo_slab *slab;
	int result;

	if (pbn == VDO_ZERO_BLOCK) {
		*zone_ptr = NULL;
		return VDO_SUCCESS;
	}

	/*
	 * Used because it does a more restrictive bounds check than vdo_get_slab(), and done first
	 * because it won't trigger read-only mode on an invalid PBN.
	 */
	if (!vdo_is_physical_data_block(vdo->depot, pbn))
		return VDO_OUT_OF_RANGE;

	/* With the PBN already checked, we should always succeed in finding a slab. */
	slab = vdo_get_slab(vdo->depot, pbn);
	result = VDO_ASSERT(slab != NULL, "vdo_get_slab must succeed on all valid PBNs");
	if (result != VDO_SUCCESS)
		return result;

	*zone_ptr = &vdo->physical_zones->zones[slab->allocator->zone_number];
	return VDO_SUCCESS;
}
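/*
 * Example (illustrative; the variables below are hypothetical): callers of
 * vdo_get_physical_zone() must be prepared for a NULL zone, which is returned
 * for the zero block:
 *
 *	struct physical_zone *zone;
 *	int result = vdo_get_physical_zone(vdo, pbn, &zone);
 *
 *	if (result != VDO_SUCCESS)
 *		return result;
 *	if (zone == NULL)
 *		return VDO_SUCCESS;
 *
 * An out-of-range pbn produces VDO_OUT_OF_RANGE without putting the vdo into
 * read-only mode.
 */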