xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision eb0b14f70ed5ed44b76579145fd2a741c0100ae4)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 //
3 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
4 // Use of this source code is governed by a BSD-style license that can be
5 // found in the LICENSE-BSD-3-Clause file.
6 //
7 // Copyright © 2020 Intel Corporation
8 //
9 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
10 
11 use std::collections::{BTreeMap, HashMap, VecDeque};
12 use std::num::Wrapping;
13 use std::ops::Deref;
14 use std::os::unix::io::AsRawFd;
15 use std::path::PathBuf;
16 use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
17 use std::sync::{Arc, Barrier};
18 use std::{io, result};
19 
20 use anyhow::anyhow;
21 use block::async_io::{AsyncIo, AsyncIoError, DiskFile};
22 use block::fcntl::{get_lock_state, LockError, LockType};
23 use block::{build_serial, fcntl, Request, RequestType, VirtioBlockConfig};
24 use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
25 use rate_limiter::TokenType;
26 use seccompiler::SeccompAction;
27 use serde::{Deserialize, Serialize};
28 use thiserror::Error;
29 use virtio_bindings::virtio_blk::*;
30 use virtio_bindings::virtio_config::*;
31 use virtio_bindings::virtio_ring::{VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC};
32 use virtio_queue::{Queue, QueueOwnedT, QueueT};
33 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
34 use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
35 use vm_virtio::AccessPlatform;
36 use vmm_sys_util::eventfd::EventFd;
37 
38 use super::{
39     ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
40     Error as DeviceError, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
41     EPOLL_HELPER_EVENT_LAST,
42 };
43 use crate::seccomp_filters::Thread;
44 use crate::thread_helper::spawn_virtio_thread;
45 use crate::{GuestMemoryMmap, VirtioInterrupt};
46 
47 const SECTOR_SHIFT: u8 = 9;
48 pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
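// With SECTOR_SHIFT = 9 this gives SECTOR_SIZE = 512 bytes, the unit in which
// virtio-blk sectors (capacity and request offsets) are expressed.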
49 
50 // New descriptors are pending on the virtio queue.
51 const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
52 // New completed tasks are pending on the completion ring.
53 const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
54 // New 'wake up' event from the rate limiter
55 const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;
56 
57 // Latency scale, used to reduce precision loss when calculating the average latency.
58 const LATENCY_SCALE: u64 = 10000;
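// For example, a running average of 123.4567 µs is stored as 1_234_567
// (latency in µs * LATENCY_SCALE); counters() divides by LATENCY_SCALE again
// before reporting, so the scaling only keeps the fractional part from being
// rounded away by the intermediate integer arithmetic.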
59 
60 pub const MINIMUM_BLOCK_QUEUE_SIZE: u16 = 2;
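// A request chain needs at least two descriptors (the request header and the
// status byte) on top of its data descriptors, which is presumably why seg_max
// is advertised as queue_size - MINIMUM_BLOCK_QUEUE_SIZE further below.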
61 
62 #[derive(Error, Debug)]
63 pub enum Error {
64     #[error("Failed to parse the request: {0}")]
65     RequestParsing(#[source] block::Error),
66     #[error("Failed to execute the request: {0}")]
67     RequestExecuting(#[source] block::ExecuteError),
68     #[error("Failed to complete the request: {0}")]
69     RequestCompleting(#[source] block::Error),
70     #[error("Missing the expected entry in the list of requests")]
71     MissingEntryRequestList,
72     #[error("The asynchronous request returned with failure")]
73     AsyncRequestFailure,
74     #[error("Failed synchronizing the file: {0}")]
75     Fsync(#[source] AsyncIoError),
76     #[error("Failed adding used index: {0}")]
77     QueueAddUsed(#[source] virtio_queue::Error),
78     #[error("Failed creating an iterator over the queue: {0}")]
79     QueueIterator(#[source] virtio_queue::Error),
80     #[error("Failed to update request status: {0}")]
81     RequestStatus(#[source] GuestMemoryError),
82     #[error("Failed to enable notification: {0}")]
83     QueueEnableNotification(#[source] virtio_queue::Error),
84     #[error("Failed to get {lock_type:?} lock for disk image {path}: {error}")]
85     LockDiskImage {
86         /// The underlying error.
87         #[source]
88         error: LockError,
89         /// The requested lock type.
90         lock_type: LockType,
91         /// The path of the disk image.
92         path: PathBuf,
93     },
94 }
95 
96 pub type Result<T> = result::Result<T, Error>;
97 
98 // Latencies are recorded in microseconds; the average latency is
99 // stored as a value scaled by LATENCY_SCALE.
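// The latency min/max/avg counters start at u64::MAX as a "no sample yet"
// sentinel: the completion path special-cases it for max and avg before
// folding in the first real measurement, while min works naturally since any
// latency compares below u64::MAX.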
100 #[derive(Clone)]
101 pub struct BlockCounters {
102     read_bytes: Arc<AtomicU64>,
103     read_ops: Arc<AtomicU64>,
104     read_latency_min: Arc<AtomicU64>,
105     read_latency_max: Arc<AtomicU64>,
106     read_latency_avg: Arc<AtomicU64>,
107     write_bytes: Arc<AtomicU64>,
108     write_ops: Arc<AtomicU64>,
109     write_latency_min: Arc<AtomicU64>,
110     write_latency_max: Arc<AtomicU64>,
111     write_latency_avg: Arc<AtomicU64>,
112 }
113 
114 impl Default for BlockCounters {
115     fn default() -> Self {
116         BlockCounters {
117             read_bytes: Arc::new(AtomicU64::new(0)),
118             read_ops: Arc::new(AtomicU64::new(0)),
119             read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
120             read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
121             read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
122             write_bytes: Arc::new(AtomicU64::new(0)),
123             write_ops: Arc::new(AtomicU64::new(0)),
124             write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
125             write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
126             write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
127         }
128     }
129 }
130 
131 struct BlockEpollHandler {
132     queue_index: u16,
133     queue: Queue,
134     mem: GuestMemoryAtomic<GuestMemoryMmap>,
135     disk_image: Box<dyn AsyncIo>,
136     disk_nsectors: u64,
137     interrupt_cb: Arc<dyn VirtioInterrupt>,
138     serial: Vec<u8>,
139     kill_evt: EventFd,
140     pause_evt: EventFd,
141     writeback: Arc<AtomicBool>,
142     counters: BlockCounters,
143     queue_evt: EventFd,
144     inflight_requests: VecDeque<(u16, Request)>,
145     rate_limiter: Option<RateLimiterGroupHandle>,
146     access_platform: Option<Arc<dyn AccessPlatform>>,
147     read_only: bool,
148     host_cpus: Option<Vec<usize>>,
149 }
150 
151 impl BlockEpollHandler {
152     fn process_queue_submit(&mut self) -> Result<()> {
153         let queue = &mut self.queue;
154 
155         while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
156             let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
157                 .map_err(Error::RequestParsing)?;
158 
159             // For virtio spec compliance
160             // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
161             // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
162             if self.read_only
163                 && (request.request_type == RequestType::Out
164                     || request.request_type == RequestType::Flush)
165             {
166                 desc_chain
167                     .memory()
168                     .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
169                     .map_err(Error::RequestStatus)?;
170 
171                 // If no asynchronous operation has been submitted, we can
172                 // simply return the used descriptor.
173                 queue
174                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
175                     .map_err(Error::QueueAddUsed)?;
176                 queue
177                     .enable_notification(self.mem.memory().deref())
178                     .map_err(Error::QueueEnableNotification)?;
179                 continue;
180             }
181 
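            // Rate limiting: every request consumes one Ops token, and In/Out
            // requests additionally consume one Bytes token per data byte. If
            // either budget is exhausted, the descriptor chain is pushed back
            // onto the avail ring and processing resumes on RATE_LIMITER_EVENT
            // once the rate limiter replenishes its budget.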
182             if let Some(rate_limiter) = &mut self.rate_limiter {
183                 // If limiter.consume() fails it means there is no more TokenType::Ops
184                 // budget and rate limiting is in effect.
185                 if !rate_limiter.consume(1, TokenType::Ops) {
186                     // Stop processing the queue and return this descriptor chain to the
187                     // avail ring, for later processing.
188                     queue.go_to_previous_position();
189                     break;
190                 }
191                 // Exercise the rate limiter only if this request is of data transfer type.
192                 if request.request_type == RequestType::In
193                     || request.request_type == RequestType::Out
194                 {
195                     let mut bytes = Wrapping(0);
196                     for (_, data_len) in &request.data_descriptors {
197                         bytes += Wrapping(*data_len as u64);
198                     }
199 
200                     // If limiter.consume() fails it means there is no more TokenType::Bytes
201                     // budget and rate limiting is in effect.
202                     if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
203                         // Revert the OPS consume().
204                         rate_limiter.manual_replenish(1, TokenType::Ops);
205                         // Stop processing the queue and return this descriptor chain to the
206                         // avail ring, for later processing.
207                         queue.go_to_previous_position();
208                         break;
209                     }
210                 };
211             }
212 
213             request.set_writeback(self.writeback.load(Ordering::Acquire));
214 
215             if request
216                 .execute_async(
217                     desc_chain.memory(),
218                     self.disk_nsectors,
219                     self.disk_image.as_mut(),
220                     &self.serial,
221                     desc_chain.head_index() as u64,
222                 )
223                 .map_err(Error::RequestExecuting)?
224             {
225                 self.inflight_requests
226                     .push_back((desc_chain.head_index(), request));
227             } else {
228                 desc_chain
229                     .memory()
230                     .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
231                     .map_err(Error::RequestStatus)?;
232 
233                 // If no asynchronous operation has been submitted, we can
234                 // simply return the used descriptor.
235                 queue
236                     .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
237                     .map_err(Error::QueueAddUsed)?;
238                 queue
239                     .enable_notification(self.mem.memory().deref())
240                     .map_err(Error::QueueEnableNotification)?;
241             }
242         }
243 
244         Ok(())
245     }
246 
247     fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
248         if self
249             .queue
250             .needs_notification(self.mem.memory().deref())
251             .map_err(|e| {
252                 EpollHelperError::HandleEvent(anyhow!(
253                     "Failed to check needs_notification: {:?}",
254                     e
255                 ))
256             })?
257         {
258             self.signal_used_queue().map_err(|e| {
259                 EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
260             })?;
261         }
262 
263         Ok(())
264     }
265 
266     fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
267         self.process_queue_submit().map_err(|e| {
268             EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
269         })?;
270 
271         self.try_signal_used_queue()
272     }
273 
274     #[inline]
275     fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
276         // This loop neatly handles the fast path where the completions are
277         // in order (it turns into just a pop_front()) and the ~1% of the time
278         // (observed during boot analysis) where slight out-of-order completion
279         // has been observed, e.g.
280         // Submissions: 1 2 3 4 5 6 7
281         // Completions: 2 1 3 5 4 7 6
282         // In that case, find the corresponding item and swap it with the front.
283         // This is an O(1) operation and also prepares for the future, as it is
284         // likely that the next completion will be for the request that was
285         // skipped, which is now the new front.
286         for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
287             if head == &completed_head {
288                 return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
289             }
290         }
291 
292         Err(Error::MissingEntryRequestList)
293     }
294 
295     fn process_queue_complete(&mut self) -> Result<()> {
296         let mem = self.mem.memory();
297         let mut read_bytes = Wrapping(0);
298         let mut write_bytes = Wrapping(0);
299         let mut read_ops = Wrapping(0);
300         let mut write_ops = Wrapping(0);
301 
302         while let Some((user_data, result)) = self.disk_image.next_completed_request() {
303             let desc_index = user_data as u16;
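            // user_data is the descriptor chain's head index, as passed to
            // execute_async() at submission time.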
304 
305             let mut request = self.find_inflight_request(desc_index)?;
306 
307             request.complete_async().map_err(Error::RequestCompleting)?;
308 
309             let latency = request.start.elapsed().as_micros() as u64;
310             let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
311             let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
312             let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
313             let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
314             let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
315             let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
316             let (status, len) = if result >= 0 {
317                 match request.request_type {
318                     RequestType::In => {
319                         for (_, data_len) in &request.data_descriptors {
320                             read_bytes += Wrapping(*data_len as u64);
321                         }
322                         read_ops += Wrapping(1);
323                         if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
324                             self.counters
325                                 .read_latency_min
326                                 .store(latency, Ordering::Relaxed);
327                         }
328                         if latency > read_max || read_max == u64::MAX {
329                             self.counters
330                                 .read_latency_max
331                                 .store(latency, Ordering::Relaxed);
332                         }
333 
334                         // Special case the first real latency report
335                         read_avg = if read_avg == u64::MAX {
336                             latency * LATENCY_SCALE
337                         } else {
338                             // Cumulative average is guaranteed to be
339                             // positive if being calculated properly
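                            // Incremental (running) mean:
                            //   avg_n = avg_{n-1} + (x_n - avg_{n-1}) / n,
                            // where x_n is this request's latency (scaled) and
                            // n is the total number of completed reads
                            // including this one.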
340                             (read_avg as i64
341                                 + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
342                                     / (read_ops_last + read_ops.0) as i64)
343                                 .try_into()
344                                 .unwrap()
345                         };
346                     }
347                     RequestType::Out => {
348                         if !request.writeback {
349                             self.disk_image.fsync(None).map_err(Error::Fsync)?;
350                         }
351                         for (_, data_len) in &request.data_descriptors {
352                             write_bytes += Wrapping(*data_len as u64);
353                         }
354                         write_ops += Wrapping(1);
355                         if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
356                             self.counters
357                                 .write_latency_min
358                                 .store(latency, Ordering::Relaxed);
359                         }
360                         if latency > write_max || write_max == u64::MAX {
361                             self.counters
362                                 .write_latency_max
363                                 .store(latency, Ordering::Relaxed);
364                         }
365 
366                         // Special case the first real latency report
367                         write_avg = if write_avg == u64::MAX {
368                             latency * LATENCY_SCALE
369                         } else {
370                             // Cumulative average is guaranteed to be
371                             // positive if being calculated properly
372                             (write_avg as i64
373                                 + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
374                                     / (write_ops_last + write_ops.0) as i64)
375                                 .try_into()
376                                 .unwrap()
377                         }
378                     }
379                     _ => {}
380                 }
381 
382                 self.counters
383                     .read_latency_avg
384                     .store(read_avg, Ordering::Relaxed);
385 
386                 self.counters
387                     .write_latency_avg
388                     .store(write_avg, Ordering::Relaxed);
389 
390                 (VIRTIO_BLK_S_OK as u8, result as u32)
391             } else {
392                 error!(
393                     "Request failed: {:x?} {:?}",
394                     request,
395                     io::Error::from_raw_os_error(-result)
396                 );
397                 return Err(Error::AsyncRequestFailure);
398             };
399 
400             mem.write_obj(status, request.status_addr)
401                 .map_err(Error::RequestStatus)?;
402 
403             let queue = &mut self.queue;
404 
405             queue
406                 .add_used(mem.deref(), desc_index, len)
407                 .map_err(Error::QueueAddUsed)?;
408             queue
409                 .enable_notification(mem.deref())
410                 .map_err(Error::QueueEnableNotification)?;
411         }
412 
413         self.counters
414             .write_bytes
415             .fetch_add(write_bytes.0, Ordering::AcqRel);
416         self.counters
417             .write_ops
418             .fetch_add(write_ops.0, Ordering::AcqRel);
419 
420         self.counters
421             .read_bytes
422             .fetch_add(read_bytes.0, Ordering::AcqRel);
423         self.counters
424             .read_ops
425             .fetch_add(read_ops.0, Ordering::AcqRel);
426 
427         Ok(())
428     }
429 
430     fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
431         self.interrupt_cb
432             .trigger(VirtioInterruptType::Queue(self.queue_index))
433             .map_err(|e| {
434                 error!("Failed to signal used queue: {:?}", e);
435                 DeviceError::FailedSignalingUsedQueue(e)
436             })
437     }
438 
439     fn set_queue_thread_affinity(&self) {
440         // Prepare the CPU set the current queue thread is expected to run on.
441         let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
442             // SAFETY: all zeros is a valid pattern
443             let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
444             // SAFETY: FFI call, trivially safe
445             unsafe { libc::CPU_ZERO(&mut cpuset) };
446             for host_cpu in host_cpus {
447                 // SAFETY: FFI call, trivially safe
448                 unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
449             }
450             cpuset
451         });
452 
453         // Schedule the thread to run on the expected CPU set
454         if let Some(cpuset) = cpuset.as_ref() {
455             // SAFETY: FFI call with correct arguments
456             let ret = unsafe {
457                 libc::sched_setaffinity(
458                     0,
459                     std::mem::size_of::<libc::cpu_set_t>(),
460                     cpuset as *const libc::cpu_set_t,
461                 )
462             };
463 
464             if ret != 0 {
465                 error!(
466                     "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
467                     self.queue_index,
468                     io::Error::last_os_error()
469                 )
470             }
471         }
472     }
473 
474     fn run(
475         &mut self,
476         paused: Arc<AtomicBool>,
477         paused_sync: Arc<Barrier>,
478     ) -> result::Result<(), EpollHelperError> {
479         let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
480         helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
481         helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
482         if let Some(rate_limiter) = &self.rate_limiter {
483             helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
484         }
485         self.set_queue_thread_affinity();
486         helper.run(paused, paused_sync, self)?;
487 
488         Ok(())
489     }
490 }
491 
492 impl EpollHelperHandler for BlockEpollHandler {
493     fn handle_event(
494         &mut self,
495         _helper: &mut EpollHelper,
496         event: &epoll::Event,
497     ) -> result::Result<(), EpollHelperError> {
498         let ev_type = event.data as u16;
499         match ev_type {
500             QUEUE_AVAIL_EVENT => {
501                 self.queue_evt.read().map_err(|e| {
502                     EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
503                 })?;
504 
505                 let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());
506 
507                 // Process the queue only when the rate limit is not reached
508                 if !rate_limit_reached {
509                     self.process_queue_submit_and_signal()?
510                 }
511             }
512             COMPLETION_EVENT => {
513                 self.disk_image.notifier().read().map_err(|e| {
514                     EpollHelperError::HandleEvent(anyhow!("Failed to get completion event: {:?}", e))
515                 })?;
516 
517                 self.process_queue_complete().map_err(|e| {
518                     EpollHelperError::HandleEvent(anyhow!(
519                         "Failed to process queue (complete): {:?}",
520                         e
521                     ))
522                 })?;
523 
524                 let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());
525 
526                 // Process the queue only when the rate limit is not reached
527                 if !rate_limit_reached {
528                     self.process_queue_submit().map_err(|e| {
529                         EpollHelperError::HandleEvent(anyhow!(
530                             "Failed to process queue (submit): {:?}",
531                             e
532                         ))
533                     })?;
534                 }
535                 self.try_signal_used_queue()?;
536             }
537             RATE_LIMITER_EVENT => {
538                 if let Some(rate_limiter) = &mut self.rate_limiter {
539                     // Upon rate limiter event, call the rate limiter handler
540                     // and restart processing the queue.
541                     rate_limiter.event_handler().map_err(|e| {
542                         EpollHelperError::HandleEvent(anyhow!(
543                             "Failed to process rate limiter event: {:?}",
544                             e
545                         ))
546                     })?;
547 
548                     self.process_queue_submit_and_signal()?
549                 } else {
550                     return Err(EpollHelperError::HandleEvent(anyhow!(
551                         "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
552                     )));
553                 }
554             }
555             _ => {
556                 return Err(EpollHelperError::HandleEvent(anyhow!(
557                     "Unexpected event: {}",
558                     ev_type
559                 )));
560             }
561         }
562         Ok(())
563     }
564 }
565 
566 /// Virtio device for exposing block level read/write operations on a host file.
567 pub struct Block {
568     common: VirtioCommon,
569     id: String,
570     disk_image: Box<dyn DiskFile>,
571     disk_path: PathBuf,
572     disk_nsectors: u64,
573     config: VirtioBlockConfig,
574     writeback: Arc<AtomicBool>,
575     counters: BlockCounters,
576     seccomp_action: SeccompAction,
577     rate_limiter: Option<Arc<RateLimiterGroup>>,
578     exit_evt: EventFd,
579     read_only: bool,
580     serial: Vec<u8>,
581     queue_affinity: BTreeMap<u16, Vec<usize>>,
582 }
583 
584 #[derive(Serialize, Deserialize)]
585 pub struct BlockState {
586     pub disk_path: String,
587     pub disk_nsectors: u64,
588     pub avail_features: u64,
589     pub acked_features: u64,
590     pub config: VirtioBlockConfig,
591 }
592 
593 impl Block {
594     /// Create a new virtio block device that operates on the given file.
595     #[allow(clippy::too_many_arguments)]
596     pub fn new(
597         id: String,
598         mut disk_image: Box<dyn DiskFile>,
599         disk_path: PathBuf,
600         read_only: bool,
601         iommu: bool,
602         num_queues: usize,
603         queue_size: u16,
604         serial: Option<String>,
605         seccomp_action: SeccompAction,
606         rate_limiter: Option<Arc<RateLimiterGroup>>,
607         exit_evt: EventFd,
608         state: Option<BlockState>,
609         queue_affinity: BTreeMap<u16, Vec<usize>>,
610     ) -> io::Result<Self> {
611         let (disk_nsectors, avail_features, acked_features, config, paused) =
612             if let Some(state) = state {
613                 info!("Restoring virtio-block {}", id);
614                 (
615                     state.disk_nsectors,
616                     state.avail_features,
617                     state.acked_features,
618                     state.config,
619                     true,
620                 )
621             } else {
622                 let disk_size = disk_image
623                     .size()
624                     .map_err(|e| io::Error::other(format!("Failed getting disk size: {e}")))?;
625                 if disk_size % SECTOR_SIZE != 0 {
626                     warn!(
627                         "Disk size {} is not a multiple of sector size {}; \
628                  the remainder will not be visible to the guest.",
629                         disk_size, SECTOR_SIZE
630                     );
631                 }
632 
633                 let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
634                     | (1u64 << VIRTIO_BLK_F_FLUSH)
635                     | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
636                     | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
637                     | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
638                     | (1u64 << VIRTIO_BLK_F_SEG_MAX)
639                     | (1u64 << VIRTIO_RING_F_EVENT_IDX)
640                     | (1u64 << VIRTIO_RING_F_INDIRECT_DESC);
641                 if iommu {
642                     avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
643                 }
644 
645                 if read_only {
646                     avail_features |= 1u64 << VIRTIO_BLK_F_RO;
647                 }
648 
649                 let topology = disk_image.topology();
650                 info!("Disk topology: {:?}", topology);
651 
652                 let logical_block_size = if topology.logical_block_size > 512 {
653                     topology.logical_block_size
654                 } else {
655                     512
656                 };
657 
658                 // Calculate the exponent that maps physical block to logical block
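                // e.g. a 512-byte logical / 4096-byte physical layout yields
                // physical_block_exp = 3, since 4096 = 512 << 3.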
659                 let mut physical_block_exp = 0;
660                 let mut size = logical_block_size;
661                 while size < topology.physical_block_size {
662                     physical_block_exp += 1;
663                     size <<= 1;
664                 }
665 
666                 let disk_nsectors = disk_size / SECTOR_SIZE;
667                 let mut config = VirtioBlockConfig {
668                     capacity: disk_nsectors,
669                     writeback: 1,
670                     blk_size: topology.logical_block_size as u32,
671                     physical_block_exp,
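                    // min_io_size and opt_io_size below are expressed in units
                    // of logical blocks, hence the division by logical_block_size.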
672                     min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
673                     opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
674                     seg_max: (queue_size - MINIMUM_BLOCK_QUEUE_SIZE) as u32,
675                     ..Default::default()
676                 };
677 
678                 if num_queues > 1 {
679                     avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
680                     config.num_queues = num_queues as u16;
681                 }
682 
683                 (disk_nsectors, avail_features, 0, config, false)
684             };
685 
686         let serial = serial
687             .map(Vec::from)
688             .unwrap_or_else(|| build_serial(&disk_path));
689 
690         Ok(Block {
691             common: VirtioCommon {
692                 device_type: VirtioDeviceType::Block as u32,
693                 avail_features,
694                 acked_features,
695                 paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
696                 queue_sizes: vec![queue_size; num_queues],
697                 min_queues: 1,
698                 paused: Arc::new(AtomicBool::new(paused)),
699                 ..Default::default()
700             },
701             id,
702             disk_image,
703             disk_path,
704             disk_nsectors,
705             config,
706             writeback: Arc::new(AtomicBool::new(true)),
707             counters: BlockCounters::default(),
708             seccomp_action,
709             rate_limiter,
710             exit_evt,
711             read_only,
712             serial,
713             queue_affinity,
714         })
715     }
716 
717     /// Tries to set an advisory lock for the corresponding disk image.
718     pub fn try_lock_image(&mut self) -> Result<()> {
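        // A read-only disk only takes a shared (read) advisory lock, so several
        // read-only users can coexist; a writable disk takes an exclusive
        // (write) lock.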
719         let lock_type = match self.read_only {
720             true => LockType::Read,
721             false => LockType::Write,
722         };
723         log::debug!(
724             "Attempting to acquire {lock_type:?} lock for disk image id={},path={}",
725             self.id,
726             self.disk_path.display()
727         );
728         let fd = self.disk_image.fd();
729         fcntl::try_acquire_lock(fd, lock_type).map_err(|error| {
730             let current_lock = get_lock_state(fd);
731             // Don't propagate the error to the outside, as it is not useful at all. Instead,
732             // we try to log additional help to the user.
733             if let Ok(current_lock) = current_lock {
734                 log::error!("Can't get {lock_type:?} lock for {} as there is already a {current_lock:?} lock", self.disk_path.display());
735             } else {
736                 log::error!("Can't get {lock_type:?} lock for {}, but also can't determine the current lock state", self.disk_path.display());
737             }
738             Error::LockDiskImage {
739                 path: self.disk_path.clone(),
740                 error,
741                 lock_type,
742             }
743         })?;
744         log::info!(
745             "Acquired {lock_type:?} lock for disk image id={},path={}",
746             self.id,
747             self.disk_path.display()
748         );
749         Ok(())
750     }
751 
752     /// Releases the advisory lock held for the corresponding disk image.
753     pub fn unlock_image(&mut self) -> Result<()> {
754         // It is very unlikely that this fails;
755         // Should we remove the Result to simplify error propagation at
756         // higher levels?
757         fcntl::clear_lock(self.disk_image.fd()).map_err(|error| Error::LockDiskImage {
758             path: self.disk_path.clone(),
759             error,
760             lock_type: LockType::Unlock,
761         })
762     }
763 
764     fn state(&self) -> BlockState {
765         BlockState {
766             disk_path: self.disk_path.to_str().unwrap().to_owned(),
767             disk_nsectors: self.disk_nsectors,
768             avail_features: self.common.avail_features,
769             acked_features: self.common.acked_features,
770             config: self.config,
771         }
772     }
773 
774     fn update_writeback(&mut self) {
775         // Use the writeback value from config space if VIRTIO_BLK_F_CONFIG_WCE was negotiated
776         let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
777             self.config.writeback == 1
778         } else {
779             // Otherwise, writeback caching is enabled if VIRTIO_BLK_F_FLUSH was negotiated
780             self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
781         };
782 
783         info!(
784             "Changing cache mode to {}",
785             if writeback {
786                 "writeback"
787             } else {
788                 "writethrough"
789             }
790         );
791         self.writeback.store(writeback, Ordering::Release);
792     }
793 
794     #[cfg(fuzzing)]
795     pub fn wait_for_epoll_threads(&mut self) {
796         self.common.wait_for_epoll_threads();
797     }
798 }
799 
800 impl Drop for Block {
801     fn drop(&mut self) {
802         if let Some(kill_evt) = self.common.kill_evt.take() {
803             // Ignore the result because there is nothing we can do about it.
804             let _ = kill_evt.write(1);
805         }
806         self.common.wait_for_epoll_threads();
807     }
808 }
809 
810 impl VirtioDevice for Block {
811     fn device_type(&self) -> u32 {
812         self.common.device_type
813     }
814 
815     fn queue_max_sizes(&self) -> &[u16] {
816         &self.common.queue_sizes
817     }
818 
819     fn features(&self) -> u64 {
820         self.common.avail_features
821     }
822 
823     fn ack_features(&mut self, value: u64) {
824         self.common.ack_features(value)
825     }
826 
827     fn read_config(&self, offset: u64, data: &mut [u8]) {
828         self.read_config_from_slice(self.config.as_slice(), offset, data);
829     }
830 
831     fn write_config(&mut self, offset: u64, data: &[u8]) {
832         // The "writeback" field is the only mutable field
833         let writeback_offset =
834             (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
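        // i.e. the byte offset of the `writeback` field within VirtioBlockConfig,
        // computed from the field and struct addresses.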
835         if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
836         {
837             error!(
838                 "Attempt to write to read-only field: offset {:x} length {}",
839                 offset,
840                 data.len()
841             );
842             return;
843         }
844 
845         self.config.writeback = data[0];
846         self.update_writeback();
847     }
848 
849     fn activate(
850         &mut self,
851         mem: GuestMemoryAtomic<GuestMemoryMmap>,
852         interrupt_cb: Arc<dyn VirtioInterrupt>,
853         mut queues: Vec<(usize, Queue, EventFd)>,
854     ) -> ActivateResult {
855         self.common.activate(&queues, &interrupt_cb)?;
856 
857         self.update_writeback();
858 
859         let mut epoll_threads = Vec::new();
860         let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());
861 
862         for i in 0..queues.len() {
863             let (_, mut queue, queue_evt) = queues.remove(0);
864             queue.set_event_idx(event_idx);
865 
866             let queue_size = queue.size();
867             let (kill_evt, pause_evt) = self.common.dup_eventfds();
868             let queue_idx = i as u16;
869 
870             let mut handler = BlockEpollHandler {
871                 queue_index: queue_idx,
872                 queue,
873                 mem: mem.clone(),
874                 disk_image: self
875                     .disk_image
876                     .new_async_io(queue_size as u32)
877                     .map_err(|e| {
878                         error!("failed to create new AsyncIo: {}", e);
879                         ActivateError::BadActivate
880                     })?,
881                 disk_nsectors: self.disk_nsectors,
882                 interrupt_cb: interrupt_cb.clone(),
883                 serial: self.serial.clone(),
884                 kill_evt,
885                 pause_evt,
886                 writeback: self.writeback.clone(),
887                 counters: self.counters.clone(),
888                 queue_evt,
889                 // Analysis during boot shows a maximum of around 40 in-flight requests.
890                 // A capacity of 64 gives headroom for systems with slower I/O while
891                 // avoiding reallocations and keeping the memory overhead small.
892                 inflight_requests: VecDeque::with_capacity(64),
893                 rate_limiter: self
894                     .rate_limiter
895                     .as_ref()
896                     .map(|r| r.new_handle())
897                     .transpose()
898                     .unwrap(),
899                 access_platform: self.common.access_platform.clone(),
900                 read_only: self.read_only,
901                 host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
902             };
903 
904             let paused = self.common.paused.clone();
905             let paused_sync = self.common.paused_sync.clone();
906 
907             spawn_virtio_thread(
908                 &format!("{}_q{}", self.id.clone(), i),
909                 &self.seccomp_action,
910                 Thread::VirtioBlock,
911                 &mut epoll_threads,
912                 &self.exit_evt,
913                 move || handler.run(paused, paused_sync.unwrap()),
914             )?;
915         }
916 
917         self.common.epoll_threads = Some(epoll_threads);
918         event!("virtio-device", "activated", "id", &self.id);
919 
920         Ok(())
921     }
922 
923     fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
924         let result = self.common.reset();
925         event!("virtio-device", "reset", "id", &self.id);
926         result
927     }
928 
929     fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
930         let mut counters = HashMap::new();
931 
932         counters.insert(
933             "read_bytes",
934             Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
935         );
936         counters.insert(
937             "write_bytes",
938             Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
939         );
940         counters.insert(
941             "read_ops",
942             Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
943         );
944         counters.insert(
945             "write_ops",
946             Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
947         );
948         counters.insert(
949             "write_latency_min",
950             Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
951         );
952         counters.insert(
953             "write_latency_max",
954             Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
955         );
956         counters.insert(
957             "write_latency_avg",
958             Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
959         );
960         counters.insert(
961             "read_latency_min",
962             Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
963         );
964         counters.insert(
965             "read_latency_max",
966             Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
967         );
968         counters.insert(
969             "read_latency_avg",
970             Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
971         );
972 
973         Some(counters)
974     }
975 
976     fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
977         self.common.set_access_platform(access_platform)
978     }
979 }
980 
981 impl Pausable for Block {
982     fn pause(&mut self) -> result::Result<(), MigratableError> {
983         self.common.pause()
984     }
985 
986     fn resume(&mut self) -> result::Result<(), MigratableError> {
987         self.common.resume()
988     }
989 }
990 
991 impl Snapshottable for Block {
992     fn id(&self) -> String {
993         self.id.clone()
994     }
995 
996     fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
997         Snapshot::new_from_state(&self.state())
998     }
999 }
1000 impl Transportable for Block {}
1001 impl Migratable for Block {}
1002