1 // Copyright 2019 Red Hat, Inc. All Rights Reserved. 2 // 3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved. 4 // 5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 6 // 7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved. 8 // 9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause) 10 11 use block::{ 12 build_serial, 13 qcow::{self, ImageType, QcowFile}, 14 Request, VirtioBlockConfig, 15 }; 16 use libc::EFD_NONBLOCK; 17 use log::*; 18 use option_parser::{OptionParser, OptionParserError, Toggle}; 19 use std::fs::File; 20 use std::fs::OpenOptions; 21 use std::io::Read; 22 use std::io::{Seek, SeekFrom, Write}; 23 use std::ops::Deref; 24 use std::ops::DerefMut; 25 use std::os::unix::fs::OpenOptionsExt; 26 use std::path::PathBuf; 27 use std::process; 28 use std::result; 29 use std::sync::atomic::{AtomicBool, Ordering}; 30 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; 31 use std::time::Instant; 32 use std::{convert, error, fmt, io}; 33 use vhost::vhost_user::message::*; 34 use vhost::vhost_user::Listener; 35 use vhost_user_backend::{ 36 bitmap::BitmapMmapRegion, VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT, 37 }; 38 use virtio_bindings::virtio_blk::*; 39 use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1; 40 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; 41 use virtio_queue::QueueT; 42 use vm_memory::GuestAddressSpace; 43 use vm_memory::{ByteValued, Bytes, GuestMemoryAtomic}; 44 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; 45 46 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>; 47 48 const SECTOR_SHIFT: u8 = 9; 49 const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT; 50 const BLK_SIZE: u32 = 512; 51 // Current (2020) enterprise SSDs have a latency lower than 30us. 52 // Polling for 50us should be enough to cover for the device latency 53 // and the overhead of the emulation layer. 54 const POLL_QUEUE_US: u128 = 50; 55 56 trait DiskFile: Read + Seek + Write + Send {} 57 impl<D: Read + Seek + Write + Send> DiskFile for D {} 58 59 type Result<T> = std::result::Result<T, Error>; 60 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>; 61 62 #[allow(dead_code)] 63 #[derive(Debug)] 64 enum Error { 65 /// Failed to create kill eventfd 66 CreateKillEventFd(io::Error), 67 /// Failed to parse configuration string 68 FailedConfigParse(OptionParserError), 69 /// Failed to handle event other than input event. 70 HandleEventNotEpollIn, 71 /// Failed to handle unknown event. 72 HandleEventUnknownEvent, 73 /// No path provided 74 PathParameterMissing, 75 /// No socket provided 76 SocketParameterMissing, 77 } 78 79 pub const SYNTAX: &str = "vhost-user-block backend parameters \ 80 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\ 81 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\ 82 poll_queue=true|false\""; 83 84 impl fmt::Display for Error { 85 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 86 write!(f, "vhost_user_block_error: {self:?}") 87 } 88 } 89 90 impl error::Error for Error {} 91 92 impl convert::From<Error> for io::Error { 93 fn from(e: Error) -> Self { 94 io::Error::new(io::ErrorKind::Other, e) 95 } 96 } 97 98 struct VhostUserBlkThread { 99 disk_image: Arc<Mutex<dyn DiskFile>>, 100 serial: Vec<u8>, 101 disk_nsectors: u64, 102 event_idx: bool, 103 kill_evt: EventFd, 104 writeback: Arc<AtomicBool>, 105 mem: GuestMemoryAtomic<GuestMemoryMmap>, 106 } 107 108 impl VhostUserBlkThread { 109 fn new( 110 disk_image: Arc<Mutex<dyn DiskFile>>, 111 serial: Vec<u8>, 112 disk_nsectors: u64, 113 writeback: Arc<AtomicBool>, 114 mem: GuestMemoryAtomic<GuestMemoryMmap>, 115 ) -> Result<Self> { 116 Ok(VhostUserBlkThread { 117 disk_image, 118 serial, 119 disk_nsectors, 120 event_idx: false, 121 kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, 122 writeback, 123 mem, 124 }) 125 } 126 127 fn process_queue( 128 &mut self, 129 vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>, 130 ) -> bool { 131 let mut used_descs = false; 132 133 while let Some(mut desc_chain) = vring 134 .get_queue_mut() 135 .pop_descriptor_chain(self.mem.memory()) 136 { 137 debug!("got an element in the queue"); 138 let len; 139 match Request::parse(&mut desc_chain, None) { 140 Ok(mut request) => { 141 debug!("element is a valid request"); 142 request.set_writeback(self.writeback.load(Ordering::Acquire)); 143 let status = match request.execute( 144 &mut self.disk_image.lock().unwrap().deref_mut(), 145 self.disk_nsectors, 146 desc_chain.memory(), 147 &self.serial, 148 ) { 149 Ok(l) => { 150 len = l; 151 VIRTIO_BLK_S_OK as u8 152 } 153 Err(e) => { 154 len = 1; 155 e.status() 156 } 157 }; 158 desc_chain 159 .memory() 160 .write_obj(status, request.status_addr) 161 .unwrap(); 162 } 163 Err(err) => { 164 error!("failed to parse available descriptor chain: {:?}", err); 165 len = 0; 166 } 167 } 168 169 vring 170 .get_queue_mut() 171 .add_used(desc_chain.memory(), desc_chain.head_index(), len) 172 .unwrap(); 173 used_descs = true; 174 } 175 176 let mut needs_signalling = false; 177 if self.event_idx { 178 if vring 179 .get_queue_mut() 180 .needs_notification(self.mem.memory().deref()) 181 .unwrap() 182 { 183 debug!("signalling queue"); 184 needs_signalling = true; 185 } else { 186 debug!("omitting signal (event_idx)"); 187 } 188 } else { 189 debug!("signalling queue"); 190 needs_signalling = true; 191 } 192 193 if needs_signalling { 194 vring.signal_used_queue().unwrap(); 195 } 196 197 used_descs 198 } 199 } 200 201 struct VhostUserBlkBackend { 202 threads: Vec<Mutex<VhostUserBlkThread>>, 203 config: VirtioBlockConfig, 204 rdonly: bool, 205 poll_queue: bool, 206 queues_per_thread: Vec<u64>, 207 queue_size: usize, 208 acked_features: u64, 209 writeback: Arc<AtomicBool>, 210 mem: GuestMemoryAtomic<GuestMemoryMmap>, 211 } 212 213 impl VhostUserBlkBackend { 214 fn new( 215 image_path: String, 216 num_queues: usize, 217 rdonly: bool, 218 direct: bool, 219 poll_queue: bool, 220 queue_size: usize, 221 mem: GuestMemoryAtomic<GuestMemoryMmap>, 222 ) -> Result<Self> { 223 let mut options = OpenOptions::new(); 224 options.read(true); 225 options.write(!rdonly); 226 if direct { 227 options.custom_flags(libc::O_DIRECT); 228 } 229 let image: File = options.open(&image_path).unwrap(); 230 let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct); 231 232 let serial = build_serial(&PathBuf::from(&image_path)); 233 let image_type = qcow::detect_image_type(&mut raw_img).unwrap(); 234 let image = match image_type { 235 ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>, 236 ImageType::Qcow2 => { 237 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>> 238 } 239 }; 240 241 let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE; 242 let config = VirtioBlockConfig { 243 capacity: nsectors, 244 blk_size: BLK_SIZE, 245 size_max: 65535, 246 seg_max: 128 - 2, 247 min_io_size: 1, 248 opt_io_size: 1, 249 num_queues: num_queues as u16, 250 writeback: 1, 251 ..Default::default() 252 }; 253 254 let mut queues_per_thread = Vec::new(); 255 let mut threads = Vec::new(); 256 let writeback = Arc::new(AtomicBool::new(true)); 257 for i in 0..num_queues { 258 let thread = Mutex::new(VhostUserBlkThread::new( 259 image.clone(), 260 serial.clone(), 261 nsectors, 262 writeback.clone(), 263 mem.clone(), 264 )?); 265 threads.push(thread); 266 queues_per_thread.push(0b1 << i); 267 } 268 269 Ok(VhostUserBlkBackend { 270 threads, 271 config, 272 rdonly, 273 poll_queue, 274 queues_per_thread, 275 queue_size, 276 acked_features: 0, 277 writeback, 278 mem, 279 }) 280 } 281 282 fn update_writeback(&mut self) { 283 // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE 284 let writeback = 285 if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE { 286 self.config.writeback == 1 287 } else { 288 // Else check if VIRTIO_BLK_F_FLUSH negotiated 289 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH 290 }; 291 292 info!( 293 "Changing cache mode to {}", 294 if writeback { 295 "writeback" 296 } else { 297 "writethrough" 298 } 299 ); 300 self.writeback.store(writeback, Ordering::Release); 301 } 302 } 303 304 impl VhostUserBackendMut for VhostUserBlkBackend { 305 type Bitmap = BitmapMmapRegion; 306 type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>; 307 308 fn num_queues(&self) -> usize { 309 self.config.num_queues as usize 310 } 311 312 fn max_queue_size(&self) -> usize { 313 self.queue_size 314 } 315 316 fn features(&self) -> u64 { 317 let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX 318 | 1 << VIRTIO_BLK_F_BLK_SIZE 319 | 1 << VIRTIO_BLK_F_FLUSH 320 | 1 << VIRTIO_BLK_F_TOPOLOGY 321 | 1 << VIRTIO_BLK_F_MQ 322 | 1 << VIRTIO_BLK_F_CONFIG_WCE 323 | 1 << VIRTIO_RING_F_EVENT_IDX 324 | 1 << VIRTIO_F_VERSION_1 325 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 326 327 if self.rdonly { 328 avail_features |= 1 << VIRTIO_BLK_F_RO; 329 } 330 avail_features 331 } 332 333 fn acked_features(&mut self, features: u64) { 334 self.acked_features = features; 335 self.update_writeback(); 336 } 337 338 fn protocol_features(&self) -> VhostUserProtocolFeatures { 339 VhostUserProtocolFeatures::CONFIG 340 | VhostUserProtocolFeatures::MQ 341 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS 342 } 343 344 fn set_event_idx(&mut self, enabled: bool) { 345 for thread in self.threads.iter() { 346 thread.lock().unwrap().event_idx = enabled; 347 } 348 } 349 350 fn handle_event( 351 &mut self, 352 device_event: u16, 353 evset: EventSet, 354 vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>], 355 thread_id: usize, 356 ) -> VhostUserBackendResult<()> { 357 if evset != EventSet::IN { 358 return Err(Error::HandleEventNotEpollIn.into()); 359 } 360 361 debug!("event received: {:?}", device_event); 362 363 let mut thread = self.threads[thread_id].lock().unwrap(); 364 match device_event { 365 0 => { 366 let mut vring = vrings[0].get_mut(); 367 368 if self.poll_queue { 369 // Actively poll the queue until POLL_QUEUE_US has passed 370 // without seeing a new request. 371 let mut now = Instant::now(); 372 loop { 373 if thread.process_queue(&mut vring) { 374 now = Instant::now(); 375 } else if now.elapsed().as_micros() > POLL_QUEUE_US { 376 break; 377 } 378 } 379 } 380 381 if thread.event_idx { 382 // vm-virtio's Queue implementation only checks avail_index 383 // once, so to properly support EVENT_IDX we need to keep 384 // calling process_queue() until it stops finding new 385 // requests on the queue. 386 loop { 387 vring 388 .get_queue_mut() 389 .enable_notification(self.mem.memory().deref()) 390 .unwrap(); 391 if !thread.process_queue(&mut vring) { 392 break; 393 } 394 } 395 } else { 396 // Without EVENT_IDX, a single call is enough. 397 thread.process_queue(&mut vring); 398 } 399 400 Ok(()) 401 } 402 _ => Err(Error::HandleEventUnknownEvent.into()), 403 } 404 } 405 406 fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> { 407 self.config.as_slice().to_vec() 408 } 409 410 fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> { 411 let config_slice = self.config.as_mut_slice(); 412 let data_len = data.len() as u32; 413 let config_len = config_slice.len() as u32; 414 if offset + data_len > config_len { 415 error!("Failed to write config space"); 416 return Err(io::Error::from_raw_os_error(libc::EINVAL)); 417 } 418 let (_, right) = config_slice.split_at_mut(offset as usize); 419 right.copy_from_slice(data); 420 self.update_writeback(); 421 Ok(()) 422 } 423 424 fn exit_event(&self, thread_index: usize) -> Option<EventFd> { 425 Some( 426 self.threads[thread_index] 427 .lock() 428 .unwrap() 429 .kill_evt 430 .try_clone() 431 .unwrap(), 432 ) 433 } 434 435 fn queues_per_thread(&self) -> Vec<u64> { 436 self.queues_per_thread.clone() 437 } 438 439 fn update_memory( 440 &mut self, 441 _mem: GuestMemoryAtomic<GuestMemoryMmap>, 442 ) -> VhostUserBackendResult<()> { 443 Ok(()) 444 } 445 } 446 447 struct VhostUserBlkBackendConfig { 448 path: String, 449 socket: String, 450 num_queues: usize, 451 queue_size: usize, 452 readonly: bool, 453 direct: bool, 454 poll_queue: bool, 455 } 456 457 impl VhostUserBlkBackendConfig { 458 fn parse(backend: &str) -> Result<Self> { 459 let mut parser = OptionParser::new(); 460 parser 461 .add("path") 462 .add("readonly") 463 .add("direct") 464 .add("num_queues") 465 .add("queue_size") 466 .add("socket") 467 .add("poll_queue"); 468 parser.parse(backend).map_err(Error::FailedConfigParse)?; 469 470 let path = parser.get("path").ok_or(Error::PathParameterMissing)?; 471 let readonly = parser 472 .convert::<Toggle>("readonly") 473 .map_err(Error::FailedConfigParse)? 474 .unwrap_or(Toggle(false)) 475 .0; 476 let direct = parser 477 .convert::<Toggle>("direct") 478 .map_err(Error::FailedConfigParse)? 479 .unwrap_or(Toggle(false)) 480 .0; 481 let num_queues = parser 482 .convert("num_queues") 483 .map_err(Error::FailedConfigParse)? 484 .unwrap_or(1); 485 let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?; 486 let poll_queue = parser 487 .convert::<Toggle>("poll_queue") 488 .map_err(Error::FailedConfigParse)? 489 .unwrap_or(Toggle(true)) 490 .0; 491 let queue_size = parser 492 .convert("queue_size") 493 .map_err(Error::FailedConfigParse)? 494 .unwrap_or(1024); 495 496 Ok(VhostUserBlkBackendConfig { 497 path, 498 socket, 499 num_queues, 500 queue_size, 501 readonly, 502 direct, 503 poll_queue, 504 }) 505 } 506 } 507 508 pub fn start_block_backend(backend_command: &str) { 509 let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) { 510 Ok(config) => config, 511 Err(e) => { 512 println!("Failed parsing parameters {e:?}"); 513 process::exit(1); 514 } 515 }; 516 517 let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); 518 519 let blk_backend = Arc::new(RwLock::new( 520 VhostUserBlkBackend::new( 521 backend_config.path, 522 backend_config.num_queues, 523 backend_config.readonly, 524 backend_config.direct, 525 backend_config.poll_queue, 526 backend_config.queue_size, 527 mem.clone(), 528 ) 529 .unwrap(), 530 )); 531 532 debug!("blk_backend is created!\n"); 533 534 let listener = Listener::new(&backend_config.socket, true).unwrap(); 535 536 let name = "vhost-user-blk-backend"; 537 let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap(); 538 539 debug!("blk_daemon is created!\n"); 540 541 if let Err(e) = blk_daemon.start(listener) { 542 error!( 543 "Failed to start daemon for vhost-user-block with error: {:?}\n", 544 e 545 ); 546 process::exit(1); 547 } 548 549 if let Err(e) = blk_daemon.wait() { 550 error!("Error from the main thread: {:?}", e); 551 } 552 553 for thread in blk_backend.read().unwrap().threads.iter() { 554 if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { 555 error!("Error shutting down worker thread: {:?}", e) 556 } 557 } 558 } 559