xref: /linux/drivers/md/dm-vdo/packer.c (revision a5f998094fa344cdd1342164948abb4d7c6101ce) !
1 // SPDX-License-Identifier: GPL-2.0-only
2 /*
3  * Copyright 2023 Red Hat
4  */
5 
6 #include "packer.h"
7 
8 #include <linux/atomic.h>
9 #include <linux/blkdev.h>
10 
11 #include "logger.h"
12 #include "memory-alloc.h"
13 #include "permassert.h"
14 #include "string-utils.h"
15 
16 #include "admin-state.h"
17 #include "completion.h"
18 #include "constants.h"
19 #include "data-vio.h"
20 #include "dedupe.h"
21 #include "encodings.h"
22 #include "io-submitter.h"
23 #include "physical-zone.h"
24 #include "status-codes.h"
25 #include "vdo.h"
26 #include "vio.h"
27 
/* The on-disk version number of the compressed block format produced by this packer. */
static const struct version_number COMPRESSED_BLOCK_1_0 = {
	.major_version = 1,
	.minor_version = 0,
};

/*
 * Expected encoded size of a version 1.0 compressed block header: presumably two 4-byte
 * version fields plus one 2-byte size per compression slot (TODO: confirm against the
 * declaration of struct compressed_block_header). The BUILD_BUG_ON in
 * initialize_compressed_block() checks this against the actual struct size.
 */
#define COMPRESSED_BLOCK_1_0_SIZE (4 + 4 + (2 * VDO_MAX_COMPRESSION_SLOTS))
34 
35 /**
36  * vdo_get_compressed_block_fragment() - Get a reference to a compressed fragment from a compressed
37  *                                       block.
38  * @mapping_state: The mapping state describing the fragment.
39  * @block: The compressed block that was read from disk.
40  * @fragment_offset: The offset of the fragment within the compressed block.
41  * @fragment_size: The size of the fragment.
42  *
43  * Return: If a valid compressed fragment is found, VDO_SUCCESS; otherwise, VDO_INVALID_FRAGMENT if
44  *         the fragment is invalid.
45  */
46 int vdo_get_compressed_block_fragment(enum block_mapping_state mapping_state,
47 				      struct compressed_block *block,
48 				      u16 *fragment_offset, u16 *fragment_size)
49 {
50 	u16 compressed_size;
51 	u16 offset = 0;
52 	unsigned int i;
53 	u8 slot;
54 	struct version_number version;
55 
56 	if (!vdo_is_state_compressed(mapping_state))
57 		return VDO_INVALID_FRAGMENT;
58 
59 	version = vdo_unpack_version_number(block->header.version);
60 	if (!vdo_are_same_version(version, COMPRESSED_BLOCK_1_0))
61 		return VDO_INVALID_FRAGMENT;
62 
63 	slot = mapping_state - VDO_MAPPING_STATE_COMPRESSED_BASE;
64 	if (slot >= VDO_MAX_COMPRESSION_SLOTS)
65 		return VDO_INVALID_FRAGMENT;
66 
67 	compressed_size = __le16_to_cpu(block->header.sizes[slot]);
68 	for (i = 0; i < slot; i++) {
69 		offset += __le16_to_cpu(block->header.sizes[i]);
70 		if (offset >= VDO_COMPRESSED_BLOCK_DATA_SIZE)
71 			return VDO_INVALID_FRAGMENT;
72 	}
73 
74 	if ((offset + compressed_size) > VDO_COMPRESSED_BLOCK_DATA_SIZE)
75 		return VDO_INVALID_FRAGMENT;
76 
77 	*fragment_offset = offset;
78 	*fragment_size = compressed_size;
79 	return VDO_SUCCESS;
80 }
81 
/**
 * assert_on_packer_thread() - Check that we are on the packer thread.
 * @packer: The packer.
 * @caller: The function which is asserting.
 *
 * NOTE(review): VDO_ASSERT_LOG_ONLY presumably only logs on violation rather than
 * failing hard — confirm against its definition in permassert.h.
 */
