1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 #include "volume.h"
7
8 #include <linux/atomic.h>
9 #include <linux/dm-bufio.h>
10 #include <linux/err.h>
11
12 #include "errors.h"
13 #include "logger.h"
14 #include "memory-alloc.h"
15 #include "permassert.h"
16 #include "string-utils.h"
17 #include "thread-utils.h"
18
19 #include "chapter-index.h"
20 #include "config.h"
21 #include "geometry.h"
22 #include "hash-utils.h"
23 #include "index.h"
24 #include "sparse-cache.h"
25
26 /*
27 * The first block of the volume layout is reserved for the volume header, which is no longer used.
28 * The remainder of the volume is divided into chapters consisting of several pages of records, and
29 * several pages of static index to use to find those records. The index pages are recorded first,
30 * followed by the record pages. The chapters are written in order as they are filled, so the
31 * volume storage acts as a circular log of the most recent chapters, with each new chapter
32 * overwriting the oldest saved one.
33 *
34 * When a new chapter is filled and closed, the records from that chapter are sorted and
35 * interleaved in approximate temporal order, and assigned to record pages. Then a static delta
36 * index is generated to store which record page contains each record. The in-memory index page map
37 * is also updated to indicate which delta lists fall on each chapter index page. This means that
38 * when a record is read, the volume only has to load a single index page and a single record page,
39 * rather than search the entire chapter. These index and record pages are written to storage, and
40 * the index pages are transferred to the page cache under the theory that the most recently
41 * written chapter is likely to be accessed again soon.
42 *
43 * When reading a record, the volume index will indicate which chapter should contain it. The
44 * volume uses the index page map to determine which chapter index page needs to be loaded, and
45 * then reads the relevant record page number from the chapter index. Both index and record pages
46 * are stored in a page cache when read, for the common case that subsequent records need the same
47 * pages. The page cache evicts the least recently accessed entries when caching new pages. In
48 * addition, the volume uses dm-bufio to manage access to the storage, which may allow for
49 * additional caching depending on available system resources.
50 *
51 * Record requests are handled from cached pages when possible. If a page needs to be read, it is
52 * placed on a queue along with the request that wants to read it. Any requests for the same page
53 * that arrive while the read is pending are added to the queue entry. A separate reader thread
54 * handles the queued reads, adding the page to the cache and updating any requests queued with it
55 * so they can continue processing. This allows the index zone threads to continue processing new
56 * requests rather than wait for the storage reads.
57 *
58 * When an index rebuild is necessary, the volume reads each stored chapter to determine which
59 * range of chapters contain valid records, so that those records can be used to reconstruct the
60 * in-memory volume index.
61 */
62
63 /* The maximum allowable number of contiguous bad chapters */
64 #define MAX_BAD_CHAPTERS 100
65 #define VOLUME_CACHE_MAX_ENTRIES (U16_MAX >> 1)
66 #define VOLUME_CACHE_QUEUED_FLAG (1 << 15)
67 #define VOLUME_CACHE_MAX_QUEUED_READS 4096
68
69 static const u64 BAD_CHAPTER = U64_MAX;
70
71 /*
72 * The invalidate counter is two 32-bit fields stored together atomically. The low order 32 bits
73 * are the physical page number of the cached page being read. The high order 32 bits are a
74 * sequence number. This value is written when the zone that owns it begins or completes a cache
75 * search. Any other thread will only read the counter in wait_for_pending_searches() while waiting
76 * to update the cache contents.
77 */
78 union invalidate_counter {
79 u64 value;
80 struct {
81 u32 page;
82 u32 counter;
83 };
84 };
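/*
 * The low bit of the counter field acts as the lock state: begin_pending_search() and
 * end_pending_search() each increment it, so an odd value means a cache search is in
 * progress and an even value means the zone is idle. For example, a counter sequence of
 * 4 -> 5 -> 6 corresponds to idle -> searching -> idle.
 */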
85
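/*
 * A worked example of the mapping arithmetic below, using a purely hypothetical geometry of
 * 6 index pages plus 250 record pages (256 pages per chapter) and a single header page:
 * physical page 0 is the volume header, physical page 1 is chapter 0 index page 0, physical
 * page 257 is chapter 1 index page 0, and physical page 263 is chapter 1 page number 6,
 * which is >= index_pages_per_chapter and therefore a record page.
 */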
86 static inline u32 map_to_page_number(struct index_geometry *geometry, u32 physical_page)
87 {
88 return (physical_page - HEADER_PAGES_PER_VOLUME) % geometry->pages_per_chapter;
89 }
90
91 static inline u32 map_to_chapter_number(struct index_geometry *geometry, u32 physical_page)
92 {
93 return (physical_page - HEADER_PAGES_PER_VOLUME) / geometry->pages_per_chapter;
94 }
95
96 static inline bool is_record_page(struct index_geometry *geometry, u32 physical_page)
97 {
98 return map_to_page_number(geometry, physical_page) >= geometry->index_pages_per_chapter;
99 }
100
101 static u32 map_to_physical_page(const struct index_geometry *geometry, u32 chapter, u32 page)
102 {
103 /* Page zero is the header page, so the first chapter index page is page one. */
104 return HEADER_PAGES_PER_VOLUME + (geometry->pages_per_chapter * chapter) + page;
105 }
106
107 static inline union invalidate_counter get_invalidate_counter(struct page_cache *cache,
108 unsigned int zone_number)
109 {
110 return (union invalidate_counter) {
111 .value = READ_ONCE(cache->search_pending_counters[zone_number].atomic_value),
112 };
113 }
114
115 static inline void set_invalidate_counter(struct page_cache *cache,
116 unsigned int zone_number,
117 union invalidate_counter invalidate_counter)
118 {
119 WRITE_ONCE(cache->search_pending_counters[zone_number].atomic_value,
120 invalidate_counter.value);
121 }
122
123 static inline bool search_pending(union invalidate_counter invalidate_counter)
124 {
125 return (invalidate_counter.counter & 1) != 0;
126 }
127
128 /* Lock the cache for a zone in order to search for a page. */
129 static void begin_pending_search(struct page_cache *cache, u32 physical_page,
130 unsigned int zone_number)
131 {
132 union invalidate_counter invalidate_counter =
133 get_invalidate_counter(cache, zone_number);
134
135 invalidate_counter.page = physical_page;
136 invalidate_counter.counter++;
137 set_invalidate_counter(cache, zone_number, invalidate_counter);
138 VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
139 "Search is pending for zone %u", zone_number);
140 /*
141 * This memory barrier ensures that the write to the invalidate counter is seen by other
142 * threads before this thread accesses the cached page. The corresponding read memory
143 * barrier is in wait_for_pending_searches().
144 */
145 smp_mb();
146 }
147
148 /* Unlock the cache for a zone by clearing its invalidate counter. */
149 static void end_pending_search(struct page_cache *cache, unsigned int zone_number)
150 {
151 union invalidate_counter invalidate_counter;
152
153 /*
154 * This memory barrier ensures that this thread completes reads of the
155 * cached page before other threads see the write to the invalidate
156 * counter.
157 */
158 smp_mb();
159
160 invalidate_counter = get_invalidate_counter(cache, zone_number);
161 VDO_ASSERT_LOG_ONLY(search_pending(invalidate_counter),
162 "Search is pending for zone %u", zone_number);
163 invalidate_counter.counter++;
164 set_invalidate_counter(cache, zone_number, invalidate_counter);
165 }
166
167 static void wait_for_pending_searches(struct page_cache *cache, u32 physical_page)
168 {
169 union invalidate_counter initial_counters[MAX_ZONES];
170 unsigned int i;
171
172 /*
173 * We hold the read_threads_mutex. We are waiting for threads that do not hold the
174 * read_threads_mutex. Those threads have "locked" their targeted page by setting the
175 * search_pending_counter. The corresponding write memory barrier is in
176 * begin_pending_search().
177 */
178 smp_mb();
179
180 for (i = 0; i < cache->zone_count; i++)
181 initial_counters[i] = get_invalidate_counter(cache, i);
182 for (i = 0; i < cache->zone_count; i++) {
183 if (search_pending(initial_counters[i]) &&
184 (initial_counters[i].page == physical_page)) {
185 /*
186 * There is an active search using the physical page. We need to wait for
187 * the search to finish.
188 *
189 * FIXME: Investigate using wait_event() to wait for the search to finish.
190 */
191 while (initial_counters[i].value ==
192 get_invalidate_counter(cache, i).value)
193 cond_resched();
194 }
195 }
196 }
197
198 static void release_page_buffer(struct cached_page *page)
199 {
200 if (page->buffer != NULL)
201 dm_bufio_release(vdo_forget(page->buffer));
202 }
203
204 static void clear_cache_page(struct page_cache *cache, struct cached_page *page)
205 {
206 /* Do not clear read_pending because the read queue relies on it. */
207 release_page_buffer(page);
208 page->physical_page = cache->indexable_pages;
209 WRITE_ONCE(page->last_used, 0);
210 }
211
212 static void make_page_most_recent(struct page_cache *cache, struct cached_page *page)
213 {
214 /*
215 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
216 * thread holding the read_threads_mutex.
217 */
218 if (atomic64_read(&cache->clock) != READ_ONCE(page->last_used))
219 WRITE_ONCE(page->last_used, atomic64_inc_return(&cache->clock));
220 }
221
222 /* Select a page to remove from the cache to make space for a new entry. */
223 static struct cached_page *select_victim_in_cache(struct page_cache *cache)
224 {
225 struct cached_page *page;
226 int oldest_index = 0;
227 s64 oldest_time = S64_MAX;
228 s64 last_used;
229 u16 i;
230
231 /* Find the oldest unclaimed page. We hold the read_threads_mutex. */
232 for (i = 0; i < cache->cache_slots; i++) {
233 /* A page with a pending read must not be replaced. */
234 if (cache->cache[i].read_pending)
235 continue;
236
237 last_used = READ_ONCE(cache->cache[i].last_used);
238 if (last_used <= oldest_time) {
239 oldest_time = last_used;
240 oldest_index = i;
241 }
242 }
243
244 page = &cache->cache[oldest_index];
245 if (page->physical_page != cache->indexable_pages) {
246 WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
247 wait_for_pending_searches(cache, page->physical_page);
248 }
249
250 page->read_pending = true;
251 clear_cache_page(cache, page);
252 return page;
253 }
254
255 /* Make a newly filled cache entry available to other threads. */
256 static int put_page_in_cache(struct page_cache *cache, u32 physical_page,
257 struct cached_page *page)
258 {
259 int result;
260
261 /* We hold the read_threads_mutex. */
262 result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
263 if (result != VDO_SUCCESS)
264 return result;
265
266 page->physical_page = physical_page;
267 make_page_most_recent(cache, page);
268 page->read_pending = false;
269
270 /*
271 * We hold the read_threads_mutex, but we must have a write memory barrier before making
272 * the cached_page available to the readers that do not hold the mutex. The corresponding
273 * read memory barrier is in get_page_and_index().
274 */
275 smp_wmb();
276
277 /* This assignment also clears the queued flag. */
278 WRITE_ONCE(cache->index[physical_page], page - cache->cache);
279 return UDS_SUCCESS;
280 }
281
282 static void cancel_page_in_cache(struct page_cache *cache, u32 physical_page,
283 struct cached_page *page)
284 {
285 int result;
286
287 /* We hold the read_threads_mutex. */
288 result = VDO_ASSERT((page->read_pending), "page to install has a pending read");
289 if (result != VDO_SUCCESS)
290 return;
291
292 clear_cache_page(cache, page);
293 page->read_pending = false;
294
295 /* Clear the mapping and the queued flag for the new page. */
296 WRITE_ONCE(cache->index[physical_page], cache->cache_slots);
297 }
298
299 static inline u16 next_queue_position(u16 position)
300 {
301 return (position + 1) % VOLUME_CACHE_MAX_QUEUED_READS;
302 }
303
304 static inline void advance_queue_position(u16 *position)
305 {
306 *position = next_queue_position(*position);
307 }
308
309 static inline bool read_queue_is_full(struct page_cache *cache)
310 {
311 return cache->read_queue_first == next_queue_position(cache->read_queue_last);
312 }
313
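/*
 * Add a request to the read queue entry for the given page, creating a new entry if the page
 * is not already queued. Returns false if a new entry is needed but the queue is full. The
 * caller must hold the read_threads_mutex.
 */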
314 static bool enqueue_read(struct page_cache *cache, struct uds_request *request,
315 u32 physical_page)
316 {
317 struct queued_read *queue_entry;
318 u16 last = cache->read_queue_last;
319 u16 read_queue_index;
320
321 /* We hold the read_threads_mutex. */
322 if ((cache->index[physical_page] & VOLUME_CACHE_QUEUED_FLAG) == 0) {
323 /* This page has no existing entry in the queue. */
324 if (read_queue_is_full(cache))
325 return false;
326
327 /* Fill in the read queue entry. */
328 cache->read_queue[last].physical_page = physical_page;
329 cache->read_queue[last].invalid = false;
330 cache->read_queue[last].first_request = NULL;
331 cache->read_queue[last].last_request = NULL;
332
333 /* Point the cache index to the read queue entry. */
334 read_queue_index = last;
335 WRITE_ONCE(cache->index[physical_page],
336 read_queue_index | VOLUME_CACHE_QUEUED_FLAG);
337
338 advance_queue_position(&cache->read_queue_last);
339 } else {
340 /* It's already queued, so add this request to the existing entry. */
341 read_queue_index = cache->index[physical_page] & ~VOLUME_CACHE_QUEUED_FLAG;
342 }
343
344 request->next_request = NULL;
345 queue_entry = &cache->read_queue[read_queue_index];
346 if (queue_entry->first_request == NULL)
347 queue_entry->first_request = request;
348 else
349 queue_entry->last_request->next_request = request;
350 queue_entry->last_request = request;
351
352 return true;
353 }
354
355 static void enqueue_page_read(struct volume *volume, struct uds_request *request,
356 u32 physical_page)
357 {
358 /* Mark the page as queued, so that chapter invalidation knows to cancel a read. */
359 while (!enqueue_read(&volume->page_cache, request, physical_page)) {
360 vdo_log_debug("Read queue full, waiting for reads to finish");
361 uds_wait_cond(&volume->read_threads_read_done_cond,
362 &volume->read_threads_mutex);
363 }
364
365 uds_signal_cond(&volume->read_threads_cond);
366 }
367
368 /*
369 * Reserve the next read queue entry for processing, but do not actually remove it from the queue.
370 * Must be followed by release_queued_requests().
371 */
372 static struct queued_read *reserve_read_queue_entry(struct page_cache *cache)
373 {
374 /* We hold the read_threads_mutex. */
375 struct queued_read *entry;
376 u16 index_value;
377 bool queued;
378
379 /* No items to dequeue */
380 if (cache->read_queue_next_read == cache->read_queue_last)
381 return NULL;
382
383 entry = &cache->read_queue[cache->read_queue_next_read];
384 index_value = cache->index[entry->physical_page];
385 queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
386 /* Check to see if it's still queued before resetting. */
387 if (entry->invalid && queued)
388 WRITE_ONCE(cache->index[entry->physical_page], cache->cache_slots);
389
390 /*
391 * If a synchronous read has taken this page, set invalid to true so it doesn't get
392 * overwritten. Requests will just be requeued.
393 */
394 if (!queued)
395 entry->invalid = true;
396
397 entry->reserved = true;
398 advance_queue_position(&cache->read_queue_next_read);
399 return entry;
400 }
401
402 static inline struct queued_read *wait_to_reserve_read_queue_entry(struct volume *volume)
403 {
404 struct queued_read *queue_entry = NULL;
405
406 while (!volume->read_threads_exiting) {
407 queue_entry = reserve_read_queue_entry(&volume->page_cache);
408 if (queue_entry != NULL)
409 break;
410
411 uds_wait_cond(&volume->read_threads_cond, &volume->read_threads_mutex);
412 }
413
414 return queue_entry;
415 }
416
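/*
 * Decode a chapter index page and, except when rebuilding, verify that its chapter and delta
 * list range agree with the in-memory index page map.
 */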
417 static int init_chapter_index_page(const struct volume *volume, u8 *index_page,
418 u32 chapter, u32 index_page_number,
419 struct delta_index_page *chapter_index_page)
420 {
421 u64 ci_virtual;
422 u32 ci_chapter;
423 u32 lowest_list;
424 u32 highest_list;
425 struct index_geometry *geometry = volume->geometry;
426 int result;
427
428 result = uds_initialize_chapter_index_page(chapter_index_page, geometry,
429 index_page, volume->nonce);
430 if (volume->lookup_mode == LOOKUP_FOR_REBUILD)
431 return result;
432
433 if (result != UDS_SUCCESS) {
434 return vdo_log_error_strerror(result,
435 "Reading chapter index page for chapter %u page %u",
436 chapter, index_page_number);
437 }
438
439 uds_get_list_number_bounds(volume->index_page_map, chapter, index_page_number,
440 &lowest_list, &highest_list);
441 ci_virtual = chapter_index_page->virtual_chapter_number;
442 ci_chapter = uds_map_to_physical_chapter(geometry, ci_virtual);
443 if ((chapter == ci_chapter) &&
444 (lowest_list == chapter_index_page->lowest_list_number) &&
445 (highest_list == chapter_index_page->highest_list_number))
446 return UDS_SUCCESS;
447
448 vdo_log_warning("Index page map updated to %llu",
449 (unsigned long long) volume->index_page_map->last_update);
450 vdo_log_warning("Page map expects that chapter %u page %u has range %u to %u, but chapter index page has chapter %llu with range %u to %u",
451 chapter, index_page_number, lowest_list, highest_list,
452 (unsigned long long) ci_virtual,
453 chapter_index_page->lowest_list_number,
454 chapter_index_page->highest_list_number);
455 return vdo_log_error_strerror(UDS_CORRUPT_DATA,
456 "index page map mismatch with chapter index");
457 }
458
459 static int initialize_index_page(const struct volume *volume, u32 physical_page,
460 struct cached_page *page)
461 {
462 u32 chapter = map_to_chapter_number(volume->geometry, physical_page);
463 u32 index_page_number = map_to_page_number(volume->geometry, physical_page);
464
465 return init_chapter_index_page(volume, dm_bufio_get_block_data(page->buffer),
466 chapter, index_page_number, &page->index_page);
467 }
468
469 static bool search_record_page(const u8 record_page[],
470 const struct uds_record_name *name,
471 const struct index_geometry *geometry,
472 struct uds_record_data *metadata)
473 {
474 /*
475 * The array of records is sorted by name and stored as a binary tree in heap order, so the
476 * root of the tree is the first array element.
477 */
478 u32 node = 0;
479 const struct uds_volume_record *records = (const struct uds_volume_record *) record_page;
480
481 while (node < geometry->records_per_page) {
482 int result;
483 const struct uds_volume_record *record = &records[node];
484
485 result = memcmp(name, &record->name, UDS_RECORD_NAME_SIZE);
486 if (result == 0) {
487 if (metadata != NULL)
488 *metadata = record->data;
489 return true;
490 }
491
492 /* The children of node N are at indexes 2N+1 and 2N+2. */
493 node = ((2 * node) + ((result < 0) ? 1 : 2));
494 }
495
496 return false;
497 }
498
499 /*
500 * If we've read in a record page, we're going to do an immediate search, to speed up processing by
501 * avoiding get_record_from_zone(), and to ensure that requests make progress even when queued. If
502 * we've read in an index page, we save the record page number so we don't have to resolve the
503 * index page again. We use the location, virtual_chapter, and old_metadata fields in the request
504 * to allow the index code to know where to begin processing the request again.
505 */
506 static int search_page(struct cached_page *page, const struct volume *volume,
507 struct uds_request *request, u32 physical_page)
508 {
509 int result;
510 enum uds_index_region location;
511 u16 record_page_number;
512
513 if (is_record_page(volume->geometry, physical_page)) {
514 if (search_record_page(dm_bufio_get_block_data(page->buffer),
515 &request->record_name, volume->geometry,
516 &request->old_metadata))
517 location = UDS_LOCATION_RECORD_PAGE_LOOKUP;
518 else
519 location = UDS_LOCATION_UNAVAILABLE;
520 } else {
521 result = uds_search_chapter_index_page(&page->index_page,
522 volume->geometry,
523 &request->record_name,
524 &record_page_number);
525 if (result != UDS_SUCCESS)
526 return result;
527
528 if (record_page_number == NO_CHAPTER_INDEX_ENTRY) {
529 location = UDS_LOCATION_UNAVAILABLE;
530 } else {
531 location = UDS_LOCATION_INDEX_PAGE_LOOKUP;
532 *((u16 *) &request->old_metadata) = record_page_number;
533 }
534 }
535
536 request->location = location;
537 request->found = false;
538 return UDS_SUCCESS;
539 }
540
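/*
 * Read the page for a reserved queue entry into a cache slot and search it on behalf of each
 * request queued with that entry. The read_threads_mutex is held except during the actual
 * storage read.
 */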
541 static int process_entry(struct volume *volume, struct queued_read *entry)
542 {
543 u32 page_number = entry->physical_page;
544 struct uds_request *request;
545 struct cached_page *page = NULL;
546 u8 *page_data;
547 int result;
548
549 if (entry->invalid) {
550 vdo_log_debug("Requeuing requests for invalid page");
551 return UDS_SUCCESS;
552 }
553
554 page = select_victim_in_cache(&volume->page_cache);
555
556 mutex_unlock(&volume->read_threads_mutex);
557 page_data = dm_bufio_read(volume->client, page_number, &page->buffer);
558 mutex_lock(&volume->read_threads_mutex);
559 if (IS_ERR(page_data)) {
560 result = -PTR_ERR(page_data);
561 vdo_log_warning_strerror(result,
562 "error reading physical page %u from volume",
563 page_number);
564 cancel_page_in_cache(&volume->page_cache, page_number, page);
565 return result;
566 }
567
568 if (entry->invalid) {
569 vdo_log_warning("Page %u invalidated after read", page_number);
570 cancel_page_in_cache(&volume->page_cache, page_number, page);
571 return UDS_SUCCESS;
572 }
573
574 if (!is_record_page(volume->geometry, page_number)) {
575 result = initialize_index_page(volume, page_number, page);
576 if (result != UDS_SUCCESS) {
577 vdo_log_warning("Error initializing chapter index page");
578 cancel_page_in_cache(&volume->page_cache, page_number, page);
579 return result;
580 }
581 }
582
583 result = put_page_in_cache(&volume->page_cache, page_number, page);
584 if (result != UDS_SUCCESS) {
585 vdo_log_warning("Error putting page %u in cache", page_number);
586 cancel_page_in_cache(&volume->page_cache, page_number, page);
587 return result;
588 }
589
590 request = entry->first_request;
591 while ((request != NULL) && (result == UDS_SUCCESS)) {
592 result = search_page(page, volume, request, page_number);
593 request = request->next_request;
594 }
595
596 return result;
597 }
598
599 static void release_queued_requests(struct volume *volume, struct queued_read *entry,
600 int result)
601 {
602 struct page_cache *cache = &volume->page_cache;
603 u16 next_read = cache->read_queue_next_read;
604 struct uds_request *request;
605 struct uds_request *next;
606
607 for (request = entry->first_request; request != NULL; request = next) {
608 next = request->next_request;
609 request->status = result;
610 request->requeued = true;
611 uds_enqueue_request(request, STAGE_INDEX);
612 }
613
614 entry->reserved = false;
615
616 /* Move the read_queue_first pointer as far as we can. */
617 while ((cache->read_queue_first != next_read) &&
618 (!cache->read_queue[cache->read_queue_first].reserved))
619 advance_queue_position(&cache->read_queue_first);
620 uds_broadcast_cond(&volume->read_threads_read_done_cond);
621 }
622
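/*
 * The main loop of a reader thread: reserve the next queued read, process it, and release
 * the requests waiting on it so they can be requeued to the index.
 */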
623 static void read_thread_function(void *arg)
624 {
625 struct volume *volume = arg;
626
627 vdo_log_debug("reader starting");
628 mutex_lock(&volume->read_threads_mutex);
629 while (true) {
630 struct queued_read *queue_entry;
631 int result;
632
633 queue_entry = wait_to_reserve_read_queue_entry(volume);
634 if (volume->read_threads_exiting)
635 break;
636
637 result = process_entry(volume, queue_entry);
638 release_queued_requests(volume, queue_entry, result);
639 }
640 mutex_unlock(&volume->read_threads_mutex);
641 vdo_log_debug("reader done");
642 }
643
644 static void get_page_and_index(struct page_cache *cache, u32 physical_page,
645 int *queue_index, struct cached_page **page_ptr)
646 {
647 u16 index_value;
648 u16 index;
649 bool queued;
650
651 /*
652 * ASSERTION: We are either a zone thread holding a search_pending_counter, or we are any
653 * thread holding the read_threads_mutex.
654 *
655 * Holding only a search_pending_counter is the most frequent case.
656 */
657 /*
658 * It would be unlikely for the compiler to turn the usage of index_value into two reads of
659 * cache->index, but it would be possible and very bad if those reads did not return the
660 * same bits.
661 */
662 index_value = READ_ONCE(cache->index[physical_page]);
663 queued = (index_value & VOLUME_CACHE_QUEUED_FLAG) != 0;
664 index = index_value & ~VOLUME_CACHE_QUEUED_FLAG;
665
666 if (!queued && (index < cache->cache_slots)) {
667 *page_ptr = &cache->cache[index];
668 /*
669 * We have acquired access to the cached page, but unless we hold the
670 * read_threads_mutex, we need a read memory barrier now. The corresponding write
671 * memory barrier is in put_page_in_cache().
672 */
673 smp_rmb();
674 } else {
675 *page_ptr = NULL;
676 }
677
678 *queue_index = queued ? index : -1;
679 }
680
681 static void get_page_from_cache(struct page_cache *cache, u32 physical_page,
682 struct cached_page **page)
683 {
684 /*
685 * ASSERTION: We are in a zone thread.
686 * ASSERTION: We are holding a search_pending_counter or the read_threads_mutex.
687 */
688 int queue_index = -1;
689
690 get_page_and_index(cache, physical_page, &queue_index, page);
691 }
692
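/*
 * Read a page from storage into a newly selected cache slot, initializing its chapter index
 * structure if it is an index page. The caller must hold the read_threads_mutex.
 */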
693 static int read_page_locked(struct volume *volume, u32 physical_page,
694 struct cached_page **page_ptr)
695 {
696 int result = UDS_SUCCESS;
697 struct cached_page *page = NULL;
698 u8 *page_data;
699
700 page = select_victim_in_cache(&volume->page_cache);
701 page_data = dm_bufio_read(volume->client, physical_page, &page->buffer);
702 if (IS_ERR(page_data)) {
703 result = -PTR_ERR(page_data);
704 vdo_log_warning_strerror(result,
705 "error reading physical page %u from volume",
706 physical_page);
707 cancel_page_in_cache(&volume->page_cache, physical_page, page);
708 return result;
709 }
710
711 if (!is_record_page(volume->geometry, physical_page)) {
712 result = initialize_index_page(volume, physical_page, page);
713 if (result != UDS_SUCCESS) {
714 if (volume->lookup_mode != LOOKUP_FOR_REBUILD)
715 vdo_log_warning("Corrupt index page %u", physical_page);
716 cancel_page_in_cache(&volume->page_cache, physical_page, page);
717 return result;
718 }
719 }
720
721 result = put_page_in_cache(&volume->page_cache, physical_page, page);
722 if (result != UDS_SUCCESS) {
723 vdo_log_warning("Error putting page %u in cache", physical_page);
724 cancel_page_in_cache(&volume->page_cache, physical_page, page);
725 return result;
726 }
727
728 *page_ptr = page;
729 return UDS_SUCCESS;
730 }
731
732 /* Retrieve a page from the cache while holding the read threads mutex. */
733 static int get_volume_page_locked(struct volume *volume, u32 physical_page,
734 struct cached_page **page_ptr)
735 {
736 int result;
737 struct cached_page *page = NULL;
738
739 get_page_from_cache(&volume->page_cache, physical_page, &page);
740 if (page == NULL) {
741 result = read_page_locked(volume, physical_page, &page);
742 if (result != UDS_SUCCESS)
743 return result;
744 } else {
745 make_page_most_recent(&volume->page_cache, page);
746 }
747
748 *page_ptr = page;
749 return UDS_SUCCESS;
750 }
751
752 /* Retrieve a page from the cache while holding a search_pending lock. */
753 static int get_volume_page_protected(struct volume *volume, struct uds_request *request,
754 u32 physical_page, struct cached_page **page_ptr)
755 {
756 struct cached_page *page;
757 unsigned int zone_number = request->zone_number;
758
759 get_page_from_cache(&volume->page_cache, physical_page, &page);
760 if (page != NULL) {
761 if (zone_number == 0) {
762 /* Only one zone is allowed to update the LRU. */
763 make_page_most_recent(&volume->page_cache, page);
764 }
765
766 *page_ptr = page;
767 return UDS_SUCCESS;
768 }
769
770 /* Prepare to enqueue a read for the page. */
771 end_pending_search(&volume->page_cache, zone_number);
772 mutex_lock(&volume->read_threads_mutex);
773
774 /*
775 * Do the lookup again while holding the read mutex (no longer the fast case so this should
776 * be fine to repeat). We need to do this because a page may have been added to the cache
777 * by a reader thread between the time we searched above and the time we went to actually
778 * try to enqueue it below. This could result in us enqueuing another read for a page which
779 * is already in the cache, which would mean we end up with two entries in the cache for
780 * the same page.
781 */
782 get_page_from_cache(&volume->page_cache, physical_page, &page);
783 if (page == NULL) {
784 enqueue_page_read(volume, request, physical_page);
785 /*
786 * The performance gain from unlocking first, while "search pending" mode is off,
787 * turns out to be significant in some cases. The page is not available yet so
788 * the order does not matter for correctness as it does below.
789 */
790 mutex_unlock(&volume->read_threads_mutex);
791 begin_pending_search(&volume->page_cache, physical_page, zone_number);
792 return UDS_QUEUED;
793 }
794
795 /*
796 * Now that the page is loaded, the volume needs to switch to "reader thread unlocked" and
797 * "search pending" state in careful order so no other thread can mess with the data before
798 * the caller gets to look at it.
799 */
800 begin_pending_search(&volume->page_cache, physical_page, zone_number);
801 mutex_unlock(&volume->read_threads_mutex);
802 *page_ptr = page;
803 return UDS_SUCCESS;
804 }
805
806 static int get_volume_page(struct volume *volume, u32 chapter, u32 page_number,
807 struct cached_page **page_ptr)
808 {
809 int result;
810 u32 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
811
812 mutex_lock(&volume->read_threads_mutex);
813 result = get_volume_page_locked(volume, physical_page, page_ptr);
814 mutex_unlock(&volume->read_threads_mutex);
815 return result;
816 }
817
818 int uds_get_volume_record_page(struct volume *volume, u32 chapter, u32 page_number,
819 u8 **data_ptr)
820 {
821 int result;
822 struct cached_page *page = NULL;
823
824 result = get_volume_page(volume, chapter, page_number, &page);
825 if (result == UDS_SUCCESS)
826 *data_ptr = dm_bufio_get_block_data(page->buffer);
827 return result;
828 }
829
830 int uds_get_volume_index_page(struct volume *volume, u32 chapter, u32 page_number,
831 struct delta_index_page **index_page_ptr)
832 {
833 int result;
834 struct cached_page *page = NULL;
835
836 result = get_volume_page(volume, chapter, page_number, &page);
837 if (result == UDS_SUCCESS)
838 *index_page_ptr = &page->index_page;
839 return result;
840 }
841
842 /*
843 * Find the record page associated with a name in a given index page. This will return UDS_QUEUED
844 * if the page in question must be read from storage.
845 */
846 static int search_cached_index_page(struct volume *volume, struct uds_request *request,
847 u32 chapter, u32 index_page_number,
848 u16 *record_page_number)
849 {
850 int result;
851 struct cached_page *page = NULL;
852 unsigned int zone_number = request->zone_number;
853 u32 physical_page = map_to_physical_page(volume->geometry, chapter,
854 index_page_number);
855
856 /*
857 * Make sure the invalidate counter is updated before we try and read the mapping. This
858 * prevents this thread from reading a page in the cache which has already been marked for
859 * invalidation by the reader thread, before the reader thread has noticed that the
860 * invalidate_counter has been incremented.
861 */
862 begin_pending_search(&volume->page_cache, physical_page, zone_number);
863
864 result = get_volume_page_protected(volume, request, physical_page, &page);
865 if (result != UDS_SUCCESS) {
866 end_pending_search(&volume->page_cache, zone_number);
867 return result;
868 }
869
870 result = uds_search_chapter_index_page(&page->index_page, volume->geometry,
871 &request->record_name,
872 record_page_number);
873 end_pending_search(&volume->page_cache, zone_number);
874 return result;
875 }
876
877 /*
878 * Find the metadata associated with a name in a given record page. This will return UDS_QUEUED if
879 * the page in question must be read from storage.
880 */
881 int uds_search_cached_record_page(struct volume *volume, struct uds_request *request,
882 u32 chapter, u16 record_page_number, bool *found)
883 {
884 struct cached_page *record_page;
885 struct index_geometry *geometry = volume->geometry;
886 unsigned int zone_number = request->zone_number;
887 int result;
888 u32 physical_page, page_number;
889
890 *found = false;
891 if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
892 return UDS_SUCCESS;
893
894 result = VDO_ASSERT(record_page_number < geometry->record_pages_per_chapter,
895 "0 <= %d < %u", record_page_number,
896 geometry->record_pages_per_chapter);
897 if (result != VDO_SUCCESS)
898 return result;
899
900 page_number = geometry->index_pages_per_chapter + record_page_number;
901
902 physical_page = map_to_physical_page(volume->geometry, chapter, page_number);
903
904 /*
905 * Make sure the invalidate counter is updated before we try and read the mapping. This
906 * prevents this thread from reading a page in the cache which has already been marked for
907 * invalidation by the reader thread, before the reader thread has noticed that the
908 * invalidate_counter has been incremented.
909 */
910 begin_pending_search(&volume->page_cache, physical_page, zone_number);
911
912 result = get_volume_page_protected(volume, request, physical_page, &record_page);
913 if (result != UDS_SUCCESS) {
914 end_pending_search(&volume->page_cache, zone_number);
915 return result;
916 }
917
918 if (search_record_page(dm_bufio_get_block_data(record_page->buffer),
919 &request->record_name, geometry, &request->old_metadata))
920 *found = true;
921
922 end_pending_search(&volume->page_cache, zone_number);
923 return UDS_SUCCESS;
924 }
925
926 void uds_prefetch_volume_chapter(const struct volume *volume, u32 chapter)
927 {
928 const struct index_geometry *geometry = volume->geometry;
929 u32 physical_page = map_to_physical_page(geometry, chapter, 0);
930
931 dm_bufio_prefetch(volume->client, physical_page, geometry->pages_per_chapter);
932 }
933
934 int uds_read_chapter_index_from_volume(const struct volume *volume, u64 virtual_chapter,
935 struct dm_buffer *volume_buffers[],
936 struct delta_index_page index_pages[])
937 {
938 int result;
939 u32 i;
940 const struct index_geometry *geometry = volume->geometry;
941 u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
942 u32 physical_page = map_to_physical_page(geometry, physical_chapter, 0);
943
944 dm_bufio_prefetch(volume->client, physical_page, geometry->index_pages_per_chapter);
945 for (i = 0; i < geometry->index_pages_per_chapter; i++) {
946 u8 *index_page;
947
948 index_page = dm_bufio_read(volume->client, physical_page + i,
949 &volume_buffers[i]);
950 if (IS_ERR(index_page)) {
951 result = -PTR_ERR(index_page);
952 vdo_log_warning_strerror(result,
953 "error reading physical page %u",
954 physical_page);
955 return result;
956 }
957
958 result = init_chapter_index_page(volume, index_page, physical_chapter, i,
959 &index_pages[i]);
960 if (result != UDS_SUCCESS)
961 return result;
962 }
963
964 return UDS_SUCCESS;
965 }
966
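/*
 * Search the cached chapter index page and record page for a request. This will return
 * UDS_QUEUED if a page needed by the search must be read from storage.
 */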
967 int uds_search_volume_page_cache(struct volume *volume, struct uds_request *request,
968 bool *found)
969 {
970 int result;
971 u32 physical_chapter =
972 uds_map_to_physical_chapter(volume->geometry, request->virtual_chapter);
973 u32 index_page_number;
974 u16 record_page_number;
975
976 index_page_number = uds_find_index_page_number(volume->index_page_map,
977 &request->record_name,
978 physical_chapter);
979
980 if (request->location == UDS_LOCATION_INDEX_PAGE_LOOKUP) {
981 record_page_number = *((u16 *) &request->old_metadata);
982 } else {
983 result = search_cached_index_page(volume, request, physical_chapter,
984 index_page_number,
985 &record_page_number);
986 if (result != UDS_SUCCESS)
987 return result;
988 }
989
990 return uds_search_cached_record_page(volume, request, physical_chapter,
991 record_page_number, found);
992 }
993
994 int uds_search_volume_page_cache_for_rebuild(struct volume *volume,
995 const struct uds_record_name *name,
996 u64 virtual_chapter, bool *found)
997 {
998 int result;
999 struct index_geometry *geometry = volume->geometry;
1000 struct cached_page *page;
1001 u32 physical_chapter = uds_map_to_physical_chapter(geometry, virtual_chapter);
1002 u32 index_page_number;
1003 u16 record_page_number;
1004 u32 page_number;
1005
1006 *found = false;
1007 index_page_number =
1008 uds_find_index_page_number(volume->index_page_map, name,
1009 physical_chapter);
1010 result = get_volume_page(volume, physical_chapter, index_page_number, &page);
1011 if (result != UDS_SUCCESS)
1012 return result;
1013
1014 result = uds_search_chapter_index_page(&page->index_page, geometry, name,
1015 &record_page_number);
1016 if (result != UDS_SUCCESS)
1017 return result;
1018
1019 if (record_page_number == NO_CHAPTER_INDEX_ENTRY)
1020 return UDS_SUCCESS;
1021
1022 page_number = geometry->index_pages_per_chapter + record_page_number;
1023 result = get_volume_page(volume, physical_chapter, page_number, &page);
1024 if (result != UDS_SUCCESS)
1025 return result;
1026
1027 *found = search_record_page(dm_bufio_get_block_data(page->buffer), name,
1028 geometry, NULL);
1029 return UDS_SUCCESS;
1030 }
1031
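/*
 * Invalidate a single physical page: drop it from the cache if present, or mark any pending
 * read for it as invalid so the read will be discarded. The caller must hold the
 * read_threads_mutex.
 */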
1032 static void invalidate_page(struct page_cache *cache, u32 physical_page)
1033 {
1034 struct cached_page *page;
1035 int queue_index = -1;
1036
1037 /* We hold the read_threads_mutex. */
1038 get_page_and_index(cache, physical_page, &queue_index, &page);
1039 if (page != NULL) {
1040 WRITE_ONCE(cache->index[page->physical_page], cache->cache_slots);
1041 wait_for_pending_searches(cache, page->physical_page);
1042 clear_cache_page(cache, page);
1043 } else if (queue_index > -1) {
1044 vdo_log_debug("setting pending read to invalid");
1045 cache->read_queue[queue_index].invalid = true;
1046 }
1047 }
1048
1049 void uds_forget_chapter(struct volume *volume, u64 virtual_chapter)
1050 {
1051 u32 physical_chapter =
1052 uds_map_to_physical_chapter(volume->geometry, virtual_chapter);
1053 u32 first_page = map_to_physical_page(volume->geometry, physical_chapter, 0);
1054 u32 i;
1055
1056 vdo_log_debug("forgetting chapter %llu", (unsigned long long) virtual_chapter);
1057 mutex_lock(&volume->read_threads_mutex);
1058 for (i = 0; i < volume->geometry->pages_per_chapter; i++)
1059 invalidate_page(&volume->page_cache, first_page + i);
1060 mutex_unlock(&volume->read_threads_mutex);
1061 }
1062
1063 /*
1064 * Donate an index page from a newly written chapter to the page cache, since it is likely to be
1065 * used again soon. The caller must already hold the reader thread mutex.
1066 */
1067 static int donate_index_page_locked(struct volume *volume, u32 physical_chapter,
1068 u32 index_page_number, struct dm_buffer *page_buffer)
1069 {
1070 int result;
1071 struct cached_page *page = NULL;
1072 u32 physical_page =
1073 map_to_physical_page(volume->geometry, physical_chapter,
1074 index_page_number);
1075
1076 page = select_victim_in_cache(&volume->page_cache);
1077 page->buffer = page_buffer;
1078 result = init_chapter_index_page(volume, dm_bufio_get_block_data(page_buffer),
1079 physical_chapter, index_page_number,
1080 &page->index_page);
1081 if (result != UDS_SUCCESS) {
1082 vdo_log_warning("Error initialize chapter index page");
1083 cancel_page_in_cache(&volume->page_cache, physical_page, page);
1084 return result;
1085 }
1086
1087 result = put_page_in_cache(&volume->page_cache, physical_page, page);
1088 if (result != UDS_SUCCESS) {
1089 vdo_log_warning("Error putting page %u in cache", physical_page);
1090 cancel_page_in_cache(&volume->page_cache, physical_page, page);
1091 return result;
1092 }
1093
1094 return UDS_SUCCESS;
1095 }
1096
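/*
 * Pack the open chapter's delta index into index pages, write them to storage, update the
 * index page map, and donate each page to the page cache.
 */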
1097 static int write_index_pages(struct volume *volume, u32 physical_chapter_number,
1098 struct open_chapter_index *chapter_index)
1099 {
1100 struct index_geometry *geometry = volume->geometry;
1101 struct dm_buffer *page_buffer;
1102 u32 first_index_page = map_to_physical_page(geometry, physical_chapter_number, 0);
1103 u32 delta_list_number = 0;
1104 u32 index_page_number;
1105
1106 for (index_page_number = 0;
1107 index_page_number < geometry->index_pages_per_chapter;
1108 index_page_number++) {
1109 u8 *page_data;
1110 u32 physical_page = first_index_page + index_page_number;
1111 u32 lists_packed;
1112 bool last_page;
1113 int result;
1114
1115 page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
1116 if (IS_ERR(page_data)) {
1117 return vdo_log_warning_strerror(-PTR_ERR(page_data),
1118 "failed to prepare index page");
1119 }
1120
1121 last_page = ((index_page_number + 1) == geometry->index_pages_per_chapter);
1122 result = uds_pack_open_chapter_index_page(chapter_index, page_data,
1123 delta_list_number, last_page,
1124 &lists_packed);
1125 if (result != UDS_SUCCESS) {
1126 dm_bufio_release(page_buffer);
1127 return vdo_log_warning_strerror(result,
1128 "failed to pack index page");
1129 }
1130
1131 dm_bufio_mark_buffer_dirty(page_buffer);
1132
1133 if (lists_packed == 0) {
1134 vdo_log_debug("no delta lists packed on chapter %u page %u",
1135 physical_chapter_number, index_page_number);
1136 } else {
1137 delta_list_number += lists_packed;
1138 }
1139
1140 uds_update_index_page_map(volume->index_page_map,
1141 chapter_index->virtual_chapter_number,
1142 physical_chapter_number, index_page_number,
1143 delta_list_number - 1);
1144
1145 mutex_lock(&volume->read_threads_mutex);
1146 result = donate_index_page_locked(volume, physical_chapter_number,
1147 index_page_number, page_buffer);
1148 mutex_unlock(&volume->read_threads_mutex);
1149 if (result != UDS_SUCCESS) {
1150 dm_bufio_release(page_buffer);
1151 return result;
1152 }
1153 }
1154
1155 return UDS_SUCCESS;
1156 }
1157
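/*
 * Lay the sorted records out as an implicit binary search tree in heap order by assigning
 * them to page slots with an in-order traversal. For example, with three records, slot 0
 * (the root) receives the middle record while slots 1 and 2 receive the smallest and largest
 * records, so search_record_page() can walk the tree using only the 2N+1/2N+2 child
 * arithmetic.
 */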
1158 static u32 encode_tree(u8 record_page[],
1159 const struct uds_volume_record *sorted_pointers[],
1160 u32 next_record, u32 node, u32 node_count)
1161 {
1162 if (node < node_count) {
1163 u32 child = (2 * node) + 1;
1164
1165 next_record = encode_tree(record_page, sorted_pointers, next_record,
1166 child, node_count);
1167
1168 /*
1169 * In-order traversal: copy the contents of the next record into the page at the
1170 * node offset.
1171 */
1172 memcpy(&record_page[node * BYTES_PER_RECORD],
1173 sorted_pointers[next_record++], BYTES_PER_RECORD);
1174
1175 next_record = encode_tree(record_page, sorted_pointers, next_record,
1176 child + 1, node_count);
1177 }
1178
1179 return next_record;
1180 }
1181
1182 static int encode_record_page(const struct volume *volume,
1183 const struct uds_volume_record records[], u8 record_page[])
1184 {
1185 int result;
1186 u32 i;
1187 u32 records_per_page = volume->geometry->records_per_page;
1188 const struct uds_volume_record **record_pointers = volume->record_pointers;
1189
1190 for (i = 0; i < records_per_page; i++)
1191 record_pointers[i] = &records[i];
1192
1193 /*
1194 * Sort the record pointers by using just the names in the records, which is less work than
1195 * sorting the entire record values.
1196 */
1197 BUILD_BUG_ON(offsetof(struct uds_volume_record, name) != 0);
1198 result = uds_radix_sort(volume->radix_sorter, (const u8 **) record_pointers,
1199 records_per_page, UDS_RECORD_NAME_SIZE);
1200 if (result != UDS_SUCCESS)
1201 return result;
1202
1203 encode_tree(record_page, record_pointers, 0, 0, records_per_page);
1204 return UDS_SUCCESS;
1205 }
1206
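/*
 * Sort each page's worth of records and write them to storage as record pages laid out as
 * binary search trees.
 */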
1207 static int write_record_pages(struct volume *volume, u32 physical_chapter_number,
1208 const struct uds_volume_record *records)
1209 {
1210 u32 record_page_number;
1211 struct index_geometry *geometry = volume->geometry;
1212 struct dm_buffer *page_buffer;
1213 const struct uds_volume_record *next_record = records;
1214 u32 first_record_page = map_to_physical_page(geometry, physical_chapter_number,
1215 geometry->index_pages_per_chapter);
1216
1217 for (record_page_number = 0;
1218 record_page_number < geometry->record_pages_per_chapter;
1219 record_page_number++) {
1220 u8 *page_data;
1221 u32 physical_page = first_record_page + record_page_number;
1222 int result;
1223
1224 page_data = dm_bufio_new(volume->client, physical_page, &page_buffer);
1225 if (IS_ERR(page_data)) {
1226 return vdo_log_warning_strerror(-PTR_ERR(page_data),
1227 "failed to prepare record page");
1228 }
1229
1230 result = encode_record_page(volume, next_record, page_data);
1231 if (result != UDS_SUCCESS) {
1232 dm_bufio_release(page_buffer);
1233 return vdo_log_warning_strerror(result,
1234 "failed to encode record page %u",
1235 record_page_number);
1236 }
1237
1238 next_record += geometry->records_per_page;
1239 dm_bufio_mark_buffer_dirty(page_buffer);
1240 dm_bufio_release(page_buffer);
1241 }
1242
1243 return UDS_SUCCESS;
1244 }
1245
1246 int uds_write_chapter(struct volume *volume, struct open_chapter_index *chapter_index,
1247 const struct uds_volume_record *records)
1248 {
1249 int result;
1250 u32 physical_chapter_number =
1251 uds_map_to_physical_chapter(volume->geometry,
1252 chapter_index->virtual_chapter_number);
1253
1254 result = write_index_pages(volume, physical_chapter_number, chapter_index);
1255 if (result != UDS_SUCCESS)
1256 return result;
1257
1258 result = write_record_pages(volume, physical_chapter_number, records);
1259 if (result != UDS_SUCCESS)
1260 return result;
1261
1262 result = -dm_bufio_write_dirty_buffers(volume->client);
1263 if (result != UDS_SUCCESS)
1264 vdo_log_error_strerror(result, "cannot sync chapter to volume");
1265
1266 return result;
1267 }
1268
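/*
 * Determine the virtual chapter number of a physical chapter by examining its index pages.
 * The result is BAD_CHAPTER if the pages are unreadable, mutually inconsistent, or out of
 * phase with the chapter's physical position.
 */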
1269 static void probe_chapter(struct volume *volume, u32 chapter_number,
1270 u64 *virtual_chapter_number)
1271 {
1272 const struct index_geometry *geometry = volume->geometry;
1273 u32 expected_list_number = 0;
1274 u32 i;
1275 u64 vcn = BAD_CHAPTER;
1276
1277 *virtual_chapter_number = BAD_CHAPTER;
1278 dm_bufio_prefetch(volume->client,
1279 map_to_physical_page(geometry, chapter_number, 0),
1280 geometry->index_pages_per_chapter);
1281
1282 for (i = 0; i < geometry->index_pages_per_chapter; i++) {
1283 struct delta_index_page *page;
1284 int result;
1285
1286 result = uds_get_volume_index_page(volume, chapter_number, i, &page);
1287 if (result != UDS_SUCCESS)
1288 return;
1289
1290 if (page->virtual_chapter_number == BAD_CHAPTER) {
1291 vdo_log_error("corrupt index page in chapter %u",
1292 chapter_number);
1293 return;
1294 }
1295
1296 if (vcn == BAD_CHAPTER) {
1297 vcn = page->virtual_chapter_number;
1298 } else if (page->virtual_chapter_number != vcn) {
1299 vdo_log_error("inconsistent chapter %u index page %u: expected vcn %llu, got vcn %llu",
1300 chapter_number, i, (unsigned long long) vcn,
1301 (unsigned long long) page->virtual_chapter_number);
1302 return;
1303 }
1304
1305 if (expected_list_number != page->lowest_list_number) {
1306 vdo_log_error("inconsistent chapter %u index page %u: expected list number %u, got list number %u",
1307 chapter_number, i, expected_list_number,
1308 page->lowest_list_number);
1309 return;
1310 }
1311 expected_list_number = page->highest_list_number + 1;
1312
1313 result = uds_validate_chapter_index_page(page, geometry);
1314 if (result != UDS_SUCCESS)
1315 return;
1316 }
1317
1318 if (chapter_number != uds_map_to_physical_chapter(geometry, vcn)) {
1319 vdo_log_error("chapter %u vcn %llu is out of phase (%u)", chapter_number,
1320 (unsigned long long) vcn, geometry->chapters_per_volume);
1321 return;
1322 }
1323
1324 *virtual_chapter_number = vcn;
1325 }
1326
1327 /* Find the last valid physical chapter in the volume. */
1328 static void find_real_end_of_volume(struct volume *volume, u32 limit, u32 *limit_ptr)
1329 {
1330 u32 span = 1;
1331 u32 tries = 0;
1332
1333 while (limit > 0) {
1334 u32 chapter = (span > limit) ? 0 : limit - span;
1335 u64 vcn = 0;
1336
1337 probe_chapter(volume, chapter, &vcn);
1338 if (vcn == BAD_CHAPTER) {
1339 limit = chapter;
1340 if (++tries > 1)
1341 span *= 2;
1342 } else {
1343 if (span == 1)
1344 break;
1345 span /= 2;
1346 tries = 0;
1347 }
1348 }
1349
1350 *limit_ptr = limit;
1351 }
1352
1353 static int find_chapter_limits(struct volume *volume, u32 chapter_limit, u64 *lowest_vcn,
1354 u64 *highest_vcn)
1355 {
1356 struct index_geometry *geometry = volume->geometry;
1357 u64 zero_vcn;
1358 u64 lowest = BAD_CHAPTER;
1359 u64 highest = BAD_CHAPTER;
1360 u64 moved_chapter = BAD_CHAPTER;
1361 u32 left_chapter = 0;
1362 u32 right_chapter = 0;
1363 u32 bad_chapters = 0;
1364
1365 /*
1366 * This method assumes there is at most one run of contiguous bad chapters caused by
1367 * unflushed writes. Either the bad run wraps around from the end of the volume back to the beginning,
1368 * or it lies somewhere in the middle. Wherever it is, the highest and lowest VCNs are adjacent to it. Otherwise the
1369 * volume is cleanly saved and somewhere in the middle of it the highest VCN immediately
1370 * precedes the lowest one.
1371 */
1372
1373 /* It doesn't matter if this results in a bad spot (BAD_CHAPTER). */
1374 probe_chapter(volume, 0, &zero_vcn);
1375
1376 /*
1377 * Binary search for end of the discontinuity in the monotonically increasing virtual
1378 * chapter numbers; bad spots are treated as a span of BAD_CHAPTER values. In effect we're
1379 * searching for the index of the smallest value less than zero_vcn. In the case we go off
1380 * the end it means that chapter 0 has the lowest vcn.
1381 *
1382 * If a virtual chapter is out-of-order, it will be the one moved by conversion. Always
1383 * skip over the moved chapter when searching, adding it to the range at the end if
1384 * necessary.
1385 */
1386 if (geometry->remapped_physical > 0) {
1387 u64 remapped_vcn;
1388
1389 probe_chapter(volume, geometry->remapped_physical, &remapped_vcn);
1390 if (remapped_vcn == geometry->remapped_virtual)
1391 moved_chapter = geometry->remapped_physical;
1392 }
1393
1394 left_chapter = 0;
1395 right_chapter = chapter_limit;
1396
1397 while (left_chapter < right_chapter) {
1398 u64 probe_vcn;
1399 u32 chapter = (left_chapter + right_chapter) / 2;
1400
1401 if (chapter == moved_chapter)
1402 chapter--;
1403
1404 probe_chapter(volume, chapter, &probe_vcn);
1405 if (zero_vcn <= probe_vcn) {
1406 left_chapter = chapter + 1;
1407 if (left_chapter == moved_chapter)
1408 left_chapter++;
1409 } else {
1410 right_chapter = chapter;
1411 }
1412 }
1413
1414 /* If left_chapter goes off the end, chapter 0 has the lowest virtual chapter number. */
1415 if (left_chapter >= chapter_limit)
1416 left_chapter = 0;
1417
1418 /* At this point, left_chapter is the chapter with the lowest virtual chapter number. */
1419 probe_chapter(volume, left_chapter, &lowest);
1420
1421 /* The moved chapter might be the lowest in the range. */
1422 if ((moved_chapter != BAD_CHAPTER) && (lowest == geometry->remapped_virtual + 1))
1423 lowest = geometry->remapped_virtual;
1424
1425 /*
1426 * Circularly scan backwards, moving over any bad chapters until encountering a good one,
1427 * which is the chapter with the highest vcn.
1428 */
1429 while (highest == BAD_CHAPTER) {
1430 right_chapter = (right_chapter + chapter_limit - 1) % chapter_limit;
1431 if (right_chapter == moved_chapter)
1432 continue;
1433
1434 probe_chapter(volume, right_chapter, &highest);
1435 if (bad_chapters++ >= MAX_BAD_CHAPTERS) {
1436 vdo_log_error("too many bad chapters in volume: %u",
1437 bad_chapters);
1438 return UDS_CORRUPT_DATA;
1439 }
1440 }
1441
1442 *lowest_vcn = lowest;
1443 *highest_vcn = highest;
1444 return UDS_SUCCESS;
1445 }
1446
1447 /*
1448 * Find the highest and lowest contiguous chapters present in the volume and determine their
1449 * virtual chapter numbers. This is used by rebuild.
1450 */
1451 int uds_find_volume_chapter_boundaries(struct volume *volume, u64 *lowest_vcn,
1452 u64 *highest_vcn, bool *is_empty)
1453 {
1454 u32 chapter_limit = volume->geometry->chapters_per_volume;
1455
1456 find_real_end_of_volume(volume, chapter_limit, &chapter_limit);
1457 if (chapter_limit == 0) {
1458 *lowest_vcn = 0;
1459 *highest_vcn = 0;
1460 *is_empty = true;
1461 return UDS_SUCCESS;
1462 }
1463
1464 *is_empty = false;
1465 return find_chapter_limits(volume, chapter_limit, lowest_vcn, highest_vcn);
1466 }
1467
1468 int __must_check uds_replace_volume_storage(struct volume *volume,
1469 struct index_layout *layout,
1470 struct block_device *bdev)
1471 {
1472 int result;
1473 u32 i;
1474
1475 result = uds_replace_index_layout_storage(layout, bdev);
1476 if (result != UDS_SUCCESS)
1477 return result;
1478
1479 /* Release all outstanding dm_bufio objects */
1480 for (i = 0; i < volume->page_cache.indexable_pages; i++)
1481 volume->page_cache.index[i] = volume->page_cache.cache_slots;
1482 for (i = 0; i < volume->page_cache.cache_slots; i++)
1483 clear_cache_page(&volume->page_cache, &volume->page_cache.cache[i]);
1484 if (volume->sparse_cache != NULL)
1485 uds_invalidate_sparse_cache(volume->sparse_cache);
1486 if (volume->client != NULL)
1487 dm_bufio_client_destroy(vdo_forget(volume->client));
1488
1489 return uds_open_volume_bufio(layout, volume->geometry->bytes_per_page,
1490 volume->reserved_buffers, &volume->client);
1491 }
1492
1493 static int __must_check initialize_page_cache(struct page_cache *cache,
1494 const struct index_geometry *geometry,
1495 u32 chapters_in_cache,
1496 unsigned int zone_count)
1497 {
1498 int result;
1499 u32 i;
1500
1501 cache->indexable_pages = geometry->pages_per_volume + 1;
1502 cache->cache_slots = chapters_in_cache * geometry->record_pages_per_chapter;
1503 cache->zone_count = zone_count;
1504 atomic64_set(&cache->clock, 1);
1505
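/*
 * Sizing example (hypothetical numbers): caching 10 chapters of a geometry with 256 record
 * pages per chapter yields 10 * 256 = 2560 cache_slots, comfortably under
 * VOLUME_CACHE_MAX_ENTRIES, which the assertion below enforces.
 */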
1506 result = VDO_ASSERT((cache->cache_slots <= VOLUME_CACHE_MAX_ENTRIES),
1507 "requested cache size, %u, within limit %u",
1508 cache->cache_slots, VOLUME_CACHE_MAX_ENTRIES);
1509 if (result != VDO_SUCCESS)
1510 return result;
1511
1512 result = vdo_allocate(VOLUME_CACHE_MAX_QUEUED_READS, struct queued_read,
1513 "volume read queue", &cache->read_queue);
1514 if (result != VDO_SUCCESS)
1515 return result;
1516
1517 result = vdo_allocate(cache->zone_count, struct search_pending_counter,
1518 "Volume Cache Zones", &cache->search_pending_counters);
1519 if (result != VDO_SUCCESS)
1520 return result;
1521
1522 result = vdo_allocate(cache->indexable_pages, u16, "page cache index",
1523 &cache->index);
1524 if (result != VDO_SUCCESS)
1525 return result;
1526
1527 result = vdo_allocate(cache->cache_slots, struct cached_page, "page cache cache",
1528 &cache->cache);
1529 if (result != VDO_SUCCESS)
1530 return result;
1531
1532 /* Initialize all index entries to the invalid value, cache_slots, meaning "not cached". */
1533 for (i = 0; i < cache->indexable_pages; i++)
1534 cache->index[i] = cache->cache_slots;
1535
1536 for (i = 0; i < cache->cache_slots; i++)
1537 clear_cache_page(cache, &cache->cache[i]);
1538
1539 return UDS_SUCCESS;
1540 }
1541
1542 int uds_make_volume(const struct uds_configuration *config, struct index_layout *layout,
1543 struct volume **new_volume)
1544 {
1545 unsigned int i;
1546 struct volume *volume = NULL;
1547 struct index_geometry *geometry;
1548 unsigned int reserved_buffers;
1549 int result;
1550
1551 result = vdo_allocate(1, struct volume, "volume", &volume);
1552 if (result != VDO_SUCCESS)
1553 return result;
1554
1555 volume->nonce = uds_get_volume_nonce(layout);
1556
1557 result = uds_copy_index_geometry(config->geometry, &volume->geometry);
1558 if (result != UDS_SUCCESS) {
1559 uds_free_volume(volume);
1560 return vdo_log_warning_strerror(result,
1561 "failed to allocate geometry: error");
1562 }
1563 geometry = volume->geometry;
1564
1565 /*
1566 * Reserve a buffer for each entry in the page cache, one for the chapter writer, and one
1567 * for each entry in the sparse cache.
1568 */
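/*
 * For example (hypothetical numbers): with 10 cached chapters and 256 record pages per
 * chapter, a dense geometry reserves 10 * 256 + 1 = 2561 buffers; a sparse geometry with
 * 6 index pages per chapter reserves a further 10 * 6 = 60.
 */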
1569 reserved_buffers = config->cache_chapters * geometry->record_pages_per_chapter;
1570 reserved_buffers += 1;
1571 if (uds_is_sparse_index_geometry(geometry))
1572 reserved_buffers += (config->cache_chapters * geometry->index_pages_per_chapter);
1573 volume->reserved_buffers = reserved_buffers;
1574 result = uds_open_volume_bufio(layout, geometry->bytes_per_page,
1575 volume->reserved_buffers, &volume->client);
1576 if (result != UDS_SUCCESS) {
1577 uds_free_volume(volume);
1578 return result;
1579 }
1580
1581 result = uds_make_radix_sorter(geometry->records_per_page,
1582 &volume->radix_sorter);
1583 if (result != UDS_SUCCESS) {
1584 uds_free_volume(volume);
1585 return result;
1586 }
1587
1588 result = vdo_allocate(geometry->records_per_page,
1589 const struct uds_volume_record *, "record pointers",
1590 &volume->record_pointers);
1591 if (result != VDO_SUCCESS) {
1592 uds_free_volume(volume);
1593 return result;
1594 }
1595
1596 if (uds_is_sparse_index_geometry(geometry)) {
1597 size_t page_size = sizeof(struct delta_index_page) + geometry->bytes_per_page;
1598
1599 result = uds_make_sparse_cache(geometry, config->cache_chapters,
1600 config->zone_count,
1601 &volume->sparse_cache);
1602 if (result != UDS_SUCCESS) {
1603 uds_free_volume(volume);
1604 return result;
1605 }
1606
1607 volume->cache_size =
1608 page_size * geometry->index_pages_per_chapter * config->cache_chapters;
1609 }
1610
1611 result = initialize_page_cache(&volume->page_cache, geometry,
1612 config->cache_chapters, config->zone_count);
1613 if (result != UDS_SUCCESS) {
1614 uds_free_volume(volume);
1615 return result;
1616 }
1617
1618 volume->cache_size += volume->page_cache.cache_slots * sizeof(struct delta_index_page);
1619 result = uds_make_index_page_map(geometry, &volume->index_page_map);
1620 if (result != UDS_SUCCESS) {
1621 uds_free_volume(volume);
1622 return result;
1623 }
1624
1625 mutex_init(&volume->read_threads_mutex);
1626 uds_init_cond(&volume->read_threads_read_done_cond);
1627 uds_init_cond(&volume->read_threads_cond);
1628
1629 result = vdo_allocate(config->read_threads, struct thread *, "reader threads",
1630 &volume->reader_threads);
1631 if (result != VDO_SUCCESS) {
1632 uds_free_volume(volume);
1633 return result;
1634 }
1635
1636 for (i = 0; i < config->read_threads; i++) {
1637 result = vdo_create_thread(read_thread_function, (void *) volume,
1638 "reader", &volume->reader_threads[i]);
1639 if (result != VDO_SUCCESS) {
1640 uds_free_volume(volume);
1641 return result;
1642 }
1643
1644 volume->read_thread_count = i + 1;
1645 }
1646
1647 *new_volume = volume;
1648 return UDS_SUCCESS;
1649 }
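/*
 * Illustrative sketch only: example_volume_roundtrip() is a hypothetical caller, not part
 * of the index code. Note that uds_make_volume() frees the partially constructed volume
 * itself on every failure path, so a caller only frees a volume that was successfully
 * created.
 *
 *	static int example_volume_roundtrip(const struct uds_configuration *config,
 *					    struct index_layout *layout)
 *	{
 *		struct volume *volume;
 *		int result;
 *
 *		result = uds_make_volume(config, layout, &volume);
 *		if (result != UDS_SUCCESS)
 *			return result;
 *
 *		// a real caller would use the volume here before tearing it down
 *		uds_free_volume(volume);
 *		return UDS_SUCCESS;
 *	}
 */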
1650
1651 static void uninitialize_page_cache(struct page_cache *cache)
1652 {
1653 u16 i;
1654
1655 if (cache->cache != NULL) {
1656 for (i = 0; i < cache->cache_slots; i++)
1657 release_page_buffer(&cache->cache[i]);
1658 }
1659 vdo_free(cache->index);
1660 vdo_free(cache->cache);
1661 vdo_free(cache->search_pending_counters);
1662 vdo_free(cache->read_queue);
1663 }
1664
1665 void uds_free_volume(struct volume *volume)
1666 {
1667 if (volume == NULL)
1668 return;
1669
1670 if (volume->reader_threads != NULL) {
1671 unsigned int i;
1672
1673 /* This works even if some threads weren't started. */
1674 mutex_lock(&volume->read_threads_mutex);
1675 volume->read_threads_exiting = true;
1676 uds_broadcast_cond(&volume->read_threads_cond);
1677 mutex_unlock(&volume->read_threads_mutex);
1678 for (i = 0; i < volume->read_thread_count; i++)
1679 vdo_join_threads(volume->reader_threads[i]);
1680 vdo_free(volume->reader_threads);
1681 volume->reader_threads = NULL;
1682 }
1683
1684 /* Must destroy the client AFTER freeing the cached pages. */
1685 uninitialize_page_cache(&volume->page_cache);
1686 uds_free_sparse_cache(volume->sparse_cache);
1687 if (volume->client != NULL)
1688 dm_bufio_client_destroy(vdo_forget(volume->client));
1689
1690 uds_free_index_page_map(volume->index_page_map);
1691 uds_free_radix_sorter(volume->radix_sorter);
1692 vdo_free(volume->geometry);
1693 vdo_free(volume->record_pointers);
1694 vdo_free(volume);
1695 }
1696