1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3 * Copyright 2023 Red Hat
4 */
5
6 #include "slab-depot.h"
7
8 #include <linux/atomic.h>
9 #include <linux/bio.h>
10 #include <linux/err.h>
11 #include <linux/log2.h>
12 #include <linux/min_heap.h>
13 #include <linux/minmax.h>
14
15 #include "logger.h"
16 #include "memory-alloc.h"
17 #include "numeric.h"
18 #include "permassert.h"
19 #include "string-utils.h"
20
21 #include "action-manager.h"
22 #include "admin-state.h"
23 #include "completion.h"
24 #include "constants.h"
25 #include "data-vio.h"
26 #include "encodings.h"
27 #include "io-submitter.h"
28 #include "physical-zone.h"
29 #include "priority-table.h"
30 #include "recovery-journal.h"
31 #include "repair.h"
32 #include "status-codes.h"
33 #include "types.h"
34 #include "vdo.h"
35 #include "vio.h"
36 #include "wait-queue.h"
37
38 static const u64 BYTES_PER_WORD = sizeof(u64);
39 static const bool NORMAL_OPERATION = true;
40
41 /**
42 * get_lock() - Get the lock object for a slab journal block by sequence number.
43 * @journal: The vdo_slab journal to retrieve from.
44 * @sequence_number: Sequence number of the block.
45 *
46 * Return: The lock object for the given sequence number.
47 */
get_lock(struct slab_journal * journal,sequence_number_t sequence_number)48 static inline struct journal_lock * __must_check get_lock(struct slab_journal *journal,
49 sequence_number_t sequence_number)
50 {
51 return &journal->locks[sequence_number % journal->size];
52 }
53
is_slab_open(struct vdo_slab * slab)54 static bool is_slab_open(struct vdo_slab *slab)
55 {
56 return (!vdo_is_state_quiescing(&slab->state) &&
57 !vdo_is_state_quiescent(&slab->state));
58 }
59
60 /**
61 * must_make_entries_to_flush() - Check whether there are entry waiters which should delay a flush.
62 * @journal: The journal to check.
63 *
64 * Return: true if there are no entry waiters, or if the slab is unrecovered.
65 */
must_make_entries_to_flush(struct slab_journal * journal)66 static inline bool __must_check must_make_entries_to_flush(struct slab_journal *journal)
67 {
68 return ((journal->slab->status != VDO_SLAB_REBUILDING) &&
69 vdo_waitq_has_waiters(&journal->entry_waiters));
70 }
71
72 /**
73 * is_reaping() - Check whether a reap is currently in progress.
74 * @journal: The journal which may be reaping.
75 *
76 * Return: true if the journal is reaping.
77 */
is_reaping(struct slab_journal * journal)78 static inline bool __must_check is_reaping(struct slab_journal *journal)
79 {
80 return (journal->head != journal->unreapable);
81 }
82
83 /**
84 * initialize_tail_block() - Initialize tail block as a new block.
85 * @journal: The journal whose tail block is being initialized.
86 */
initialize_tail_block(struct slab_journal * journal)87 static void initialize_tail_block(struct slab_journal *journal)
88 {
89 struct slab_journal_block_header *header = &journal->tail_header;
90
91 header->sequence_number = journal->tail;
92 header->entry_count = 0;
93 header->has_block_map_increments = false;
94 }
95
96 /**
97 * initialize_journal_state() - Set all journal fields appropriately to start journaling.
98 * @journal: The journal to be reset, based on its tail sequence number.
99 */
initialize_journal_state(struct slab_journal * journal)100 static void initialize_journal_state(struct slab_journal *journal)
101 {
102 journal->unreapable = journal->head;
103 journal->reap_lock = get_lock(journal, journal->unreapable);
104 journal->next_commit = journal->tail;
105 journal->summarized = journal->last_summarized = journal->tail;
106 initialize_tail_block(journal);
107 }
108
109 /**
110 * block_is_full() - Check whether a journal block is full.
111 * @journal: The slab journal for the block.
112 *
113 * Return: True if the tail block is full.
114 */
block_is_full(struct slab_journal * journal)115 static bool __must_check block_is_full(struct slab_journal *journal)
116 {
117 journal_entry_count_t count = journal->tail_header.entry_count;
118
119 return (journal->tail_header.has_block_map_increments ?
120 (journal->full_entries_per_block == count) :
121 (journal->entries_per_block == count));
122 }
123
124 static void add_entries(struct slab_journal *journal);
125 static void update_tail_block_location(struct slab_journal *journal);
126 static void release_journal_locks(struct vdo_waiter *waiter, void *context);
127
128 /**
129 * is_slab_journal_blank() - Check whether a slab's journal is blank.
130 * @slab: The slab to check.
131 *
132 * A slab journal is blank if it has never had any entries recorded in it.
133 *
134 * Return: True if the slab's journal has never been modified.
135 */
is_slab_journal_blank(const struct vdo_slab * slab)136 static bool is_slab_journal_blank(const struct vdo_slab *slab)
137 {
138 return ((slab->journal.tail == 1) &&
139 (slab->journal.tail_header.entry_count == 0));
140 }
141
142 /**
143 * mark_slab_journal_dirty() - Put a slab journal on the dirty list of its allocator in the correct
144 * order.
145 * @journal: The journal to be marked dirty.
146 * @lock: The recovery journal lock held by the slab journal.
147 */
mark_slab_journal_dirty(struct slab_journal * journal,sequence_number_t lock)148 static void mark_slab_journal_dirty(struct slab_journal *journal, sequence_number_t lock)
149 {
150 struct slab_journal *dirty_journal;
151 struct list_head *dirty_list = &journal->slab->allocator->dirty_slab_journals;
152
153 VDO_ASSERT_LOG_ONLY(journal->recovery_lock == 0, "slab journal was clean");
154
155 journal->recovery_lock = lock;
156 list_for_each_entry_reverse(dirty_journal, dirty_list, dirty_entry) {
157 if (dirty_journal->recovery_lock <= journal->recovery_lock)
158 break;
159 }
160
161 list_move_tail(&journal->dirty_entry, dirty_journal->dirty_entry.next);
162 }
163
mark_slab_journal_clean(struct slab_journal * journal)164 static void mark_slab_journal_clean(struct slab_journal *journal)
165 {
166 journal->recovery_lock = 0;
167 list_del_init(&journal->dirty_entry);
168 }
169
/**
 * check_if_slab_drained() - Finish a slab drain if all of its outstanding work has completed.
 * @slab: The slab to check.
 *
 * A draining slab is done once its journal has no entry waiters, no reap, no pending or
 * uncommitted block writes, no summary update in flight, and the slab has no active reference
 * block I/O. The drain completes with VDO_READ_ONLY if the vdo has entered read-only mode.
 */
static void check_if_slab_drained(struct vdo_slab *slab)
{
	bool read_only;
	struct slab_journal *journal = &slab->journal;
	const struct admin_state_code *code;

	/* Bail out if the slab is not draining or any journal/slab work is still in flight. */
	if (!vdo_is_state_draining(&slab->state) ||
	    must_make_entries_to_flush(journal) ||
	    is_reaping(journal) ||
	    journal->waiting_to_commit ||
	    !list_empty(&journal->uncommitted_blocks) ||
	    journal->updating_slab_summary ||
	    (slab->active_count > 0))
		return;

	/* When not suspending or recovering, the slab must be clean. */
	code = vdo_get_admin_state_code(&slab->state);
	read_only = vdo_is_read_only(slab->allocator->depot->vdo);
	if (!read_only &&
	    vdo_waitq_has_waiters(&slab->dirty_blocks) &&
	    (code != VDO_ADMIN_STATE_SUSPENDING) &&
	    (code != VDO_ADMIN_STATE_RECOVERING))
		return;

	vdo_finish_draining_with_result(&slab->state,
					(read_only ? VDO_READ_ONLY : VDO_SUCCESS));
}
197
198 /* FULLNESS HINT COMPUTATION */
199
200 /**
201 * compute_fullness_hint() - Translate a slab's free block count into a 'fullness hint' that can be
202 * stored in a slab_summary_entry's 7 bits that are dedicated to its free
203 * count.
204 * @depot: The depot whose summary being updated.
205 * @free_blocks: The number of free blocks.
206 *
207 * Note: the number of free blocks must be strictly less than 2^23 blocks, even though
208 * theoretically slabs could contain precisely 2^23 blocks; there is an assumption that at least
209 * one block is used by metadata. This assumption is necessary; otherwise, the fullness hint might
210 * overflow. The fullness hint formula is roughly (fullness >> 16) & 0x7f, but (2^23 >> 16) & 0x7f
211 * is 0, which would make it impossible to distinguish completely full from completely empty.
212 *
213 * Return: A fullness hint, which can be stored in 7 bits.
214 */
compute_fullness_hint(struct slab_depot * depot,block_count_t free_blocks)215 static u8 __must_check compute_fullness_hint(struct slab_depot *depot,
216 block_count_t free_blocks)
217 {
218 block_count_t hint;
219
220 VDO_ASSERT_LOG_ONLY((free_blocks < (1 << 23)), "free blocks must be less than 2^23");
221
222 if (free_blocks == 0)
223 return 0;
224
225 hint = free_blocks >> depot->hint_shift;
226 return ((hint == 0) ? 1 : hint);
227 }
228
229 /**
230 * check_summary_drain_complete() - Check whether an allocators summary has finished draining.
231 * @allocator: The allocator to check.
232 */
check_summary_drain_complete(struct block_allocator * allocator)233 static void check_summary_drain_complete(struct block_allocator *allocator)
234 {
235 if (!vdo_is_state_draining(&allocator->summary_state) ||
236 (allocator->summary_write_count > 0))
237 return;
238
239 vdo_finish_operation(&allocator->summary_state,
240 (vdo_is_read_only(allocator->depot->vdo) ?
241 VDO_READ_ONLY : VDO_SUCCESS));
242 }
243
244 /**
245 * notify_summary_waiters() - Wake all the waiters in a given queue.
246 * @allocator: The block allocator summary which owns the queue.
247 * @queue: The queue to notify.
248 */
notify_summary_waiters(struct block_allocator * allocator,struct vdo_wait_queue * queue)249 static void notify_summary_waiters(struct block_allocator *allocator,
250 struct vdo_wait_queue *queue)
251 {
252 int result = (vdo_is_read_only(allocator->depot->vdo) ?
253 VDO_READ_ONLY : VDO_SUCCESS);
254
255 vdo_waitq_notify_all_waiters(queue, NULL, &result);
256 }
257
258 static void launch_write(struct slab_summary_block *summary_block);
259
260 /**
261 * finish_updating_slab_summary_block() - Finish processing a block which attempted to write,
262 * whether or not the attempt succeeded.
263 * @block: The block.
264 */
finish_updating_slab_summary_block(struct slab_summary_block * block)265 static void finish_updating_slab_summary_block(struct slab_summary_block *block)
266 {
267 notify_summary_waiters(block->allocator, &block->current_update_waiters);
268 block->writing = false;
269 block->allocator->summary_write_count--;
270 if (vdo_waitq_has_waiters(&block->next_update_waiters))
271 launch_write(block);
272 else
273 check_summary_drain_complete(block->allocator);
274 }
275
276 /**
277 * finish_update() - This is the callback for a successful summary block write.
278 * @completion: The write vio.
279 */
finish_update(struct vdo_completion * completion)280 static void finish_update(struct vdo_completion *completion)
281 {
282 struct slab_summary_block *block =
283 container_of(as_vio(completion), struct slab_summary_block, vio);
284
285 atomic64_inc(&block->allocator->depot->summary_statistics.blocks_written);
286 finish_updating_slab_summary_block(block);
287 }
288
289 /**
290 * handle_write_error() - Handle an error writing a slab summary block.
291 * @completion: The write VIO.
292 */
handle_write_error(struct vdo_completion * completion)293 static void handle_write_error(struct vdo_completion *completion)
294 {
295 struct slab_summary_block *block =
296 container_of(as_vio(completion), struct slab_summary_block, vio);
297
298 vio_record_metadata_io_error(as_vio(completion));
299 vdo_enter_read_only_mode(completion->vdo, completion->result);
300 finish_updating_slab_summary_block(block);
301 }
302
write_slab_summary_endio(struct bio * bio)303 static void write_slab_summary_endio(struct bio *bio)
304 {
305 struct vio *vio = bio->bi_private;
306 struct slab_summary_block *block =
307 container_of(vio, struct slab_summary_block, vio);
308
309 continue_vio_after_io(vio, finish_update, block->allocator->thread_id);
310 }
311
312 /**
313 * launch_write() - Write a slab summary block unless it is currently out for writing.
314 * @block: The block that needs to be committed.
315 */
launch_write(struct slab_summary_block * block)316 static void launch_write(struct slab_summary_block *block)
317 {
318 struct block_allocator *allocator = block->allocator;
319 struct slab_depot *depot = allocator->depot;
320 physical_block_number_t pbn;
321
322 if (block->writing)
323 return;
324
325 allocator->summary_write_count++;
326 vdo_waitq_transfer_all_waiters(&block->next_update_waiters,
327 &block->current_update_waiters);
328 block->writing = true;
329
330 if (vdo_is_read_only(depot->vdo)) {
331 finish_updating_slab_summary_block(block);
332 return;
333 }
334
335 memcpy(block->outgoing_entries, block->entries, VDO_BLOCK_SIZE);
336
337 /*
338 * Flush before writing to ensure that the slab journal tail blocks and reference updates
339 * covered by this summary update are stable. Otherwise, a subsequent recovery could
340 * encounter a slab summary update that refers to a slab journal tail block that has not
341 * actually been written. In such cases, the slab journal referenced will be treated as
342 * empty, causing any data within the slab which predates the existing recovery journal
343 * entries to be lost.
344 */
345 pbn = (depot->summary_origin +
346 (VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE * allocator->zone_number) +
347 block->index);
348 vdo_submit_metadata_vio(&block->vio, pbn, write_slab_summary_endio,
349 handle_write_error, REQ_OP_WRITE | REQ_PREFLUSH);
350 }
351
352 /**
353 * update_slab_summary_entry() - Update the entry for a slab.
354 * @slab: The slab whose entry is to be updated.
355 * @waiter: The waiter that is updating the summary.
356 * @tail_block_offset: The offset of the slab journal's tail block.
357 * @load_ref_counts: Whether the reference counts must be loaded from disk on the vdo load.
358 * @is_clean: Whether the slab is clean.
359 * @free_blocks: The number of free blocks.
360 */
update_slab_summary_entry(struct vdo_slab * slab,struct vdo_waiter * waiter,tail_block_offset_t tail_block_offset,bool load_ref_counts,bool is_clean,block_count_t free_blocks)361 static void update_slab_summary_entry(struct vdo_slab *slab, struct vdo_waiter *waiter,
362 tail_block_offset_t tail_block_offset,
363 bool load_ref_counts, bool is_clean,
364 block_count_t free_blocks)
365 {
366 u8 index = slab->slab_number / VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK;
367 struct block_allocator *allocator = slab->allocator;
368 struct slab_summary_block *block = &allocator->summary_blocks[index];
369 int result;
370 struct slab_summary_entry *entry;
371
372 if (vdo_is_read_only(block->vio.completion.vdo)) {
373 result = VDO_READ_ONLY;
374 waiter->callback(waiter, &result);
375 return;
376 }
377
378 if (vdo_is_state_draining(&allocator->summary_state) ||
379 vdo_is_state_quiescent(&allocator->summary_state)) {
380 result = VDO_INVALID_ADMIN_STATE;
381 waiter->callback(waiter, &result);
382 return;
383 }
384
385 entry = &allocator->summary_entries[slab->slab_number];
386 *entry = (struct slab_summary_entry) {
387 .tail_block_offset = tail_block_offset,
388 .load_ref_counts = (entry->load_ref_counts || load_ref_counts),
389 .is_dirty = !is_clean,
390 .fullness_hint = compute_fullness_hint(allocator->depot, free_blocks),
391 };
392 vdo_waitq_enqueue_waiter(&block->next_update_waiters, waiter);
393 launch_write(block);
394 }
395
396 /**
397 * finish_reaping() - Actually advance the head of the journal now that any necessary flushes are
398 * complete.
399 * @journal: The journal to be reaped.
400 */
finish_reaping(struct slab_journal * journal)401 static void finish_reaping(struct slab_journal *journal)
402 {
403 journal->head = journal->unreapable;
404 add_entries(journal);
405 check_if_slab_drained(journal->slab);
406 }
407
408 static void reap_slab_journal(struct slab_journal *journal);
409
410 /**
411 * complete_reaping() - Finish reaping now that we have flushed the lower layer and then try
412 * reaping again in case we deferred reaping due to an outstanding vio.
413 * @completion: The flush vio.
414 */
complete_reaping(struct vdo_completion * completion)415 static void complete_reaping(struct vdo_completion *completion)
416 {
417 struct slab_journal *journal = completion->parent;
418
419 return_vio_to_pool(vio_as_pooled_vio(as_vio(completion)));
420 finish_reaping(journal);
421 reap_slab_journal(journal);
422 }
423
424 /**
425 * handle_flush_error() - Handle an error flushing the lower layer.
426 * @completion: The flush vio.
427 */
handle_flush_error(struct vdo_completion * completion)428 static void handle_flush_error(struct vdo_completion *completion)
429 {
430 vio_record_metadata_io_error(as_vio(completion));
431 vdo_enter_read_only_mode(completion->vdo, completion->result);
432 complete_reaping(completion);
433 }
434
flush_endio(struct bio * bio)435 static void flush_endio(struct bio *bio)
436 {
437 struct vio *vio = bio->bi_private;
438 struct slab_journal *journal = vio->completion.parent;
439
440 continue_vio_after_io(vio, complete_reaping,
441 journal->slab->allocator->thread_id);
442 }
443
444 /**
445 * flush_for_reaping() - A waiter callback for getting a vio with which to flush the lower layer
446 * prior to reaping.
447 * @waiter: The journal as a flush waiter.
448 * @context: The newly acquired flush vio.
449 */
flush_for_reaping(struct vdo_waiter * waiter,void * context)450 static void flush_for_reaping(struct vdo_waiter *waiter, void *context)
451 {
452 struct slab_journal *journal =
453 container_of(waiter, struct slab_journal, flush_waiter);
454 struct pooled_vio *pooled = context;
455 struct vio *vio = &pooled->vio;
456
457 vio->completion.parent = journal;
458 vdo_submit_flush_vio(vio, flush_endio, handle_flush_error);
459 }
460
461 /**
462 * reap_slab_journal() - Conduct a reap on a slab journal to reclaim unreferenced blocks.
463 * @journal: The slab journal.
464 */
reap_slab_journal(struct slab_journal * journal)465 static void reap_slab_journal(struct slab_journal *journal)
466 {
467 bool reaped = false;
468
469 if (is_reaping(journal)) {
470 /* We already have a reap in progress so wait for it to finish. */
471 return;
472 }
473
474 if ((journal->slab->status != VDO_SLAB_REBUILT) ||
475 !vdo_is_state_normal(&journal->slab->state) ||
476 vdo_is_read_only(journal->slab->allocator->depot->vdo)) {
477 /*
478 * We must not reap in the first two cases, and there's no point in read-only mode.
479 */
480 return;
481 }
482
483 /*
484 * Start reclaiming blocks only when the journal head has no references. Then stop when a
485 * block is referenced or reap reaches the most recently written block, referenced by the
486 * slab summary, which has the sequence number just before the tail.
487 */
488 while ((journal->unreapable < journal->tail) && (journal->reap_lock->count == 0)) {
489 reaped = true;
490 journal->unreapable++;
491 journal->reap_lock++;
492 if (journal->reap_lock == &journal->locks[journal->size])
493 journal->reap_lock = &journal->locks[0];
494 }
495
496 if (!reaped)
497 return;
498
499 /*
500 * It is never safe to reap a slab journal block without first issuing a flush, regardless
501 * of whether a user flush has been received or not. In the absence of the flush, the
502 * reference block write which released the locks allowing the slab journal to reap may not
503 * be persisted. Although slab summary writes will eventually issue flushes, multiple slab
504 * journal block writes can be issued while previous slab summary updates have not yet been
505 * made. Even though those slab journal block writes will be ignored if the slab summary
506 * update is not persisted, they may still overwrite the to-be-reaped slab journal block
507 * resulting in a loss of reference count updates.
508 */
509 journal->flush_waiter.callback = flush_for_reaping;
510 acquire_vio_from_pool(journal->slab->allocator->vio_pool,
511 &journal->flush_waiter);
512 }
513
514 /**
515 * adjust_slab_journal_block_reference() - Adjust the reference count for a slab journal block.
516 * @journal: The slab journal.
517 * @sequence_number: The journal sequence number of the referenced block.
518 * @adjustment: Amount to adjust the reference counter.
519 *
520 * Note that when the adjustment is negative, the slab journal will be reaped.
521 */
adjust_slab_journal_block_reference(struct slab_journal * journal,sequence_number_t sequence_number,int adjustment)522 static void adjust_slab_journal_block_reference(struct slab_journal *journal,
523 sequence_number_t sequence_number,
524 int adjustment)
525 {
526 struct journal_lock *lock;
527
528 if (sequence_number == 0)
529 return;
530
531 if (journal->slab->status == VDO_SLAB_REPLAYING) {
532 /* Locks should not be used during offline replay. */
533 return;
534 }
535
536 VDO_ASSERT_LOG_ONLY((adjustment != 0), "adjustment must be non-zero");
537 lock = get_lock(journal, sequence_number);
538 if (adjustment < 0) {
539 VDO_ASSERT_LOG_ONLY((-adjustment <= lock->count),
540 "adjustment %d of lock count %u for slab journal block %llu must not underflow",
541 adjustment, lock->count,
542 (unsigned long long) sequence_number);
543 }
544
545 lock->count += adjustment;
546 if (lock->count == 0)
547 reap_slab_journal(journal);
548 }
549
550 /**
551 * release_journal_locks() - Callback invoked after a slab summary update completes.
552 * @waiter: The slab summary waiter that has just been notified.
553 * @context: The result code of the update.
554 *
555 * Registered in the constructor on behalf of update_tail_block_location().
556 *
557 * Implements waiter_callback_fn.
558 */
release_journal_locks(struct vdo_waiter * waiter,void * context)559 static void release_journal_locks(struct vdo_waiter *waiter, void *context)
560 {
561 sequence_number_t first, i;
562 struct slab_journal *journal =
563 container_of(waiter, struct slab_journal, slab_summary_waiter);
564 int result = *((int *) context);
565
566 if (result != VDO_SUCCESS) {
567 if (result != VDO_READ_ONLY) {
568 /*
569 * Don't bother logging what might be lots of errors if we are already in
570 * read-only mode.
571 */
572 vdo_log_error_strerror(result, "failed slab summary update %llu",
573 (unsigned long long) journal->summarized);
574 }
575
576 journal->updating_slab_summary = false;
577 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
578 check_if_slab_drained(journal->slab);
579 return;
580 }
581
582 if (journal->partial_write_in_progress && (journal->summarized == journal->tail)) {
583 journal->partial_write_in_progress = false;
584 add_entries(journal);
585 }
586
587 first = journal->last_summarized;
588 journal->last_summarized = journal->summarized;
589 for (i = journal->summarized - 1; i >= first; i--) {
590 /*
591 * Release the lock the summarized block held on the recovery journal. (During
592 * replay, recovery_start will always be 0.)
593 */
594 if (journal->recovery_journal != NULL) {
595 zone_count_t zone_number = journal->slab->allocator->zone_number;
596 struct journal_lock *lock = get_lock(journal, i);
597
598 vdo_release_recovery_journal_block_reference(journal->recovery_journal,
599 lock->recovery_start,
600 VDO_ZONE_TYPE_PHYSICAL,
601 zone_number);
602 }
603
604 /*
605 * Release our own lock against reaping for blocks that are committed. (This
606 * function will not change locks during replay.)
607 */
608 adjust_slab_journal_block_reference(journal, i, -1);
609 }
610
611 journal->updating_slab_summary = false;
612
613 reap_slab_journal(journal);
614
615 /* Check if the slab summary needs to be updated again. */
616 update_tail_block_location(journal);
617 }
618
619 /**
620 * update_tail_block_location() - Update the tail block location in the slab summary, if necessary.
621 * @journal: The slab journal that is updating its tail block location.
622 */
update_tail_block_location(struct slab_journal * journal)623 static void update_tail_block_location(struct slab_journal *journal)
624 {
625 block_count_t free_block_count;
626 struct vdo_slab *slab = journal->slab;
627
628 if (journal->updating_slab_summary ||
629 vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
630 (journal->last_summarized >= journal->next_commit)) {
631 check_if_slab_drained(slab);
632 return;
633 }
634
635 if (slab->status != VDO_SLAB_REBUILT) {
636 u8 hint = slab->allocator->summary_entries[slab->slab_number].fullness_hint;
637
638 free_block_count = ((block_count_t) hint) << slab->allocator->depot->hint_shift;
639 } else {
640 free_block_count = slab->free_blocks;
641 }
642
643 journal->summarized = journal->next_commit;
644 journal->updating_slab_summary = true;
645
646 /*
647 * Update slab summary as dirty.
648 * vdo_slab journal can only reap past sequence number 1 when all the ref counts for this
649 * slab have been written to the layer. Therefore, indicate that the ref counts must be
650 * loaded when the journal head has reaped past sequence number 1.
651 */
652 update_slab_summary_entry(slab, &journal->slab_summary_waiter,
653 journal->summarized % journal->size,
654 (journal->head > 1), false, free_block_count);
655 }
656
657 /**
658 * reopen_slab_journal() - Reopen a slab's journal by emptying it and then adding pending entries.
659 * @slab: The slab to reopen.
660 */
reopen_slab_journal(struct vdo_slab * slab)661 static void reopen_slab_journal(struct vdo_slab *slab)
662 {
663 struct slab_journal *journal = &slab->journal;
664 sequence_number_t block;
665
666 VDO_ASSERT_LOG_ONLY(journal->tail_header.entry_count == 0,
667 "vdo_slab journal's active block empty before reopening");
668 journal->head = journal->tail;
669 initialize_journal_state(journal);
670
671 /* Ensure no locks are spuriously held on an empty journal. */
672 for (block = 1; block <= journal->size; block++) {
673 VDO_ASSERT_LOG_ONLY((get_lock(journal, block)->count == 0),
674 "Scrubbed journal's block %llu is not locked",
675 (unsigned long long) block);
676 }
677
678 add_entries(journal);
679 }
680
get_committing_sequence_number(const struct pooled_vio * vio)681 static sequence_number_t get_committing_sequence_number(const struct pooled_vio *vio)
682 {
683 const struct packed_slab_journal_block *block =
684 (const struct packed_slab_journal_block *) vio->vio.data;
685
686 return __le64_to_cpu(block->header.sequence_number);
687 }
688
689 /**
690 * complete_write() - Handle post-commit processing.
691 * @completion: The write vio as a completion.
692 *
693 * This is the callback registered by write_slab_journal_block().
694 */
complete_write(struct vdo_completion * completion)695 static void complete_write(struct vdo_completion *completion)
696 {
697 int result = completion->result;
698 struct pooled_vio *pooled = vio_as_pooled_vio(as_vio(completion));
699 struct slab_journal *journal = completion->parent;
700 sequence_number_t committed = get_committing_sequence_number(pooled);
701
702 list_del_init(&pooled->list_entry);
703 return_vio_to_pool(pooled);
704
705 if (result != VDO_SUCCESS) {
706 vio_record_metadata_io_error(as_vio(completion));
707 vdo_log_error_strerror(result, "cannot write slab journal block %llu",
708 (unsigned long long) committed);
709 vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
710 check_if_slab_drained(journal->slab);
711 return;
712 }
713
714 WRITE_ONCE(journal->events->blocks_written, journal->events->blocks_written + 1);
715
716 if (list_empty(&journal->uncommitted_blocks)) {
717 /* If no blocks are outstanding, then the commit point is at the tail. */
718 journal->next_commit = journal->tail;
719 } else {
720 /* The commit point is always the beginning of the oldest incomplete block. */
721 pooled = container_of(journal->uncommitted_blocks.next,
722 struct pooled_vio, list_entry);
723 journal->next_commit = get_committing_sequence_number(pooled);
724 }
725
726 update_tail_block_location(journal);
727 }
728
write_slab_journal_endio(struct bio * bio)729 static void write_slab_journal_endio(struct bio *bio)
730 {
731 struct vio *vio = bio->bi_private;
732 struct slab_journal *journal = vio->completion.parent;
733
734 continue_vio_after_io(vio, complete_write, journal->slab->allocator->thread_id);
735 }
736
737 /**
738 * write_slab_journal_block() - Write a slab journal block.
739 * @waiter: The vio pool waiter which was just notified.
740 * @context: The vio pool entry for the write.
741 *
742 * Callback from acquire_vio_from_pool() registered in commit_tail().
743 */
write_slab_journal_block(struct vdo_waiter * waiter,void * context)744 static void write_slab_journal_block(struct vdo_waiter *waiter, void *context)
745 {
746 struct pooled_vio *pooled = context;
747 struct vio *vio = &pooled->vio;
748 struct slab_journal *journal =
749 container_of(waiter, struct slab_journal, resource_waiter);
750 struct slab_journal_block_header *header = &journal->tail_header;
751 int unused_entries = journal->entries_per_block - header->entry_count;
752 physical_block_number_t block_number;
753 const struct admin_state_code *operation;
754
755 header->head = journal->head;
756 list_add_tail(&pooled->list_entry, &journal->uncommitted_blocks);
757 vdo_pack_slab_journal_block_header(header, &journal->block->header);
758
759 /* Copy the tail block into the vio. */
760 memcpy(pooled->vio.data, journal->block, VDO_BLOCK_SIZE);
761
762 VDO_ASSERT_LOG_ONLY(unused_entries >= 0, "vdo_slab journal block is not overfull");
763 if (unused_entries > 0) {
764 /*
765 * Release the per-entry locks for any unused entries in the block we are about to
766 * write.
767 */
768 adjust_slab_journal_block_reference(journal, header->sequence_number,
769 -unused_entries);
770 journal->partial_write_in_progress = !block_is_full(journal);
771 }
772
773 block_number = journal->slab->journal_origin +
774 (header->sequence_number % journal->size);
775 vio->completion.parent = journal;
776
777 /*
778 * This block won't be read in recovery until the slab summary is updated to refer to it.
779 * The slab summary update does a flush which is sufficient to protect us from corruption
780 * due to out of order slab journal, reference block, or block map writes.
781 */
782 vdo_submit_metadata_vio(vdo_forget(vio), block_number, write_slab_journal_endio,
783 complete_write, REQ_OP_WRITE);
784
785 /* Since the write is submitted, the tail block structure can be reused. */
786 journal->tail++;
787 initialize_tail_block(journal);
788 journal->waiting_to_commit = false;
789
790 operation = vdo_get_admin_state_code(&journal->slab->state);
791 if (operation == VDO_ADMIN_STATE_WAITING_FOR_RECOVERY) {
792 vdo_finish_operation(&journal->slab->state,
793 (vdo_is_read_only(journal->slab->allocator->depot->vdo) ?
794 VDO_READ_ONLY : VDO_SUCCESS));
795 return;
796 }
797
798 add_entries(journal);
799 }
800
801 /**
802 * commit_tail() - Commit the tail block of the slab journal.
803 * @journal: The journal whose tail block should be committed.
804 */
commit_tail(struct slab_journal * journal)805 static void commit_tail(struct slab_journal *journal)
806 {
807 if ((journal->tail_header.entry_count == 0) && must_make_entries_to_flush(journal)) {
808 /*
809 * There are no entries at the moment, but there are some waiters, so defer
810 * initiating the flush until those entries are ready to write.
811 */
812 return;
813 }
814
815 if (vdo_is_read_only(journal->slab->allocator->depot->vdo) ||
816 journal->waiting_to_commit ||
817 (journal->tail_header.entry_count == 0)) {
818 /*
819 * There is nothing to do since the tail block is empty, or writing, or the journal
820 * is in read-only mode.
821 */
822 return;
823 }
824
825 /*
826 * Since we are about to commit the tail block, this journal no longer needs to be on the
827 * list of journals which the recovery journal might ask to commit.
828 */
829 mark_slab_journal_clean(journal);
830
831 journal->waiting_to_commit = true;
832
833 journal->resource_waiter.callback = write_slab_journal_block;
834 acquire_vio_from_pool(journal->slab->allocator->vio_pool,
835 &journal->resource_waiter);
836 }
837
838 /**
839 * encode_slab_journal_entry() - Encode a slab journal entry.
840 * @tail_header: The unpacked header for the block.
841 * @payload: The journal block payload to hold the entry.
842 * @sbn: The slab block number of the entry to encode.
843 * @operation: The type of the entry.
844 * @increment: True if this is an increment.
845 */
encode_slab_journal_entry(struct slab_journal_block_header * tail_header,slab_journal_payload * payload,slab_block_number sbn,enum journal_operation operation,bool increment)846 static void encode_slab_journal_entry(struct slab_journal_block_header *tail_header,
847 slab_journal_payload *payload,
848 slab_block_number sbn,
849 enum journal_operation operation,
850 bool increment)
851 {
852 journal_entry_count_t entry_number = tail_header->entry_count++;
853
854 if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
855 if (!tail_header->has_block_map_increments) {
856 memset(payload->full_entries.entry_types, 0,
857 VDO_SLAB_JOURNAL_ENTRY_TYPES_SIZE);
858 tail_header->has_block_map_increments = true;
859 }
860
861 payload->full_entries.entry_types[entry_number / 8] |=
862 ((u8)1 << (entry_number % 8));
863 }
864
865 vdo_pack_slab_journal_entry(&payload->entries[entry_number], sbn, increment);
866 }
867
868 /**
869 * expand_journal_point() - Convert a recovery journal journal_point which refers to both an
870 * increment and a decrement to a single point which refers to one or the
871 * other.
872 * @recovery_point: The journal point to convert.
873 * @increment: Whether the current entry is an increment.
874 *
875 * Return: The expanded journal point
876 *
877 * Because each data_vio has but a single recovery journal point, but may need to make both
878 * increment and decrement entries in the same slab journal. In order to distinguish the two
879 * entries, the entry count of the expanded journal point is twice the actual recovery journal
880 * entry count for increments, and one more than that for decrements.
881 */
expand_journal_point(struct journal_point recovery_point,bool increment)882 static struct journal_point expand_journal_point(struct journal_point recovery_point,
883 bool increment)
884 {
885 recovery_point.entry_count *= 2;
886 if (!increment)
887 recovery_point.entry_count++;
888
889 return recovery_point;
890 }
891
/**
 * add_entry() - Actually add an entry to the slab journal, potentially firing off a write if a
 *               block becomes full.
 * @journal: The slab journal to append to.
 * @pbn: The pbn being adjusted.
 * @operation: The type of entry to make.
 * @increment: True if this is an increment.
 * @recovery_point: The expanded recovery point.
 *
 * This function is synchronous. Any assertion failure here indicates a logic error elsewhere and
 * forces the vdo into read-only mode rather than corrupting the journal.
 */