static inline void assert_on_packer_thread(struct packer *packer, const char *caller)
{
	VDO_ASSERT_LOG_ONLY((vdo_get_callback_thread_id() == packer->thread_id),
			    "%s() called from packer thread", caller);
}
92 
93 /**
94  * insert_in_sorted_list() - Insert a bin to the list.
95  * @packer: The packer.
96  * @bin: The bin to move to its sorted position.
97  *
98  * The list is in ascending order of free space. Since all bins are already in the list, this
99  * actually moves the bin to the correct position in the list.
100  */
101 static void insert_in_sorted_list(struct packer *packer, struct packer_bin *bin)
102 {
103 	struct packer_bin *active_bin;
104 
105 	list_for_each_entry(active_bin, &packer->bins, list)
106 		if (active_bin->free_space > bin->free_space) {
107 			list_move_tail(&bin->list, &active_bin->list);
108 			return;
109 		}
110 
111 	list_move_tail(&bin->list, &packer->bins);
112 }
113 
/**
 * make_bin() - Allocate a bin and put it into the packer's list.
 * @packer: The packer.
 *
 * The new bin starts empty (a full block's worth of free space) and is appended to the
 * packer's bin list.
 *
 * Return: VDO_SUCCESS or an allocation error code.
 */
static int __must_check make_bin(struct packer *packer)
{
	struct packer_bin *bin;
	int result;

	/*
	 * Allocate the bin; the extra VDO_MAX_COMPRESSION_SLOTS elements presumably size the
	 * trailing 'incoming' array — confirm against the vdo_allocate_extended() macro.
	 */
	result = vdo_allocate_extended(VDO_MAX_COMPRESSION_SLOTS, incoming, __func__, &bin);
	if (result != VDO_SUCCESS)
		return result;

	/* An empty bin can accept a full compressed block's worth of data. */
	bin->free_space = VDO_COMPRESSED_BLOCK_DATA_SIZE;
	INIT_LIST_HEAD(&bin->list);
	list_add_tail(&bin->list, &packer->bins);
	return VDO_SUCCESS;
}
132 
/**
 * vdo_make_packer() - Make a new block packer.
 *
 * @vdo: The vdo to which this packer belongs.
 * @bin_count: The number of partial bins to keep in memory.
 * @packer_ptr: A pointer to hold the new packer.
 *
 * On any failure, the partially constructed packer is freed before returning, so the
 * caller owns nothing unless VDO_SUCCESS is returned.
 *
 * Return: VDO_SUCCESS or an error
 */
int vdo_make_packer(struct vdo *vdo, block_count_t bin_count, struct packer **packer_ptr)
{
	struct packer *packer;
	block_count_t i;
	int result;

	result = vdo_allocate(1, __func__, &packer);
	if (result != VDO_SUCCESS)
		return result;

	packer->thread_id = vdo->thread_config.packer_thread;
	packer->size = bin_count;
	INIT_LIST_HEAD(&packer->bins);
	vdo_set_admin_state_code(&packer->state, VDO_ADMIN_STATE_NORMAL_OPERATION);

	for (i = 0; i < bin_count; i++) {
		result = make_bin(packer);
		if (result != VDO_SUCCESS) {
			/* vdo_free_packer() also frees any bins already on the list. */
			vdo_free_packer(packer);
			return result;
		}
	}

	/*
	 * The canceled bin can hold up to half the number of user vios. Every canceled vio in the
	 * bin must have a canceler for which it is waiting, and any canceler will only have
	 * canceled one lock holder at a time.
	 */
	result = vdo_allocate_extended(MAXIMUM_VDO_USER_VIOS / 2, incoming, __func__,
				       &packer->canceled_bin);
	if (result != VDO_SUCCESS) {
		vdo_free_packer(packer);
		return result;
	}

	result = vdo_make_default_thread(vdo, packer->thread_id);
	if (result != VDO_SUCCESS) {
		vdo_free_packer(packer);
		return result;
	}

	*packer_ptr = packer;
	return VDO_SUCCESS;
}
186 
187 /**
188  * vdo_free_packer() - Free a block packer.
189  * @packer: The packer to free.
190  */
191 void vdo_free_packer(struct packer *packer)
192 {
193 	struct packer_bin *bin, *tmp;
194 
195 	if (packer == NULL)
196 		return;
197 
198 	list_for_each_entry_safe(bin, tmp, &packer->bins, list) {
199 		list_del_init(&bin->list);
200 		vdo_free(bin);
201 	}
202 
203 	vdo_free(vdo_forget(packer->canceled_bin));
204 	vdo_free(packer);
205 }
206 
/**
 * get_packer_from_data_vio() - Get the packer from a data_vio.
 * @data_vio: The data_vio.
 *
 * Return: The packer from the VDO to which the data_vio belongs.
 */
