1 // Copyright 2019 Red Hat, Inc. All Rights Reserved. 2 // 3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved. 4 // 5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 6 // 7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved. 8 // 9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause) 10 11 #![allow(clippy::significant_drop_in_scrutinee)] 12 13 use block_util::{build_disk_image_id, Request, VirtioBlockConfig}; 14 use libc::EFD_NONBLOCK; 15 use log::*; 16 use option_parser::{OptionParser, OptionParserError, Toggle}; 17 use qcow::{self, ImageType, QcowFile}; 18 use std::fs::File; 19 use std::fs::OpenOptions; 20 use std::io::Read; 21 use std::io::{Seek, SeekFrom, Write}; 22 use std::ops::Deref; 23 use std::ops::DerefMut; 24 use std::os::unix::fs::OpenOptionsExt; 25 use std::path::PathBuf; 26 use std::process; 27 use std::result; 28 use std::sync::atomic::{AtomicBool, Ordering}; 29 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; 30 use std::time::Instant; 31 use std::vec::Vec; 32 use std::{convert, error, fmt, io}; 33 use vhost::vhost_user::message::*; 34 use vhost::vhost_user::Listener; 35 use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT}; 36 use virtio_bindings::bindings::virtio_blk::*; 37 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; 38 use virtio_queue::QueueT; 39 use vm_memory::GuestAddressSpace; 40 use vm_memory::{bitmap::AtomicBitmap, ByteValued, Bytes, GuestMemoryAtomic}; 41 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; 42 43 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>; 44 45 const SECTOR_SHIFT: u8 = 9; 46 const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT; 47 const BLK_SIZE: u32 = 512; 48 // Current (2020) enterprise SSDs have a latency lower than 30us. 49 // Polling for 50us should be enough to cover for the device latency 50 // and the overhead of the emulation layer. 51 const POLL_QUEUE_US: u128 = 50; 52 53 trait DiskFile: Read + Seek + Write + Send {} 54 impl<D: Read + Seek + Write + Send> DiskFile for D {} 55 56 type Result<T> = std::result::Result<T, Error>; 57 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>; 58 59 #[derive(Debug)] 60 enum Error { 61 /// Failed to create kill eventfd 62 CreateKillEventFd(io::Error), 63 /// Failed to parse configuration string 64 FailedConfigParse(OptionParserError), 65 /// Failed to handle event other than input event. 66 HandleEventNotEpollIn, 67 /// Failed to handle unknown event. 68 HandleEventUnknownEvent, 69 /// No path provided 70 PathParameterMissing, 71 /// No socket provided 72 SocketParameterMissing, 73 } 74 75 pub const SYNTAX: &str = "vhost-user-block backend parameters \ 76 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\ 77 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\ 78 poll_queue=true|false\""; 79 80 impl fmt::Display for Error { 81 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 82 write!(f, "vhost_user_block_error: {:?}", self) 83 } 84 } 85 86 impl error::Error for Error {} 87 88 impl convert::From<Error> for io::Error { 89 fn from(e: Error) -> Self { 90 io::Error::new(io::ErrorKind::Other, e) 91 } 92 } 93 94 struct VhostUserBlkThread { 95 disk_image: Arc<Mutex<dyn DiskFile>>, 96 disk_image_id: Vec<u8>, 97 disk_nsectors: u64, 98 event_idx: bool, 99 kill_evt: EventFd, 100 writeback: Arc<AtomicBool>, 101 mem: GuestMemoryAtomic<GuestMemoryMmap>, 102 } 103 104 impl VhostUserBlkThread { 105 fn new( 106 disk_image: Arc<Mutex<dyn DiskFile>>, 107 disk_image_id: Vec<u8>, 108 disk_nsectors: u64, 109 writeback: Arc<AtomicBool>, 110 mem: GuestMemoryAtomic<GuestMemoryMmap>, 111 ) -> Result<Self> { 112 Ok(VhostUserBlkThread { 113 disk_image, 114 disk_image_id, 115 disk_nsectors, 116 event_idx: false, 117 kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, 118 writeback, 119 mem, 120 }) 121 } 122 123 fn process_queue( 124 &mut self, 125 vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>, 126 ) -> bool { 127 let mut used_descs = false; 128 129 while let Some(mut desc_chain) = vring 130 .get_queue_mut() 131 .pop_descriptor_chain(self.mem.memory()) 132 { 133 debug!("got an element in the queue"); 134 let len; 135 match Request::parse(&mut desc_chain, None) { 136 Ok(mut request) => { 137 debug!("element is a valid request"); 138 request.set_writeback(self.writeback.load(Ordering::Acquire)); 139 let status = match request.execute( 140 &mut self.disk_image.lock().unwrap().deref_mut(), 141 self.disk_nsectors, 142 desc_chain.memory(), 143 &self.disk_image_id, 144 ) { 145 Ok(l) => { 146 len = l; 147 VIRTIO_BLK_S_OK 148 } 149 Err(e) => { 150 len = 1; 151 e.status() 152 } 153 }; 154 desc_chain 155 .memory() 156 .write_obj(status, request.status_addr) 157 .unwrap(); 158 } 159 Err(err) => { 160 error!("failed to parse available descriptor chain: {:?}", err); 161 len = 0; 162 } 163 } 164 165 vring 166 .get_queue_mut() 167 .add_used(desc_chain.memory(), desc_chain.head_index(), len) 168 .unwrap(); 169 used_descs = true; 170 } 171 172 let mut needs_signalling = false; 173 if self.event_idx { 174 if vring 175 .get_queue_mut() 176 .needs_notification(self.mem.memory().deref()) 177 .unwrap() 178 { 179 debug!("signalling queue"); 180 needs_signalling = true; 181 } else { 182 debug!("omitting signal (event_idx)"); 183 } 184 } else { 185 debug!("signalling queue"); 186 needs_signalling = true; 187 } 188 189 if needs_signalling { 190 vring.signal_used_queue().unwrap(); 191 } 192 193 used_descs 194 } 195 } 196 197 struct VhostUserBlkBackend { 198 threads: Vec<Mutex<VhostUserBlkThread>>, 199 config: VirtioBlockConfig, 200 rdonly: bool, 201 poll_queue: bool, 202 queues_per_thread: Vec<u64>, 203 queue_size: usize, 204 acked_features: u64, 205 writeback: Arc<AtomicBool>, 206 mem: GuestMemoryAtomic<GuestMemoryMmap>, 207 } 208 209 impl VhostUserBlkBackend { 210 fn new( 211 image_path: String, 212 num_queues: usize, 213 rdonly: bool, 214 direct: bool, 215 poll_queue: bool, 216 queue_size: usize, 217 mem: GuestMemoryAtomic<GuestMemoryMmap>, 218 ) -> Result<Self> { 219 let mut options = OpenOptions::new(); 220 options.read(true); 221 options.write(!rdonly); 222 if direct { 223 options.custom_flags(libc::O_DIRECT); 224 } 225 let image: File = options.open(&image_path).unwrap(); 226 let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct); 227 228 let image_id = build_disk_image_id(&PathBuf::from(&image_path)); 229 let image_type = qcow::detect_image_type(&mut raw_img).unwrap(); 230 let image = match image_type { 231 ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>, 232 ImageType::Qcow2 => { 233 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>> 234 } 235 }; 236 237 let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap() as u64) / SECTOR_SIZE; 238 let config = VirtioBlockConfig { 239 capacity: nsectors, 240 blk_size: BLK_SIZE, 241 size_max: 65535, 242 seg_max: 128 - 2, 243 min_io_size: 1, 244 opt_io_size: 1, 245 num_queues: num_queues as u16, 246 writeback: 1, 247 ..Default::default() 248 }; 249 250 let mut queues_per_thread = Vec::new(); 251 let mut threads = Vec::new(); 252 let writeback = Arc::new(AtomicBool::new(true)); 253 for i in 0..num_queues { 254 let thread = Mutex::new(VhostUserBlkThread::new( 255 image.clone(), 256 image_id.clone(), 257 nsectors, 258 writeback.clone(), 259 mem.clone(), 260 )?); 261 threads.push(thread); 262 queues_per_thread.push(0b1 << i); 263 } 264 265 Ok(VhostUserBlkBackend { 266 threads, 267 config, 268 rdonly, 269 poll_queue, 270 queues_per_thread, 271 queue_size, 272 acked_features: 0, 273 writeback, 274 mem, 275 }) 276 } 277 278 fn update_writeback(&mut self) { 279 // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE 280 let writeback = 281 if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE { 282 self.config.writeback == 1 283 } else { 284 // Else check if VIRTIO_BLK_F_FLUSH negotiated 285 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH 286 }; 287 288 info!( 289 "Changing cache mode to {}", 290 if writeback { 291 "writeback" 292 } else { 293 "writethrough" 294 } 295 ); 296 self.writeback.store(writeback, Ordering::Release); 297 } 298 } 299 300 impl VhostUserBackendMut<VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>, AtomicBitmap> 301 for VhostUserBlkBackend 302 { 303 fn num_queues(&self) -> usize { 304 self.config.num_queues as usize 305 } 306 307 fn max_queue_size(&self) -> usize { 308 self.queue_size as usize 309 } 310 311 fn features(&self) -> u64 { 312 let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX 313 | 1 << VIRTIO_BLK_F_BLK_SIZE 314 | 1 << VIRTIO_BLK_F_FLUSH 315 | 1 << VIRTIO_BLK_F_TOPOLOGY 316 | 1 << VIRTIO_BLK_F_MQ 317 | 1 << VIRTIO_BLK_F_CONFIG_WCE 318 | 1 << VIRTIO_RING_F_EVENT_IDX 319 | 1 << VIRTIO_F_VERSION_1 320 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 321 322 if self.rdonly { 323 avail_features |= 1 << VIRTIO_BLK_F_RO; 324 } 325 avail_features 326 } 327 328 fn acked_features(&mut self, features: u64) { 329 self.acked_features = features; 330 self.update_writeback(); 331 } 332 333 fn protocol_features(&self) -> VhostUserProtocolFeatures { 334 VhostUserProtocolFeatures::CONFIG 335 | VhostUserProtocolFeatures::MQ 336 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS 337 } 338 339 fn set_event_idx(&mut self, enabled: bool) { 340 for thread in self.threads.iter() { 341 thread.lock().unwrap().event_idx = enabled; 342 } 343 } 344 345 fn handle_event( 346 &mut self, 347 device_event: u16, 348 evset: EventSet, 349 vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>], 350 thread_id: usize, 351 ) -> VhostUserBackendResult<bool> { 352 if evset != EventSet::IN { 353 return Err(Error::HandleEventNotEpollIn.into()); 354 } 355 356 debug!("event received: {:?}", device_event); 357 358 let mut thread = self.threads[thread_id].lock().unwrap(); 359 match device_event { 360 0 => { 361 let mut vring = vrings[0].get_mut(); 362 363 if self.poll_queue { 364 // Actively poll the queue until POLL_QUEUE_US has passed 365 // without seeing a new request. 366 let mut now = Instant::now(); 367 loop { 368 if thread.process_queue(&mut vring) { 369 now = Instant::now(); 370 } else if now.elapsed().as_micros() > POLL_QUEUE_US { 371 break; 372 } 373 } 374 } 375 376 if thread.event_idx { 377 // vm-virtio's Queue implementation only checks avail_index 378 // once, so to properly support EVENT_IDX we need to keep 379 // calling process_queue() until it stops finding new 380 // requests on the queue. 381 loop { 382 vring 383 .get_queue_mut() 384 .enable_notification(self.mem.memory().deref()) 385 .unwrap(); 386 if !thread.process_queue(&mut vring) { 387 break; 388 } 389 } 390 } else { 391 // Without EVENT_IDX, a single call is enough. 392 thread.process_queue(&mut vring); 393 } 394 395 Ok(false) 396 } 397 _ => Err(Error::HandleEventUnknownEvent.into()), 398 } 399 } 400 401 fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> { 402 self.config.as_slice().to_vec() 403 } 404 405 fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> { 406 let config_slice = self.config.as_mut_slice(); 407 let data_len = data.len() as u32; 408 let config_len = config_slice.len() as u32; 409 if offset + data_len > config_len { 410 error!("Failed to write config space"); 411 return Err(io::Error::from_raw_os_error(libc::EINVAL)); 412 } 413 let (_, right) = config_slice.split_at_mut(offset as usize); 414 right.copy_from_slice(data); 415 self.update_writeback(); 416 Ok(()) 417 } 418 419 fn exit_event(&self, thread_index: usize) -> Option<EventFd> { 420 Some( 421 self.threads[thread_index] 422 .lock() 423 .unwrap() 424 .kill_evt 425 .try_clone() 426 .unwrap(), 427 ) 428 } 429 430 fn queues_per_thread(&self) -> Vec<u64> { 431 self.queues_per_thread.clone() 432 } 433 434 fn update_memory( 435 &mut self, 436 _mem: GuestMemoryAtomic<GuestMemoryMmap>, 437 ) -> VhostUserBackendResult<()> { 438 Ok(()) 439 } 440 } 441 442 struct VhostUserBlkBackendConfig { 443 path: String, 444 socket: String, 445 num_queues: usize, 446 queue_size: usize, 447 readonly: bool, 448 direct: bool, 449 poll_queue: bool, 450 } 451 452 impl VhostUserBlkBackendConfig { 453 fn parse(backend: &str) -> Result<Self> { 454 let mut parser = OptionParser::new(); 455 parser 456 .add("path") 457 .add("readonly") 458 .add("direct") 459 .add("num_queues") 460 .add("queue_size") 461 .add("socket") 462 .add("poll_queue"); 463 parser.parse(backend).map_err(Error::FailedConfigParse)?; 464 465 let path = parser.get("path").ok_or(Error::PathParameterMissing)?; 466 let readonly = parser 467 .convert::<Toggle>("readonly") 468 .map_err(Error::FailedConfigParse)? 469 .unwrap_or(Toggle(false)) 470 .0; 471 let direct = parser 472 .convert::<Toggle>("direct") 473 .map_err(Error::FailedConfigParse)? 474 .unwrap_or(Toggle(false)) 475 .0; 476 let num_queues = parser 477 .convert("num_queues") 478 .map_err(Error::FailedConfigParse)? 479 .unwrap_or(1); 480 let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?; 481 let poll_queue = parser 482 .convert::<Toggle>("poll_queue") 483 .map_err(Error::FailedConfigParse)? 484 .unwrap_or(Toggle(true)) 485 .0; 486 let queue_size = parser 487 .convert("queue_size") 488 .map_err(Error::FailedConfigParse)? 489 .unwrap_or(1024); 490 491 Ok(VhostUserBlkBackendConfig { 492 path, 493 socket, 494 num_queues, 495 queue_size, 496 readonly, 497 direct, 498 poll_queue, 499 }) 500 } 501 } 502 503 pub fn start_block_backend(backend_command: &str) { 504 let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) { 505 Ok(config) => config, 506 Err(e) => { 507 println!("Failed parsing parameters {:?}", e); 508 process::exit(1); 509 } 510 }; 511 512 let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); 513 514 let blk_backend = Arc::new(RwLock::new( 515 VhostUserBlkBackend::new( 516 backend_config.path, 517 backend_config.num_queues, 518 backend_config.readonly, 519 backend_config.direct, 520 backend_config.poll_queue, 521 backend_config.queue_size, 522 mem.clone(), 523 ) 524 .unwrap(), 525 )); 526 527 debug!("blk_backend is created!\n"); 528 529 let listener = Listener::new(&backend_config.socket, true).unwrap(); 530 531 let name = "vhost-user-blk-backend"; 532 let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap(); 533 534 debug!("blk_daemon is created!\n"); 535 536 if let Err(e) = blk_daemon.start(listener) { 537 error!( 538 "Failed to start daemon for vhost-user-block with error: {:?}\n", 539 e 540 ); 541 process::exit(1); 542 } 543 544 if let Err(e) = blk_daemon.wait() { 545 error!("Error from the main thread: {:?}", e); 546 } 547 548 for thread in blk_backend.read().unwrap().threads.iter() { 549 if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { 550 error!("Error shutting down worker thread: {:?}", e) 551 } 552 } 553 } 554