static void add_entry(struct slab_journal *journal, physical_block_number_t pbn,
		      enum journal_operation operation, bool increment,
		      struct journal_point recovery_point)
{
	struct packed_slab_journal_block *block = journal->block;
	int result;

	/* Entries must be applied in strictly increasing recovery journal order. */
	result = VDO_ASSERT(vdo_before_journal_point(&journal->tail_header.recovery_point,
						     &recovery_point),
			    "recovery journal point is monotonically increasing, recovery point: %llu.%u, block recovery point: %llu.%u",
			    (unsigned long long) recovery_point.sequence_number,
			    recovery_point.entry_count,
			    (unsigned long long) journal->tail_header.recovery_point.sequence_number,
			    journal->tail_header.recovery_point.entry_count);
	if (result != VDO_SUCCESS) {
		vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo, result);
		return;
	}

	if (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		/*
		 * Block map (full) entries are larger than data entries, so fewer of them fit in
		 * a block; verify against the stricter limit.
		 */
		result = VDO_ASSERT((journal->tail_header.entry_count <
				     journal->full_entries_per_block),
				    "block has room for full entries");
		if (result != VDO_SUCCESS) {
			vdo_enter_read_only_mode(journal->slab->allocator->depot->vdo,
						 result);
			return;
		}
	}

	/* Encode the entry relative to the slab's first block. */
	encode_slab_journal_entry(&journal->tail_header, &block->payload,
				  pbn - journal->slab->start, operation, increment);
	journal->tail_header.recovery_point = recovery_point;
	if (block_is_full(journal))
		commit_tail(journal);
}
939
journal_length(const struct slab_journal * journal)940 static inline block_count_t journal_length(const struct slab_journal *journal)
941 {
942 return journal->tail - journal->head;
943 }
944
/**
 * vdo_attempt_replay_into_slab() - Replay a recovery journal entry into a slab's journal.
 * @slab: The slab to play into.
 * @pbn: The PBN for the entry.
 * @operation: The type of entry to add.
 * @increment: True if this entry is an increment.
 * @recovery_point: The recovery journal point corresponding to this entry.
 * @parent: The completion to notify when there is space to add the entry if the entry could not be
 *          added immediately.
 *
 * Return: True if the entry was added immediately.
 */
bool vdo_attempt_replay_into_slab(struct vdo_slab *slab, physical_block_number_t pbn,
				  enum journal_operation operation, bool increment,
				  struct journal_point *recovery_point,
				  struct vdo_completion *parent)
{
	struct slab_journal *journal = &slab->journal;
	struct slab_journal_block_header *header = &journal->tail_header;
	struct journal_point expanded = expand_journal_point(*recovery_point, increment);

	/* Only accept entries after the current recovery point. */
	if (!vdo_before_journal_point(&journal->tail_header.recovery_point, &expanded))
		return true;

	if ((header->entry_count >= journal->full_entries_per_block) &&
	    (header->has_block_map_increments || (operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING))) {
		/*
		 * The tail block does not have room for the entry we are attempting to add so
		 * commit the tail block now.
		 */
		commit_tail(journal);
	}

	if (journal->waiting_to_commit) {
		/* The tail block is being written; park the parent until the write finishes. */
		vdo_start_operation_with_waiter(&journal->slab->state,
						VDO_ADMIN_STATE_WAITING_FOR_RECOVERY,
						parent, NULL);
		return false;
	}

	if (journal_length(journal) >= journal->size) {
		/*
		 * We must have reaped the current head before the crash, since the blocked
		 * threshold keeps us from having more entries than fit in a slab journal; hence we
		 * can just advance the head (and unreapable block), as needed.
		 */
		journal->head++;
		journal->unreapable++;
	}

	/* The first replayed entry transitions the slab from rebuilt to replaying. */
	if (journal->slab->status == VDO_SLAB_REBUILT)
		journal->slab->status = VDO_SLAB_REPLAYING;

	add_entry(journal, pbn, operation, increment, expanded);
	return true;
}
1002
1003 /**
1004 * requires_reaping() - Check whether the journal must be reaped before adding new entries.
1005 * @journal: The journal to check.
1006 *
1007 * Return: True if the journal must be reaped.
1008 */
requires_reaping(const struct slab_journal * journal)1009 static bool requires_reaping(const struct slab_journal *journal)
1010 {
1011 return (journal_length(journal) >= journal->blocking_threshold);
1012 }
1013
1014 /** finish_summary_update() - A waiter callback that resets the writing state of a slab. */
finish_summary_update(struct vdo_waiter * waiter,void * context)1015 static void finish_summary_update(struct vdo_waiter *waiter, void *context)
1016 {
1017 struct vdo_slab *slab = container_of(waiter, struct vdo_slab, summary_waiter);
1018 int result = *((int *) context);
1019
1020 slab->active_count--;
1021
1022 if ((result != VDO_SUCCESS) && (result != VDO_READ_ONLY)) {
1023 vdo_log_error_strerror(result, "failed to update slab summary");
1024 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1025 }
1026
1027 check_if_slab_drained(slab);
1028 }
1029
1030 static void write_reference_block(struct vdo_waiter *waiter, void *context);
1031
1032 /**
1033 * launch_reference_block_write() - Launch the write of a dirty reference block by first acquiring
1034 * a VIO for it from the pool.
1035 * @waiter: The waiter of the block which is starting to write.
1036 * @context: The parent slab of the block.
1037 *
1038 * This can be asynchronous since the writer will have to wait if all VIOs in the pool are
1039 * currently in use.
1040 */
launch_reference_block_write(struct vdo_waiter * waiter,void * context)1041 static void launch_reference_block_write(struct vdo_waiter *waiter, void *context)
1042 {
1043 struct vdo_slab *slab = context;
1044
1045 if (vdo_is_read_only(slab->allocator->depot->vdo))
1046 return;
1047
1048 slab->active_count++;
1049 container_of(waiter, struct reference_block, waiter)->is_writing = true;
1050 waiter->callback = write_reference_block;
1051 acquire_vio_from_pool(slab->allocator->vio_pool, waiter);
1052 }
1053
save_dirty_reference_blocks(struct vdo_slab * slab)1054 static void save_dirty_reference_blocks(struct vdo_slab *slab)
1055 {
1056 vdo_waitq_notify_all_waiters(&slab->dirty_blocks,
1057 launch_reference_block_write, slab);
1058 check_if_slab_drained(slab);
1059 }
1060
/**
 * finish_reference_block_write() - After a reference block has written, clean it, release its
 *                                  locks, and return its VIO to the pool.
 * @completion: The VIO that just finished writing.
 */
static void finish_reference_block_write(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct reference_block *block = completion->parent;
	struct vdo_slab *slab = block->slab;
	tail_block_offset_t offset;

	slab->active_count--;

	/* Release the slab journal lock. */
	adjust_slab_journal_block_reference(&slab->journal,
					    block->slab_journal_lock_to_release, -1);
	return_vio_to_pool(pooled);

	/*
	 * We can't clear the is_writing flag earlier as releasing the slab journal lock may cause
	 * us to be dirtied again, but we don't want to double enqueue.
	 */
	block->is_writing = false;

	if (vdo_is_read_only(completion->vdo)) {
		/* No further writes will be issued; just see if the drain can complete. */
		check_if_slab_drained(slab);
		return;
	}

	/* Re-queue the block if it was re-dirtied while it was writing. */
	if (block->is_dirty) {
		vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
		if (vdo_is_state_draining(&slab->state)) {
			/* We must be saving, and this block will otherwise not be relaunched. */
			save_dirty_reference_blocks(slab);
		}

		return;
	}

	/*
	 * Mark the slab as clean in the slab summary if there are no dirty or writing blocks
	 * and no summary update in progress.
	 */
	if ((slab->active_count > 0) || vdo_waitq_has_waiters(&slab->dirty_blocks)) {
		check_if_slab_drained(slab);
		return;
	}

	/* Record the clean state in the summary; finish_summary_update runs afterwards. */
	offset = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
	slab->active_count++;
	slab->summary_waiter.callback = finish_summary_update;
	update_slab_summary_entry(slab, &slab->summary_waiter, offset,
				  true, true, slab->free_blocks);
}
1118
1119 /**
1120 * get_reference_counters_for_block() - Find the reference counters for a given block.
1121 * @block: The reference_block in question.
1122 *
1123 * Return: A pointer to the reference counters for this block.
1124 */
get_reference_counters_for_block(struct reference_block * block)1125 static vdo_refcount_t * __must_check get_reference_counters_for_block(struct reference_block *block)
1126 {
1127 size_t block_index = block - block->slab->reference_blocks;
1128
1129 return &block->slab->counters[block_index * COUNTS_PER_BLOCK];
1130 }
1131
1132 /**
1133 * pack_reference_block() - Copy data from a reference block to a buffer ready to be written out.
1134 * @block: The block to copy.
1135 * @buffer: The char buffer to fill with the packed block.
1136 */
pack_reference_block(struct reference_block * block,void * buffer)1137 static void pack_reference_block(struct reference_block *block, void *buffer)
1138 {
1139 struct packed_reference_block *packed = buffer;
1140 vdo_refcount_t *counters = get_reference_counters_for_block(block);
1141 sector_count_t i;
1142 struct packed_journal_point commit_point;
1143
1144 vdo_pack_journal_point(&block->slab->slab_journal_point, &commit_point);
1145
1146 for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
1147 packed->sectors[i].commit_point = commit_point;
1148 memcpy(packed->sectors[i].counts, counters + (i * COUNTS_PER_SECTOR),
1149 (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
1150 }
1151 }
1152
write_reference_block_endio(struct bio * bio)1153 static void write_reference_block_endio(struct bio *bio)
1154 {
1155 struct vio *vio = bio->bi_private;
1156 struct reference_block *block = vio->completion.parent;
1157 thread_id_t thread_id = block->slab->allocator->thread_id;
1158
1159 continue_vio_after_io(vio, finish_reference_block_write, thread_id);
1160 }
1161
1162 /**
1163 * handle_io_error() - Handle an I/O error reading or writing a reference count block.
1164 * @completion: The VIO doing the I/O as a completion.
1165 */
handle_io_error(struct vdo_completion * completion)1166 static void handle_io_error(struct vdo_completion *completion)
1167 {
1168 int result = completion->result;
1169 struct vio *vio = as_vio(completion);
1170 struct vdo_slab *slab = ((struct reference_block *) completion->parent)->slab;
1171
1172 vio_record_metadata_io_error(vio);
1173 return_vio_to_pool(vio_as_pooled_vio(vio));
1174 slab->active_count -= vio->io_size / VDO_BLOCK_SIZE;
1175 vdo_enter_read_only_mode(slab->allocator->depot->vdo, result);
1176 check_if_slab_drained(slab);
1177 }
1178
/**
 * write_reference_block() - After a dirty block waiter has gotten a VIO from the VIO pool, copy
 *                           its counters and associated data into the VIO, and launch the write.
 * @waiter: The waiter of the dirty block.
 * @context: The VIO returned by the pool.
 */
static void write_reference_block(struct vdo_waiter *waiter, void *context)
{
	size_t block_offset;
	physical_block_number_t pbn;
	struct pooled_vio *pooled = context;
	struct vdo_completion *completion = &pooled->vio.completion;
	struct reference_block *block = container_of(waiter, struct reference_block,
						     waiter);

	pack_reference_block(block, pooled->vio.data);
	/* The block's on-disk location parallels its position in the reference_blocks array. */
	block_offset = (block - block->slab->reference_blocks);
	pbn = (block->slab->ref_counts_origin + block_offset);
	block->slab_journal_lock_to_release = block->slab_journal_lock;
	completion->parent = block;

	/*
	 * Mark the block as clean, since we won't be committing any updates that happen after this
	 * moment. As long as VIO order is preserved, two VIOs updating this block at once will not
	 * cause complications.
	 */
	block->is_dirty = false;

	/*
	 * Flush before writing to ensure that the recovery journal and slab journal entries which
	 * cover this reference update are stable. This prevents data corruption that can be caused
	 * by out of order writes. (The flush is requested via REQ_PREFLUSH on the submit below.)
	 */
	WRITE_ONCE(block->slab->allocator->ref_counts_statistics.blocks_written,
		   block->slab->allocator->ref_counts_statistics.blocks_written + 1);

	completion->callback_thread_id = ((struct block_allocator *) pooled->context)->thread_id;
	vdo_submit_metadata_vio(&pooled->vio, pbn, write_reference_block_endio,
				handle_io_error, REQ_OP_WRITE | REQ_PREFLUSH);
}
1219
reclaim_journal_space(struct slab_journal * journal)1220 static void reclaim_journal_space(struct slab_journal *journal)
1221 {
1222 block_count_t length = journal_length(journal);
1223 struct vdo_slab *slab = journal->slab;
1224 block_count_t write_count = vdo_waitq_num_waiters(&slab->dirty_blocks);
1225 block_count_t written;
1226
1227 if ((length < journal->flushing_threshold) || (write_count == 0))
1228 return;
1229
1230 /* The slab journal is over the first threshold, schedule some reference block writes. */
1231 WRITE_ONCE(journal->events->flush_count, journal->events->flush_count + 1);
1232 if (length < journal->flushing_deadline) {
1233 /* Schedule more writes the closer to the deadline we get. */
1234 write_count /= journal->flushing_deadline - length + 1;
1235 write_count = max_t(block_count_t, write_count, 1);
1236 }
1237
1238 for (written = 0; written < write_count; written++) {
1239 vdo_waitq_notify_next_waiter(&slab->dirty_blocks,
1240 launch_reference_block_write, slab);
1241 }
1242 }
1243
1244 /**
1245 * reference_count_to_status() - Convert a reference count to a reference status.
1246 * @count: The count to convert.
1247 *
1248 * Return: The appropriate reference status.
1249 */
reference_count_to_status(vdo_refcount_t count)1250 static enum reference_status __must_check reference_count_to_status(vdo_refcount_t count)
1251 {
1252 if (count == EMPTY_REFERENCE_COUNT)
1253 return RS_FREE;
1254 else if (count == 1)
1255 return RS_SINGLE;
1256 else if (count == PROVISIONAL_REFERENCE_COUNT)
1257 return RS_PROVISIONAL;
1258 else
1259 return RS_SHARED;
1260 }
1261
1262 /**
1263 * dirty_block() - Mark a reference count block as dirty, potentially adding it to the dirty queue
1264 * if it wasn't already dirty.
1265 * @block: The reference block to mark as dirty.
1266 */
dirty_block(struct reference_block * block)1267 static void dirty_block(struct reference_block *block)
1268 {
1269 if (block->is_dirty)
1270 return;
1271
1272 block->is_dirty = true;
1273 if (!block->is_writing)
1274 vdo_waitq_enqueue_waiter(&block->slab->dirty_blocks, &block->waiter);
1275 }
1276
1277 /**
1278 * get_reference_block() - Get the reference block that covers the given block index.
1279 * @slab: The slab containing the references.
1280 * @index: The index of the physical block.
1281 */
get_reference_block(struct vdo_slab * slab,slab_block_number index)1282 static struct reference_block * __must_check get_reference_block(struct vdo_slab *slab,
1283 slab_block_number index)
1284 {
1285 return &slab->reference_blocks[index / COUNTS_PER_BLOCK];
1286 }
1287
1288 /**
1289 * slab_block_number_from_pbn() - Determine the index within the slab of a particular physical
1290 * block number.
1291 * @slab: The slab.
1292 * @pbn: The physical block number.
1293 * @slab_block_number_ptr: A pointer to the slab block number.
1294 *
1295 * Return: VDO_SUCCESS or an error code.
1296 */
slab_block_number_from_pbn(struct vdo_slab * slab,physical_block_number_t pbn,slab_block_number * slab_block_number_ptr)1297 static int __must_check slab_block_number_from_pbn(struct vdo_slab *slab,
1298 physical_block_number_t pbn,
1299 slab_block_number *slab_block_number_ptr)
1300 {
1301 u64 slab_block_number;
1302
1303 if (pbn < slab->start)
1304 return VDO_OUT_OF_RANGE;
1305
1306 slab_block_number = pbn - slab->start;
1307 if (slab_block_number >= slab->allocator->depot->slab_config.data_blocks)
1308 return VDO_OUT_OF_RANGE;
1309
1310 *slab_block_number_ptr = slab_block_number;
1311 return VDO_SUCCESS;
1312 }
1313
1314 /**
1315 * get_reference_counter() - Get the reference counter that covers the given physical block number.
1316 * @slab: The slab to query.
1317 * @pbn: The physical block number.
1318 * @counter_ptr: A pointer to the reference counter.
1319 */
get_reference_counter(struct vdo_slab * slab,physical_block_number_t pbn,vdo_refcount_t ** counter_ptr)1320 static int __must_check get_reference_counter(struct vdo_slab *slab,
1321 physical_block_number_t pbn,
1322 vdo_refcount_t **counter_ptr)
1323 {
1324 slab_block_number index;
1325 int result = slab_block_number_from_pbn(slab, pbn, &index);
1326
1327 if (result != VDO_SUCCESS)
1328 return result;
1329
1330 *counter_ptr = &slab->counters[index];
1331
1332 return VDO_SUCCESS;
1333 }
1334
calculate_slab_priority(struct vdo_slab * slab)1335 static unsigned int calculate_slab_priority(struct vdo_slab *slab)
1336 {
1337 block_count_t free_blocks = slab->free_blocks;
1338 unsigned int unopened_slab_priority = slab->allocator->unopened_slab_priority;
1339 unsigned int priority;
1340
1341 /*
1342 * Wholly full slabs must be the only ones with lowest priority, 0.
1343 *
1344 * Slabs that have never been opened (empty, newly initialized, and never been written to)
1345 * have lower priority than previously opened slabs that have a significant number of free
1346 * blocks. This ranking causes VDO to avoid writing physical blocks for the first time
1347 * unless there are very few free blocks that have been previously written to.
1348 *
1349 * Since VDO doesn't discard blocks currently, reusing previously written blocks makes VDO
1350 * a better client of any underlying storage that is thinly-provisioned (though discarding
1351 * would be better).
1352 *
1353 * For all other slabs, the priority is derived from the logarithm of the number of free
1354 * blocks. Slabs with the same order of magnitude of free blocks have the same priority.
1355 * With 2^23 blocks, the priority will range from 1 to 25. The reserved
1356 * unopened_slab_priority divides the range and is skipped by the logarithmic mapping.
1357 */
1358
1359 if (free_blocks == 0)
1360 return 0;
1361
1362 if (is_slab_journal_blank(slab))
1363 return unopened_slab_priority;
1364
1365 priority = (1 + ilog2(free_blocks));
1366 return ((priority < unopened_slab_priority) ? priority : priority + 1);
1367 }
1368
1369 /*
1370 * Slabs are essentially prioritized by an approximation of the number of free blocks in the slab
1371 * so slabs with lots of free blocks will be opened for allocation before slabs that have few free
1372 * blocks.
1373 */
prioritize_slab(struct vdo_slab * slab)1374 static void prioritize_slab(struct vdo_slab *slab)
1375 {
1376 VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
1377 "a slab must not already be on a list when prioritizing");
1378 slab->priority = calculate_slab_priority(slab);
1379 vdo_priority_table_enqueue(slab->allocator->prioritized_slabs,
1380 slab->priority, &slab->allocq_entry);
1381 }
1382
1383 /**
1384 * adjust_free_block_count() - Adjust the free block count and (if needed) reprioritize the slab.
1385 * @slab: The slab.
1386 * @incremented: True if the free block count went up.
1387 */
adjust_free_block_count(struct vdo_slab * slab,bool incremented)1388 static void adjust_free_block_count(struct vdo_slab *slab, bool incremented)
1389 {
1390 struct block_allocator *allocator = slab->allocator;
1391
1392 WRITE_ONCE(allocator->allocated_blocks,
1393 allocator->allocated_blocks + (incremented ? -1 : 1));
1394
1395 /* The open slab doesn't need to be reprioritized until it is closed. */
1396 if (slab == allocator->open_slab)
1397 return;
1398
1399 /* Don't bother adjusting the priority table if unneeded. */
1400 if (slab->priority == calculate_slab_priority(slab))
1401 return;
1402
1403 /*
1404 * Reprioritize the slab to reflect the new free block count by removing it from the table
1405 * and re-enqueuing it with the new priority.
1406 */
1407 vdo_priority_table_remove(allocator->prioritized_slabs, &slab->allocq_entry);
1408 prioritize_slab(slab);
1409 }
1410
1411 /**
1412 * increment_for_data() - Increment the reference count for a data block.
1413 * @slab: The slab which owns the block.
1414 * @block: The reference block which contains the block being updated.
1415 * @block_number: The block to update.
1416 * @old_status: The reference status of the data block before this increment.
1417 * @lock: The pbn_lock associated with this increment (may be NULL).
1418 * @counter_ptr: A pointer to the count for the data block (in, out).
1419 * @adjust_block_count: Whether to update the allocator's free block count.
1420 *
1421 * Return: VDO_SUCCESS or an error.
1422 */
increment_for_data(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct pbn_lock * lock,vdo_refcount_t * counter_ptr,bool adjust_block_count)1423 static int increment_for_data(struct vdo_slab *slab, struct reference_block *block,
1424 slab_block_number block_number,
1425 enum reference_status old_status,
1426 struct pbn_lock *lock, vdo_refcount_t *counter_ptr,
1427 bool adjust_block_count)
1428 {
1429 switch (old_status) {
1430 case RS_FREE:
1431 *counter_ptr = 1;
1432 block->allocated_count++;
1433 slab->free_blocks--;
1434 if (adjust_block_count)
1435 adjust_free_block_count(slab, false);
1436
1437 break;
1438
1439 case RS_PROVISIONAL:
1440 *counter_ptr = 1;
1441 break;
1442
1443 default:
1444 /* Single or shared */
1445 if (*counter_ptr >= MAXIMUM_REFERENCE_COUNT) {
1446 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1447 "Incrementing a block already having 254 references (slab %u, offset %u)",
1448 slab->slab_number, block_number);
1449 }
1450 (*counter_ptr)++;
1451 }
1452
1453 if (lock != NULL)
1454 vdo_unassign_pbn_lock_provisional_reference(lock);
1455 return VDO_SUCCESS;
1456 }
1457
/**
 * decrement_for_data() - Decrement the reference count for a data block.
 * @slab: The slab which owns the block.
 * @block: The reference block which contains the block being updated.
 * @block_number: The block to update.
 * @old_status: The reference status of the data block before this decrement.
 * @updater: The reference updater doing this operation in case we need to look up the pbn lock.
 * @counter_ptr: A pointer to the count for the data block (in, out).
 * @adjust_block_count: Whether to update the allocator's free block count.
 *
 * Return: VDO_SUCCESS, or VDO_REF_COUNT_INVALID when decrementing an already-free block.
 */
