// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
    VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use thiserror::Error;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter.
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency scale, used to reduce precision loss when calculating averages.
const LATENCY_SCALE: u64 = 10000;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
}

pub type Result<T> = result::Result<T, Error>;

// Latencies are recorded in microseconds; the average latency is
// stored as a value scaled by LATENCY_SCALE.
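// For example (illustrative numbers only): with LATENCY_SCALE = 10000, a first
// read completing in 120 µs is stored as 1_200_000. The `counters()` accessor
// divides by LATENCY_SCALE before reporting, so the exposed average is 120 µs,
// while intermediate cumulative-average updates keep the extra precision.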
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_descs = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance:
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
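                    // Illustrative example (assumed numbers): a 64 KiB read split
                    // across two 32 KiB data descriptors consumes 65536 Bytes tokens
                    // here, in addition to the single Ops token consumed above.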
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.serial,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
            }
        }

        Ok(used_descs)
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        let needs_notification = self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        if needs_notification {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?
        };

        Ok(())
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the 1% of the time
        // (analysis during boot) where slight out-of-order completion has been
        // observed, e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case, find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future, as it is likely
        // the next completion will be for the one that was skipped, which will
        // now be the new front.
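        // Note: `VecDeque::swap_remove_front(i)` removes the element at index `i`
        // and moves the current front element into its place, so both the in-order
        // case (i == 0) and the slightly out-of-order case remain O(1).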
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<bool> {
        let mut used_descs = false;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK as u8, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return
                    Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            used_descs = true;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_descs)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set the current queue thread is expected to run on.
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                let needs_notification = self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                if needs_notification {
                    self.signal_used_queue().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to signal used queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter: Option<Arc<RateLimiterGroup>>,
    exit_evt: EventFd,
    read_only: bool,
    serial: Vec<u8>,
    queue_affinity: BTreeMap<u16, Vec<usize>>,
}

#[derive(Serialize, Deserialize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl Block {
    /// Create a new virtio block device that operates on the given file.
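    ///
    /// When `state` is `Some`, the device is restored from a previously snapshotted
    /// `BlockState`: features, configuration and sector count come from the snapshot
    /// and the device starts paused. Otherwise the configuration is derived by
    /// probing the disk image.
    ///
    /// A minimal construction sketch (illustrative only; `disk` and `exit_evt` are
    /// assumed to be created by the caller):
    ///
    /// ```ignore
    /// let block = Block::new(
    ///     "disk0".to_string(),
    ///     disk,                        // Box<dyn DiskFile>
    ///     PathBuf::from("/path/to/disk.raw"),
    ///     false,                       // read_only
    ///     false,                       // iommu
    ///     1,                           // num_queues
    ///     256,                         // queue_size
    ///     None,                        // serial
    ///     SeccompAction::Trap,
    ///     None,                        // rate_limiter
    ///     exit_evt,                    // EventFd
    ///     None,                        // state
    ///     BTreeMap::new(),             // queue_affinity
    /// )?;
    /// ```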
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                         the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY);

                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps physical block to logical block
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn
    update_writeback(&mut self) {
        // Use the writeback value from the config space if VIRTIO_BLK_F_CONFIG_WCE
        // has been negotiated.
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise, check whether VIRTIO_BLK_F_FLUSH was negotiated.
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        for i in 0..queues.len() {
            let (_, queue, queue_evt) = queues.remove(0);
            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows around 40 in-flight requests at most.
                // Allocating 64 gives headroom for systems with slower I/O without
                // the cost of reallocation or excessive memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused =
                self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_state(&self.state())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}