xref: /cloud-hypervisor/virtio-devices/src/block.rs (revision bc6acb842f1ebb263245cd95fe5a92fe5f350bd3)
// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
    VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use thiserror::Error;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
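// With SECTOR_SHIFT = 9, SECTOR_SIZE is 1 << 9 = 512 bytes, the fixed sector
// size the virtio-blk protocol uses for request offsets and the capacity field.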

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency scale, used to reduce precision loss when computing averages.
const LATENCY_SCALE: u64 = 10000;
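// Average latencies are stored as microseconds multiplied by LATENCY_SCALE so
// that the integer running-average arithmetic below keeps some fractional
// precision; counters() divides by LATENCY_SCALE again before reporting.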

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
}

pub type Result<T> = result::Result<T, Error>;

// Latencies are recorded in microseconds; the average latency is stored as a
// value scaled by LATENCY_SCALE.
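// The latency min/max/avg counters start out at u64::MAX, which acts as a
// "no sample recorded yet" sentinel that the completion path special-cases.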
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_descs = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance:
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

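            // The descriptor chain's head index is handed to execute_async() as the
            // request's user data, so the completion handler below can map each
            // completion back to the right in-flight request.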
            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.serial,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
            }
        }

        Ok(used_descs)
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        let needs_notification = self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        if needs_notification {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?
        };

        Ok(())
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the ~1% of the time
        // (measured during boot) where slight out-of-order completion has been
        // observed e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future as it is likely
        // the next completion would be for the one that was skipped which will
        // now be the new front.
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<bool> {
        let mut used_descs = false;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
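            // `result` is the completion value from the async backend: a
            // non-negative transfer length on success, or a negative errno on error.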
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

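                        // The running average below is updated incrementally:
                        //   avg_new = avg_old + (sample - avg_old) / n
                        // where n is the total number of completed reads (or writes)
                        // and the samples are pre-scaled by LATENCY_SCALE to limit
                        // the precision lost to integer division.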
                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK as u8, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            used_descs = true;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_descs)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set the current queue thread is expected to run onto.
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                let needs_notification = self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                if needs_notification {
                    self.signal_used_queue().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to signal used queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter: Option<Arc<RateLimiterGroup>>,
    exit_evt: EventFd,
    read_only: bool,
    serial: Vec<u8>,
    queue_affinity: BTreeMap<u16, Vec<usize>>,
}

#[derive(Serialize, Deserialize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                 the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY);

                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps physical block to logical block
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }
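                // For example, a 512-byte logical block size with a 4096-byte
                // physical block size yields physical_block_exp = 3, since
                // 4096 = 512 << 3.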

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use the writeback value from the config space if VIRTIO_BLK_F_CONFIG_WCE
        // has been negotiated.
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise, writeback follows whether VIRTIO_BLK_F_FLUSH was negotiated.
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };
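        // Net effect: the cache mode is writeback when the guest either turned on
        // config.writeback through CONFIG_WCE or negotiated FLUSH; it is
        // writethrough otherwise.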

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        for i in 0..queues.len() {
            let (_, queue, queue_evt) = queues.remove(0);
            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

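            // Each virtqueue gets its own epoll handler thread and its own
            // asynchronous I/O context created from the shared disk image.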
            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows around ~40 maximum requests
                // This gives head room for systems with slower I/O without
                // compromising the cost of the reallocation or memory overhead
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
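        // Average latencies are stored internally scaled by LATENCY_SCALE, so
        // divide here to report them in plain microseconds.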
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_state(&self.state())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}