static int decrement_for_data(struct vdo_slab *slab, struct reference_block *block,
			      slab_block_number block_number,
			      enum reference_status old_status,
			      struct reference_updater *updater,
			      vdo_refcount_t *counter_ptr, bool adjust_block_count)
{
	switch (old_status) {
	case RS_FREE:
		return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
					      "Decrementing free block at offset %u in slab %u",
					      block_number, slab->slab_number);

	case RS_PROVISIONAL:
	case RS_SINGLE:
		/* This decrement would free the block; check for a read lock first. */
		if (updater->zpbn.zone != NULL) {
			struct pbn_lock *lock = vdo_get_physical_zone_pbn_lock(updater->zpbn.zone,
									       updater->zpbn.pbn);

			if (lock != NULL) {
				/*
				 * There is a read lock on this block, so the block must not become
				 * unreferenced.
				 */
				*counter_ptr = PROVISIONAL_REFERENCE_COUNT;
				vdo_assign_pbn_lock_provisional_reference(lock);
				break;
			}
		}

		/* No read lock: the block is truly free now. */
		*counter_ptr = EMPTY_REFERENCE_COUNT;
		block->allocated_count--;
		slab->free_blocks++;
		if (adjust_block_count)
			adjust_free_block_count(slab, true);

		break;

	default:
		/* Shared */
		(*counter_ptr)--;
	}

	return VDO_SUCCESS;
}
1514
1515 /**
1516 * increment_for_block_map() - Increment the reference count for a block map page.
1517 * @slab: The slab which owns the block.
1518 * @block: The reference block which contains the block being updated.
1519 * @block_number: The block to update.
1520 * @old_status: The reference status of the block before this increment.
1521 * @lock: The pbn_lock associated with this increment (may be NULL).
1522 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1523 * @counter_ptr: A pointer to the count for the block (in, out).
1524 * @adjust_block_count: Whether to update the allocator's free block count.
1525 *
1526 * All block map increments should be from provisional to MAXIMUM_REFERENCE_COUNT. Since block map
1527 * blocks never dedupe they should never be adjusted from any other state. The adjustment always
1528 * results in MAXIMUM_REFERENCE_COUNT as this value is used to prevent dedupe against block map
1529 * blocks.
1530 *
1531 * Return: VDO_SUCCESS or an error.
1532 */
increment_for_block_map(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,enum reference_status old_status,struct pbn_lock * lock,bool normal_operation,vdo_refcount_t * counter_ptr,bool adjust_block_count)1533 static int increment_for_block_map(struct vdo_slab *slab, struct reference_block *block,
1534 slab_block_number block_number,
1535 enum reference_status old_status,
1536 struct pbn_lock *lock, bool normal_operation,
1537 vdo_refcount_t *counter_ptr, bool adjust_block_count)
1538 {
1539 switch (old_status) {
1540 case RS_FREE:
1541 if (normal_operation) {
1542 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1543 "Incrementing unallocated block map block (slab %u, offset %u)",
1544 slab->slab_number, block_number);
1545 }
1546
1547 *counter_ptr = MAXIMUM_REFERENCE_COUNT;
1548 block->allocated_count++;
1549 slab->free_blocks--;
1550 if (adjust_block_count)
1551 adjust_free_block_count(slab, false);
1552
1553 return VDO_SUCCESS;
1554
1555 case RS_PROVISIONAL:
1556 if (!normal_operation)
1557 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1558 "Block map block had provisional reference during replay (slab %u, offset %u)",
1559 slab->slab_number, block_number);
1560
1561 *counter_ptr = MAXIMUM_REFERENCE_COUNT;
1562 if (lock != NULL)
1563 vdo_unassign_pbn_lock_provisional_reference(lock);
1564 return VDO_SUCCESS;
1565
1566 default:
1567 return vdo_log_error_strerror(VDO_REF_COUNT_INVALID,
1568 "Incrementing a block map block which is already referenced %u times (slab %u, offset %u)",
1569 *counter_ptr, slab->slab_number,
1570 block_number);
1571 }
1572 }
1573
is_valid_journal_point(const struct journal_point * point)1574 static bool __must_check is_valid_journal_point(const struct journal_point *point)
1575 {
1576 return ((point != NULL) && (point->sequence_number > 0));
1577 }
1578
1579 /**
1580 * update_reference_count() - Update the reference count of a block.
1581 * @slab: The slab which owns the block.
1582 * @block: The reference block which contains the block being updated.
1583 * @block_number: The block to update.
1584 * @slab_journal_point: The slab journal point at which this update is journaled.
1585 * @updater: The reference updater.
1586 * @normal_operation: Whether we are in normal operation vs. recovery or rebuild.
1587 * @adjust_block_count: Whether to update the slab's free block count.
1588 * @provisional_decrement_ptr: A pointer which will be set to true if this update was a decrement
1589 * of a provisional reference.
1590 *
1591 * Return: VDO_SUCCESS or an error.
1592 */
update_reference_count(struct vdo_slab * slab,struct reference_block * block,slab_block_number block_number,const struct journal_point * slab_journal_point,struct reference_updater * updater,bool normal_operation,bool adjust_block_count,bool * provisional_decrement_ptr)1593 static int update_reference_count(struct vdo_slab *slab, struct reference_block *block,
1594 slab_block_number block_number,
1595 const struct journal_point *slab_journal_point,
1596 struct reference_updater *updater,
1597 bool normal_operation, bool adjust_block_count,
1598 bool *provisional_decrement_ptr)
1599 {
1600 vdo_refcount_t *counter_ptr = &slab->counters[block_number];
1601 enum reference_status old_status = reference_count_to_status(*counter_ptr);
1602 int result;
1603
1604 if (!updater->increment) {
1605 result = decrement_for_data(slab, block, block_number, old_status,
1606 updater, counter_ptr, adjust_block_count);
1607 if ((result == VDO_SUCCESS) && (old_status == RS_PROVISIONAL)) {
1608 if (provisional_decrement_ptr != NULL)
1609 *provisional_decrement_ptr = true;
1610 return VDO_SUCCESS;
1611 }
1612 } else if (updater->operation == VDO_JOURNAL_DATA_REMAPPING) {
1613 result = increment_for_data(slab, block, block_number, old_status,
1614 updater->lock, counter_ptr, adjust_block_count);
1615 } else {
1616 result = increment_for_block_map(slab, block, block_number, old_status,
1617 updater->lock, normal_operation,
1618 counter_ptr, adjust_block_count);
1619 }
1620
1621 if (result != VDO_SUCCESS)
1622 return result;
1623
1624 if (is_valid_journal_point(slab_journal_point))
1625 slab->slab_journal_point = *slab_journal_point;
1626
1627 return VDO_SUCCESS;
1628 }
1629
adjust_reference_count(struct vdo_slab * slab,struct reference_updater * updater,const struct journal_point * slab_journal_point)1630 static int __must_check adjust_reference_count(struct vdo_slab *slab,
1631 struct reference_updater *updater,
1632 const struct journal_point *slab_journal_point)
1633 {
1634 slab_block_number block_number;
1635 int result;
1636 struct reference_block *block;
1637 bool provisional_decrement = false;
1638
1639 if (!is_slab_open(slab))
1640 return VDO_INVALID_ADMIN_STATE;
1641
1642 result = slab_block_number_from_pbn(slab, updater->zpbn.pbn, &block_number);
1643 if (result != VDO_SUCCESS)
1644 return result;
1645
1646 block = get_reference_block(slab, block_number);
1647 result = update_reference_count(slab, block, block_number, slab_journal_point,
1648 updater, NORMAL_OPERATION, true,
1649 &provisional_decrement);
1650 if ((result != VDO_SUCCESS) || provisional_decrement)
1651 return result;
1652
1653 if (block->is_dirty && (block->slab_journal_lock > 0)) {
1654 sequence_number_t entry_lock = slab_journal_point->sequence_number;
1655 /*
1656 * This block is already dirty and a slab journal entry has been made for it since
1657 * the last time it was clean. We must release the per-entry slab journal lock for
1658 * the entry associated with the update we are now doing.
1659 */
1660 result = VDO_ASSERT(is_valid_journal_point(slab_journal_point),
1661 "Reference count adjustments need slab journal points.");
1662 if (result != VDO_SUCCESS)
1663 return result;
1664
1665 adjust_slab_journal_block_reference(&slab->journal, entry_lock, -1);
1666 return VDO_SUCCESS;
1667 }
1668
1669 /*
1670 * This may be the first time we are applying an update for which there is a slab journal
1671 * entry to this block since the block was cleaned. Therefore, we convert the per-entry
1672 * slab journal lock to an uncommitted reference block lock, if there is a per-entry lock.
1673 */
1674 if (is_valid_journal_point(slab_journal_point))
1675 block->slab_journal_lock = slab_journal_point->sequence_number;
1676 else
1677 block->slab_journal_lock = 0;
1678
1679 dirty_block(block);
1680 return VDO_SUCCESS;
1681 }
1682
1683 /**
1684 * add_entry_from_waiter() - Add an entry to the slab journal.
1685 * @waiter: The vio which should make an entry now.
1686 * @context: The slab journal to make an entry in.
1687 *
1688 * This callback is invoked by add_entries() once it has determined that we are ready to make
1689 * another entry in the slab journal. Implements waiter_callback_fn.
1690 */
static void add_entry_from_waiter(struct vdo_waiter *waiter, void *context)
{
	int result;
	struct reference_updater *updater =
		container_of(waiter, struct reference_updater, waiter);
	struct data_vio *data_vio = data_vio_from_reference_updater(updater);
	struct slab_journal *journal = context;
	struct slab_journal_block_header *header = &journal->tail_header;
	/* The point in the slab journal at which this entry will be made. */
	struct journal_point slab_journal_point = {
		.sequence_number = header->sequence_number,
		.entry_count = header->entry_count,
	};
	sequence_number_t recovery_block = data_vio->recovery_journal_point.sequence_number;

	if (header->entry_count == 0) {
		/*
		 * This is the first entry in the current tail block, so get a lock on the recovery
		 * journal which we will hold until this tail block is committed.
		 */
		get_lock(journal, header->sequence_number)->recovery_start = recovery_block;
		if (journal->recovery_journal != NULL) {
			zone_count_t zone_number = journal->slab->allocator->zone_number;

			vdo_acquire_recovery_journal_block_reference(journal->recovery_journal,
								     recovery_block,
								     VDO_ZONE_TYPE_PHYSICAL,
								     zone_number);
		}

		mark_slab_journal_dirty(journal, recovery_block);
		reclaim_journal_space(journal);
	}

	add_entry(journal, updater->zpbn.pbn, updater->operation, updater->increment,
		  expand_journal_point(data_vio->recovery_journal_point,
				       updater->increment));

	if (journal->slab->status != VDO_SLAB_REBUILT) {
		/*
		 * If the slab is unrecovered, scrubbing will take care of the count since the
		 * update is now recorded in the journal.
		 */
		adjust_slab_journal_block_reference(journal,
						    slab_journal_point.sequence_number, -1);
		result = VDO_SUCCESS;
	} else {
		/* Now that an entry has been made in the slab journal, update the counter. */
		result = adjust_reference_count(journal->slab, updater,
						&slab_journal_point);
	}

	/* Increments complete the data_vio directly; decrements use a sub-completion. */
	if (updater->increment)
		continue_data_vio_with_error(data_vio, result);
	else
		vdo_continue_completion(&data_vio->decrement_completion, result);
}
1747
1748 /**
1749 * is_next_entry_a_block_map_increment() - Check whether the next entry to be made is a block map
1750 * increment.
1751 * @journal: The journal.
1752 *
1753 * Return: true if the first entry waiter's operation is a block map increment.
1754 */
is_next_entry_a_block_map_increment(struct slab_journal * journal)1755 static inline bool is_next_entry_a_block_map_increment(struct slab_journal *journal)
1756 {
1757 struct vdo_waiter *waiter = vdo_waitq_get_first_waiter(&journal->entry_waiters);
1758 struct reference_updater *updater =
1759 container_of(waiter, struct reference_updater, waiter);
1760
1761 return (updater->operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING);
1762 }
1763
1764 /**
1765 * add_entries() - Add as many entries as possible from the queue of vios waiting to make entries.
1766 * @journal: The journal to which entries may be added.
1767 *
1768 * By processing the queue in order, we ensure that slab journal entries are made in the same order
1769 * as recovery journal entries for the same increment or decrement.
1770 */
static void add_entries(struct slab_journal *journal)
{
	if (journal->adding_entries) {
		/* Protect against re-entrancy. */
		return;
	}

	journal->adding_entries = true;
	while (vdo_waitq_has_waiters(&journal->entry_waiters)) {
		struct slab_journal_block_header *header = &journal->tail_header;

		if (journal->partial_write_in_progress ||
		    (journal->slab->status == VDO_SLAB_REBUILDING)) {
			/*
			 * Don't add entries while rebuilding or while a partial write is
			 * outstanding, as it could result in reference count corruption.
			 */
			break;
		}

		if (journal->waiting_to_commit) {
			/*
			 * If we are waiting for resources to write the tail block, and the tail
			 * block is full, we can't make another entry.
			 */
			WRITE_ONCE(journal->events->tail_busy_count,
				   journal->events->tail_busy_count + 1);
			break;
		} else if (is_next_entry_a_block_map_increment(journal) &&
			   (header->entry_count >= journal->full_entries_per_block)) {
			/*
			 * The tail block does not have room for a block map increment, so commit
			 * it now.
			 */
			commit_tail(journal);
			if (journal->waiting_to_commit) {
				WRITE_ONCE(journal->events->tail_busy_count,
					   journal->events->tail_busy_count + 1);
				break;
			}
		}

		/* If the slab is over the blocking threshold, make the vio wait. */
		if (requires_reaping(journal)) {
			WRITE_ONCE(journal->events->blocked_count,
				   journal->events->blocked_count + 1);
			save_dirty_reference_blocks(journal->slab);
			break;
		}

		/* An empty tail block is about to get its first entry; set up its lock. */
		if (header->entry_count == 0) {
			struct journal_lock *lock =
				get_lock(journal, header->sequence_number);

			/*
			 * Check if the on disk slab journal is full. Because of the blocking and
			 * scrubbing thresholds, this should never happen.
			 */
			if (lock->count > 0) {
				VDO_ASSERT_LOG_ONLY((journal->head + journal->size) == journal->tail,
						    "New block has locks, but journal is not full");

				/*
				 * The blocking threshold must let the journal fill up if the new
				 * block has locks; if the blocking threshold is smaller than the
				 * journal size, the new block cannot possibly have locks already.
				 */
				VDO_ASSERT_LOG_ONLY((journal->blocking_threshold >= journal->size),
						    "New block can have locks already iff blocking threshold is at the end of the journal");

				WRITE_ONCE(journal->events->disk_full_count,
					   journal->events->disk_full_count + 1);
				save_dirty_reference_blocks(journal->slab);
				break;
			}

			/*
			 * Don't allow the new block to be reaped until all of the reference count
			 * blocks are written and the journal block has been fully committed as
			 * well.
			 */
			lock->count = journal->entries_per_block + 1;

			if (header->sequence_number == 1) {
				struct vdo_slab *slab = journal->slab;
				block_count_t i;

				/*
				 * This is the first entry in this slab journal, ever. Dirty all of
				 * the reference count blocks. Each will acquire a lock on the tail
				 * block so that the journal won't be reaped until the reference
				 * counts are initialized. The lock acquisition must be done by the
				 * ref_counts since here we don't know how many reference blocks
				 * the ref_counts has.
				 */
				for (i = 0; i < slab->reference_block_count; i++) {
					slab->reference_blocks[i].slab_journal_lock = 1;
					dirty_block(&slab->reference_blocks[i]);
				}

				adjust_slab_journal_block_reference(journal, 1,
								    slab->reference_block_count);
			}
		}

		/* Pop the first waiter and have it record its entry in the journal. */
		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
					     add_entry_from_waiter, journal);
	}

	journal->adding_entries = false;

	/* If there are no waiters, and we are flushing or saving, commit the tail block. */
	if (vdo_is_state_draining(&journal->slab->state) &&
	    !vdo_is_state_suspending(&journal->slab->state) &&
	    !vdo_waitq_has_waiters(&journal->entry_waiters))
		commit_tail(journal);
}
1888
1889 /**
1890 * reset_search_cursor() - Reset the free block search back to the first reference counter in the
1891 * first reference block of a slab.
1892 * @slab: The slab.
1893 */
reset_search_cursor(struct vdo_slab * slab)1894 static void reset_search_cursor(struct vdo_slab *slab)
1895 {
1896 struct search_cursor *cursor = &slab->search_cursor;
1897
1898 cursor->block = cursor->first_block;
1899 cursor->index = 0;
1900 cursor->end_index = min_t(u32, COUNTS_PER_BLOCK, slab->block_count);
1901 }
1902
1903 /**
1904 * advance_search_cursor() - Advance the search cursor to the start of the next reference block in
1905 * a slab.
1906 * @slab: The slab.
1907 *
1908 * Wraps around to the first reference block if the current block is the last reference block.
1909 *
1910 * Return: True unless the cursor was at the last reference block.
1911 */
advance_search_cursor(struct vdo_slab * slab)1912 static bool advance_search_cursor(struct vdo_slab *slab)
1913 {
1914 struct search_cursor *cursor = &slab->search_cursor;
1915
1916 /*
1917 * If we just finished searching the last reference block, then wrap back around to the
1918 * start of the array.
1919 */
1920 if (cursor->block == cursor->last_block) {
1921 reset_search_cursor(slab);
1922 return false;
1923 }
1924
1925 /* We're not already at the end, so advance to cursor to the next block. */
1926 cursor->block++;
1927 cursor->index = cursor->end_index;
1928
1929 if (cursor->block == cursor->last_block) {
1930 /* The last reference block will usually be a runt. */
1931 cursor->end_index = slab->block_count;
1932 } else {
1933 cursor->end_index += COUNTS_PER_BLOCK;
1934 }
1935
1936 return true;
1937 }
1938
1939 /**
1940 * vdo_adjust_reference_count_for_rebuild() - Adjust the reference count of a block during rebuild.
1941 * @depot: The slab depot.
1942 * @pbn: The physical block number to adjust.
 * @operation: The type of operation.
1944 *
1945 * Return: VDO_SUCCESS or an error.
1946 */
int vdo_adjust_reference_count_for_rebuild(struct slab_depot *depot,
					   physical_block_number_t pbn,
					   enum journal_operation operation)
{
	int result;
	slab_block_number block_number;
	struct reference_block *block;
	struct vdo_slab *slab = vdo_get_slab(depot, pbn);
	/* Rebuild adjustments are always increments and carry no PBN lock. */
	struct reference_updater updater = {
		.operation = operation,
		.increment = true,
	};

	result = slab_block_number_from_pbn(slab, pbn, &block_number);
	if (result != VDO_SUCCESS)
		return result;

	block = get_reference_block(slab, block_number);
	/* No slab journal point, not normal operation, and no block count adjustment. */
	result = update_reference_count(slab, block, block_number, NULL,
					&updater, !NORMAL_OPERATION, false, NULL);
	if (result != VDO_SUCCESS)
		return result;

