// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use std::collections::{BTreeMap, HashMap, VecDeque};
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::{io, result};

use anyhow::anyhow;
use block::async_io::{AsyncIo, AsyncIoError, DiskFile};
use block::fcntl::{get_lock_state, LockError, LockType};
use block::{build_serial, fcntl, Request, RequestType, VirtioBlockConfig};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_bindings::virtio_ring::{VIRTIO_RING_F_EVENT_IDX, VIRTIO_RING_F_INDIRECT_DESC};
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
    Error as DeviceError, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::{GuestMemoryMmap, VirtioInterrupt};

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency is scaled to reduce precision loss when computing the average.
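// Stored averages are fixed point with four fractional digits: an average of
// 12.3456 µs is kept as 123_456 and divided by LATENCY_SCALE when reported.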
const LATENCY_SCALE: u64 = 10000;

pub const MINIMUM_BLOCK_QUEUE_SIZE: u16 = 2;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request")]
    RequestParsing(#[source] block::Error),
    #[error("Failed to execute the request")]
    RequestExecuting(#[source] block::ExecuteError),
    #[error("Failed to complete the request")]
    RequestCompleting(#[source] block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file")]
    Fsync(#[source] AsyncIoError),
    #[error("Failed adding used index")]
    QueueAddUsed(#[source] virtio_queue::Error),
    #[error("Failed creating an iterator over the queue")]
    QueueIterator(#[source] virtio_queue::Error),
    #[error("Failed to update request status")]
    RequestStatus(#[source] GuestMemoryError),
    #[error("Failed to enable notification")]
    QueueEnableNotification(#[source] virtio_queue::Error),
    #[error("Failed to get {lock_type:?} lock for disk image: {path}")]
    LockDiskImage {
        /// The underlying error.
        #[source]
        error: LockError,
        /// The requested lock type.
        lock_type: LockType,
        /// The path of the disk image.
        path: PathBuf,
    },
}

pub type Result<T> = result::Result<T, Error>;

// Latencies are recorded in microseconds; the average latency is stored as a
// scaled value.
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<()> {
        let queue = &mut self.queue;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance:
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            let result = request.execute_async(
                desc_chain.memory(),
                self.disk_nsectors,
                self.disk_image.as_mut(),
                &self.serial,
                desc_chain.head_index() as u64,
            );

            if let Ok(true) = result {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                let status = match result {
                    Ok(_) => VIRTIO_BLK_S_OK,
                    Err(e) => {
                        warn!("Request failed: {:x?} {:?}", request, e);
                        VIRTIO_BLK_S_IOERR
                    }
                };

                desc_chain
                    .memory()
                    .write_obj(status as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
            }
        }

        Ok(())
    }

    fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
        if self
            .queue
            .needs_notification(self.mem.memory().deref())
            .map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!(
                    "Failed to check needs_notification: {:?}",
                    e
                ))
            })?
        {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?;
        }

        Ok(())
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        self.try_signal_used_queue()
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the 1% of the time
        // (analysis during boot) where slight out of ordering has been
        // observed e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future, as it is
        // likely the next completion will be for the one that was skipped,
        // which will now be the new front.
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<()> {
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
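                        // Same bookkeeping as the read path above: track the
                        // min/max latency and fold this sample into the scaled
                        // cumulative average, avg += (latency * SCALE - avg) / n.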
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK as u8, result as u32)
            } else {
                warn!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                (VIRTIO_BLK_S_IOERR as u8, 0)
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            queue
                .enable_notification(mem.deref())
                .map_err(Error::QueueEnableNotification)?;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(())
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set the current queue thread is expected to run on.
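        // The CPU list comes from the per-queue `queue_affinity` map. Passing
        // pid 0 to sched_setaffinity() below applies the mask to the calling
        // thread, i.e. this virtqueue's epoll thread.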
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                let rate_limit_reached = self.rate_limiter.as_ref().is_some_and(|r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process queue (submit): {:?}",
                            e
                        ))
                    })?;
                }
                self.try_signal_used_queue()?;
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter: Option<Arc<RateLimiterGroup>>,
    exit_evt: EventFd,
    read_only: bool,
    serial: Vec<u8>,
    queue_affinity: BTreeMap<u16, Vec<usize>>,
}

#[derive(Serialize, Deserialize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image
                    .size()
                    .map_err(|e| io::Error::other(format!("Failed getting disk size: {e}")))?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                         the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
                    | (1u64 << VIRTIO_BLK_F_SEG_MAX)
                    | (1u64 << VIRTIO_RING_F_EVENT_IDX)
                    | (1u64 << VIRTIO_RING_F_INDIRECT_DESC);
                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps physical block to logical block
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    seg_max: (queue_size - MINIMUM_BLOCK_QUEUE_SIZE) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
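                    // With VIRTIO_BLK_F_MQ offered, the num_queues config field
                    // advertises how many request queues the guest may use.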
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    /// Tries to set an advisory lock for the corresponding disk image.
    pub fn try_lock_image(&mut self) -> Result<()> {
        let lock_type = match self.read_only {
            true => LockType::Read,
            false => LockType::Write,
        };
        log::debug!(
            "Attempting to acquire {lock_type:?} lock for disk image id={},path={}",
            self.id,
            self.disk_path.display()
        );
        let fd = self.disk_image.fd();
        fcntl::try_acquire_lock(fd, lock_type).map_err(|error| {
            let current_lock = get_lock_state(fd);
            // Don't propagate this error to the caller, as it is not useful on its own.
            // Instead, try to log additional guidance for the user.
            if let Ok(current_lock) = current_lock {
                log::error!("Can't get {lock_type:?} lock for {} as there is already a {current_lock:?} lock", self.disk_path.display());
            } else {
                log::error!("Can't get {lock_type:?} lock for {}, but also can't determine the current lock state", self.disk_path.display());
            }
            Error::LockDiskImage {
                path: self.disk_path.clone(),
                error,
                lock_type,
            }
        })?;
        log::info!(
            "Acquired {lock_type:?} lock for disk image id={},path={}",
            self.id,
            self.disk_path.display()
        );
        Ok(())
    }

    /// Releases the advisory lock held for the corresponding disk image.
    pub fn unlock_image(&mut self) -> Result<()> {
        // It is very unlikely that this fails.
        // Should we remove the Result to simplify error propagation at
        // higher levels?
        fcntl::clear_lock(self.disk_image.fd()).map_err(|error| Error::LockDiskImage {
            path: self.disk_path.clone(),
            error,
            lock_type: LockType::Unlock,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use the writeback value from config if VIRTIO_BLK_F_CONFIG_WCE was negotiated
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise check whether VIRTIO_BLK_F_FLUSH was negotiated
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());

        for i in 0..queues.len() {
            let (_, mut queue, queue_evt) = queues.remove(0);
            queue.set_event_idx(event_idx);

            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows a maximum of around 40 in-flight
                // requests. A capacity of 64 gives headroom for systems with
                // slower I/O without incurring reallocation costs or excessive
                // memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_state(&self.state())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}
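
// Illustrative sketch (not part of the upstream device code): the test below
// re-implements the scaled cumulative-average update used for the latency
// counters above as a local helper, so the fixed-point arithmetic can be
// checked in isolation. The helper name `update_scaled_avg` is hypothetical.
#[cfg(test)]
mod latency_avg_sketch {
    use super::LATENCY_SCALE;

    // Same update rule as process_queue_complete(): the stored value is the
    // average latency (µs) multiplied by LATENCY_SCALE, and u64::MAX marks
    // "no sample recorded yet".
    fn update_scaled_avg(prev_scaled: u64, latency_us: u64, total_ops: u64) -> u64 {
        if prev_scaled == u64::MAX {
            latency_us * LATENCY_SCALE
        } else {
            (prev_scaled as i64
                + ((latency_us * LATENCY_SCALE) as i64 - prev_scaled as i64) / total_ops as i64)
                .try_into()
                .unwrap()
        }
    }

    #[test]
    fn scaled_average_tracks_samples() {
        // First sample: 100 µs is stored as 100 * LATENCY_SCALE.
        let avg = update_scaled_avg(u64::MAX, 100, 1);
        assert_eq!(avg, 100 * LATENCY_SCALE);

        // Second sample of 200 µs: the running average becomes 150 µs (scaled).
        let avg = update_scaled_avg(avg, 200, 2);
        assert_eq!(avg, 150 * LATENCY_SCALE);

        // Reporting divides by LATENCY_SCALE, as counters() does.
        assert_eq!(avg / LATENCY_SCALE, 150);
    }
}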