static inline struct packer *get_packer_from_data_vio(struct data_vio *data_vio)
{
	return vdo_from_data_vio(data_vio)->packer;
}
217 
218 /**
219  * vdo_get_packer_statistics() - Get the current statistics from the packer.
220  * @packer: The packer to query.
221  *
222  * Return: a copy of the current statistics for the packer.
223  */
224 struct packer_statistics vdo_get_packer_statistics(const struct packer *packer)
225 {
226 	const struct packer_statistics *stats = &packer->statistics;
227 
228 	return (struct packer_statistics) {
229 		.compressed_fragments_written = READ_ONCE(stats->compressed_fragments_written),
230 		.compressed_blocks_written = READ_ONCE(stats->compressed_blocks_written),
231 		.compressed_fragments_in_packer = READ_ONCE(stats->compressed_fragments_in_packer),
232 	};
233 }
234 
/**
 * abort_packing() - Abort packing a data_vio.
 * @data_vio: The data_vio to abort.
 *
 * Removes the data_vio from the packer's fragment count and hands it off to
 * write_data_vio() to continue without compression.
 */
static void abort_packing(struct data_vio *data_vio)
{
	struct packer *packer = get_packer_from_data_vio(data_vio);

	/* The counter was incremented in vdo_attempt_packing(), so undo that here. */
	WRITE_ONCE(packer->statistics.compressed_fragments_in_packer,
		   packer->statistics.compressed_fragments_in_packer - 1);

	write_data_vio(data_vio);
}
248 
/**
 * release_compressed_write_waiter() - Update a data_vio for which a successful compressed write
 *                                     has completed and send it on its way.
 * @data_vio: The data_vio to release.
 * @allocation: The allocation to which the compressed block was written.
 */
static void release_compressed_write_waiter(struct data_vio *data_vio,
					    struct allocation *allocation)
{
	/*
	 * Encode the fragment's slot number into the mapping state; this is the inverse of
	 * the decoding done in vdo_get_compressed_block_fragment().
	 */
	data_vio->new_mapped = (struct zoned_pbn) {
		.pbn = allocation->pbn,
		.zone = allocation->zone,
		.state = data_vio->compression.slot + VDO_MAPPING_STATE_COMPRESSED_BASE,
	};

	vdo_share_compressed_write_lock(data_vio, allocation->lock);
	update_metadata_for_data_vio_write(data_vio, allocation->lock);
}
267 
/**
 * finish_compressed_write() - Finish a compressed block write.
 * @completion: The compressed write completion.
 *
 * This callback is registered in continue_after_allocation().
 */
static void finish_compressed_write(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct data_vio *client, *next;

	assert_data_vio_in_allocated_zone(agent);

	/*
	 * Process all the non-agent waiters first to ensure that the pbn lock can not be released
	 * until all of them have had a chance to journal their increfs.
	 */
	for (client = agent->compression.next_in_batch; client != NULL; client = next) {
		/* Grab the next pointer before releasing the client. */
		next = client->compression.next_in_batch;
		release_compressed_write_waiter(client, &agent->allocation);
	}

	/* Restore the normal data_vio error handler before continuing the agent. */
	completion->error_handler = handle_data_vio_error;
	release_compressed_write_waiter(agent, &agent->allocation);
}
293 
/**
 * handle_compressed_write_error() - Handle a failed compressed block write.
 * @completion: The compressed write completion.
 *
 * Counts the error, then releases every data_vio in the batch (including the agent) to
 * continue via write_data_vio(). This handler is installed in write_bin().
 */
