// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "volume.h"

#include <linux/atomic.h>
#include <linux/dm-bufio.h>
#include <linux/err.h>

#include "errors.h"
#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"
#include "string-utils.h"
#include "thread-utils.h"

#include "chapter-index.h"
#include "config.h"
#include "geometry.h"
#include "hash-utils.h"
#include "index.h"
#include "sparse-cache.h"

/*
 * The first block of the volume layout is reserved for the volume header, which is no longer used.
 * The remainder of the volume is divided into chapters consisting of several pages of records, and
 * several pages of static index used to find those records. The index pages are recorded first,
 * followed by the record pages. The chapters are written in order as they are filled, so the
 * volume storage acts as a circular log of the most recent chapters, with each new chapter
 * overwriting the oldest saved one.
 *
 * When a new chapter is filled and closed, the records from that chapter are sorted and
 * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
 * index is generated to store which record page contains each record. The in-memory index page map
 * is also updated to indicate which delta lists fall on each chapter index page. This means that
 * when a record is read, the volume only has to load a single index page and a single record page,
 * rather than search the entire chapter. These index and record pages are written to storage, and
 * the index pages are transferred to the page cache under the theory that the most recently
 * written chapter is likely to be accessed again soon.
 *
 * When reading a record, the volume index will indicate which chapter should contain it. The
 * volume uses the index page map to determine which chapter index page needs to be loaded, and
 * then reads the relevant record page number from the chapter index. Both index and record pages
 * are stored in a page cache when read for the common case that subsequent records need the same
 * pages. The page cache evicts the least recently accessed entries when caching new pages. In
 * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
 * additional caching depending on available system resources.
 *
 * Record requests are handled from cached pages when possible. If a page needs to be read, it is
 * placed on a queue along with the request that wants to read it. Any requests for the same page
 * that arrive while the read is pending are added to the queue entry. A separate reader thread
 * handles the queued reads, adding the page to the cache and updating any requests queued with it
 * so they can continue processing. This allows the index zone threads to continue processing new
 * requests rather than wait for the storage reads.
 *
 * When an index rebuild is necessary, the volume reads each stored chapter to determine which
 * range of chapters contain valid records, so that those records can be used to reconstruct the
 * in-memory volume index.
 */
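/*
 * As a purely illustrative example of this layout (the real values come from the geometry): with
 * one header page and a hypothetical chapter shape of 6 index pages plus 10 record pages, chapter
 * 0 would occupy physical pages 1-16 (index pages 1-6, record pages 7-16), chapter 1 would occupy
 * pages 17-32, and so on, wrapping around once chapters_per_volume chapters have been written.
 */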
/* The maximum allowable number of contiguous bad chapters */
#define MAX_BAD_CHAPTERS 100
#define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
#define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
#define VOLUME_CACHE_MAX_QUEUED_READS 4096

static const u64 BAD_CHAPTER = U64_MAX;

/*
 * The invalidate counter is two 32-bit fields stored together atomically. The low order 32 bits
 * are the physical page number of the cached page being read. The high order 32 bits are a
 * sequence number. This value is written when the zone that owns it begins or completes a cache
 * search. Any other thread will only read the counter in wait_for_pending_searches() while waiting
 * to update the cache contents.
 */
union invalidate_counter {
	u64 value;
	struct {
		u32 page;
		u32 counter;
	};
};

static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
{
	return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
}

static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
{
	return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
}

static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
{
	return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
}

static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
{
	/* Page zero is the header page, so the first chapter index page is page one. */
	return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
}

static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
							      unsigned int zone_number)
{
	return (union invalidate_counter) {
		.value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
	};
}

static inline void set_invalidate_counter(struct page_cache *cache,
					  unsigned int zone_number,
					  union invalidate_counter invalidate_counter)
{
	WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
		   invalidate_counter.value);
}

static inline bool search_pending(union invalidate_counter invalidate_counter)
{
	return (invalidate_counter.counter & 1) != 0;
}

/* Lock the cache for a zone in order to search for a page. */
static void begin_pending_search(struct page_cache *cache, u32 physical_page,
				 unsigned int zone_number)
{
	union invalidate_counter invalidate_counter =
		get_invalidate_counter(cache, zone_number);

	invalidate_counter.page = physical_page;
	invalidate_counter.counter++;
	set_invalidate_counter(cache, zone_number, invalidate_counter);
	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
			    "Search is pending for zone %u", zone_number);
	/*
	 * This memory barrier ensures that the write to the invalidate counter is seen by other
	 * threads before this thread accesses the cached page. The corresponding read memory
	 * barrier is in wait_for_pending_searches().
	 */
	smp_mb();
}
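/*
 * Each search_pending counter behaves like a per-zone sequence lock: begin_pending_search()
 * makes the counter odd and records the physical page the zone is about to examine, and
 * end_pending_search() makes it even again. wait_for_pending_searches() only waits on zones
 * whose counter is odd and whose recorded page matches the page being invalidated, and any
 * change to the counter value ends the wait.
 */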
/* Unlock the cache for a zone by making its invalidate counter even again. */
static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
{
	union invalidate_counter invalidate_counter;

	/*
	 * This memory barrier ensures that this thread completes reads of the
	 * cached page before other threads see the write to the invalidate
	 * counter.
	 */
	smp_mb();

	invalidate_counter = get_invalidate_counter(cache, zone_number);
	VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
			    "Search is pending for zone %u", zone_number);
	invalidate_counter.counter++;
	set_invalidate_counter(cache, zone_number, invalidate_counter);
}

static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
{
	union invalidate_counter initial_counters[MAX_ZONES];
	unsigned int i;

	/*
	 * We hold the read_threads_mutex. We are waiting for threads that do not hold the
	 * read_threads_mutex. Those threads have "locked" their targeted page by setting the
	 * search_pending_counter. The corresponding write memory barrier is in
	 * begin_pending_search().
	 */
	smp_mb();

	for (i = 0; i < cache->zone_count; i++)
		initial_counters[i] = get_invalidate_counter(cache, i);
	for (i = 0; i < cache->zone_count; i++) {
		if (search_pending(initial_counters[i]) &&
		    (initial_counters[i].page == physical_page)) {
			/*
			 * There is an active search using the physical page. We need to wait for
			 * the search to finish.
			 *
			 * FIXME: Investigate using wait_event() to wait for the search to finish.
			 */
			while (initial_counters[i].value ==
			       get_invalidate_counter(cache, i).value)
				cond_resched();
		}
	}
}

static void release_page_buffer(struct cached_page *page)
{
	if (page->buffer != NULL)
		dm_bufio_release(vdo_forget(page->buffer));
}

static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
{
	/* Do not clear read_pending because the read queue relies on it. */
	release_page_buffer(page);
	page->physical_page = cache->indexable_pages;
	WRITE_ONCE(page->last_used, 0);
}

static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
{
	/*
	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
	 * thread holding the read_threads_mutex.
	 */
	if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
		WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
}
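/*
 * The cache uses a monotonically increasing clock value rather than timestamps for its LRU
 * policy: each access copies the current clock into page->last_used, and eviction picks the
 * unclaimed slot with the smallest value. The atomic increment is skipped when last_used is
 * already current, which keeps repeated hits on a hot page cheap.
 */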
/* Select a page to remove from the cache to make space for a new entry. */
static struct cached_page *select_victim_in_cache(struct page_cache *cache)
{
	struct cached_page *page;
	int oldest_index = 0;
	s64 oldest_time = S64_MAX;
	s64 last_used;
	u16 i;

	/* Find the oldest unclaimed page. We hold the read_threads_mutex. */
	for (i = 0; i < cache->cache_slots; i++) {
		/* A page with a pending read must not be replaced. */
		if (cache->cache[i].read_pending)
			continue;

		last_used = READ_ONCE(cache->cache[i].last_used);
		if (last_used <= oldest_time) {
			oldest_time = last_used;
			oldest_index = i;
		}
	}

	page = &cache->cache[oldest_index];
	if (page->physical_page != cache->indexable_pages) {
		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
		wait_for_pending_searches(cache, page->physical_page);
	}

	page->read_pending = true;
	clear_cache_page(cache, page);
	return page;
}

/* Make a newly filled cache entry available to other threads. */
static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
			     struct cached_page *page)
{
	int result;

	/* We hold the read_threads_mutex. */
	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
	if (result != VDO_SUCCESS)
		return result;

	page->physical_page = physical_page;
	make_page_most_recent(cache, page);
	page->read_pending = false;

	/*
	 * We hold the read_threads_mutex, but we must have a write memory barrier before making
	 * the cached_page available to the readers that do not hold the mutex. The corresponding
	 * read memory barrier is in get_page_and_index().
	 */
	smp_wmb();

	/* This assignment also clears the queued flag. */
	WRITE_ONCE(cache->index[physical_page], page - cache->cache);
	return UDS_SUCCESS;
}

static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
				 struct cached_page *page)
{
	int result;

	/* We hold the read_threads_mutex. */
	result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
	if (result != VDO_SUCCESS)
		return;

	clear_cache_page(cache, page);
	page->read_pending = false;

	/* Clear the mapping and the queued flag for the new page. */
	WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
}

static inline u16 next_queue_position(u16 position)
{
	return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
}

static inline void advance_queue_position(u16 *position)
{
	*position = next_queue_position(*position);
}

static inline bool read_queue_is_full(struct page_cache *cache)
{
	return cache->read_queue_first == next_queue_position(cache->read_queue_last);
}
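/*
 * The read queue is a fixed-size ring buffer tracked by read_queue_first, read_queue_next_read,
 * and read_queue_last; because "full" is detected by read_queue_first catching up to the slot
 * just past read_queue_last, at most VOLUME_CACHE_MAX_QUEUED_READS - 1 reads can be outstanding.
 * Each cache->index[] entry encodes where a physical page lives: a value below cache_slots is a
 * slot in the page cache, a value with VOLUME_CACHE_QUEUED_FLAG set holds a read queue index in
 * its low bits, and the value cache_slots means the page is neither cached nor queued.
 */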
static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
			 u32 physical_page)
{
	struct queued_read *queue_entry;
	u16 last = cache->read_queue_last;
	u16 read_queue_index;

	/* We hold the read_threads_mutex. */
	if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
		/* This page has no existing entry in the queue. */
		if (read_queue_is_full(cache))
			return false;

		/* Fill in the read queue entry. */
		cache->read_queue[last].physical_page = physical_page;
		cache->read_queue[last].invalid = false;
		cache->read_queue[last].first_request = NULL;
		cache->read_queue[last].last_request = NULL;

		/* Point the cache index to the read queue entry. */
		read_queue_index = last;
		WRITE_ONCE(cache->index[physical_page],
			   read_queue_index | VOLUME_CACHE_QUEUED_FLAG);

		advance_queue_position(&cache->read_queue_last);
	} else {
		/* It's already queued, so add this request to the existing entry. */
		read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
	}

	request->next_request = NULL;
	queue_entry = &cache->read_queue[read_queue_index];
	if (queue_entry->first_request == NULL)
		queue_entry->first_request = request;
	else
		queue_entry->last_request->next_request = request;
	queue_entry->last_request = request;

	return true;
}

static void enqueue_page_read(struct volume *volume, struct uds_request *request,
			      u32 physical_page)
{
	/* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
	while (!enqueue_read(&volume->page_cache, request, physical_page)) {
		vdo_log_debug("Read queue full, waiting for reads to finish");
		uds_wait_cond(&volume->read_threads_read_done_cond,
			      &volume->read_threads_mutex);
	}

	uds_signal_cond(&volume->read_threads_cond);
}

/*
 * Reserve the next read queue entry for processing, but do not actually remove it from the queue.
 * Must be followed by release_queued_requests().
 */
static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
{
	/* We hold the read_threads_mutex. */
	struct queued_read *entry;
	u16 index_value;
	bool queued;

	/* No items to dequeue */
	if (cache->read_queue_next_read == cache->read_queue_last)
		return NULL;

	entry = &cache->read_queue[cache->read_queue_next_read];
	index_value = cache->index[entry->physical_page];
	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
	/* Check to see if it's still queued before resetting. */
	if (entry->invalid && queued)
		WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);

	/*
	 * If a synchronous read has taken this page, set invalid to true so it doesn't get
	 * overwritten. Requests will just be requeued.
	 */
	if (!queued)
		entry->invalid = true;

	entry->reserved = true;
	advance_queue_position(&cache->read_queue_next_read);
	return entry;
}

static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
{
	struct queued_read *queue_entry = NULL;

	while (!volume->read_threads_exiting) {
		queue_entry = reserve_read_queue_entry(&volume->page_cache);
		if (queue_entry != NULL)
			break;

		uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
	}

	return queue_entry;
}

static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
				   u32 chapter, u32 index_page_number,
				   struct delta_index_page *chapter_index_page)
{
	u64 ci_virtual;
	u32 ci_chapter;
	u32 lowest_list;
	u32 highest_list;
	struct index_geometry *geometry = volume->geometry;
	int result;

	result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
						   index_page, volume->nonce);
	if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
		return result;

	if (result != UDS_SUCCESS) {
		return vdo_log_error_strerror(result,
					      "Reading chapter index page for chapter %u page %u",
					      chapter, index_page_number);
	}

	uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
				   &lowest_list, &highest_list);
	ci_virtual = chapter_index_page->virtual_chapter_number;
	ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
	if ((chapter == ci_chapter) &&
	    (lowest_list == chapter_index_page->lowest_list_number) &&
	    (highest_list == chapter_index_page->highest_list_number))
		return UDS_SUCCESS;

	vdo_log_warning("Index page map updated to %llu",
			(unsigned long long) volume->index_page_map->last_update);
	vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
			chapter, index_page_number, lowest_list, highest_list,
			(unsigned long long) ci_virtual,
			chapter_index_page->lowest_list_number,
			chapter_index_page->highest_list_number);
	return vdo_log_error_strerror(UDS_CORRUPT_DATA,
				      "index page map mismatch with chapter index");
}

static int initialize_index_page(const struct volume *volume, u32 physical_page,
				 struct cached_page *page)
{
	u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
	u32 index_page_number = map_to_page_number(volume->geometry, physical_page);

	return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
				       chapter, index_page_number, &page->index_page);
}
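/*
 * A worked example of the record page search below, using a hypothetical page of seven records
 * whose sorted names are A < B < C < D < E < F < G: heap order stores them as [D, B, F, A, C,
 * E, G], so a search for E compares against D (node 0), then F (node 2), then E (node 5), and
 * a miss is detected when the node index runs past records_per_page.
 */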
static bool search_record_page(const u8 record_page[],
			       const struct uds_record_name *name,
			       const struct index_geometry *geometry,
			       struct uds_record_data *metadata)
{
	/*
	 * The array of records is sorted by name and stored as a binary tree in heap order, so the
	 * root of the tree is the first array element.
	 */
	u32 node = 0;
	const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;

	while (node < geometry->records_per_page) {
		int result;
		const struct uds_volume_record *record = &records[node];

		result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
		if (result == 0) {
			if (metadata != NULL)
				*metadata = record->data;
			return true;
		}

		/* The children of node N are at indexes 2N+1 and 2N+2. */
		node = ((2 * node) + ((result < 0) ? 1 : 2));
	}

	return false;
}

/*
 * If we've read in a record page, we're going to do an immediate search, to speed up processing by
 * avoiding get_record_from_zone(), and to ensure that requests make progress even when queued. If
 * we've read in an index page, we save the record page number so we don't have to resolve the
 * index page again. We use the location, virtual_chapter, and old_metadata fields in the request
 * to allow the index code to know where to begin processing the request again.
 */
static int search_page(struct cached_page *page, const struct volume *volume,
		       struct uds_request *request, u32 physical_page)
{
	int result;
	enum uds_index_region location;
	u16 record_page_number;

	if (is_record_page(volume->geometry, physical_page)) {
		if (search_record_page(dm_bufio_get_block_data(page->buffer),
				       &request->record_name, volume->geometry,
				       &request->old_metadata))
			location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
		else
			location = UDS_LOCATION_UNAVAILABLE;
	} else {
		result = uds_search_chapter_index_page(&page->index_page,
						       volume->geometry,
						       &request->record_name,
						       &record_page_number);
		if (result != UDS_SUCCESS)
			return result;

		if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
			location = UDS_LOCATION_UNAVAILABLE;
		} else {
			location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
			*((u16 *) &request->old_metadata) = record_page_number;
		}
	}

	request->location = location;
	request->found = false;
	return UDS_SUCCESS;
}

static int process_entry(struct volume *volume, struct queued_read *entry)
{
	u32 page_number = entry->physical_page;
	struct uds_request *request;
	struct cached_page *page = NULL;
	u8 *page_data;
	int result;

	if (entry->invalid) {
		vdo_log_debug("Requeuing requests for invalid page");
		return UDS_SUCCESS;
	}

	page = select_victim_in_cache(&volume->page_cache);

	mutex_unlock(&volume->read_threads_mutex);
	page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
	mutex_lock(&volume->read_threads_mutex);
	if (IS_ERR(page_data)) {
		result = -PTR_ERR(page_data);
		vdo_log_warning_strerror(result,
					 "error reading physical page %u from volume",
					 page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return result;
	}

	if (entry->invalid) {
		vdo_log_warning("Page %u invalidated after read", page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return UDS_SUCCESS;
	}

	if (!is_record_page(volume->geometry, page_number)) {
		result = initialize_index_page(volume, page_number, page);
		if (result != UDS_SUCCESS) {
			vdo_log_warning("Error initializing chapter index page");
			cancel_page_in_cache(&volume->page_cache, page_number, page);
			return result;
		}
	}

	result = put_page_in_cache(&volume->page_cache, page_number, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", page_number);
		cancel_page_in_cache(&volume->page_cache, page_number, page);
		return result;
	}

	request = entry->first_request;
	while ((request != NULL) && (result == UDS_SUCCESS)) {
		result = search_page(page, volume, request, page_number);
		request = request->next_request;
	}

	return result;
}
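/*
 * Note that process_entry() drops the read_threads_mutex while dm_bufio_read() is in progress,
 * so a chapter invalidation can race with the read; the second check of entry->invalid after
 * reacquiring the mutex catches that case, and the waiting requests are simply requeued.
 */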
static void release_queued_requests(struct volume *volume, struct queued_read *entry,
				    int result)
{
	struct page_cache *cache = &volume->page_cache;
	u16 next_read = cache->read_queue_next_read;
	struct uds_request *request;
	struct uds_request *next;

	for (request = entry->first_request; request != NULL; request = next) {
		next = request->next_request;
		request->status = result;
		request->requeued = true;
		uds_enqueue_request(request, STAGE_INDEX);
	}

	entry->reserved = false;

	/* Move the read_queue_first pointer as far as we can. */
	while ((cache->read_queue_first != next_read) &&
	       (!cache->read_queue[cache->read_queue_first].reserved))
		advance_queue_position(&cache->read_queue_first);
	uds_broadcast_cond(&volume->read_threads_read_done_cond);
}

static void read_thread_function(void *arg)
{
	struct volume *volume = arg;

	vdo_log_debug("reader starting");
	mutex_lock(&volume->read_threads_mutex);
	while (true) {
		struct queued_read *queue_entry;
		int result;

		queue_entry = wait_to_reserve_read_queue_entry(volume);
		if (volume->read_threads_exiting)
			break;

		result = process_entry(volume, queue_entry);
		release_queued_requests(volume, queue_entry, result);
	}
	mutex_unlock(&volume->read_threads_mutex);
	vdo_log_debug("reader done");
}

static void get_page_and_index(struct page_cache *cache, u32 physical_page,
			       int *queue_index, struct cached_page **page_ptr)
{
	u16 index_value;
	u16 index;
	bool queued;

	/*
	 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
	 * thread holding the read_threads_mutex.
	 *
	 * Holding only a search_pending_counter is the most frequent case.
	 */
	/*
	 * It would be unlikely for the compiler to turn the usage of index_value into two reads of
	 * cache->index, but it would be possible and very bad if those reads did not return the
	 * same bits.
	 */
	index_value = READ_ONCE(cache->index[physical_page]);
	queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
	index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;

	if (!queued && (index < cache->cache_slots)) {
		*page_ptr = &cache->cache[index];
		/*
		 * We have acquired access to the cached page, but unless we hold the
		 * read_threads_mutex, we need a read memory barrier now. The corresponding write
		 * memory barrier is in put_page_in_cache().
		 */
		smp_rmb();
	} else {
		*page_ptr = NULL;
	}

	*queue_index = queued ? index : -1;
}

static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
				struct cached_page **page)
{
	/*
	 * ASSERTION: We are in a zone thread.
	 * ASSERTION: We are holding a search_pending_counter or the read_threads_mutex.
	 */
	int queue_index = -1;

	get_page_and_index(cache, physical_page, &queue_index, page);
}

static int read_page_locked(struct volume *volume, u32 physical_page,
			    struct cached_page **page_ptr)
{
	int result = UDS_SUCCESS;
	struct cached_page *page = NULL;
	u8 *page_data;

	page = select_victim_in_cache(&volume->page_cache);
	page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
	if (IS_ERR(page_data)) {
		result = -PTR_ERR(page_data);
		vdo_log_warning_strerror(result,
					 "error reading physical page %u from volume",
					 physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	if (!is_record_page(volume->geometry, physical_page)) {
		result = initialize_index_page(volume, physical_page, page);
		if (result != UDS_SUCCESS) {
			if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
				vdo_log_warning("Corrupt index page %u", physical_page);
			cancel_page_in_cache(&volume->page_cache, physical_page, page);
			return result;
		}
	}

	result = put_page_in_cache(&volume->page_cache, physical_page, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	*page_ptr = page;
	return UDS_SUCCESS;
}

/* Retrieve a page from the cache while holding the read threads mutex. */
static int get_volume_page_locked(struct volume *volume, u32 physical_page,
				  struct cached_page **page_ptr)
{
	int result;
	struct cached_page *page = NULL;

	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page == NULL) {
		result = read_page_locked(volume, physical_page, &page);
		if (result != UDS_SUCCESS)
			return result;
	} else {
		make_page_most_recent(&volume->page_cache, page);
	}

	*page_ptr = page;
	return UDS_SUCCESS;
}

/* Retrieve a page from the cache while holding a search_pending lock. */
static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
				     u32 physical_page, struct cached_page **page_ptr)
{
	struct cached_page *page;
	unsigned int zone_number = request->zone_number;

	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page != NULL) {
		if (zone_number == 0) {
			/* Only one zone is allowed to update the LRU. */
			make_page_most_recent(&volume->page_cache, page);
		}

		*page_ptr = page;
		return UDS_SUCCESS;
	}

	/* Prepare to enqueue a read for the page. */
	end_pending_search(&volume->page_cache, zone_number);
	mutex_lock(&volume->read_threads_mutex);

	/*
	 * Do the lookup again while holding the read mutex (no longer the fast case so this should
	 * be fine to repeat). We need to do this because a page may have been added to the cache
	 * by a reader thread between the time we searched above and the time we went to actually
	 * try to enqueue it below. This could result in us enqueuing another read for a page which
	 * is already in the cache, which would mean we end up with two entries in the cache for
	 * the same page.
	 */
	get_page_from_cache(&volume->page_cache, physical_page, &page);
	if (page == NULL) {
		enqueue_page_read(volume, request, physical_page);
		/*
		 * The performance gain from unlocking first, while "search pending" mode is off,
		 * turns out to be significant in some cases. The page is not available yet so
		 * the order does not matter for correctness as it does below.
		 */
		mutex_unlock(&volume->read_threads_mutex);
		begin_pending_search(&volume->page_cache, physical_page, zone_number);
		return UDS_QUEUED;
	}

	/*
	 * Now that the page is loaded, the volume needs to switch to "reader thread unlocked" and
	 * "search pending" state in careful order so no other thread can mess with the data before
	 * the caller gets to look at it.
	 */
	begin_pending_search(&volume->page_cache, physical_page, zone_number);
	mutex_unlock(&volume->read_threads_mutex);
	*page_ptr = page;
	return UDS_SUCCESS;
}

static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
			   struct cached_page **page_ptr)
{
	int result;
	u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

	mutex_lock(&volume->read_threads_mutex);
	result = get_volume_page_locked(volume, physical_page, page_ptr);
	mutex_unlock(&volume->read_threads_mutex);
	return result;
}

int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
			       u8 **data_ptr)
{
	int result;
	struct cached_page *page = NULL;

	result = get_volume_page(volume, chapter, page_number, &page);
	if (result == UDS_SUCCESS)
		*data_ptr = dm_bufio_get_block_data(page->buffer);
	return result;
}

int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
			      struct delta_index_page **index_page_ptr)
{
	int result;
	struct cached_page *page = NULL;

	result = get_volume_page(volume, chapter, page_number, &page);
	if (result == UDS_SUCCESS)
		*index_page_ptr = &page->index_page;
	return result;
}

/*
 * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
 * if the page in question must be read from storage.
 */
static int search_cached_index_page(struct volume *volume, struct uds_request *request,
				    u32 chapter, u32 index_page_number,
				    u16 *record_page_number)
{
	int result;
	struct cached_page *page = NULL;
	unsigned int zone_number = request->zone_number;
	u32 physical_page = map_to_physical_page(volume->geometry, chapter,
						 index_page_number);

	/*
	 * Make sure the invalidate counter is updated before we try and read the mapping. This
	 * prevents this thread from reading a page in the cache which has already been marked for
	 * invalidation by the reader thread, before the reader thread has noticed that the
	 * invalidate_counter has been incremented.
	 */
	begin_pending_search(&volume->page_cache, physical_page, zone_number);

	result = get_volume_page_protected(volume, request, physical_page, &page);
	if (result != UDS_SUCCESS) {
		end_pending_search(&volume->page_cache, zone_number);
		return result;
	}

	result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
					       &request->record_name,
					       record_page_number);
	end_pending_search(&volume->page_cache, zone_number);
	return result;
}

/*
 * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED if
 * the page in question must be read from storage.
 */
int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
				  u32 chapter, u16 record_page_number, bool *found)
{
	struct cached_page *record_page;
	struct index_geometry *geometry = volume->geometry;
	unsigned int zone_number = request->zone_number;
	int result;
	u32 physical_page, page_number;

	*found = false;
	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
		return UDS_SUCCESS;

	result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
			    "0 <= %d < %u", record_page_number,
			    geometry->record_pages_per_chapter);
	if (result != VDO_SUCCESS)
		return result;

	page_number = geometry->index_pages_per_chapter + record_page_number;

	physical_page = map_to_physical_page(volume->geometry, chapter, page_number);

	/*
	 * Make sure the invalidate counter is updated before we try and read the mapping. This
	 * prevents this thread from reading a page in the cache which has already been marked for
	 * invalidation by the reader thread, before the reader thread has noticed that the
	 * invalidate_counter has been incremented.
	 */
	begin_pending_search(&volume->page_cache, physical_page, zone_number);

	result = get_volume_page_protected(volume, request, physical_page, &record_page);
	if (result != UDS_SUCCESS) {
		end_pending_search(&volume->page_cache, zone_number);
		return result;
	}

	if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
			       &request->record_name, geometry, &request->old_metadata))
		*found = true;

	end_pending_search(&volume->page_cache, zone_number);
	return UDS_SUCCESS;
}

void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
{
	const struct index_geometry *geometry = volume->geometry;
	u32 physical_page = map_to_physical_page(geometry, chapter, 0);

	dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
}

int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
				       struct dm_buffer *volume_buffers[],
				       struct delta_index_page index_pages[])
{
	int result;
	u32 i;
	const struct index_geometry *geometry = volume->geometry;
	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
	u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);

	dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
		u8 *index_page;

		index_page = dm_bufio_read(volume->client, physical_page + i,
					   &volume_buffers[i]);
		if (IS_ERR(index_page)) {
			result = -PTR_ERR(index_page);
			vdo_log_warning_strerror(result,
						 "error reading physical page %u",
						 physical_page);
			return result;
		}

		result = init_chapter_index_page(volume, index_page, physical_chapter, i,
						 &index_pages[i]);
		if (result != UDS_SUCCESS)
			return result;
	}

	return UDS_SUCCESS;
}

int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
				 bool *found)
{
	int result;
	u32 physical_chapter =
		uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
	u32 index_page_number;
	u16 record_page_number;

	index_page_number = uds_find_index_page_number(volume->index_page_map,
						       &request->record_name,
						       physical_chapter);

	if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
		record_page_number = *((u16 *) &request->old_metadata);
	} else {
		result = search_cached_index_page(volume, request, physical_chapter,
						  index_page_number,
						  &record_page_number);
		if (result != UDS_SUCCESS)
			return result;
	}

	return uds_search_cached_record_page(volume, request, physical_chapter,
					     record_page_number, found);
}
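/*
 * The rebuild variant below performs the same two-step lookup as uds_search_volume_page_cache()
 * (chapter index page, then record page), but it reads pages synchronously under the
 * read_threads_mutex instead of returning UDS_QUEUED, since rebuild has no request to requeue.
 */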
int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
					     const struct uds_record_name *name,
					     u64 virtual_chapter, bool *found)
{
	int result;
	struct index_geometry *geometry = volume->geometry;
	struct cached_page *page;
	u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
	u32 index_page_number;
	u16 record_page_number;
	u32 page_number;

	*found = false;
	index_page_number =
		uds_find_index_page_number(volume->index_page_map, name,
					   physical_chapter);
	result = get_volume_page(volume, physical_chapter, index_page_number, &page);
	if (result != UDS_SUCCESS)
		return result;

	result = uds_search_chapter_index_page(&page->index_page, geometry, name,
					       &record_page_number);
	if (result != UDS_SUCCESS)
		return result;

	if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
		return UDS_SUCCESS;

	page_number = geometry->index_pages_per_chapter + record_page_number;
	result = get_volume_page(volume, physical_chapter, page_number, &page);
	if (result != UDS_SUCCESS)
		return result;

	*found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
				    geometry, NULL);
	return UDS_SUCCESS;
}

static void invalidate_page(struct page_cache *cache, u32 physical_page)
{
	struct cached_page *page;
	int queue_index = -1;

	/* We hold the read_threads_mutex. */
	get_page_and_index(cache, physical_page, &queue_index, &page);
	if (page != NULL) {
		WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
		wait_for_pending_searches(cache, page->physical_page);
		clear_cache_page(cache, page);
	} else if (queue_index > -1) {
		vdo_log_debug("setting pending read to invalid");
		cache->read_queue[queue_index].invalid = true;
	}
}

void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
{
	u32 physical_chapter =
		uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
	u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
	u32 i;

	vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
	mutex_lock(&volume->read_threads_mutex);
	for (i = 0; i < volume->geometry->pages_per_chapter; i++)
		invalidate_page(&volume->page_cache, first_page + i);
	mutex_unlock(&volume->read_threads_mutex);
}

/*
 * Donate an index page from a newly written chapter to the page cache, since it is likely to be
 * used again soon. The caller must already hold the reader thread mutex.
 */
static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
				    u32 index_page_number, struct dm_buffer *page_buffer)
{
	int result;
	struct cached_page *page = NULL;
	u32 physical_page =
		map_to_physical_page(volume->geometry, physical_chapter,
				     index_page_number);

	page = select_victim_in_cache(&volume->page_cache);
	page->buffer = page_buffer;
	result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
					 physical_chapter, index_page_number,
					 &page->index_page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error initializing chapter index page");
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	result = put_page_in_cache(&volume->page_cache, physical_page, page);
	if (result != UDS_SUCCESS) {
		vdo_log_warning("Error putting page %u in cache", physical_page);
		cancel_page_in_cache(&volume->page_cache, physical_page, page);
		return result;
	}

	return UDS_SUCCESS;
}

static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
			     struct open_chapter_index *chapter_index)
{
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
	u32 delta_list_number = 0;
	u32 index_page_number;

	for (index_page_number = 0;
	     index_page_number < geometry->index_pages_per_chapter;
	     index_page_number++) {
		u8 *page_data;
		u32 physical_page = first_index_page + index_page_number;
		u32 lists_packed;
		bool last_page;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare index page");
		}

		last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
		result = uds_pack_open_chapter_index_page(chapter_index, page_data,
							  delta_list_number, last_page,
							  &lists_packed);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to pack index page");
		}

		dm_bufio_mark_buffer_dirty(page_buffer);

		if (lists_packed == 0) {
			vdo_log_debug("no delta lists packed on chapter %u page %u",
				      physical_chapter_number, index_page_number);
		} else {
			delta_list_number += lists_packed;
		}

		uds_update_index_page_map(volume->index_page_map,
					  chapter_index->virtual_chapter_number,
					  physical_chapter_number, index_page_number,
					  delta_list_number - 1);

		mutex_lock(&volume->read_threads_mutex);
		result = donate_index_page_locked(volume, physical_chapter_number,
						  index_page_number, page_buffer);
		mutex_unlock(&volume->read_threads_mutex);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return result;
		}
	}

	return UDS_SUCCESS;
}
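/*
 * encode_tree() fills a record page from pointers that have already been sorted by name: an
 * in-order traversal of the implicit tree (children of node N at 2N+1 and 2N+2) visits the page
 * slots in ascending name order, so writing the next sorted record at each visit produces the
 * heap layout that search_record_page() expects. For a hypothetical three-record page with
 * sorted names A < B < C, the slots end up holding [B, A, C].
 */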
static u32 encode_tree(u8 record_page[],
		       const struct uds_volume_record *sorted_pointers[],
		       u32 next_record, u32 node, u32 node_count)
{
	if (node < node_count) {
		u32 child = (2 * node) + 1;

		next_record = encode_tree(record_page, sorted_pointers, next_record,
					  child, node_count);

		/*
		 * In-order traversal: copy the contents of the next record into the page at the
		 * node offset.
		 */
		memcpy(&record_page[node * BYTES_PER_RECORD],
		       sorted_pointers[next_record++], BYTES_PER_RECORD);

		next_record = encode_tree(record_page, sorted_pointers, next_record,
					  child + 1, node_count);
	}

	return next_record;
}

static int encode_record_page(const struct volume *volume,
			      const struct uds_volume_record records[], u8 record_page[])
{
	int result;
	u32 i;
	u32 records_per_page = volume->geometry->records_per_page;
	const struct uds_volume_record **record_pointers = volume->record_pointers;

	for (i = 0; i < records_per_page; i++)
		record_pointers[i] = &records[i];

	/*
	 * Sort the record pointers by using just the names in the records, which is less work than
	 * sorting the entire record values.
	 */
	BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
	result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
				records_per_page, UDS_RECORD_NAME_SIZE);
	if (result != UDS_SUCCESS)
		return result;

	encode_tree(record_page, record_pointers, 0, 0, records_per_page);
	return UDS_SUCCESS;
}

static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
			      const struct uds_volume_record *records)
{
	u32 record_page_number;
	struct index_geometry *geometry = volume->geometry;
	struct dm_buffer *page_buffer;
	const struct uds_volume_record *next_record = records;
	u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
						     geometry->index_pages_per_chapter);

	for (record_page_number = 0;
	     record_page_number < geometry->record_pages_per_chapter;
	     record_page_number++) {
		u8 *page_data;
		u32 physical_page = first_record_page + record_page_number;
		int result;

		page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
		if (IS_ERR(page_data)) {
			return vdo_log_warning_strerror(-PTR_ERR(page_data),
							"failed to prepare record page");
		}

		result = encode_record_page(volume, next_record, page_data);
		if (result != UDS_SUCCESS) {
			dm_bufio_release(page_buffer);
			return vdo_log_warning_strerror(result,
							"failed to encode record page %u",
							record_page_number);
		}

		next_record += geometry->records_per_page;
		dm_bufio_mark_buffer_dirty(page_buffer);
		dm_bufio_release(page_buffer);
	}

	return UDS_SUCCESS;
}

int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
		      const struct uds_volume_record *records)
{
	int result;
	u32 physical_chapter_number =
		uds_map_to_physical_chapter(volume->geometry,
					    chapter_index->virtual_chapter_number);

	result = write_index_pages(volume, physical_chapter_number, chapter_index);
	if (result != UDS_SUCCESS)
		return result;

	result = write_record_pages(volume, physical_chapter_number, records);
	if (result != UDS_SUCCESS)
		return result;

	result = -dm_bufio_write_dirty_buffers(volume->client);
	if (result != UDS_SUCCESS)
		vdo_log_error_strerror(result, "cannot sync chapter to volume");

	return result;
}
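/*
 * Probe a single physical chapter by reading all of its index pages. The chapter is reported as
 * valid only if every index page decodes, all pages agree on one virtual chapter number, the
 * delta list ranges are contiguous from page to page, and that virtual chapter maps back to this
 * physical chapter; otherwise *virtual_chapter_number is left as BAD_CHAPTER.
 */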
static void probe_chapter(struct volume *volume, u32 chapter_number,
			  u64 *virtual_chapter_number)
{
	const struct index_geometry *geometry = volume->geometry;
	u32 expected_list_number = 0;
	u32 i;
	u64 vcn = BAD_CHAPTER;

	*virtual_chapter_number = BAD_CHAPTER;
	dm_bufio_prefetch(volume->client,
			  map_to_physical_page(geometry, chapter_number, 0),
			  geometry->index_pages_per_chapter);

	for (i = 0; i < geometry->index_pages_per_chapter; i++) {
		struct delta_index_page *page;
		int result;

		result = uds_get_volume_index_page(volume, chapter_number, i, &page);
		if (result != UDS_SUCCESS)
			return;

		if (page->virtual_chapter_number == BAD_CHAPTER) {
			vdo_log_error("corrupt index page in chapter %u",
				      chapter_number);
			return;
		}

		if (vcn == BAD_CHAPTER) {
			vcn = page->virtual_chapter_number;
		} else if (page->virtual_chapter_number != vcn) {
			vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
				      chapter_number, i, (unsigned long long) vcn,
				      (unsigned long long) page->virtual_chapter_number);
			return;
		}

		if (expected_list_number != page->lowest_list_number) {
			vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
				      chapter_number, i, expected_list_number,
				      page->lowest_list_number);
			return;
		}
		expected_list_number = page->highest_list_number + 1;

		result = uds_validate_chapter_index_page(page, geometry);
		if (result != UDS_SUCCESS)
			return;
	}

	if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
		vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
			      (unsigned long long) vcn, geometry->chapters_per_volume);
		return;
	}

	*virtual_chapter_number = vcn;
}

/* Find the last valid physical chapter in the volume. */
static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
{
	u32 span = 1;
	u32 tries = 0;

	while (limit > 0) {
		u32 chapter = (span > limit) ? 0 : limit - span;
		u64 vcn = 0;

		probe_chapter(volume, chapter, &vcn);
		if (vcn == BAD_CHAPTER) {
			limit = chapter;
			if (++tries > 1)
				span *= 2;
		} else {
			if (span == 1)
				break;
			span /= 2;
			tries = 0;
		}
	}

	*limit_ptr = limit;
}

static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
			       u64 *highest_vcn)
{
	struct index_geometry *geometry = volume->geometry;
	u64 zero_vcn;
	u64 lowest = BAD_CHAPTER;
	u64 highest = BAD_CHAPTER;
	u64 moved_chapter = BAD_CHAPTER;
	u32 left_chapter = 0;
	u32 right_chapter = 0;
	u32 bad_chapters = 0;

	/*
	 * This method assumes there is at most one run of contiguous bad chapters caused by
	 * unflushed writes. Either the bad run wraps around the beginning and end of the volume,
	 * or it is somewhere in the middle. Wherever it is, the highest and lowest VCNs are
	 * adjacent to it. Otherwise the volume is cleanly saved and somewhere in the middle of it
	 * the highest VCN immediately precedes the lowest one.
	 */

	/* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
	probe_chapter(volume, 0, &zero_vcn);

	/*
	 * Binary search for end of the discontinuity in the monotonically increasing virtual
	 * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect we're
	 * searching for the index of the smallest value less than zero_vcn. If we go off the end,
	 * it means that chapter 0 has the lowest vcn.
	 *
	 * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
	 * skip over the moved chapter when searching, adding it to the range at the end if
	 * necessary.
	 */
	if (geometry->remapped_physical > 0) {
		u64 remapped_vcn;

		probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
		if (remapped_vcn == geometry->remapped_virtual)
			moved_chapter = geometry->remapped_physical;
	}

	left_chapter = 0;
	right_chapter = chapter_limit;

	while (left_chapter < right_chapter) {
		u64 probe_vcn;
		u32 chapter = (left_chapter + right_chapter) / 2;

		if (chapter == moved_chapter)
			chapter--;

		probe_chapter(volume, chapter, &probe_vcn);
		if (zero_vcn <= probe_vcn) {
			left_chapter = chapter + 1;
			if (left_chapter == moved_chapter)
				left_chapter++;
		} else {
			right_chapter = chapter;
		}
	}

	/* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number. */
	if (left_chapter >= chapter_limit)
		left_chapter = 0;

	/* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
	probe_chapter(volume, left_chapter, &lowest);

	/* The moved chapter might be the lowest in the range. */
	if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
		lowest = geometry->remapped_virtual;

	/*
	 * Circularly scan backwards, moving over any bad chapters until encountering a good one,
	 * which is the chapter with the highest vcn.
	 */
	while (highest == BAD_CHAPTER) {
		right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
		if (right_chapter == moved_chapter)
			continue;

		probe_chapter(volume, right_chapter, &highest);
		if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
			vdo_log_error("too many bad chapters in volume: %u",
				      bad_chapters);
			return UDS_CORRUPT_DATA;
		}
	}

	*lowest_vcn = lowest;
	*highest_vcn = highest;
	return UDS_SUCCESS;
}

/*
 * Find the highest and lowest contiguous chapters present in the volume and determine their
 * virtual chapter numbers. This is used by rebuild.
 */
int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
				       u64 *highest_vcn, bool *is_empty)
{
	u32 chapter_limit = volume->geometry->chapters_per_volume;

	find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
	if (chapter_limit == 0) {
		*lowest_vcn = 0;
		*highest_vcn = 0;
		*is_empty = true;
		return UDS_SUCCESS;
	}

	*is_empty = false;
	return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
}

int __must_check uds_replace_volume_storage(struct volume *volume,
					    struct index_layout *layout,
					    struct block_device *bdev)
{
	int result;
	u32 i;

	result = uds_replace_index_layout_storage(layout, bdev);
	if (result != UDS_SUCCESS)
		return result;

	/* Release all outstanding dm_bufio objects */
	for (i = 0; i < volume->page_cache.indexable_pages; i++)
		volume->page_cache.index[i] = volume->page_cache.cache_slots;
	for (i = 0; i < volume->page_cache.cache_slots; i++)
		clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
	if (volume->sparse_cache != NULL)
		uds_invalidate_sparse_cache(volume->sparse_cache);
	if (volume->client != NULL)
		dm_bufio_client_destroy(vdo_forget(volume->client));

	return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
				     volume->reserved_buffers, &volume->client);
}

static int __must_check initialize_page_cache(struct page_cache *cache,
					      const struct index_geometry *geometry,
					      u32 chapters_in_cache,
					      unsigned int zone_count)
{
	int result;
	u32 i;

	cache->indexable_pages = geometry->pages_per_volume + 1;
	cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
	cache->zone_count = zone_count;
	atomic64_set(&cache->clock, 1);

	result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
			    "requested cache size, %u, within limit %u",
			    cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
			      "volume read queue", &cache->read_queue);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->zone_count, struct search_pending_counter,
			      "Volume Cache Zones", &cache->search_pending_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
			      &cache->index);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
			      &cache->cache);
	if (result != VDO_SUCCESS)
		return result;

	/* Initialize index values to invalid values. */
	for (i = 0; i < cache->indexable_pages; i++)
		cache->index[i] = cache->cache_slots;

	for (i = 0; i < cache->cache_slots; i++)
		clear_cache_page(cache, &cache->cache[i]);

	return UDS_SUCCESS;
}

int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
		    struct volume **new_volume)
{
	unsigned int i;
	struct volume *volume = NULL;
	struct index_geometry *geometry;
	unsigned int reserved_buffers;
	int result;

	result = vdo_allocate(1, struct volume, "volume", &volume);
	if (result != VDO_SUCCESS)
		return result;

	volume->nonce = uds_get_volume_nonce(layout);

	result = uds_copy_index_geometry(config->geometry, &volume->geometry);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return vdo_log_warning_strerror(result,
						"failed to allocate geometry");
	}
	geometry = volume->geometry;

	/*
	 * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
	 * for each entry in the sparse cache.
	 */
	reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
	reserved_buffers += 1;
	if (uds_is_sparse_index_geometry(geometry))
		reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
	volume->reserved_buffers = reserved_buffers;
	result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
				       volume->reserved_buffers, &volume->client);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	result = uds_make_radix_sorter(geometry->records_per_page,
				       &volume->radix_sorter);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	result = vdo_allocate(geometry->records_per_page,
			      const struct uds_volume_record *, "record pointers",
			      &volume->record_pointers);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	if (uds_is_sparse_index_geometry(geometry)) {
		size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;

		result = uds_make_sparse_cache(geometry, config->cache_chapters,
					       config->zone_count,
					       &volume->sparse_cache);
		if (result != UDS_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		volume->cache_size =
			page_size * geometry->index_pages_per_chapter * config->cache_chapters;
	}

	result = initialize_page_cache(&volume->page_cache, geometry,
				       config->cache_chapters, config->zone_count);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
	result = uds_make_index_page_map(geometry, &volume->index_page_map);
	if (result != UDS_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	mutex_init(&volume->read_threads_mutex);
	uds_init_cond(&volume->read_threads_read_done_cond);
	uds_init_cond(&volume->read_threads_cond);

	result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
			      &volume->reader_threads);
	if (result != VDO_SUCCESS) {
		uds_free_volume(volume);
		return result;
	}

	for (i = 0; i < config->read_threads; i++) {
		result = vdo_create_thread(read_thread_function, (void *) volume,
					   "reader", &volume->reader_threads[i]);
		if (result != VDO_SUCCESS) {
			uds_free_volume(volume);
			return result;
		}

		volume->read_thread_count = i + 1;
	}

	*new_volume = volume;
	return UDS_SUCCESS;
}

static void uninitialize_page_cache(struct page_cache *cache)
{
	u16 i;

	if (cache->cache != NULL) {
		for (i = 0; i < cache->cache_slots; i++)
			release_page_buffer(&cache->cache[i]);
	}
	vdo_free(cache->index);
	vdo_free(cache->cache);
	vdo_free(cache->search_pending_counters);
	vdo_free(cache->read_queue);
}

void uds_free_volume(struct volume *volume)
{
	if (volume == NULL)
		return;

	if (volume->reader_threads != NULL) {
		unsigned int i;

		/* This works even if some threads weren't started. */
		mutex_lock(&volume->read_threads_mutex);
		volume->read_threads_exiting = true;
		uds_broadcast_cond(&volume->read_threads_cond);
		mutex_unlock(&volume->read_threads_mutex);
		for (i = 0; i < volume->read_thread_count; i++)
			vdo_join_threads(volume->reader_threads[i]);
		vdo_free(volume->reader_threads);
		volume->reader_threads = NULL;
	}

	/* Must destroy the client AFTER freeing the cached pages. */
	uninitialize_page_cache(&volume->page_cache);
	uds_free_sparse_cache(volume->sparse_cache);
	if (volume->client != NULL)
		dm_bufio_client_destroy(vdo_forget(volume->client));

	uds_free_index_page_map(volume->index_page_map);
	uds_free_radix_sorter(volume->radix_sorter);
	vdo_free(volume->geometry);
	vdo_free(volume->record_pointers);
	vdo_free(volume);
}