	dirty_block(block);
	return VDO_SUCCESS;
}
1973
1974 /**
1975 * replay_reference_count_change() - Replay the reference count adjustment from a slab journal
1976 * entry into the reference count for a block.
1977 * @slab: The slab.
1978 * @entry_point: The slab journal point for the entry.
1979 * @entry: The slab journal entry being replayed.
1980 *
1981 * The adjustment will be ignored if it was already recorded in the reference count.
1982 *
1983 * Return: VDO_SUCCESS or an error code.
1984 */
static int replay_reference_count_change(struct vdo_slab *slab,
					 const struct journal_point *entry_point,
					 struct slab_journal_entry entry)
{
	int result;
	struct reference_block *block = get_reference_block(slab, entry.sbn);
	/* The sector whose commit point records whether this entry was already applied. */
	sector_count_t sector = (entry.sbn % COUNTS_PER_BLOCK) / COUNTS_PER_SECTOR;
	struct reference_updater updater = {
		.operation = entry.operation,
		.increment = entry.increment,
	};

	if (!vdo_before_journal_point(&block->commit_points[sector], entry_point)) {
		/* This entry is already reflected in the existing counts, so do nothing. */
		return VDO_SUCCESS;
	}

	/* This entry is not yet counted in the reference counts. */
	result = update_reference_count(slab, block, entry.sbn, entry_point,
					&updater, !NORMAL_OPERATION, false, NULL);
	if (result != VDO_SUCCESS)
		return result;

	dirty_block(block);
	return VDO_SUCCESS;
}
2011
2012 /**
2013 * find_zero_byte_in_word() - Find the array index of the first zero byte in word-sized range of
2014 * reference counters.
2015 * @word_ptr: A pointer to the eight counter bytes to check.
2016 * @start_index: The array index corresponding to word_ptr[0].
2017 * @fail_index: The array index to return if no zero byte is found.
2018 *
2019 * The search does no bounds checking; the function relies on the array being sufficiently padded.
2020 *
2021 * Return: The array index of the first zero byte in the word, or the value passed as fail_index if
2022 * no zero byte was found.
2023 */
static inline slab_block_number find_zero_byte_in_word(const u8 *word_ptr,
						       slab_block_number start_index,
						       slab_block_number fail_index)
{
	u64 word = get_unaligned_le64(word_ptr);

	/* This looks like a loop, but GCC will unroll the eight iterations for us. */
	unsigned int offset;

	for (offset = 0; offset < BYTES_PER_WORD; offset++) {
		/*
		 * get_unaligned_le64() puts word_ptr[0] in the low-order byte, so scanning
		 * from the low byte visits the counters in array order on any host.
		 */
		if ((word & 0xFF) == 0)
			return (start_index + offset);
		word >>= 8;
	}

	return fail_index;
}
2042
2043 /**
2044 * find_free_block() - Find the first block with a reference count of zero in the specified
2045 * range of reference counter indexes.
2046 * @slab: The slab counters to scan.
2047 * @index_ptr: A pointer to hold the array index of the free block.
2048 *
2049 * Return: True if a free block was found in the specified range.
2050 */
static bool find_free_block(const struct vdo_slab *slab, slab_block_number *index_ptr)
{
	slab_block_number zero_index;
	slab_block_number next_index = slab->search_cursor.index;
	slab_block_number end_index = slab->search_cursor.end_index;
	u8 *next_counter = &slab->counters[next_index];
	u8 *end_counter = &slab->counters[end_index];

	/*
	 * Search every byte of the first unaligned word. (Array is padded so reading past end is
	 * safe.)
	 */
	zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
	if (zero_index < end_index) {
		*index_ptr = zero_index;
		return true;
	}

	/*
	 * On architectures where unaligned word access is expensive, this would be a good place to
	 * advance to an alignment boundary.
	 */
	next_index += BYTES_PER_WORD;
	next_counter += BYTES_PER_WORD;

	/*
	 * Now check a whole word at a time until we find a word containing a zero byte.
	 * (Array is padded so reading past end is safe.)
	 */
	while (next_counter < end_counter) {
		/*
		 * The following code is currently an exact copy of the code preceding the loop,
		 * but if you try to merge them by using a do loop, it runs slower because a jump
		 * instruction gets added at the start of the iteration.
		 */
		zero_index = find_zero_byte_in_word(next_counter, next_index, end_index);
		if (zero_index < end_index) {
			*index_ptr = zero_index;
			return true;
		}

		next_index += BYTES_PER_WORD;
		next_counter += BYTES_PER_WORD;
	}

	return false;
}
2098
2099 /**
2100 * search_current_reference_block() - Search the reference block currently saved in the search
2101 * cursor for a reference count of zero, starting at the saved
2102 * counter index.
2103 * @slab: The slab to search.
2104 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2105 *
2106 * Return: True if an unreferenced counter was found.
2107 */
search_current_reference_block(const struct vdo_slab * slab,slab_block_number * free_index_ptr)2108 static bool search_current_reference_block(const struct vdo_slab *slab,
2109 slab_block_number *free_index_ptr)
2110 {
2111 /* Don't bother searching if the current block is known to be full. */
2112 return ((slab->search_cursor.block->allocated_count < COUNTS_PER_BLOCK) &&
2113 find_free_block(slab, free_index_ptr));
2114 }
2115
2116 /**
2117 * search_reference_blocks() - Search each reference block for a reference count of zero.
2118 * @slab: The slab to search.
2119 * @free_index_ptr: A pointer to receive the array index of the zero reference count.
2120 *
2121 * Searches each reference block for a reference count of zero, starting at the reference block and
2122 * counter index saved in the search cursor and searching up to the end of the last reference
2123 * block. The search does not wrap.
2124 *
2125 * Return: True if an unreferenced counter was found.
2126 */
search_reference_blocks(struct vdo_slab * slab,slab_block_number * free_index_ptr)2127 static bool search_reference_blocks(struct vdo_slab *slab,
2128 slab_block_number *free_index_ptr)
2129 {
2130 /* Start searching at the saved search position in the current block. */
2131 if (search_current_reference_block(slab, free_index_ptr))
2132 return true;
2133
2134 /* Search each reference block up to the end of the slab. */
2135 while (advance_search_cursor(slab)) {
2136 if (search_current_reference_block(slab, free_index_ptr))
2137 return true;
2138 }
2139
2140 return false;
2141 }
2142
2143 /**
2144 * make_provisional_reference() - Do the bookkeeping for making a provisional reference.
2145 * @slab: The slab.
2146 * @block_number: The index for the physical block to reference.
2147 */
static void make_provisional_reference(struct vdo_slab *slab,
				       slab_block_number block_number)
{
	struct reference_block *block = get_reference_block(slab, block_number);

	/* Move the counter straight from unreferenced to provisionally allocated. */
	slab->counters[block_number] = PROVISIONAL_REFERENCE_COUNT;

	/* Record the allocation in both the slab and the owning reference block. */
	slab->free_blocks--;
	block->allocated_count++;
}
2163
2164 /**
2165 * dirty_all_reference_blocks() - Mark all reference count blocks in a slab as dirty.
2166 * @slab: The slab.
2167 */
dirty_all_reference_blocks(struct vdo_slab * slab)2168 static void dirty_all_reference_blocks(struct vdo_slab *slab)
2169 {
2170 block_count_t i;
2171
2172 for (i = 0; i < slab->reference_block_count; i++)
2173 dirty_block(&slab->reference_blocks[i]);
2174 }
2175
journal_points_equal(struct journal_point first,struct journal_point second)2176 static inline bool journal_points_equal(struct journal_point first,
2177 struct journal_point second)
2178 {
2179 return ((first.sequence_number == second.sequence_number) &&
2180 (first.entry_count == second.entry_count));
2181 }
2182
2183 /**
2184 * match_bytes() - Check an 8-byte word for bytes matching the value specified
2185 * @input: A word to examine the bytes of.
2186 * @match: The byte value sought.
2187 *
2188 * Return: 1 in each byte when the corresponding input byte matched, 0 otherwise.
2189 */
static inline u64 match_bytes(u64 input, u8 match)
{
	/* XOR against the match byte replicated into all eight lanes; matching bytes become 0. */
	u64 delta = input ^ (match * 0x0101010101010101ULL);
	/* Top bit of each byte is set iff the top bit of that delta byte is clear. */
	u64 high_bit_clear = ~delta & 0x8080808080808080ULL;
	/* Top bit of each byte is set iff the low seven bits of that delta byte are zero. */
	u64 low_bits_clear = 0x8080808080808080ULL - (delta & 0x7f7f7f7f7f7f7f7fULL);

	/* A delta byte is zero (i.e. the input byte matched) when both tests agree. */
	return (high_bit_clear & low_bits_clear) >> 7;
}
2200
2201 /**
2202 * count_valid_references() - Process a newly loaded refcount array
2203 * @counters: The array of counters from a metadata block.
2204 *
2205 * Scan an 8-byte-aligned array of counters, fixing up any provisional values that
2206 * weren't cleaned up at shutdown, changing them internally to zero.
2207 *
2208 * Return: The number of blocks with a non-zero reference count.
2209 */
static unsigned int count_valid_references(vdo_refcount_t *counters)
{
	u64 *words = (u64 *)counters;
	/* It's easier to count occurrences of a specific byte than its absences. */
	unsigned int empty_count = 0;
	/* For speed, we process 8 bytes at once. */
	unsigned int words_left = COUNTS_PER_BLOCK / sizeof(u64);

	/*
	 * Sanity check assumptions used for optimizing this code: Counters are bytes. The counter
	 * array is a multiple of the word size.
	 */
	BUILD_BUG_ON(sizeof(vdo_refcount_t) != 1);
	BUILD_BUG_ON((COUNTS_PER_BLOCK % sizeof(u64)) != 0);

	while (words_left > 0) {
		/*
		 * This is used effectively as 8 byte-size counters. Byte 0 counts how many words
		 * had the target value found in byte 0, etc. We just have to avoid overflow.
		 */
		u64 split_count = 0;
		/*
		 * The counter "% 255" trick used below to fold split_count into empty_count
		 * imposes a limit of 254 bytes examined each iteration of the outer loop. We
		 * process a word at a time, so that limit gets rounded down to 31 u64 words.
		 */
		const unsigned int max_words_per_iteration = 254 / sizeof(u64);
		unsigned int iter_words_left = min_t(unsigned int, words_left,
						     max_words_per_iteration);

		words_left -= iter_words_left;

		while (iter_words_left--) {
			u64 word = *words;
			u64 temp;

			/* First, if we have any provisional refcount values, clear them. */
			temp = match_bytes(word, PROVISIONAL_REFERENCE_COUNT);
			if (temp) {
				/*
				 * 'temp' has 0x01 bytes where 'word' has PROVISIONAL; this xor
				 * will alter just those bytes, changing PROVISIONAL to EMPTY.
				 */
				word ^= temp * (PROVISIONAL_REFERENCE_COUNT ^ EMPTY_REFERENCE_COUNT);
				*words = word;
			}

			/* Now count the EMPTY_REFERENCE_COUNT bytes, updating the 8 counters. */
			split_count += match_bytes(word, EMPTY_REFERENCE_COUNT);
			words++;
		}
		/* Folding mod 255 sums the eight per-byte lane counters into one total. */
		empty_count += split_count % 255;
	}

	/* Every counter that is not empty is a valid reference. */
	return COUNTS_PER_BLOCK - empty_count;
}
2266
2267 /**
2268 * unpack_reference_block() - Unpack reference counts blocks into the internal memory structure.
2269 * @packed: The written reference block to be unpacked.
2270 * @block: The internal reference block to be loaded.
2271 */
static void unpack_reference_block(struct packed_reference_block *packed,
				   struct reference_block *block)
{
	sector_count_t i;
	struct vdo_slab *slab = block->slab;
	vdo_refcount_t *counters = get_reference_counters_for_block(block);

	for (i = 0; i < VDO_SECTORS_PER_BLOCK; i++) {
		struct packed_reference_sector *sector = &packed->sectors[i];

		vdo_unpack_journal_point(&sector->commit_point, &block->commit_points[i]);
		memcpy(counters + (i * COUNTS_PER_SECTOR), sector->counts,
		       (sizeof(vdo_refcount_t) * COUNTS_PER_SECTOR));
		/* The slab_journal_point must be the latest point found in any sector. */
		if (vdo_before_journal_point(&slab->slab_journal_point,
					     &block->commit_points[i]))
			slab->slab_journal_point = block->commit_points[i];

		/* Sectors with differing commit points indicate a torn write; warn but proceed. */
		if ((i > 0) &&
		    !journal_points_equal(block->commit_points[0],
					  block->commit_points[i])) {
			size_t block_index = block - block->slab->reference_blocks;

			vdo_log_warning("Torn write detected in sector %u of reference block %zu of slab %u",
					i, block_index, block->slab->slab_number);
		}
	}

	/* Tally the non-zero counters, clearing any stale provisional values. */
	block->allocated_count = count_valid_references(counters);
}
2302
2303 /**
2304 * finish_reference_block_load() - After a reference block has been read, unpack it.
2305 * @completion: The VIO that just finished reading.
2306 */
static void finish_reference_block_load(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct pooled_vio *pooled = vio_as_pooled_vio(vio);
	struct reference_block *block = completion->parent;
	struct vdo_slab *slab = block->slab;
	/* One read may have fetched several consecutive packed reference blocks. */
	unsigned int block_count = vio->io_size / VDO_BLOCK_SIZE;
	unsigned int i;
	char *data = vio->data;

	/* Unpack each packed block just read into its in-memory reference block. */
	for (i = 0; i < block_count; i++, block++, data += VDO_BLOCK_SIZE) {
		struct packed_reference_block *packed = (struct packed_reference_block *) data;

		unpack_reference_block(packed, block);
		slab->free_blocks -= block->allocated_count;
	}
	return_vio_to_pool(pooled);
	/* active_count tracks reference blocks with outstanding I/O. */
	slab->active_count -= block_count;

	check_if_slab_drained(slab);
}
2328
load_reference_block_endio(struct bio * bio)2329 static void load_reference_block_endio(struct bio *bio)
2330 {
2331 struct vio *vio = bio->bi_private;
2332 struct reference_block *block = vio->completion.parent;
2333
2334 continue_vio_after_io(vio, finish_reference_block_load,
2335 block->slab->allocator->thread_id);
2336 }
2337
2338 /**
2339 * load_reference_block_group() - After a block waiter has gotten a VIO from the VIO pool, load
2340 * a set of blocks.
2341 * @waiter: The waiter of the first block to load.
2342 * @context: The VIO returned by the pool.
2343 */
static void load_reference_block_group(struct vdo_waiter *waiter, void *context)
{
	struct pooled_vio *pooled = context;
	struct vio *vio = &pooled->vio;
	struct reference_block *block =
		container_of(waiter, struct reference_block, waiter);
	u32 block_offset = block - block->slab->reference_blocks;
	/* Clamp the read so it does not run past the slab's last reference block. */
	u32 max_block_count = block->slab->reference_block_count - block_offset;
	/*
	 * NOTE(review): min_t(int, ...) compares two u32 values as int; fine for realistic
	 * block counts, but min_t(u32, ...) would match the operand types — confirm intent.
	 */
	u32 block_count = min_t(int, vio->block_count, max_block_count);

	vio->completion.parent = block;
	vdo_submit_metadata_vio_with_size(vio, block->slab->ref_counts_origin + block_offset,
					  load_reference_block_endio, handle_io_error,
					  REQ_OP_READ, block_count * VDO_BLOCK_SIZE);
}
2359
2360 /**
2361 * load_reference_blocks() - Load a slab's reference blocks from the underlying storage into a
2362 * pre-allocated reference counter.
2363 * @slab: The slab.
2364 */
static void load_reference_blocks(struct vdo_slab *slab)
{
	block_count_t i;
	u64 blocks_per_vio = slab->allocator->refcount_blocks_per_big_vio;
	struct vio_pool *pool = slab->allocator->refcount_big_vio_pool;

	/* Without a big vio pool, fall back to the general pool, one block per vio. */
	if (!pool) {
		pool = slab->allocator->vio_pool;
		blocks_per_vio = 1;
	}

	slab->free_blocks = slab->block_count;
	/* active_count tracks how many reference blocks still have reads outstanding. */
	slab->active_count = slab->reference_block_count;
	for (i = 0; i < slab->reference_block_count; i += blocks_per_vio) {
		/* The first block of each group waits for a vio and issues the group read. */
		struct vdo_waiter *waiter = &slab->reference_blocks[i].waiter;

		waiter->callback = load_reference_block_group;
		acquire_vio_from_pool(pool, waiter);
	}
}
2385
2386 /**
2387 * drain_slab() - Drain all reference count I/O.
2388 * @slab: The slab.
2389 *
2390 * Depending upon the type of drain being performed (as recorded in the ref_count's vdo_slab), the
2391 * reference blocks may be loaded from disk or dirty reference blocks may be written out.
2392 */
static void drain_slab(struct vdo_slab *slab)
{
	bool save;
	bool load;
	const struct admin_state_code *state = vdo_get_admin_state_code(&slab->state);

	/* A suspend requires no reference count I/O at all. */
	if (state == VDO_ADMIN_STATE_SUSPENDING)
		return;

	/* Most drain types want the slab journal tail committed first. */
	if ((state != VDO_ADMIN_STATE_REBUILDING) &&
	    (state != VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING))
		commit_tail(&slab->journal);

	if ((state == VDO_ADMIN_STATE_RECOVERING) || (slab->counters == NULL))
		return;

	save = false;
	load = slab->allocator->summary_entries[slab->slab_number].load_ref_counts;
	if (state == VDO_ADMIN_STATE_SCRUBBING) {
		if (load) {
			load_reference_blocks(slab);
			return;
		}
	} else if (state == VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING) {
		if (!load) {
			/* These reference counts were never written, so mark them all dirty. */
			dirty_all_reference_blocks(slab);
		}
		save = true;
	} else if (state == VDO_ADMIN_STATE_REBUILDING) {
		/*
		 * Write out the counters if the slab has written them before, or it has any
		 * non-zero reference counts, or there are any slab journal blocks.
		 */
		block_count_t data_blocks = slab->allocator->depot->slab_config.data_blocks;

		if (load || (slab->free_blocks != data_blocks) ||
		    !is_slab_journal_blank(slab)) {
			dirty_all_reference_blocks(slab);
			save = true;
		}
	} else if (state == VDO_ADMIN_STATE_SAVING) {
		/* Only save counters for slabs which have been fully rebuilt. */
		save = (slab->status == VDO_SLAB_REBUILT);
	} else {
		/* Any other drain type needs no reference count I/O. */
		vdo_finish_draining_with_result(&slab->state, VDO_SUCCESS);
		return;
	}

	if (save)
		save_dirty_reference_blocks(slab);
}
2444
static int allocate_slab_counters(struct vdo_slab *slab)
{
	int result;
	size_t index, bytes;

	result = VDO_ASSERT(slab->reference_blocks == NULL,
			    "vdo_slab %u doesn't allocate refcounts twice",
			    slab->slab_number);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(slab->reference_block_count, __func__, &slab->reference_blocks);
	if (result != VDO_SUCCESS)
		return result;

	/*
	 * Allocate such that the runt slab has a full-length memory array, plus a little padding
	 * so we can word-search even at the very end.
	 */
	bytes = (slab->reference_block_count * COUNTS_PER_BLOCK) + (2 * BYTES_PER_WORD);
	result = vdo_allocate(bytes, "ref counts array", &slab->counters);
	if (result != VDO_SUCCESS) {
		/* Don't leak the reference block array if the counters can't be allocated. */
		vdo_free(vdo_forget(slab->reference_blocks));
		return result;
	}

	slab->search_cursor.first_block = slab->reference_blocks;
	slab->search_cursor.last_block = &slab->reference_blocks[slab->reference_block_count - 1];
	reset_search_cursor(slab);

	/* Each reference block needs a back-pointer to its slab. */
	for (index = 0; index < slab->reference_block_count; index++) {
		slab->reference_blocks[index] = (struct reference_block) {
			.slab = slab,
		};
	}

	return VDO_SUCCESS;
}
2483
allocate_counters_if_clean(struct vdo_slab * slab)2484 static int allocate_counters_if_clean(struct vdo_slab *slab)
2485 {
2486 if (vdo_is_state_clean_load(&slab->state))
2487 return allocate_slab_counters(slab);
2488
2489 return VDO_SUCCESS;
2490 }
2491
/**
 * finish_loading_journal() - Process the slab journal tail block once it has been read.
 * @completion: The vio completion for the tail block read.
 *
 * Read callback registered in read_slab_journal_tail(). If the block is a slab journal block
 * bearing this allocator's nonce, the journal tail, head, and tail header are initialized from
 * it; otherwise the journal state is left untouched. Either way, the vio is returned to its pool
 * and the slab load is finished.
 */
static void finish_loading_journal(struct vdo_completion *completion)
{
	struct vio *vio = as_vio(completion);
	struct slab_journal *journal = completion->parent;
	struct vdo_slab *slab = journal->slab;
	struct packed_slab_journal_block *block = (struct packed_slab_journal_block *) vio->data;
	struct slab_journal_block_header header;

	vdo_unpack_slab_journal_block_header(&block->header, &header);

	/* FIXME: should it be an error if the following conditional fails? */
	if ((header.metadata_type == VDO_METADATA_SLAB_JOURNAL) &&
	    (header.nonce == slab->allocator->nonce)) {
		journal->tail = header.sequence_number + 1;

		/*
		 * If the slab is clean, this implies the slab journal is empty, so advance the
		 * head appropriately.
		 */
		journal->head = (slab->allocator->summary_entries[slab->slab_number].is_dirty ?
				 header.head : journal->tail);
		journal->tail_header = header;
		initialize_journal_state(journal);
	}

	return_vio_to_pool(vio_as_pooled_vio(vio));
	vdo_finish_loading_with_result(&slab->state, allocate_counters_if_clean(slab));
}
2520
read_slab_journal_tail_endio(struct bio * bio)2521 static void read_slab_journal_tail_endio(struct bio *bio)
2522 {
2523 struct vio *vio = bio->bi_private;
2524 struct slab_journal *journal = vio->completion.parent;
2525
2526 continue_vio_after_io(vio, finish_loading_journal,
2527 journal->slab->allocator->thread_id);
2528 }
2529
handle_load_error(struct vdo_completion * completion)2530 static void handle_load_error(struct vdo_completion *completion)
2531 {
2532 int result = completion->result;
2533 struct slab_journal *journal = completion->parent;
2534 struct vio *vio = as_vio(completion);
2535
2536 vio_record_metadata_io_error(vio);
2537 return_vio_to_pool(vio_as_pooled_vio(vio));
2538 vdo_finish_loading_with_result(&journal->slab->state, result);
2539 }
2540
/**
 * read_slab_journal_tail() - Read the slab journal tail block by using a vio acquired from the vio
 *                            pool.
 * @waiter: The vio pool waiter which has just been notified.
 * @context: The vio pool entry given to the waiter.
 *
 * This is the success callback from acquire_vio_from_pool() when loading a slab journal.
 */
static void read_slab_journal_tail(struct vdo_waiter *waiter, void *context)
{
	struct slab_journal *journal =
		container_of(waiter, struct slab_journal, resource_waiter);
	struct vdo_slab *slab = journal->slab;
	struct pooled_vio *pooled = context;
	struct vio *vio = &pooled->vio;
	tail_block_offset_t last_commit_point =
		slab->allocator->summary_entries[slab->slab_number].tail_block_offset;

	/*
	 * Slab summary keeps the commit point offset, so the tail block is the block before that.
	 * Calculation supports small journals in unit tests.
	 */
	tail_block_offset_t tail_block = ((last_commit_point == 0) ?
					  (tail_block_offset_t)(journal->size - 1) :
					  (last_commit_point - 1));

	/* The read completes on the allocator's thread via read_slab_journal_tail_endio(). */
	vio->completion.parent = journal;
	vio->completion.callback_thread_id = slab->allocator->thread_id;
	vdo_submit_metadata_vio(vio, slab->journal_origin + tail_block,
				read_slab_journal_tail_endio, handle_load_error,
				REQ_OP_READ);
}
2573
/**
 * load_slab_journal() - Load a slab's journal by reading the journal's tail.
 * @slab: The slab.
 *
 * If the slab summary shows the journal was never committed and the reference counts were never
 * written, the load finishes immediately without any disk read.
 */
static void load_slab_journal(struct vdo_slab *slab)
{
	struct slab_journal *journal = &slab->journal;
	tail_block_offset_t last_commit_point;

	last_commit_point = slab->allocator->summary_entries[slab->slab_number].tail_block_offset;
	if ((last_commit_point == 0) &&
	    !slab->allocator->summary_entries[slab->slab_number].load_ref_counts) {
		/*
		 * This slab claims that it has a tail block at (journal->size - 1), but a head of
		 * 1. This is impossible, due to the scrubbing threshold, on a real system, so
		 * don't bother reading the (bogus) data off disk.
		 */
		VDO_ASSERT_LOG_ONLY(((journal->size < 16) ||
				     (journal->scrubbing_threshold < (journal->size - 1))),
				    "Scrubbing threshold protects against reads of unwritten slab journal blocks");
		vdo_finish_loading_with_result(&slab->state,
					       allocate_counters_if_clean(slab));
		return;
	}

	/* Wait for a pooled vio, then read the tail in read_slab_journal_tail(). */
	journal->resource_waiter.callback = read_slab_journal_tail;
	acquire_vio_from_pool(slab->allocator->vio_pool, &journal->resource_waiter);
}
2602
/**
 * register_slab_for_scrubbing() - Queue an unrecovered slab on the scrubber's list.
 * @slab: The slab to queue; must not be fully rebuilt.
 * @high_priority: Whether the slab goes on the high-priority queue.
 *
 * A slab whose status is not VDO_SLAB_REQUIRES_SCRUBBING (already rebuilt, or already queued at
 * high priority) is left where it is. The scrubber's slab_count is only incremented the first
 * time a given slab is queued.
 */
