xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision 261bfac4d47e4da0a8554b0968706ce30c6cc70c)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE-BSD-3-Clause file.
6 //
7 // Copyright © 2020 Intel Corporation
8 //
9 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
10 
11 use std::collections::{BTreeMap, HashMap, VecDeque};
12 use std::num::Wrapping;
13 use std::ops::Deref;
14 use std::os::unix::io::AsRawFd;
15 use std::path::PathBuf;
16 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17 use std::sync::{Arc, Barrier};
18 use std::{io, result};
19 
20 use anyhow::anyhow;
21 use block::async_io::{AsyncIo, AsyncIoError, DiskFile};
22 use block::{build_serial, Request, RequestType, VirtioBlockConfig};
23 use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
24 use rate_limiter::TokenType;
25 use seccompiler::SeccompAction;
26 use serde::{Deserialize, Serialize};
27 use thiserror::Error;
28 use virtio_bindings::virtio_blk::*;
29 use virtio_bindings::virtio_config::*;
30 use virtio_bindings::virtio_ring::{VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC};
31 use virtio_queue::{Queue, QueueOwnedT, QueueT};
32 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
33 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
34 use vm_virtio::AccessPlatform;
35 use vmm_sys_util::eventfd::EventFd;
36 
37 use super::{
38     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
39     Error as DeviceError, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
40     EPOLL_HELPER_EVENT_LAST,
41 };
42 use crate::seccomp_filters::Thread;
43 use crate::thread_helper::spawn_virtio_thread;
44 use crate::{GuestMemoryMmap, VirtioInterrupt};
45 
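// Note: virtio-blk always addresses the disk in fixed 512-byte sectors
// (1 << SECTOR_SHIFT), independently of the logical block size reported
// to the guest through the configuration space.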
46 const SECTOR_SHIFT: u8 = 9;
47 pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
48 
49 // New descriptors are pending on the virtio queue.
50 const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
51 // New completed tasks are pending on the completion ring.
52 const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
53 // New 'wake up' event from the rate limiter
54 const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
55 
56 // Latency scale factor, used to reduce precision loss in average calculations.
57 const LATENCY_SCALE: u64 = 10000;
58 
59 #[derive(Error, Debug)]
60 pub enum Error {
61     #[error("Failed to parse the request: {0}")]
62     RequestParsing(block::Error),
63     #[error("Failed to execute the request: {0}")]
64     RequestExecuting(block::ExecuteError),
65     #[error("Failed to complete the request: {0}")]
66     RequestCompleting(block::Error),
67     #[error("Missing the expected entry in the list of requests")]
68     MissingEntryRequestList,
69     #[error("The asynchronous request returned with failure")]
70     AsyncRequestFailure,
71     #[error("Failed synchronizing the file: {0}")]
72     Fsync(AsyncIoError),
73     #[error("Failed adding used index: {0}")]
74     QueueAddUsed(virtio_queue::Error),
75     #[error("Failed creating an iterator over the queue: {0}")]
76     QueueIterator(virtio_queue::Error),
77     #[error("Failed to update request status: {0}")]
78     RequestStatus(GuestMemoryError),
79     #[error("Failed to enable notification: {0}")]
80     QueueEnableNotification(virtio_queue::Error),
81 }
82 
83 pub type Result<T> = result::Result<T, Error>;
84 
85 // Latency is recorded in microseconds; the average latency
86 // is stored as a scaled value (see LATENCY_SCALE).
87 #[derive(Clone)]
88 pub struct BlockCounters {
89     read_bytes: Arc<AtomicU64>,
90     read_ops: Arc<AtomicU64>,
91     read_latency_min: Arc<AtomicU64>,
92     read_latency_max: Arc<AtomicU64>,
93     read_latency_avg: Arc<AtomicU64>,
94     write_bytes: Arc<AtomicU64>,
95     write_ops: Arc<AtomicU64>,
96     write_latency_min: Arc<AtomicU64>,
97     write_latency_max: Arc<AtomicU64>,
98     write_latency_avg: Arc<AtomicU64>,
99 }
100 
101 impl Default for BlockCounters {
102     fn default() -> Self {
103         BlockCounters {
104             read_bytes: Arc::new(AtomicU64::new(0)),
105             read_ops: Arc::new(AtomicU64::new(0)),
106             read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
107             read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
108             read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
109             write_bytes: Arc::new(AtomicU64::new(0)),
110             write_ops: Arc::new(AtomicU64::new(0)),
111             write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
112             write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
113             write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
114         }
115     }
116 }
117 
118 struct BlockEpollHandler {
119     queue_index: u16,
120     queue: Queue,
121     mem: GuestMemoryAtomic<GuestMemoryMmap>,
122     disk_image: Box<dyn AsyncIo>,
123     disk_nsectors: u64,
124     interrupt_cb: Arc<dyn VirtioInterrupt>,
125     serial: Vec<u8>,
126     kill_evt: EventFd,
127     pause_evt: EventFd,
128     writeback: Arc<AtomicBool>,
129     counters: BlockCounters,
130     queue_evt: EventFd,
131     inflight_requests: VecDeque<(u16, Request)>,
132     rate_limiter: Option<RateLimiterGroupHandle>,
133     access_platform: Option<Arc<dyn AccessPlatform>>,
134     read_only: bool,
135     host_cpus: Option<Vec<usize>>,
136 }
137 
138 impl BlockEpollHandler {
139     fn process_queue_submit(&mut self) -> Result<()> {
140         let queue = &mut self.queue;
141 
142         while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
143             let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
144                 .map_err(Error::RequestParsing)?;
145 
146             // For virtio spec compliance
147             // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
148             // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
149             if self.read_only
150                 && (request.request_type == RequestType::Out
151                     || request.request_type == RequestType::Flush)
152             {
153                 desc_chain
154                     .memory()
155                     .write_obj(VIRTIO_BLK_S_IOERR, request.status_addr)
156                     .map_err(Error::RequestStatus)?;
157 
158                 // If no asynchronous operation has been submitted, we can
159                 // simply return the used descriptor.
160                 queue
161                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
162                     .map_err(Error::QueueAddUsed)?;
163                 queue
164                     .enable_notification(self.mem.memory().deref())
165                     .map_err(Error::QueueEnableNotification)?;
166                 continue;
167             }
168 
169             if let Some(rate_limiter) = &mut self.rate_limiter {
170                 // If limiter.consume() fails it means there is no more TokenType::Ops
171                 // budget and rate limiting is in effect.
172                 if !rate_limiter.consume(1, TokenType::Ops) {
173                     // Stop processing the queue and return this descriptor chain to the
174                     // avail ring, for later processing.
175                     queue.go_to_previous_position();
176                     break;
177                 }
178                 // Exercise the rate limiter only if this request is of data transfer type.
179                 if request.request_type == RequestType::In
180                     || request.request_type == RequestType::Out
181                 {
182                     let mut bytes = Wrapping(0);
183                     for (_, data_len) in &request.data_descriptors {
184                         bytes += Wrapping(*data_len as u64);
185                     }
186 
187                     // If limiter.consume() fails it means there is no more TokenType::Bytes
188                     // budget and rate limiting is in effect.
189                     if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
190                         // Revert the OPS consume().
191                         rate_limiter.manual_replenish(1, TokenType::Ops);
192                         // Stop processing the queue and return this descriptor chain to the
193                         // avail ring, for later processing.
194                         queue.go_to_previous_position();
195                         break;
196                     }
197                 };
198             }
199 
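            // Propagate the current cache mode: when writeback is disabled
            // (writethrough), the request is flagged so that completed writes
            // are followed by an explicit fsync in process_queue_complete().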
200             request.set_writeback(self.writeback.load(Ordering::Acquire));
201 
202             if request
203                 .execute_async(
204                     desc_chain.memory(),
205                     self.disk_nsectors,
206                     self.disk_image.as_mut(),
207                     &self.serial,
208                     desc_chain.head_index() as u64,
209                 )
210                 .map_err(Error::RequestExecuting)?
211             {
212                 self.inflight_requests
213                     .push_back((desc_chain.head_index(), request));
214             } else {
215                 desc_chain
216                     .memory()
217                     .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
218                     .map_err(Error::RequestStatus)?;
219 
220                 // If no asynchronous operation has been submitted, we can
221                 // simply return the used descriptor.
222                 queue
223                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
224                     .map_err(Error::QueueAddUsed)?;
225                 queue
226                     .enable_notification(self.mem.memory().deref())
227                     .map_err(Error::QueueEnableNotification)?;
228             }
229         }
230 
231         Ok(())
232     }
233 
234     fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
235         if self
236             .queue
237             .needs_notification(self.mem.memory().deref())
238             .map_err(|e| {
239                 EpollHelperError::HandleEvent(anyhow!(
240                     "Failed to check needs_notification: {:?}",
241                     e
242                 ))
243             })?
244         {
245             self.signal_used_queue().map_err(|e| {
246                 EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
247             })?;
248         }
249 
250         Ok(())
251     }
252 
253     fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
254         self.process_queue_submit().map_err(|e| {
255             EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
256         })?;
257 
258         self.try_signal_used_queue()
259     }
260 
261     #[inline]
262     fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
263         // This loop neatly handles the fast path where completions arrive
264         // in order (it turns into just a pop_front()) and the ~1% of the time
265         // (measured during boot analysis) where slightly out-of-order
266         // completions have been observed, e.g.
267         // Submissions: 1 2 3 4 5 6 7
268         // Completions: 2 1 3 5 4 7 6
269         // In this case, find the corresponding item and swap it with the front.
270         // This is an O(1) operation and also prepares for the likely case that
271         // the next completion is for the request that was skipped, which will
272         // now be the new front.
273         for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
274             if head == &completed_head {
275                 return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
276             }
277         }
278 
279         Err(Error::MissingEntryRequestList)
280     }
281 
282     fn process_queue_complete(&mut self) -> Result<()> {
283         let mem = self.mem.memory();
284         let mut read_bytes = Wrapping(0);
285         let mut write_bytes = Wrapping(0);
286         let mut read_ops = Wrapping(0);
287         let mut write_ops = Wrapping(0);
288 
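        // The completion cookie (user_data) is the descriptor chain head index
        // that was handed to execute_async() at submission time.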
289         while let Some((user_data, result)) = self.disk_image.next_completed_request() {
290             let desc_index = user_data as u16;
291 
292             let mut request = self.find_inflight_request(desc_index)?;
293 
294             request.complete_async().map_err(Error::RequestCompleting)?;
295 
296             let latency = request.start.elapsed().as_micros() as u64;
297             let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
298             let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
299             let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
300             let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
301             let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
302             let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
303             let (status, len) = if result >= 0 {
304                 match request.request_type {
305                     RequestType::In => {
306                         for (_, data_len) in &request.data_descriptors {
307                             read_bytes += Wrapping(*data_len as u64);
308                         }
309                         read_ops += Wrapping(1);
310                         if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
311                             self.counters
312                                 .read_latency_min
313                                 .store(latency, Ordering::Relaxed);
314                         }
315                         if latency > read_max || read_max == u64::MAX {
316                             self.counters
317                                 .read_latency_max
318                                 .store(latency, Ordering::Relaxed);
319                         }
320 
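                        // The running average is an incremental cumulative moving
                        // average, kept scaled by LATENCY_SCALE to limit the
                        // precision loss of integer division:
                        // avg_n = avg_{n-1} + (latency * SCALE - avg_{n-1}) / n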
321                         // Special case the first real latency report
322                         read_avg = if read_avg == u64::MAX {
323                             latency * LATENCY_SCALE
324                         } else {
325                             // The cumulative average is guaranteed to be
326                             // non-negative when calculated correctly
327                             (read_avg as i64
328                                 + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
329                                     / (read_ops_last + read_ops.0) as i64)
330                                 .try_into()
331                                 .unwrap()
332                         };
333                     }
334                     RequestType::Out => {
335                         if !request.writeback {
336                             self.disk_image.fsync(None).map_err(Error::Fsync)?;
337                         }
338                         for (_, data_len) in &request.data_descriptors {
339                             write_bytes += Wrapping(*data_len as u64);
340                         }
341                         write_ops += Wrapping(1);
342                         if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
343                             self.counters
344                                 .write_latency_min
345                                 .store(latency, Ordering::Relaxed);
346                         }
347                         if latency > write_max || write_max == u64::MAX {
348                             self.counters
349                                 .write_latency_max
350                                 .store(latency, Ordering::Relaxed);
351                         }
352 
353                         // Special case the first real latency report
354                         write_avg = if write_avg == u64::MAX {
355                             latency * LATENCY_SCALE
356                         } else {
357                             // The cumulative average is guaranteed to be
358                             // non-negative when calculated correctly
359                             (write_avg as i64
360                                 + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
361                                     / (write_ops_last + write_ops.0) as i64)
362                                 .try_into()
363                                 .unwrap()
364                         }
365                     }
366                     _ => {}
367                 }
368 
369                 self.counters
370                     .read_latency_avg
371                     .store(read_avg, Ordering::Relaxed);
372 
373                 self.counters
374                     .write_latency_avg
375                     .store(write_avg, Ordering::Relaxed);
376 
377                 (VIRTIO_BLK_S_OK as u8, result as u32)
378             } else {
379                 error!(
380                     "Request failed: {:x?} {:?}",
381                     request,
382                     io::Error::from_raw_os_error(-result)
383                 );
384                 return Err(Error::AsyncRequestFailure);
385             };
386 
387             mem.write_obj(status, request.status_addr)
388                 .map_err(Error::RequestStatus)?;
389 
390             let queue = &mut self.queue;
391 
392             queue
393                 .add_used(mem.deref(), desc_index, len)
394                 .map_err(Error::QueueAddUsed)?;
395             queue
396                 .enable_notification(mem.deref())
397                 .map_err(Error::QueueEnableNotification)?;
398         }
399 
400         self.counters
401             .write_bytes
402             .fetch_add(write_bytes.0, Ordering::AcqRel);
403         self.counters
404             .write_ops
405             .fetch_add(write_ops.0, Ordering::AcqRel);
406 
407         self.counters
408             .read_bytes
409             .fetch_add(read_bytes.0, Ordering::AcqRel);
410         self.counters
411             .read_ops
412             .fetch_add(read_ops.0, Ordering::AcqRel);
413 
414         Ok(())
415     }
416 
417     fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
418         self.interrupt_cb
419             .trigger(VirtioInterruptType::Queue(self.queue_index))
420             .map_err(|e| {
421                 error!("Failed to signal used queue: {:?}", e);
422                 DeviceError::FailedSignalingUsedQueue(e)
423             })
424     }
425 
426     fn set_queue_thread_affinity(&self) {
427         // Prepare the CPU set the current queue thread is expected to run on.
428         let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
429             // SAFETY: all zeros is a valid pattern
430             let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
431             // SAFETY: FFI call, trivially safe
432             unsafe { libc::CPU_ZERO(&mut cpuset) };
433             for host_cpu in host_cpus {
434                 // SAFETY: FFI call, trivially safe
435                 unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
436             }
437             cpuset
438         });
439 
440         // Schedule the thread to run on the expected CPU set
441         if let Some(cpuset) = cpuset.as_ref() {
442             // SAFETY: FFI call with correct arguments
443             let ret = unsafe {
444                 libc::sched_setaffinity(
445                     0,
446                     std::mem::size_of::<libc::cpu_set_t>(),
447                     cpuset as *const libc::cpu_set_t,
448                 )
449             };
450 
451             if ret != 0 {
452                 error!(
453                     "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
454                     self.queue_index,
455                     io::Error::last_os_error()
456                 )
457             }
458         }
459     }
460 
461     fn run(
462         &mut self,
463         paused: Arc<AtomicBool>,
464         paused_sync: Arc<Barrier>,
465     ) -> result::Result<(), EpollHelperError> {
466         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
467         helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
468         helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
469         if let Some(rate_limiter) = &self.rate_limiter {
470             helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
471         }
472         self.set_queue_thread_affinity();
473         helper.run(paused, paused_sync, self)?;
474 
475         Ok(())
476     }
477 }
478 
479 impl EpollHelperHandler for BlockEpollHandler {
480     fn handle_event(
481         &mut self,
482         _helper: &mut EpollHelper,
483         event: &epoll::Event,
484     ) -> result::Result<(), EpollHelperError> {
485         let ev_type = event.data as u16;
486         match ev_type {
487             QUEUE_AVAIL_EVENT => {
488                 self.queue_evt.read().map_err(|e| {
489                     EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
490                 })?;
491 
492                 let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());
493 
494                 // Process the queue only when the rate limit is not reached
495                 if !rate_limit_reached {
496                     self.process_queue_submit_and_signal()?
497                 }
498             }
499             COMPLETION_EVENT => {
500                 self.disk_image.notifier().read().map_err(|e| {
501                     EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
502                     EpollHelperError::HandleEvent(anyhow!("Failed to get completion event: {:?}", e))
503 
504                 self.process_queue_complete().map_err(|e| {
505                     EpollHelperError::HandleEvent(anyhow!(
506                         "Failed to process queue (complete): {:?}",
507                         e
508                     ))
509                 })?;
510 
511                 let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());
512 
513                 // Process the queue only when the rate limit is not reached
514                 if !rate_limit_reached {
515                     self.process_queue_submit().map_err(|e| {
516                         EpollHelperError::HandleEvent(anyhow!(
517                             "Failed to process queue (submit): {:?}",
518                             e
519                         ))
520                     })?;
521                 }
522                 self.try_signal_used_queue()?;
523             }
524             RATE_LIMITER_EVENT => {
525                 if let Some(rate_limiter) = &mut self.rate_limiter {
526                     // Upon rate limiter event, call the rate limiter handler
527                     // and restart processing the queue.
528                     rate_limiter.event_handler().map_err(|e| {
529                         EpollHelperError::HandleEvent(anyhow!(
530                             "Failed to process rate limiter event: {:?}",
531                             e
532                         ))
533                     })?;
534 
535                     self.process_queue_submit_and_signal()?
536                 } else {
537                     return Err(EpollHelperError::HandleEvent(anyhow!(
538                         "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
539                     )));
540                 }
541             }
542             _ => {
543                 return Err(EpollHelperError::HandleEvent(anyhow!(
544                     "Unexpected event: {}",
545                     ev_type
546                 )));
547             }
548         }
549         Ok(())
550     }
551 }
552 
553 /// Virtio device for exposing block level read/write operations on a host file.
554 pub struct Block {
555     common: VirtioCommon,
556     id: String,
557     disk_image: Box<dyn DiskFile>,
558     disk_path: PathBuf,
559     disk_nsectors: u64,
560     config: VirtioBlockConfig,
561     writeback: Arc<AtomicBool>,
562     counters: BlockCounters,
563     seccomp_action: SeccompAction,
564     rate_limiter: Option<Arc<RateLimiterGroup>>,
565     exit_evt: EventFd,
566     read_only: bool,
567     serial: Vec<u8>,
568     queue_affinity: BTreeMap<u16, Vec<usize>>,
569 }
570 
571 #[derive(Serialize, Deserialize)]
572 pub struct BlockState {
573     pub disk_path: String,
574     pub disk_nsectors: u64,
575     pub avail_features: u64,
576     pub acked_features: u64,
577     pub config: VirtioBlockConfig,
578 }
579 
580 impl Block {
581     /// Create a new virtio block device that operates on the given file.
582     #[allow(clippy::too_many_arguments)]
583     pub fn new(
584         id: String,
585         mut disk_image: Box<dyn DiskFile>,
586         disk_path: PathBuf,
587         read_only: bool,
588         iommu: bool,
589         num_queues: usize,
590         queue_size: u16,
591         serial: Option<String>,
592         seccomp_action: SeccompAction,
593         rate_limiter: Option<Arc<RateLimiterGroup>>,
594         exit_evt: EventFd,
595         state: Option<BlockState>,
596         queue_affinity: BTreeMap<u16, Vec<usize>>,
597     ) -> io::Result<Self> {
598         let (disk_nsectors, avail_features, acked_features, config, paused) =
599             if let Some(state) = state {
600                 info!("Restoring virtio-block {}", id);
601                 (
602                     state.disk_nsectors,
603                     state.avail_features,
604                     state.acked_features,
605                     state.config,
606                     true,
607                 )
608             } else {
609                 let disk_size = disk_image.size().map_err(|e| {
610                     io::Error::new(
611                         io::ErrorKind::Other,
612                         format!("Failed getting disk size: {e}"),
613                     )
614                 })?;
615                 if disk_size % SECTOR_SIZE != 0 {
616                     warn!(
617                         "Disk size {} is not a multiple of sector size {}; \
618                  the remainder will not be visible to the guest.",
619                         disk_size, SECTOR_SIZE
620                     );
621                 }
622 
623                 let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
624                     | (1u64 << VIRTIO_BLK_F_FLUSH)
625                     | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
626                     | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
627                     | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
628                     | (1u64 << VIRTIO_RING_F_EVENT_IDX)
629                     | (1u64 << VIRTIO_RING_F_INDIRECT_DESC);
630                 if iommu {
631                     avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
632                 }
633 
634                 if read_only {
635                     avail_features |= 1u64 << VIRTIO_BLK_F_RO;
636                 }
637 
638                 let topology = disk_image.topology();
639                 info!("Disk topology: {:?}", topology);
640 
641                 let logical_block_size = if topology.logical_block_size > 512 {
642                     topology.logical_block_size
643                 } else {
644                     512
645                 };
646 
647                 // Calculate the exponent: log2 of the number of logical blocks per physical block
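                // e.g. 512-byte logical and 4096-byte physical blocks give
                // physical_block_exp = 3 (4096 = 512 << 3).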
648                 let mut physical_block_exp = 0;
649                 let mut size = logical_block_size;
650                 while size < topology.physical_block_size {
651                     physical_block_exp += 1;
652                     size <<= 1;
653                 }
654 
655                 let disk_nsectors = disk_size / SECTOR_SIZE;
656                 let mut config = VirtioBlockConfig {
657                     capacity: disk_nsectors,
658                     writeback: 1,
659                     blk_size: topology.logical_block_size as u32,
660                     physical_block_exp,
661                     min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
662                     opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
663                     ..Default::default()
664                 };
665 
666                 if num_queues > 1 {
667                     avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
668                     config.num_queues = num_queues as u16;
669                 }
670 
671                 (disk_nsectors, avail_features, 0, config, false)
672             };
673 
674         let serial = serial
675             .map(Vec::from)
676             .unwrap_or_else(|| build_serial(&disk_path));
677 
678         Ok(Block {
679             common: VirtioCommon {
680                 device_type: VirtioDeviceType::Block as u32,
681                 avail_features,
682                 acked_features,
683                 paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
684                 queue_sizes: vec![queue_size; num_queues],
685                 min_queues: 1,
686                 paused: Arc::new(AtomicBool::new(paused)),
687                 ..Default::default()
688             },
689             id,
690             disk_image,
691             disk_path,
692             disk_nsectors,
693             config,
694             writeback: Arc::new(AtomicBool::new(true)),
695             counters: BlockCounters::default(),
696             seccomp_action,
697             rate_limiter,
698             exit_evt,
699             read_only,
700             serial,
701             queue_affinity,
702         })
703     }
704 
705     fn state(&self) -> BlockState {
706         BlockState {
707             disk_path: self.disk_path.to_str().unwrap().to_owned(),
708             disk_nsectors: self.disk_nsectors,
709             avail_features: self.common.avail_features,
710             acked_features: self.common.acked_features,
711             config: self.config,
712         }
713     }
714 
715     fn update_writeback(&mut self) {
716         // Use the writeback value from the config space if VIRTIO_BLK_F_CONFIG_WCE was negotiated.
717         let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
718             self.config.writeback == 1
719         } else {
720             // Otherwise, check whether VIRTIO_BLK_F_FLUSH was negotiated.
721             self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
722         };
723 
724         info!(
725             "Changing cache mode to {}",
726             if writeback {
727                 "writeback"
728             } else {
729                 "writethrough"
730             }
731         );
732         self.writeback.store(writeback, Ordering::Release);
733     }
734 
735     #[cfg(fuzzing)]
736     pub fn wait_for_epoll_threads(&mut self) {
737         self.common.wait_for_epoll_threads();
738     }
739 }
740 
741 impl Drop for Block {
742     fn drop(&mut self) {
743         if let Some(kill_evt) = self.common.kill_evt.take() {
744             // Ignore the result because there is nothing we can do about it.
745             let _ = kill_evt.write(1);
746         }
747         self.common.wait_for_epoll_threads();
748     }
749 }
750 
751 impl VirtioDevice for Block {
752     fn device_type(&self) -> u32 {
753         self.common.device_type
754     }
755 
756     fn queue_max_sizes(&self) -> &[u16] {
757         &self.common.queue_sizes
758     }
759 
760     fn features(&self) -> u64 {
761         self.common.avail_features
762     }
763 
764     fn ack_features(&mut self, value: u64) {
765         self.common.ack_features(value)
766     }
767 
768     fn read_config(&self, offset: u64, data: &mut [u8]) {
769         self.read_config_from_slice(self.config.as_slice(), offset, data);
770     }
771 
772     fn write_config(&mut self, offset: u64, data: &[u8]) {
773         // The "writeback" field is the only mutable field
774         let writeback_offset =
775             (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
776         if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
777         {
778             error!(
779                 "Attempt to write to read-only field: offset {:x} length {}",
780                 offset,
781                 data.len()
782             );
783             return;
784         }
785 
786         self.config.writeback = data[0];
787         self.update_writeback();
788     }
789 
790     fn activate(
791         &mut self,
792         mem: GuestMemoryAtomic<GuestMemoryMmap>,
793         interrupt_cb: Arc<dyn VirtioInterrupt>,
794         mut queues: Vec<(usize, Queue, EventFd)>,
795     ) -> ActivateResult {
796         self.common.activate(&queues, &interrupt_cb)?;
797 
798         self.update_writeback();
799 
800         let mut epoll_threads = Vec::new();
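        // When VIRTIO_RING_F_EVENT_IDX has been negotiated, needs_notification()
        // in the handler suppresses redundant used-ring interrupts.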
801         let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());
802 
803         for i in 0..queues.len() {
804             let (_, mut queue, queue_evt) = queues.remove(0);
805             queue.set_event_idx(event_idx);
806 
807             let queue_size = queue.size();
808             let (kill_evt, pause_evt) = self.common.dup_eventfds();
809             let queue_idx = i as u16;
810 
811             let mut handler = BlockEpollHandler {
812                 queue_index: queue_idx,
813                 queue,
814                 mem: mem.clone(),
815                 disk_image: self
816                     .disk_image
817                     .new_async_io(queue_size as u32)
818                     .map_err(|e| {
819                         error!("failed to create new AsyncIo: {}", e);
820                         ActivateError::BadActivate
821                     })?,
822                 disk_nsectors: self.disk_nsectors,
823                 interrupt_cb: interrupt_cb.clone(),
824                 serial: self.serial.clone(),
825                 kill_evt,
826                 pause_evt,
827                 writeback: self.writeback.clone(),
828                 counters: self.counters.clone(),
829                 queue_evt,
830                 // Analysis during boot shows a maximum of around 40 in-flight requests.
831                 // This gives headroom for systems with slower I/O without incurring
832                 // reallocation costs or excessive memory overhead.
833                 inflight_requests: VecDeque::with_capacity(64),
834                 rate_limiter: self
835                     .rate_limiter
836                     .as_ref()
837                     .map(|r| r.new_handle())
838                     .transpose()
839                     .unwrap(),
840                 access_platform: self.common.access_platform.clone(),
841                 read_only: self.read_only,
842                 host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
843             };
844 
845             let paused = self.common.paused.clone();
846             let paused_sync = self.common.paused_sync.clone();
847 
848             spawn_virtio_thread(
849                 &format!("{}_q{}", self.id.clone(), i),
850                 &self.seccomp_action,
851                 Thread::VirtioBlock,
852                 &mut epoll_threads,
853                 &self.exit_evt,
854                 move || handler.run(paused, paused_sync.unwrap()),
855             )?;
856         }
857 
858         self.common.epoll_threads = Some(epoll_threads);
859         event!("virtio-device", "activated", "id", &self.id);
860 
861         Ok(())
862     }
863 
864     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
865         let result = self.common.reset();
866         event!("virtio-device", "reset", "id", &self.id);
867         result
868     }
869 
870     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
871         let mut counters = HashMap::new();
872 
873         counters.insert(
874             "read_bytes",
875             Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
876         );
877         counters.insert(
878             "write_bytes",
879             Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
880         );
881         counters.insert(
882             "read_ops",
883             Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
884         );
885         counters.insert(
886             "write_ops",
887             Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
888         );
889         counters.insert(
890             "write_latency_min",
891             Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
892         );
893         counters.insert(
894             "write_latency_max",
895             Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
896         );
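        // Average latencies are stored scaled by LATENCY_SCALE; divide here to
        // report plain microseconds.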
897         counters.insert(
898             "write_latency_avg",
899             Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
900         );
901         counters.insert(
902             "read_latency_min",
903             Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
904         );
905         counters.insert(
906             "read_latency_max",
907             Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
908         );
909         counters.insert(
910             "read_latency_avg",
911             Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
912         );
913 
914         Some(counters)
915     }
916 
917     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
918         self.common.set_access_platform(access_platform)
919     }
920 }
921 
922 impl Pausable for Block {
923     fn pause(&mut self) -> result::Result<(), MigratableError> {
924         self.common.pause()
925     }
926 
927     fn resume(&mut self) -> result::Result<(), MigratableError> {
928         self.common.resume()
929     }
930 }
931 
932 impl Snapshottable for Block {
933     fn id(&self) -> String {
934         self.id.clone()
935     }
936 
937     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
938         Snapshot::new_from_state(&self.state())
939     }
940 }
941 impl Transportable for Block {}
942 impl Migratable for Block {}
943