// Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE-BSD-3-Clause file.
//
// Copyright © 2020 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::Error as DeviceError;
use super::{
    ActivateError, ActivateResult, EpollHelper, EpollHelperError, EpollHelperHandler, Queue,
    RateLimiterConfig, VirtioCommon, VirtioDevice, VirtioDeviceType, VirtioInterruptType,
    EPOLL_HELPER_EVENT_LAST,
};
use crate::seccomp_filters::Thread;
use crate::thread_helper::spawn_virtio_thread;
use crate::GuestMemoryMmap;
use crate::VirtioInterrupt;
use block_util::{
    async_io::AsyncIo, async_io::AsyncIoError, async_io::DiskFile, build_disk_image_id, Request,
    RequestType, VirtioBlockConfig,
};
use rate_limiter::{RateLimiter, TokenType};
use seccompiler::SeccompAction;
use std::io;
use std::num::Wrapping;
use std::os::unix::io::AsRawFd;
use std::path::PathBuf;
use std::result;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Barrier};
use std::{collections::HashMap, convert::TryInto};
use versionize::{VersionMap, Versionize, VersionizeResult};
use versionize_derive::Versionize;
use virtio_bindings::bindings::virtio_blk::*;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vm_migration::VersionMapped;
use vm_migration::{Migratable, MigratableError, Pausable, Snapshot, Snapshottable, Transportable};
use vmm_sys_util::eventfd::EventFd;

const SECTOR_SHIFT: u8 = 9;
pub const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
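// Worked example of the sector arithmetic: SECTOR_SIZE is 1 << 9 = 512
// bytes, so a 10 GiB raw image exposes 10 * 1024 * 1024 * 1024 / 512 =
// 20_971_520 sectors, which is the value reported to the guest through the
// `capacity` config field set up in Block::new().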
// New descriptors are pending on the virtio queue.
const QUEUE_AVAIL_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 1;
// New completed tasks are pending on the completion ring.
const COMPLETION_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 2;
// New 'wake up' event from the rate limiter.
const RATE_LIMITER_EVENT: u16 = EPOLL_HELPER_EVENT_LAST + 3;

#[derive(Debug)]
pub enum Error {
    /// Failed to parse the request.
    RequestParsing(block_util::Error),
    /// Failed to execute the request.
    RequestExecuting(block_util::ExecuteError),
    /// Missing the expected entry in the list of requests.
    MissingEntryRequestList,
    /// The asynchronous request returned with failure.
    AsyncRequestFailure,
    /// Failed synchronizing the file.
    Fsync(AsyncIoError),
}

pub type Result<T> = result::Result<T, Error>;

#[derive(Default, Clone)]
pub struct BlockCounters {
    read_bytes: Arc<AtomicU64>,
    read_ops: Arc<AtomicU64>,
    write_bytes: Arc<AtomicU64>,
    write_ops: Arc<AtomicU64>,
}

struct BlockEpollHandler {
    queue: Queue,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
    disk_image: Box<dyn AsyncIo>,
    disk_nsectors: u64,
    interrupt_cb: Arc<dyn VirtioInterrupt>,
    disk_image_id: Vec<u8>,
    kill_evt: EventFd,
    pause_evt: EventFd,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    queue_evt: EventFd,
    request_list: HashMap<u16, Request>,
    rate_limiter: Option<RateLimiter>,
}
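// A sketch of the rate-limiter accounting performed in
// `process_queue_submit()` below, for a single 8 KiB write with both token
// buckets configured (sizes illustrative):
//   1. consume(1, TokenType::Ops) charges one operation token per request;
//   2. consume(8192, TokenType::Bytes) charges byte tokens, but only for
//      data-transfer (In/Out) requests;
//   3. if step 2 fails, manual_replenish(1, TokenType::Ops) reverts step 1
//      and the descriptor chain is returned to the avail ring for retry.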
impl BlockEpollHandler {
    fn process_queue_submit(&mut self) -> Result<bool> {
        let queue = &mut self.queue;
        let mem = self.mem.memory();

        let mut used_desc_heads = Vec::new();
        let mut used_count = 0;

        for avail_desc in queue.iter(&mem) {
            let mut request = Request::parse(&avail_desc, &mem).map_err(Error::RequestParsing)?;

            if let Some(rate_limiter) = &mut self.rate_limiter {
                // If limiter.consume() fails it means there is no more TokenType::Ops
                // budget and rate limiting is in effect.
                if !rate_limiter.consume(1, TokenType::Ops) {
                    // Stop processing the queue and return this descriptor chain to the
                    // avail ring, for later processing.
                    queue.go_to_previous_position();
                    break;
                }
                // Exercise the rate limiter only if this request is of data transfer type.
                if request.request_type == RequestType::In
                    || request.request_type == RequestType::Out
                {
                    let mut bytes = Wrapping(0);
                    for (_, data_len) in &request.data_descriptors {
                        bytes += Wrapping(*data_len as u64);
                    }

                    // If limiter.consume() fails it means there is no more TokenType::Bytes
                    // budget and rate limiting is in effect.
                    if !rate_limiter.consume(bytes.0, TokenType::Bytes) {
                        // Revert the OPS consume().
                        rate_limiter.manual_replenish(1, TokenType::Ops);
                        // Stop processing the queue and return this descriptor chain to the
                        // avail ring, for later processing.
                        queue.go_to_previous_position();
                        break;
                    }
                }
            }

            request.set_writeback(self.writeback.load(Ordering::Acquire));

            if request
                .execute_async(
                    &mem,
                    self.disk_nsectors,
                    self.disk_image.as_mut(),
                    &self.disk_image_id,
                    avail_desc.index as u64,
                )
                .map_err(Error::RequestExecuting)?
            {
                self.request_list.insert(avail_desc.index, request);
            } else {
                // We use unwrap because the request parsing process already
                // checked that the status_addr was valid.
                mem.write_obj(VIRTIO_BLK_S_OK, request.status_addr).unwrap();

                // If no asynchronous operation has been submitted, we can
                // simply return the used descriptor.
                used_desc_heads.push((avail_desc.index, 0));
                used_count += 1;
            }
        }

        for &(desc_index, len) in used_desc_heads.iter() {
            queue.add_used(&mem, desc_index, len);
        }

        Ok(used_count > 0)
    }

    fn process_queue_complete(&mut self) -> Result<bool> {
        let queue = &mut self.queue;

        let mut used_desc_heads = Vec::new();
        let mut used_count = 0;
        let mem = self.mem.memory();
        let mut read_bytes = Wrapping(0);
        let mut write_bytes = Wrapping(0);
        let mut read_ops = Wrapping(0);
        let mut write_ops = Wrapping(0);

        let completion_list = self.disk_image.complete();
        for (user_data, result) in completion_list {
            let desc_index = user_data as u16;
            let request = self
                .request_list
                .remove(&desc_index)
                .ok_or(Error::MissingEntryRequestList)?;

            let (status, len) = if result >= 0 {
                match request.request_type {
                    RequestType::In => {
                        for (_, data_len) in &request.data_descriptors {
                            read_bytes += Wrapping(*data_len as u64);
                        }
                        read_ops += Wrapping(1);
                    }
                    RequestType::Out => {
                        if !request.writeback {
                            self.disk_image.fsync(None).map_err(Error::Fsync)?;
                        }
                        for (_, data_len) in &request.data_descriptors {
                            write_bytes += Wrapping(*data_len as u64);
                        }
                        write_ops += Wrapping(1);
                    }
                    _ => {}
                }

                (VIRTIO_BLK_S_OK, result as u32)
            } else {
                error!(
                    "Request failed: {:?}",
                    io::Error::from_raw_os_error(-result)
                );
                return Err(Error::AsyncRequestFailure);
            };

            // We use unwrap because the request parsing process already
            // checked that the status_addr was valid.
            mem.write_obj(status, request.status_addr).unwrap();

            used_desc_heads.push((desc_index, len));
            used_count += 1;
        }

        for &(desc_index, len) in used_desc_heads.iter() {
            queue.add_used(&mem, desc_index, len);
        }

        self.counters
            .write_bytes
            .fetch_add(write_bytes.0, Ordering::AcqRel);
        self.counters
            .write_ops
            .fetch_add(write_ops.0, Ordering::AcqRel);

        self.counters
            .read_bytes
            .fetch_add(read_bytes.0, Ordering::AcqRel);
        self.counters
            .read_ops
            .fetch_add(read_ops.0, Ordering::AcqRel);

        Ok(used_count > 0)
    }

    fn signal_used_queue(&self) -> result::Result<(), DeviceError> {
        self.interrupt_cb
            .trigger(&VirtioInterruptType::Queue, Some(&self.queue))
            .map_err(|e| {
                error!("Failed to signal used queue: {:?}", e);
                DeviceError::FailedSignalingUsedQueue(e)
            })
    }

    fn run(
        &mut self,
        paused: Arc<AtomicBool>,
        paused_sync: Arc<Barrier>,
    ) -> result::Result<(), EpollHelperError> {
        let mut helper = EpollHelper::new(&self.kill_evt, &self.pause_evt)?;
        helper.add_event(self.queue_evt.as_raw_fd(), QUEUE_AVAIL_EVENT)?;
        helper.add_event(self.disk_image.notifier().as_raw_fd(), COMPLETION_EVENT)?;
        if let Some(rate_limiter) = &self.rate_limiter {
            helper.add_event(rate_limiter.as_raw_fd(), RATE_LIMITER_EVENT)?;
        }
        helper.run(paused, paused_sync, self)?;

        Ok(())
    }
}
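// The worker loop registers three event sources in `run()` above: the guest
// kicking the queue (QUEUE_AVAIL_EVENT), the async I/O backend signaling
// completions (COMPLETION_EVENT), and the rate limiter waking a throttled
// queue (RATE_LIMITER_EVENT). `handle_event()` below dispatches on those
// same identifiers.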
impl EpollHelperHandler for BlockEpollHandler {
    fn handle_event(&mut self, _helper: &mut EpollHelper, event: &epoll::Event) -> bool {
        let ev_type = event.data as u16;
        match ev_type {
            QUEUE_AVAIL_EVENT => {
                if let Err(e) = self.queue_evt.read() {
                    error!("Failed to get queue event: {:?}", e);
                    return true;
                }

                let rate_limit_reached =
                    self.rate_limiter.as_ref().map_or(false, |r| r.is_blocked());

                // Process the queue only when the rate limit is not reached.
                if !rate_limit_reached {
                    match self.process_queue_submit() {
                        Ok(needs_notification) => {
                            if needs_notification {
                                if let Err(e) = self.signal_used_queue() {
                                    error!("Failed to signal used queue: {:?}", e);
                                    return true;
                                }
                            }
                        }
                        Err(e) => {
                            error!("Failed to process queue (submit): {:?}", e);
                            return true;
                        }
                    }
                }
            }
            COMPLETION_EVENT => {
                if let Err(e) = self.disk_image.notifier().read() {
                    error!("Failed to get completion event: {:?}", e);
                    return true;
                }

                match self.process_queue_complete() {
                    Ok(needs_notification) => {
                        if needs_notification {
                            if let Err(e) = self.signal_used_queue() {
                                error!("Failed to signal used queue: {:?}", e);
                                return true;
                            }
                        }
                    }
                    Err(e) => {
                        error!("Failed to process queue (complete): {:?}", e);
                        return true;
                    }
                }
            }
            RATE_LIMITER_EVENT => {
                if let Some(rate_limiter) = &mut self.rate_limiter {
                    // Upon rate limiter event, call the rate limiter handler
                    // and restart processing the queue.
                    if rate_limiter.event_handler().is_ok() {
                        match self.process_queue_submit() {
                            Ok(needs_notification) => {
                                if needs_notification {
                                    if let Err(e) = self.signal_used_queue() {
                                        error!("Failed to signal used queue: {:?}", e);
                                        return true;
                                    }
                                }
                            }
                            Err(e) => {
                                error!("Failed to process queue (submit): {:?}", e);
                                return true;
                            }
                        }
                    }
                } else {
                    error!("Unexpected 'RATE_LIMITER_EVENT' when rate_limiter is not enabled.");
                    return true;
                }
            }
            _ => {
                error!("Unexpected event: {}", ev_type);
                return true;
            }
        }
        false
    }
}

/// Virtio device for exposing block level read/write operations on a host file.
pub struct Block {
    common: VirtioCommon,
    id: String,
    disk_image: Box<dyn DiskFile>,
    disk_path: PathBuf,
    disk_nsectors: u64,
    config: VirtioBlockConfig,
    writeback: Arc<AtomicBool>,
    counters: BlockCounters,
    seccomp_action: SeccompAction,
    rate_limiter_config: Option<RateLimiterConfig>,
    exit_evt: EventFd,
}

#[derive(Versionize)]
pub struct BlockState {
    pub disk_path: String,
    pub disk_nsectors: u64,
    pub avail_features: u64,
    pub acked_features: u64,
    pub config: VirtioBlockConfig,
}

impl VersionMapped for BlockState {}
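// A worked example of the feature negotiation in `Block::new()` below: a
// read-only disk served over four queues advertises VIRTIO_F_VERSION_1,
// VIRTIO_BLK_F_FLUSH and VIRTIO_BLK_F_CONFIG_WCE unconditionally, plus
// VIRTIO_BLK_F_RO (read-only) and VIRTIO_BLK_F_MQ with config.num_queues = 4
// (multi-queue), and VIRTIO_F_IOMMU_PLATFORM only when `iommu` is requested.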
impl Block {
    /// Create a new virtio block device that operates on the given file.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        id: String,
        mut disk_image: Box<dyn DiskFile>,
        disk_path: PathBuf,
        is_disk_read_only: bool,
        iommu: bool,
        num_queues: usize,
        queue_size: u16,
        seccomp_action: SeccompAction,
        rate_limiter_config: Option<RateLimiterConfig>,
        exit_evt: EventFd,
    ) -> io::Result<Self> {
        let disk_size = disk_image.size().map_err(|e| {
            io::Error::new(
                io::ErrorKind::Other,
                format!("Failed getting disk size: {}", e),
            )
        })?;
        if disk_size % SECTOR_SIZE != 0 {
            warn!(
                "Disk size {} is not a multiple of sector size {}; \
                 the remainder will not be visible to the guest.",
                disk_size, SECTOR_SIZE
            );
        }

        let mut avail_features = (1u64 << VIRTIO_F_VERSION_1)
            | (1u64 << VIRTIO_BLK_F_FLUSH)
            | (1u64 << VIRTIO_BLK_F_CONFIG_WCE);

        if iommu {
            avail_features |= 1u64 << VIRTIO_F_IOMMU_PLATFORM;
        }

        if is_disk_read_only {
            avail_features |= 1u64 << VIRTIO_BLK_F_RO;
        }

        let disk_nsectors = disk_size / SECTOR_SIZE;
        let mut config = VirtioBlockConfig {
            capacity: disk_nsectors,
            writeback: 1,
            ..Default::default()
        };

        if num_queues > 1 {
            avail_features |= 1u64 << VIRTIO_BLK_F_MQ;
            config.num_queues = num_queues as u16;
        }

        Ok(Block {
            common: VirtioCommon {
                device_type: VirtioDeviceType::Block as u32,
                avail_features,
                paused_sync: Some(Arc::new(Barrier::new(num_queues + 1))),
                queue_sizes: vec![queue_size; num_queues],
                min_queues: 1,
                ..Default::default()
            },
            id,
            disk_image,
            disk_path,
            disk_nsectors,
            config,
            writeback: Arc::new(AtomicBool::new(true)),
            counters: BlockCounters::default(),
            seccomp_action,
            rate_limiter_config,
            exit_evt,
        })
    }

    fn state(&self) -> BlockState {
        BlockState {
            disk_path: self.disk_path.to_str().unwrap().to_owned(),
            disk_nsectors: self.disk_nsectors,
            avail_features: self.common.avail_features,
            acked_features: self.common.acked_features,
            config: self.config,
        }
    }

    fn set_state(&mut self, state: &BlockState) {
        self.disk_path = state.disk_path.clone().into();
        self.disk_nsectors = state.disk_nsectors;
        self.common.avail_features = state.avail_features;
        self.common.acked_features = state.acked_features;
        self.config = state.config;
    }

    fn update_writeback(&mut self) {
        // Use the writeback value from config space if VIRTIO_BLK_F_CONFIG_WCE
        // has been negotiated.
        let writeback = if self.common.feature_acked(VIRTIO_BLK_F_CONFIG_WCE.into()) {
            self.config.writeback == 1
        } else {
            // Otherwise, fall back to whether VIRTIO_BLK_F_FLUSH was negotiated.
            self.common.feature_acked(VIRTIO_BLK_F_FLUSH.into())
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}

impl Drop for Block {
    fn drop(&mut self) {
        if let Some(kill_evt) = self.common.kill_evt.take() {
            // Ignore the result because there is nothing we can do about it.
            let _ = kill_evt.write(1);
        }
    }
}
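// A sketch of the only guest-writable config access handled by
// `write_config()` below: writing the single byte 0 or 1 at the offset of
// the `writeback` field toggles the cache mode between writethrough and
// writeback; a write to any other offset, or with any other length, is
// rejected and logged as an attempt to modify a read-only field.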
impl VirtioDevice for Block {
    fn device_type(&self) -> u32 {
        self.common.device_type
    }

    fn queue_max_sizes(&self) -> &[u16] {
        &self.common.queue_sizes
    }

    fn features(&self) -> u64 {
        self.common.avail_features
    }

    fn ack_features(&mut self, value: u64) {
        self.common.ack_features(value)
    }

    fn read_config(&self, offset: u64, data: &mut [u8]) {
        self.read_config_from_slice(self.config.as_slice(), offset, data);
    }

    fn write_config(&mut self, offset: u64, data: &[u8]) {
        // The "writeback" field is the only mutable field.
        let writeback_offset =
            (&self.config.writeback as *const _ as u64) - (&self.config as *const _ as u64);
        if offset != writeback_offset || data.len() != std::mem::size_of_val(&self.config.writeback)
        {
            error!(
                "Attempt to write to read-only field: offset {:x} length {}",
                offset,
                data.len()
            );
            return;
        }

        self.config.writeback = data[0];
        self.update_writeback();
    }

    fn activate(
        &mut self,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
        interrupt_cb: Arc<dyn VirtioInterrupt>,
        mut queues: Vec<Queue>,
        mut queue_evts: Vec<EventFd>,
    ) -> ActivateResult {
        self.common.activate(&queues, &queue_evts, &interrupt_cb)?;

        let disk_image_id = build_disk_image_id(&self.disk_path);
        self.update_writeback();

        let mut epoll_threads = Vec::new();
        for i in 0..queues.len() {
            let queue_evt = queue_evts.remove(0);
            let queue = queues.remove(0);
            let queue_size = queue.size;
            let (kill_evt, pause_evt) = self.common.dup_eventfds();

            let rate_limiter: Option<RateLimiter> = self
                .rate_limiter_config
                .map(RateLimiterConfig::try_into)
                .transpose()
                .map_err(ActivateError::CreateRateLimiter)?;

            let mut handler = BlockEpollHandler {
                queue,
                mem: mem.clone(),
                disk_image: self
                    .disk_image
                    .new_async_io(queue_size as u32)
                    .map_err(|e| {
                        error!("failed to create new AsyncIo: {}", e);
                        ActivateError::BadActivate
                    })?,
                disk_nsectors: self.disk_nsectors,
                interrupt_cb: interrupt_cb.clone(),
                disk_image_id: disk_image_id.clone(),
                kill_evt,
                pause_evt,
                writeback: self.writeback.clone(),
                counters: self.counters.clone(),
                queue_evt,
                request_list: HashMap::with_capacity(queue_size.into()),
                rate_limiter,
            };

            let paused = self.common.paused.clone();
            let paused_sync = self.common.paused_sync.clone();

            spawn_virtio_thread(
                &format!("{}_q{}", self.id.clone(), i),
                &self.seccomp_action,
                Thread::VirtioBlock,
                &mut epoll_threads,
                &self.exit_evt,
                move || {
                    if let Err(e) = handler.run(paused, paused_sync.unwrap()) {
                        error!("Error running worker: {:?}", e);
                    }
                },
            )?;
        }

        self.common.epoll_threads = Some(epoll_threads);
        event!("virtio-device", "activated", "id", &self.id);

        Ok(())
    }

    fn reset(&mut self) -> Option<Arc<dyn VirtioInterrupt>> {
        let result = self.common.reset();
        event!("virtio-device", "reset", "id", &self.id);
        result
    }
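    // Example of the counter snapshot returned by `counters()` below: after
    // a guest completes two 4 KiB reads and one 8 KiB write, the map holds
    // read_bytes = 8192, read_ops = 2, write_bytes = 8192 and write_ops = 1,
    // as accumulated per data descriptor by `process_queue_complete()`.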
    fn counters(&self) -> Option<HashMap<&'static str, Wrapping<u64>>> {
        let mut counters = HashMap::new();

        counters.insert(
            "read_bytes",
            Wrapping(self.counters.read_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_bytes",
            Wrapping(self.counters.write_bytes.load(Ordering::Acquire)),
        );
        counters.insert(
            "read_ops",
            Wrapping(self.counters.read_ops.load(Ordering::Acquire)),
        );
        counters.insert(
            "write_ops",
            Wrapping(self.counters.write_ops.load(Ordering::Acquire)),
        );

        Some(counters)
    }
}

impl Pausable for Block {
    fn pause(&mut self) -> result::Result<(), MigratableError> {
        self.common.pause()
    }

    fn resume(&mut self) -> result::Result<(), MigratableError> {
        self.common.resume()
    }
}

impl Snapshottable for Block {
    fn id(&self) -> String {
        self.id.clone()
    }

    fn snapshot(&mut self) -> std::result::Result<Snapshot, MigratableError> {
        Snapshot::new_from_versioned_state(&self.id(), &self.state())
    }

    fn restore(&mut self, snapshot: Snapshot) -> std::result::Result<(), MigratableError> {
        self.set_state(&snapshot.to_versioned_state(&self.id)?);
        Ok(())
    }
}

impl Transportable for Block {}
impl Migratable for Block {}
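#[cfg(test)]
mod tests {
    use super::*;

    // A minimal sanity check of the sector arithmetic used by Block::new():
    // the capacity exposed to the guest is the disk size in 512-byte sectors,
    // rounded down, and any unaligned remainder stays hidden.
    #[test]
    fn test_sector_arithmetic() {
        assert_eq!(SECTOR_SIZE, 512);

        // A 1000-byte image exposes a single sector; the trailing 488 bytes
        // are not visible to the guest, matching the warning in Block::new().
        let disk_size: u64 = 1000;
        assert_eq!(disk_size / SECTOR_SIZE, 1);
        assert_eq!(disk_size % SECTOR_SIZE, 488);
    }
}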