// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
    VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use thiserror::Error;
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency scale, used to reduce precision loss in average calculations.
const LATENCY_SCALE: u64 = 10000;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
}

pub type Result<T> = result::Result<T, Error>;

// Latency is recorded in microseconds; the average latency is stored as a
// value scaled by LATENCY_SCALE.
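// These counters are cloned into each queue's epoll handler thread; the
// shared Arc<AtomicU64> fields let the I/O completion path update them while
// the control path reports them through VirtioDevice::counters().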
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_descs = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance:
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.serial,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
            }
        }

        Ok(used_descs)
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        let needs_notification = self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        if needs_notification {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?
        };

        Ok(())
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the 1% of the time
        // (analysis during boot) where slight out-of-order completion has
        // been observed, e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case, find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future, as it is
        // likely the next completion will be for the one that was skipped,
        // which will now be the new front.
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<bool> {
        let mut used_descs = false;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            used_descs = true;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_descs)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set the current queue thread is expected to run onto.
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
485 } 486 } 487 COMPLETION_EVENT => { 488 self.disk_image.notifier().read().map_err(|e| { 489 EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e)) 490 })?; 491 492 let needs_notification = self.process_queue_complete().map_err(|e| { 493 EpollHelperError::HandleEvent(anyhow!( 494 "Failed to process queue (complete): {:?}", 495 e 496 )) 497 })?; 498 499 if needs_notification { 500 self.signal_used_queue().map_err(|e| { 501 EpollHelperError::HandleEvent(anyhow!( 502 "Failed to signal used queue: {:?}", 503 e 504 )) 505 })?; 506 } 507 } 508 RATE_LIMITER_EVENT => { 509 if let Some(rate_limiter) = &mut self.rate_limiter { 510 // Upon rate limiter event, call the rate limiter handler 511 // and restart processing the queue. 512 rate_limiter.event_handler().map_err(|e| { 513 EpollHelperError::HandleEvent(anyhow!( 514 "Failed to process rate limiter event: {:?}", 515 e 516 )) 517 })?; 518 519 self.process_queue_submit_and_signal()? 520 } else { 521 return Err(EpollHelperError::HandleEvent(anyhow!( 522 "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled." 523 ))); 524 } 525 } 526 _ => { 527 return Err(EpollHelperError::HandleEvent(anyhow!( 528 "Unexpected event: {}", 529 ev_type 530 ))); 531 } 532 } 533 Ok(()) 534 } 535 } 536 537 /// Virtio device for exposing block level read/write operations on a host file. 538 pub struct Block { 539 common: VirtioCommon, 540 id: String, 541 disk_image: Box<dyn DiskFile>, 542 disk_path: PathBuf, 543 disk_nsectors: u64, 544 config: VirtioBlockConfig, 545 writeback: Arc<AtomicBool>, 546 counters: BlockCounters, 547 seccomp_action: SeccompAction, 548 rate_limiter: Option<Arc<RateLimiterGroup>>, 549 exit_evt: EventFd, 550 read_only: bool, 551 serial: Vec<u8>, 552 queue_affinity: BTreeMap<u16, Vec<usize>>, 553 } 554 555 #[derive(Versionize)] 556 pub struct BlockState { 557 pub disk_path: String, 558 pub disk_nsectors: u64, 559 pub avail_features: u64, 560 pub acked_features: u64, 561 pub config: VirtioBlockConfig, 562 } 563 564 impl VersionMapped for BlockState {} 565 566 impl Block { 567 /// Create a new virtio block device that operates on the given file. 
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                         the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY);

                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps physical block to logical block
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Else check if VIRTIO_BLK_F_FLUSH negotiated
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        for i in 0..queues.len() {
            let (_, queue, queue_evt) = queues.remove(0);
            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows around ~40 maximum requests.
                // This gives headroom for systems with slower I/O without
                // compromising the cost of the reallocation or memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_versioned_state(&self.state())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}
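
// A minimal, illustrative test sketch: it only exercises the constants and
// the default counter values defined in this file, showing that the latency
// counters start at u64::MAX so the first completed request seeds the
// min/max/avg statistics.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn sector_size_matches_shift() {
        // SECTOR_SIZE is derived from SECTOR_SHIFT: 1 << 9 == 512 bytes.
        assert_eq!(SECTOR_SIZE, 512);
        assert_eq!(SECTOR_SIZE, 1u64 << SECTOR_SHIFT);
    }

    #[test]
    fn block_counters_default_values() {
        // Byte/op counters start at zero; latency counters start at u64::MAX
        // so the first real completion can be detected and used as the seed.
        let counters = BlockCounters::default();
        assert_eq!(counters.read_bytes.load(Ordering::Relaxed), 0);
        assert_eq!(counters.write_ops.load(Ordering::Relaxed), 0);
        assert_eq!(counters.read_latency_min.load(Ordering::Relaxed), u64::MAX);
        assert_eq!(counters.write_latency_avg.load(Ordering::Relaxed), u64::MAX);
    }
}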