static void handle_compressed_write_error(struct vdo_completion *completion)
{
	struct data_vio *agent = as_data_vio(completion);
	struct allocation *allocation = &agent->allocation;
	struct data_vio *client, *next;

	/* Error handling must run on the allocated zone's thread. */
	if (vdo_requeue_completion_if_needed(completion, allocation->zone->thread_id))
		return;

	update_vio_error_stats(as_vio(completion),
			       "Completing compressed write vio for physical block %llu with error",
			       (unsigned long long) allocation->pbn);

	for (client = agent->compression.next_in_batch; client != NULL; client = next) {
		next = client->compression.next_in_batch;
		write_data_vio(client);
	}

	/* Now that we've released the batch from the packer, forget the error and continue on. */
	vdo_reset_completion(completion);
	completion->error_handler = handle_data_vio_error;
	write_data_vio(agent);
}
317 
318 /**
319  * add_to_bin() - Put a data_vio in a specific packer_bin in which it will definitely fit.
320  * @bin: The bin in which to put the data_vio.
321  * @data_vio: The data_vio to add.
322  */
323 static void add_to_bin(struct packer_bin *bin, struct data_vio *data_vio)
324 {
325 	data_vio->compression.bin = bin;
326 	data_vio->compression.slot = bin->slots_used;
327 	bin->incoming[bin->slots_used++] = data_vio;
328 }
329 
/**
 * remove_from_bin() - Get the next data_vio whose compression has not been canceled from a bin.
 * @packer: The packer.
 * @bin: The bin from which to get a data_vio.
 *
 * Any canceled data_vios will be moved to the canceled bin.
 * Return: An uncanceled data_vio from the bin or NULL if there are none.
 */
static struct data_vio *remove_from_bin(struct packer *packer, struct packer_bin *bin)
{
	/* Pop data_vios from the end of the slot array until an uncanceled one is found. */
	while (bin->slots_used > 0) {
		struct data_vio *data_vio = bin->incoming[--bin->slots_used];

		if (!advance_data_vio_compression_stage(data_vio).may_not_compress) {
			data_vio->compression.bin = NULL;
			return data_vio;
		}

		/* Canceled data_vios wait in the canceled bin for their cancelers. */
		add_to_bin(packer->canceled_bin, data_vio);
	}

	/* The bin is now empty. */
	bin->free_space = VDO_COMPRESSED_BLOCK_DATA_SIZE;
	return NULL;
}
355 
356 /**
357  * initialize_compressed_block() - Initialize a compressed block.
358  * @block: The compressed block to initialize.
359  * @size: The size of the agent's fragment.
360  *
361  * This method initializes the compressed block in the compressed write agent. Because the
362  * compressor already put the agent's compressed fragment at the start of the compressed block's
363  * data field, it needn't be copied. So all we need do is initialize the header and set the size of
364  * the agent's fragment.
365  */
366 static void initialize_compressed_block(struct compressed_block *block, u16 size)
367 {
368 	/*
369 	 * Make sure the block layout isn't accidentally changed by changing the length of the
370 	 * block header.
371 	 */
372 	BUILD_BUG_ON(sizeof(struct compressed_block_header) != COMPRESSED_BLOCK_1_0_SIZE);
373 
374 	block->header.version = vdo_pack_version_number(COMPRESSED_BLOCK_1_0);
375 	block->header.sizes[0] = __cpu_to_le16(size);
376 }
377 
378 /**
379  * pack_fragment() - Pack a data_vio's fragment into the compressed block in which it is already
380  *                   known to fit.
381  * @compression: The agent's compression_state to pack in to.
382  * @data_vio: The data_vio to pack.
383  * @offset: The offset into the compressed block at which to pack the fragment.
384  * @slot: The slot number in the compressed block.
385  * @block: The compressed block which will be written out when batch is fully packed.
386  *
387  * Return: The new amount of space used.
388  */
389 static block_size_t __must_check pack_fragment(struct compression_state *compression,
390 					       struct data_vio *data_vio,
391 					       block_size_t offset, slot_number_t slot,
392 					       struct compressed_block *block)
393 {
394 	struct compression_state *to_pack = &data_vio->compression;
395 	char *fragment = to_pack->block->data;
396 
397 	to_pack->next_in_batch = compression->next_in_batch;
398 	compression->next_in_batch = data_vio;
399 	to_pack->slot = slot;
400 	block->header.sizes[slot] = __cpu_to_le16(to_pack->size);
401 	memcpy(&block->data[offset], fragment, to_pack->size);
402 	return (offset + to_pack->size);
403 }
404 
/**
 * compressed_write_end_io() - The bio_end_io for a compressed block write.
 * @bio: The bio for the compressed write.
 *
 * Counts the completed bio, then continues the agent on the allocated zone's thread with
 * finish_compressed_write() as the callback, carrying any bio error.
 */
