// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::{io, result};

use anyhow::anyhow;
use block::async_io::{AsyncIo, AsyncIoError, DiskFile};
use block::{build_serial, Request, RequestType, VirtioBlockConfig};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_bindings::virtio_ring::{VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC};
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
    Error as DeviceError, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::{GuestMemoryMmap, VirtioInterrupt};

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency scale, used to reduce precision loss when calculating averages.
const LATENCY_SCALE: u64 = 10000;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
    #[error("Failed to enable notification: {0}")]
    QueueEnableNotification(virtio_queue::Error),
}

pub type Result<T> = result::Result<T, Error>;

// Latencies are recorded in microseconds; the average latency
// is saved as a scaled value.
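// For example, with LATENCY_SCALE = 10000 an average of 12.3456 us is stored as
// 123456 and divided back down by LATENCY_SCALE when reported by counters().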
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<()> {
        let queue = &mut self.queue;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance:
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
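                // Non-data requests (e.g. Flush) only consume the Ops token taken above.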
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.serial,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
            }
        }

        Ok(())
    }

    fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
        if self
            .queue
            .needs_notification(self.mem.memory().deref())
            .map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!(
                    "Failed to check needs_notification: {:?}",
                    e
                ))
            })?
        {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?;
        }

        Ok(())
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        self.try_signal_used_queue()
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the 1% of the time
        // (analysis during boot) where slightly out-of-order completions have
        // been observed, e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future, as it is
        // likely the next completion will be for the one that was skipped,
        // which will now be the new front.
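        // (VecDeque::swap_remove_front(i) removes the element at index i and
        // replaces it with the current front, so no shifting occurs.)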
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<()> {
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK as u8, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };
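
            // Write the completion status back to the guest and return the
            // descriptor to the used ring with the number of bytes transferred.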
            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            queue
                .enable_notification(mem.deref())
                .map_err(Error::QueueEnableNotification)?;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(())
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set the current queue thread is expected to run on.
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
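                    // (If the limit has been reached instead, the descriptors left on
                    // the avail ring are picked up again once RATE_LIMITER_EVENT fires.)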
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process queue (submit): {:?}",
                            e
                        ))
                    })?;
                }
                self.try_signal_used_queue()?;
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter: Option<Arc<RateLimiterGroup>>,
    exit_evt: EventFd,
    read_only: bool,
    serial: Vec<u8>,
    queue_affinity: BTreeMap<u16, Vec<usize>>,
}

#[derive(Serialize, Deserialize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl Block {
    /// Create a new virtio block device that operates on the given file.
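    ///
    /// If a saved `BlockState` is provided, the disk geometry, feature bits and
    /// device configuration are restored from it and the device starts paused;
    /// otherwise they are derived from the disk image itself.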
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                         the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
                    | (1u64 << VIRTIO_RING_F_EVENT_IDX)
                    | (1u64 << VIRTIO_RING_F_INDIRECT_DESC);
                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps physical block to logical block
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Else check if VIRTIO_BLK_F_FLUSH negotiated
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());

        for i in 0..queues.len() {
            let (_, mut queue, queue_evt) = queues.remove(0);
            queue.set_event_idx(event_idx);

            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows around ~40 maximum requests.
                // This gives headroom for systems with slower I/O without
                // compromising the cost of the reallocation or memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_state(&self.state())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}