static void register_slab_for_scrubbing(struct vdo_slab *slab, bool high_priority)
{
	struct slab_scrubber *scrubber = &slab->allocator->scrubber;

	VDO_ASSERT_LOG_ONLY((slab->status != VDO_SLAB_REBUILT),
			    "slab to be scrubbed is unrecovered");

	if (slab->status != VDO_SLAB_REQUIRES_SCRUBBING)
		return;

	/* Remove the slab from any allocation queue before requeueing it for scrubbing. */
	list_del_init(&slab->allocq_entry);
	if (!slab->was_queued_for_scrubbing) {
		WRITE_ONCE(scrubber->slab_count, scrubber->slab_count + 1);
		slab->was_queued_for_scrubbing = true;
	}

	if (high_priority) {
		slab->status = VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING;
		list_add_tail(&slab->allocq_entry, &scrubber->high_priority_slabs);
		return;
	}

	list_add_tail(&slab->allocq_entry, &scrubber->slabs);
}
2627
/**
 * queue_slab() - Queue a slab for allocation or scrubbing.
 * @slab: The slab to queue; must not already be on a queue.
 *
 * A slab with an invalid free block count puts the VDO into read-only mode. An unrecovered slab
 * goes to the scrubber; a rebuilt slab has its free blocks charged against the allocator and is
 * placed in the priority table.
 */
static void queue_slab(struct vdo_slab *slab)
{
	struct block_allocator *allocator = slab->allocator;
	block_count_t free_blocks;
	int result;

	VDO_ASSERT_LOG_ONLY(list_empty(&slab->allocq_entry),
			    "a requeued slab must not already be on a list");

	if (vdo_is_read_only(allocator->depot->vdo))
		return;

	free_blocks = slab->free_blocks;
	result = VDO_ASSERT((free_blocks <= allocator->depot->slab_config.data_blocks),
			    "rebuilt slab %u must have a valid free block count (has %llu, expected maximum %llu)",
			    slab->slab_number, (unsigned long long) free_blocks,
			    (unsigned long long) allocator->depot->slab_config.data_blocks);
	if (result != VDO_SUCCESS) {
		vdo_enter_read_only_mode(allocator->depot->vdo, result);
		return;
	}

	if (slab->status != VDO_SLAB_REBUILT) {
		register_slab_for_scrubbing(slab, false);
		return;
	}

	if (!vdo_is_state_resuming(&slab->state)) {
		/*
		 * If the slab is resuming, we've already accounted for it here, so don't do it
		 * again.
		 * FIXME: under what situation would the slab be resuming here?
		 */
		WRITE_ONCE(allocator->allocated_blocks,
			   allocator->allocated_blocks - free_blocks);
		if (!is_slab_journal_blank(slab)) {
			WRITE_ONCE(allocator->statistics.slabs_opened,
				   allocator->statistics.slabs_opened + 1);
		}
	}

	/* Reopen the journal after a save so new entries can be made. */
	if (allocator->depot->vdo->suspend_type == VDO_ADMIN_STATE_SAVING)
		reopen_slab_journal(slab);

	prioritize_slab(slab);
}
2675
2676 /** Implements vdo_admin_initiator_fn. */
initiate_slab_action(struct admin_state * state)2677 static void initiate_slab_action(struct admin_state *state)
2678 {
2679 struct vdo_slab *slab = container_of(state, struct vdo_slab, state);
2680
2681 if (vdo_is_state_draining(state)) {
2682 const struct admin_state_code *operation = vdo_get_admin_state_code(state);
2683
2684 if (operation == VDO_ADMIN_STATE_SCRUBBING)
2685 slab->status = VDO_SLAB_REBUILDING;
2686
2687 drain_slab(slab);
2688 check_if_slab_drained(slab);
2689 return;
2690 }
2691
2692 if (vdo_is_state_loading(state)) {
2693 load_slab_journal(slab);
2694 return;
2695 }
2696
2697 if (vdo_is_state_resuming(state)) {
2698 queue_slab(slab);
2699 vdo_finish_resuming(state);
2700 return;
2701 }
2702
2703 vdo_finish_operation(state, VDO_INVALID_ADMIN_STATE);
2704 }
2705
2706 /**
2707 * get_next_slab() - Get the next slab to scrub.
2708 * @scrubber: The slab scrubber.
2709 *
2710 * Return: The next slab to scrub or NULL if there are none.
2711 */
get_next_slab(struct slab_scrubber * scrubber)2712 static struct vdo_slab *get_next_slab(struct slab_scrubber *scrubber)
2713 {
2714 struct vdo_slab *slab;
2715
2716 slab = list_first_entry_or_null(&scrubber->high_priority_slabs,
2717 struct vdo_slab, allocq_entry);
2718 if (slab != NULL)
2719 return slab;
2720
2721 return list_first_entry_or_null(&scrubber->slabs, struct vdo_slab,
2722 allocq_entry);
2723 }
2724
2725 /**
2726 * has_slabs_to_scrub() - Check whether a scrubber has slabs to scrub.
2727 * @scrubber: The scrubber to check.
2728 *
2729 * Return: True if the scrubber has slabs to scrub.
2730 */
has_slabs_to_scrub(struct slab_scrubber * scrubber)2731 static inline bool __must_check has_slabs_to_scrub(struct slab_scrubber *scrubber)
2732 {
2733 return (get_next_slab(scrubber) != NULL);
2734 }
2735
2736 /**
2737 * uninitialize_scrubber_vio() - Clean up the slab_scrubber's vio.
2738 * @scrubber: The scrubber.
2739 */
uninitialize_scrubber_vio(struct slab_scrubber * scrubber)2740 static void uninitialize_scrubber_vio(struct slab_scrubber *scrubber)
2741 {
2742 vdo_free(vdo_forget(scrubber->vio.data));
2743 free_vio_components(&scrubber->vio);
2744 }
2745
/**
 * finish_scrubbing() - Stop scrubbing, either because there are no more slabs to scrub or because
 *                      there's been an error.
 * @scrubber: The scrubber.
 * @result: The result of the scrubbing operation.
 */
static void finish_scrubbing(struct slab_scrubber *scrubber, int result)
{
	bool notify = vdo_waitq_has_waiters(&scrubber->waiters);
	bool done = !has_slabs_to_scrub(scrubber);
	struct block_allocator *allocator =
		container_of(scrubber, struct block_allocator, scrubber);

	/* The vio is only needed while there are slabs left to scrub. */
	if (done)
		uninitialize_scrubber_vio(scrubber);

	if (scrubber->high_priority_only) {
		scrubber->high_priority_only = false;
		vdo_fail_completion(vdo_forget(scrubber->vio.completion.parent), result);
	} else if (done && (atomic_add_return(-1, &allocator->depot->zones_to_scrub) == 0)) {
		/* All of our slabs were scrubbed, and we're the last allocator to finish. */
		enum vdo_state prior_state =
			atomic_cmpxchg(&allocator->depot->vdo->state, VDO_RECOVERING,
				       VDO_DIRTY);

		/*
		 * To be safe, even if the CAS failed, ensure anything that follows is ordered with
		 * respect to whatever state change did happen.
		 */
		smp_mb__after_atomic();

		/*
		 * We must check the VDO state here and not the depot's read_only_notifier since
		 * the compare-swap-above could have failed due to a read-only entry which our own
		 * thread does not yet know about.
		 */
		if (prior_state == VDO_DIRTY)
			vdo_log_info("VDO commencing normal operation");
		else if (prior_state == VDO_RECOVERING)
			vdo_log_info("Exiting recovery mode");
		/* NOTE(review): presumably this pool is only needed during recovery — confirm. */
		free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool));
	}

	/*
	 * Note that the scrubber has stopped, and inform anyone who might be waiting for that to
	 * happen.
	 */
	if (!vdo_finish_draining(&scrubber->admin_state))
		WRITE_ONCE(scrubber->admin_state.current_state,
			   VDO_ADMIN_STATE_SUSPENDED);

	/*
	 * We can't notify waiters until after we've finished draining or they'll just requeue.
	 * Fortunately if there were waiters, we can't have been freed yet.
	 */
	if (notify)
		vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);
}
2804
2805 static void scrub_next_slab(struct slab_scrubber *scrubber);
2806
/**
 * slab_scrubbed() - Notify the scrubber that a slab has been scrubbed.
 * @completion: The slab rebuild completion.
 *
 * This callback is registered in apply_journal_entries(). The slab is marked rebuilt and requeued
 * for allocation before the scrubber moves on to the next slab.
 */
static void slab_scrubbed(struct vdo_completion *completion)
{
	struct slab_scrubber *scrubber =
		container_of(as_vio(completion), struct slab_scrubber, vio);
	struct vdo_slab *slab = scrubber->slab;

	slab->status = VDO_SLAB_REBUILT;
	queue_slab(slab);
	reopen_slab_journal(slab);
	/* One fewer slab awaiting scrubbing. */
	WRITE_ONCE(scrubber->slab_count, scrubber->slab_count - 1);
	scrub_next_slab(scrubber);
}
2825
2826 /**
2827 * abort_scrubbing() - Abort scrubbing due to an error.
2828 * @scrubber: The slab scrubber.
2829 * @result: The error.
2830 */
abort_scrubbing(struct slab_scrubber * scrubber,int result)2831 static void abort_scrubbing(struct slab_scrubber *scrubber, int result)
2832 {
2833 vdo_enter_read_only_mode(scrubber->vio.completion.vdo, result);
2834 finish_scrubbing(scrubber, result);
2835 }
2836
2837 /**
2838 * handle_scrubber_error() - Handle errors while rebuilding a slab.
2839 * @completion: The slab rebuild completion.
2840 */
handle_scrubber_error(struct vdo_completion * completion)2841 static void handle_scrubber_error(struct vdo_completion *completion)
2842 {
2843 struct vio *vio = as_vio(completion);
2844
2845 vio_record_metadata_io_error(vio);
2846 abort_scrubbing(container_of(vio, struct slab_scrubber, vio),
2847 completion->result);
2848 }
2849
/**
 * apply_block_entries() - Apply all the entries in a block to the reference counts.
 * @block: A block with entries to apply.
 * @entry_count: The number of entries to apply.
 * @block_number: The sequence number of the block.
 * @slab: The slab to apply the entries to.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int apply_block_entries(struct packed_slab_journal_block *block,
			       journal_entry_count_t entry_count,
			       sequence_number_t block_number, struct vdo_slab *slab)
{
	struct journal_point entry_point = {
		.sequence_number = block_number,
		.entry_count = 0,
	};
	int result;
	slab_block_number max_sbn = slab->end - slab->start;

	while (entry_point.entry_count < entry_count) {
		struct slab_journal_entry entry =
			vdo_decode_slab_journal_entry(block, entry_point.entry_count);

		/* NOTE(review): an entry with sbn == max_sbn passes this check — confirm slab->end is inclusive. */
		if (entry.sbn > max_sbn) {
			/* This entry is out of bounds. */
			return vdo_log_error_strerror(VDO_CORRUPT_JOURNAL,
						      "vdo_slab journal entry (%llu, %u) had invalid offset %u in slab (size %u blocks)",
						      (unsigned long long) block_number,
						      entry_point.entry_count,
						      entry.sbn, max_sbn);
		}

		result = replay_reference_count_change(slab, &entry_point, entry);
		if (result != VDO_SUCCESS) {
			vdo_log_error_strerror(result,
					       "vdo_slab journal entry (%llu, %u) (%s of offset %u) could not be applied in slab %u",
					       (unsigned long long) block_number,
					       entry_point.entry_count,
					       vdo_get_journal_operation_name(entry.operation),
					       entry.sbn, slab->slab_number);
			return result;
		}
		entry_point.entry_count++;
	}

	return VDO_SUCCESS;
}
2898
/**
 * apply_journal_entries() - Find the relevant vio of the slab journal and apply all valid entries.
 * @completion: The metadata read vio completion.
 *
 * This is a callback registered in start_scrubbing(). The scrubber's vio data holds the entire
 * slab journal; each block from head to tail is validated and replayed into the reference
 * counts, after which the rebuilt counts are saved.
 */
static void apply_journal_entries(struct vdo_completion *completion)
{
	int result;
	struct slab_scrubber *scrubber =
		container_of(as_vio(completion), struct slab_scrubber, vio);
	struct vdo_slab *slab = scrubber->slab;
	struct slab_journal *journal = &slab->journal;

	/* Find the boundaries of the useful part of the journal. */
	sequence_number_t tail = journal->tail;
	tail_block_offset_t end_index = (tail - 1) % journal->size;
	char *end_data = scrubber->vio.data + (end_index * VDO_BLOCK_SIZE);
	struct packed_slab_journal_block *end_block =
		(struct packed_slab_journal_block *) end_data;

	/* The last written block records where the journal's head was. */
	sequence_number_t head = __le64_to_cpu(end_block->header.head);
	tail_block_offset_t head_index = head % journal->size;
	block_count_t index = head_index;

	struct journal_point ref_counts_point = slab->slab_journal_point;
	struct journal_point last_entry_applied = ref_counts_point;
	sequence_number_t sequence;

	/* Walk the journal ring from head to tail, replaying each block. */
	for (sequence = head; sequence < tail; sequence++) {
		char *block_data = scrubber->vio.data + (index * VDO_BLOCK_SIZE);
		struct packed_slab_journal_block *block =
			(struct packed_slab_journal_block *) block_data;
		struct slab_journal_block_header header;

		vdo_unpack_slab_journal_block_header(&block->header, &header);

		if ((header.nonce != slab->allocator->nonce) ||
		    (header.metadata_type != VDO_METADATA_SLAB_JOURNAL) ||
		    (header.sequence_number != sequence) ||
		    (header.entry_count > journal->entries_per_block) ||
		    (header.has_block_map_increments &&
		     (header.entry_count > journal->full_entries_per_block))) {
			/* The block is not what we expect it to be. */
			vdo_log_error("vdo_slab journal block for slab %u was invalid",
				      slab->slab_number);
			abort_scrubbing(scrubber, VDO_CORRUPT_JOURNAL);
			return;
		}

		result = apply_block_entries(block, header.entry_count, sequence, slab);
		if (result != VDO_SUCCESS) {
			abort_scrubbing(scrubber, result);
			return;
		}

		last_entry_applied.sequence_number = sequence;
		last_entry_applied.entry_count = header.entry_count - 1;
		/* Advance around the journal ring. */
		index++;
		if (index == journal->size)
			index = 0;
	}

	/*
	 * At the end of rebuild, the reference counters should be accurate to the end of the
	 * journal we just applied.
	 */
	result = VDO_ASSERT(!vdo_before_journal_point(&last_entry_applied,
						      &ref_counts_point),
			    "Refcounts are not more accurate than the slab journal");
	if (result != VDO_SUCCESS) {
		abort_scrubbing(scrubber, result);
		return;
	}

	/* Save out the rebuilt reference blocks. */
	vdo_prepare_completion(completion, slab_scrubbed, handle_scrubber_error,
			       slab->allocator->thread_id, completion->parent);
	vdo_start_operation_with_waiter(&slab->state,
					VDO_ADMIN_STATE_SAVE_FOR_SCRUBBING,
					completion, initiate_slab_action);
}
2981
read_slab_journal_endio(struct bio * bio)2982 static void read_slab_journal_endio(struct bio *bio)
2983 {
2984 struct vio *vio = bio->bi_private;
2985 struct slab_scrubber *scrubber = container_of(vio, struct slab_scrubber, vio);
2986
2987 continue_vio_after_io(bio->bi_private, apply_journal_entries,
2988 scrubber->slab->allocator->thread_id);
2989 }
2990
2991 /**
2992 * start_scrubbing() - Read the current slab's journal from disk now that it has been flushed.
2993 * @completion: The scrubber's vio completion.
2994 *
2995 * This callback is registered in scrub_next_slab().
2996 */
start_scrubbing(struct vdo_completion * completion)2997 static void start_scrubbing(struct vdo_completion *completion)
2998 {
2999 struct slab_scrubber *scrubber =
3000 container_of(as_vio(completion), struct slab_scrubber, vio);
3001 struct vdo_slab *slab = scrubber->slab;
3002
3003 if (!slab->allocator->summary_entries[slab->slab_number].is_dirty) {
3004 slab_scrubbed(completion);
3005 return;
3006 }
3007
3008 vdo_submit_metadata_vio(&scrubber->vio, slab->journal_origin,
3009 read_slab_journal_endio, handle_scrubber_error,
3010 REQ_OP_READ);
3011 }
3012
/**
 * scrub_next_slab() - Scrub the next slab if there is one.
 * @scrubber: The scrubber.
 */
static void scrub_next_slab(struct slab_scrubber *scrubber)
{
	struct vdo_completion *completion = &scrubber->vio.completion;
	struct vdo_slab *slab;

	/*
	 * Note: this notify call is always safe only because scrubbing can only be started when
	 * the VDO is quiescent.
	 */
	vdo_waitq_notify_all_waiters(&scrubber->waiters, NULL, NULL);

	if (vdo_is_read_only(completion->vdo)) {
		finish_scrubbing(scrubber, VDO_READ_ONLY);
		return;
	}

	/* Stop when there are no more slabs, or high-priority work is exhausted. */
	slab = get_next_slab(scrubber);
	if ((slab == NULL) ||
	    (scrubber->high_priority_only && list_empty(&scrubber->high_priority_slabs))) {
		finish_scrubbing(scrubber, VDO_SUCCESS);
		return;
	}

	/* If a drain was requested, stop here instead of starting another slab. */
	if (vdo_finish_draining(&scrubber->admin_state))
		return;

	list_del_init(&slab->allocq_entry);
	scrubber->slab = slab;
	vdo_prepare_completion(completion, start_scrubbing, handle_scrubber_error,
			       slab->allocator->thread_id, completion->parent);
	vdo_start_operation_with_waiter(&slab->state, VDO_ADMIN_STATE_SCRUBBING,
					completion, initiate_slab_action);
}
3050
/**
 * scrub_slabs() - Scrub all of an allocator's slabs that are eligible for scrubbing.
 * @allocator: The block_allocator to scrub.
 * @parent: The completion to notify when scrubbing is done, implies high_priority, may be NULL.
 */
static void scrub_slabs(struct block_allocator *allocator, struct vdo_completion *parent)
{
	struct slab_scrubber *scrubber = &allocator->scrubber;

	scrubber->vio.completion.parent = parent;
	scrubber->high_priority_only = (parent != NULL);
	if (!has_slabs_to_scrub(scrubber)) {
		finish_scrubbing(scrubber, VDO_SUCCESS);
		return;
	}

	/*
	 * In high-priority-only mode with no prioritized slabs and an empty high-priority queue,
	 * promote the next ordinary slab so there is something to scrub at high priority.
	 */
	if (scrubber->high_priority_only &&
	    vdo_is_priority_table_empty(allocator->prioritized_slabs) &&
	    list_empty(&scrubber->high_priority_slabs))
		register_slab_for_scrubbing(get_next_slab(scrubber), true);

	vdo_resume_if_quiescent(&scrubber->admin_state);
	scrub_next_slab(scrubber);
}
3075
/* Log (without crashing) if the caller is not on the allocator's thread. */
static inline void assert_on_allocator_thread(thread_id_t thread_id,
					      const char *function_name)
{
	thread_id_t caller_thread = vdo_get_callback_thread_id();

	VDO_ASSERT_LOG_ONLY((caller_thread == thread_id),
			    "%s called on correct thread", function_name);
}
3082
register_slab_with_allocator(struct block_allocator * allocator,struct vdo_slab * slab)3083 static void register_slab_with_allocator(struct block_allocator *allocator,
3084 struct vdo_slab *slab)
3085 {
3086 allocator->slab_count++;
3087 allocator->last_slab = slab->slab_number;
3088 }
3089
3090 /**
3091 * get_depot_slab_iterator() - Return a slab_iterator over the slabs in a slab_depot.
3092 * @depot: The depot over which to iterate.
3093 * @start: The number of the slab to start iterating from.
3094 * @end: The number of the last slab which may be returned.
3095 * @stride: The difference in slab number between successive slabs.
3096 *
3097 * Iteration always occurs from higher to lower numbered slabs.
3098 *
3099 * Return: An initialized iterator structure.
3100 */
get_depot_slab_iterator(struct slab_depot * depot,slab_count_t start,slab_count_t end,slab_count_t stride)3101 static struct slab_iterator get_depot_slab_iterator(struct slab_depot *depot,
3102 slab_count_t start, slab_count_t end,
3103 slab_count_t stride)
3104 {
3105 struct vdo_slab **slabs = depot->slabs;
3106
3107 return (struct slab_iterator) {
3108 .slabs = slabs,
3109 .next = (((slabs == NULL) || (start < end)) ? NULL : slabs[start]),
3110 .end = end,
3111 .stride = stride,
3112 };
3113 }
3114
get_slab_iterator(const struct block_allocator * allocator)3115 static struct slab_iterator get_slab_iterator(const struct block_allocator *allocator)
3116 {
3117 return get_depot_slab_iterator(allocator->depot, allocator->last_slab,
3118 allocator->zone_number,
3119 allocator->depot->zone_count);
3120 }
3121
3122 /**
3123 * next_slab() - Get the next slab from a slab_iterator and advance the iterator
3124 * @iterator: The slab_iterator.
3125 *
3126 * Return: The next slab or NULL if the iterator is exhausted.
3127 */
next_slab(struct slab_iterator * iterator)3128 static struct vdo_slab *next_slab(struct slab_iterator *iterator)
3129 {
3130 struct vdo_slab *slab = iterator->next;
3131
3132 if ((slab == NULL) || (slab->slab_number < iterator->end + iterator->stride))
3133 iterator->next = NULL;
3134 else
3135 iterator->next = iterator->slabs[slab->slab_number - iterator->stride];
3136
3137 return slab;
3138 }
3139
3140 /**
3141 * abort_waiter() - Abort vios waiting to make journal entries when read-only.
3142 * @waiter: A waiting data_vio.
3143 * @context: Not used.
3144 *
3145 * This callback is invoked on all vios waiting to make slab journal entries after the VDO has gone
3146 * into read-only mode. Implements waiter_callback_fn.
3147 */
abort_waiter(struct vdo_waiter * waiter,void __always_unused * context)3148 static void abort_waiter(struct vdo_waiter *waiter, void __always_unused *context)
3149 {
3150 struct reference_updater *updater =
3151 container_of(waiter, struct reference_updater, waiter);
3152 struct data_vio *data_vio = data_vio_from_reference_updater(updater);
3153
3154 if (updater->increment) {
3155 continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
3156 return;
3157 }
3158
3159 vdo_continue_completion(&data_vio->decrement_completion, VDO_READ_ONLY);
3160 }
3161
/* Implements vdo_read_only_notification_fn. */
static void notify_block_allocator_of_read_only_mode(void *listener,
						     struct vdo_completion *parent)
{
	struct block_allocator *allocator = listener;
	struct slab_iterator iterator;

	assert_on_allocator_thread(allocator->thread_id, __func__);
	iterator = get_slab_iterator(allocator);
	/* Abort every pending journal-entry waiter in every slab of this allocator. */
	while (iterator.next != NULL) {
		struct vdo_slab *slab = next_slab(&iterator);

		vdo_waitq_notify_all_waiters(&slab->journal.entry_waiters,
					     abort_waiter, &slab->journal);
		check_if_slab_drained(slab);
	}

	vdo_finish_completion(parent);
}
3181
/**
 * vdo_acquire_provisional_reference() - Acquire a provisional reference on behalf of a PBN lock if
 *                                       the block it locks is unreferenced.
 * @slab: The slab which contains the block.
 * @pbn: The physical block to reference.
 * @lock: The lock.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_acquire_provisional_reference(struct vdo_slab *slab, physical_block_number_t pbn,
				      struct pbn_lock *lock)
{
	slab_block_number block_number;
	int result;

	/* Nothing to do if the lock already holds a provisional reference. */
	if (vdo_pbn_lock_has_provisional_reference(lock))
		return VDO_SUCCESS;

	if (!is_slab_open(slab))
		return VDO_INVALID_ADMIN_STATE;

	result = slab_block_number_from_pbn(slab, pbn, &block_number);
	if (result != VDO_SUCCESS)
		return result;

	if (slab->counters[block_number] == EMPTY_REFERENCE_COUNT) {
		make_provisional_reference(slab, block_number);
		if (lock != NULL)
			vdo_assign_pbn_lock_provisional_reference(lock);
	}

	/* NOTE(review): presumably this helper tolerates a NULL lock — confirm. */
	if (vdo_pbn_lock_has_provisional_reference(lock))
		adjust_free_block_count(slab, false);

	return VDO_SUCCESS;
}
3218
/**
 * allocate_slab_block() - Allocate a single free block within a slab.
 * @slab: The slab to allocate from.
 * @block_number_ptr: On success, set to the physical block number allocated.
 *
 * The allocated block receives a provisional reference which the caller must later confirm or
 * release.
 *
 * Return: VDO_SUCCESS, VDO_INVALID_ADMIN_STATE if the slab is not open, or VDO_NO_SPACE if no
 *         free block was found.
 */