static void compressed_write_end_io(struct bio *bio)
{
	struct data_vio *data_vio = vio_as_data_vio(bio->bi_private);

	vdo_count_completed_bios(bio);
	set_data_vio_allocated_zone_callback(data_vio, finish_compressed_write);
	continue_data_vio_with_error(data_vio, blk_status_to_errno(bio->bi_status));
}
417 
/**
 * write_bin() - Write out a bin.
 * @packer: The packer.
 * @bin: The bin to write.
 *
 * The first uncanceled data_vio removed from the bin becomes the agent for the compressed
 * write; the remaining data_vios' fragments are packed into the agent's block. If the bin
 * yields only a single data_vio, packing is aborted since compression would save nothing.
 */
static void write_bin(struct packer *packer, struct packer_bin *bin)
{
	int result;
	block_size_t offset;
	slot_number_t slot = 1;
	struct compression_state *compression;
	struct compressed_block *block;
	struct data_vio *agent = remove_from_bin(packer, bin);
	struct data_vio *client;
	struct packer_statistics *stats;

	/* The bin may have held only canceled data_vios. */
	if (agent == NULL)
		return;

	/* The agent's own fragment occupies slot 0 at the start of the block. */
	compression = &agent->compression;
	compression->slot = 0;
	block = compression->block;
	initialize_compressed_block(block, compression->size);
	offset = compression->size;

	/* Pack the remaining fragments into slots 1 and up. */
	while ((client = remove_from_bin(packer, bin)) != NULL)
		offset = pack_fragment(compression, client, offset, slot++, block);

	/*
	 * If the batch contains only a single vio, then we save nothing by saving the compressed
	 * form. Continue processing the single vio in the batch.
	 */
	if (slot == 1) {
		abort_packing(agent);
		return;
	}

	if (slot < VDO_MAX_COMPRESSION_SLOTS) {
		/* Clear out the sizes of the unused slots */
		memset(&block->header.sizes[slot], 0,
		       (VDO_MAX_COMPRESSION_SLOTS - slot) * sizeof(__le16));
	}

	/* From here on, failures are handled by handle_compressed_write_error(). */
	agent->vio.completion.error_handler = handle_compressed_write_error;
	if (vdo_is_read_only(vdo_from_data_vio(agent))) {
		continue_data_vio_with_error(agent, VDO_READ_ONLY);
		return;
	}

	result = vio_reset_bio(&agent->vio, (char *) block, compressed_write_end_io,
			       REQ_OP_WRITE, agent->allocation.pbn);
	if (result != VDO_SUCCESS) {
		continue_data_vio_with_error(agent, result);
		return;
	}

	/*
	 * Once the compressed write is submitted, the fragments are no longer in the packer, so
	 * update stats now.
	 */
	stats = &packer->statistics;
	WRITE_ONCE(stats->compressed_fragments_in_packer,
		   (stats->compressed_fragments_in_packer - slot));
	WRITE_ONCE(stats->compressed_fragments_written,
		   (stats->compressed_fragments_written + slot));
	WRITE_ONCE(stats->compressed_blocks_written,
		   stats->compressed_blocks_written + 1);

	vdo_submit_data_vio(agent);
}
488 
/**
 * add_data_vio_to_packer_bin() - Add a data_vio to a bin's incoming queue
 * @packer: The packer.
 * @bin: The bin to which to add the data_vio.
 * @data_vio: The data_vio to add to the bin's queue.
 *
 * Adds a data_vio to a bin's incoming queue, handles logical space change, and calls physical
 * space processor. The bin is re-sorted in the packer's list afterward since its free
 * space has changed.
 */
