// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler,
    RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use anyhow::anyhow;
use block::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::{RateLimiter, TokenType};
use seccompiler::SeccompAction;
use std::collections::VecDeque;
use std::io;
use std::num::Wrapping;
use std::ops::Deref;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::{collections::HashMap, convert::TryInto};
use thiserror::Error;
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::*;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic, GuestMemoryError};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vm_virtio::AccessPlatform;
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;

// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

// Latency scale, used to reduce precision loss when calculating averages.
const LATENCY_SCALE: u64 = 10000;

#[derive(Error, Debug)]
pub enum Error {
    #[error("Failed to parse the request: {0}")]
    RequestParsing(block::Error),
    #[error("Failed to execute the request: {0}")]
    RequestExecuting(block::ExecuteError),
    #[error("Failed to complete the request: {0}")]
    RequestCompleting(block::Error),
    #[error("Missing the expected entry in the list of requests")]
    MissingEntryRequestList,
    #[error("The asynchronous request returned with failure")]
    AsyncRequestFailure,
    #[error("Failed synchronizing the file: {0}")]
    Fsync(AsyncIoError),
    #[error("Failed adding used index: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Failed creating an iterator over the queue: {0}")]
    QueueIterator(virtio_queue::Error),
    #[error("Failed to update request status: {0}")]
    RequestStatus(GuestMemoryError),
}

pub type Result<T> = result::Result<T, Error>;

// Latencies are recorded in microseconds; the average latency
// is saved as a scaled value.
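//
// All counters are `Arc<AtomicU64>` so the per-queue epoll handler threads
// and the control path (`Block::counters()`) can share them. The latency
// min/max/avg fields start at `u64::MAX` as a "no sample yet" sentinel, and
// the averages are stored multiplied by `LATENCY_SCALE`, then divided back
// down when reported through `counters()`.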
#[derive(Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    read_latency_min: Arc<AtomicU64>,
    read_latency_max: Arc<AtomicU64>,
    read_latency_avg: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
    write_latency_min: Arc<AtomicU64>,
    write_latency_max: Arc<AtomicU64>,
    write_latency_avg: Arc<AtomicU64>,
}

impl Default for BlockCounters {
    fn default() -> Self {
        BlockCounters {
            read_bytes: Arc::new(AtomicU64::new(0)),
            read_ops: Arc::new(AtomicU64::new(0)),
            read_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            read_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
            write_bytes: Arc::new(AtomicU64::new(0)),
            write_ops: Arc::new(AtomicU64::new(0)),
            write_latency_min: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_max: Arc::new(AtomicU64::new(u64::MAX)),
            write_latency_avg: Arc::new(AtomicU64::new(u64::MAX)),
        }
    }
}

struct BlockEpollHandler {
    queue_index: u16,
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    disk_image_id: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    inflight_requests: VecDeque<(u16, Request)>,
    rate_limiter: Option<RateLimiter>,
    access_platform: Option<Arc<dyn AccessPlatform>>,
    read_only: bool,
}

impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_descs = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(self.mem.memory()) {
            let mut request = Request::parse(&mut desc_chain, self.access_platform.as_ref())
                .map_err(Error::RequestParsing)?;

            // For virtio spec compliance
            // "A device MUST set the status byte to VIRTIO_BLK_S_IOERR for a write request
            // if the VIRTIO_BLK_F_RO feature is offered, and MUST NOT write any data."
            if self.read_only
                && (request.request_type == RequestType::Out
                    || request.request_type == RequestType::Flush)
            {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_IOERR, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
                continue;
            }

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
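                    // When either budget is exhausted, the descriptor chain stays on the
                    // avail ring (via go_to_previous_position() below) and submission
                    // resumes once the rate limiter signals RATE_LIMITER_EVENT.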
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                };
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    desc_chain.memory(),
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.disk_image_id,
                    desc_chain.head_index() as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.inflight_requests
                    .push_back((desc_chain.head_index(), request));
            } else {
                desc_chain
                    .memory()
                    .write_obj(VIRTIO_BLK_S_OK, request.status_addr)
                    .map_err(Error::RequestStatus)?;

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                queue
                    .add_used(desc_chain.memory(), desc_chain.head_index(), 0)
                    .map_err(Error::QueueAddUsed)?;
                used_descs = true;
            }
        }

        Ok(used_descs)
    }

    fn process_queue_submit_and_signal(&mut self) -> result::Result<(), EpollHelperError> {
        let needs_notification = self.process_queue_submit().map_err(|e| {
            EpollHelperError::HandleEvent(anyhow!("Failed to process queue (submit): {:?}", e))
        })?;

        if needs_notification {
            self.signal_used_queue().map_err(|e| {
                EpollHelperError::HandleEvent(anyhow!("Failed to signal used queue: {:?}", e))
            })?
        };

        Ok(())
    }

    #[inline]
    fn find_inflight_request(&mut self, completed_head: u16) -> Result<Request> {
        // This loop neatly handles the fast path where the completions are
        // in order (it turns into just a pop_front()) and the 1% of the time
        // (analysis during boot) where slight out-of-ordering has been
        // observed e.g.
        // Submissions: 1 2 3 4 5 6 7
        // Completions: 2 1 3 5 4 7 6
        // In this case find the corresponding item and swap it with the front.
        // This is an O(1) operation and is prepared for the future, as it is likely
        // the next completion would be for the one that was skipped, which will
        // now be the new front.
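        // Note: VecDeque::swap_remove_front(i) removes the entry at index `i`
        // in O(1) by swapping it with the current front; the remaining order
        // is not preserved, which is fine since lookups are by head index.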
        for (i, (head, _)) in self.inflight_requests.iter().enumerate() {
            if head == &completed_head {
                return Ok(self.inflight_requests.swap_remove_front(i).unwrap().1);
            }
        }

        Err(Error::MissingEntryRequestList)
    }

    fn process_queue_complete(&mut self) -> Result<bool> {
        let mut used_descs = false;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        while let Some((user_data, result)) = self.disk_image.next_completed_request() {
            let desc_index = user_data as u16;

            let mut request = self.find_inflight_request(desc_index)?;

            request.complete_async().map_err(Error::RequestCompleting)?;

            let latency = request.start.elapsed().as_micros() as u64;
            let read_ops_last = self.counters.read_ops.load(Ordering::Relaxed);
            let write_ops_last = self.counters.write_ops.load(Ordering::Relaxed);
            let mut read_avg = self.counters.read_latency_avg.load(Ordering::Relaxed);
            let mut write_avg = self.counters.write_latency_avg.load(Ordering::Relaxed);
            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                        if latency < self.counters.read_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .read_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > self.counters.read_latency_max.load(Ordering::Relaxed)
                            || latency == u64::MAX
                        {
                            self.counters
                                .read_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        read_avg = if read_avg == u64::MAX {
                            latency
                        } else {
                            read_avg
                                + ((latency * LATENCY_SCALE) - read_avg)
                                    / (read_ops_last + read_ops.0)
                        };
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                        if latency < self.counters.write_latency_min.load(Ordering::Relaxed) {
                            self.counters
                                .write_latency_min
                                .store(latency, Ordering::Relaxed);
                        }
                        if latency > self.counters.write_latency_max.load(Ordering::Relaxed)
                            || latency == u64::MAX
                        {
                            self.counters
                                .write_latency_max
                                .store(latency, Ordering::Relaxed);
                        }

                        // Special case the first real latency report
                        write_avg = if write_avg == u64::MAX {
                            latency
                        } else {
                            write_avg
                                + ((latency * LATENCY_SCALE) - write_avg)
                                    / (write_ops_last + write_ops.0)
                        };
                    }
                    _ => {}
                }

                self.counters
                    .read_latency_avg
                    .store(read_avg, Ordering::Relaxed);

                self.counters
                    .write_latency_avg
                    .store(write_avg, Ordering::Relaxed);

                (VIRTIO_BLK_S_OK, result as u32)
            } else {
                error!(
                    "Request failed: {:x?} {:?}",
                    request,
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            mem.write_obj(status, request.status_addr)
                .map_err(Error::RequestStatus)?;

            let queue = &mut self.queue;

            queue
                .add_used(mem.deref(), desc_index, len)
                .map_err(Error::QueueAddUsed)?;
            used_descs = true;
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_descs)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(VirtioInterruptType::Queue(self.queue_index))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}

impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(
        &mut self,
        _helper: &mut EpollHelper,
        event: &epoll::Event,
    ) -> result::Result<(), EpollHelperError> {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                self.queue_evt.read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!("Failed to get queue event: {:?}", e))
                })?;

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached
                if !rate_limit_reached {
                    self.process_queue_submit_and_signal()?
                }
            }
            COMPLETION_EVENT => {
                self.disk_image.notifier().read().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to get completion event: {:?}",
                        e
                    ))
                })?;

                let needs_notification = self.process_queue_complete().map_err(|e| {
                    EpollHelperError::HandleEvent(anyhow!(
                        "Failed to process queue (complete): {:?}",
                        e
                    ))
                })?;

                if needs_notification {
                    self.signal_used_queue().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to signal used queue: {:?}",
                            e
                        ))
                    })?;
                }
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    rate_limiter.event_handler().map_err(|e| {
                        EpollHelperError::HandleEvent(anyhow!(
                            "Failed to process rate limiter event: {:?}",
                            e
                        ))
                    })?;

                    self.process_queue_submit_and_signal()?
                } else {
                    return Err(EpollHelperError::HandleEvent(anyhow!(
                        "Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled."
                    )));
                }
            }
            _ => {
                return Err(EpollHelperError::HandleEvent(anyhow!(
                    "Unexpected event: {}",
                    ev_type
                )));
            }
        }
        Ok(())
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
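///
/// The device wraps a `DiskFile` and exposes it through a `VirtioBlockConfig`.
/// On activation one `BlockEpollHandler` thread is spawned per virtqueue, each
/// with its own `AsyncIo` context, an optional `RateLimiter` and a clone of the
/// shared `BlockCounters`.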
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter_config: Option<RateLimiterConfig>,
    exit_evt: EventFd,
    read_only: bool,
}

#[derive(Versionize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl VersionMapped for BlockState {}

impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        seccomp_action: SeccompAction,
        rate_limiter_config: Option<RateLimiterConfig>,
        exit_evt: EventFd,
        state: Option<BlockState>,
    ) -> io::Result<Self> {
        let (disk_nsectors, avail_features, acked_features, config, paused) =
            if let Some(state) = state {
                info!("Restoring virtio-block {}", id);
                (
                    state.disk_nsectors,
                    state.avail_features,
                    state.acked_features,
                    state.config,
                    true,
                )
            } else {
                let disk_size = disk_image.size().map_err(|e| {
                    io::Error::new(
                        io::ErrorKind::Other,
                        format!("Failed getting disk size: {e}"),
                    )
                })?;
                if disk_size % SECTOR_SIZE != 0 {
                    warn!(
                        "Disk size {} is not a multiple of sector size {}; \
                         the remainder will not be visible to the guest.",
                        disk_size, SECTOR_SIZE
                    );
                }

                let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
                    | (1u64 << VIRTIO_BLK_F_FLUSH)
                    | (1u64 << VIRTIO_BLK_F_CONFIG_WCE)
                    | (1u64 << VIRTIO_BLK_F_BLK_SIZE)
                    | (1u64 << VIRTIO_BLK_F_TOPOLOGY);

                if iommu {
                    avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
                }

                if read_only {
                    avail_features |= 1u64 << VIRTIO_BLK_F_RO;
                }

                let topology = disk_image.topology();
                info!("Disk topology: {:?}", topology);

                let logical_block_size = if topology.logical_block_size > 512 {
                    topology.logical_block_size
                } else {
                    512
                };

                // Calculate the exponent that maps physical block to logical block
                let mut physical_block_exp = 0;
                let mut size = logical_block_size;
                while size < topology.physical_block_size {
                    physical_block_exp += 1;
                    size <<= 1;
                }

                let disk_nsectors = disk_size / SECTOR_SIZE;
                let mut config = VirtioBlockConfig {
                    capacity: disk_nsectors,
                    writeback: 1,
                    blk_size: topology.logical_block_size as u32,
                    physical_block_exp,
                    min_io_size: (topology.minimum_io_size / logical_block_size) as u16,
                    opt_io_size: (topology.optimal_io_size / logical_block_size) as u32,
                    ..Default::default()
                };

                if num_queues > 1 {
                    avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
                    config.num_queues = num_queues as u16;
                }

                (disk_nsectors, avail_features, 0, config, false)
            };

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                acked_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                paused: Arc::new(AtomicBool::new(paused)),
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter_config,
            exit_evt,
            read_only,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Else check if VIRTIO_BLK_F_FLUSH negotiated
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }

    #[cfg(fuzzing)]
    pub fn wait_for_epoll_threads(&mut self) {
        self.common.wait_for_epoll_threads();
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
        self.common.wait_for_epoll_threads();
    }
}

impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<(usize, Queue, EventFd)>,
    ) -> ActivateResult {
        self.common.activate(&queues, &interrupt_cb)?;

        let disk_image_id = build_disk_image_id(&self.disk_path);
        self.update_writeback();

        let mut epoll_threads = Vec::new();
        for i in 0..queues.len() {
            let (_, queue, queue_evt) = queues.remove(0);
            let queue_size = queue.size();
            let (kill_evt, pause_evt) = self.common.dup_eventfds();

            let rate_limiter: Option<RateLimiter> = self
                .rate_limiter_config
                .map(RateLimiterConfig::try_into)
                .transpose()
                .map_err(ActivateError::CreateRateLimiter)?;

            let mut handler = BlockEpollHandler {
                queue_index: i as u16,
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                disk_image_id: disk_image_id.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                // Analysis during boot shows around ~40 requests in flight at most.
                // A capacity of 64 gives headroom for systems with slower I/O
                // without incurring reallocation costs or significant memory overhead.
                inflight_requests: VecDeque::with_capacity(64),
                rate_limiter,
                access_platform: self.common.access_platform.clone(),
                read_only: self.read_only,
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || handler.run(paused, paused_sync.unwrap()),
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }

    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_min",
            Wrapping(self.counters.write_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_max",
            Wrapping(self.counters.write_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_latency_avg",
            Wrapping(self.counters.write_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );
        counters.insert(
            "read_latency_min",
            Wrapping(self.counters.read_latency_min.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_max",
            Wrapping(self.counters.read_latency_max.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_latency_avg",
            Wrapping(self.counters.read_latency_avg.load(Ordering::Acquire) / LATENCY_SCALE),
        );

        Some(counters)
    }

    fn set_access_platform(&mut self, access_platform: Arc<dyn AccessPlatform>) {
        self.common.set_access_platform(access_platform)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_versioned_state(&self.state())
    }
}
impl Transportable for Block {}
impl Migratable for Block {}