xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision 61e57e1cb149de03ae1e0b799b9e5ba9a4a63ace)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE-BSD-3-Clause file.
6 //
7 // Copyright © 2020 Intel Corporation
8 //
9 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
10 
11 use std::collections::{BTreeMap, HashMap, VecDeque};
12 use std::num::Wrapping;
13 use std::ops::Deref;
14 use std::os::unix::io::AsRawFd;
15 use std::path::PathBuf;
16 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17 use std::sync::{Arc, Barrier};
18 use std::{io, result};
19 
20 use anyhow::anyhow;
21 use block::async_io::{AsyncIo, AsyncIoError, DiskFile};
22 use block::{build_serial, Request, RequestType, VirtioBlockConfig};
23 use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
24 use rate_limiter::TokenType;
25 use seccompiler::SeccompAction;
26 use serde::{Deserialize, Serialize};
27 use thiserror::Error;
28 use virtio_bindings::virtio_blk::*;
29 use virtio_bindings::virtio_config::*;
30 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
31 use virtio_queue::{Queue, QueueOwnedT, QueueT};
32 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
33 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
34 use vm_virtio::AccessPlatform;
35 use vmm_sys_util::eventfd::EventFd;
36 
37 use super::{
38     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
39     Error as DeviceError, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
40     EPOLL_HELPER_EVENT_LAST,
41 };
42 use crate::seccomp_filters::Thread;
43 use crate::thread_helper::spawn_virtio_thread;
44 use crate::{GuestMemoryMmap, VirtioInterrupt};
45 
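// Virtio block requests address the disk in 512-byte sectors, regardless of the
// logical block size the device reports through its config space.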
46 const SECTOR_SHIFT: u8 = 9;
47 pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
48 
49 // New descriptors are pending on the virtio queue.
50 const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
51 // New completed tasks are pending on the completion ring.
52 const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
53 // New 'wake up' event from the rate limiter
54 const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
55 
56 // Latency scale factor, used to reduce precision loss when computing the average latency.
57 const LATENCY_SCALE: u64 = 10000;
58 
59 #[derive(Error, Debug)]
60 pub enum Error {
61     #[error("Failed to parse the request: {0}")]
62     RequestParsing(block::Error),
63     #[error("Failed to execute the request: {0}")]
64     RequestExecuting(block::ExecuteError),
65     #[error("Failed to complete the request: {0}")]
66     RequestCompleting(block::Error),
67     #[error("Missing the expected entry in the list of requests")]
68     MissingEntryRequestList,
69     #[error("The asynchronous request returned with failure")]
70     AsyncRequestFailure,
71     #[error("Failed synchronizing the file: {0}")]
72     Fsync(AsyncIoError),
73     #[error("Failed adding used index: {0}")]
74     QueueAddUsed(virtio_queue::Error),
75     #[error("Failed creating an iterator over the queue: {0}")]
76     QueueIterator(virtio_queue::Error),
77     #[error("Failed to update request status: {0}")]
78     RequestStatus(GuestMemoryError),
79     #[error("Failed to enable notification: {0}")]
80     QueueEnableNotification(virtio_queue::Error),
81 }
82 
83 pub type Result<T> = result::Result<T, Error>;
84 
85 // Latency is recorded in microseconds; the average latency
86 // is stored as a value scaled by LATENCY_SCALE.
87 #[derive(Clone)]
88 pub struct BlockCounters {
89     read_bytes: Arc<AtomicU64>,
90     read_ops: Arc<AtomicU64>,
91     read_latency_min: Arc<AtomicU64>,
92     read_latency_max: Arc<AtomicU64>,
93     read_latency_avg: Arc<AtomicU64>,
94     write_bytes: Arc<AtomicU64>,
95     write_ops: Arc<AtomicU64>,
96     write_latency_min: Arc<AtomicU64>,
97     write_latency_max: Arc<AtomicU64>,
98     write_latency_avg: Arc<AtomicU64>,
99 }
100 
101 impl Default for BlockCounters {
102     fn default() -> Self {
103         BlockCounters {
104             read_bytes: Arc::new(AtomicU64::new(0)),
105             read_ops: Arc::new(AtomicU64::new(0)),
106             read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
107             read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
108             read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
109             write_bytes: Arc::new(AtomicU64::new(0)),
110             write_ops: Arc::new(AtomicU64::new(0)),
111             write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
112             write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
113             write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
114         }
115     }
116 }
117 
118 struct BlockEpollHandler {
119     queue_index: u16,
120     queue: Queue,
121     mem: GuestMemoryAtomic<GuestMemoryMmap>,
122     disk_image: Box<dyn AsyncIo>,
123     disk_nsectors: u64,
124     interrupt_cb: Arc<dyn VirtioInterrupt>,
125     serial: Vec<u8>,
126     kill_evt: EventFd,
127     pause_evt: EventFd,
128     writeback: Arc<AtomicBool>,
129     counters: BlockCounters,
130     queue_evt: EventFd,
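    // Requests submitted to the asynchronous disk backend and awaiting
    // completion, keyed by the descriptor chain head index.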
131     inflight_requests: VecDeque<(u16, Request)>,
132     rate_limiter: Option<RateLimiterGroupHandle>,
133     access_platform: Option<Arc<dyn AccessPlatform>>,
134     read_only: bool,
135     host_cpus: Option<Vec<usize>>,
136 }
137 
138 impl BlockEpollHandler {
139     fn process_queue_submit(&mut self) -> Result<()> {
140         let queue = &mut self.queue;
141 
142         while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
143             let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
144                 .map_err(Error::RequestParsing)?;
145 
146             // For virtio spec compliance:
147             // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
148             // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
149             if self.read_only
150                 && (request.request_type == RequestType::Out
151                     || request.request_type == RequestType::Flush)
152             {
153                 desc_chain
154                     .memory()
155                     .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
156                     .map_err(Error::RequestStatus)?;
157 
158                 // If no asynchronous operation has been submitted, we can
159                 // simply return the used descriptor.
160                 queue
161                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
162                     .map_err(Error::QueueAddUsed)?;
163                 queue
164                     .enable_notification(self.mem.memory().deref())
165                     .map_err(Error::QueueEnableNotification)?;
166                 continue;
167             }
168 
169             if let Some(rate_limiter) = &mut self.rate_limiter {
170                 // If limiter.consume() fails it means there is no more TokenType::Ops
171                 // budget and rate limiting is in effect.
172                 if !rate_limiter.consume(1, TokenType::Ops) {
173                     // Stop processing the queue and return this descriptor chain to the
174                     // avail ring, for later processing.
175                     queue.go_to_previous_position();
176                     break;
177                 }
178                 // Exercise the rate limiter only if this request is of data transfer type.
179                 if request.request_type == RequestType::In
180                     || request.request_type == RequestType::Out
181                 {
182                     let mut bytes = Wrapping(0);
183                     for (_, data_len) in &request.data_descriptors {
184                         bytes += Wrapping(*data_len as u64);
185                     }
186 
187                     // If limiter.consume() fails it means there is no more TokenType::Bytes
188                     // budget and rate limiting is in effect.
189                     if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
190                         // Revert the OPS consume().
191                         rate_limiter.manual_replenish(1, TokenType::Ops);
192                         // Stop processing the queue and return this descriptor chain to the
193                         // avail ring, for later processing.
194                         queue.go_to_previous_position();
195                         break;
196                     }
197                 };
198             }
199 
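            // Propagate the current cache mode: when writeback is disabled, the
            // completion path issues an explicit fsync() after each write.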
200             request.set_writeback(self.writeback.load(Ordering::Acquire));
201 
202             if request
203                 .execute_async(
204                     desc_chain.memory(),
205                     self.disk_nsectors,
206                     self.disk_image.as_mut(),
207                     &self.serial,
208                     desc_chain.head_index() as u64,
209                 )
210                 .map_err(Error::RequestExecuting)?
211             {
212                 self.inflight_requests
213                     .push_back((desc_chain.head_index(), request));
214             } else {
215                 desc_chain
216                     .memory()
217                     .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
218                     .map_err(Error::RequestStatus)?;
219 
220                 // If no asynchronous operation has been submitted, we can
221                 // simply return the used descriptor.
222                 queue
223                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
224                     .map_err(Error::QueueAddUsed)?;
225                 queue
226                     .enable_notification(self.mem.memory().deref())
227                     .map_err(Error::QueueEnableNotification)?;
228             }
229         }
230 
231         Ok(())
232     }
233 
234     fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
235         if self
236             .queue
237             .needs_notification(self.mem.memory().deref())
238             .map_err(|e| {
239                 EpollHelperError::HandleEvent(anyhow!(
240                     "Failed to check needs_notification: {:?}",
241                     e
242                 ))
243             })?
244         {
245             self.signal_used_queue().map_err(|e| {
246                 EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
247             })?;
248         }
249 
250         Ok(())
251     }
252 
253     fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
254         self.process_queue_submit().map_err(|e| {
255             EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
256         })?;
257 
258         self.try_signal_used_queue()
259     }
260 
261     #[inline]
262     fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
263         // This loop neatly handles the fast path where completions arrive
264         // in order (it reduces to a pop_front()) as well as the small fraction
265         // of cases (~1% observed during boot) where slightly out-of-order
266         // completion occurs, e.g.
267         // Submissions: 1 2 3 4 5 6 7
268         // Completions: 2 1 3 5 4 7 6
269         // In that case, find the matching entry and swap it with the front.
270         // This is an O(1) operation and also prepares for the future, as it is
271         // likely the next completion will be for the request that was skipped,
272         // which is now the new front.
273         for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
274             if head == &completed_head {
275                 return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
276             }
277         }
278 
279         Err(Error::MissingEntryRequestList)
280     }
281 
282     fn process_queue_complete(&mut self) -> Result<()> {
283         let mem = self.mem.memory();
284         let mut read_bytes = Wrapping(0);
285         let mut write_bytes = Wrapping(0);
286         let mut read_ops = Wrapping(0);
287         let mut write_ops = Wrapping(0);
288 
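        // Drain the completion ring: user_data carries the descriptor head index
        // passed to execute_async(), and result is the backend I/O result
        // (non-negative on success, a negative errno on failure).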
289         while let Some((user_data, result)) = self.disk_image.next_completed_request() {
290             let desc_index = user_data as u16;
291 
292             let mut request = self.find_inflight_request(desc_index)?;
293 
294             request.complete_async().map_err(Error::RequestCompleting)?;
295 
296             let latency = request.start.elapsed().as_micros() as u64;
297             let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
298             let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
299             let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
300             let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
301             let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
302             let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
303             let (status, len) = if result >= 0 {
304                 match request.request_type {
305                     RequestType::In => {
306                         for (_, data_len) in &request.data_descriptors {
307                             read_bytes += Wrapping(*data_len as u64);
308                         }
309                         read_ops += Wrapping(1);
310                         if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
311                             self.counters
312                                 .read_latency_min
313                                 .store(latency, Ordering::Relaxed);
314                         }
315                         if latency > read_max || read_max == u64::MAX {
316                             self.counters
317                                 .read_latency_max
318                                 .store(latency, Ordering::Relaxed);
319                         }
320 
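                        // The average is kept scaled by LATENCY_SCALE and updated
                        // incrementally: avg += (latency * LATENCY_SCALE - avg) / total_ops.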
321                         // Special case the first real latency report
322                         read_avg = if read_avg == u64::MAX {
323                             latency * LATENCY_SCALE
324                         } else {
325                             // The cumulative average stays non-negative, so
326                             // the conversion back to u64 cannot fail
327                             (read_avg as i64
328                                 + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
329                                     / (read_ops_last + read_ops.0) as i64)
330                                 .try_into()
331                                 .unwrap()
332                         };
333                     }
334                     RequestType::Out => {
335                         if !request.writeback {
336                             self.disk_image.fsync(None).map_err(Error::Fsync)?;
337                         }
338                         for (_, data_len) in &request.data_descriptors {
339                             write_bytes += Wrapping(*data_len as u64);
340                         }
341                         write_ops += Wrapping(1);
342                         if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
343                             self.counters
344                                 .write_latency_min
345                                 .store(latency, Ordering::Relaxed);
346                         }
347                         if latency > write_max || write_max == u64::MAX {
348                             self.counters
349                                 .write_latency_max
350                                 .store(latency, Ordering::Relaxed);
351                         }
352 
353                         // Special case the first real latency report
354                         write_avg = if write_avg == u64::MAX {
355                             latency * LATENCY_SCALE
356                         } else {
357                             // The cumulative average stays non-negative, so
358                             // the conversion back to u64 cannot fail
359                             (write_avg as i64
360                                 + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
361                                     / (write_ops_last + write_ops.0) as i64)
362                                 .try_into()
363                                 .unwrap()
364                         }
365                     }
366                     _ => {}
367                 }
368 
369                 self.counters
370                     .read_latency_avg
371                     .store(read_avg, Ordering::Relaxed);
372 
373                 self.counters
374                     .write_latency_avg
375                     .store(write_avg, Ordering::Relaxed);
376 
377                 (VIRTIO_BLK_S_OK as u8, result as u32)
378             } else {
379                 error!(
380                     "Request failed: {:x?} {:?}",
381                     request,
382                     io::Error::from_raw_os_error(-result)
383                 );
384                 return Err(Error::AsyncRequestFailure);
385             };
386 
387             mem.write_obj(status, request.status_addr)
388                 .map_err(Error::RequestStatus)?;
389 
390             let queue = &mut self.queue;
391 
392             queue
393                 .add_used(mem.deref(), desc_index, len)
394                 .map_err(Error::QueueAddUsed)?;
395             queue
396                 .enable_notification(mem.deref())
397                 .map_err(Error::QueueEnableNotification)?;
398         }
399 
400         self.counters
401             .write_bytes
402             .fetch_add(write_bytes.0, Ordering::AcqRel);
403         self.counters
404             .write_ops
405             .fetch_add(write_ops.0, Ordering::AcqRel);
406 
407         self.counters
408             .read_bytes
409             .fetch_add(read_bytes.0, Ordering::AcqRel);
410         self.counters
411             .read_ops
412             .fetch_add(read_ops.0, Ordering::AcqRel);
413 
414         Ok(())
415     }
416 
417     fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
418         self.interrupt_cb
419             .trigger(VirtioInterruptType::Queue(self.queue_index))
420             .map_err(|e| {
421                 error!("Failed to signal used queue: {:?}", e);
422                 DeviceError::FailedSignalingUsedQueue(e)
423             })
424     }
425 
426     fn set_queue_thread_affinity(&self) {
427         // Prepare the CPU set the current queue thread is expected to run on.
428         let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
429             // SAFETY: all zeros is a valid pattern
430             let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
431             // SAFETY: FFI call, trivially safe
432             unsafe { libc::CPU_ZERO(&mut cpuset) };
433             for host_cpu in host_cpus {
434                 // SAFETY: FFI call, trivially safe
435                 unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
436             }
437             cpuset
438         });
439 
440         // Schedule the thread to run on the expected CPU set
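        // sched_setaffinity() with a pid of 0 applies to the calling thread.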
441         if let Some(cpuset) = cpuset.as_ref() {
442             // SAFETY: FFI call with correct arguments
443             let ret = unsafe {
444                 libc::sched_setaffinity(
445                     0,
446                     std::mem::size_of::<libc::cpu_set_t>(),
447                     cpuset as *const libc::cpu_set_t,
448                 )
449             };
450 
451             if ret != 0 {
452                 error!(
453                     "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
454                     self.queue_index,
455                     io::Error::last_os_error()
456                 )
457             }
458         }
459     }
460 
461     fn run(
462         &mut self,
463         paused: Arc<AtomicBool>,
464         paused_sync: Arc<Barrier>,
465     ) -> result::Result<(), EpollHelperError> {
466         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
467         helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
468         helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
469         if let Some(rate_limiter) = &self.rate_limiter {
470             helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
471         }
472         self.set_queue_thread_affinity();
473         helper.run(paused, paused_sync, self)?;
474 
475         Ok(())
476     }
477 }
478 
479 impl EpollHelperHandler for BlockEpollHandler {
480     fn handle_event(
481         &mut self,
482         _helper: &mut EpollHelper,
483         event: &epoll::Event,
484     ) -> result::Result<(), EpollHelperError> {
485         let ev_type = event.data as u16;
486         match ev_type {
487             QUEUE_AVAIL_EVENT => {
488                 self.queue_evt.read().map_err(|e| {
489                     EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
490                 })?;
491 
492                 let rate_limit_reached =
493                     self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());
494 
495                 // Process the queue only when the rate limit is not reached
496                 if !rate_limit_reached {
497                     self.process_queue_submit_and_signal()?
498                 }
499             }
500             COMPLETION_EVENT => {
501                 self.disk_image.notifier().read().map_err(|e| {
502                     EpollHelperError::HandleEvent(anyhow!("Failed to get completion event: {:?}", e))
503                 })?;
504 
505                 self.process_queue_complete().map_err(|e| {
506                     EpollHelperError::HandleEvent(anyhow!(
507                         "Failed to process queue (complete): {:?}",
508                         e
509                     ))
510                 })?;
511 
512                 let rate_limit_reached =
513                     self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());
514 
515                 // Process the queue only when the rate limit is not reached
516                 if !rate_limit_reached {
517                     self.process_queue_submit().map_err(|e| {
518                         EpollHelperError::HandleEvent(anyhow!(
519                             "Failed to process queue (submit): {:?}",
520                             e
521                         ))
522                     })?;
523                 }
524                 self.try_signal_used_queue()?;
525             }
526             RATE_LIMITER_EVENT => {
527                 if let Some(rate_limiter) = &mut self.rate_limiter {
528                     // Upon rate limiter event, call the rate limiter handler
529                     // and restart processing the queue.
530                     rate_limiter.event_handler().map_err(|e| {
531                         EpollHelperError::HandleEvent(anyhow!(
532                             "Failed to process rate limiter event: {:?}",
533                             e
534                         ))
535                     })?;
536 
537                     self.process_queue_submit_and_signal()?
538                 } else {
539                     return Err(EpollHelperError::HandleEvent(anyhow!(
540                         "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
541                     )));
542                 }
543             }
544             _ => {
545                 return Err(EpollHelperError::HandleEvent(anyhow!(
546                     "Unexpected event: {}",
547                     ev_type
548                 )));
549             }
550         }
551         Ok(())
552     }
553 }
554 
555 /// Virtio device for exposing block level read/write operations on a host file.
556 pub struct Block {
557     common: VirtioCommon,
558     id: String,
559     disk_image: Box<dyn DiskFile>,
560     disk_path: PathBuf,
561     disk_nsectors: u64,
562     config: VirtioBlockConfig,
563     writeback: Arc<AtomicBool>,
564     counters: BlockCounters,
565     seccomp_action: SeccompAction,
566     rate_limiter: Option<Arc<RateLimiterGroup>>,
567     exit_evt: EventFd,
568     read_only: bool,
569     serial: Vec<u8>,
570     queue_affinity: BTreeMap<u16, Vec<usize>>,
571 }
572 
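// Device state captured by state() for snapshot/restore; it is passed back into
// Block::new() when the device is restored.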
573 #[derive(Serialize, Deserialize)]
574 pub struct BlockState {
575     pub disk_path: String,
576     pub disk_nsectors: u64,
577     pub avail_features: u64,
578     pub acked_features: u64,
579     pub config: VirtioBlockConfig,
580 }
581 
582 impl Block {
583     /// Create a new virtio block device that operates on the given file.
584     #[allow(clippy::too_many_arguments)]
585     pub fn new(
586         id: String,
587         mut disk_image: Box<dyn DiskFile>,
588         disk_path: PathBuf,
589         read_only: bool,
590         iommu: bool,
591         num_queues: usize,
592         queue_size: u16,
593         serial: Option<String>,
594         seccomp_action: SeccompAction,
595         rate_limiter: Option<Arc<RateLimiterGroup>>,
596         exit_evt: EventFd,
597         state: Option<BlockState>,
598         queue_affinity: BTreeMap<u16, Vec<usize>>,
599     ) -> io::Result<Self> {
600         let (disk_nsectors, avail_features, acked_features, config, paused) =
601             if let Some(state) = state {
602                 info!("Restoring virtio-block {}", id);
603                 (
604                     state.disk_nsectors,
605                     state.avail_features,
606                     state.acked_features,
607                     state.config,
608                     true,
609                 )
610             } else {
611                 let disk_size = disk_image.size().map_err(|e| {
612                     io::Error::new(
613                         io::ErrorKind::Other,
614                         format!("Failed getting disk size: {e}"),
615                     )
616                 })?;
617                 if disk_size % SECTOR_SIZE != 0 {
618                     warn!(
619                         "Disk size {} is not a multiple of sector size {}; \
620                         the remainder will not be visible to the guest.",
621                         disk_size, SECTOR_SIZE
622                     );
623                 }
624 
625                 let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
626                     | (1u64 << VIRTIO_BLK_F_FLUSH)
627                     | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
628                     | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
629                     | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
630                     | (1u64 << VIRTIO_RING_F_EVENT_IDX);
631 
632                 if iommu {
633                     avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
634                 }
635 
636                 if read_only {
637                     avail_features |= 1u64 << VIRTIO_BLK_F_RO;
638                 }
639 
640                 let topology = disk_image.topology();
641                 info!("Disk topology: {:?}", topology);
642 
643                 let logical_block_size = if topology.logical_block_size > 512 {
644                     topology.logical_block_size
645                 } else {
646                     512
647                 };
648 
649                 // Calculate the exponent that maps physical block to logical block
650                 let mut physical_block_exp = 0;
651                 let mut size = logical_block_size;
652                 while size < topology.physical_block_size {
653                     physical_block_exp += 1;
654                     size <<= 1;
655                 }
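                // e.g. 4096-byte physical blocks on top of 512-byte logical blocks
                // give physical_block_exp = 3 (one physical block spans 2^3 logical blocks).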
656 
657                 let disk_nsectors = disk_size / SECTOR_SIZE;
658                 let mut config = VirtioBlockConfig {
659                     capacity: disk_nsectors,
660                     writeback: 1,
661                     blk_size: topology.logical_block_size as u32,
662                     physical_block_exp,
663                     min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
664                     opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
665                     ..Default::default()
666                 };
667 
668                 if num_queues > 1 {
669                     avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
670                     config.num_queues = num_queues as u16;
671                 }
672 
673                 (disk_nsectors, avail_features, 0, config, false)
674             };
675 
676         let serial = serial
677             .map(Vec::from)
678             .unwrap_or_else(|| build_serial(&disk_path));
679 
680         Ok(Block {
681             common: VirtioCommon {
682                 device_type: VirtioDeviceType::Block as u32,
683                 avail_features,
684                 acked_features,
685                 paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
686                 queue_sizes: vec![queue_size; num_queues],
687                 min_queues: 1,
688                 paused: Arc::new(AtomicBool::new(paused)),
689                 ..Default::default()
690             },
691             id,
692             disk_image,
693             disk_path,
694             disk_nsectors,
695             config,
696             writeback: Arc::new(AtomicBool::new(true)),
697             counters: BlockCounters::default(),
698             seccomp_action,
699             rate_limiter,
700             exit_evt,
701             read_only,
702             serial,
703             queue_affinity,
704         })
705     }
706 
707     fn state(&self) -> BlockState {
708         BlockState {
709             disk_path: self.disk_path.to_str().unwrap().to_owned(),
710             disk_nsectors: self.disk_nsectors,
711             avail_features: self.common.avail_features,
712             acked_features: self.common.acked_features,
713             config: self.config,
714         }
715     }
716 
717     fn update_writeback(&mut self) {
718         // Use the writeback value from the config space if VIRTIO_BLK_F_CONFIG_WCE was negotiated
719         let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
720             self.config.writeback == 1
721         } else {
722             // Otherwise, use writeback only if VIRTIO_BLK_F_FLUSH was negotiated
723             self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
724         };
725 
726         info!(
727             "Changing cache mode to {}",
728             if writeback {
729                 "writeback"
730             } else {
731                 "writethrough"
732             }
733         );
734         self.writeback.store(writeback, Ordering::Release);
735     }
736 
737     #[cfg(fuzzing)]
738     pub fn wait_for_epoll_threads(&mut self) {
739         self.common.wait_for_epoll_threads();
740     }
741 }
742 
743 impl Drop for Block {
744     fn drop(&mut self) {
745         if let Some(kill_evt) = self.common.kill_evt.take() {
746             // Ignore the result because there is nothing we can do about it.
747             let _ = kill_evt.write(1);
748         }
749         self.common.wait_for_epoll_threads();
750     }
751 }
752 
753 impl VirtioDevice for Block {
754     fn device_type(&self) -> u32 {
755         self.common.device_type
756     }
757 
758     fn queue_max_sizes(&self) -> &[u16] {
759         &self.common.queue_sizes
760     }
761 
762     fn features(&self) -> u64 {
763         self.common.avail_features
764     }
765 
766     fn ack_features(&mut self, value: u64) {
767         self.common.ack_features(value)
768     }
769 
770     fn read_config(&self, offset: u64, data: &mut [u8]) {
771         self.read_config_from_slice(self.config.as_slice(), offset, data);
772     }
773 
774     fn write_config(&mut self, offset: u64, data: &[u8]) {
775         // The "writeback" field is the only mutable field
776         let writeback_offset =
777             (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
778         if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
779         {
780             error!(
781                 "Attempt to write to read-only field: offset {:x} length {}",
782                 offset,
783                 data.len()
784             );
785             return;
786         }
787 
788         self.config.writeback = data[0];
789         self.update_writeback();
790     }
791 
792     fn activate(
793         &mut self,
794         mem: GuestMemoryAtomic<GuestMemoryMmap>,
795         interrupt_cb: Arc<dyn VirtioInterrupt>,
796         mut queues: Vec<(usize, Queue, EventFd)>,
797     ) -> ActivateResult {
798         self.common.activate(&queues, &interrupt_cb)?;
799 
800         self.update_writeback();
801 
802         let mut epoll_threads = Vec::new();
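        // With VIRTIO_RING_F_EVENT_IDX negotiated, used-ring notifications are
        // suppressed unless the driver requests them, hence the
        // needs_notification() check before signalling the guest.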
803         let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());
804 
805         for i in 0..queues.len() {
806             let (_, mut queue, queue_evt) = queues.remove(0);
807             queue.set_event_idx(event_idx);
808 
809             let queue_size = queue.size();
810             let (kill_evt, pause_evt) = self.common.dup_eventfds();
811             let queue_idx = i as u16;
812 
813             let mut handler = BlockEpollHandler {
814                 queue_index: queue_idx,
815                 queue,
816                 mem: mem.clone(),
817                 disk_image: self
818                     .disk_image
819                     .new_async_io(queue_size as u32)
820                     .map_err(|e| {
821                         error!("failed to create new AsyncIo: {}", e);
822                         ActivateError::BadActivate
823                     })?,
824                 disk_nsectors: self.disk_nsectors,
825                 interrupt_cb: interrupt_cb.clone(),
826                 serial: self.serial.clone(),
827                 kill_evt,
828                 pause_evt,
829                 writeback: self.writeback.clone(),
830                 counters: self.counters.clone(),
831                 queue_evt,
832                 // Analysis during boot shows a maximum of around 40 in-flight requests.
833                 // A capacity of 64 gives headroom for systems with slower I/O without
834                 // incurring reallocation costs or excessive memory overhead.
835                 inflight_requests: VecDeque::with_capacity(64),
836                 rate_limiter: self
837                     .rate_limiter
838                     .as_ref()
839                     .map(|r| r.new_handle())
840                     .transpose()
841                     .unwrap(),
842                 access_platform: self.common.access_platform.clone(),
843                 read_only: self.read_only,
844                 host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
845             };
846 
847             let paused = self.common.paused.clone();
848             let paused_sync = self.common.paused_sync.clone();
849 
850             spawn_virtio_thread(
851                 &format!("{}_q{}", self.id.clone(), i),
852                 &self.seccomp_action,
853                 Thread::VirtioBlock,
854                 &mut epoll_threads,
855                 &self.exit_evt,
856                 move || handler.run(paused, paused_sync.unwrap()),
857             )?;
858         }
859 
860         self.common.epoll_threads = Some(epoll_threads);
861         event!("virtio-device", "activated", "id", &self.id);
862 
863         Ok(())
864     }
865 
866     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
867         let result = self.common.reset();
868         event!("virtio-device", "reset", "id", &self.id);
869         result
870     }
871 
872     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
873         let mut counters = HashMap::new();
874 
875         counters.insert(
876             "read_bytes",
877             Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
878         );
879         counters.insert(
880             "write_bytes",
881             Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
882         );
883         counters.insert(
884             "read_ops",
885             Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
886         );
887         counters.insert(
888             "write_ops",
889             Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
890         );
891         counters.insert(
892             "write_latency_min",
893             Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
894         );
895         counters.insert(
896             "write_latency_max",
897             Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
898         );
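        // Average latencies are stored scaled by LATENCY_SCALE; divide to report
        // them in microseconds.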
899         counters.insert(
900             "write_latency_avg",
901             Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
902         );
903         counters.insert(
904             "read_latency_min",
905             Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
906         );
907         counters.insert(
908             "read_latency_max",
909             Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
910         );
911         counters.insert(
912             "read_latency_avg",
913             Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
914         );
915 
916         Some(counters)
917     }
918 
919     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
920         self.common.set_access_platform(access_platform)
921     }
922 }
923 
924 impl Pausable for Block {
925     fn pause(&mut self) -> result::Result<(), MigratableError> {
926         self.common.pause()
927     }
928 
929     fn resume(&mut self) -> result::Result<(), MigratableError> {
930         self.common.resume()
931     }
932 }
933 
934 impl Snapshottable for Block {
935     fn id(&self) -> String {
936         self.id.clone()
937     }
938 
939     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
940         Snapshot::new_from_state(&self.state())
941     }
942 }
943 impl Transportable for Block {}
944 impl Migratable for Block {}
945