static void add_data_vio_to_packer_bin(struct packer *packer, struct packer_bin *bin,
				       struct data_vio *data_vio)
{
	/* If the selected bin doesn't have room, start a new batch to make room. */
	if (bin->free_space < data_vio->compression.size)
		write_bin(packer, bin);

	add_to_bin(bin, data_vio);
	bin->free_space -= data_vio->compression.size;

	/* If we happen to exactly fill the bin, start a new batch. */
	if ((bin->slots_used == VDO_MAX_COMPRESSION_SLOTS) ||
	    (bin->free_space == 0))
		write_bin(packer, bin);

	/* Now that we've finished changing the free space, restore the sort order. */
	insert_in_sorted_list(packer, bin);
}
516 
517 /**
518  * select_bin() - Select the bin that should be used to pack the compressed data in a data_vio with
519  *                other data_vios.
520  * @packer: The packer.
521  * @data_vio: The data_vio.
522  */
523 static struct packer_bin * __must_check select_bin(struct packer *packer,
524 						   struct data_vio *data_vio)
525 {
526 	/*
527 	 * First best fit: select the bin with the least free space that has enough room for the
528 	 * compressed data in the data_vio.
529 	 */
530 	struct packer_bin *bin, *fullest_bin;
531 
532 	list_for_each_entry(bin, &packer->bins, list) {
533 		if (bin->free_space >= data_vio->compression.size)
534 			return bin;
535 	}
536 
537 	/*
538 	 * None of the bins have enough space for the data_vio. We're not allowed to create new
539 	 * bins, so we have to overflow one of the existing bins. It's pretty intuitive to select
540 	 * the fullest bin, since that "wastes" the least amount of free space in the compressed
541 	 * block. But if the space currently used in the fullest bin is smaller than the compressed
542 	 * size of the incoming block, it seems wrong to force that bin to write when giving up on
543 	 * compressing the incoming data_vio would likewise "waste" the least amount of free space.
544 	 */
545 	fullest_bin = list_first_entry(&packer->bins, struct packer_bin, list);
546 	if (data_vio->compression.size >=
547 	    (VDO_COMPRESSED_BLOCK_DATA_SIZE - fullest_bin->free_space))
548 		return NULL;
549 
550 	/*
551 	 * The fullest bin doesn't have room, but writing it out and starting a new batch with the
552 	 * incoming data_vio will increase the packer's free space.
553 	 */
554 	return fullest_bin;
555 }
556 
/**
 * vdo_attempt_packing() - Attempt to rewrite the data in this data_vio as part of a compressed
 *                         block.
 * @data_vio: The data_vio to pack.
 *
 * Must be called on the packer thread. If packing is not possible (wrong stage, packer
 * not in normal operation, stale flush generation, no suitable bin, or compression
 * canceled), the data_vio is sent on without compression via abort_packing().
 */
