// Source: /cloud-hypervisor/virtio-devices/src/block.rs (revision eea9bcea38e0c5649f444c829f3a4f9c22aa486c)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
    RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block_util::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::{RateLimiter, TokenType};
use seccompiler::SeccompAction;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::{collections::HashMap, convert::TryInto};
use thiserror::Error;
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use virtio_bindings::bindings::virtio_blk::*;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

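// virtio-blk addresses the disk in fixed 512-byte sectors, regardless of the
// block size advertised to the guest through `blk_size`.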
const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter.
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block_util::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block_util::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block_util::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
}

pub type Result<T> = result::Result<T, Error>;

#[derive(Default, Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
}

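// Per-queue worker state: activate() spawns one epoll thread per virtqueue,
// each owning its queue, an async I/O context for the disk, and an optional
// rate limiter.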
struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    disk_image_id: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    request_list: HashMap<u16, Request>,
    rate_limiter: Option<RateLimiter>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
}

impl BlockEpollHandler {
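    // Drain the avail ring: parse each descriptor chain into a Request and
    // submit it to the async I/O backend. Returns true if any request
    // completed synchronously and was already placed on the used ring.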
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_descs = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.disk_image_id,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
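                // The request is now in flight. Park it, keyed by the head
                // index passed as user_data to execute_async(), until the
                // completion ring hands it back.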
                self.request_list.insert(desc_chain.head_index(), request);
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
            }
        }

        Ok(used_descs)
    }

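    // Reap finished requests from the async completion ring, write their
    // status back into guest memory, return the descriptors to the used ring
    // and update the device counters. Returns true if any descriptor was used.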
    fn process_queue_complete(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_descs = false;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        let completion_list = self.disk_image.complete();
        for (user_data, result) in completion_list {
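            // user_data round-trips the descriptor head index handed to
            // execute_async(), so it identifies the pending request.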
            let desc_index = user_data as u16;
            let mut request = self
                .request_list
                .remove(&desc_index)
                .ok_or(Error::MissingEntryRequestList)?;
            request.complete_async().map_err(Error::RequestCompleting)?;

            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                    }
                    _ => {}
                }

                (VIRTIO_BLK_S_OK, result as u32)
            } else {
                error!(
                    "Request failed: {:?}",
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            used_descs = true;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_descs)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    let needs_notification = self.process_queue_submit().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process queue (submit): {:?}",
                            e
                        ))
                    })?;

                    if needs_notification {
                        self.signal_used_queue().map_err(|e| {
                            EpollHelperError::HandleEvent(anyhow!(
                                "Failed to signal used queue: {:?}",
                                e
                            ))
                        })?
                    };
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                let needs_notification = self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                if needs_notification {
                    self.signal_used_queue().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to signal used queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    let needs_notification = self.process_queue_submit().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process queue (submit): {:?}",
                            e
                        ))
                    })?;

                    if needs_notification {
                        self.signal_used_queue().map_err(|e| {
                            EpollHelperError::HandleEvent(anyhow!(
                                "Failed to signal used queue: {:?}",
                                e
                            ))
                        })?
                    };
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter_config: Option<RateLimiterConfig>,
    exit_evt: EventFd,
}

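/// Serializable device state captured by snapshot() and reapplied by
/// restore(), used for snapshot/restore and live migration.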
#[derive(Versionize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl VersionMapped for BlockState {}

impl Block {
    /// Create a new virtio block device that operates on the given file.
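    ///
    /// A minimal sketch of construction (`ignore`d: it assumes a `disk`
    /// implementing `DiskFile` and an `exit_evt` created by the caller):
    ///
    /// ```ignore
    /// let block = Block::new(
    ///     "disk0".to_string(),
    ///     disk,                    // Box<dyn DiskFile>
    ///     PathBuf::from("/var/lib/vm/disk.raw"),
    ///     false,                   // is_disk_read_only
    ///     false,                   // iommu
    ///     1,                       // num_queues
    ///     256,                     // queue_size
    ///     SeccompAction::Trap,
    ///     None,                    // rate_limiter_config
    ///     exit_evt,
    /// )?;
    /// ```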
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        is_disk_read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        seccomp_action: SeccompAction,
        rate_limiter_config: Option<RateLimiterConfig>,
        exit_evt: EventFd,
    ) -> io::Result<Self> {
        let disk_size = disk_image.size().map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("Failed getting disk size: {}", e),
            )
        })?;
        if disk_size % SECTOR_SIZE != 0 {
            warn!(
                "Disk size {} is not a multiple of sector size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, SECTOR_SIZE
            );
        }

        let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
            | (1u64 << VIRTIO_BLK_F_FLUSH)
            | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
            | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
            | (1u64 << VIRTIO_BLK_F_TOPOLOGY);

        if iommu {
            avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
        }

        if is_disk_read_only {
            avail_features |= 1u64 << VIRTIO_BLK_F_RO;
        }

        let topology = disk_image.topology();
        info!("Disk topology: {:?}", topology);

        let logical_block_size = if topology.logical_block_size > 512 {
            topology.logical_block_size
        } else {
            512
        };

        // Calculate the exponent that maps physical block to logical block
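        // e.g. a 512-byte logical block with a 4096-byte physical block gives
        // physical_block_exp = 3, since 512 << 3 == 4096.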
        let mut physical_block_exp = 0;
        let mut size = logical_block_size;
        while size < topology.physical_block_size {
            physical_block_exp += 1;
            size <<= 1;
        }

        let disk_nsectors = disk_size / SECTOR_SIZE;
        let mut config = VirtioBlockConfig {
            capacity: disk_nsectors,
            writeback: 1,
            blk_size: topology.logical_block_size as u32,
            physical_block_exp,
            min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
            opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
            ..Default::default()
        };

        if num_queues > 1 {
            avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
            config.num_queues = num_queues as u16;
        }

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter_config,
            exit_evt,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn set_state(&mut self, state: &BlockState) {
        self.disk_path = state.disk_path.clone().into();
        self.disk_nsectors = state.disk_nsectors;
        self.common.avail_features = state.avail_features;
        self.common.acked_features = state.acked_features;
        self.config = state.config;
    }

    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Else check if VIRTIO_BLK_F_FLUSH negotiated
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        let disk_image_id = build_disk_image_id(&self.disk_path);
        self.update_writeback();

        let mut epoll_threads = Vec::new();
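        // Spawn one I/O worker thread per virtqueue, each with its own
        // BlockEpollHandler and async disk context.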
        for i in 0..queues.len() {
            let (_, queue, queue_evt) = queues.remove(0);
            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();

            let rate_limiter: Option<RateLimiter> = self
                .rate_limiter_config
                .map(RateLimiterConfig::try_into)
                .transpose()
                .map_err(ActivateError::CreateRateLimiter)?;

            let mut handler = BlockEpollHandler {
                queue_index: i as u16,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                disk_image_id: disk_image_id.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                request_list: HashMap::with_capacity(queue_size.into()),
                rate_limiter,
                access_platform: self.common.access_platform.clone(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id, i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_versioned_state(&self.id(), &self.state())
    }

    fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
        self.set_state(&snapshot.to_versioned_state(&self.id)?);
        Ok(())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}