xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision 190a11f2124b0b60a2d44e85b7c9988373acfb6d)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE-BSD-3-Clause file.
6 //
7 // Copyright © 2020 Intel Corporation
8 //
9 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
10 
11 use std::collections::{BTreeMap, HashMap, VecDeque};
12 use std::num::Wrapping;
13 use std::ops::Deref;
14 use std::os::unix::io::AsRawFd;
15 use std::path::PathBuf;
16 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17 use std::sync::{Arc, Barrier};
18 use std::{io, result};
19 
20 use anyhow::anyhow;
21 use block::async_io::{AsyncIo, AsyncIoError, DiskFile};
22 use block::fcntl::{get_lock_state, LockError, LockType};
23 use block::{build_serial, fcntl, Request, RequestType, VirtioBlockConfig};
24 use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
25 use rate_limiter::TokenType;
26 use seccompiler::SeccompAction;
27 use serde::{Deserialize, Serialize};
28 use thiserror::Error;
29 use virtio_bindings::virtio_blk::*;
30 use virtio_bindings::virtio_config::*;
31 use virtio_bindings::virtio_ring::{VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC};
32 use virtio_queue::{Queue, QueueOwnedT, QueueT};
33 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
34 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
35 use vm_virtio::AccessPlatform;
36 use vmm_sys_util::eventfd::EventFd;
37 
38 use super::{
39     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
40     Error as DeviceError, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
41     EPOLL_HELPER_EVENT_LAST,
42 };
43 use crate::seccomp_filters::Thread;
44 use crate::thread_helper::spawn_virtio_thread;
45 use crate::{GuestMemoryMmap, VirtioInterrupt};
46 
47 const SECTOR_SHIFT: u8 = 9;
48 pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
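// With SECTOR_SHIFT = 9 this is 512 bytes, the fixed sector unit in which
// virtio-blk requests express offsets and in which capacity is reported.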
49 
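// Note: the device-specific event ids below start after EPOLL_HELPER_EVENT_LAST
// so that, presumably, they cannot collide with the kill/pause events handled by
// EpollHelper itself.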
50 // New descriptors are pending on the virtio queue.
51 const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
52 // New completed tasks are pending on the completion ring.
53 const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
54 // New 'wake up' event from the rate limiter
55 const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
56 
57 // Latency scale, used to reduce precision loss when calculating averages.
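// For example, an average latency of 12.3456 us is stored as 123456
// (12.3456 * LATENCY_SCALE); counters() divides by LATENCY_SCALE again before
// reporting, so the exposed averages are in whole microseconds.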
58 const LATENCY_SCALE: u64 = 10000;
59 
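// A block request chain always carries a request header and a status byte in
// addition to any data descriptors, which is presumably why a queue must hold
// at least two descriptors and why seg_max is later derived as
// queue_size - MINIMUM_BLOCK_QUEUE_SIZE.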
60 pub const MINIMUM_BLOCK_QUEUE_SIZE: u16 = 2;
61 
62 #[derive(Error, Debug)]
63 pub enum Error {
64     #[error("Failed to parse the request")]
65     RequestParsing(#[source] block::Error),
66     #[error("Failed to execute the request")]
67     RequestExecuting(#[source] block::ExecuteError),
68     #[error("Failed to complete the request")]
69     RequestCompleting(#[source] block::Error),
70     #[error("Missing the expected entry in the list of requests")]
71     MissingEntryRequestList,
72     #[error("The asynchronous request returned with failure")]
73     AsyncRequestFailure,
74     #[error("Failed synchronizing the file")]
75     Fsync(#[source] AsyncIoError),
76     #[error("Failed adding used index")]
77     QueueAddUsed(#[source] virtio_queue::Error),
78     #[error("Failed creating an iterator over the queue")]
79     QueueIterator(#[source] virtio_queue::Error),
80     #[error("Failed to update request status")]
81     RequestStatus(#[source] GuestMemoryError),
82     #[error("Failed to enable notification")]
83     QueueEnableNotification(#[source] virtio_queue::Error),
84     #[error("Failed to get {lock_type:?} lock for disk image: {path:?}")]
85     LockDiskImage {
86         /// The underlying error.
87         #[source]
88         error: LockError,
89         /// The requested lock type.
90         lock_type: LockType,
91         /// The path of the disk image.
92         path: PathBuf,
93     },
94 }
95 
96 pub type Result<T> = result::Result<T, Error>;
97 
98 // Latency is recorded in microseconds; the average latency
99 // is stored as a scaled value (see LATENCY_SCALE).
100 #[derive(Clone)]
101 pub struct BlockCounters {
102     read_bytes: Arc<AtomicU64>,
103     read_ops: Arc<AtomicU64>,
104     read_latency_min: Arc<AtomicU64>,
105     read_latency_max: Arc<AtomicU64>,
106     read_latency_avg: Arc<AtomicU64>,
107     write_bytes: Arc<AtomicU64>,
108     write_ops: Arc<AtomicU64>,
109     write_latency_min: Arc<AtomicU64>,
110     write_latency_max: Arc<AtomicU64>,
111     write_latency_avg: Arc<AtomicU64>,
112 }
113 
114 impl Default for BlockCounters {
115     fn default() -> Self {
116         BlockCounters {
117             read_bytes: Arc::new(AtomicU64::new(0)),
118             read_ops: Arc::new(AtomicU64::new(0)),
119             read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
120             read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
121             read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
122             write_bytes: Arc::new(AtomicU64::new(0)),
123             write_ops: Arc::new(AtomicU64::new(0)),
124             write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
125             write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
126             write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
127         }
128     }
129 }
130 
131 struct BlockEpollHandler {
132     queue_index: u16,
133     queue: Queue,
134     mem: GuestMemoryAtomic<GuestMemoryMmap>,
135     disk_image: Box<dyn AsyncIo>,
136     disk_nsectors: u64,
137     interrupt_cb: Arc<dyn VirtioInterrupt>,
138     serial: Vec<u8>,
139     kill_evt: EventFd,
140     pause_evt: EventFd,
141     writeback: Arc<AtomicBool>,
142     counters: BlockCounters,
143     queue_evt: EventFd,
144     inflight_requests: VecDeque<(u16, Request)>,
145     rate_limiter: Option<RateLimiterGroupHandle>,
146     access_platform: Option<Arc<dyn AccessPlatform>>,
147     read_only: bool,
148     host_cpus: Option<Vec<usize>>,
149 }
150 
151 impl BlockEpollHandler {
152     fn process_queue_submit(&mut self) -> Result<()> {
153         let queue = &mut self.queue;
154 
155         while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
156             let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
157                 .map_err(Error::RequestParsing)?;
158 
159             // For virtio spec compliance
160             // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
161             // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
162             if self.read_only
163                 && (request.request_type == RequestType::Out
164                     || request.request_type == RequestType::Flush)
165             {
166                 desc_chain
167                     .memory()
168                     .write_obj(VIRTIO_BLK_S_IOERR, request.status_addr)
169                     .map_err(Error::RequestStatus)?;
170 
171                 // If no asynchronous operation has been submitted, we can
172                 // simply return the used descriptor.
173                 queue
174                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
175                     .map_err(Error::QueueAddUsed)?;
176                 queue
177                     .enable_notification(self.mem.memory().deref())
178                     .map_err(Error::QueueEnableNotification)?;
179                 continue;
180             }
181 
182             if let Some(rate_limiter) = &mut self.rate_limiter {
183                 // If limiter.consume() fails it means there is no more TokenType::Ops
184                 // budget and rate limiting is in effect.
185                 if !rate_limiter.consume(1, TokenType::Ops) {
186                     // Stop processing the queue and return this descriptor chain to the
187                     // avail ring, for later processing.
188                     queue.go_to_previous_position();
189                     break;
190                 }
191                 // Exercise the rate limiter only if this request is of data transfer type.
192                 if request.request_type == RequestType::In
193                     || request.request_type == RequestType::Out
194                 {
195                     let mut bytes = Wrapping(0);
196                     for (_, data_len) in &request.data_descriptors {
197                         bytes += Wrapping(*data_len as u64);
198                     }
199 
200                     // If limiter.consume() fails it means there is no more TokenType::Bytes
201                     // budget and rate limiting is in effect.
202                     if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
203                         // Revert the OPS consume().
204                         rate_limiter.manual_replenish(1, TokenType::Ops);
205                         // Stop processing the queue and return this descriptor chain to the
206                         // avail ring, for later processing.
207                         queue.go_to_previous_position();
208                         break;
209                     }
210                 };
211             }
212 
213             request.set_writeback(self.writeback.load(Ordering::Acquire));
214 
215             let result = request.execute_async(
216                 desc_chain.memory(),
217                 self.disk_nsectors,
218                 self.disk_image.as_mut(),
219                 &self.serial,
220                 desc_chain.head_index() as u64,
221             );
222 
223             if let Ok(true) = result {
224                 self.inflight_requests
225                     .push_back((desc_chain.head_index(), request));
226             } else {
227                 let status = match result {
228                     Ok(_) => VIRTIO_BLK_S_OK,
229                     Err(e) => {
230                         warn!("Request failed: {:x?} {:?}", request, e);
231                         VIRTIO_BLK_S_IOERR
232                     }
233                 };
234 
235                 desc_chain
236                     .memory()
237                     .write_obj(status as u8, request.status_addr)
238                     .map_err(Error::RequestStatus)?;
239 
240                 // If no asynchronous operation has been submitted, we can
241                 // simply return the used descriptor.
242                 queue
243                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
244                     .map_err(Error::QueueAddUsed)?;
245                 queue
246                     .enable_notification(self.mem.memory().deref())
247                     .map_err(Error::QueueEnableNotification)?;
248             }
249         }
250 
251         Ok(())
252     }
253 
254     fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
255         if self
256             .queue
257             .needs_notification(self.mem.memory().deref())
258             .map_err(|e| {
259                 EpollHelperError::HandleEvent(anyhow!(
260                     "Failed to check needs_notification: {:?}",
261                     e
262                 ))
263             })?
264         {
265             self.signal_used_queue().map_err(|e| {
266                 EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
267             })?;
268         }
269 
270         Ok(())
271     }
272 
273     fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
274         self.process_queue_submit().map_err(|e| {
275             EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
276         })?;
277 
278         self.try_signal_used_queue()
279     }
280 
281     #[inline]
282     fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
283         // This loop neatly handles the fast path where the completions are
284         // in order (it turns into just a pop_front()) and the ~1% of the time
285         // (from analysis during boot) where slightly out-of-order completion has
286         // been observed e.g.
287         // Submissions: 1 2 3 4 5 6 7
288         // Completions: 2 1 3 5 4 7 6
289         // In this case find the corresponding item and swap it with the front.
290         // The swap is an O(1) operation and also prepares for the future, as it
291         // is likely that the next completion will be for the request that was
292         // skipped, which is now the new front.
293         for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
294             if head == &completed_head {
295                 return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
296             }
297         }
298 
299         Err(Error::MissingEntryRequestList)
300     }
301 
302     fn process_queue_complete(&mut self) -> Result<()> {
303         let mem = self.mem.memory();
304         let mut read_bytes = Wrapping(0);
305         let mut write_bytes = Wrapping(0);
306         let mut read_ops = Wrapping(0);
307         let mut write_ops = Wrapping(0);
308 
309         while let Some((user_data, result)) = self.disk_image.next_completed_request() {
310             let desc_index = user_data as u16;
311 
312             let mut request = self.find_inflight_request(desc_index)?;
313 
314             request.complete_async().map_err(Error::RequestCompleting)?;
315 
316             let latency = request.start.elapsed().as_micros() as u64;
317             let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
318             let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
319             let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
320             let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
321             let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
322             let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
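            // A non-negative result is the number of bytes transferred, as
            // reported by the asynchronous backend; a negative result carries
            // -errno (see the error path below).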
323             let (status, len) = if result >= 0 {
324                 match request.request_type {
325                     RequestType::In => {
326                         for (_, data_len) in &request.data_descriptors {
327                             read_bytes += Wrapping(*data_len as u64);
328                         }
329                         read_ops += Wrapping(1);
330                         if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
331                             self.counters
332                                 .read_latency_min
333                                 .store(latency, Ordering::Relaxed);
334                         }
335                         if latency > read_max || read_max == u64::MAX {
336                             self.counters
337                                 .read_latency_max
338                                 .store(latency, Ordering::Relaxed);
339                         }
340 
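                        // The update below is the standard cumulative moving
                        // average recurrence, avg_n = avg_{n-1} +
                        // (sample_n - avg_{n-1}) / n, with samples pre-scaled by
                        // LATENCY_SCALE to keep fractional precision.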
341                         // Special case the first real latency report
342                         read_avg = if read_avg == u64::MAX {
343                             latency * LATENCY_SCALE
344                         } else {
345                             // Cumulative average is guaranteed to be
346                             // positive if being calculated properly
347                             (read_avg as i64
348                                 + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
349                                     / (read_ops_last + read_ops.0) as i64)
350                                 .try_into()
351                                 .unwrap()
352                         };
353                     }
354                     RequestType::Out => {
355                         if !request.writeback {
356                             self.disk_image.fsync(None).map_err(Error::Fsync)?;
357                         }
358                         for (_, data_len) in &request.data_descriptors {
359                             write_bytes += Wrapping(*data_len as u64);
360                         }
361                         write_ops += Wrapping(1);
362                         if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
363                             self.counters
364                                 .write_latency_min
365                                 .store(latency, Ordering::Relaxed);
366                         }
367                         if latency > write_max || write_max == u64::MAX {
368                             self.counters
369                                 .write_latency_max
370                                 .store(latency, Ordering::Relaxed);
371                         }
372 
373                         // Special case the first real latency report
374                         write_avg = if write_avg == u64::MAX {
375                             latency * LATENCY_SCALE
376                         } else {
377                             // Cumulative average is guaranteed to be
378                             // positive if being calculated properly
379                             (write_avg as i64
380                                 + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
381                                     / (write_ops_last + write_ops.0) as i64)
382                                 .try_into()
383                                 .unwrap()
384                         }
385                     }
386                     _ => {}
387                 }
388 
389                 self.counters
390                     .read_latency_avg
391                     .store(read_avg, Ordering::Relaxed);
392 
393                 self.counters
394                     .write_latency_avg
395                     .store(write_avg, Ordering::Relaxed);
396 
397                 (VIRTIO_BLK_S_OK as u8, result as u32)
398             } else {
399                 warn!(
400                     "Request failed: {:x?} {:?}",
401                     request,
402                     io::Error::from_raw_os_error(-result)
403                 );
404                 (VIRTIO_BLK_S_IOERR as u8, 0)
405             };
406 
407             mem.write_obj(status, request.status_addr)
408                 .map_err(Error::RequestStatus)?;
409 
410             let queue = &mut self.queue;
411 
412             queue
413                 .add_used(mem.deref(), desc_index, len)
414                 .map_err(Error::QueueAddUsed)?;
415             queue
416                 .enable_notification(mem.deref())
417                 .map_err(Error::QueueEnableNotification)?;
418         }
419 
420         self.counters
421             .write_bytes
422             .fetch_add(write_bytes.0, Ordering::AcqRel);
423         self.counters
424             .write_ops
425             .fetch_add(write_ops.0, Ordering::AcqRel);
426 
427         self.counters
428             .read_bytes
429             .fetch_add(read_bytes.0, Ordering::AcqRel);
430         self.counters
431             .read_ops
432             .fetch_add(read_ops.0, Ordering::AcqRel);
433 
434         Ok(())
435     }
436 
437     fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
438         self.interrupt_cb
439             .trigger(VirtioInterruptType::Queue(self.queue_index))
440             .map_err(|e| {
441                 error!("Failed to signal used queue: {:?}", e);
442                 DeviceError::FailedSignalingUsedQueue(e)
443             })
444     }
445 
446     fn set_queue_thread_affinity(&self) {
447         // Prepare the CPU set the current queue thread is expected to run on.
448         let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
449             // SAFETY: all zeros is a valid pattern
450             let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
451             // SAFETY: FFI call, trivially safe
452             unsafe { libc::CPU_ZERO(&mut cpuset) };
453             for host_cpu in host_cpus {
454                 // SAFETY: FFI call, trivially safe
455                 unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
456             }
457             cpuset
458         });
459 
460         // Schedule the thread to run on the expected CPU set
461         if let Some(cpuset) = cpuset.as_ref() {
462             // SAFETY: FFI call with correct arguments
463             let ret = unsafe {
464                 libc::sched_setaffinity(
465                     0,
466                     std::mem::size_of::<libc::cpu_set_t>(),
467                     cpuset as *const libc::cpu_set_t,
468                 )
469             };
470 
471             if ret != 0 {
472                 error!(
473                     "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
474                     self.queue_index,
475                     io::Error::last_os_error()
476                 )
477             }
478         }
479     }
480 
481     fn run(
482         &mut self,
483         paused: Arc<AtomicBool>,
484         paused_sync: Arc<Barrier>,
485     ) -> result::Result<(), EpollHelperError> {
486         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
487         helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
488         helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
489         if let Some(rate_limiter) = &self.rate_limiter {
490             helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
491         }
492         self.set_queue_thread_affinity();
493         helper.run(paused, paused_sync, self)?;
494 
495         Ok(())
496     }
497 }
498 
499 impl EpollHelperHandler for BlockEpollHandler {
500     fn handle_event(
501         &mut self,
502         _helper: &mut EpollHelper,
503         event: &epoll::Event,
504     ) -> result::Result<(), EpollHelperError> {
505         let ev_type = event.data as u16;
506         match ev_type {
507             QUEUE_AVAIL_EVENT => {
508                 self.queue_evt.read().map_err(|e| {
509                     EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
510                 })?;
511 
512                 let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());
513 
514                 // Process the queue only when the rate limit is not reached
515                 if !rate_limit_reached {
516                     self.process_queue_submit_and_signal()?
517                 }
518             }
519             COMPLETION_EVENT => {
520                 self.disk_image.notifier().read().map_err(|e| {
521                     EpollHelperError::HandleEvent(anyhow!("Failed to get completion event: {:?}", e))
522                 })?;
523 
524                 self.process_queue_complete().map_err(|e| {
525                     EpollHelperError::HandleEvent(anyhow!(
526                         "Failed to process queue (complete): {:?}",
527                         e
528                     ))
529                 })?;
530 
531                 let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());
532 
533                 // Process the queue only when the rate limit is not reached
534                 if !rate_limit_reached {
535                     self.process_queue_submit().map_err(|e| {
536                         EpollHelperError::HandleEvent(anyhow!(
537                             "Failed to process queue (submit): {:?}",
538                             e
539                         ))
540                     })?;
541                 }
542                 self.try_signal_used_queue()?;
543             }
544             RATE_LIMITER_EVENT => {
545                 if let Some(rate_limiter) = &mut self.rate_limiter {
546                     // Upon rate limiter event, call the rate limiter handler
547                     // and restart processing the queue.
548                     rate_limiter.event_handler().map_err(|e| {
549                         EpollHelperError::HandleEvent(anyhow!(
550                             "Failed to process rate limiter event: {:?}",
551                             e
552                         ))
553                     })?;
554 
555                     self.process_queue_submit_and_signal()?
556                 } else {
557                     return Err(EpollHelperError::HandleEvent(anyhow!(
558                         "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
559                     )));
560                 }
561             }
562             _ => {
563                 return Err(EpollHelperError::HandleEvent(anyhow!(
564                     "Unexpected event: {}",
565                     ev_type
566                 )));
567             }
568         }
569         Ok(())
570     }
571 }
572 
573 /// Virtio device for exposing block level read/write operations on a host file.
574 pub struct Block {
575     common: VirtioCommon,
576     id: String,
577     disk_image: Box<dyn DiskFile>,
578     disk_path: PathBuf,
579     disk_nsectors: u64,
580     config: VirtioBlockConfig,
581     writeback: Arc<AtomicBool>,
582     counters: BlockCounters,
583     seccomp_action: SeccompAction,
584     rate_limiter: Option<Arc<RateLimiterGroup>>,
585     exit_evt: EventFd,
586     read_only: bool,
587     serial: Vec<u8>,
588     queue_affinity: BTreeMap<u16, Vec<usize>>,
589 }
590 
591 #[derive(Serialize, Deserialize)]
592 pub struct BlockState {
593     pub disk_path: String,
594     pub disk_nsectors: u64,
595     pub avail_features: u64,
596     pub acked_features: u64,
597     pub config: VirtioBlockConfig,
598 }
599 
600 impl Block {
601     /// Create a new virtio block device that operates on the given file.
602     #[allow(clippy::too_many_arguments)]
603     pub fn new(
604         id: String,
605         mut disk_image: Box<dyn DiskFile>,
606         disk_path: PathBuf,
607         read_only: bool,
608         iommu: bool,
609         num_queues: usize,
610         queue_size: u16,
611         serial: Option<String>,
612         seccomp_action: SeccompAction,
613         rate_limiter: Option<Arc<RateLimiterGroup>>,
614         exit_evt: EventFd,
615         state: Option<BlockState>,
616         queue_affinity: BTreeMap<u16, Vec<usize>>,
617     ) -> io::Result<Self> {
618         let (disk_nsectors, avail_features, acked_features, config, paused) =
619             if let Some(state) = state {
620                 info!("Restoring virtio-block {}", id);
621                 (
622                     state.disk_nsectors,
623                     state.avail_features,
624                     state.acked_features,
625                     state.config,
626                     true,
627                 )
628             } else {
629                 let disk_size = disk_image
630                     .size()
631                     .map_err(|e| io::Error::other(format!("Failed getting disk size: {e}")))?;
632                 if disk_size % SECTOR_SIZE != 0 {
633                     warn!(
634                         "Disk size {} is not a multiple of sector size {}; \
635                  the remainder will not be visible to the guest.",
636                         disk_size, SECTOR_SIZE
637                     );
638                 }
639 
640                 let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
641                     | (1u64 << VIRTIO_BLK_F_FLUSH)
642                     | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
643                     | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
644                     | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
645                     | (1u64 << VIRTIO_BLK_F_SEG_MAX)
646                     | (1u64 << VIRTIO_RING_F_EVENT_IDX)
647                     | (1u64 << VIRTIO_RING_F_INDIRECT_DESC);
648                 if iommu {
649                     avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
650                 }
651 
652                 if read_only {
653                     avail_features |= 1u64 << VIRTIO_BLK_F_RO;
654                 }
655 
656                 let topology = disk_image.topology();
657                 info!("Disk topology: {:?}", topology);
658 
659                 let logical_block_size = if topology.logical_block_size > 512 {
660                     topology.logical_block_size
661                 } else {
662                     512
663                 };
664 
665                 // Calculate the exponent that maps physical block to logical block
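                // e.g. a 512-byte logical block on a 4096-byte physical block
                // gives physical_block_exp = 3, since 512 << 3 == 4096.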
666                 let mut physical_block_exp = 0;
667                 let mut size = logical_block_size;
668                 while size < topology.physical_block_size {
669                     physical_block_exp += 1;
670                     size <<= 1;
671                 }
672 
673                 let disk_nsectors = disk_size / SECTOR_SIZE;
674                 let mut config = VirtioBlockConfig {
675                     capacity: disk_nsectors,
676                     writeback: 1,
677                     blk_size: topology.logical_block_size as u32,
678                     physical_block_exp,
679                     min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
680                     opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
681                     seg_max: (queue_size - MINIMUM_BLOCK_QUEUE_SIZE) as u32,
682                     ..Default::default()
683                 };
684 
685                 if num_queues > 1 {
686                     avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
687                     config.num_queues = num_queues as u16;
688                 }
689 
690                 (disk_nsectors, avail_features, 0, config, false)
691             };
692 
693         let serial = serial
694             .map(Vec::from)
695             .unwrap_or_else(|| build_serial(&disk_path));
696 
697         Ok(Block {
698             common: VirtioCommon {
699                 device_type: VirtioDeviceType::Block as u32,
700                 avail_features,
701                 acked_features,
702                 paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
703                 queue_sizes: vec![queue_size; num_queues],
704                 min_queues: 1,
705                 paused: Arc::new(AtomicBool::new(paused)),
706                 ..Default::default()
707             },
708             id,
709             disk_image,
710             disk_path,
711             disk_nsectors,
712             config,
713             writeback: Arc::new(AtomicBool::new(true)),
714             counters: BlockCounters::default(),
715             seccomp_action,
716             rate_limiter,
717             exit_evt,
718             read_only,
719             serial,
720             queue_affinity,
721         })
722     }
723 
724     /// Tries to set an advisory lock for the corresponding disk image.
725     pub fn try_lock_image(&mut self) -> Result<()> {
726         let lock_type = match self.read_only {
727             true => LockType::Read,
728             false => LockType::Write,
729         };
730         log::debug!(
731             "Attempting to acquire {lock_type:?} lock for disk image id={},path={}",
732             self.id,
733             self.disk_path.display()
734         );
735         let fd = self.disk_image.fd();
736         fcntl::try_acquire_lock(fd, lock_type).map_err(|error| {
737             let current_lock = get_lock_state(fd);
738             // Don't propagate the error to the outside, as it is not useful on its own.
739             // Instead, log additional guidance to help the user.
740             if let Ok(current_lock) = current_lock {
741                 log::error!("Can't get {lock_type:?} lock for {} as there is already a {current_lock:?} lock", self.disk_path.display());
742             } else {
743                 log::error!("Can't get {lock_type:?} lock for {}, but also can't determine the current lock state", self.disk_path.display());
744             }
745             Error::LockDiskImage {
746                 path: self.disk_path.clone(),
747                 error,
748                 lock_type,
749             }
750         })?;
751         log::info!(
752             "Acquired {lock_type:?} lock for disk image id={},path={}",
753             self.id,
754             self.disk_path.display()
755         );
756         Ok(())
757     }
758 
759     /// Releases the advisory lock held for the corresponding disk image.
760     pub fn unlock_image(&mut self) -> Result<()> {
761         // It is very unlikely that this fails.
762         // Should we remove the Result to simplify the error propagation at
763         // higher levels?
764         fcntl::clear_lock(self.disk_image.fd()).map_err(|error| Error::LockDiskImage {
765             path: self.disk_path.clone(),
766             error,
767             lock_type: LockType::Unlock,
768         })
769     }
770 
771     fn state(&self) -> BlockState {
772         BlockState {
773             disk_path: self.disk_path.to_str().unwrap().to_owned(),
774             disk_nsectors: self.disk_nsectors,
775             avail_features: self.common.avail_features,
776             acked_features: self.common.acked_features,
777             config: self.config,
778         }
779     }
780 
781     fn update_writeback(&mut self) {
782         // Use the writeback value from the config space if VIRTIO_BLK_F_CONFIG_WCE was negotiated
783         let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
784             self.config.writeback == 1
785         } else {
786             // Otherwise, fall back to whether VIRTIO_BLK_F_FLUSH was negotiated
787             self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
788         };
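        // In writethrough mode, every completed write is followed by an explicit
        // fsync (see process_queue_complete), trading throughput for durability.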
789 
790         info!(
791             "Changing cache mode to {}",
792             if writeback {
793                 "writeback"
794             } else {
795                 "writethrough"
796             }
797         );
798         self.writeback.store(writeback, Ordering::Release);
799     }
800 
801     #[cfg(fuzzing)]
802     pub fn wait_for_epoll_threads(&mut self) {
803         self.common.wait_for_epoll_threads();
804     }
805 }
806 
807 impl Drop for Block {
808     fn drop(&mut self) {
809         if let Some(kill_evt) = self.common.kill_evt.take() {
810             // Ignore the result because there is nothing we can do about it.
811             let _ = kill_evt.write(1);
812         }
813         self.common.wait_for_epoll_threads();
814     }
815 }
816 
817 impl VirtioDevice for Block {
818     fn device_type(&self) -> u32 {
819         self.common.device_type
820     }
821 
822     fn queue_max_sizes(&self) -> &[u16] {
823         &self.common.queue_sizes
824     }
825 
826     fn features(&self) -> u64 {
827         self.common.avail_features
828     }
829 
830     fn ack_features(&mut self, value: u64) {
831         self.common.ack_features(value)
832     }
833 
834     fn read_config(&self, offset: u64, data: &mut [u8]) {
835         self.read_config_from_slice(self.config.as_slice(), offset, data);
836     }
837 
838     fn write_config(&mut self, offset: u64, data: &[u8]) {
839         // The "writeback" field is the only mutable field
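        // The pointer arithmetic below computes the byte offset of `writeback`
        // within VirtioBlockConfig, so only a one-byte write at exactly that
        // offset is accepted.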
840         let writeback_offset =
841             (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
842         if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
843         {
844             error!(
845                 "Attempt to write to read-only field: offset {:x} length {}",
846                 offset,
847                 data.len()
848             );
849             return;
850         }
851 
852         self.config.writeback = data[0];
853         self.update_writeback();
854     }
855 
856     fn activate(
857         &mut self,
858         mem: GuestMemoryAtomic<GuestMemoryMmap>,
859         interrupt_cb: Arc<dyn VirtioInterrupt>,
860         mut queues: Vec<(usize, Queue, EventFd)>,
861     ) -> ActivateResult {
862         self.common.activate(&queues, &interrupt_cb)?;
863 
864         self.update_writeback();
865 
866         let mut epoll_threads = Vec::new();
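        // With VIRTIO_RING_F_EVENT_IDX negotiated, the queues use the
        // avail_event/used_event mechanism, allowing needs_notification() to
        // suppress redundant guest interrupts.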
867         let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());
868 
869         for i in 0..queues.len() {
870             let (_, mut queue, queue_evt) = queues.remove(0);
871             queue.set_event_idx(event_idx);
872 
873             let queue_size = queue.size();
874             let (kill_evt, pause_evt) = self.common.dup_eventfds();
875             let queue_idx = i as u16;
876 
877             let mut handler = BlockEpollHandler {
878                 queue_index: queue_idx,
879                 queue,
880                 mem: mem.clone(),
881                 disk_image: self
882                     .disk_image
883                     .new_async_io(queue_size as u32)
884                     .map_err(|e| {
885                         error!("failed to create new AsyncIo: {}", e);
886                         ActivateError::BadActivate
887                     })?,
888                 disk_nsectors: self.disk_nsectors,
889                 interrupt_cb: interrupt_cb.clone(),
890                 serial: self.serial.clone(),
891                 kill_evt,
892                 pause_evt,
893                 writeback: self.writeback.clone(),
894                 counters: self.counters.clone(),
895                 queue_evt,
896                 // Analysis during boot shows a maximum of around 40 in-flight requests.
897                 // A capacity of 64 gives headroom for systems with slower I/O without
898                 // incurring reallocation costs or significant memory overhead.
899                 inflight_requests: VecDeque::with_capacity(64),
900                 rate_limiter: self
901                     .rate_limiter
902                     .as_ref()
903                     .map(|r| r.new_handle())
904                     .transpose()
905                     .unwrap(),
906                 access_platform: self.common.access_platform.clone(),
907                 read_only: self.read_only,
908                 host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
909             };
910 
911             let paused = self.common.paused.clone();
912             let paused_sync = self.common.paused_sync.clone();
913 
914             spawn_virtio_thread(
915                 &format!("{}_q{}", self.id.clone(), i),
916                 &self.seccomp_action,
917                 Thread::VirtioBlock,
918                 &mut epoll_threads,
919                 &self.exit_evt,
920                 move || handler.run(paused, paused_sync.unwrap()),
921             )?;
922         }
923 
924         self.common.epoll_threads = Some(epoll_threads);
925         event!("virtio-device", "activated", "id", &self.id);
926 
927         Ok(())
928     }
929 
930     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
931         let result = self.common.reset();
932         event!("virtio-device", "reset", "id", &self.id);
933         result
934     }
935 
936     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
937         let mut counters = HashMap::new();
938 
939         counters.insert(
940             "read_bytes",
941             Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
942         );
943         counters.insert(
944             "write_bytes",
945             Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
946         );
947         counters.insert(
948             "read_ops",
949             Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
950         );
951         counters.insert(
952             "write_ops",
953             Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
954         );
955         counters.insert(
956             "write_latency_min",
957             Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
958         );
959         counters.insert(
960             "write_latency_max",
961             Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
962         );
963         counters.insert(
964             "write_latency_avg",
965             Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
966         );
967         counters.insert(
968             "read_latency_min",
969             Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
970         );
971         counters.insert(
972             "read_latency_max",
973             Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
974         );
975         counters.insert(
976             "read_latency_avg",
977             Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
978         );
979 
980         Some(counters)
981     }
982 
983     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
984         self.common.set_access_platform(access_platform)
985     }
986 }
987 
988 impl Pausable for Block {
989     fn pause(&mut self) -> result::Result<(), MigratableError> {
990         self.common.pause()
991     }
992 
993     fn resume(&mut self) -> result::Result<(), MigratableError> {
994         self.common.resume()
995     }
996 }
997 
998 impl Snapshottable for Block {
999     fn id(&self) -> String {
1000         self.id.clone()
1001     }
1002 
1003     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
1004         Snapshot::new_from_state(&self.state())
1005     }
1006 }
1007 impl Transportable for Block {}
1008 impl Migratable for Block {}
1009