xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision fa7a000dbe9637eb256af18ae8c3c4a8d5bf9c8f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE-BSD-3-Clause file.
6 //
7 // Copyright © 2020 Intel Corporation
8 //
9 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
10 
11 use super::Error as DeviceError;
12 use super::{
13     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
14     VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
15 };
16 use crate::seccomp_filters::Thread;
17 use crate::thread_helper::spawn_virtio_thread;
18 use crate::GuestMemoryMmap;
19 use crate::VirtioInterrupt;
20 use anyhow::anyhow;
21 use block::{
22     async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
23     RequestType, VirtioBlockConfig,
24 };
25 use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
26 use rate_limiter::TokenType;
27 use seccompiler::SeccompAction;
28 use std::collections::BTreeMap;
29 use std::collections::HashMap;
30 use std::collections::VecDeque;
31 use std::io;
32 use std::num::Wrapping;
33 use std::ops::Deref;
34 use std::os::unix::io::AsRawFd;
35 use std::path::PathBuf;
36 use std::result;
37 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
38 use std::sync::{Arc, Barrier};
39 use thiserror::Error;
40 use versionize::{VersionMap, Versionize, VersionizeResult};
41 use versionize_derive::Versionize;
42 use virtio_bindings::virtio_blk::*;
43 use virtio_bindings::virtio_config::*;
44 use virtio_queue::{Queue, QueueOwnedT, QueueT};
45 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
46 use vm_migration::VersionMapped;
47 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
48 use vm_virtio::AccessPlatform;
49 use vmm_sys_util::eventfd::EventFd;
50 
51 const SECTOR_SHIFT: u8 = 9;
52 pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
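// Editorial note (not part of the original source): with SECTOR_SHIFT = 9 this
// evaluates to 1 << 9 = 512 bytes, the fixed sector size assumed by virtio-blk,
// so a request addressing sector N starts at byte offset N * SECTOR_SIZE.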
53 
54 // New descriptors are pending on the virtio queue.
55 const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
56 // New completed tasks are pending on the completion ring.
57 const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
58 // New 'wake up' event from the rate limiter
59 const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
60 
61 // Latency scale, used to reduce precision loss in calculations.
62 const LATENCY_SCALE: u64 = 10000;
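// Editorial example (not part of the original source): average latencies are
// stored multiplied by LATENCY_SCALE so the running average keeps four extra
// decimal digits of precision in an integer. A stored value of 1_234_567 thus
// corresponds to 1_234_567 / LATENCY_SCALE ≈ 123 µs, which is the division
// performed when counters() reports "read_latency_avg" and "write_latency_avg".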
63 
64 #[derive(Error, Debug)]
65 pub enum Error {
66     #[error("Failed to parse the request: {0}")]
67     RequestParsing(block::Error),
68     #[error("Failed to execute the request: {0}")]
69     RequestExecuting(block::ExecuteError),
70     #[error("Failed to complete the request: {0}")]
71     RequestCompleting(block::Error),
72     #[error("Missing the expected entry in the list of requests")]
73     MissingEntryRequestList,
74     #[error("The asynchronous request returned with failure")]
75     AsyncRequestFailure,
76     #[error("Failed synchronizing the file: {0}")]
77     Fsync(AsyncIoError),
78     #[error("Failed adding used index: {0}")]
79     QueueAddUsed(virtio_queue::Error),
80     #[error("Failed creating an iterator over the queue: {0}")]
81     QueueIterator(virtio_queue::Error),
82     #[error("Failed to update request status: {0}")]
83     RequestStatus(GuestMemoryError),
84 }
85 
86 pub type Result<T> = result::Result<T, Error>;
87 
88 // Latency is recorded in microseconds; the average latency
89 // is saved as a scaled value.
90 #[derive(Clone)]
91 pub struct BlockCounters {
92     read_bytes: Arc<AtomicU64>,
93     read_ops: Arc<AtomicU64>,
94     read_latency_min: Arc<AtomicU64>,
95     read_latency_max: Arc<AtomicU64>,
96     read_latency_avg: Arc<AtomicU64>,
97     write_bytes: Arc<AtomicU64>,
98     write_ops: Arc<AtomicU64>,
99     write_latency_min: Arc<AtomicU64>,
100     write_latency_max: Arc<AtomicU64>,
101     write_latency_avg: Arc<AtomicU64>,
102 }
103 
104 impl Default for BlockCounters {
105     fn default() -> Self {
106         BlockCounters {
107             read_bytes: Arc::new(AtomicU64::new(0)),
108             read_ops: Arc::new(AtomicU64::new(0)),
109             read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
110             read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
111             read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
112             write_bytes: Arc::new(AtomicU64::new(0)),
113             write_ops: Arc::new(AtomicU64::new(0)),
114             write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
115             write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
116             write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
117         }
118     }
119 }
120 
121 struct BlockEpollHandler {
122     queue_index: u16,
123     queue: Queue,
124     mem: GuestMemoryAtomic<GuestMemoryMmap>,
125     disk_image: Box<dyn AsyncIo>,
126     disk_nsectors: u64,
127     interrupt_cb: Arc<dyn VirtioInterrupt>,
128     serial: Vec<u8>,
129     kill_evt: EventFd,
130     pause_evt: EventFd,
131     writeback: Arc<AtomicBool>,
132     counters: BlockCounters,
133     queue_evt: EventFd,
134     inflight_requests: VecDeque<(u16, Request)>,
135     rate_limiter: Option<RateLimiterGroupHandle>,
136     access_platform: Option<Arc<dyn AccessPlatform>>,
137     read_only: bool,
138     host_cpus: Option<Vec<usize>>,
139 }
140 
141 impl BlockEpollHandler {
142     fn process_queue_submit(&mut self) -> Result<bool> {
143         let queue = &mut self.queue;
144 
145         let mut used_descs = false;
146 
147         while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
148             let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
149                 .map_err(Error::RequestParsing)?;
150 
151             // For virtio spec compliance:
152             // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
153             // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
154             if self.read_only
155                 && (request.request_type == RequestType::Out
156                     || request.request_type == RequestType::Flush)
157             {
158                 desc_chain
159                     .memory()
160                     .write_obj(VIRTIO_BLK_S_IOERR, request.status_addr)
161                     .map_err(Error::RequestStatus)?;
162 
163                 // If no asynchronous operation has been submitted, we can
164                 // simply return the used descriptor.
165                 queue
166                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
167                     .map_err(Error::QueueAddUsed)?;
168                 used_descs = true;
169                 continue;
170             }
171 
172             if let Some(rate_limiter) = &mut self.rate_limiter {
173                 // If limiter.consume() fails it means there is no more TokenType::Ops
174                 // budget and rate limiting is in effect.
175                 if !rate_limiter.consume(1, TokenType::Ops) {
176                     // Stop processing the queue and return this descriptor chain to the
177                     // avail ring, for later processing.
178                     queue.go_to_previous_position();
179                     break;
180                 }
181                 // Exercise the rate limiter only if this request is of data transfer type.
182                 if request.request_type == RequestType::In
183                     || request.request_type == RequestType::Out
184                 {
185                     let mut bytes = Wrapping(0);
186                     for (_, data_len) in &request.data_descriptors {
187                         bytes += Wrapping(*data_len as u64);
188                     }
189 
190                     // If limiter.consume() fails it means there is no more TokenType::Bytes
191                     // budget and rate limiting is in effect.
192                     if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
193                         // Revert the OPS consume().
194                         rate_limiter.manual_replenish(1, TokenType::Ops);
195                         // Stop processing the queue and return this descriptor chain to the
196                         // avail ring, for later processing.
197                         queue.go_to_previous_position();
198                         break;
199                     }
200                 };
201             }
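            // Editorial sketch (not part of the original source): each request
            // costs one token from the Ops bucket and, for reads/writes, its
            // payload size from the Bytes bucket. If the Bytes budget runs out,
            // the Ops token is returned via manual_replenish() so the request
            // is not double-charged when retried, e.g. for a 16 KiB write with
            // an empty Bytes bucket:
            //
            //   rate_limiter.consume(1, TokenType::Ops);          // succeeds
            //   rate_limiter.consume(16384, TokenType::Bytes);    // fails
            //   rate_limiter.manual_replenish(1, TokenType::Ops); // undo Ops
            //   queue.go_to_previous_position();                  // retry later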
202 
203             request.set_writeback(self.writeback.load(Ordering::Acquire));
204 
205             if request
206                 .execute_async(
207                     desc_chain.memory(),
208                     self.disk_nsectors,
209                     self.disk_image.as_mut(),
210                     &self.serial,
211                     desc_chain.head_index() as u64,
212                 )
213                 .map_err(Error::RequestExecuting)?
214             {
215                 self.inflight_requests
216                     .push_back((desc_chain.head_index(), request));
217             } else {
218                 desc_chain
219                     .memory()
220                     .write_obj(VIRTIO_BLK_S_OK, request.status_addr)
221                     .map_err(Error::RequestStatus)?;
222 
223                 // If no asynchronous operation has been submitted, we can
224                 // simply return the used descriptor.
225                 queue
226                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
227                     .map_err(Error::QueueAddUsed)?;
228                 used_descs = true;
229             }
230         }
231 
232         Ok(used_descs)
233     }
234 
235     fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
236         let needs_notification = self.process_queue_submit().map_err(|e| {
237             EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
238         })?;
239 
240         if needs_notification {
241             self.signal_used_queue().map_err(|e| {
242                 EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
243             })?
244         };
245 
246         Ok(())
247     }
248 
249     #[inline]
250     fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
251         // This loop neatly handles the fast path where the completions are
252         // in order (it turns into just a pop_front()) and the 1% of the time
253         // (per analysis during boot) where slight out-of-order completion has
254         // been observed, e.g.
255         // Submissions: 1 2 3 4 5 6 7
256         // Completions: 2 1 3 5 4 7 6
257         // In this case, find the corresponding item and swap it with the front.
258         // This is an O(1) operation and prepares for the future, as it is likely
259         // the next completion will be for the one that was skipped, which will
260         // now be the new front.
261         for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
262             if head == &completed_head {
263                 return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
264             }
265         }
266 
267         Err(Error::MissingEntryRequestList)
268     }
269 
270     fn process_queue_complete(&mut self) -> Result<bool> {
271         let mut used_descs = false;
272         let mem = self.mem.memory();
273         let mut read_bytes = Wrapping(0);
274         let mut write_bytes = Wrapping(0);
275         let mut read_ops = Wrapping(0);
276         let mut write_ops = Wrapping(0);
277 
278         while let Some((user_data, result)) = self.disk_image.next_completed_request() {
279             let desc_index = user_data as u16;
280 
281             let mut request = self.find_inflight_request(desc_index)?;
282 
283             request.complete_async().map_err(Error::RequestCompleting)?;
284 
285             let latency = request.start.elapsed().as_micros() as u64;
286             let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
287             let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
288             let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
289             let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
290             let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
291             let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
292             let (status, len) = if result >= 0 {
293                 match request.request_type {
294                     RequestType::In => {
295                         for (_, data_len) in &request.data_descriptors {
296                             read_bytes += Wrapping(*data_len as u64);
297                         }
298                         read_ops += Wrapping(1);
299                         if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
300                             self.counters
301                                 .read_latency_min
302                                 .store(latency, Ordering::Relaxed);
303                         }
304                         if latency > read_max || read_max == u64::MAX {
305                             self.counters
306                                 .read_latency_max
307                                 .store(latency, Ordering::Relaxed);
308                         }
309 
310                         // Special case the first real latency report
311                         read_avg = if read_avg == u64::MAX {
312                             latency * LATENCY_SCALE
313                         } else {
314                             // The cumulative average is guaranteed to be
315                             // positive if calculated correctly
316                             (read_avg as i64
317                                 + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
318                                     / (read_ops_last + read_ops.0) as i64)
319                                 .try_into()
320                                 .unwrap()
321                         };
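                        // Worked example (editorial, not part of the original
                        // source): the expression above is the incremental mean
                        //   avg_new = avg_old + (x - avg_old) / n
                        // applied to scaled samples x = latency * LATENCY_SCALE.
                        // With avg_old = 1_000_000 (100 µs), a new sample of
                        // 120 µs and n = 4 completed reads:
                        //   avg_new = 1_000_000 + (1_200_000 - 1_000_000) / 4
                        //           = 1_050_000, i.e. 105 µs once unscaled.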
322                     }
323                     RequestType::Out => {
324                         if !request.writeback {
325                             self.disk_image.fsync(None).map_err(Error::Fsync)?;
326                         }
327                         for (_, data_len) in &request.data_descriptors {
328                             write_bytes += Wrapping(*data_len as u64);
329                         }
330                         write_ops += Wrapping(1);
331                         if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
332                             self.counters
333                                 .write_latency_min
334                                 .store(latency, Ordering::Relaxed);
335                         }
336                         if latency > write_max || write_max == u64::MAX {
337                             self.counters
338                                 .write_latency_max
339                                 .store(latency, Ordering::Relaxed);
340                         }
341 
342                         // Special case the first real latency report
343                         write_avg = if write_avg == u64::MAX {
344                             latency * LATENCY_SCALE
345                         } else {
346                             // The cumulative average is guaranteed to be
347                             // positive if calculated correctly
348                             (write_avg as i64
349                                 + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
350                                     / (write_ops_last + write_ops.0) as i64)
351                                 .try_into()
352                                 .unwrap()
353                         }
354                     }
355                     _ => {}
356                 }
357 
358                 self.counters
359                     .read_latency_avg
360                     .store(read_avg, Ordering::Relaxed);
361 
362                 self.counters
363                     .write_latency_avg
364                     .store(write_avg, Ordering::Relaxed);
365 
366                 (VIRTIO_BLK_S_OK, result as u32)
367             } else {
368                 error!(
369                     "Request failed: {:x?} {:?}",
370                     request,
371                     io::Error::from_raw_os_error(-result)
372                 );
373                 return Err(Error::AsyncRequestFailure);
374             };
375 
376             mem.write_obj(status, request.status_addr)
377                 .map_err(Error::RequestStatus)?;
378 
379             let queue = &mut self.queue;
380 
381             queue
382                 .add_used(mem.deref(), desc_index, len)
383                 .map_err(Error::QueueAddUsed)?;
384             used_descs = true;
385         }
386 
387         self.counters
388             .write_bytes
389             .fetch_add(write_bytes.0, Ordering::AcqRel);
390         self.counters
391             .write_ops
392             .fetch_add(write_ops.0, Ordering::AcqRel);
393 
394         self.counters
395             .read_bytes
396             .fetch_add(read_bytes.0, Ordering::AcqRel);
397         self.counters
398             .read_ops
399             .fetch_add(read_ops.0, Ordering::AcqRel);
400 
401         Ok(used_descs)
402     }
403 
404     fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
405         self.interrupt_cb
406             .trigger(VirtioInterruptType::Queue(self.queue_index))
407             .map_err(|e| {
408                 error!("Failed to signal used queue: {:?}", e);
409                 DeviceError::FailedSignalingUsedQueue(e)
410             })
411     }
412 
413     fn set_queue_thread_affinity(&self) {
414         // Prepare the CPU set the current queue thread is expected to run on.
415         let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
416             // SAFETY: all zeros is a valid pattern
417             let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
418             // SAFETY: FFI call, trivially safe
419             unsafe { libc::CPU_ZERO(&mut cpuset) };
420             for host_cpu in host_cpus {
421                 // SAFETY: FFI call, trivially safe
422                 unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
423             }
424             cpuset
425         });
426 
427         // Schedule the thread to run on the expected CPU set
428         if let Some(cpuset) = cpuset.as_ref() {
429             // SAFETY: FFI call with correct arguments
430             let ret = unsafe {
431                 libc::sched_setaffinity(
432                     0,
433                     std::mem::size_of::<libc::cpu_set_t>(),
434                     cpuset as *const libc::cpu_set_t,
435                 )
436             };
437 
438             if ret != 0 {
439                 error!(
440                     "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
441                     self.queue_index,
442                     io::Error::last_os_error()
443                 )
444             }
445         }
446     }
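    // Editorial sketch (not part of the original source): with an affinity of,
    // say, host_cpus = [2, 3], the loop above sets bits 2 and 3 in the
    // cpu_set_t, and sched_setaffinity(0, ...) then pins the calling virtqueue
    // thread to those host CPUs (a pid of 0 means the calling thread).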
447 
448     fn run(
449         &mut self,
450         paused: Arc<AtomicBool>,
451         paused_sync: Arc<Barrier>,
452     ) -> result::Result<(), EpollHelperError> {
453         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
454         helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
455         helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
456         if let Some(rate_limiter) = &self.rate_limiter {
457             helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
458         }
459         self.set_queue_thread_affinity();
460         helper.run(paused, paused_sync, self)?;
461 
462         Ok(())
463     }
464 }
465 
466 impl EpollHelperHandler for BlockEpollHandler {
467     fn handle_event(
468         &mut self,
469         _helper: &mut EpollHelper,
470         event: &epoll::Event,
471     ) -> result::Result<(), EpollHelperError> {
472         let ev_type = event.data as u16;
473         match ev_type {
474             QUEUE_AVAIL_EVENT => {
475                 self.queue_evt.read().map_err(|e| {
476                     EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
477                 })?;
478 
479                 let rate_limit_reached =
480                     self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());
481 
482                 // Process the queue only when the rate limit is not reached
483                 if !rate_limit_reached {
484                     self.process_queue_submit_and_signal()?
485                 }
486             }
487             COMPLETION_EVENT => {
488                 self.disk_image.notifier().read().map_err(|e| {
489                     EpollHelperError::HandleEvent(anyhow!("Failed to get completion event: {:?}", e))
490                 })?;
491 
492                 let needs_notification = self.process_queue_complete().map_err(|e| {
493                     EpollHelperError::HandleEvent(anyhow!(
494                         "Failed to process queue (complete): {:?}",
495                         e
496                     ))
497                 })?;
498 
499                 if needs_notification {
500                     self.signal_used_queue().map_err(|e| {
501                         EpollHelperError::HandleEvent(anyhow!(
502                             "Failed to signal used queue: {:?}",
503                             e
504                         ))
505                     })?;
506                 }
507             }
508             RATE_LIMITER_EVENT => {
509                 if let Some(rate_limiter) = &mut self.rate_limiter {
510                     // Upon rate limiter event, call the rate limiter handler
511                     // and restart processing the queue.
512                     rate_limiter.event_handler().map_err(|e| {
513                         EpollHelperError::HandleEvent(anyhow!(
514                             "Failed to process rate limiter event: {:?}",
515                             e
516                         ))
517                     })?;
518 
519                     self.process_queue_submit_and_signal()?
520                 } else {
521                     return Err(EpollHelperError::HandleEvent(anyhow!(
522                         "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
523                     )));
524                 }
525             }
526             _ => {
527                 return Err(EpollHelperError::HandleEvent(anyhow!(
528                     "Unexpected event: {}",
529                     ev_type
530                 )));
531             }
532         }
533         Ok(())
534     }
535 }
536 
537 /// Virtio device for exposing block level read/write operations on a host file.
538 pub struct Block {
539     common: VirtioCommon,
540     id: String,
541     disk_image: Box<dyn DiskFile>,
542     disk_path: PathBuf,
543     disk_nsectors: u64,
544     config: VirtioBlockConfig,
545     writeback: Arc<AtomicBool>,
546     counters: BlockCounters,
547     seccomp_action: SeccompAction,
548     rate_limiter: Option<Arc<RateLimiterGroup>>,
549     exit_evt: EventFd,
550     read_only: bool,
551     serial: Vec<u8>,
552     queue_affinity: BTreeMap<u16, Vec<usize>>,
553 }
554 
555 #[derive(Versionize)]
556 pub struct BlockState {
557     pub disk_path: String,
558     pub disk_nsectors: u64,
559     pub avail_features: u64,
560     pub acked_features: u64,
561     pub config: VirtioBlockConfig,
562 }
563 
564 impl VersionMapped for BlockState {}
565 
566 impl Block {
567     /// Create a new virtio block device that operates on the given file.
568     #[allow(clippy::too_many_arguments)]
569     pub fn new(
570         id: String,
571         mut disk_image: Box<dyn DiskFile>,
572         disk_path: PathBuf,
573         read_only: bool,
574         iommu: bool,
575         num_queues: usize,
576         queue_size: u16,
577         serial: Option<String>,
578         seccomp_action: SeccompAction,
579         rate_limiter: Option<Arc<RateLimiterGroup>>,
580         exit_evt: EventFd,
581         state: Option<BlockState>,
582         queue_affinity: BTreeMap<u16, Vec<usize>>,
583     ) -> io::Result<Self> {
584         let (disk_nsectors, avail_features, acked_features, config, paused) =
585             if let Some(state) = state {
586                 info!("Restoring virtio-block {}", id);
587                 (
588                     state.disk_nsectors,
589                     state.avail_features,
590                     state.acked_features,
591                     state.config,
592                     true,
593                 )
594             } else {
595                 let disk_size = disk_image.size().map_err(|e| {
596                     io::Error::new(
597                         io::ErrorKind::Other,
598                         format!("Failed getting disk size: {e}"),
599                     )
600                 })?;
601                 if disk_size % SECTOR_SIZE != 0 {
602                     warn!(
603                         "Disk size {} is not a multiple of sector size {}; \
604                  the remainder will not be visible to the guest.",
605                         disk_size, SECTOR_SIZE
606                     );
607                 }
608 
609                 let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
610                     | (1u64 << VIRTIO_BLK_F_FLUSH)
611                     | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
612                     | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
613                     | (1u64 << VIRTIO_BLK_F_TOPOLOGY);
614 
615                 if iommu {
616                     avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
617                 }
618 
619                 if read_only {
620                     avail_features |= 1u64 << VIRTIO_BLK_F_RO;
621                 }
622 
623                 let topology = disk_image.topology();
624                 info!("Disk topology: {:?}", topology);
625 
626                 let logical_block_size = if topology.logical_block_size > 512 {
627                     topology.logical_block_size
628                 } else {
629                     512
630                 };
631 
632                 // Calculate the exponent that maps physical block to logical block
633                 // Calculate the exponent mapping the logical block size to the physical block size
634                 let mut size = logical_block_size;
635                 while size < topology.physical_block_size {
636                     physical_block_exp += 1;
637                     size <<= 1;
638                 }
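                // Worked example (editorial, not part of the original source):
                // with a logical block size of 512 and a physical block size of
                // 4096 reported by the backing file, the loop runs three times
                // (512 -> 1024 -> 2048 -> 4096), so physical_block_exp becomes 3
                // and the guest derives physical blocks of 512 << 3 = 4096 bytes.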
639 
640                 let disk_nsectors = disk_size / SECTOR_SIZE;
641                 let mut config = VirtioBlockConfig {
642                     capacity: disk_nsectors,
643                     writeback: 1,
644                     blk_size: topology.logical_block_size as u32,
645                     physical_block_exp,
646                     min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
647                     opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
648                     ..Default::default()
649                 };
650 
651                 if num_queues > 1 {
652                     avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
653                     config.num_queues = num_queues as u16;
654                 }
655 
656                 (disk_nsectors, avail_features, 0, config, false)
657             };
658 
659         let serial = serial
660             .map(Vec::from)
661             .unwrap_or_else(|| build_serial(&disk_path));
662 
663         Ok(Block {
664             common: VirtioCommon {
665                 device_type: VirtioDeviceType::Block as u32,
666                 avail_features,
667                 acked_features,
668                 paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
669                 queue_sizes: vec![queue_size; num_queues],
670                 min_queues: 1,
671                 paused: Arc::new(AtomicBool::new(paused)),
672                 ..Default::default()
673             },
674             id,
675             disk_image,
676             disk_path,
677             disk_nsectors,
678             config,
679             writeback: Arc::new(AtomicBool::new(true)),
680             counters: BlockCounters::default(),
681             seccomp_action,
682             rate_limiter,
683             exit_evt,
684             read_only,
685             serial,
686             queue_affinity,
687         })
688     }
689 
690     fn state(&self) -> BlockState {
691         BlockState {
692             disk_path: self.disk_path.to_str().unwrap().to_owned(),
693             disk_nsectors: self.disk_nsectors,
694             avail_features: self.common.avail_features,
695             acked_features: self.common.acked_features,
696             config: self.config,
697         }
698     }
699 
700     fn update_writeback(&mut self) {
701         // Use the writeback value from the config if VIRTIO_BLK_F_CONFIG_WCE was negotiated
702         let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
703             self.config.writeback == 1
704         } else {
705             // Otherwise check whether VIRTIO_BLK_F_FLUSH was negotiated
706             self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
707         };
708 
709         info!(
710             "Changing cache mode to {}",
711             if writeback {
712                 "writeback"
713             } else {
714                 "writethrough"
715             }
716         );
717         self.writeback.store(writeback, Ordering::Release);
718     }
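    // Editorial summary (not part of the original source) of the decision above:
    //   VIRTIO_BLK_F_CONFIG_WCE acked      -> follow config.writeback
    //                                         (1 = writeback, 0 = writethrough)
    //   otherwise VIRTIO_BLK_F_FLUSH acked -> writeback
    //   neither acked                      -> writethrough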
719 
720     #[cfg(fuzzing)]
721     pub fn wait_for_epoll_threads(&mut self) {
722         self.common.wait_for_epoll_threads();
723     }
724 }
725 
726 impl Drop for Block {
727     fn drop(&mut self) {
728         if let Some(kill_evt) = self.common.kill_evt.take() {
729             // Ignore the result because there is nothing we can do about it.
730             let _ = kill_evt.write(1);
731         }
732         self.common.wait_for_epoll_threads();
733     }
734 }
735 
736 impl VirtioDevice for Block {
737     fn device_type(&self) -> u32 {
738         self.common.device_type
739     }
740 
741     fn queue_max_sizes(&self) -> &[u16] {
742         &self.common.queue_sizes
743     }
744 
745     fn features(&self) -> u64 {
746         self.common.avail_features
747     }
748 
749     fn ack_features(&mut self, value: u64) {
750         self.common.ack_features(value)
751     }
752 
753     fn read_config(&self, offset: u64, data: &mut [u8]) {
754         self.read_config_from_slice(self.config.as_slice(), offset, data);
755     }
756 
757     fn write_config(&mut self, offset: u64, data: &[u8]) {
758         // The "writeback" field is the only mutable field
759         let writeback_offset =
760             (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
761         if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
762         {
763             error!(
764                 "Attempt to write to read-only field: offset {:x} length {}",
765                 offset,
766                 data.len()
767             );
768             return;
769         }
770 
771         self.config.writeback = data[0];
772         self.update_writeback();
773     }
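    // Editorial sketch (not part of the original source): the pointer arithmetic
    // above derives the byte offset of the `writeback` field inside
    // VirtioBlockConfig, so only a single-byte guest write at exactly that
    // offset is accepted; e.g. writing 0 there switches the device to
    // writethrough via update_writeback() when VIRTIO_BLK_F_CONFIG_WCE was
    // negotiated.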
774 
775     fn activate(
776         &mut self,
777         mem: GuestMemoryAtomic<GuestMemoryMmap>,
778         interrupt_cb: Arc<dyn VirtioInterrupt>,
779         mut queues: Vec<(usize, Queue, EventFd)>,
780     ) -> ActivateResult {
781         self.common.activate(&queues, &interrupt_cb)?;
782 
783         self.update_writeback();
784 
785         let mut epoll_threads = Vec::new();
786         for i in 0..queues.len() {
787             let (_, queue, queue_evt) = queues.remove(0);
788             let queue_size = queue.size();
789             let (kill_evt, pause_evt) = self.common.dup_eventfds();
790             let queue_idx = i as u16;
791 
792             let mut handler = BlockEpollHandler {
793                 queue_index: queue_idx,
794                 queue,
795                 mem: mem.clone(),
796                 disk_image: self
797                     .disk_image
798                     .new_async_io(queue_size as u32)
799                     .map_err(|e| {
800                         error!("failed to create new AsyncIo: {}", e);
801                         ActivateError::BadActivate
802                     })?,
803                 disk_nsectors: self.disk_nsectors,
804                 interrupt_cb: interrupt_cb.clone(),
805                 serial: self.serial.clone(),
806                 kill_evt,
807                 pause_evt,
808                 writeback: self.writeback.clone(),
809                 counters: self.counters.clone(),
810                 queue_evt,
811                 // Analysis during boot shows a maximum of around 40 requests.
812                 // This gives headroom for systems with slower I/O without
813                 // incurring the cost of reallocation or excessive memory overhead.
814                 inflight_requests: VecDeque::with_capacity(64),
815                 rate_limiter: self
816                     .rate_limiter
817                     .as_ref()
818                     .map(|r| r.new_handle())
819                     .transpose()
820                     .unwrap(),
821                 access_platform: self.common.access_platform.clone(),
822                 read_only: self.read_only,
823                 host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
824             };
825 
826             let paused = self.common.paused.clone();
827             let paused_sync = self.common.paused_sync.clone();
828 
829             spawn_virtio_thread(
830                 &format!("{}_q{}", self.id.clone(), i),
831                 &self.seccomp_action,
832                 Thread::VirtioBlock,
833                 &mut epoll_threads,
834                 &self.exit_evt,
835                 move || handler.run(paused, paused_sync.unwrap()),
836             )?;
837         }
838 
839         self.common.epoll_threads = Some(epoll_threads);
840         event!("virtio-device", "activated", "id", &self.id);
841 
842         Ok(())
843     }
844 
845     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
846         let result = self.common.reset();
847         event!("virtio-device", "reset", "id", &self.id);
848         result
849     }
850 
851     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
852         let mut counters = HashMap::new();
853 
854         counters.insert(
855             "read_bytes",
856             Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
857         );
858         counters.insert(
859             "write_bytes",
860             Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
861         );
862         counters.insert(
863             "read_ops",
864             Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
865         );
866         counters.insert(
867             "write_ops",
868             Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
869         );
870         counters.insert(
871             "write_latency_min",
872             Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
873         );
874         counters.insert(
875             "write_latency_max",
876             Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
877         );
878         counters.insert(
879             "write_latency_avg",
880             Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
881         );
882         counters.insert(
883             "read_latency_min",
884             Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
885         );
886         counters.insert(
887             "read_latency_max",
888             Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
889         );
890         counters.insert(
891             "read_latency_avg",
892             Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
893         );
894 
895         Some(counters)
896     }
897 
898     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
899         self.common.set_access_platform(access_platform)
900     }
901 }
902 
903 impl Pausable for Block {
904     fn pause(&mut self) -> result::Result<(), MigratableError> {
905         self.common.pause()
906     }
907 
908     fn resume(&mut self) -> result::Result<(), MigratableError> {
909         self.common.resume()
910     }
911 }
912 
913 impl Snapshottable for Block {
914     fn id(&self) -> String {
915         self.id.clone()
916     }
917 
918     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
919         Snapshot::new_from_versioned_state(&self.state())
920     }
921 }
922 impl Transportable for Block {}
923 impl Migratable for Block {}
924
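#[cfg(test)]
mod tests {
    use super::*;

    // Editorial sketch (not part of the original source): a minimal check of
    // the constants and counter sentinels used above, assuming the private
    // fields remain accessible from an in-file test module.
    #[test]
    fn sector_and_counter_defaults() {
        // SECTOR_SIZE is 1 << SECTOR_SHIFT = 512 bytes.
        assert_eq!(SECTOR_SIZE, 512);

        // Latency counters start at u64::MAX as a "no sample yet" sentinel,
        // while byte and operation counters start at zero.
        let counters = BlockCounters::default();
        assert_eq!(counters.read_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(counters.write_ops.load(Ordering::Relaxed), 0);
        assert_eq!(counters.read_latency_min.load(Ordering::Relaxed), u64::MAX);
        assert_eq!(counters.write_latency_avg.load(Ordering::Relaxed), u64::MAX);
    }
}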