// SPDX-License-Identifier: GPL-2.0-only
/*
 * Copyright 2023 Red Hat
 */

#include "recovery-journal.h"

#include <linux/atomic.h>
#include <linux/bio.h>

#include "logger.h"
#include "memory-alloc.h"
#include "permassert.h"

#include "block-map.h"
#include "completion.h"
#include "constants.h"
#include "data-vio.h"
#include "encodings.h"
#include "io-submitter.h"
#include "slab-depot.h"
#include "types.h"
#include "vdo.h"
#include "vio.h"
#include "wait-queue.h"

static const u64 RECOVERY_COUNT_MASK = 0xff;

/*
 * The number of reserved blocks must be large enough to prevent a new recovery journal
 * block write from overwriting a block which appears to still be a valid head block of the
 * journal. Currently, that means reserving enough space for all 2048 data_vios.
 */
#define RECOVERY_JOURNAL_RESERVED_BLOCKS \
	((MAXIMUM_VDO_USER_VIOS / RECOVERY_JOURNAL_ENTRIES_PER_BLOCK) + 2)
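
/*
 * A worked example of the reservation arithmetic (the real constants live in constants.h
 * and encodings.h; the per-block entry count used here is an illustrative assumption, not a
 * value confirmed by this file): with MAXIMUM_VDO_USER_VIOS at 2048 and, say, 217 entries
 * per block, this reserves (2048 / 217) + 2 = 11 blocks.
 */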

/**
 * DOC: Lock Counters.
 *
 * A lock_counter is intended to keep all of the locks for the blocks in the recovery journal. The
 * per-zone counters are all kept in a single array which is arranged by zone (i.e. zone 0's lock 0
 * is at index 0, zone 0's lock 1 is at index 1, and zone 1's lock 0 is at index 'locks'). This
 * arrangement is intended to minimize cache-line contention for counters from different zones.
 *
 * The locks are implemented as a single object instead of as a lock counter per lock both to
 * afford this opportunity to reduce cache line contention and also to eliminate the need to have a
 * completion per lock.
 *
 * Lock sets are laid out with the set for recovery journal first, followed by the logical zones,
 * and then the physical zones.
 */
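
/*
 * For example, with 'locks' counters per zone, zone z's lock n lives at index
 * ((locks * z) + n) of its zone type's array; this matches the index computation in
 * get_counter() below.
 */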

enum lock_counter_state {
	LOCK_COUNTER_STATE_NOT_NOTIFYING,
	LOCK_COUNTER_STATE_NOTIFYING,
	LOCK_COUNTER_STATE_SUSPENDED,
};
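
/*
 * A sketch of the state transitions as implemented below: releasing the last reference on a
 * lock moves the counter from NOT_NOTIFYING to NOTIFYING and launches the lock counter's
 * completion; the notification callback (reap_recovery_journal_callback()) moves it back to
 * NOT_NOTIFYING; draining moves NOT_NOTIFYING to SUSPENDED via suspend_lock_counter().
 */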

/**
 * get_zone_count_ptr() - Get a pointer to the zone count for a given lock on a given zone.
 * @journal: The recovery journal.
 * @lock_number: The lock to get.
 * @zone_type: The zone type whose count is desired.
 *
 * Return: A pointer to the zone count for the given lock and zone.
 */
static inline atomic_t *get_zone_count_ptr(struct recovery_journal *journal,
					   block_count_t lock_number,
					   enum vdo_zone_type zone_type)
{
	return ((zone_type == VDO_ZONE_TYPE_LOGICAL)
		? &journal->lock_counter.logical_zone_counts[lock_number]
		: &journal->lock_counter.physical_zone_counts[lock_number]);
}

/**
 * get_counter() - Get the zone counter for a given lock on a given zone.
 * @journal: The recovery journal.
 * @lock_number: The lock to get.
 * @zone_type: The zone type whose count is desired.
 * @zone_id: The zone index whose count is desired.
 *
 * Return: The counter for the given lock and zone.
 */
static inline u16 *get_counter(struct recovery_journal *journal,
			       block_count_t lock_number, enum vdo_zone_type zone_type,
			       zone_count_t zone_id)
{
	struct lock_counter *counter = &journal->lock_counter;
	block_count_t zone_counter = (counter->locks * zone_id) + lock_number;

	if (zone_type == VDO_ZONE_TYPE_JOURNAL)
		return &counter->journal_counters[zone_counter];

	if (zone_type == VDO_ZONE_TYPE_LOGICAL)
		return &counter->logical_counters[zone_counter];

	return &counter->physical_counters[zone_counter];
}

static atomic_t *get_decrement_counter(struct recovery_journal *journal,
				       block_count_t lock_number)
{
	return &journal->lock_counter.journal_decrement_counts[lock_number];
}

/**
 * is_journal_zone_locked() - Check whether the journal zone is locked for a given lock.
 * @journal: The recovery journal.
 * @lock_number: The lock to check.
 *
 * Return: True if the journal zone is locked.
 */
static bool is_journal_zone_locked(struct recovery_journal *journal,
				   block_count_t lock_number)
{
	u16 journal_value = *get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
	u32 decrements = atomic_read(get_decrement_counter(journal, lock_number));

	/* Pairs with barrier in vdo_release_journal_entry_lock() */
	smp_rmb();
	VDO_ASSERT_LOG_ONLY((decrements <= journal_value),
			    "journal zone lock counter must not underflow");
	return (journal_value != decrements);
}
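
/*
 * A note on the accounting above: acquisitions are recorded in the journal zone's u16
 * counter, while releases (made via vdo_release_journal_entry_lock(), possibly from other
 * threads) accumulate in the atomic decrement counter, so the lock is held exactly while
 * the two values differ.
 */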

/**
 * vdo_release_recovery_journal_block_reference() - Release a reference to a recovery journal
 *                                                  block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 *
 * If this is the last reference for a given zone type, an attempt will be made to reap the
 * journal.
 */
void vdo_release_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	u16 *current_value;
	block_count_t lock_number;
	int prior_state;

	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);

	VDO_ASSERT_LOG_ONLY((*current_value >= 1),
			    "decrement of lock counter must not underflow");
	*current_value -= 1;

	if (zone_type == VDO_ZONE_TYPE_JOURNAL) {
		if (is_journal_zone_locked(journal, lock_number))
			return;
	} else {
		atomic_t *zone_count;

		if (*current_value != 0)
			return;

		zone_count = get_zone_count_ptr(journal, lock_number, zone_type);

		if (atomic_add_return(-1, zone_count) > 0)
			return;
	}

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&journal->lock_counter.state,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	if (prior_state != LOCK_COUNTER_STATE_NOT_NOTIFYING)
		return;

	vdo_launch_completion(&journal->lock_counter.completion);
}

static inline struct recovery_journal_block * __must_check get_journal_block(struct list_head *list)
{
	return list_first_entry_or_null(list, struct recovery_journal_block, list_node);
}

/**
 * pop_free_list() - Get a block from the end of the free list.
 * @journal: The journal.
 *
 * Return: The block or NULL if the list is empty.
 */
static struct recovery_journal_block * __must_check pop_free_list(struct recovery_journal *journal)
{
	struct recovery_journal_block *block;

	if (list_empty(&journal->free_tail_blocks))
		return NULL;

	block = list_last_entry(&journal->free_tail_blocks,
				struct recovery_journal_block, list_node);
	list_del_init(&block->list_node);
	return block;
}

/**
 * is_block_dirty() - Check whether a recovery block is dirty.
 * @block: The block to check.
 *
 * A block is dirty if it has any uncommitted entries, which includes both entries not yet
 * written and entries written but not yet acknowledged.
 *
 * Return: True if the block has any uncommitted entries.
 */
static inline bool __must_check is_block_dirty(const struct recovery_journal_block *block)
{
	return (block->uncommitted_entry_count > 0);
}

/**
 * is_block_empty() - Check whether a journal block is empty.
 * @block: The block to check.
 *
 * Return: True if the block has no entries.
 */
static inline bool __must_check is_block_empty(const struct recovery_journal_block *block)
{
	return (block->entry_count == 0);
}

/**
 * is_block_full() - Check whether a journal block is full.
 * @block: The block to check.
 *
 * Return: True if the block is full.
 */
static inline bool __must_check is_block_full(const struct recovery_journal_block *block)
{
	return ((block == NULL) || (block->journal->entries_per_block == block->entry_count));
}
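
/*
 * Treating a NULL block as full means callers such as prepare_to_assign_entry() will try to
 * advance the tail and open a new block whenever there is no active block at all.
 */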

/**
 * assert_on_journal_thread() - Assert that we are running on the journal thread.
 * @journal: The journal.
 * @function_name: The function doing the check (for logging).
 */
static void assert_on_journal_thread(struct recovery_journal *journal,
				     const char *function_name)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == journal->thread_id),
			    "%s() called on journal thread", function_name);
}

/**
 * continue_waiter() - Release a data_vio from the journal.
 * @waiter: The data_vio waiting on journal activity.
 * @context: The result of the journal operation.
 *
 * Invoked whenever a data_vio is to be released from the journal, either because its entry was
 * committed to disk, or because there was an error. Implements waiter_callback_fn.
 */
static void continue_waiter(struct vdo_waiter *waiter, void *context)
{
	continue_data_vio_with_error(vdo_waiter_as_data_vio(waiter), *((int *) context));
}

/**
 * has_block_waiters() - Check whether the journal has any waiters on any blocks.
 * @journal: The journal in question.
 *
 * Return: True if any block has a waiter.
 */
static inline bool has_block_waiters(struct recovery_journal *journal)
{
	struct recovery_journal_block *block = get_journal_block(&journal->active_tail_blocks);

	/*
	 * Either the first active tail block (if it exists) has waiters, or no active tail block
	 * has waiters.
	 */
	return ((block != NULL) &&
		(vdo_waitq_has_waiters(&block->entry_waiters) ||
		 vdo_waitq_has_waiters(&block->commit_waiters)));
}

static void recycle_journal_blocks(struct recovery_journal *journal);
static void recycle_journal_block(struct recovery_journal_block *block);
static void notify_commit_waiters(struct recovery_journal *journal);

/**
 * suspend_lock_counter() - Prevent the lock counter from notifying.
 * @counter: The counter.
 *
 * Return: True if the lock counter was not notifying and hence the suspend was efficacious.
 */
static bool suspend_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_NOT_NOTIFYING,
				     LOCK_COUNTER_STATE_SUSPENDED);
	/* same as before_atomic */
	smp_mb__after_atomic();

	return ((prior_state == LOCK_COUNTER_STATE_SUSPENDED) ||
		(prior_state == LOCK_COUNTER_STATE_NOT_NOTIFYING));
}

static inline bool is_read_only(struct recovery_journal *journal)
{
	return vdo_is_read_only(journal->flush_vio->completion.vdo);
}

/**
 * check_for_drain_complete() - Check whether the journal has drained.
 * @journal: The journal which may have just drained.
 */
static void check_for_drain_complete(struct recovery_journal *journal)
{
	int result = VDO_SUCCESS;

	if (is_read_only(journal)) {
		result = VDO_READ_ONLY;
		/*
		 * Clean up any full active blocks which were not written due to read-only mode.
		 *
		 * FIXME: This would probably be better as a short-circuit in write_block().
		 */
		notify_commit_waiters(journal);
		recycle_journal_blocks(journal);

		/* Release any data_vios waiting to be assigned entries. */
		vdo_waitq_notify_all_waiters(&journal->entry_waiters,
					     continue_waiter, &result);
	}

	if (!vdo_is_state_draining(&journal->state) ||
	    journal->reaping ||
	    has_block_waiters(journal) ||
	    vdo_waitq_has_waiters(&journal->entry_waiters) ||
	    !suspend_lock_counter(&journal->lock_counter))
		return;

	if (vdo_is_state_saving(&journal->state)) {
		if (journal->active_block != NULL) {
			VDO_ASSERT_LOG_ONLY(((result == VDO_READ_ONLY) ||
					     !is_block_dirty(journal->active_block)),
					    "journal being saved has clean active block");
			recycle_journal_block(journal->active_block);
		}

		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "all blocks in a journal being saved must be inactive");
	}

	vdo_finish_draining_with_result(&journal->state, result);
}

/**
 * notify_recovery_journal_of_read_only_mode() - Notify a recovery journal that the VDO has gone
 *                                               read-only.
 * @listener: The journal.
 * @parent: The completion to notify in order to acknowledge the notification.
 *
 * Implements vdo_read_only_notification_fn.
 */
static void notify_recovery_journal_of_read_only_mode(void *listener,
						      struct vdo_completion *parent)
{
	check_for_drain_complete(listener);
	vdo_finish_completion(parent);
}

/**
 * enter_journal_read_only_mode() - Put the journal in read-only mode.
 * @journal: The journal which has failed.
 * @error_code: The error result triggering this call.
 *
 * All attempts to add entries after this function is called will fail. All VIOs waiting for
 * commits will be awakened with an error.
 */
static void enter_journal_read_only_mode(struct recovery_journal *journal,
					 int error_code)
{
	vdo_enter_read_only_mode(journal->flush_vio->completion.vdo, error_code);
	check_for_drain_complete(journal);
}

/**
 * vdo_get_recovery_journal_current_sequence_number() - Obtain the recovery journal's current
 *                                                      sequence number.
 * @journal: The journal in question.
 *
 * Exposed only so the block map can be initialized therefrom.
 *
 * Return: The sequence number of the tail block.
 */
sequence_number_t vdo_get_recovery_journal_current_sequence_number(struct recovery_journal *journal)
{
	return journal->tail;
}

/**
 * get_recovery_journal_head() - Get the head of the recovery journal.
 * @journal: The journal.
 *
 * The head is the lowest sequence number of the block map head and the slab journal head.
 *
 * Return: The head of the journal.
 */
static inline sequence_number_t get_recovery_journal_head(const struct recovery_journal *journal)
{
	return min(journal->block_map_head, journal->slab_journal_head);
}

/**
 * compute_recovery_count_byte() - Compute the recovery count byte for a given recovery count.
 * @recovery_count: The recovery count.
 *
 * Return: The byte corresponding to the recovery count.
 */
static inline u8 __must_check compute_recovery_count_byte(u64 recovery_count)
{
	return (u8)(recovery_count & RECOVERY_COUNT_MASK);
}

/**
 * check_slab_journal_commit_threshold() - Check whether the journal is over the threshold, and if
 *                                         so, force the oldest slab journal tail block to commit.
 * @journal: The journal.
 */
static void check_slab_journal_commit_threshold(struct recovery_journal *journal)
{
	block_count_t current_length = journal->tail - journal->slab_journal_head;

	if (current_length > journal->slab_journal_commit_threshold) {
		journal->events.slab_journal_commits_requested++;
		vdo_commit_oldest_slab_journal_tail_blocks(journal->depot,
							   journal->slab_journal_head);
	}
}

static void reap_recovery_journal(struct recovery_journal *journal);
static void assign_entries(struct recovery_journal *journal);

/**
 * finish_reaping() - Finish reaping the journal.
 * @journal: The journal being reaped.
 */
static void finish_reaping(struct recovery_journal *journal)
{
	block_count_t blocks_reaped;
	sequence_number_t old_head = get_recovery_journal_head(journal);

	journal->block_map_head = journal->block_map_reap_head;
	journal->slab_journal_head = journal->slab_journal_reap_head;
	blocks_reaped = get_recovery_journal_head(journal) - old_head;
	journal->available_space += blocks_reaped * journal->entries_per_block;
	journal->reaping = false;
	check_slab_journal_commit_threshold(journal);
	assign_entries(journal);
	check_for_drain_complete(journal);
}

/**
 * complete_reaping() - Finish reaping the journal after flushing the lower layer.
 * @completion: The journal's flush VIO.
 *
 * This is the callback registered in reap_recovery_journal().
 */
static void complete_reaping(struct vdo_completion *completion)
{
	struct recovery_journal *journal = completion->parent;

	finish_reaping(journal);

	/* Try reaping again in case more locks were released while flush was out. */
	reap_recovery_journal(journal);
}

/**
 * handle_flush_error() - Handle an error when flushing the lower layer due to reaping.
 * @completion: The journal's flush VIO.
 */
static void handle_flush_error(struct vdo_completion *completion)
{
	struct recovery_journal *journal = completion->parent;

	vio_record_metadata_io_error(as_vio(completion));
	journal->reaping = false;
	enter_journal_read_only_mode(journal, completion->result);
}

static void flush_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct recovery_journal *journal = vio->completion.parent;

	continue_vio_after_io(vio, complete_reaping, journal->thread_id);
}

/**
 * initialize_journal_state() - Set all journal fields appropriately to start journaling from the
 *                              current active block.
 * @journal: The journal to be reset based on its active block.
 */
static void initialize_journal_state(struct recovery_journal *journal)
{
	journal->append_point.sequence_number = journal->tail;
	journal->last_write_acknowledged = journal->tail;
	journal->block_map_head = journal->tail;
	journal->slab_journal_head = journal->tail;
	journal->block_map_reap_head = journal->tail;
	journal->slab_journal_reap_head = journal->tail;
	journal->block_map_head_block_number =
		vdo_get_recovery_journal_block_number(journal, journal->block_map_head);
	journal->slab_journal_head_block_number =
		vdo_get_recovery_journal_block_number(journal,
						      journal->slab_journal_head);
	journal->available_space =
		(journal->entries_per_block * vdo_get_recovery_journal_length(journal->size));
}

/**
 * vdo_get_recovery_journal_length() - Get the number of usable recovery journal blocks.
 * @journal_size: The size of the recovery journal in blocks.
 *
 * Return: The number of recovery journal blocks usable for entries.
 */
block_count_t vdo_get_recovery_journal_length(block_count_t journal_size)
{
	block_count_t reserved_blocks = journal_size / 4;

	if (reserved_blocks > RECOVERY_JOURNAL_RESERVED_BLOCKS)
		reserved_blocks = RECOVERY_JOURNAL_RESERVED_BLOCKS;
	return (journal_size - reserved_blocks);
}
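
/*
 * In other words, the usable length is journal_size minus
 * min(journal_size / 4, RECOVERY_JOURNAL_RESERVED_BLOCKS). For example, a 64-block journal
 * with a reserved-block constant of 11 (an illustrative value; see the macro above) would
 * have 64 - min(16, 11) = 53 usable blocks.
 */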

/**
 * reap_recovery_journal_callback() - Attempt to reap the journal.
 * @completion: The lock counter completion.
 *
 * Attempts to reap the journal now that all the locks on some journal block have been released.
 * This is the callback registered with the lock counter.
 */
static void reap_recovery_journal_callback(struct vdo_completion *completion)
{
	struct recovery_journal *journal = (struct recovery_journal *) completion->parent;
	/*
	 * The acknowledgment must be done before reaping so that there is no race between
	 * acknowledging the notification and unlocks wishing to notify.
	 */
	smp_wmb();
	atomic_set(&journal->lock_counter.state, LOCK_COUNTER_STATE_NOT_NOTIFYING);

	if (vdo_is_state_quiescing(&journal->state)) {
		/*
		 * Don't start reaping when the journal is trying to quiesce. Do check if this
		 * notification is the last thing the drain is waiting on.
		 */
		check_for_drain_complete(journal);
		return;
	}

	reap_recovery_journal(journal);
	check_slab_journal_commit_threshold(journal);
}

/**
 * initialize_lock_counter() - Initialize a lock counter.
 *
 * @journal: The recovery journal.
 * @vdo: The vdo.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int __must_check initialize_lock_counter(struct recovery_journal *journal,
						struct vdo *vdo)
{
	int result;
	struct thread_config *config = &vdo->thread_config;
	struct lock_counter *counter = &journal->lock_counter;

	result = vdo_allocate(journal->size, u16, __func__, &counter->journal_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->journal_decrement_counts);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size * config->logical_zone_count, u16, __func__,
			      &counter->logical_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->logical_zone_counts);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size * config->physical_zone_count, u16, __func__,
			      &counter->physical_counters);
	if (result != VDO_SUCCESS)
		return result;

	result = vdo_allocate(journal->size, atomic_t, __func__,
			      &counter->physical_zone_counts);
	if (result != VDO_SUCCESS)
		return result;

	vdo_initialize_completion(&counter->completion, vdo,
				  VDO_LOCK_COUNTER_COMPLETION);
	vdo_prepare_completion(&counter->completion, reap_recovery_journal_callback,
			       reap_recovery_journal_callback, config->journal_thread,
			       journal);
	counter->logical_zones = config->logical_zone_count;
	counter->physical_zones = config->physical_zone_count;
	counter->locks = journal->size;
	return VDO_SUCCESS;
}

/**
 * set_journal_tail() - Set the journal's tail sequence number.
 * @journal: The journal whose tail is to be set.
 * @tail: The new tail value.
 */
static void set_journal_tail(struct recovery_journal *journal, sequence_number_t tail)
{
	/* VDO does not support sequence numbers above 1 << 48 in the slab journal. */
	if (tail >= (1ULL << 48))
		enter_journal_read_only_mode(journal, VDO_JOURNAL_OVERFLOW);

	journal->tail = tail;
}

/**
 * initialize_recovery_block() - Initialize a journal block.
 * @vdo: The vdo from which to construct vios.
 * @journal: The journal to which the block will belong.
 * @block: The block to initialize.
 *
 * Return: VDO_SUCCESS or an error.
 */
static int initialize_recovery_block(struct vdo *vdo, struct recovery_journal *journal,
				     struct recovery_journal_block *block)
{
	char *data;
	int result;

	/*
	 * Ensure that a block is large enough to store RECOVERY_JOURNAL_ENTRIES_PER_BLOCK entries.
	 */
	BUILD_BUG_ON(RECOVERY_JOURNAL_ENTRIES_PER_BLOCK >
		     ((VDO_BLOCK_SIZE - sizeof(struct packed_journal_header)) /
		      sizeof(struct packed_recovery_journal_entry)));

	/*
	 * Allocate a full block for the journal block even though not all of the space is used
	 * since the VIO needs to write a full disk block.
	 */
	result = vdo_allocate(VDO_BLOCK_SIZE, char, __func__, &data);
	if (result != VDO_SUCCESS)
		return result;

	result = allocate_vio_components(vdo, VIO_TYPE_RECOVERY_JOURNAL,
					 VIO_PRIORITY_HIGH, block, 1, data, &block->vio);
	if (result != VDO_SUCCESS) {
		vdo_free(data);
		return result;
	}

	list_add_tail(&block->list_node, &journal->free_tail_blocks);
	block->journal = journal;
	return VDO_SUCCESS;
}

/**
 * vdo_decode_recovery_journal() - Make a recovery journal and initialize it with the state that
 *                                 was decoded from the super block.
 *
 * @state: The decoded state of the journal.
 * @nonce: The nonce of the VDO.
 * @vdo: The VDO.
 * @partition: The partition for the journal.
 * @recovery_count: The VDO's number of completed recoveries.
 * @journal_size: The number of blocks in the journal on disk.
 * @journal_ptr: The pointer to hold the new recovery journal.
 *
 * Return: A success or error code.
 */
int vdo_decode_recovery_journal(struct recovery_journal_state_7_0 state, nonce_t nonce,
				struct vdo *vdo, struct partition *partition,
				u64 recovery_count, block_count_t journal_size,
				struct recovery_journal **journal_ptr)
{
	block_count_t i;
	struct recovery_journal *journal;
	int result;

	result = vdo_allocate_extended(struct recovery_journal,
				       RECOVERY_JOURNAL_RESERVED_BLOCKS,
				       struct recovery_journal_block, __func__,
				       &journal);
	if (result != VDO_SUCCESS)
		return result;

	INIT_LIST_HEAD(&journal->free_tail_blocks);
	INIT_LIST_HEAD(&journal->active_tail_blocks);
	vdo_waitq_init(&journal->pending_writes);

	journal->thread_id = vdo->thread_config.journal_thread;
	journal->origin = partition->offset;
	journal->nonce = nonce;
	journal->recovery_count = compute_recovery_count_byte(recovery_count);
	journal->size = journal_size;
	journal->slab_journal_commit_threshold = (journal_size * 2) / 3;
	journal->logical_blocks_used = state.logical_blocks_used;
	journal->block_map_data_blocks = state.block_map_data_blocks;
	journal->entries_per_block = RECOVERY_JOURNAL_ENTRIES_PER_BLOCK;
	set_journal_tail(journal, state.journal_start);
	initialize_journal_state(journal);
	/* TODO: this will have to change if we make initial resume of a VDO a real resume */
	vdo_set_admin_state_code(&journal->state, VDO_ADMIN_STATE_SUSPENDED);

	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
		struct recovery_journal_block *block = &journal->blocks[i];

		result = initialize_recovery_block(vdo, journal, block);
		if (result != VDO_SUCCESS) {
			vdo_free_recovery_journal(journal);
			return result;
		}
	}

	result = initialize_lock_counter(journal, vdo);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	result = create_metadata_vio(vdo, VIO_TYPE_RECOVERY_JOURNAL, VIO_PRIORITY_HIGH,
				     journal, NULL, &journal->flush_vio);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	result = vdo_register_read_only_listener(vdo, journal,
						 notify_recovery_journal_of_read_only_mode,
						 journal->thread_id);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	result = vdo_make_default_thread(vdo, journal->thread_id);
	if (result != VDO_SUCCESS) {
		vdo_free_recovery_journal(journal);
		return result;
	}

	journal->flush_vio->completion.callback_thread_id = journal->thread_id;
	*journal_ptr = journal;
	return VDO_SUCCESS;
}

/**
 * vdo_free_recovery_journal() - Free a recovery journal.
 * @journal: The recovery journal to free.
 */
void vdo_free_recovery_journal(struct recovery_journal *journal)
{
	block_count_t i;

	if (journal == NULL)
		return;

	vdo_free(vdo_forget(journal->lock_counter.logical_zone_counts));
	vdo_free(vdo_forget(journal->lock_counter.physical_zone_counts));
	vdo_free(vdo_forget(journal->lock_counter.journal_counters));
	vdo_free(vdo_forget(journal->lock_counter.journal_decrement_counts));
	vdo_free(vdo_forget(journal->lock_counter.logical_counters));
	vdo_free(vdo_forget(journal->lock_counter.physical_counters));
	free_vio(vdo_forget(journal->flush_vio));

	/*
	 * FIXME: eventually, the journal should be constructed in a quiescent state which
	 * requires opening before use.
	 */
	if (!vdo_is_state_quiescent(&journal->state)) {
		VDO_ASSERT_LOG_ONLY(list_empty(&journal->active_tail_blocks),
				    "journal being freed has no active tail blocks");
	} else if (!vdo_is_state_saved(&journal->state) &&
		   !list_empty(&journal->active_tail_blocks)) {
		vdo_log_warning("journal being freed has uncommitted entries");
	}

	for (i = 0; i < RECOVERY_JOURNAL_RESERVED_BLOCKS; i++) {
		struct recovery_journal_block *block = &journal->blocks[i];

		vdo_free(vdo_forget(block->vio.data));
		free_vio_components(&block->vio);
	}

	vdo_free(journal);
}

/**
 * vdo_initialize_recovery_journal_post_repair() - Initialize the journal after a repair.
 * @journal: The journal in question.
 * @recovery_count: The number of completed recoveries.
 * @tail: The new tail block sequence number.
 * @logical_blocks_used: The new number of logical blocks used.
 * @block_map_data_blocks: The new number of block map data blocks.
 */
void vdo_initialize_recovery_journal_post_repair(struct recovery_journal *journal,
						 u64 recovery_count,
						 sequence_number_t tail,
						 block_count_t logical_blocks_used,
						 block_count_t block_map_data_blocks)
{
	set_journal_tail(journal, tail + 1);
	journal->recovery_count = compute_recovery_count_byte(recovery_count);
	initialize_journal_state(journal);
	journal->logical_blocks_used = logical_blocks_used;
	journal->block_map_data_blocks = block_map_data_blocks;
}

/**
 * vdo_get_journal_block_map_data_blocks_used() - Get the number of block map pages, allocated from
 *                                                data blocks, currently in use.
 * @journal: The journal in question.
 *
 * Return: The number of block map pages allocated from slabs.
 */
block_count_t vdo_get_journal_block_map_data_blocks_used(struct recovery_journal *journal)
{
	return journal->block_map_data_blocks;
}

/**
 * vdo_get_recovery_journal_thread_id() - Get the ID of a recovery journal's thread.
 * @journal: The journal to query.
 *
 * Return: The ID of the journal's thread.
 */
thread_id_t vdo_get_recovery_journal_thread_id(struct recovery_journal *journal)
{
	return journal->thread_id;
}

/**
 * vdo_open_recovery_journal() - Prepare the journal for new entries.
 * @journal: The journal in question.
 * @depot: The slab depot for this VDO.
 * @block_map: The block map for this VDO.
 */
void vdo_open_recovery_journal(struct recovery_journal *journal,
			       struct slab_depot *depot, struct block_map *block_map)
{
	journal->depot = depot;
	journal->block_map = block_map;
	WRITE_ONCE(journal->state.current_state, VDO_ADMIN_STATE_NORMAL_OPERATION);
}

/**
 * vdo_record_recovery_journal() - Record the state of a recovery journal for encoding in the super
 *                                 block.
 * @journal: the recovery journal.
 *
 * Return: the state of the journal.
 */
struct recovery_journal_state_7_0
vdo_record_recovery_journal(const struct recovery_journal *journal)
{
	struct recovery_journal_state_7_0 state = {
		.logical_blocks_used = journal->logical_blocks_used,
		.block_map_data_blocks = journal->block_map_data_blocks,
	};

	if (vdo_is_state_saved(&journal->state)) {
		/*
		 * If the journal is saved, we should start one past the active block (since the
		 * active block is not guaranteed to be empty).
		 */
		state.journal_start = journal->tail;
	} else {
		/*
		 * When we're merely suspended or have gone read-only, we must record the first
		 * block that might have entries that need to be applied.
		 */
		state.journal_start = get_recovery_journal_head(journal);
	}

	return state;
}

/**
 * get_block_header() - Get a pointer to the packed journal block header in the block buffer.
 * @block: The recovery block.
 *
 * Return: The block's header.
 */
static inline struct packed_journal_header *
get_block_header(const struct recovery_journal_block *block)
{
	return (struct packed_journal_header *) block->vio.data;
}

/**
 * set_active_sector() - Set the current sector of the current block and initialize it.
 * @block: The block to update.
 * @sector: A pointer to the first byte of the new sector.
 */
static void set_active_sector(struct recovery_journal_block *block, void *sector)
{
	block->sector = sector;
	block->sector->check_byte = get_block_header(block)->check_byte;
	block->sector->recovery_count = block->journal->recovery_count;
	block->sector->entry_count = 0;
}

/**
 * advance_tail() - Advance the tail of the journal.
 * @journal: The journal whose tail should be advanced.
 *
 * Return: true if the tail was advanced.
 */
static bool advance_tail(struct recovery_journal *journal)
{
	struct recovery_block_header unpacked;
	struct packed_journal_header *header;
	struct recovery_journal_block *block;

	block = journal->active_block = pop_free_list(journal);
	if (block == NULL)
		return false;

	list_move_tail(&block->list_node, &journal->active_tail_blocks);

	unpacked = (struct recovery_block_header) {
		.metadata_type = VDO_METADATA_RECOVERY_JOURNAL_2,
		.block_map_data_blocks = journal->block_map_data_blocks,
		.logical_blocks_used = journal->logical_blocks_used,
		.nonce = journal->nonce,
		.recovery_count = journal->recovery_count,
		.sequence_number = journal->tail,
		.check_byte = vdo_compute_recovery_journal_check_byte(journal,
								      journal->tail),
	};

	header = get_block_header(block);
	memset(block->vio.data, 0x0, VDO_BLOCK_SIZE);
	block->sequence_number = journal->tail;
	block->entry_count = 0;
	block->uncommitted_entry_count = 0;
	block->block_number = vdo_get_recovery_journal_block_number(journal,
								    journal->tail);

	vdo_pack_recovery_block_header(&unpacked, header);
	set_active_sector(block, vdo_get_journal_block_sector(header, 1));
	set_journal_tail(journal, journal->tail + 1);
	vdo_advance_block_map_era(journal->block_map, journal->tail);
	return true;
}
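
/*
 * Note that the active sector starts at sector 1: the start of the block holds the packed
 * block header, and entries are then appended sector by sector, each sector carrying its
 * own small header (see set_active_sector()).
 */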

/**
 * initialize_lock_count() - Initialize the value of the journal zone's counter for a given lock.
 * @journal: The recovery journal.
 *
 * Context: This must be called from the journal zone.
 */
static void initialize_lock_count(struct recovery_journal *journal)
{
	u16 *journal_value;
	block_count_t lock_number = journal->active_block->block_number;
	atomic_t *decrement_counter = get_decrement_counter(journal, lock_number);

	journal_value = get_counter(journal, lock_number, VDO_ZONE_TYPE_JOURNAL, 0);
	VDO_ASSERT_LOG_ONLY((*journal_value == atomic_read(decrement_counter)),
			    "count to be initialized not in use");
	*journal_value = journal->entries_per_block + 1;
	atomic_set(decrement_counter, 0);
}
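
/*
 * The "+ 1" above gives the journal zone one reference per entry plus one reference for the
 * block itself; recycle_journal_block() releases the per-entry references for any unused
 * entries and then drops the block's own reference.
 */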

/**
 * prepare_to_assign_entry() - Prepare the currently active block to receive an entry and check
 *                             whether an entry may be assigned at this time.
 * @journal: The journal receiving an entry.
 *
 * Return: true if there is space in the journal to store an entry.
 */
static bool prepare_to_assign_entry(struct recovery_journal *journal)
{
	if (journal->available_space == 0)
		return false;

	if (is_block_full(journal->active_block) && !advance_tail(journal))
		return false;

	if (!is_block_empty(journal->active_block))
		return true;

	if ((journal->tail - get_recovery_journal_head(journal)) > journal->size) {
		/* Cannot use this block since the journal is full. */
		journal->events.disk_full++;
		return false;
	}

	/*
	 * Don't allow the new block to be reaped until all of its entries have been committed to
	 * the block map and until the journal block has been fully committed as well. Because the
	 * block map update is done only after any slab journal entries have been made, the
	 * per-entry lock for the block map entry serves to protect those as well.
	 */
	initialize_lock_count(journal);
	return true;
}

static void write_blocks(struct recovery_journal *journal);

/**
 * schedule_block_write() - Queue a block for writing.
 * @journal: The journal in question.
 * @block: The block which is now ready to write.
 *
 * The block is expected to be full. If the block is currently writing, this is a noop as the block
 * will be queued for writing when the write finishes. The block must not currently be queued for
 * writing.
 */
static void schedule_block_write(struct recovery_journal *journal,
				 struct recovery_journal_block *block)
{
	if (!block->committing)
		vdo_waitq_enqueue_waiter(&journal->pending_writes, &block->write_waiter);
	/*
	 * At the end of adding entries, or discovering this partial block is now full and ready to
	 * rewrite, we will call write_blocks() and write a whole batch.
	 */
}

/**
 * release_journal_block_reference() - Release a reference to a journal block.
 * @block: The journal block from which to release a reference.
 */
static void release_journal_block_reference(struct recovery_journal_block *block)
{
	vdo_release_recovery_journal_block_reference(block->journal,
						     block->sequence_number,
						     VDO_ZONE_TYPE_JOURNAL, 0);
}

static void update_usages(struct recovery_journal *journal, struct data_vio *data_vio)
{
	if (data_vio->increment_updater.operation == VDO_JOURNAL_BLOCK_MAP_REMAPPING) {
		journal->block_map_data_blocks++;
		return;
	}

	if (data_vio->new_mapped.state != VDO_MAPPING_STATE_UNMAPPED)
		journal->logical_blocks_used++;

	if (data_vio->mapped.state != VDO_MAPPING_STATE_UNMAPPED)
		journal->logical_blocks_used--;
}

/**
 * assign_entry() - Assign an entry waiter to the active block.
 * @waiter: The data_vio.
 * @context: The recovery journal block.
 *
 * Implements waiter_callback_fn.
 */
static void assign_entry(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct recovery_journal_block *block = context;
	struct recovery_journal *journal = block->journal;

	/* Record the point at which we will make the journal entry. */
	data_vio->recovery_journal_point = (struct journal_point) {
		.sequence_number = block->sequence_number,
		.entry_count = block->entry_count,
	};

	update_usages(journal, data_vio);
	journal->available_space--;

	if (!vdo_waitq_has_waiters(&block->entry_waiters))
		journal->events.blocks.started++;

	vdo_waitq_enqueue_waiter(&block->entry_waiters, &data_vio->waiter);
	block->entry_count++;
	block->uncommitted_entry_count++;
	journal->events.entries.started++;

	if (is_block_full(block)) {
		/*
		 * The block is full, so we can write it anytime henceforth. If it is already
		 * committing, we'll queue it for writing when it comes back.
		 */
		schedule_block_write(journal, block);
	}

	/* Force out slab journal tail blocks when threshold is reached. */
	check_slab_journal_commit_threshold(journal);
}

static void assign_entries(struct recovery_journal *journal)
{
	if (journal->adding_entries) {
		/* Protect against re-entrancy. */
		return;
	}

	journal->adding_entries = true;
	while (vdo_waitq_has_waiters(&journal->entry_waiters) &&
	       prepare_to_assign_entry(journal)) {
		vdo_waitq_notify_next_waiter(&journal->entry_waiters,
					     assign_entry, journal->active_block);
	}

	/* Now that we've finished with entries, see if we have a batch of blocks to write. */
	write_blocks(journal);
	journal->adding_entries = false;
}

/**
 * recycle_journal_block() - Prepare an in-memory journal block to be reused now that it has been
 *                           fully committed.
 * @block: The block to be recycled.
 */
static void recycle_journal_block(struct recovery_journal_block *block)
{
	struct recovery_journal *journal = block->journal;
	block_count_t i;

	list_move_tail(&block->list_node, &journal->free_tail_blocks);

	/* Release any unused entry locks. */
	for (i = block->entry_count; i < journal->entries_per_block; i++)
		release_journal_block_reference(block);

	/*
	 * Release our own lock against reaping now that the block is completely committed, or
	 * we're giving up because we're in read-only mode.
	 */
	if (block->entry_count > 0)
		release_journal_block_reference(block);

	if (block == journal->active_block)
		journal->active_block = NULL;
}

/**
 * continue_committed_waiter() - invoked whenever a VIO is to be released from the journal because
 *                               its entry was committed to disk.
 * @waiter: The data_vio waiting on a journal write.
 * @context: A pointer to the recovery journal.
 *
 * Implements waiter_callback_fn.
 */
static void continue_committed_waiter(struct vdo_waiter *waiter, void *context)
{
	struct data_vio *data_vio = vdo_waiter_as_data_vio(waiter);
	struct recovery_journal *journal = context;
	int result = (is_read_only(journal) ? VDO_READ_ONLY : VDO_SUCCESS);
	bool has_decrement;

	VDO_ASSERT_LOG_ONLY(vdo_before_journal_point(&journal->commit_point,
						     &data_vio->recovery_journal_point),
			    "DataVIOs released from recovery journal in order. Recovery journal point is (%llu, %u), but commit waiter point is (%llu, %u)",
			    (unsigned long long) journal->commit_point.sequence_number,
			    journal->commit_point.entry_count,
			    (unsigned long long) data_vio->recovery_journal_point.sequence_number,
			    data_vio->recovery_journal_point.entry_count);

	journal->commit_point = data_vio->recovery_journal_point;
	data_vio->last_async_operation = VIO_ASYNC_OP_UPDATE_REFERENCE_COUNTS;
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(data_vio, result);
		return;
	}

	/*
	 * The increment must be launched first since it must come before the
	 * decrement if they are in the same slab.
	 */
	has_decrement = (data_vio->decrement_updater.zpbn.pbn != VDO_ZERO_BLOCK);
	if ((data_vio->increment_updater.zpbn.pbn != VDO_ZERO_BLOCK) || !has_decrement)
		continue_data_vio(data_vio);

	if (has_decrement)
		vdo_launch_completion(&data_vio->decrement_completion);
}

/**
 * notify_commit_waiters() - Notify any VIOs whose entries have now committed.
 * @journal: The recovery journal to update.
 */
static void notify_commit_waiters(struct recovery_journal *journal)
{
	struct recovery_journal_block *block;

	list_for_each_entry(block, &journal->active_tail_blocks, list_node) {
		if (block->committing)
			return;

		vdo_waitq_notify_all_waiters(&block->commit_waiters,
					     continue_committed_waiter, journal);
		if (is_read_only(journal)) {
			vdo_waitq_notify_all_waiters(&block->entry_waiters,
						     continue_committed_waiter,
						     journal);
		} else if (is_block_dirty(block) || !is_block_full(block)) {
			/* Stop at partially-committed or partially-filled blocks. */
			return;
		}
	}
}

/**
 * recycle_journal_blocks() - Recycle any journal blocks which have been fully committed.
 * @journal: The recovery journal to update.
 */
static void recycle_journal_blocks(struct recovery_journal *journal)
{
	struct recovery_journal_block *block, *tmp;

	list_for_each_entry_safe(block, tmp, &journal->active_tail_blocks, list_node) {
		if (block->committing) {
			/* Don't recycle committing blocks. */
			return;
		}

		if (!is_read_only(journal) &&
		    (is_block_dirty(block) || !is_block_full(block))) {
			/*
			 * Don't recycle partially written or partially full blocks, except in
			 * read-only mode.
			 */
			return;
		}

		recycle_journal_block(block);
	}
}

/**
 * complete_write() - Handle post-commit processing.
 * @completion: The completion of the VIO writing this block.
 *
 * This is the callback registered by write_block(). If more entries accumulated in the block being
 * committed while the commit was in progress, another commit will be initiated.
 */
static void complete_write(struct vdo_completion *completion)
{
	struct recovery_journal_block *block = completion->parent;
	struct recovery_journal *journal = block->journal;
	struct recovery_journal_block *last_active_block;

	assert_on_journal_thread(journal, __func__);

	journal->pending_write_count -= 1;
	journal->events.blocks.committed += 1;
	journal->events.entries.committed += block->entries_in_commit;
	block->uncommitted_entry_count -= block->entries_in_commit;
	block->entries_in_commit = 0;
	block->committing = false;

	/* If this block is the latest block to be acknowledged, record that fact. */
	if (block->sequence_number > journal->last_write_acknowledged)
		journal->last_write_acknowledged = block->sequence_number;

	last_active_block = get_journal_block(&journal->active_tail_blocks);
	VDO_ASSERT_LOG_ONLY((block->sequence_number >= last_active_block->sequence_number),
			    "completed journal write is still active");

	notify_commit_waiters(journal);

	/*
	 * Is this block now full? Reaping, and adding entries, might have already sent it off for
	 * rewriting; else, queue it for rewrite.
	 */
	if (is_block_dirty(block) && is_block_full(block))
		schedule_block_write(journal, block);

	recycle_journal_blocks(journal);
	write_blocks(journal);

	check_for_drain_complete(journal);
}

static void handle_write_error(struct vdo_completion *completion)
{
	struct recovery_journal_block *block = completion->parent;
	struct recovery_journal *journal = block->journal;

	vio_record_metadata_io_error(as_vio(completion));
	vdo_log_error_strerror(completion->result,
			       "cannot write recovery journal block %llu",
			       (unsigned long long) block->sequence_number);
	enter_journal_read_only_mode(journal, completion->result);
	complete_write(completion);
}

static void complete_write_endio(struct bio *bio)
{
	struct vio *vio = bio->bi_private;
	struct recovery_journal_block *block = vio->completion.parent;
	struct recovery_journal *journal = block->journal;

	continue_vio_after_io(vio, complete_write, journal->thread_id);
}

/**
 * add_queued_recovery_entries() - Actually add entries from the queue to the given block.
 * @block: The journal block.
 */
static void add_queued_recovery_entries(struct recovery_journal_block *block)
{
	while (vdo_waitq_has_waiters(&block->entry_waiters)) {
		struct data_vio *data_vio =
			vdo_waiter_as_data_vio(vdo_waitq_dequeue_waiter(&block->entry_waiters));
		struct tree_lock *lock = &data_vio->tree_lock;
		struct packed_recovery_journal_entry *packed_entry;
		struct recovery_journal_entry new_entry;

		if (block->sector->entry_count == RECOVERY_JOURNAL_ENTRIES_PER_SECTOR)
			set_active_sector(block,
					  (char *) block->sector + VDO_SECTOR_SIZE);

		/* Compose and encode the entry. */
		packed_entry = &block->sector->entries[block->sector->entry_count++];
		new_entry = (struct recovery_journal_entry) {
			.mapping = {
				.pbn = data_vio->increment_updater.zpbn.pbn,
				.state = data_vio->increment_updater.zpbn.state,
			},
			.unmapping = {
				.pbn = data_vio->decrement_updater.zpbn.pbn,
				.state = data_vio->decrement_updater.zpbn.state,
			},
			.operation = data_vio->increment_updater.operation,
			.slot = lock->tree_slots[lock->height].block_map_slot,
		};
		*packed_entry = vdo_pack_recovery_journal_entry(&new_entry);
		data_vio->recovery_sequence_number = block->sequence_number;

		/* Enqueue the data_vio to wait for its entry to commit. */
		vdo_waitq_enqueue_waiter(&block->commit_waiters, &data_vio->waiter);
	}
}

/**
 * write_block() - Issue a block for writing.
 * @waiter: The recovery journal block to write.
 * @context: Not used.
 *
 * Implements waiter_callback_fn.
 */
static void write_block(struct vdo_waiter *waiter, void __always_unused *context)
{
	struct recovery_journal_block *block =
		container_of(waiter, struct recovery_journal_block, write_waiter);
	struct recovery_journal *journal = block->journal;
	struct packed_journal_header *header = get_block_header(block);

	if (block->committing || !vdo_waitq_has_waiters(&block->entry_waiters) ||
	    is_read_only(journal))
		return;

	block->entries_in_commit = vdo_waitq_num_waiters(&block->entry_waiters);
	add_queued_recovery_entries(block);

	journal->pending_write_count += 1;
	journal->events.blocks.written += 1;
	journal->events.entries.written += block->entries_in_commit;

	header->block_map_head = __cpu_to_le64(journal->block_map_head);
	header->slab_journal_head = __cpu_to_le64(journal->slab_journal_head);
	header->entry_count = __cpu_to_le16(block->entry_count);

	block->committing = true;

	/*
	 * We must issue a flush and a FUA for every commit. The flush is necessary to ensure that
	 * the data being referenced is stable. The FUA is necessary to ensure that the journal
	 * block itself is stable before allowing overwrites of the lbn's previous data.
	 */
	vdo_submit_metadata_vio(&block->vio, journal->origin + block->block_number,
				complete_write_endio, handle_write_error,
				REQ_OP_WRITE | REQ_PRIO | REQ_PREFLUSH | REQ_SYNC | REQ_FUA);
}

1406 /**
1407 * write_blocks() - Attempt to commit blocks, according to write policy.
1408 * @journal: The recovery journal.
1409 */
write_blocks(struct recovery_journal * journal)1410 static void write_blocks(struct recovery_journal *journal)
1411 {
1412 assert_on_journal_thread(journal, __func__);
1413 /*
1414 * We call this function after adding entries to the journal and after finishing a block
1415 * write. Thus, when this function terminates we must either have no VIOs waiting in the
1416 * journal or have some outstanding IO to provide a future wakeup.
1417 *
1418 * We want to only issue full blocks if there are no pending writes. However, if there are
1419 * no outstanding writes and some unwritten entries, we must issue a block, even if it's
1420 * the active block and it isn't full.
1421 */
1422 if (journal->pending_write_count > 0)
1423 return;
1424
1425 /* Write all the full blocks. */
1426 vdo_waitq_notify_all_waiters(&journal->pending_writes, write_block, NULL);
1427
1428 /*
1429 * Do we need to write the active block? Only if we have no outstanding writes, even after
1430 * issuing all of the full writes.
1431 */
1432 if ((journal->pending_write_count == 0) && (journal->active_block != NULL))
1433 write_block(&journal->active_block->write_waiter, NULL);
1434 }

/**
 * vdo_add_recovery_journal_entry() - Add an entry to a recovery journal.
 * @journal: The journal in which to make an entry.
 * @data_vio: The data_vio for which to add the entry. The entry will be taken
 *	      from the logical and new_mapped fields of the data_vio. The
 *	      data_vio's recovery_sequence_number field will be set to the
 *	      sequence number of the journal block in which the entry was
 *	      made.
 *
 * This method is asynchronous. The data_vio will not be called back until the entry is committed
 * to the on-disk journal.
 */
void vdo_add_recovery_journal_entry(struct recovery_journal *journal,
				    struct data_vio *data_vio)
{
	assert_on_journal_thread(journal, __func__);
	if (!vdo_is_state_normal(&journal->state)) {
		continue_data_vio_with_error(data_vio, VDO_INVALID_ADMIN_STATE);
		return;
	}

	if (is_read_only(journal)) {
		continue_data_vio_with_error(data_vio, VDO_READ_ONLY);
		return;
	}

	VDO_ASSERT_LOG_ONLY(data_vio->recovery_sequence_number == 0,
			    "journal lock not held for new entry");

	vdo_advance_journal_point(&journal->append_point, journal->entries_per_block);
	vdo_waitq_enqueue_waiter(&journal->entry_waiters, &data_vio->waiter);
	assign_entries(journal);
}
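
/*
 * For reference, vdo_advance_journal_point() moves the append point forward
 * one entry, rolling over to the next sequence number once a block's worth
 * of entries has been claimed; roughly:
 *
 *	if (++point->entry_count == entries_per_block) {
 *		point->entry_count = 0;
 *		point->sequence_number++;
 *	}
 */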

/**
 * is_lock_locked() - Check whether a lock is locked for a zone type.
 * @journal: The recovery journal.
 * @lock_number: The lock to check.
 * @zone_type: The type of the zone.
 *
 * If the recovery journal has a lock on the lock number, both logical and physical zones are
 * considered locked.
 *
 * Return: true if the specified lock has references (is locked).
 */
static bool is_lock_locked(struct recovery_journal *journal, block_count_t lock_number,
			   enum vdo_zone_type zone_type)
{
	atomic_t *zone_count;
	bool locked;

	if (is_journal_zone_locked(journal, lock_number))
		return true;

	zone_count = get_zone_count_ptr(journal, lock_number, zone_type);
	locked = (atomic_read(zone_count) != 0);
	/* Pairs with the implicit barrier in vdo_release_recovery_journal_block_reference(). */
	smp_rmb();
	return locked;
}
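
/*
 * A sketch of the barrier pairing (assuming the release path decrements the
 * zone count with a fully-ordered atomic such as atomic_add_return()):
 *
 *	releasing zone				journal thread (here)
 *	--------------				---------------------
 *	finish updates for the block
 *	atomic_add_return(-1, zone_count)	atomic_read(zone_count)
 *						smp_rmb()
 *						safely reap the block
 *
 * The read barrier ensures that if this thread sees the count reach zero,
 * it also sees every update the releasing zone made beforehand.
 */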

/**
 * reap_recovery_journal() - Conduct a sweep on a recovery journal to reclaim unreferenced blocks.
 * @journal: The recovery journal.
 */
static void reap_recovery_journal(struct recovery_journal *journal)
{
	if (journal->reaping) {
		/*
		 * We already have an outstanding reap in progress. We need to wait for it to
		 * finish.
		 */
		return;
	}

	if (vdo_is_state_quiescent(&journal->state)) {
		/* We are not supposed to do IO. Don't botch it by reaping. */
		return;
	}

	/*
	 * Start reclaiming blocks only when the journal head has no references. Then stop when a
	 * block is referenced.
	 */
	while ((journal->block_map_reap_head < journal->last_write_acknowledged) &&
	       !is_lock_locked(journal, journal->block_map_head_block_number,
			       VDO_ZONE_TYPE_LOGICAL)) {
		journal->block_map_reap_head++;
		if (++journal->block_map_head_block_number == journal->size)
			journal->block_map_head_block_number = 0;
	}

	while ((journal->slab_journal_reap_head < journal->last_write_acknowledged) &&
	       !is_lock_locked(journal, journal->slab_journal_head_block_number,
			       VDO_ZONE_TYPE_PHYSICAL)) {
		journal->slab_journal_reap_head++;
		if (++journal->slab_journal_head_block_number == journal->size)
			journal->slab_journal_head_block_number = 0;
	}

	if ((journal->block_map_reap_head == journal->block_map_head) &&
	    (journal->slab_journal_reap_head == journal->slab_journal_head)) {
		/* Nothing happened. */
		return;
	}

	/*
	 * If the block map head will advance, we must flush any block map page modified by the
	 * entries we are reaping. If the slab journal head will advance, we must flush the slab
	 * summary update covering the slab journal that just released some lock.
	 */
	journal->reaping = true;
	vdo_submit_flush_vio(journal->flush_vio, flush_endio, handle_flush_error);
}
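
/*
 * Illustrative note: the open-coded wrap in the loops above is just the
 * modular advance
 *
 *	head_block_number = (head_block_number + 1) % journal->size;
 *
 * written with a compare, which avoids a 64-bit division on this path.
 */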

/**
 * vdo_acquire_recovery_journal_block_reference() - Acquire a reference to a recovery journal
 *						    block from somewhere other than the journal
 *						    itself.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 * @zone_type: The type of the zone making the adjustment.
 * @zone_id: The ID of the zone making the adjustment.
 */
void vdo_acquire_recovery_journal_block_reference(struct recovery_journal *journal,
						  sequence_number_t sequence_number,
						  enum vdo_zone_type zone_type,
						  zone_count_t zone_id)
{
	block_count_t lock_number;
	u16 *current_value;

	if (sequence_number == 0)
		return;

	VDO_ASSERT_LOG_ONLY((zone_type != VDO_ZONE_TYPE_JOURNAL),
			    "invalid lock count increment from journal zone");

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	current_value = get_counter(journal, lock_number, zone_type, zone_id);
	VDO_ASSERT_LOG_ONLY(*current_value < U16_MAX,
			    "increment of lock counter must not overflow");

	if (*current_value == 0) {
		/*
		 * This zone is acquiring this lock for the first time. Extra barriers because
		 * this was originally developed using an atomic add operation that implicitly
		 * had them.
		 */
		smp_mb__before_atomic();
		atomic_inc(get_zone_count_ptr(journal, lock_number, zone_type));
		/* same as before_atomic */
		smp_mb__after_atomic();
	}

	*current_value += 1;
}
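
/*
 * The counting here is two-level: each zone increments a plain u16 that only
 * its own thread touches, and the shared atomic zone count only records how
 * many zones hold any references at all. The matching release path
 * (vdo_release_recovery_journal_block_reference(), elsewhere in this file)
 * mirrors it in the opposite order, roughly:
 *
 *	*current_value -= 1;
 *	if (*current_value == 0)
 *		atomic_add_return(-1, get_zone_count_ptr(journal, lock_number,
 *							 zone_type));
 *
 * so is_lock_locked() only has to poll one atomic per lock, not one counter
 * per zone.
 */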

/**
 * vdo_release_journal_entry_lock() - Release a single per-entry reference count for a recovery
 *				      journal block.
 * @journal: The recovery journal.
 * @sequence_number: The journal sequence number of the referenced block.
 */
void vdo_release_journal_entry_lock(struct recovery_journal *journal,
				    sequence_number_t sequence_number)
{
	block_count_t lock_number;

	if (sequence_number == 0)
		return;

	lock_number = vdo_get_recovery_journal_block_number(journal, sequence_number);
	/*
	 * Extra barriers because this was originally developed using an atomic add operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	atomic_inc(get_decrement_counter(journal, lock_number));
	/* same as before_atomic */
	smp_mb__after_atomic();
}
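
/*
 * For context: the journal zone's per-lock counter counts entries made in a
 * block, and this atomic decrement counter counts entries released. The
 * journal zone's lock is considered held only while the two disagree;
 * is_journal_zone_locked() (earlier in this file) is roughly:
 *
 *	locked = (*get_counter(journal, lock, VDO_ZONE_TYPE_JOURNAL, 0) !=
 *		  atomic_read(get_decrement_counter(journal, lock)));
 */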

/** Implements vdo_admin_initiator_fn. */
static void initiate_drain(struct admin_state *state)
{
	check_for_drain_complete(container_of(state, struct recovery_journal, state));
}

/**
 * vdo_drain_recovery_journal() - Drain recovery journal I/O.
 * @journal: The journal to drain.
 * @operation: The drain operation (suspend or save).
 * @parent: The completion to notify once the journal is drained.
 *
 * All uncommitted entries will be written out.
 */
void vdo_drain_recovery_journal(struct recovery_journal *journal,
				const struct admin_state_code *operation,
				struct vdo_completion *parent)
{
	assert_on_journal_thread(journal, __func__);
	vdo_start_draining(&journal->state, operation, parent, initiate_drain);
}

/**
 * resume_lock_counter() - Re-allow notifications from a suspended lock counter.
 * @counter: The counter.
 *
 * Return: true if the lock counter was suspended.
 */
static bool resume_lock_counter(struct lock_counter *counter)
{
	int prior_state;

	/*
	 * Extra barriers because this was originally developed using a CAS operation that
	 * implicitly had them.
	 */
	smp_mb__before_atomic();
	prior_state = atomic_cmpxchg(&counter->state, LOCK_COUNTER_STATE_SUSPENDED,
				     LOCK_COUNTER_STATE_NOT_NOTIFYING);
	/* same as before_atomic */
	smp_mb__after_atomic();

	return (prior_state == LOCK_COUNTER_STATE_SUSPENDED);
}
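
/*
 * atomic_cmpxchg() returns the state's prior value, so the transition
 * SUSPENDED -> NOT_NOTIFYING happens at most once per suspension:
 *
 *	resume_lock_counter(counter);	first call returns true
 *	resume_lock_counter(counter);	second call returns false; the state
 *					is already NOT_NOTIFYING
 */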

/**
 * vdo_resume_recovery_journal() - Resume a recovery journal which has been drained.
 * @journal: The journal to resume.
 * @parent: The completion to finish once the journal is resumed.
 */
void vdo_resume_recovery_journal(struct recovery_journal *journal,
				 struct vdo_completion *parent)
{
	bool saved;

	assert_on_journal_thread(journal, __func__);
	saved = vdo_is_state_saved(&journal->state);
	vdo_set_completion_result(parent, vdo_resume_if_quiescent(&journal->state));
	if (is_read_only(journal)) {
		vdo_continue_completion(parent, VDO_READ_ONLY);
		return;
	}

	if (saved)
		initialize_journal_state(journal);

	if (resume_lock_counter(&journal->lock_counter)) {
		/* We might have missed a notification. */
		reap_recovery_journal(journal);
	}

	vdo_launch_completion(parent);
}

/**
 * vdo_get_recovery_journal_logical_blocks_used() - Get the number of logical blocks in use by the
 *						    VDO.
 * @journal: The journal.
 *
 * Return: The number of logical blocks in use by the VDO.
 */
block_count_t vdo_get_recovery_journal_logical_blocks_used(const struct recovery_journal *journal)
{
	return journal->logical_blocks_used;
}

/**
 * vdo_get_recovery_journal_statistics() - Get the current statistics from the recovery journal.
 * @journal: The recovery journal to query.
 *
 * Return: A copy of the current statistics for the journal.
 */
struct recovery_journal_statistics
vdo_get_recovery_journal_statistics(const struct recovery_journal *journal)
{
	return journal->events;
}

/**
 * dump_recovery_block() - Dump the contents of the recovery block to the log.
 * @block: The block to dump.
 */
static void dump_recovery_block(const struct recovery_journal_block *block)
{
	vdo_log_info("  sequence number %llu; entries %u; %s; %zu entry waiters; %zu commit waiters",
		     (unsigned long long) block->sequence_number, block->entry_count,
		     (block->committing ? "committing" : "waiting"),
		     vdo_waitq_num_waiters(&block->entry_waiters),
		     vdo_waitq_num_waiters(&block->commit_waiters));
}

/**
 * vdo_dump_recovery_journal_statistics() - Dump some current statistics and other debug info from
 *					    the recovery journal.
 * @journal: The recovery journal to dump.
 */
void vdo_dump_recovery_journal_statistics(const struct recovery_journal *journal)
{
	const struct recovery_journal_block *block;
	struct recovery_journal_statistics stats = vdo_get_recovery_journal_statistics(journal);

	vdo_log_info("Recovery Journal");
	vdo_log_info("  block_map_head=%llu slab_journal_head=%llu last_write_acknowledged=%llu tail=%llu block_map_reap_head=%llu slab_journal_reap_head=%llu disk_full=%llu slab_journal_commits_requested=%llu entry_waiters=%zu",
		     (unsigned long long) journal->block_map_head,
		     (unsigned long long) journal->slab_journal_head,
		     (unsigned long long) journal->last_write_acknowledged,
		     (unsigned long long) journal->tail,
		     (unsigned long long) journal->block_map_reap_head,
		     (unsigned long long) journal->slab_journal_reap_head,
		     (unsigned long long) stats.disk_full,
		     (unsigned long long) stats.slab_journal_commits_requested,
		     vdo_waitq_num_waiters(&journal->entry_waiters));
	vdo_log_info("  entries: started=%llu written=%llu committed=%llu",
		     (unsigned long long) stats.entries.started,
		     (unsigned long long) stats.entries.written,
		     (unsigned long long) stats.entries.committed);
	vdo_log_info("  blocks: started=%llu written=%llu committed=%llu",
		     (unsigned long long) stats.blocks.started,
		     (unsigned long long) stats.blocks.written,
		     (unsigned long long) stats.blocks.committed);

	vdo_log_info("  active blocks:");
	list_for_each_entry(block, &journal->active_tail_blocks, list_node)
		dump_recovery_block(block);
}