// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, VirtioCommon,
    VirtioDevice, VirtioDeviceType, VirtioInterruptType, EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_serial, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::group::{RateLimiterGroup, RateLimiterGroupHandle};
use rate_limiter::TokenType;
use seccompiler::SeccompAction;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use thiserror::Error;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency scale, used to reduce precision loss when computing the average latency.
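// For example, a request completing in 123 µs contributes 123 * LATENCY_SCALE
// = 1_230_000 to the scaled running average; counters() divides the stored
// value by LATENCY_SCALE before reporting, so the exposed average stays in
// microseconds.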
const LATENCY_SCALE: u64 = 10000;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
    #[error("Failed to enable notification: {0}")]
    QueueEnableNotification(virtio_queue::Error),
}

pub type Result<T> = result::Result<T, Error>;

// Latencies are recorded in microseconds; the average latency
// is saved as a scaled value.
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    serial: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiterGroupHandle>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
    host_cpus: Option<Vec<usize>>,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<()> {
        let queue = &mut self.queue;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature if offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.serial,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK as u8, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                queue
                    .enable_notification(self.mem.memory().deref())
                    .map_err(Error::QueueEnableNotification)?;
            }
        }

        Ok(())
    }

    fn try_signal_used_queue(&mut self) -> result::Result<(), EpollHelperError> {
        if self
            .queue
            .needs_notification(self.mem.memory().deref())
            .map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!(
                    "Failed to check needs_notification: {:?}",
                    e
                ))
            })?
        {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?;
        }

        Ok(())
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        self.try_signal_used_queue()
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the 1% of the time
        // (analysis during boot) where slight out of ordering has been
        // observed e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future as it is likely
        // the next completion would be for the one that was skipped which will
        // now be the new front.
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<()> {
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let read_max = self.counters.read_latency_max.load(Ordering::Relaxed);
            let write_max = self.counters.write_latency_max.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > read_max || read_max == u64::MAX {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (read_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - read_avg as i64)
                                    / (read_ops_last + read_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
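                        // The write-latency statistics below mirror the read path
                        // above: the running average is an incremental (cumulative
                        // moving) average kept scaled by LATENCY_SCALE, i.e.
                        // avg_n = avg_{n-1} + (latency * LATENCY_SCALE - avg_{n-1}) / n.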
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > write_max || write_max == u64::MAX {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency * LATENCY_SCALE
                        } else {
                            // Cumulative average is guaranteed to be
                            // positive if being calculated properly
                            (write_avg as i64
                                + ((latency * LATENCY_SCALE) as i64 - write_avg as i64)
                                    / (write_ops_last + write_ops.0) as i64)
                                .try_into()
                                .unwrap()
                        }
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK as u8, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            queue
                .enable_notification(mem.deref())
                .map_err(Error::QueueEnableNotification)?;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(())
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn set_queue_thread_affinity(&self) {
        // Prepare the CPU set the current queue thread is expected to run on.
        let cpuset = self.host_cpus.as_ref().map(|host_cpus| {
            // SAFETY: all zeros is a valid pattern
            let mut cpuset: libc::cpu_set_t = unsafe { std::mem::zeroed() };
            // SAFETY: FFI call, trivially safe
            unsafe { libc::CPU_ZERO(&mut cpuset) };
            for host_cpu in host_cpus {
                // SAFETY: FFI call, trivially safe
                unsafe { libc::CPU_SET(*host_cpu, &mut cpuset) };
            }
            cpuset
        });

        // Schedule the thread to run on the expected CPU set
        if let Some(cpuset) = cpuset.as_ref() {
            // SAFETY: FFI call with correct arguments
            let ret = unsafe {
                libc::sched_setaffinity(
                    0,
                    std::mem::size_of::<libc::cpu_set_t>(),
                    cpuset as *const libc::cpu_set_t,
                )
            };

            if ret != 0 {
                error!(
                    "Failed scheduling the virtqueue thread {} on the expected CPU set: {}",
                    self.queue_index,
                    io::Error::last_os_error()
                )
            }
        }
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        self.set_queue_thread_affinity();
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process queue (submit): {:?}",
                            e
                        ))
                    })?;
                }
                self.try_signal_used_queue()?;
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter: Option<Arc<RateLimiterGroup>>,
    exit_evt: EventFd,
    read_only: bool,
    serial: Vec<u8>,
    queue_affinity: BTreeMap<u16, Vec<usize>>,
}

#[derive(Serialize, Deserialize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        serial: Option<String>,
        seccomp_action: SeccompAction,
        rate_limiter: Option<Arc<RateLimiterGroup>>,
        exit_evt: EventFd,
        state: Option<BlockState>,
        queue_affinity: BTreeMap<u16, Vec<usize>>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                         the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY)
                    | (1u64 << VIRTIO_RING_F_EVENT_IDX);

                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps the logical block size to the
                // physical block size.
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }
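                // For example, a 512-byte logical block with a 4096-byte physical
                // block yields physical_block_exp = 3, since 512 << 3 == 4096.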

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        let serial = serial
            .map(Vec::from)
            .unwrap_or_else(|| build_serial(&disk_path));

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter,
            exit_evt,
            read_only,
            serial,
            queue_affinity,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use the writeback value from the config space if VIRTIO_BLK_F_CONFIG_WCE
        // has been negotiated.
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise, check whether VIRTIO_BLK_F_FLUSH was negotiated.
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        self.update_writeback();

        let mut epoll_threads = Vec::new();
        let event_idx = self.common.feature_acked(VIRTIO_RING_F_EVENT_IDX.into());

        for i in 0..queues.len() {
            let (_, mut queue, queue_evt) = queues.remove(0);
            queue.set_event_idx(event_idx);

            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();
            let queue_idx = i as u16;

            let mut handler = BlockEpollHandler {
                queue_index: queue_idx,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                serial: self.serial.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows a maximum of around 40 in-flight requests.
                // This gives headroom for systems with slower I/O without
                // compromising the cost of the reallocation or memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter: self
                    .rate_limiter
                    .as_ref()
                    .map(|r| r.new_handle())
                    .transpose()
                    .unwrap(),
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
                host_cpus: self.queue_affinity.get(&queue_idx).cloned(),
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_state(&self.state())
    }
}

impl Transportable for Block {}
impl Migratable for Block {}
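
// A minimal unit-test sketch covering only the constants and defaults defined
// above (assuming the usual `cargo test` setup for this crate); it is
// illustrative rather than exhaustive.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    #[test]
    fn sector_size_is_512_bytes() {
        // SECTOR_SIZE is derived from SECTOR_SHIFT (9), i.e. 1 << 9.
        assert_eq!(SECTOR_SIZE, 512);
    }

    #[test]
    fn latency_counters_start_unset() {
        // Latency min/max/avg counters start at u64::MAX so that the first
        // completed request initializes them (see process_queue_complete()).
        let counters = BlockCounters::default();
        assert_eq!(counters.read_latency_min.load(Ordering::Relaxed), u64::MAX);
        assert_eq!(counters.write_latency_avg.load(Ordering::Relaxed), u64::MAX);
        assert_eq!(counters.read_bytes.load(Ordering::Relaxed), 0);
    }
}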