static int __must_check allocate_slab_block(struct vdo_slab *slab,
					    physical_block_number_t *block_number_ptr)
{
	slab_block_number free_index;

	if (!is_slab_open(slab))
		return VDO_INVALID_ADMIN_STATE;

	if (!search_reference_blocks(slab, &free_index))
		return VDO_NO_SPACE;

	VDO_ASSERT_LOG_ONLY((slab->counters[free_index] == EMPTY_REFERENCE_COUNT),
			    "free block must have ref count of zero");
	make_provisional_reference(slab, free_index);
	adjust_free_block_count(slab, false);

	/*
	 * Update the search hint so the next search will start at the array index just past the
	 * free block we just found.
	 */
	slab->search_cursor.index = (free_index + 1);

	*block_number_ptr = slab->start + free_index;
	return VDO_SUCCESS;
}
3244
3245 /**
3246 * open_slab() - Prepare a slab to be allocated from.
3247 * @slab: The slab.
3248 */
open_slab(struct vdo_slab * slab)3249 static void open_slab(struct vdo_slab *slab)
3250 {
3251 reset_search_cursor(slab);
3252 if (is_slab_journal_blank(slab)) {
3253 WRITE_ONCE(slab->allocator->statistics.slabs_opened,
3254 slab->allocator->statistics.slabs_opened + 1);
3255 dirty_all_reference_blocks(slab);
3256 } else {
3257 WRITE_ONCE(slab->allocator->statistics.slabs_reopened,
3258 slab->allocator->statistics.slabs_reopened + 1);
3259 }
3260
3261 slab->allocator->open_slab = slab;
3262 }
3263
3264
3265 /*
3266 * The block allocated will have a provisional reference and the reference must be either confirmed
3267 * with a subsequent increment or vacated with a subsequent decrement via
3268 * vdo_release_block_reference().
3269 */
vdo_allocate_block(struct block_allocator * allocator,physical_block_number_t * block_number_ptr)3270 int vdo_allocate_block(struct block_allocator *allocator,
3271 physical_block_number_t *block_number_ptr)
3272 {
3273 int result;
3274
3275 if (allocator->open_slab != NULL) {
3276 /* Try to allocate the next block in the currently open slab. */
3277 result = allocate_slab_block(allocator->open_slab, block_number_ptr);
3278 if ((result == VDO_SUCCESS) || (result != VDO_NO_SPACE))
3279 return result;
3280
3281 /* Put the exhausted open slab back into the priority table. */
3282 prioritize_slab(allocator->open_slab);
3283 }
3284
3285 /* Remove the highest priority slab from the priority table and make it the open slab. */
3286 open_slab(list_entry(vdo_priority_table_dequeue(allocator->prioritized_slabs),
3287 struct vdo_slab, allocq_entry));
3288
3289 /*
3290 * Try allocating again. If we're out of space immediately after opening a slab, then every
3291 * slab must be fully allocated.
3292 */
3293 return allocate_slab_block(allocator->open_slab, block_number_ptr);
3294 }
3295
3296 /**
3297 * vdo_enqueue_clean_slab_waiter() - Wait for a clean slab.
3298 * @allocator: The block_allocator on which to wait.
3299 * @waiter: The waiter.
3300 *
3301 * Return: VDO_SUCCESS if the waiter was queued, VDO_NO_SPACE if there are no slabs to scrub, and
3302 * some other error otherwise.
3303 */
vdo_enqueue_clean_slab_waiter(struct block_allocator * allocator,struct vdo_waiter * waiter)3304 int vdo_enqueue_clean_slab_waiter(struct block_allocator *allocator,
3305 struct vdo_waiter *waiter)
3306 {
3307 if (vdo_is_read_only(allocator->depot->vdo))
3308 return VDO_READ_ONLY;
3309
3310 if (vdo_is_state_quiescent(&allocator->scrubber.admin_state))
3311 return VDO_NO_SPACE;
3312
3313 vdo_waitq_enqueue_waiter(&allocator->scrubber.waiters, waiter);
3314 return VDO_SUCCESS;
3315 }
3316
/**
 * vdo_modify_reference_count() - Modify the reference count of a block by first making a slab
 *                                journal entry and then updating the reference counter.
 * @completion: The data_vio completion for which to add the entry.
 * @updater: Which of the data_vio's reference updaters is being submitted.
 */
void vdo_modify_reference_count(struct vdo_completion *completion,
				struct reference_updater *updater)
{
	struct vdo_slab *slab = vdo_get_slab(completion->vdo->depot, updater->zpbn.pbn);

	if (!is_slab_open(slab)) {
		vdo_continue_completion(completion, VDO_INVALID_ADMIN_STATE);
		return;
	}

	if (vdo_is_read_only(completion->vdo)) {
		vdo_continue_completion(completion, VDO_READ_ONLY);
		return;
	}

	vdo_waitq_enqueue_waiter(&slab->journal.entry_waiters, &updater->waiter);
	/* An unrecovered slab with a journal needing reaping must be scrubbed promptly. */
	if ((slab->status != VDO_SLAB_REBUILT) && requires_reaping(&slab->journal))
		register_slab_for_scrubbing(slab, true);

	add_entries(&slab->journal);
}
3344
3345 /* Release an unused provisional reference. */
int vdo_release_block_reference(struct block_allocator *allocator,
				physical_block_number_t pbn)
{
	struct reference_updater updater = {
		.operation = VDO_JOURNAL_DATA_REMAPPING,
		.increment = false,
		.zpbn = {
			.pbn = pbn,
		},
	};

	/* The zero block carries no provisional reference to release. */
	if (pbn == VDO_ZERO_BLOCK)
		return VDO_SUCCESS;

	return adjust_reference_count(vdo_get_slab(allocator->depot, pbn),
				      &updater, NULL);
}
3365
/*
 * This is a min_heap callback function that orders slab_status structures using the 'is_clean'
 * field as the primary key and the 'emptiness' field as the secondary key.
 *
 * Slabs need to be pushed onto the lists in the same order they are to be popped off. Popping
 * should always get the most empty first, so pushing should be from most empty to least empty.
 * Thus, the ordering is reversed from the usual sense since min_heap returns smaller elements
 * before larger ones.
 */
slab_status_is_less_than(const void * item1,const void * item2,void __always_unused * args)3375 static bool slab_status_is_less_than(const void *item1, const void *item2,
3376 void __always_unused *args)
3377 {
3378 const struct slab_status *info1 = item1;
3379 const struct slab_status *info2 = item2;
3380
3381 if (info1->is_clean != info2->is_clean)
3382 return info1->is_clean;
3383 if (info1->emptiness != info2->emptiness)
3384 return info1->emptiness > info2->emptiness;
3385 return info1->slab_number < info2->slab_number;
3386 }
3387
/* Callbacks for the slab_status min_heap; a NULL .swp selects the default element swap. */
static const struct min_heap_callbacks slab_status_min_heap = {
	.less = slab_status_is_less_than,
	.swp = NULL,
};
3392
/* Inform the slab actor that an action has finished on some slab; used by apply_to_slabs(). */
slab_action_callback(struct vdo_completion * completion)3394 static void slab_action_callback(struct vdo_completion *completion)
3395 {
3396 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3397 struct slab_actor *actor = &allocator->slab_actor;
3398
3399 if (--actor->slab_action_count == 0) {
3400 actor->callback(completion);
3401 return;
3402 }
3403
3404 vdo_reset_completion(completion);
3405 }
3406
3407 /* Preserve the error from part of an action and continue. */
handle_operation_error(struct vdo_completion * completion)3408 static void handle_operation_error(struct vdo_completion *completion)
3409 {
3410 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3411
3412 if (allocator->state.waiter != NULL)
3413 vdo_set_completion_result(allocator->state.waiter, completion->result);
3414 completion->callback(completion);
3415 }
3416
3417 /* Perform an action on each of an allocator's slabs in parallel. */
static void apply_to_slabs(struct block_allocator *allocator, vdo_action_fn callback)
{
	struct slab_iterator iterator;

	vdo_prepare_completion(&allocator->completion, slab_action_callback,
			       handle_operation_error, allocator->thread_id, NULL);
	allocator->completion.requeue = false;

	/*
	 * Since we are going to dequeue all of the slabs, the open slab will become invalid, so
	 * clear it.
	 */
	allocator->open_slab = NULL;

	/*
	 * Ensure that we don't finish before we're done starting: the count begins at 1 and
	 * the matching decrement is the slab_action_callback() call at the bottom.
	 */
	allocator->slab_actor = (struct slab_actor) {
		.slab_action_count = 1,
		.callback = callback,
	};

	iterator = get_slab_iterator(allocator);
	while (iterator.next != NULL) {
		const struct admin_state_code *operation =
			vdo_get_admin_state_code(&allocator->state);
		struct vdo_slab *slab = next_slab(&iterator);

		/* Each slab started adds one to the count it will later decrement. */
		list_del_init(&slab->allocq_entry);
		allocator->slab_actor.slab_action_count++;
		vdo_start_operation_with_waiter(&slab->state, operation,
						&allocator->completion,
						initiate_slab_action);
	}

	/* Drop the initial hold; if every slab already finished, this fires the callback. */
	slab_action_callback(&allocator->completion);
}
3453
finish_loading_allocator(struct vdo_completion * completion)3454 static void finish_loading_allocator(struct vdo_completion *completion)
3455 {
3456 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3457 const struct admin_state_code *operation =
3458 vdo_get_admin_state_code(&allocator->state);
3459
3460 if (allocator->eraser != NULL)
3461 dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));
3462
3463 if (operation == VDO_ADMIN_STATE_LOADING_FOR_RECOVERY) {
3464 void *context =
3465 vdo_get_current_action_context(allocator->depot->action_manager);
3466
3467 vdo_replay_into_slab_journals(allocator, context);
3468 return;
3469 }
3470
3471 vdo_finish_loading(&allocator->state);
3472 }
3473
3474 static void erase_next_slab_journal(struct block_allocator *allocator);
3475
copy_callback(int read_err,unsigned long write_err,void * context)3476 static void copy_callback(int read_err, unsigned long write_err, void *context)
3477 {
3478 struct block_allocator *allocator = context;
3479 int result = (((read_err == 0) && (write_err == 0)) ? VDO_SUCCESS : -EIO);
3480
3481 if (result != VDO_SUCCESS) {
3482 vdo_fail_completion(&allocator->completion, result);
3483 return;
3484 }
3485
3486 erase_next_slab_journal(allocator);
3487 }
3488
3489 /* erase_next_slab_journal() - Erase the next slab journal. */
erase_next_slab_journal(struct block_allocator * allocator)3490 static void erase_next_slab_journal(struct block_allocator *allocator)
3491 {
3492 struct vdo_slab *slab;
3493 physical_block_number_t pbn;
3494 struct dm_io_region regions[1];
3495 struct slab_depot *depot = allocator->depot;
3496 block_count_t blocks = depot->slab_config.slab_journal_blocks;
3497
3498 if (allocator->slabs_to_erase.next == NULL) {
3499 vdo_finish_completion(&allocator->completion);
3500 return;
3501 }
3502
3503 slab = next_slab(&allocator->slabs_to_erase);
3504 pbn = slab->journal_origin - depot->vdo->geometry.bio_offset;
3505 regions[0] = (struct dm_io_region) {
3506 .bdev = vdo_get_backing_device(depot->vdo),
3507 .sector = pbn * VDO_SECTORS_PER_BLOCK,
3508 .count = blocks * VDO_SECTORS_PER_BLOCK,
3509 };
3510 dm_kcopyd_zero(allocator->eraser, 1, regions, 0, copy_callback, allocator);
3511 }
3512
3513 /* Implements vdo_admin_initiator_fn. */
static void initiate_load(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);
	const struct admin_state_code *operation = vdo_get_admin_state_code(state);

	if (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD) {
		/*
		 * Must requeue because the kcopyd client cannot be freed in the same stack frame
		 * as the kcopyd callback, lest it deadlock.
		 */
		vdo_prepare_completion_for_requeue(&allocator->completion,
						   finish_loading_allocator,
						   handle_operation_error,
						   allocator->thread_id, NULL);
		allocator->eraser = dm_kcopyd_client_create(NULL);
		if (IS_ERR(allocator->eraser)) {
			/* Clear the eraser so finish_loading_allocator() won't destroy it. */
			vdo_fail_completion(&allocator->completion,
					    PTR_ERR(allocator->eraser));
			allocator->eraser = NULL;
			return;
		}
		allocator->slabs_to_erase = get_slab_iterator(allocator);

		/* Zero out this zone's slab journals one at a time via kcopyd. */
		erase_next_slab_journal(allocator);
		return;
	}

	/* For every other load type, just start the load operation on each slab. */
	apply_to_slabs(allocator, finish_loading_allocator);
}
3544
3545 /**
3546 * vdo_notify_slab_journals_are_recovered() - Inform a block allocator that its slab journals have
3547 * been recovered from the recovery journal.
3548 * @completion: The allocator completion.
3549 */
vdo_notify_slab_journals_are_recovered(struct vdo_completion * completion)3550 void vdo_notify_slab_journals_are_recovered(struct vdo_completion *completion)
3551 {
3552 struct block_allocator *allocator = vdo_as_block_allocator(completion);
3553
3554 vdo_finish_loading_with_result(&allocator->state, completion->result);
3555 }
3556
get_slab_statuses(struct block_allocator * allocator,struct slab_status ** statuses_ptr)3557 static int get_slab_statuses(struct block_allocator *allocator,
3558 struct slab_status **statuses_ptr)
3559 {
3560 int result;
3561 struct slab_status *statuses;
3562 struct slab_iterator iterator = get_slab_iterator(allocator);
3563
3564 result = vdo_allocate(allocator->slab_count, __func__, &statuses);
3565 if (result != VDO_SUCCESS)
3566 return result;
3567
3568 *statuses_ptr = statuses;
3569
3570 while (iterator.next != NULL) {
3571 slab_count_t slab_number = next_slab(&iterator)->slab_number;
3572
3573 *statuses++ = (struct slab_status) {
3574 .slab_number = slab_number,
3575 .is_clean = !allocator->summary_entries[slab_number].is_dirty,
3576 .emptiness = allocator->summary_entries[slab_number].fullness_hint,
3577 };
3578 }
3579
3580 return VDO_SUCCESS;
3581 }
3582
3583 /* Prepare slabs for allocation or scrubbing. */
static int __must_check vdo_prepare_slabs_for_allocation(struct block_allocator *allocator)
{
	struct slab_status current_slab_status;
	/* A min_heap over the status array, ordered by slab_status_min_heap. */
	DEFINE_MIN_HEAP(struct slab_status, heap) heap;
	int result;
	struct slab_status *slab_statuses;
	struct slab_depot *depot = allocator->depot;

	/* Initialize the allocated-block count to its maximum possible value. */
	WRITE_ONCE(allocator->allocated_blocks,
		   allocator->slab_count * depot->slab_config.data_blocks);
	result = get_slab_statuses(allocator, &slab_statuses);
	if (result != VDO_SUCCESS)
		return result;

	/* Sort the slabs by cleanliness, then by emptiness hint. */
	heap = (struct heap) {
		.data = slab_statuses,
		.nr = allocator->slab_count,
		.size = allocator->slab_count,
	};
	min_heapify_all(&heap, &slab_status_min_heap, NULL);

	while (heap.nr > 0) {
		bool high_priority;
		struct vdo_slab *slab;
		struct slab_journal *journal;

		/* Pop the next slab in sorted order (the heap root). */
		current_slab_status = slab_statuses[0];
		min_heap_pop(&heap, &slab_status_min_heap, NULL);
		slab = depot->slabs[current_slab_status.slab_number];

		/*
		 * On a rebuild load, or for a clean slab whose reference counts need not
		 * be loaded, the slab can go straight into service.
		 */
		if ((depot->load_type == VDO_SLAB_DEPOT_REBUILD_LOAD) ||
		    (!allocator->summary_entries[slab->slab_number].load_ref_counts &&
		     current_slab_status.is_clean)) {
			queue_slab(slab);
			continue;
		}

		/* Every other slab must be scrubbed before use. */
		slab->status = VDO_SLAB_REQUIRES_SCRUBBING;
		journal = &slab->journal;
		high_priority = ((current_slab_status.is_clean &&
				  (depot->load_type == VDO_SLAB_DEPOT_NORMAL_LOAD)) ||
				 (journal_length(journal) >= journal->scrubbing_threshold));
		register_slab_for_scrubbing(slab, high_priority);
	}

	vdo_free(slab_statuses);
	return VDO_SUCCESS;
}
3633
status_to_string(enum slab_rebuild_status status)3634 static const char *status_to_string(enum slab_rebuild_status status)
3635 {
3636 switch (status) {
3637 case VDO_SLAB_REBUILT:
3638 return "REBUILT";
3639 case VDO_SLAB_REQUIRES_SCRUBBING:
3640 return "SCRUBBING";
3641 case VDO_SLAB_REQUIRES_HIGH_PRIORITY_SCRUBBING:
3642 return "PRIORITY_SCRUBBING";
3643 case VDO_SLAB_REBUILDING:
3644 return "REBUILDING";
3645 case VDO_SLAB_REPLAYING:
3646 return "REPLAYING";
3647 default:
3648 return "UNKNOWN";
3649 }
3650 }
3651
/**
 * vdo_dump_block_allocator() - Dump the state of a block allocator to the kernel log.
 * @allocator: The allocator to dump.
 */
void vdo_dump_block_allocator(const struct block_allocator *allocator)
{
	unsigned int pause_counter = 0;
	struct slab_iterator iterator = get_slab_iterator(allocator);
	const struct slab_scrubber *scrubber = &allocator->scrubber;

	vdo_log_info("block_allocator zone %u", allocator->zone_number);
	while (iterator.next != NULL) {
		struct vdo_slab *slab = next_slab(&iterator);
		struct slab_journal *journal = &slab->journal;

		if (slab->reference_blocks != NULL) {
			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
			vdo_log_info("slab %u: P%u, %llu free", slab->slab_number,
				     slab->priority,
				     (unsigned long long) slab->free_blocks);
		} else {
			vdo_log_info("slab %u: status %s", slab->slab_number,
				     status_to_string(slab->status));
		}

		vdo_log_info(" slab journal: entry_waiters=%zu waiting_to_commit=%s updating_slab_summary=%s head=%llu unreapable=%llu tail=%llu next_commit=%llu summarized=%llu last_summarized=%llu recovery_lock=%llu dirty=%s",
			     vdo_waitq_num_waiters(&journal->entry_waiters),
			     vdo_bool_to_string(journal->waiting_to_commit),
			     vdo_bool_to_string(journal->updating_slab_summary),
			     (unsigned long long) journal->head,
			     (unsigned long long) journal->unreapable,
			     (unsigned long long) journal->tail,
			     (unsigned long long) journal->next_commit,
			     (unsigned long long) journal->summarized,
			     (unsigned long long) journal->last_summarized,
			     (unsigned long long) journal->recovery_lock,
			     vdo_bool_to_string(journal->recovery_lock != 0));
		/*
		 * Given the frequency with which the locks are just a tiny bit off, it might be
		 * worth dumping all the locks, but that might be too much logging.
		 */

		if (slab->counters != NULL) {
			/* Terse because there are a lot of slabs to dump and syslog is lossy. */
			vdo_log_info(" slab: free=%u/%u blocks=%u dirty=%zu active=%zu journal@(%llu,%u)",
				     slab->free_blocks, slab->block_count,
				     slab->reference_block_count,
				     vdo_waitq_num_waiters(&slab->dirty_blocks),
				     slab->active_count,
				     (unsigned long long) slab->slab_journal_point.sequence_number,
				     slab->slab_journal_point.entry_count);
		} else {
			vdo_log_info(" no counters");
		}

		/*
		 * Wait for a while after each batch of 32 slabs dumped, an arbitrary number,
		 * allowing the kernel log a chance to be flushed instead of being overrun.
		 */
		if (pause_counter++ == 31) {
			pause_counter = 0;
			vdo_pause_for_logger();
		}
	}

	vdo_log_info("slab_scrubber slab_count %u waiters %zu %s%s",
		     READ_ONCE(scrubber->slab_count),
		     vdo_waitq_num_waiters(&scrubber->waiters),
		     vdo_get_admin_state_code(&scrubber->admin_state)->name,
		     scrubber->high_priority_only ? ", high_priority_only " : "");
}
3719
free_slab(struct vdo_slab * slab)3720 static void free_slab(struct vdo_slab *slab)
3721 {
3722 if (slab == NULL)
3723 return;
3724
3725 list_del(&slab->allocq_entry);
3726 vdo_free(vdo_forget(slab->journal.block));
3727 vdo_free(vdo_forget(slab->journal.locks));
3728 vdo_free(vdo_forget(slab->counters));
3729 vdo_free(vdo_forget(slab->reference_blocks));
3730 vdo_free(slab);
3731 }
3732
/**
 * initialize_slab_journal() - Allocate and initialize a slab's journal.
 * @slab: The slab which owns the journal.
 *
 * Return: VDO_SUCCESS or an error code from allocation.
 */
static int initialize_slab_journal(struct vdo_slab *slab)
{
	struct slab_journal *journal = &slab->journal;
	const struct slab_config *slab_config = &slab->allocator->depot->slab_config;
	int result;

	/* One lock object per journal block. */
	result = vdo_allocate(slab_config->slab_journal_blocks, __func__, &journal->locks);
	if (result != VDO_SUCCESS)
		return result;

	BUILD_BUG_ON(sizeof(*journal->block) != VDO_BLOCK_SIZE);
	result = vdo_allocate(1, "struct packed_slab_journal_block", &journal->block);
	if (result != VDO_SUCCESS)
		return result;

	journal->slab = slab;
	journal->size = slab_config->slab_journal_blocks;
	journal->flushing_threshold = slab_config->slab_journal_flushing_threshold;
	journal->blocking_threshold = slab_config->slab_journal_blocking_threshold;
	journal->scrubbing_threshold = slab_config->slab_journal_scrubbing_threshold;
	journal->entries_per_block = VDO_SLAB_JOURNAL_ENTRIES_PER_BLOCK;
	journal->full_entries_per_block = VDO_SLAB_JOURNAL_FULL_ENTRIES_PER_BLOCK;
	journal->events = &slab->allocator->slab_journal_statistics;
	journal->recovery_journal = slab->allocator->depot->vdo->recovery_journal;
	/* Both head and tail begin at sequence number 1. */
	journal->tail = 1;
	journal->head = 1;

	journal->flushing_deadline = journal->flushing_threshold;
	/*
	 * Leave some gap between the flushing deadline and the blocking threshold, so that
	 * hopefully all the flushes are done before any journal has to block.
	 */
	if ((journal->blocking_threshold - journal->flushing_threshold) > 5)
		journal->flushing_deadline = journal->blocking_threshold - 5;

	journal->slab_summary_waiter.callback = release_journal_locks;

	INIT_LIST_HEAD(&journal->dirty_entry);
	INIT_LIST_HEAD(&journal->uncommitted_blocks);

	journal->tail_header.nonce = slab->allocator->nonce;
	journal->tail_header.metadata_type = VDO_METADATA_SLAB_JOURNAL;
	initialize_journal_state(journal);
	return VDO_SUCCESS;
}
3778
3779 /**
3780 * make_slab() - Construct a new, empty slab.
3781 * @slab_origin: The physical block number within the block allocator partition of the first block
3782 * in the slab.
3783 * @allocator: The block allocator to which the slab belongs.
3784 * @slab_number: The slab number of the slab.
3785 * @is_new: True if this slab is being allocated as part of a resize.
3786 * @slab_ptr: A pointer to receive the new slab.
3787 *
3788 * Return: VDO_SUCCESS or an error code.
3789 */
make_slab(physical_block_number_t slab_origin,struct block_allocator * allocator,slab_count_t slab_number,bool is_new,struct vdo_slab ** slab_ptr)3790 static int __must_check make_slab(physical_block_number_t slab_origin,
3791 struct block_allocator *allocator,
3792 slab_count_t slab_number, bool is_new,
3793 struct vdo_slab **slab_ptr)
3794 {
3795 const struct slab_config *slab_config = &allocator->depot->slab_config;
3796 struct vdo_slab *slab;
3797 int result;
3798
3799 result = vdo_allocate(1, __func__, &slab);
3800 if (result != VDO_SUCCESS)
3801 return result;
3802
3803 *slab = (struct vdo_slab) {
3804 .allocator = allocator,
3805 .start = slab_origin,
3806 .end = slab_origin + slab_config->slab_blocks,
3807 .slab_number = slab_number,
3808 .ref_counts_origin = slab_origin + slab_config->data_blocks,
3809 .journal_origin =
3810 vdo_get_slab_journal_start_block(slab_config, slab_origin),
3811 .block_count = slab_config->data_blocks,
3812 .free_blocks = slab_config->data_blocks,
3813 .reference_block_count =
3814 vdo_get_saved_reference_count_size(slab_config->data_blocks),
3815 };
3816 INIT_LIST_HEAD(&slab->allocq_entry);
3817
3818 result = initialize_slab_journal(slab);
3819 if (result != VDO_SUCCESS) {
3820 free_slab(slab);
3821 return result;
3822 }
3823
3824 if (is_new) {
3825 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NEW);
3826 result = allocate_slab_counters(slab);
3827 if (result != VDO_SUCCESS) {
3828 free_slab(slab);
3829 return result;
3830 }
3831 } else {
3832 vdo_set_admin_state_code(&slab->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
3833 }
3834
3835 *slab_ptr = slab;
3836 return VDO_SUCCESS;
3837 }
3838
3839 /**
3840 * allocate_slabs() - Allocate a new slab pointer array.
3841 * @depot: The depot.
3842 * @slab_count: The number of slabs the depot should have in the new array.
3843 *
3844 * Any existing slab pointers will be copied into the new array, and slabs will be allocated as
3845 * needed. The newly allocated slabs will not be distributed for use by the block allocators.
3846 *
3847 * Return: VDO_SUCCESS or an error code.
3848 */
static int allocate_slabs(struct slab_depot *depot, slab_count_t slab_count)
{
	physical_block_number_t slab_origin;
	block_count_t slab_size = depot->slab_config.slab_blocks;
	bool resizing = (depot->slabs != NULL);
	int result;

	result = vdo_allocate(slab_count, "slab pointer array", &depot->new_slabs);
	if (result != VDO_SUCCESS)
		return result;

	/* Carry the pointers to all pre-existing slabs over to the new array. */
	if (resizing) {
		memcpy(depot->new_slabs, depot->slabs,
		       depot->slab_count * sizeof(struct vdo_slab *));
	}

	/* New slabs start immediately after the last existing one. */
	slab_origin = depot->first_block + (depot->slab_count * slab_size);

	depot->new_slab_count = depot->slab_count;
	while (depot->new_slab_count < slab_count) {
		/* Distribute new slabs across the zones round-robin. */
		struct block_allocator *allocator =
			&depot->allocators[depot->new_slab_count % depot->zone_count];

		result = make_slab(slab_origin, allocator, depot->new_slab_count,
				   resizing, &depot->new_slabs[depot->new_slab_count]);
		if (result != VDO_SUCCESS)
			return result;

		depot->new_slab_count++;
		slab_origin += slab_size;
	}

	return VDO_SUCCESS;
}
3884
3885 /**
3886 * vdo_abandon_new_slabs() - Abandon any new slabs in this depot, freeing them as needed.
3887 * @depot: The depot.
3888 */
void vdo_abandon_new_slabs(struct slab_depot *depot)
{
	slab_count_t i;

	if (depot->new_slabs == NULL)
		return;

	/* Entries below depot->slab_count are pre-existing slabs; free only the new ones. */
	for (i = depot->slab_count; i < depot->new_slab_count; i++)
		free_slab(vdo_forget(depot->new_slabs[i]));
	depot->new_slab_count = 0;
	depot->new_size = 0;
	vdo_free(vdo_forget(depot->new_slabs));
}
3902
3903 /** Implements vdo_zone_thread_getter_fn. */
get_allocator_thread_id(void * context,zone_count_t zone_number)3904 static thread_id_t get_allocator_thread_id(void *context, zone_count_t zone_number)
3905 {
3906 return ((struct slab_depot *) context)->allocators[zone_number].thread_id;
3907 }
3908
3909 /**
3910 * release_recovery_journal_lock() - Request the slab journal to release the recovery journal lock
3911 * it may hold on a specified recovery journal block.
3912 * @journal: The slab journal.
3913 * @recovery_lock: The sequence number of the recovery journal block whose locks should be
3914 * released.
3915 *
3916 * Return: True if the journal released a lock on the specified block.
3917 */
static bool __must_check release_recovery_journal_lock(struct slab_journal *journal,
						       sequence_number_t recovery_lock)
{
	if (recovery_lock > journal->recovery_lock) {
		/*
		 * NOTE(review): the asserted condition is always false when this branch is
		 * reached (we just established recovery_lock > journal->recovery_lock), so
		 * this always logs -- apparently intentional "should not happen" logging;
		 * confirm before changing.
		 */
		VDO_ASSERT_LOG_ONLY((recovery_lock < journal->recovery_lock),
				    "slab journal recovery lock is not older than the recovery journal head");
		return false;
	}

	/* Nothing to release unless this journal holds exactly the requested lock. */
	if ((recovery_lock < journal->recovery_lock) ||
	    vdo_is_read_only(journal->slab->allocator->depot->vdo))
		return false;

	/* All locks are held by the block which is in progress; write it. */
	commit_tail(journal);
	return true;
}
3935
3936 /*
3937 * Request a commit of all dirty tail blocks which are locking the recovery journal block the depot
3938 * is seeking to release.
3939 *
3940 * Implements vdo_zone_action_fn.
3941 */
static void release_tail_block_locks(void *context, zone_count_t zone_number,
				     struct vdo_completion *parent)
{
	struct slab_depot *depot = context;
	struct block_allocator *allocator = &depot->allocators[zone_number];
	struct slab_journal *journal, *tmp;

