1 // Copyright 2019 Red Hat, Inc. All Rights Reserved. 2 // 3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved. 4 // 5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 6 // 7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved. 8 // 9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause) 10 11 use std::fs::File; 12 use std::fs::OpenOptions; 13 use std::io::Read; 14 use std::io::{Seek, SeekFrom, Write}; 15 use std::ops::Deref; 16 use std::ops::DerefMut; 17 use std::os::unix::fs::OpenOptionsExt; 18 use std::path::PathBuf; 19 use std::process; 20 use std::result; 21 use std::sync::atomic::{AtomicBool, Ordering}; 22 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; 23 use std::time::Instant; 24 use std::{convert, error, fmt, io}; 25 26 use block::{ 27 build_serial, 28 qcow::{self, ImageType, QcowFile}, 29 Request, VirtioBlockConfig, 30 }; 31 use libc::EFD_NONBLOCK; 32 use log::*; 33 use option_parser::{OptionParser, OptionParserError, Toggle}; 34 use vhost::vhost_user::message::*; 35 use vhost::vhost_user::Listener; 36 use vhost_user_backend::{ 37 bitmap::BitmapMmapRegion, VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT, 38 }; 39 use virtio_bindings::virtio_blk::*; 40 use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1; 41 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; 42 use virtio_queue::QueueT; 43 use vm_memory::GuestAddressSpace; 44 use vm_memory::{ByteValued, Bytes, GuestMemoryAtomic}; 45 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; 46 47 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>; 48 49 const SECTOR_SHIFT: u8 = 9; 50 const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT; 51 const BLK_SIZE: u32 = 512; 52 // Current (2020) enterprise SSDs have a latency lower than 30us. 53 // Polling for 50us should be enough to cover for the device latency 54 // and the overhead of the emulation layer. 55 const POLL_QUEUE_US: u128 = 50; 56 57 trait DiskFile: Read + Seek + Write + Send {} 58 impl<D: Read + Seek + Write + Send> DiskFile for D {} 59 60 type Result<T> = std::result::Result<T, Error>; 61 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>; 62 63 #[allow(dead_code)] 64 #[derive(Debug)] 65 enum Error { 66 /// Failed to create kill eventfd 67 CreateKillEventFd(io::Error), 68 /// Failed to parse configuration string 69 FailedConfigParse(OptionParserError), 70 /// Failed to handle event other than input event. 71 HandleEventNotEpollIn, 72 /// Failed to handle unknown event. 73 HandleEventUnknownEvent, 74 /// No path provided 75 PathParameterMissing, 76 /// No socket provided 77 SocketParameterMissing, 78 } 79 80 pub const SYNTAX: &str = "vhost-user-block backend parameters \ 81 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\ 82 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\ 83 poll_queue=true|false\""; 84 85 impl fmt::Display for Error { 86 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 87 write!(f, "vhost_user_block_error: {self:?}") 88 } 89 } 90 91 impl error::Error for Error {} 92 93 impl convert::From<Error> for io::Error { 94 fn from(e: Error) -> Self { 95 io::Error::new(io::ErrorKind::Other, e) 96 } 97 } 98 99 struct VhostUserBlkThread { 100 disk_image: Arc<Mutex<dyn DiskFile>>, 101 serial: Vec<u8>, 102 disk_nsectors: u64, 103 event_idx: bool, 104 kill_evt: EventFd, 105 writeback: Arc<AtomicBool>, 106 mem: GuestMemoryAtomic<GuestMemoryMmap>, 107 } 108 109 impl VhostUserBlkThread { 110 fn new( 111 disk_image: Arc<Mutex<dyn DiskFile>>, 112 serial: Vec<u8>, 113 disk_nsectors: u64, 114 writeback: Arc<AtomicBool>, 115 mem: GuestMemoryAtomic<GuestMemoryMmap>, 116 ) -> Result<Self> { 117 Ok(VhostUserBlkThread { 118 disk_image, 119 serial, 120 disk_nsectors, 121 event_idx: false, 122 kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, 123 writeback, 124 mem, 125 }) 126 } 127 128 fn process_queue( 129 &mut self, 130 vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>, 131 ) -> bool { 132 let mut used_descs = false; 133 134 while let Some(mut desc_chain) = vring 135 .get_queue_mut() 136 .pop_descriptor_chain(self.mem.memory()) 137 { 138 debug!("got an element in the queue"); 139 let len; 140 match Request::parse(&mut desc_chain, None) { 141 Ok(mut request) => { 142 debug!("element is a valid request"); 143 request.set_writeback(self.writeback.load(Ordering::Acquire)); 144 let status = match request.execute( 145 &mut self.disk_image.lock().unwrap().deref_mut(), 146 self.disk_nsectors, 147 desc_chain.memory(), 148 &self.serial, 149 ) { 150 Ok(l) => { 151 len = l; 152 VIRTIO_BLK_S_OK as u8 153 } 154 Err(e) => { 155 len = 1; 156 e.status() 157 } 158 }; 159 desc_chain 160 .memory() 161 .write_obj(status, request.status_addr) 162 .unwrap(); 163 } 164 Err(err) => { 165 error!("failed to parse available descriptor chain: {:?}", err); 166 len = 0; 167 } 168 } 169 170 vring 171 .get_queue_mut() 172 .add_used(desc_chain.memory(), desc_chain.head_index(), len) 173 .unwrap(); 174 used_descs = true; 175 } 176 177 let mut needs_signalling = false; 178 if self.event_idx { 179 if vring 180 .get_queue_mut() 181 .needs_notification(self.mem.memory().deref()) 182 .unwrap() 183 { 184 debug!("signalling queue"); 185 needs_signalling = true; 186 } else { 187 debug!("omitting signal (event_idx)"); 188 } 189 } else { 190 debug!("signalling queue"); 191 needs_signalling = true; 192 } 193 194 if needs_signalling { 195 vring.signal_used_queue().unwrap(); 196 } 197 198 used_descs 199 } 200 } 201 202 struct VhostUserBlkBackend { 203 threads: Vec<Mutex<VhostUserBlkThread>>, 204 config: VirtioBlockConfig, 205 rdonly: bool, 206 poll_queue: bool, 207 queues_per_thread: Vec<u64>, 208 queue_size: usize, 209 acked_features: u64, 210 writeback: Arc<AtomicBool>, 211 mem: GuestMemoryAtomic<GuestMemoryMmap>, 212 } 213 214 impl VhostUserBlkBackend { 215 fn new( 216 image_path: String, 217 num_queues: usize, 218 rdonly: bool, 219 direct: bool, 220 poll_queue: bool, 221 queue_size: usize, 222 mem: GuestMemoryAtomic<GuestMemoryMmap>, 223 ) -> Result<Self> { 224 let mut options = OpenOptions::new(); 225 options.read(true); 226 options.write(!rdonly); 227 if direct { 228 options.custom_flags(libc::O_DIRECT); 229 } 230 let image: File = options.open(&image_path).unwrap(); 231 let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct); 232 233 let serial = build_serial(&PathBuf::from(&image_path)); 234 let image_type = qcow::detect_image_type(&mut raw_img).unwrap(); 235 let image = match image_type { 236 ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>, 237 ImageType::Qcow2 => { 238 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>> 239 } 240 }; 241 242 let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE; 243 let config = VirtioBlockConfig { 244 capacity: nsectors, 245 blk_size: BLK_SIZE, 246 size_max: 65535, 247 seg_max: 128 - 2, 248 min_io_size: 1, 249 opt_io_size: 1, 250 num_queues: num_queues as u16, 251 writeback: 1, 252 ..Default::default() 253 }; 254 255 let mut queues_per_thread = Vec::new(); 256 let mut threads = Vec::new(); 257 let writeback = Arc::new(AtomicBool::new(true)); 258 for i in 0..num_queues { 259 let thread = Mutex::new(VhostUserBlkThread::new( 260 image.clone(), 261 serial.clone(), 262 nsectors, 263 writeback.clone(), 264 mem.clone(), 265 )?); 266 threads.push(thread); 267 queues_per_thread.push(0b1 << i); 268 } 269 270 Ok(VhostUserBlkBackend { 271 threads, 272 config, 273 rdonly, 274 poll_queue, 275 queues_per_thread, 276 queue_size, 277 acked_features: 0, 278 writeback, 279 mem, 280 }) 281 } 282 283 fn update_writeback(&mut self) { 284 // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE 285 let writeback = 286 if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE { 287 self.config.writeback == 1 288 } else { 289 // Else check if VIRTIO_BLK_F_FLUSH negotiated 290 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH 291 }; 292 293 info!( 294 "Changing cache mode to {}", 295 if writeback { 296 "writeback" 297 } else { 298 "writethrough" 299 } 300 ); 301 self.writeback.store(writeback, Ordering::Release); 302 } 303 } 304 305 impl VhostUserBackendMut for VhostUserBlkBackend { 306 type Bitmap = BitmapMmapRegion; 307 type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>; 308 309 fn num_queues(&self) -> usize { 310 self.config.num_queues as usize 311 } 312 313 fn max_queue_size(&self) -> usize { 314 self.queue_size 315 } 316 317 fn features(&self) -> u64 { 318 let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX 319 | 1 << VIRTIO_BLK_F_BLK_SIZE 320 | 1 << VIRTIO_BLK_F_FLUSH 321 | 1 << VIRTIO_BLK_F_TOPOLOGY 322 | 1 << VIRTIO_BLK_F_MQ 323 | 1 << VIRTIO_BLK_F_CONFIG_WCE 324 | 1 << VIRTIO_RING_F_EVENT_IDX 325 | 1 << VIRTIO_F_VERSION_1 326 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 327 328 if self.rdonly { 329 avail_features |= 1 << VIRTIO_BLK_F_RO; 330 } 331 avail_features 332 } 333 334 fn acked_features(&mut self, features: u64) { 335 self.acked_features = features; 336 self.update_writeback(); 337 } 338 339 fn protocol_features(&self) -> VhostUserProtocolFeatures { 340 VhostUserProtocolFeatures::CONFIG 341 | VhostUserProtocolFeatures::MQ 342 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS 343 } 344 345 fn set_event_idx(&mut self, enabled: bool) { 346 for thread in self.threads.iter() { 347 thread.lock().unwrap().event_idx = enabled; 348 } 349 } 350 351 fn handle_event( 352 &mut self, 353 device_event: u16, 354 evset: EventSet, 355 vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>], 356 thread_id: usize, 357 ) -> VhostUserBackendResult<()> { 358 if evset != EventSet::IN { 359 return Err(Error::HandleEventNotEpollIn.into()); 360 } 361 362 debug!("event received: {:?}", device_event); 363 364 let mut thread = self.threads[thread_id].lock().unwrap(); 365 match device_event { 366 0 => { 367 let mut vring = vrings[0].get_mut(); 368 369 if self.poll_queue { 370 // Actively poll the queue until POLL_QUEUE_US has passed 371 // without seeing a new request. 372 let mut now = Instant::now(); 373 loop { 374 if thread.process_queue(&mut vring) { 375 now = Instant::now(); 376 } else if now.elapsed().as_micros() > POLL_QUEUE_US { 377 break; 378 } 379 } 380 } 381 382 if thread.event_idx { 383 // vm-virtio's Queue implementation only checks avail_index 384 // once, so to properly support EVENT_IDX we need to keep 385 // calling process_queue() until it stops finding new 386 // requests on the queue. 387 loop { 388 vring 389 .get_queue_mut() 390 .enable_notification(self.mem.memory().deref()) 391 .unwrap(); 392 if !thread.process_queue(&mut vring) { 393 break; 394 } 395 } 396 } else { 397 // Without EVENT_IDX, a single call is enough. 398 thread.process_queue(&mut vring); 399 } 400 401 Ok(()) 402 } 403 _ => Err(Error::HandleEventUnknownEvent.into()), 404 } 405 } 406 407 fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> { 408 self.config.as_slice().to_vec() 409 } 410 411 fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> { 412 let config_slice = self.config.as_mut_slice(); 413 let data_len = data.len() as u32; 414 let config_len = config_slice.len() as u32; 415 if offset + data_len > config_len { 416 error!("Failed to write config space"); 417 return Err(io::Error::from_raw_os_error(libc::EINVAL)); 418 } 419 let (_, right) = config_slice.split_at_mut(offset as usize); 420 right.copy_from_slice(data); 421 self.update_writeback(); 422 Ok(()) 423 } 424 425 fn exit_event(&self, thread_index: usize) -> Option<EventFd> { 426 Some( 427 self.threads[thread_index] 428 .lock() 429 .unwrap() 430 .kill_evt 431 .try_clone() 432 .unwrap(), 433 ) 434 } 435 436 fn queues_per_thread(&self) -> Vec<u64> { 437 self.queues_per_thread.clone() 438 } 439 440 fn update_memory( 441 &mut self, 442 _mem: GuestMemoryAtomic<GuestMemoryMmap>, 443 ) -> VhostUserBackendResult<()> { 444 Ok(()) 445 } 446 } 447 448 struct VhostUserBlkBackendConfig { 449 path: String, 450 socket: String, 451 num_queues: usize, 452 queue_size: usize, 453 readonly: bool, 454 direct: bool, 455 poll_queue: bool, 456 } 457 458 impl VhostUserBlkBackendConfig { 459 fn parse(backend: &str) -> Result<Self> { 460 let mut parser = OptionParser::new(); 461 parser 462 .add("path") 463 .add("readonly") 464 .add("direct") 465 .add("num_queues") 466 .add("queue_size") 467 .add("socket") 468 .add("poll_queue"); 469 parser.parse(backend).map_err(Error::FailedConfigParse)?; 470 471 let path = parser.get("path").ok_or(Error::PathParameterMissing)?; 472 let readonly = parser 473 .convert::<Toggle>("readonly") 474 .map_err(Error::FailedConfigParse)? 475 .unwrap_or(Toggle(false)) 476 .0; 477 let direct = parser 478 .convert::<Toggle>("direct") 479 .map_err(Error::FailedConfigParse)? 480 .unwrap_or(Toggle(false)) 481 .0; 482 let num_queues = parser 483 .convert("num_queues") 484 .map_err(Error::FailedConfigParse)? 485 .unwrap_or(1); 486 let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?; 487 let poll_queue = parser 488 .convert::<Toggle>("poll_queue") 489 .map_err(Error::FailedConfigParse)? 490 .unwrap_or(Toggle(true)) 491 .0; 492 let queue_size = parser 493 .convert("queue_size") 494 .map_err(Error::FailedConfigParse)? 495 .unwrap_or(1024); 496 497 Ok(VhostUserBlkBackendConfig { 498 path, 499 socket, 500 num_queues, 501 queue_size, 502 readonly, 503 direct, 504 poll_queue, 505 }) 506 } 507 } 508 509 pub fn start_block_backend(backend_command: &str) { 510 let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) { 511 Ok(config) => config, 512 Err(e) => { 513 println!("Failed parsing parameters {e:?}"); 514 process::exit(1); 515 } 516 }; 517 518 let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); 519 520 let blk_backend = Arc::new(RwLock::new( 521 VhostUserBlkBackend::new( 522 backend_config.path, 523 backend_config.num_queues, 524 backend_config.readonly, 525 backend_config.direct, 526 backend_config.poll_queue, 527 backend_config.queue_size, 528 mem.clone(), 529 ) 530 .unwrap(), 531 )); 532 533 debug!("blk_backend is created!\n"); 534 535 let listener = Listener::new(&backend_config.socket, true).unwrap(); 536 537 let name = "vhost-user-blk-backend"; 538 let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap(); 539 540 debug!("blk_daemon is created!\n"); 541 542 if let Err(e) = blk_daemon.start(listener) { 543 error!( 544 "Failed to start daemon for vhost-user-block with error: {:?}\n", 545 e 546 ); 547 process::exit(1); 548 } 549 550 if let Err(e) = blk_daemon.wait() { 551 error!("Error from the main thread: {:?}", e); 552 } 553 554 for thread in blk_backend.read().unwrap().threads.iter() { 555 if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { 556 error!("Error shutting down worker thread: {:?}", e) 557 } 558 } 559 } 560