void vdo_attempt_packing(struct data_vio *data_vio)
{
	int result;
	struct packer_bin *bin;
	struct data_vio_compression_status status = get_data_vio_compression_status(data_vio);
	struct packer *packer = get_packer_from_data_vio(data_vio);

	assert_on_packer_thread(packer, __func__);

	result = VDO_ASSERT((status.stage == DATA_VIO_COMPRESSING),
			    "attempt to pack data_vio not ready for packing, stage: %u",
			    status.stage);
	if (result != VDO_SUCCESS)
		return;

	/*
	 * Increment whether or not this data_vio will be packed or not since abort_packing()
	 * always decrements the counter.
	 */
	WRITE_ONCE(packer->statistics.compressed_fragments_in_packer,
		   packer->statistics.compressed_fragments_in_packer + 1);

	/*
	 * If packing of this data_vio is disallowed for administrative reasons, give up before
	 * making any state changes.
	 */
	if (!vdo_is_state_normal(&packer->state) ||
	    (data_vio->flush_generation < packer->flush_generation)) {
		abort_packing(data_vio);
		return;
	}

	/*
	 * The advance_data_vio_compression_stage() check here verifies that the data_vio is
	 * allowed to be compressed (if it has already been canceled, we'll fall out here). Once
	 * the data_vio is in the DATA_VIO_PACKING state, it must be guaranteed to be put in a bin
	 * before any more requests can be processed by the packer thread. Otherwise, a canceling
	 * data_vio could attempt to remove the canceled data_vio from the packer and fail to
	 * rendezvous with it. Thus, we must call select_bin() first to ensure that we will
	 * actually add the data_vio to a bin before advancing to the DATA_VIO_PACKING stage.
	 */
	bin = select_bin(packer, data_vio);
	if ((bin == NULL) ||
	    (advance_data_vio_compression_stage(data_vio).stage != DATA_VIO_PACKING)) {
		abort_packing(data_vio);
		return;
	}

	add_data_vio_to_packer_bin(packer, bin, data_vio);
}
612 
613 /**
614  * check_for_drain_complete() - Check whether the packer has drained.
615  * @packer: The packer.
616  */
617 static void check_for_drain_complete(struct packer *packer)
618 {
619 	if (vdo_is_state_draining(&packer->state) && (packer->canceled_bin->slots_used == 0))
620 		vdo_finish_draining(&packer->state);
621 }
622 
623 /**
624  * write_all_non_empty_bins() - Write out all non-empty bins on behalf of a flush or suspend.
625  * @packer: The packer being flushed.
626  */
627 static void write_all_non_empty_bins(struct packer *packer)
628 {
629 	struct packer_bin *bin;
630 
631 	list_for_each_entry(bin, &packer->bins, list)
632 		write_bin(packer, bin);
633 		/*
634 		 * We don't need to re-sort the bin here since this loop will make every bin have
635 		 * the same amount of free space, so every ordering is sorted.
636 		 */
637 
638 	check_for_drain_complete(packer);
639 }
640 
641 /**
642  * vdo_flush_packer() - Request that the packer flush asynchronously.
643  * @packer: The packer to flush.
644  *
645  * All bins with at least two compressed data blocks will be written out, and any solitary pending
646  * VIOs will be released from the packer. While flushing is in progress, any VIOs submitted to
647  * vdo_attempt_packing() will be continued immediately without attempting to pack them.
648  */
649 void vdo_flush_packer(struct packer *packer)
650 {
651 	assert_on_packer_thread(packer, __func__);
652 	if (vdo_is_state_normal(&packer->state))
653 		write_all_non_empty_bins(packer);
654 }
655 
/**
 * vdo_remove_lock_holder_from_packer() - Remove a lock holder from the packer.
 * @completion: The data_vio which needs a lock held by a data_vio in the packer. The data_vio's
 *              compression.lock_holder field will point to the data_vio to remove.
 *
 * Must run in the packer zone. The removed data_vio is sent on via abort_packing().
 */