	/* Stop at the first journal which does not release the requested lock. */
	list_for_each_entry_safe(journal, tmp, &allocator->dirty_slab_journals,
				 dirty_entry) {
		if (!release_recovery_journal_lock(journal,
						   depot->active_release_request))
			break;
	}

	vdo_finish_completion(parent);
}
3957
3958 /**
3959 * prepare_for_tail_block_commit() - Prepare to commit oldest tail blocks.
3960 * @context: The slab depot.
3961 * @parent: The parent operation.
3962 *
3963 * Implements vdo_action_preamble_fn.
3964 */
static void prepare_for_tail_block_commit(void *context, struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	/* Latch the most recently requested release before the zones act on it. */
	depot->active_release_request = depot->new_release_request;
	vdo_finish_completion(parent);
}
3972
3973 /**
3974 * schedule_tail_block_commit() - Schedule a tail block commit if necessary.
3975 * @context: The slab depot.
3976 *
3977 * This method should not be called directly. Rather, call vdo_schedule_default_action() on the
3978 * depot's action manager.
3979 *
3980 * Implements vdo_action_scheduler_fn.
3981 */
static bool schedule_tail_block_commit(void *context)
{
	struct slab_depot *depot = context;

	/* No new release has been requested since the last one was handled. */
	if (depot->new_release_request == depot->active_release_request)
		return false;

	return vdo_schedule_action(depot->action_manager,
				   prepare_for_tail_block_commit,
				   release_tail_block_locks,
				   NULL, NULL);
}
3994
3995 /**
3996 * initialize_slab_scrubber() - Initialize an allocator's slab scrubber.
3997 * @allocator: The allocator being initialized
3998 *
3999 * Return: VDO_SUCCESS or an error.
4000 */
initialize_slab_scrubber(struct block_allocator * allocator)4001 static int initialize_slab_scrubber(struct block_allocator *allocator)
4002 {
4003 struct slab_scrubber *scrubber = &allocator->scrubber;
4004 block_count_t slab_journal_size =
4005 allocator->depot->slab_config.slab_journal_blocks;
4006 char *journal_data;
4007 int result;
4008
4009 result = vdo_allocate(VDO_BLOCK_SIZE * slab_journal_size, __func__, &journal_data);
4010 if (result != VDO_SUCCESS)
4011 return result;
4012
4013 result = allocate_vio_components(allocator->completion.vdo,
4014 VIO_TYPE_SLAB_JOURNAL,
4015 VIO_PRIORITY_METADATA,
4016 allocator, slab_journal_size,
4017 journal_data, &scrubber->vio);
4018 if (result != VDO_SUCCESS) {
4019 vdo_free(journal_data);
4020 return result;
4021 }
4022
4023 INIT_LIST_HEAD(&scrubber->high_priority_slabs);
4024 INIT_LIST_HEAD(&scrubber->slabs);
4025 vdo_set_admin_state_code(&scrubber->admin_state, VDO_ADMIN_STATE_SUSPENDED);
4026 return VDO_SUCCESS;
4027 }
4028
4029 /**
4030 * initialize_slab_summary_block() - Initialize a slab_summary_block.
4031 * @allocator: The allocator which owns the block.
4032 * @index: The index of this block in its zone's summary.
4033 *
4034 * Return: VDO_SUCCESS or an error.
4035 */
static int __must_check initialize_slab_summary_block(struct block_allocator *allocator,
						      block_count_t index)
{
	struct slab_summary_block *block = &allocator->summary_blocks[index];
	int result;

	/* One block's worth of staging space for entries being written out. */
	result = vdo_allocate(VDO_BLOCK_SIZE, __func__, &block->outgoing_entries);
	if (result != VDO_SUCCESS)
		return result;

	result = allocate_vio_components(allocator->depot->vdo, VIO_TYPE_SLAB_SUMMARY,
					 VIO_PRIORITY_METADATA, NULL, 1,
					 block->outgoing_entries, &block->vio);
	if (result != VDO_SUCCESS)
		return result;

	block->allocator = allocator;
	/* This block covers a fixed-size window of the zone's summary entries. */
	block->entries = &allocator->summary_entries[VDO_SLAB_SUMMARY_ENTRIES_PER_BLOCK * index];
	block->index = index;
	return VDO_SUCCESS;
}
4057
/**
 * initialize_block_allocator() - Initialize one zone's block allocator.
 * @depot: The slab depot which owns the allocator.
 * @zone: The physical zone number of this allocator.
 *
 * Return: VDO_SUCCESS or an error code.
 */
static int __must_check initialize_block_allocator(struct slab_depot *depot,
						   zone_count_t zone)
{
	int result;
	block_count_t i;
	struct block_allocator *allocator = &depot->allocators[zone];
	struct vdo *vdo = depot->vdo;
	block_count_t max_free_blocks = depot->slab_config.data_blocks;
	unsigned int max_priority = (2 + ilog2(max_free_blocks));
	u32 reference_block_count, refcount_reads_needed, refcount_blocks_per_vio;

	*allocator = (struct block_allocator) {
		.depot = depot,
		.zone_number = zone,
		.thread_id = vdo->thread_config.physical_threads[zone],
		.nonce = vdo->states.vdo.nonce,
	};

	INIT_LIST_HEAD(&allocator->dirty_slab_journals);
	vdo_set_admin_state_code(&allocator->state, VDO_ADMIN_STATE_NORMAL_OPERATION);
	result = vdo_register_read_only_listener(vdo, allocator,
						 notify_block_allocator_of_read_only_mode,
						 allocator->thread_id);
	if (result != VDO_SUCCESS)
		return result;

	vdo_initialize_completion(&allocator->completion, vdo, VDO_BLOCK_ALLOCATOR_COMPLETION);
	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_VIO_POOL_SIZE, 1, allocator->thread_id,
			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
			       allocator, &allocator->vio_pool);
	if (result != VDO_SUCCESS)
		return result;

	/* Initialize the refcount-reading vio pool. */
	reference_block_count = vdo_get_saved_reference_count_size(depot->slab_config.slab_blocks);
	refcount_reads_needed = DIV_ROUND_UP(reference_block_count, MAX_BLOCKS_PER_VIO);
	refcount_blocks_per_vio = DIV_ROUND_UP(reference_block_count, refcount_reads_needed);
	allocator->refcount_blocks_per_big_vio = refcount_blocks_per_vio;
	result = make_vio_pool(vdo, BLOCK_ALLOCATOR_REFCOUNT_VIO_POOL_SIZE,
			       allocator->refcount_blocks_per_big_vio, allocator->thread_id,
			       VIO_TYPE_SLAB_JOURNAL, VIO_PRIORITY_METADATA,
			       NULL, &allocator->refcount_big_vio_pool);
	if (result != VDO_SUCCESS)
		return result;

	result = initialize_slab_scrubber(allocator);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_make_priority_table(max_priority, &allocator->prioritized_slabs);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE, __func__,
			      &allocator->summary_blocks);
	if (result != VDO_SUCCESS)
		return result;

	vdo_set_admin_state_code(&allocator->summary_state,
				 VDO_ADMIN_STATE_NORMAL_OPERATION);
	/* Each zone owns a fixed-size window of the depot's summary entries. */
	allocator->summary_entries = depot->summary_entries + (MAX_VDO_SLABS * zone);

	/* Initialize each summary block. */
	for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
		result = initialize_slab_summary_block(allocator, i);
		if (result != VDO_SUCCESS)
			return result;
	}

	/*
	 * Performing well atop thin provisioned storage requires either that VDO discards freed
	 * blocks, or that the block allocator try to use slabs that already have allocated blocks
	 * in preference to slabs that have never been opened. For reasons we have not been able to
	 * fully understand, some SSD machines have been very sensitive (50% reduction in
	 * test throughput) to very slight differences in the timing and locality of block
	 * allocation. Assigning a low priority to unopened slabs (max_priority/2, say) would be
	 * ideal for the story, but anything less than a very high threshold (max_priority - 1)
	 * hurts on these machines.
	 *
	 * This sets the free block threshold for preferring to open an unopened slab to the binary
	 * floor of 3/4ths the total number of data blocks in a slab, which will generally evaluate
	 * to about half the slab size.
	 */
	allocator->unopened_slab_priority = (1 + ilog2((max_free_blocks * 3) / 4));

	return VDO_SUCCESS;
}
4145
/**
 * allocate_components() - Allocate and initialize the in-memory components of a slab depot.
 * @depot: The depot being constructed.
 * @summary_partition: The partition which holds the slab summary.
 *
 * Sets up the action manager, the combined slab summary entry buffer, the per-zone block
 * allocators, and the slabs themselves. May leave the depot partially constructed on error;
 * the caller is expected to clean up with vdo_free_slab_depot().
 *
 * Return: VDO_SUCCESS or an error.
 */
static int allocate_components(struct slab_depot *depot,
			       struct partition *summary_partition)
{
	int result;
	zone_count_t zone;
	slab_count_t slab_count;
	u8 hint;
	u32 i;
	const struct thread_config *thread_config = &depot->vdo->thread_config;

	result = vdo_make_action_manager(depot->zone_count, get_allocator_thread_id,
					 thread_config->journal_thread, depot,
					 schedule_tail_block_commit,
					 depot->vdo, &depot->action_manager);
	if (result != VDO_SUCCESS)
		return result;

	depot->origin = depot->first_block;

	/* block size must be a multiple of entry size */
	BUILD_BUG_ON((VDO_BLOCK_SIZE % sizeof(struct slab_summary_entry)) != 0);

	depot->summary_origin = summary_partition->offset;
	depot->hint_shift = vdo_get_slab_summary_hint_shift(depot->slab_size_shift);
	result = vdo_allocate(MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES, __func__,
			      &depot->summary_entries);
	if (result != VDO_SUCCESS)
		return result;

	/* Initialize all the entries. */
	hint = compute_fullness_hint(depot, depot->slab_config.data_blocks);
	for (i = 0; i < MAXIMUM_VDO_SLAB_SUMMARY_ENTRIES; i++) {
		/*
		 * This default tail block offset must be reflected in
		 * slabJournal.c::read_slab_journal_tail().
		 */
		depot->summary_entries[i] = (struct slab_summary_entry) {
			.tail_block_offset = 0,
			.fullness_hint = hint,
			.load_ref_counts = false,
			.is_dirty = false,
		};
	}

	slab_count = vdo_compute_slab_count(depot->first_block, depot->last_block,
					    depot->slab_size_shift);
	/* Every physical zone needs at least one slab to manage. */
	if (thread_config->physical_zone_count > slab_count) {
		return vdo_log_error_strerror(VDO_BAD_CONFIGURATION,
					      "%u physical zones exceeds slab count %u",
					      thread_config->physical_zone_count,
					      slab_count);
	}

	/* Initialize the block allocators. */
	for (zone = 0; zone < depot->zone_count; zone++) {
		result = initialize_block_allocator(depot, zone);
		if (result != VDO_SUCCESS)
			return result;
	}

	/* Allocate slabs. */
	result = allocate_slabs(depot, slab_count);
	if (result != VDO_SUCCESS)
		return result;

	/* Use the new slabs. */
	for (i = depot->slab_count; i < depot->new_slab_count; i++) {
		struct vdo_slab *slab = depot->new_slabs[i];

		register_slab_with_allocator(slab->allocator, slab);
		/* Publish each registration; slab_count has lockless READ_ONCE readers. */
		WRITE_ONCE(depot->slab_count, depot->slab_count + 1);
	}

	depot->slabs = depot->new_slabs;
	depot->new_slabs = NULL;
	depot->new_slab_count = 0;

	return VDO_SUCCESS;
}
4226
/**
 * vdo_decode_slab_depot() - Make a slab depot and configure it with the state read from the super
 *                           block.
 * @state: The slab depot state from the super block.
 * @vdo: The VDO which will own the depot.
 * @summary_partition: The partition which holds the slab summary.
 * @depot_ptr: A pointer to hold the depot.
 *
 * Return: A success or error code.
 */
int vdo_decode_slab_depot(struct slab_depot_state_2_0 state, struct vdo *vdo,
			  struct partition *summary_partition,
			  struct slab_depot **depot_ptr)
{
	unsigned int slab_size_shift;
	struct slab_depot *depot;
	int result;

	/*
	 * Calculate the bit shift for efficiently mapping block numbers to slabs. Using a shift
	 * requires that the slab size be a power of two.
	 */
	block_count_t slab_size = state.slab_config.slab_blocks;

	if (!is_power_of_2(slab_size)) {
		return vdo_log_error_strerror(UDS_INVALID_ARGUMENT,
					      "slab size must be a power of two");
	}
	slab_size_shift = ilog2(slab_size);

	/* The on-disk zone count is untrusted input; reject impossible values. */
	if (state.zone_count > MAX_VDO_PHYSICAL_ZONES)
		return vdo_log_error_strerror(UDS_CORRUPT_DATA,
					      "invalid zone count");

	/* The depot struct has a trailing flexible array of per-zone allocators. */
	result = vdo_allocate_extended(vdo->thread_config.physical_zone_count,
				       allocators, __func__, &depot);
	if (result != VDO_SUCCESS)
		return result;

	depot->vdo = vdo;
	depot->old_zone_count = state.zone_count;
	depot->zone_count = vdo->thread_config.physical_zone_count;
	depot->slab_config = state.slab_config;
	depot->first_block = state.first_block;
	depot->last_block = state.last_block;
	depot->slab_size_shift = slab_size_shift;

	result = allocate_components(depot, summary_partition);
	if (result != VDO_SUCCESS) {
		/* vdo_free_slab_depot() tolerates a partially constructed depot. */
		vdo_free_slab_depot(depot);
		return result;
	}

	*depot_ptr = depot;
	return VDO_SUCCESS;
}
4283
uninitialize_allocator_summary(struct block_allocator * allocator)4284 static void uninitialize_allocator_summary(struct block_allocator *allocator)
4285 {
4286 block_count_t i;
4287
4288 if (allocator->summary_blocks == NULL)
4289 return;
4290
4291 for (i = 0; i < VDO_SLAB_SUMMARY_BLOCKS_PER_ZONE; i++) {
4292 free_vio_components(&allocator->summary_blocks[i].vio);
4293 vdo_free(vdo_forget(allocator->summary_blocks[i].outgoing_entries));
4294 }
4295
4296 vdo_free(vdo_forget(allocator->summary_blocks));
4297 }
4298
/**
 * vdo_free_slab_depot() - Destroy a slab depot.
 * @depot: The depot to destroy (may be NULL, or only partially constructed).
 */
void vdo_free_slab_depot(struct slab_depot *depot)
{
	zone_count_t zone = 0;

	if (depot == NULL)
		return;

	/* Discard any slabs allocated for a resize that never completed. */
	vdo_abandon_new_slabs(depot);

	for (zone = 0; zone < depot->zone_count; zone++) {
		struct block_allocator *allocator = &depot->allocators[zone];

		if (allocator->eraser != NULL)
			dm_kcopyd_client_destroy(vdo_forget(allocator->eraser));

		uninitialize_allocator_summary(allocator);
		uninitialize_scrubber_vio(&allocator->scrubber);
		free_vio_pool(vdo_forget(allocator->vio_pool));
		free_vio_pool(vdo_forget(allocator->refcount_big_vio_pool));
		vdo_free_priority_table(vdo_forget(allocator->prioritized_slabs));
	}

	/* Free the slabs before the array which holds them. */
	if (depot->slabs != NULL) {
		slab_count_t i;

		for (i = 0; i < depot->slab_count; i++)
			free_slab(vdo_forget(depot->slabs[i]));
	}

	vdo_free(vdo_forget(depot->slabs));
	vdo_free(vdo_forget(depot->action_manager));
	vdo_free(vdo_forget(depot->summary_entries));
	vdo_free(depot);
}
4337
4338 /**
4339 * vdo_record_slab_depot() - Record the state of a slab depot for encoding into the super block.
4340 * @depot: The depot to encode.
4341 *
4342 * Return: The depot state.
4343 */
vdo_record_slab_depot(const struct slab_depot * depot)4344 struct slab_depot_state_2_0 vdo_record_slab_depot(const struct slab_depot *depot)
4345 {
4346 /*
4347 * If this depot is currently using 0 zones, it must have been synchronously loaded by a
4348 * tool and is now being saved. We did not load and combine the slab summary, so we still
4349 * need to do that next time we load with the old zone count rather than 0.
4350 */
4351 struct slab_depot_state_2_0 state;
4352 zone_count_t zones_to_record = depot->zone_count;
4353
4354 if (depot->zone_count == 0)
4355 zones_to_record = depot->old_zone_count;
4356
4357 state = (struct slab_depot_state_2_0) {
4358 .slab_config = depot->slab_config,
4359 .first_block = depot->first_block,
4360 .last_block = depot->last_block,
4361 .zone_count = zones_to_record,
4362 };
4363
4364 return state;
4365 }
4366
4367 /**
4368 * vdo_allocate_reference_counters() - Allocate the reference counters for all slabs in the depot.
4369 * @depot: The slab depot.
4370 *
4371 * Context: This method may be called only before entering normal operation from the load thread.
4372 *
4373 * Return: VDO_SUCCESS or an error.
4374 */
vdo_allocate_reference_counters(struct slab_depot * depot)4375 int vdo_allocate_reference_counters(struct slab_depot *depot)
4376 {
4377 struct slab_iterator iterator =
4378 get_depot_slab_iterator(depot, depot->slab_count - 1, 0, 1);
4379
4380 while (iterator.next != NULL) {
4381 int result = allocate_slab_counters(next_slab(&iterator));
4382
4383 if (result != VDO_SUCCESS)
4384 return result;
4385 }
4386
4387 return VDO_SUCCESS;
4388 }
4389
4390 /**
4391 * get_slab_number() - Get the number of the slab that contains a specified block.
4392 * @depot: The slab depot.
4393 * @pbn: The physical block number.
4394 * @slab_number_ptr: A pointer to hold the slab number.
4395 *
4396 * Return: VDO_SUCCESS or an error.
4397 */
get_slab_number(const struct slab_depot * depot,physical_block_number_t pbn,slab_count_t * slab_number_ptr)4398 static int __must_check get_slab_number(const struct slab_depot *depot,
4399 physical_block_number_t pbn,
4400 slab_count_t *slab_number_ptr)
4401 {
4402 slab_count_t slab_number;
4403
4404 if (pbn < depot->first_block)
4405 return VDO_OUT_OF_RANGE;
4406
4407 slab_number = (pbn - depot->first_block) >> depot->slab_size_shift;
4408 if (slab_number >= depot->slab_count)
4409 return VDO_OUT_OF_RANGE;
4410
4411 *slab_number_ptr = slab_number;
4412 return VDO_SUCCESS;
4413 }
4414
4415 /**
4416 * vdo_get_slab() - Get the slab object for the slab that contains a specified block.
4417 * @depot: The slab depot.
4418 * @pbn: The physical block number.
4419 *
4420 * Will put the VDO in read-only mode if the PBN is not a valid data block nor the zero block.
4421 *
4422 * Return: The slab containing the block, or NULL if the block number is the zero block or
4423 * otherwise out of range.
4424 */
vdo_get_slab(const struct slab_depot * depot,physical_block_number_t pbn)4425 struct vdo_slab *vdo_get_slab(const struct slab_depot *depot,
4426 physical_block_number_t pbn)
4427 {
4428 slab_count_t slab_number;
4429 int result;
4430
4431 if (pbn == VDO_ZERO_BLOCK)
4432 return NULL;
4433
4434 result = get_slab_number(depot, pbn, &slab_number);
4435 if (result != VDO_SUCCESS) {
4436 vdo_enter_read_only_mode(depot->vdo, result);
4437 return NULL;
4438 }
4439
4440 return depot->slabs[slab_number];
4441 }
4442
4443 /**
4444 * vdo_get_increment_limit() - Determine how many new references a block can acquire.
4445 * @depot: The slab depot.
4446 * @pbn: The physical block number that is being queried.
4447 *
4448 * Context: This method must be called from the physical zone thread of the PBN.
4449 *
4450 * Return: The number of available references.
4451 */
vdo_get_increment_limit(struct slab_depot * depot,physical_block_number_t pbn)4452 u8 vdo_get_increment_limit(struct slab_depot *depot, physical_block_number_t pbn)
4453 {
4454 struct vdo_slab *slab = vdo_get_slab(depot, pbn);
4455 vdo_refcount_t *counter_ptr = NULL;
4456 int result;
4457
4458 if ((slab == NULL) || (slab->status != VDO_SLAB_REBUILT))
4459 return 0;
4460
4461 result = get_reference_counter(slab, pbn, &counter_ptr);
4462 if (result != VDO_SUCCESS)
4463 return 0;
4464
4465 if (*counter_ptr == PROVISIONAL_REFERENCE_COUNT)
4466 return (MAXIMUM_REFERENCE_COUNT - 1);
4467
4468 return (MAXIMUM_REFERENCE_COUNT - *counter_ptr);
4469 }
4470
4471 /**
4472 * vdo_is_physical_data_block() - Determine whether the given PBN refers to a data block.
4473 * @depot: The depot.
4474 * @pbn: The physical block number to ask about.
4475 *
4476 * Return: True if the PBN corresponds to a data block.
4477 */
vdo_is_physical_data_block(const struct slab_depot * depot,physical_block_number_t pbn)4478 bool vdo_is_physical_data_block(const struct slab_depot *depot,
4479 physical_block_number_t pbn)
4480 {
4481 slab_count_t slab_number;
4482 slab_block_number sbn;
4483
4484 return ((pbn == VDO_ZERO_BLOCK) ||
4485 ((get_slab_number(depot, pbn, &slab_number) == VDO_SUCCESS) &&
4486 (slab_block_number_from_pbn(depot->slabs[slab_number], pbn, &sbn) ==
4487 VDO_SUCCESS)));
4488 }
4489
4490 /**
4491 * vdo_get_slab_depot_allocated_blocks() - Get the total number of data blocks allocated across all
4492 * the slabs in the depot.
4493 * @depot: The slab depot.
4494 *
4495 * This is the total number of blocks with a non-zero reference count.
4496 *
4497 * Context: This may be called from any thread.
4498 *
4499 * Return: The total number of blocks with a non-zero reference count.
4500 */
vdo_get_slab_depot_allocated_blocks(const struct slab_depot * depot)4501 block_count_t vdo_get_slab_depot_allocated_blocks(const struct slab_depot *depot)
4502 {
4503 block_count_t total = 0;
4504 zone_count_t zone;
4505
4506 for (zone = 0; zone < depot->zone_count; zone++) {
4507 /* The allocators are responsible for thread safety. */
4508 total += READ_ONCE(depot->allocators[zone].allocated_blocks);
4509 }
4510
4511 return total;
4512 }
4513
4514 /**
4515 * vdo_get_slab_depot_data_blocks() - Get the total number of data blocks in all the slabs in the
4516 * depot.
4517 * @depot: The slab depot.
4518 *
4519 * Context: This may be called from any thread.
4520 *
4521 * Return: The total number of data blocks in all slabs.
4522 */
vdo_get_slab_depot_data_blocks(const struct slab_depot * depot)4523 block_count_t vdo_get_slab_depot_data_blocks(const struct slab_depot *depot)
4524 {
4525 return (READ_ONCE(depot->slab_count) * depot->slab_config.data_blocks);
4526 }
4527
/**
 * finish_combining_zones() - Clean up after saving out the combined slab summary.
 * @completion: The vio which was used to write the summary data.
 */
