xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision 4ad44caa5217cf55eed512fc10fd68416a37d31c)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
    VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use thiserror::Error;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

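// Virtio block requests address the disk in 512-byte sectors, hence the
// shift of 9 below.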
const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter.
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency values are scaled by this factor to reduce precision loss in the
// average calculation.
const LATENCY_SCALE: u64 = 10000;
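// For example, a 123 µs request is accumulated as 123 * 10_000 = 1_230_000;
// counters() divides the stored averages back by LATENCY_SCALE before
// reporting, so the running average keeps four extra decimal digits through
// the integer arithmetic.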

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
    #[error("Failed to enable notification: {0}")]
    QueueEnableNotification(virtio_queue::Error),
}

pub type Result<T> = result::Result<T, Error>;

// Latency is recorded in microseconds; the average latency is stored as a
// value scaled by LATENCY_SCALE.
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<()> {
        let queue = &mut self.queue;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance:
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.serial,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
            }
        }

        Ok(())
    }

    fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
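        // With VIRTIO_RING_F_EVENT_IDX negotiated, needs_notification() honors
        // the guest's used-event index, so an interrupt is only raised when the
        // guest has asked to be notified.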
        if self
            .queue
            .needs_notification(self.mem.memory().deref())
            .map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!(
                    "Failed to check needs_notification: {:?}",
                    e
                ))
            })?
        {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?;
        }

        Ok(())
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        self.try_signal_used_queue()
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where completions arrive in
        // order (it turns into just a pop_front()) as well as the ~1% of the
        // time (measured during boot) where slight reordering has been
        // observed, e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In that case, find the matching item and swap it with the front.
        // This is an O(1) operation and anticipates the likely follow-up,
        // since the next completion is probably the request that was skipped,
        // which has now become the new front.
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<()> {
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
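            // The u64 user_data round-trips the descriptor head index that was
            // handed to execute_async() at submission time.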
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
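            // A non-negative result carries the number of bytes transferred,
            // while a negative result is a negated errno value.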
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

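                        // Standard cumulative moving average, computed on the
                        // scaled values to limit integer truncation:
                        //   CMA(n+1) = CMA(n) + (x(n+1) - CMA(n)) / (n + 1)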
                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK as u8, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            queue
                .enable_notification(mem.deref())
                .map_err(Error::QueueEnableNotification)?;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(())
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set that the current queue thread is expected to run on.
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process queue (submit): {:?}",
                            e
                        ))
                    })?;
                }
                self.try_signal_used_queue()?;
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter: Option<Arc<RateLimiterGroup>>,
    exit_evt: EventFd,
    read_only: bool,
    serial: Vec<u8>,
    queue_affinity: BTreeMap<u16, Vec<usize>>,
}

#[derive(Serialize, Deserialize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                 the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
                    | (1u64 << VIRTIO_RING_F_EVENT_IDX);

                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

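                // The virtio sector size is the floor here: never advertise a
                // logical block size smaller than 512 bytes.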
                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps the logical block size to
                // the physical block size.
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }
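                // e.g. a 512-byte logical and a 4096-byte physical block size
                // yield physical_block_exp = 3, since 512 << 3 == 4096.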

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use the writeback setting from the config if VIRTIO_BLK_F_CONFIG_WCE
        // has been negotiated.
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise, fall back to whether VIRTIO_BLK_F_FLUSH was negotiated.
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
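        // Only a single-byte write at exactly that offset is accepted.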
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());

        for i in 0..queues.len() {
            let (_, mut queue, queue_evt) = queues.remove(0);
            queue.set_event_idx(event_idx);

            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows a maximum of around 40 in-flight
                // requests. This gives headroom for systems with slower I/O
                // without compromising on reallocation cost or memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_state(&self.state())
    }
}

impl Transportable for Block {}
impl Migratable for Block {}
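
#[cfg(test)]
mod latency_scale_tests {
    // A minimal, self-contained sketch of the scaled cumulative-moving-average
    // arithmetic used in process_queue_complete(). The sample latencies below
    // are arbitrary; the check is only that the scaled integer math tracks the
    // true mean to within one microsecond.
    use super::LATENCY_SCALE;

    #[test]
    fn scaled_cumulative_average_tracks_mean() {
        let samples: [u64; 5] = [120, 80, 200, 95, 105];
        // u64::MAX is the sentinel meaning "no latency recorded yet", as in
        // BlockCounters::default().
        let mut avg: u64 = u64::MAX;
        for (n, latency) in samples.iter().enumerate() {
            avg = if avg == u64::MAX {
                latency * LATENCY_SCALE
            } else {
                (avg as i64
                    + ((latency * LATENCY_SCALE) as i64 - avg as i64) / (n as i64 + 1))
                    .try_into()
                    .unwrap()
            };
        }
        let true_mean = samples.iter().sum::<u64>() / samples.len() as u64;
        assert!((avg / LATENCY_SCALE).abs_diff(true_mean) <= 1);
    }
}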
949