void vdo_remove_lock_holder_from_packer(struct vdo_completion *completion)
{
	struct data_vio *data_vio = as_data_vio(completion);
	struct packer *packer = get_packer_from_data_vio(data_vio);
	struct data_vio *lock_holder;
	struct packer_bin *bin;
	slot_number_t slot;

	assert_data_vio_in_packer_zone(data_vio);

	lock_holder = vdo_forget(data_vio->compression.lock_holder);
	bin = lock_holder->compression.bin;
	VDO_ASSERT_LOG_ONLY((bin != NULL), "data_vio in packer has a bin");

	/* Keep the slot array dense by moving the last occupant into the vacated slot. */
	slot = lock_holder->compression.slot;
	bin->slots_used--;
	if (slot < bin->slots_used) {
		bin->incoming[slot] = bin->incoming[bin->slots_used];
		bin->incoming[slot]->compression.slot = slot;
	}

	lock_holder->compression.bin = NULL;
	lock_holder->compression.slot = 0;

	/* The canceled bin's free space is never tracked, so only real bins are re-sorted. */
	if (bin != packer->canceled_bin) {
		bin->free_space += lock_holder->compression.size;
		insert_in_sorted_list(packer, bin);
	}

	abort_packing(lock_holder);
	check_for_drain_complete(packer);
}
693 
/**
 * vdo_increment_packer_flush_generation() - Increment the flush generation in the packer.
 * @packer: The packer.
 *
 * This will also cause the packer to flush so that any VIOs from previous generations will exit
 * the packer. Must be called on the packer thread.
 */
void vdo_increment_packer_flush_generation(struct packer *packer)
{
	assert_on_packer_thread(packer, __func__);
	packer->flush_generation++;
	vdo_flush_packer(packer);
}
707 
/**
 * initiate_drain() - Start draining the packer. Implements vdo_admin_initiator_fn.
 * @state: The packer's admin state.
 *
 * Writes out all non-empty bins; the drain finishes (via check_for_drain_complete())
 * once the canceled bin is also empty.
 */
static void initiate_drain(struct admin_state *state)
{
	struct packer *packer = container_of(state, struct packer, state);

	write_all_non_empty_bins(packer);
}
715 
/**
 * vdo_drain_packer() - Drain the packer by preventing any more VIOs from entering the packer and
 *                      then flushing.
 * @packer: The packer to drain.
 * @completion: The completion to finish when the packer has drained.
 *
 * Must be called on the packer thread.
 */
void vdo_drain_packer(struct packer *packer, struct vdo_completion *completion)
{
	assert_on_packer_thread(packer, __func__);
	vdo_start_draining(&packer->state, VDO_ADMIN_STATE_SUSPENDING, completion,
			   initiate_drain);
}
728 
/**
 * vdo_resume_packer() - Resume a packer which has been suspended.
 * @packer: The packer to resume.
 * @parent: The completion to finish when the packer has resumed.
 *
 * Must be called on the packer thread. The resume result is passed to @parent.
 */
void vdo_resume_packer(struct packer *packer, struct vdo_completion *parent)
{
	assert_on_packer_thread(packer, __func__);
	vdo_continue_completion(parent, vdo_resume_if_quiescent(&packer->state));
}
739 
740 static void dump_packer_bin(const struct packer_bin *bin, bool canceled)
741 {
742 	if (bin->slots_used == 0)
743 		/* Don't dump empty bins. */
744 		return;
745 
746 	vdo_log_info("	  %sBin slots_used=%u free_space=%zu",
747 		     (canceled ? "Canceled" : ""), bin->slots_used, bin->free_space);
748 
749 	/*
750 	 * FIXME: dump vios in bin->incoming? The vios should have been dumped from the vio pool.
751 	 * Maybe just dump their addresses so it's clear they're here?
752 	 */
753 }
754 
/**
 * vdo_dump_packer() - Dump the packer.
 * @packer: The packer.
 *
 * Context: dumps in a thread-unsafe fashion.
 */
void vdo_dump_packer(const struct packer *packer)
{
	struct packer_bin *bin;

	vdo_log_info("packer");
	vdo_log_info("	flushGeneration=%llu state %s  packer_bin_count=%llu",
		     (unsigned long long) packer->flush_generation,
		     vdo_get_admin_state_code(&packer->state)->name,
		     (unsigned long long) packer->size);

	/* Dump each regular bin, then the canceled bin. */
	list_for_each_entry(bin, &packer->bins, list)
		dump_packer_bin(bin, false);

	dump_packer_bin(packer->canceled_bin, true);
}
776