static void finish_combining_zones(struct vdo_completion *completion)
{
	/* Capture the result and parent before the completion is freed with its vio. */
	int result = completion->result;
	struct vdo_completion *parent = completion->parent;

	free_vio(as_vio(vdo_forget(completion)));
	vdo_fail_completion(parent, result);
}
4540
/* Error handler for summary I/O: log the metadata I/O error, then clean up as usual. */
static void handle_combining_error(struct vdo_completion *completion)
{
	vio_record_metadata_io_error(as_vio(completion));
	finish_combining_zones(completion);
}
4546
/* Bio endio for the combined summary write; hops to the admin thread to finish up. */
static void write_summary_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;

	continue_vio_after_io(vio, finish_combining_zones,
			      vdo->thread_config.admin_thread);
}
4555
/**
 * combine_summaries() - Treating the current entries buffer as the on-disk value of all zones,
 *                       update every zone to the correct values for every slab.
 * @depot: The depot whose summary entries should be combined.
 */
static void combine_summaries(struct slab_depot *depot)
{
	/*
	 * Combine all the old summary data into the portion of the buffer corresponding to the
	 * first zone.
	 */
	zone_count_t zone = 0;
	struct slab_summary_entry *entries = depot->summary_entries;

	if (depot->old_zone_count > 1) {
		slab_count_t entry_number;

		/*
		 * Slabs were assigned to zones round-robin, so slab N's authoritative entry was
		 * written by zone (N % old_zone_count), at offset N within that zone's region.
		 * Pull each such entry into zone 0's region. When the source zone is 0, the
		 * entry is already in place and no copy is needed.
		 */
		for (entry_number = 0; entry_number < MAX_VDO_SLABS; entry_number++) {
			if (zone != 0) {
				memcpy(entries + entry_number,
				       entries + (zone * MAX_VDO_SLABS) + entry_number,
				       sizeof(struct slab_summary_entry));
			}

			zone++;
			if (zone == depot->old_zone_count)
				zone = 0;
		}
	}

	/* Copy the combined data to each zone's region of the buffer. */
	for (zone = 1; zone < MAX_VDO_PHYSICAL_ZONES; zone++) {
		memcpy(entries + (zone * MAX_VDO_SLABS), entries,
		       MAX_VDO_SLABS * sizeof(struct slab_summary_entry));
	}
}
4592
/**
 * finish_loading_summary() - Finish loading slab summary data.
 * @completion: The vio which was used to read the summary data.
 *
 * Combines the slab summary data from all the previously written zones and copies the combined
 * summary to each partition's data region. Then writes the combined summary back out to disk. This
 * callback is registered in load_summary_endio().
 */
static void finish_loading_summary(struct vdo_completion *completion)
{
	struct slab_depot *depot = completion->vdo->depot;

	/* Combine the summary from each zone so each zone is correct for all slabs. */
	combine_summaries(depot);

	/* Write the combined summary back out; the vio still holds the entry buffer. */
	vdo_submit_metadata_vio(as_vio(completion), depot->summary_origin,
				write_summary_endio, handle_combining_error,
				REQ_OP_WRITE);
}
4613
/* Bio endio for the summary read; continues combining on the admin thread. */
static void load_summary_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct vdo *vdo = vio->completion.vdo;

	continue_vio_after_io(vio, finish_loading_summary,
			      vdo->thread_config.admin_thread);
}
4622
/**
 * load_slab_summary() - Load the slab summary before the slab data.
 * @context: The slab depot.
 * @parent: The load operation.
 *
 * Implements vdo_action_preamble_fn.
 */
static void load_slab_summary(void *context, struct vdo_completion *parent)
{
	int result;
	struct vio *vio;
	struct slab_depot *depot = context;
	const struct admin_state_code *operation =
		vdo_get_current_manager_operation(depot->action_manager);

	/* The vio reads/writes the entire in-memory summary buffer in one I/O. */
	result = create_multi_block_metadata_vio(depot->vdo, VIO_TYPE_SLAB_SUMMARY,
						 VIO_PRIORITY_METADATA, parent,
						 VDO_SLAB_SUMMARY_BLOCKS,
						 (char *) depot->summary_entries, &vio);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(parent, result);
		return;
	}

	/*
	 * When formatting or rebuilding there is nothing on disk worth reading; skip straight
	 * to combining the in-memory defaults and writing them out.
	 */
	if ((operation == VDO_ADMIN_STATE_FORMATTING) ||
	    (operation == VDO_ADMIN_STATE_LOADING_FOR_REBUILD)) {
		finish_loading_summary(&vio->completion);
		return;
	}

	vdo_submit_metadata_vio(vio, depot->summary_origin, load_summary_endio,
				handle_combining_error, REQ_OP_READ);
}
4656
/* Start loading one zone's block allocator. Implements vdo_zone_action_fn. */
static void load_allocator(void *context, zone_count_t zone_number,
			   struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_loading(&depot->allocators[zone_number].state,
			  vdo_get_current_manager_operation(depot->action_manager),
			  parent, initiate_load);
}
4667
/**
 * vdo_load_slab_depot() - Asynchronously load any slab depot state that isn't included in the
 *                         super_block component.
 * @depot: The depot to load.
 * @operation: The type of load to perform.
 * @parent: The completion to notify when the load is complete.
 * @context: Additional context for the load operation; may be NULL.
 *
 * This method may be called only before entering normal operation from the load thread.
 */
void vdo_load_slab_depot(struct slab_depot *depot,
			 const struct admin_state_code *operation,
			 struct vdo_completion *parent, void *context)
{
	if (!vdo_assert_load_operation(operation, parent))
		return;

	/* Load the summary first (preamble), then each zone's allocator in parallel. */
	vdo_schedule_operation_with_context(depot->action_manager, operation,
					    load_slab_summary, load_allocator,
					    NULL, context, parent);
}
4689
/* Ready one zone for allocation and kick off its scrubber. Implements vdo_zone_action_fn. */
static void prepare_to_allocate(void *context, zone_count_t zone_number,
				struct vdo_completion *parent)
{
	struct slab_depot *depot = context;
	struct block_allocator *allocator = &depot->allocators[zone_number];
	int result;

	result = vdo_prepare_slabs_for_allocation(allocator);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(parent, result);
		return;
	}

	scrub_slabs(allocator, parent);
}
4706
/**
 * vdo_prepare_slab_depot_to_allocate() - Prepare the slab depot to come online and start
 *                                        allocating blocks.
 * @depot: The depot to prepare.
 * @load_type: The load type.
 * @parent: The completion to notify when the operation is complete.
 *
 * This method may be called only before entering normal operation from the load thread. It must be
 * called before allocation may proceed.
 */
void vdo_prepare_slab_depot_to_allocate(struct slab_depot *depot,
					enum slab_depot_load_type load_type,
					struct vdo_completion *parent)
{
	depot->load_type = load_type;
	/* Each zone decrements this as its scrubbing completes. */
	atomic_set(&depot->zones_to_scrub, depot->zone_count);
	vdo_schedule_action(depot->action_manager, NULL,
			    prepare_to_allocate, NULL, parent);
}
4726
/**
 * vdo_update_slab_depot_size() - Update the slab depot to reflect its new size in memory.
 * @depot: The depot to update.
 *
 * This size is saved to disk as part of the super block. new_last_block was staged by
 * vdo_prepare_to_grow_slab_depot().
 */
void vdo_update_slab_depot_size(struct slab_depot *depot)
{
	depot->last_block = depot->new_last_block;
}
4737
/**
 * vdo_prepare_to_grow_slab_depot() - Allocate new memory needed for a resize of a slab depot to
 *                                    the given size.
 * @depot: The depot to prepare to resize.
 * @partition: The new depot partition.
 *
 * Return: VDO_SUCCESS or an error.
 */
int vdo_prepare_to_grow_slab_depot(struct slab_depot *depot,
				   const struct partition *partition)
{
	struct slab_depot_state_2_0 new_state;
	int result;
	slab_count_t new_slab_count;

	/* Quick check: the new partition must hold more whole slabs than we already have. */
	if ((partition->count >> depot->slab_size_shift) <= depot->slab_count)
		return VDO_INCREMENT_TOO_SMALL;

	/* Generate the depot configuration for the new block count. */
	VDO_ASSERT_LOG_ONLY(depot->first_block == partition->offset,
			    "New slab depot partition doesn't change origin");
	result = vdo_configure_slab_depot(partition, depot->slab_config,
					  depot->zone_count, &new_state);
	if (result != VDO_SUCCESS)
		return result;

	new_slab_count = vdo_compute_slab_count(depot->first_block,
						new_state.last_block,
						depot->slab_size_shift);
	if (new_slab_count <= depot->slab_count)
		return vdo_log_error_strerror(VDO_INCREMENT_TOO_SMALL,
					      "Depot can only grow");
	if (new_slab_count == depot->new_slab_count) {
		/* Check it out, we've already got all the new slabs allocated! */
		return VDO_SUCCESS;
	}

	/* A previous prepare targeted a different size; discard those slabs first. */
	vdo_abandon_new_slabs(depot);
	result = allocate_slabs(depot, new_slab_count);
	if (result != VDO_SUCCESS) {
		/* Drop any slabs that were allocated before the failure. */
		vdo_abandon_new_slabs(depot);
		return result;
	}

	/* Stage the new geometry; vdo_update_slab_depot_size() commits it later. */
	depot->new_size = partition->count;
	depot->old_last_block = depot->last_block;
	depot->new_last_block = new_state.last_block;

	return VDO_SUCCESS;
}
4788
/**
 * finish_registration() - Finish registering new slabs now that all of the allocators have
 *                         received their new slabs.
 * @context: The slab depot.
 *
 * Implements vdo_action_conclusion_fn.
 */
static int finish_registration(void *context)
{
	struct slab_depot *depot = context;

	/* Publish the new count first; readers use READ_ONCE on slab_count. */
	WRITE_ONCE(depot->slab_count, depot->new_slab_count);
	vdo_free(depot->slabs);
	depot->slabs = depot->new_slabs;
	depot->new_slabs = NULL;
	depot->new_slab_count = 0;
	return VDO_SUCCESS;
}
4807
4808 /* Implements vdo_zone_action_fn. */
register_new_slabs(void * context,zone_count_t zone_number,struct vdo_completion * parent)4809 static void register_new_slabs(void *context, zone_count_t zone_number,
4810 struct vdo_completion *parent)
4811 {
4812 struct slab_depot *depot = context;
4813 struct block_allocator *allocator = &depot->allocators[zone_number];
4814 slab_count_t i;
4815
4816 for (i = depot->slab_count; i < depot->new_slab_count; i++) {
4817 struct vdo_slab *slab = depot->new_slabs[i];
4818
4819 if (slab->allocator == allocator)
4820 register_slab_with_allocator(allocator, slab);
4821 }
4822
4823 vdo_finish_completion(parent);
4824 }
4825
/**
 * vdo_use_new_slabs() - Use the new slabs allocated for resize.
 * @depot: The depot.
 * @parent: The object to notify when complete.
 */
void vdo_use_new_slabs(struct slab_depot *depot, struct vdo_completion *parent)
{
	VDO_ASSERT_LOG_ONLY(depot->new_slabs != NULL, "Must have new slabs to use");
	/* Registration runs with operations suspended so the slab arrays can be swapped. */
	vdo_schedule_operation(depot->action_manager,
			       VDO_ADMIN_STATE_SUSPENDED_OPERATION,
			       NULL, register_new_slabs,
			       finish_registration, parent);
}
4839
/**
 * stop_scrubbing() - Tell the scrubber to stop scrubbing after it finishes the slab it is
 *                    currently working on.
 * @allocator: The block allocator owning the scrubber to stop.
 */
static void stop_scrubbing(struct block_allocator *allocator)
{
	struct slab_scrubber *scrubber = &allocator->scrubber;

	if (vdo_is_state_quiescent(&scrubber->admin_state)) {
		/* Already stopped; the drain step can complete immediately. */
		vdo_finish_completion(&allocator->completion);
	} else {
		vdo_start_draining(&scrubber->admin_state,
				   VDO_ADMIN_STATE_SUSPENDING,
				   &allocator->completion, NULL);
	}
}
4857
/* Begin draining the slab summary state. Implements vdo_admin_initiator_fn. */
static void initiate_summary_drain(struct admin_state *state)
{
	check_summary_drain_complete(container_of(state, struct block_allocator,
						  summary_state));
}
4864
/*
 * Advance the allocator drain state machine by one step. Re-entered as the callback of
 * allocator->completion after each step finishes; the pre-increment walks the steps in
 * order: SCRUBBER, SLABS, SUMMARY, FINISHED.
 */
static void do_drain_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	/* Re-arm the completion so the next step also lands back here, on this zone's thread. */
	vdo_prepare_completion_for_requeue(&allocator->completion, do_drain_step,
					   handle_operation_error, allocator->thread_id,
					   NULL);
	switch (++allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		stop_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_drain_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_start_draining(&allocator->summary_state,
				   vdo_get_admin_state_code(&allocator->state),
				   completion, initiate_summary_drain);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_FINISHED:
		VDO_ASSERT_LOG_ONLY(!is_vio_pool_busy(allocator->vio_pool),
				    "vio pool not busy");
		vdo_finish_draining_with_result(&allocator->state, completion->result);
		return;

	default:
		/* Stepping past FINISHED indicates a logic error. */
		vdo_finish_draining_with_result(&allocator->state, UDS_BAD_STATE);
	}
}
4897
/* Kick off the allocator drain state machine from its start state. Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_START;
	do_drain_step(&allocator->completion);
}
4907
/*
 * Drain all allocator I/O. Depending upon the type of drain, some or all dirty metadata may be
 * written to disk. The type of drain will be determined from the state of the allocator's depot.
 *
 * Implements vdo_zone_action_fn.
 */
static void drain_allocator(void *context, zone_count_t zone_number,
			    struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_draining(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_drain);
}
4923
/**
 * vdo_drain_slab_depot() - Drain all slab depot I/O.
 * @depot: The depot to drain.
 * @operation: The drain operation (flush, rebuild, suspend, or save).
 * @parent: The completion to finish when the drain is complete.
 *
 * If saving, or flushing, all dirty depot metadata will be written out. If saving or suspending,
 * the depot will be left in a suspended state.
 */
void vdo_drain_slab_depot(struct slab_depot *depot,
			  const struct admin_state_code *operation,
			  struct vdo_completion *parent)
{
	vdo_schedule_operation(depot->action_manager, operation,
			       NULL, drain_allocator, NULL, parent);
}
4940
/**
 * resume_scrubbing() - Tell the scrubber to resume scrubbing if it has been stopped.
 * @allocator: The allocator being resumed.
 */
static void resume_scrubbing(struct block_allocator *allocator)
{
	int result;
	struct slab_scrubber *scrubber = &allocator->scrubber;

	/* Nothing to scrub; this resume step completes immediately. */
	if (!has_slabs_to_scrub(scrubber)) {
		vdo_finish_completion(&allocator->completion);
		return;
	}

	result = vdo_resume_if_quiescent(&scrubber->admin_state);
	if (result != VDO_SUCCESS) {
		vdo_fail_completion(&allocator->completion, result);
		return;
	}

	/* Restart scrubbing, then let the resume state machine proceed. */
	scrub_next_slab(scrubber);
	vdo_finish_completion(&allocator->completion);
}
4964
/*
 * Advance the allocator resume state machine by one step. The pre-decrement walks the drain
 * steps in reverse order: SUMMARY, SLABS, SCRUBBER, then back to START.
 */
static void do_resume_step(struct vdo_completion *completion)
{
	struct block_allocator *allocator = vdo_as_block_allocator(completion);

	/* Re-arm the completion so the next step also lands back here, on this zone's thread. */
	vdo_prepare_completion_for_requeue(&allocator->completion, do_resume_step,
					   handle_operation_error,
					   allocator->thread_id, NULL);
	switch (--allocator->drain_step) {
	case VDO_DRAIN_ALLOCATOR_STEP_SUMMARY:
		vdo_fail_completion(completion,
				    vdo_resume_if_quiescent(&allocator->summary_state));
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SLABS:
		apply_to_slabs(allocator, do_resume_step);
		return;

	case VDO_DRAIN_ALLOCATOR_STEP_SCRUBBER:
		resume_scrubbing(allocator);
		return;

	case VDO_DRAIN_ALLOCATOR_START:
		vdo_finish_resuming_with_result(&allocator->state, completion->result);
		return;

	default:
		/* Stepping below START indicates a logic error. */
		vdo_finish_resuming_with_result(&allocator->state, UDS_BAD_STATE);
	}
}
4994
/* Kick off the resume state machine from the fully-drained state. Implements vdo_admin_initiator_fn. */
static void initiate_resume(struct admin_state *state)
{
	struct block_allocator *allocator =
		container_of(state, struct block_allocator, state);

	allocator->drain_step = VDO_DRAIN_ALLOCATOR_STEP_FINISHED;
	do_resume_step(&allocator->completion);
}
5004
/* Resume one zone's block allocator. Implements vdo_zone_action_fn. */
static void resume_allocator(void *context, zone_count_t zone_number,
			     struct vdo_completion *parent)
{
	struct slab_depot *depot = context;

	vdo_start_resuming(&depot->allocators[zone_number].state,
			   vdo_get_current_manager_operation(depot->action_manager),
			   parent, initiate_resume);
}
5015
/**
 * vdo_resume_slab_depot() - Resume a suspended slab depot.
 * @depot: The depot to resume.
 * @parent: The completion to finish when the depot has resumed.
 */
void vdo_resume_slab_depot(struct slab_depot *depot, struct vdo_completion *parent)
{
	/* A read-only VDO must not resume normal operation. */
	if (vdo_is_read_only(depot->vdo)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	vdo_schedule_operation(depot->action_manager, VDO_ADMIN_STATE_RESUMING,
			       NULL, resume_allocator, NULL, parent);
}
5031
5032 /**
5033 * vdo_commit_oldest_slab_journal_tail_blocks() - Commit all dirty tail blocks which are locking a
5034 * given recovery journal block.
5035 * @depot: The depot.
5036 * @recovery_block_number: The sequence number of the recovery journal block whose locks should be
5037 * released.
5038 *
5039 * Context: This method must be called from the journal zone thread.
5040 */
vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot * depot,sequence_number_t recovery_block_number)5041 void vdo_commit_oldest_slab_journal_tail_blocks(struct slab_depot *depot,
5042 sequence_number_t recovery_block_number)
5043 {
5044 if (depot == NULL)
5045 return;
5046
5047 depot->new_release_request = recovery_block_number;
5048 vdo_schedule_default_action(depot->action_manager);
5049 }
5050
5051 /* Implements vdo_zone_action_fn. */
scrub_all_unrecovered_slabs(void * context,zone_count_t zone_number,struct vdo_completion * parent)5052 static void scrub_all_unrecovered_slabs(void *context, zone_count_t zone_number,
5053 struct vdo_completion *parent)
5054 {
5055 struct slab_depot *depot = context;
5056
5057 scrub_slabs(&depot->allocators[zone_number], NULL);
5058 vdo_launch_completion(parent);
5059 }
5060
5061 /**
5062 * vdo_scrub_all_unrecovered_slabs() - Scrub all unrecovered slabs.
5063 * @depot: The depot to scrub.
5064 * @parent: The object to notify when scrubbing has been launched for all zones.
5065 */
vdo_scrub_all_unrecovered_slabs(struct slab_depot * depot,struct vdo_completion * parent)5066 void vdo_scrub_all_unrecovered_slabs(struct slab_depot *depot,
5067 struct vdo_completion *parent)
5068 {
5069 vdo_schedule_action(depot->action_manager, NULL,
5070 scrub_all_unrecovered_slabs,
5071 NULL, parent);
5072 }
5073
5074 /**
5075 * get_block_allocator_statistics() - Get the total of the statistics from all the block allocators
5076 * in the depot.
5077 * @depot: The slab depot.
5078 *
5079 * Return: The statistics from all block allocators in the depot.
5080 */
5081 static struct block_allocator_statistics __must_check
get_block_allocator_statistics(const struct slab_depot * depot)5082 get_block_allocator_statistics(const struct slab_depot *depot)
5083 {
5084 struct block_allocator_statistics totals;
5085 zone_count_t zone;
5086
5087 memset(&totals, 0, sizeof(totals));
5088
5089 for (zone = 0; zone < depot->zone_count; zone++) {
5090 const struct block_allocator *allocator = &depot->allocators[zone];
5091 const struct block_allocator_statistics *stats = &allocator->statistics;
5092
5093 totals.slab_count += allocator->slab_count;
5094 totals.slabs_opened += READ_ONCE(stats->slabs_opened);
5095 totals.slabs_reopened += READ_ONCE(stats->slabs_reopened);
5096 }
5097
5098 return totals;
5099 }
5100
5101 /**
5102 * get_ref_counts_statistics() - Get the cumulative ref_counts statistics for the depot.
5103 * @depot: The slab depot.
5104 *
5105 * Return: The cumulative statistics for all ref_counts in the depot.
5106 */
5107 static struct ref_counts_statistics __must_check
get_ref_counts_statistics(const struct slab_depot * depot)5108 get_ref_counts_statistics(const struct slab_depot *depot)
5109 {
5110 struct ref_counts_statistics totals;
5111 zone_count_t zone;
5112
5113 memset(&totals, 0, sizeof(totals));
5114
5115 for (zone = 0; zone < depot->zone_count; zone++) {
5116 totals.blocks_written +=
5117 READ_ONCE(depot->allocators[zone].ref_counts_statistics.blocks_written);
5118 }
5119
5120 return totals;
5121 }
5122
5123 /**
5124 * get_slab_journal_statistics() - Get the aggregated slab journal statistics for the depot.
5125 * @depot: The slab depot.
5126 *
5127 * Return: The aggregated statistics for all slab journals in the depot.
5128 */
5129 static struct slab_journal_statistics __must_check
get_slab_journal_statistics(const struct slab_depot * depot)5130 get_slab_journal_statistics(const struct slab_depot *depot)
5131 {
5132 struct slab_journal_statistics totals;
5133 zone_count_t zone;
5134
5135 memset(&totals, 0, sizeof(totals));
5136
5137 for (zone = 0; zone < depot->zone_count; zone++) {
5138 const struct slab_journal_statistics *stats =
5139 &depot->allocators[zone].slab_journal_statistics;
5140
5141 totals.disk_full_count += READ_ONCE(stats->disk_full_count);
5142 totals.flush_count += READ_ONCE(stats->flush_count);
5143 totals.blocked_count += READ_ONCE(stats->blocked_count);
5144 totals.blocks_written += READ_ONCE(stats->blocks_written);
5145 totals.tail_busy_count += READ_ONCE(stats->tail_busy_count);
5146 }
5147
5148 return totals;
5149 }
5150
5151 /**
5152 * vdo_get_slab_depot_statistics() - Get all the vdo_statistics fields that are properties of the
5153 * slab depot.
5154 * @depot: The slab depot.
5155 * @stats: The vdo statistics structure to partially fill.
5156 */
vdo_get_slab_depot_statistics(const struct slab_depot * depot,struct vdo_statistics * stats)5157 void vdo_get_slab_depot_statistics(const struct slab_depot *depot,
5158 struct vdo_statistics *stats)
5159 {
5160 slab_count_t slab_count = READ_ONCE(depot->slab_count);
5161 slab_count_t unrecovered = 0;
5162 zone_count_t zone;
5163
5164 for (zone = 0; zone < depot->zone_count; zone++) {
5165 /* The allocators are responsible for thread safety. */
5166 unrecovered += READ_ONCE(depot->allocators[zone].scrubber.slab_count);
5167 }
5168
5169 stats->recovery_percentage = (slab_count - unrecovered) * 100 / slab_count;
5170 stats->allocator = get_block_allocator_statistics(depot);
5171 stats->ref_counts = get_ref_counts_statistics(depot);
5172 stats->slab_journal = get_slab_journal_statistics(depot);
5173 stats->slab_summary = (struct slab_summary_statistics) {
5174 .blocks_written = atomic64_read(&depot->summary_statistics.blocks_written),
5175 };
5176 }
5177
/**
 * vdo_dump_slab_depot() - Dump the slab depot, in a thread-unsafe fashion.
 * @depot: The slab depot.
 *
 * Intended for debugging: the fields are read without any locking, so the
 * logged values may be mutually inconsistent.
 */
void vdo_dump_slab_depot(const struct slab_depot *depot)
{
	vdo_log_info("vdo slab depot");
	/* slab_count is updated concurrently, hence the READ_ONCE. */
	vdo_log_info(" zone_count=%u old_zone_count=%u slabCount=%u active_release_request=%llu new_release_request=%llu",
		     (unsigned int) depot->zone_count,
		     (unsigned int) depot->old_zone_count, READ_ONCE(depot->slab_count),
		     (unsigned long long) depot->active_release_request,
		     (unsigned long long) depot->new_release_request);
}
5191