// Copyright 2019 Red Hat, Inc. All Rights Reserved.
//
// Portions Copyright 2019 Intel Corporation. All Rights Reserved.
//
// Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
//
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)

// vhost-user block device backend: serves a raw or qcow2 disk image to a
// vhost-user frontend over the socket given by the `socket=` parameter.

use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::OpenOptionsExt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::time::Instant;
use std::{convert, error, fmt, io, process, result};

use block::qcow::{self, ImageType, QcowFile};
use block::{build_serial, Request, VirtioBlockConfig};
use libc::EFD_NONBLOCK;
use log::*;
use option_parser::{OptionParser, OptionParserError, Toggle};
use vhost::vhost_user::message::*;
use vhost::vhost_user::Listener;
use vhost_user_backend::bitmap::BitmapMmapRegion;
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::QueueT;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;

/// Guest memory type used throughout this backend: a `vm_memory`
/// `GuestMemoryMmap` whose regions carry a `BitmapMmapRegion` bitmap.
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;

/// log2 of `SECTOR_SIZE`.
const SECTOR_SHIFT: u8 = 9;
/// Sector size in bytes (1 << 9 = 512). The image length is divided by this
/// to obtain the capacity, in sectors, advertised in the device config.
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
/// Block size in bytes, reported in the `blk_size` field of the device config.
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
46 const POLL_QUEUE_US: u128 = 50; 47 48 trait DiskFile: Read + Seek + Write + Send {} 49 impl<D: Read + Seek + Write + Send> DiskFile for D {} 50 51 type Result<T> = std::result::Result<T, Error>; 52 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>; 53 54 #[allow(dead_code)] 55 #[derive(Debug)] 56 enum Error { 57 /// Failed to create kill eventfd 58 CreateKillEventFd(io::Error), 59 /// Failed to parse configuration string 60 FailedConfigParse(OptionParserError), 61 /// Failed to handle event other than input event. 62 HandleEventNotEpollIn, 63 /// Failed to handle unknown event. 64 HandleEventUnknownEvent, 65 /// No path provided 66 PathParameterMissing, 67 /// No socket provided 68 SocketParameterMissing, 69 } 70 71 pub const SYNTAX: &str = "vhost-user-block backend parameters \ 72 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\ 73 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\ 74 poll_queue=true|false\""; 75 76 impl fmt::Display for Error { 77 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 78 write!(f, "vhost_user_block_error: {self:?}") 79 } 80 } 81 82 impl error::Error for Error {} 83 84 impl convert::From<Error> for io::Error { 85 fn from(e: Error) -> Self { 86 io::Error::other(e) 87 } 88 } 89 90 struct VhostUserBlkThread { 91 disk_image: Arc<Mutex<dyn DiskFile>>, 92 serial: Vec<u8>, 93 disk_nsectors: u64, 94 event_idx: bool, 95 kill_evt: EventFd, 96 writeback: Arc<AtomicBool>, 97 mem: GuestMemoryAtomic<GuestMemoryMmap>, 98 } 99 100 impl VhostUserBlkThread { 101 fn new( 102 disk_image: Arc<Mutex<dyn DiskFile>>, 103 serial: Vec<u8>, 104 disk_nsectors: u64, 105 writeback: Arc<AtomicBool>, 106 mem: GuestMemoryAtomic<GuestMemoryMmap>, 107 ) -> Result<Self> { 108 Ok(VhostUserBlkThread { 109 disk_image, 110 serial, 111 disk_nsectors, 112 event_idx: false, 113 kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, 114 writeback, 115 mem, 116 }) 117 } 118 119 fn 
process_queue( 120 &mut self, 121 vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>, 122 ) -> bool { 123 let mut used_descs = false; 124 125 while let Some(mut desc_chain) = vring 126 .get_queue_mut() 127 .pop_descriptor_chain(self.mem.memory()) 128 { 129 debug!("got an element in the queue"); 130 let len; 131 match Request::parse(&mut desc_chain, None) { 132 Ok(mut request) => { 133 debug!("element is a valid request"); 134 request.set_writeback(self.writeback.load(Ordering::Acquire)); 135 let status = match request.execute( 136 &mut self.disk_image.lock().unwrap().deref_mut(), 137 self.disk_nsectors, 138 desc_chain.memory(), 139 &self.serial, 140 ) { 141 Ok(l) => { 142 len = l; 143 VIRTIO_BLK_S_OK as u8 144 } 145 Err(e) => { 146 len = 1; 147 e.status() 148 } 149 }; 150 desc_chain 151 .memory() 152 .write_obj(status, request.status_addr) 153 .unwrap(); 154 } 155 Err(err) => { 156 error!("failed to parse available descriptor chain: {:?}", err); 157 len = 0; 158 } 159 } 160 161 vring 162 .get_queue_mut() 163 .add_used(desc_chain.memory(), desc_chain.head_index(), len) 164 .unwrap(); 165 used_descs = true; 166 } 167 168 let mut needs_signalling = false; 169 if self.event_idx { 170 if vring 171 .get_queue_mut() 172 .needs_notification(self.mem.memory().deref()) 173 .unwrap() 174 { 175 debug!("signalling queue"); 176 needs_signalling = true; 177 } else { 178 debug!("omitting signal (event_idx)"); 179 } 180 } else { 181 debug!("signalling queue"); 182 needs_signalling = true; 183 } 184 185 if needs_signalling { 186 vring.signal_used_queue().unwrap(); 187 } 188 189 used_descs 190 } 191 } 192 193 struct VhostUserBlkBackend { 194 threads: Vec<Mutex<VhostUserBlkThread>>, 195 config: VirtioBlockConfig, 196 rdonly: bool, 197 poll_queue: bool, 198 queues_per_thread: Vec<u64>, 199 queue_size: usize, 200 acked_features: u64, 201 writeback: Arc<AtomicBool>, 202 mem: GuestMemoryAtomic<GuestMemoryMmap>, 203 } 204 205 impl VhostUserBlkBackend { 206 fn new( 
207 image_path: String, 208 num_queues: usize, 209 rdonly: bool, 210 direct: bool, 211 poll_queue: bool, 212 queue_size: usize, 213 mem: GuestMemoryAtomic<GuestMemoryMmap>, 214 ) -> Result<Self> { 215 let mut options = OpenOptions::new(); 216 options.read(true); 217 options.write(!rdonly); 218 if direct { 219 options.custom_flags(libc::O_DIRECT); 220 } 221 let image: File = options.open(&image_path).unwrap(); 222 let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct); 223 224 let serial = build_serial(&PathBuf::from(&image_path)); 225 let image_type = qcow::detect_image_type(&mut raw_img).unwrap(); 226 let image = match image_type { 227 ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>, 228 ImageType::Qcow2 => { 229 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>> 230 } 231 }; 232 233 let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE; 234 let config = VirtioBlockConfig { 235 capacity: nsectors, 236 blk_size: BLK_SIZE, 237 size_max: 65535, 238 seg_max: 128 - 2, 239 min_io_size: 1, 240 opt_io_size: 1, 241 num_queues: num_queues as u16, 242 writeback: 1, 243 ..Default::default() 244 }; 245 246 let mut queues_per_thread = Vec::new(); 247 let mut threads = Vec::new(); 248 let writeback = Arc::new(AtomicBool::new(true)); 249 for i in 0..num_queues { 250 let thread = Mutex::new(VhostUserBlkThread::new( 251 image.clone(), 252 serial.clone(), 253 nsectors, 254 writeback.clone(), 255 mem.clone(), 256 )?); 257 threads.push(thread); 258 queues_per_thread.push(0b1 << i); 259 } 260 261 Ok(VhostUserBlkBackend { 262 threads, 263 config, 264 rdonly, 265 poll_queue, 266 queues_per_thread, 267 queue_size, 268 acked_features: 0, 269 writeback, 270 mem, 271 }) 272 } 273 274 fn update_writeback(&mut self) { 275 // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE 276 let writeback = if self.acked_features & (1 << VIRTIO_BLK_F_CONFIG_WCE) 277 == 1 << VIRTIO_BLK_F_CONFIG_WCE 278 { 
279 self.config.writeback == 1 280 } else { 281 // Else check if VIRTIO_BLK_F_FLUSH negotiated 282 self.acked_features & (1 << VIRTIO_BLK_F_FLUSH) == 1 << VIRTIO_BLK_F_FLUSH 283 }; 284 285 info!( 286 "Changing cache mode to {}", 287 if writeback { 288 "writeback" 289 } else { 290 "writethrough" 291 } 292 ); 293 self.writeback.store(writeback, Ordering::Release); 294 } 295 } 296 297 impl VhostUserBackendMut for VhostUserBlkBackend { 298 type Bitmap = BitmapMmapRegion; 299 type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>; 300 301 fn num_queues(&self) -> usize { 302 self.config.num_queues as usize 303 } 304 305 fn max_queue_size(&self) -> usize { 306 self.queue_size 307 } 308 309 fn features(&self) -> u64 { 310 let mut avail_features = (1 << VIRTIO_BLK_F_SEG_MAX) 311 | (1 << VIRTIO_BLK_F_BLK_SIZE) 312 | (1 << VIRTIO_BLK_F_FLUSH) 313 | (1 << VIRTIO_BLK_F_TOPOLOGY) 314 | (1 << VIRTIO_BLK_F_MQ) 315 | (1 << VIRTIO_BLK_F_CONFIG_WCE) 316 | (1 << VIRTIO_RING_F_EVENT_IDX) 317 | (1 << VIRTIO_F_VERSION_1) 318 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 319 320 if self.rdonly { 321 avail_features |= 1 << VIRTIO_BLK_F_RO; 322 } 323 avail_features 324 } 325 326 fn acked_features(&mut self, features: u64) { 327 self.acked_features = features; 328 self.update_writeback(); 329 } 330 331 fn protocol_features(&self) -> VhostUserProtocolFeatures { 332 VhostUserProtocolFeatures::CONFIG 333 | VhostUserProtocolFeatures::MQ 334 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS 335 } 336 337 fn set_event_idx(&mut self, enabled: bool) { 338 for thread in self.threads.iter() { 339 thread.lock().unwrap().event_idx = enabled; 340 } 341 } 342 343 fn handle_event( 344 &mut self, 345 device_event: u16, 346 evset: EventSet, 347 vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>], 348 thread_id: usize, 349 ) -> VhostUserBackendResult<()> { 350 if evset != EventSet::IN { 351 return Err(Error::HandleEventNotEpollIn.into()); 352 } 353 354 debug!("event received: {:?}", 
device_event); 355 356 let mut thread = self.threads[thread_id].lock().unwrap(); 357 match device_event { 358 0 => { 359 let mut vring = vrings[0].get_mut(); 360 361 if self.poll_queue { 362 // Actively poll the queue until POLL_QUEUE_US has passed 363 // without seeing a new request. 364 let mut now = Instant::now(); 365 loop { 366 if thread.process_queue(&mut vring) { 367 now = Instant::now(); 368 } else if now.elapsed().as_micros() > POLL_QUEUE_US { 369 break; 370 } 371 } 372 } 373 374 if thread.event_idx { 375 // vm-virtio's Queue implementation only checks avail_index 376 // once, so to properly support EVENT_IDX we need to keep 377 // calling process_queue() until it stops finding new 378 // requests on the queue. 379 loop { 380 vring 381 .get_queue_mut() 382 .enable_notification(self.mem.memory().deref()) 383 .unwrap(); 384 if !thread.process_queue(&mut vring) { 385 break; 386 } 387 } 388 } else { 389 // Without EVENT_IDX, a single call is enough. 390 thread.process_queue(&mut vring); 391 } 392 393 Ok(()) 394 } 395 _ => Err(Error::HandleEventUnknownEvent.into()), 396 } 397 } 398 399 fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> { 400 self.config.as_slice().to_vec() 401 } 402 403 fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> { 404 let config_slice = self.config.as_mut_slice(); 405 let data_len = data.len() as u32; 406 let config_len = config_slice.len() as u32; 407 if offset + data_len > config_len { 408 error!("Failed to write config space"); 409 return Err(io::Error::from_raw_os_error(libc::EINVAL)); 410 } 411 let (_, right) = config_slice.split_at_mut(offset as usize); 412 right.copy_from_slice(data); 413 self.update_writeback(); 414 Ok(()) 415 } 416 417 fn exit_event(&self, thread_index: usize) -> Option<EventFd> { 418 Some( 419 self.threads[thread_index] 420 .lock() 421 .unwrap() 422 .kill_evt 423 .try_clone() 424 .unwrap(), 425 ) 426 } 427 428 fn queues_per_thread(&self) -> Vec<u64> { 429 
self.queues_per_thread.clone() 430 } 431 432 fn update_memory( 433 &mut self, 434 _mem: GuestMemoryAtomic<GuestMemoryMmap>, 435 ) -> VhostUserBackendResult<()> { 436 Ok(()) 437 } 438 } 439 440 struct VhostUserBlkBackendConfig { 441 path: String, 442 socket: String, 443 num_queues: usize, 444 queue_size: usize, 445 readonly: bool, 446 direct: bool, 447 poll_queue: bool, 448 } 449 450 impl VhostUserBlkBackendConfig { 451 fn parse(backend: &str) -> Result<Self> { 452 let mut parser = OptionParser::new(); 453 parser 454 .add("path") 455 .add("readonly") 456 .add("direct") 457 .add("num_queues") 458 .add("queue_size") 459 .add("socket") 460 .add("poll_queue"); 461 parser.parse(backend).map_err(Error::FailedConfigParse)?; 462 463 let path = parser.get("path").ok_or(Error::PathParameterMissing)?; 464 let readonly = parser 465 .convert::<Toggle>("readonly") 466 .map_err(Error::FailedConfigParse)? 467 .unwrap_or(Toggle(false)) 468 .0; 469 let direct = parser 470 .convert::<Toggle>("direct") 471 .map_err(Error::FailedConfigParse)? 472 .unwrap_or(Toggle(false)) 473 .0; 474 let num_queues = parser 475 .convert("num_queues") 476 .map_err(Error::FailedConfigParse)? 477 .unwrap_or(1); 478 let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?; 479 let poll_queue = parser 480 .convert::<Toggle>("poll_queue") 481 .map_err(Error::FailedConfigParse)? 482 .unwrap_or(Toggle(true)) 483 .0; 484 let queue_size = parser 485 .convert("queue_size") 486 .map_err(Error::FailedConfigParse)? 
487 .unwrap_or(1024); 488 489 Ok(VhostUserBlkBackendConfig { 490 path, 491 socket, 492 num_queues, 493 queue_size, 494 readonly, 495 direct, 496 poll_queue, 497 }) 498 } 499 } 500 501 pub fn start_block_backend(backend_command: &str) { 502 let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) { 503 Ok(config) => config, 504 Err(e) => { 505 println!("Failed parsing parameters {e:?}"); 506 process::exit(1); 507 } 508 }; 509 510 let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); 511 512 let blk_backend = Arc::new(RwLock::new( 513 VhostUserBlkBackend::new( 514 backend_config.path, 515 backend_config.num_queues, 516 backend_config.readonly, 517 backend_config.direct, 518 backend_config.poll_queue, 519 backend_config.queue_size, 520 mem.clone(), 521 ) 522 .unwrap(), 523 )); 524 525 debug!("blk_backend is created!\n"); 526 527 let listener = Listener::new(&backend_config.socket, true).unwrap(); 528 529 let name = "vhost-user-blk-backend"; 530 let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap(); 531 532 debug!("blk_daemon is created!\n"); 533 534 if let Err(e) = blk_daemon.start(listener) { 535 error!( 536 "Failed to start daemon for vhost-user-block with error: {:?}\n", 537 e 538 ); 539 process::exit(1); 540 } 541 542 if let Err(e) = blk_daemon.wait() { 543 error!("Error from the main thread: {:?}", e); 544 } 545 546 for thread in blk_backend.read().unwrap().threads.iter() { 547 if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { 548 error!("Error shutting down worker thread: {:?}", e) 549 } 550 } 551 } 552