// Copyright 2019 Red Hat, Inc. All Rights Reserved.
//
// Portions Copyright 2019 Intel Corporation. All Rights Reserved.
//
// Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
//
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)

use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::OpenOptionsExt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::time::Instant;
use std::{convert, io, process, result};

use block::qcow::{self, ImageType, QcowFile};
use block::{build_serial, Request, VirtioBlockConfig};
use libc::EFD_NONBLOCK;
use log::*;
use option_parser::{OptionParser, OptionParserError, Toggle};
use thiserror::Error;
use vhost::vhost_user::message::*;
use vhost::vhost_user::Listener;
use vhost_user_backend::bitmap::BitmapMmapRegion;
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::QueueT;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;

/// Guest memory map type used throughout this backend, parameterized with
/// `BitmapMmapRegion` as its bitmap type.
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;

/// log2 of the sector size in bytes.
const SECTOR_SHIFT: u8 = 9;
/// Sector size in bytes (`1 << SECTOR_SHIFT`, i.e. 512). The image length is
/// divided by this value to obtain the capacity in sectors.
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
/// Value advertised in the `blk_size` field of the virtio-blk config space.
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
47 const POLL_QUEUE_US: u128 = 50; 48 49 trait DiskFile: Read + Seek + Write + Send {} 50 impl<D: Read + Seek + Write + Send> DiskFile for D {} 51 52 type Result<T> = std::result::Result<T, Error>; 53 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>; 54 55 #[allow(dead_code)] 56 #[derive(Error, Debug)] 57 enum Error { 58 /// Failed to create kill eventfd 59 #[error("Failed to create kill eventfd: {0}")] 60 CreateKillEventFd(#[source] io::Error), 61 /// Failed to parse configuration string 62 #[error("Failed to parse configuration string: {0}")] 63 FailedConfigParse(#[source] OptionParserError), 64 /// Failed to handle event other than input event. 65 #[error("Failed to handle event other than input event")] 66 HandleEventNotEpollIn, 67 /// Failed to handle unknown event. 68 #[error("Failed to handle unknown event")] 69 HandleEventUnknownEvent, 70 /// No path provided 71 #[error("No path provided")] 72 PathParameterMissing, 73 /// No socket provided 74 #[error("No socket provided")] 75 SocketParameterMissing, 76 } 77 78 pub const SYNTAX: &str = "vhost-user-block backend parameters \ 79 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\ 80 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\ 81 poll_queue=true|false\""; 82 83 impl convert::From<Error> for io::Error { 84 fn from(e: Error) -> Self { 85 io::Error::other(e) 86 } 87 } 88 89 struct VhostUserBlkThread { 90 disk_image: Arc<Mutex<dyn DiskFile>>, 91 serial: Vec<u8>, 92 disk_nsectors: u64, 93 event_idx: bool, 94 kill_evt: EventFd, 95 writeback: Arc<AtomicBool>, 96 mem: GuestMemoryAtomic<GuestMemoryMmap>, 97 } 98 99 impl VhostUserBlkThread { 100 fn new( 101 disk_image: Arc<Mutex<dyn DiskFile>>, 102 serial: Vec<u8>, 103 disk_nsectors: u64, 104 writeback: Arc<AtomicBool>, 105 mem: GuestMemoryAtomic<GuestMemoryMmap>, 106 ) -> Result<Self> { 107 Ok(VhostUserBlkThread { 108 disk_image, 109 serial, 110 disk_nsectors, 111 event_idx: false, 112 kill_evt: 
EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, 113 writeback, 114 mem, 115 }) 116 } 117 118 fn process_queue( 119 &mut self, 120 vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>, 121 ) -> bool { 122 let mut used_descs = false; 123 124 while let Some(mut desc_chain) = vring 125 .get_queue_mut() 126 .pop_descriptor_chain(self.mem.memory()) 127 { 128 debug!("got an element in the queue"); 129 let len; 130 match Request::parse(&mut desc_chain, None) { 131 Ok(mut request) => { 132 debug!("element is a valid request"); 133 request.set_writeback(self.writeback.load(Ordering::Acquire)); 134 let status = match request.execute( 135 &mut self.disk_image.lock().unwrap().deref_mut(), 136 self.disk_nsectors, 137 desc_chain.memory(), 138 &self.serial, 139 ) { 140 Ok(l) => { 141 len = l; 142 VIRTIO_BLK_S_OK as u8 143 } 144 Err(e) => { 145 len = 1; 146 e.status() 147 } 148 }; 149 desc_chain 150 .memory() 151 .write_obj(status, request.status_addr) 152 .unwrap(); 153 } 154 Err(err) => { 155 error!("failed to parse available descriptor chain: {:?}", err); 156 len = 0; 157 } 158 } 159 160 vring 161 .get_queue_mut() 162 .add_used(desc_chain.memory(), desc_chain.head_index(), len) 163 .unwrap(); 164 used_descs = true; 165 } 166 167 let mut needs_signalling = false; 168 if self.event_idx { 169 if vring 170 .get_queue_mut() 171 .needs_notification(self.mem.memory().deref()) 172 .unwrap() 173 { 174 debug!("signalling queue"); 175 needs_signalling = true; 176 } else { 177 debug!("omitting signal (event_idx)"); 178 } 179 } else { 180 debug!("signalling queue"); 181 needs_signalling = true; 182 } 183 184 if needs_signalling { 185 vring.signal_used_queue().unwrap(); 186 } 187 188 used_descs 189 } 190 } 191 192 struct VhostUserBlkBackend { 193 threads: Vec<Mutex<VhostUserBlkThread>>, 194 config: VirtioBlockConfig, 195 rdonly: bool, 196 poll_queue: bool, 197 queues_per_thread: Vec<u64>, 198 queue_size: usize, 199 acked_features: u64, 200 writeback: 
Arc<AtomicBool>, 201 mem: GuestMemoryAtomic<GuestMemoryMmap>, 202 } 203 204 impl VhostUserBlkBackend { 205 fn new( 206 image_path: String, 207 num_queues: usize, 208 rdonly: bool, 209 direct: bool, 210 poll_queue: bool, 211 queue_size: usize, 212 mem: GuestMemoryAtomic<GuestMemoryMmap>, 213 ) -> Result<Self> { 214 let mut options = OpenOptions::new(); 215 options.read(true); 216 options.write(!rdonly); 217 if direct { 218 options.custom_flags(libc::O_DIRECT); 219 } 220 let image: File = options.open(&image_path).unwrap(); 221 let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct); 222 223 let serial = build_serial(&PathBuf::from(&image_path)); 224 let image_type = qcow::detect_image_type(&mut raw_img).unwrap(); 225 let image = match image_type { 226 ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>, 227 ImageType::Qcow2 => { 228 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>> 229 } 230 }; 231 232 let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE; 233 let config = VirtioBlockConfig { 234 capacity: nsectors, 235 blk_size: BLK_SIZE, 236 size_max: 65535, 237 seg_max: 128 - 2, 238 min_io_size: 1, 239 opt_io_size: 1, 240 num_queues: num_queues as u16, 241 writeback: 1, 242 ..Default::default() 243 }; 244 245 let mut queues_per_thread = Vec::new(); 246 let mut threads = Vec::new(); 247 let writeback = Arc::new(AtomicBool::new(true)); 248 for i in 0..num_queues { 249 let thread = Mutex::new(VhostUserBlkThread::new( 250 image.clone(), 251 serial.clone(), 252 nsectors, 253 writeback.clone(), 254 mem.clone(), 255 )?); 256 threads.push(thread); 257 queues_per_thread.push(0b1 << i); 258 } 259 260 Ok(VhostUserBlkBackend { 261 threads, 262 config, 263 rdonly, 264 poll_queue, 265 queues_per_thread, 266 queue_size, 267 acked_features: 0, 268 writeback, 269 mem, 270 }) 271 } 272 273 fn update_writeback(&mut self) { 274 // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE 
275 let writeback = if self.acked_features & (1 << VIRTIO_BLK_F_CONFIG_WCE) 276 == 1 << VIRTIO_BLK_F_CONFIG_WCE 277 { 278 self.config.writeback == 1 279 } else { 280 // Else check if VIRTIO_BLK_F_FLUSH negotiated 281 self.acked_features & (1 << VIRTIO_BLK_F_FLUSH) == 1 << VIRTIO_BLK_F_FLUSH 282 }; 283 284 info!( 285 "Changing cache mode to {}", 286 if writeback { 287 "writeback" 288 } else { 289 "writethrough" 290 } 291 ); 292 self.writeback.store(writeback, Ordering::Release); 293 } 294 } 295 296 impl VhostUserBackendMut for VhostUserBlkBackend { 297 type Bitmap = BitmapMmapRegion; 298 type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>; 299 300 fn num_queues(&self) -> usize { 301 self.config.num_queues as usize 302 } 303 304 fn max_queue_size(&self) -> usize { 305 self.queue_size 306 } 307 308 fn features(&self) -> u64 { 309 let mut avail_features = (1 << VIRTIO_BLK_F_SEG_MAX) 310 | (1 << VIRTIO_BLK_F_BLK_SIZE) 311 | (1 << VIRTIO_BLK_F_FLUSH) 312 | (1 << VIRTIO_BLK_F_TOPOLOGY) 313 | (1 << VIRTIO_BLK_F_MQ) 314 | (1 << VIRTIO_BLK_F_CONFIG_WCE) 315 | (1 << VIRTIO_RING_F_EVENT_IDX) 316 | (1 << VIRTIO_F_VERSION_1) 317 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 318 319 if self.rdonly { 320 avail_features |= 1 << VIRTIO_BLK_F_RO; 321 } 322 avail_features 323 } 324 325 fn acked_features(&mut self, features: u64) { 326 self.acked_features = features; 327 self.update_writeback(); 328 } 329 330 fn protocol_features(&self) -> VhostUserProtocolFeatures { 331 VhostUserProtocolFeatures::CONFIG 332 | VhostUserProtocolFeatures::MQ 333 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS 334 } 335 336 fn set_event_idx(&mut self, enabled: bool) { 337 for thread in self.threads.iter() { 338 thread.lock().unwrap().event_idx = enabled; 339 } 340 } 341 342 fn handle_event( 343 &mut self, 344 device_event: u16, 345 evset: EventSet, 346 vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>], 347 thread_id: usize, 348 ) -> VhostUserBackendResult<()> { 349 if evset != 
EventSet::IN { 350 return Err(Error::HandleEventNotEpollIn.into()); 351 } 352 353 debug!("event received: {:?}", device_event); 354 355 let mut thread = self.threads[thread_id].lock().unwrap(); 356 match device_event { 357 0 => { 358 let mut vring = vrings[0].get_mut(); 359 360 if self.poll_queue { 361 // Actively poll the queue until POLL_QUEUE_US has passed 362 // without seeing a new request. 363 let mut now = Instant::now(); 364 loop { 365 if thread.process_queue(&mut vring) { 366 now = Instant::now(); 367 } else if now.elapsed().as_micros() > POLL_QUEUE_US { 368 break; 369 } 370 } 371 } 372 373 if thread.event_idx { 374 // vm-virtio's Queue implementation only checks avail_index 375 // once, so to properly support EVENT_IDX we need to keep 376 // calling process_queue() until it stops finding new 377 // requests on the queue. 378 loop { 379 vring 380 .get_queue_mut() 381 .enable_notification(self.mem.memory().deref()) 382 .unwrap(); 383 if !thread.process_queue(&mut vring) { 384 break; 385 } 386 } 387 } else { 388 // Without EVENT_IDX, a single call is enough. 
389 thread.process_queue(&mut vring); 390 } 391 392 Ok(()) 393 } 394 _ => Err(Error::HandleEventUnknownEvent.into()), 395 } 396 } 397 398 fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> { 399 self.config.as_slice().to_vec() 400 } 401 402 fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> { 403 let config_slice = self.config.as_mut_slice(); 404 let data_len = data.len() as u32; 405 let config_len = config_slice.len() as u32; 406 if offset + data_len > config_len { 407 error!("Failed to write config space"); 408 return Err(io::Error::from_raw_os_error(libc::EINVAL)); 409 } 410 let (_, right) = config_slice.split_at_mut(offset as usize); 411 right.copy_from_slice(data); 412 self.update_writeback(); 413 Ok(()) 414 } 415 416 fn exit_event(&self, thread_index: usize) -> Option<EventFd> { 417 Some( 418 self.threads[thread_index] 419 .lock() 420 .unwrap() 421 .kill_evt 422 .try_clone() 423 .unwrap(), 424 ) 425 } 426 427 fn queues_per_thread(&self) -> Vec<u64> { 428 self.queues_per_thread.clone() 429 } 430 431 fn update_memory( 432 &mut self, 433 _mem: GuestMemoryAtomic<GuestMemoryMmap>, 434 ) -> VhostUserBackendResult<()> { 435 Ok(()) 436 } 437 } 438 439 struct VhostUserBlkBackendConfig { 440 path: String, 441 socket: String, 442 num_queues: usize, 443 queue_size: usize, 444 readonly: bool, 445 direct: bool, 446 poll_queue: bool, 447 } 448 449 impl VhostUserBlkBackendConfig { 450 fn parse(backend: &str) -> Result<Self> { 451 let mut parser = OptionParser::new(); 452 parser 453 .add("path") 454 .add("readonly") 455 .add("direct") 456 .add("num_queues") 457 .add("queue_size") 458 .add("socket") 459 .add("poll_queue"); 460 parser.parse(backend).map_err(Error::FailedConfigParse)?; 461 462 let path = parser.get("path").ok_or(Error::PathParameterMissing)?; 463 let readonly = parser 464 .convert::<Toggle>("readonly") 465 .map_err(Error::FailedConfigParse)? 
466 .unwrap_or(Toggle(false)) 467 .0; 468 let direct = parser 469 .convert::<Toggle>("direct") 470 .map_err(Error::FailedConfigParse)? 471 .unwrap_or(Toggle(false)) 472 .0; 473 let num_queues = parser 474 .convert("num_queues") 475 .map_err(Error::FailedConfigParse)? 476 .unwrap_or(1); 477 let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?; 478 let poll_queue = parser 479 .convert::<Toggle>("poll_queue") 480 .map_err(Error::FailedConfigParse)? 481 .unwrap_or(Toggle(true)) 482 .0; 483 let queue_size = parser 484 .convert("queue_size") 485 .map_err(Error::FailedConfigParse)? 486 .unwrap_or(1024); 487 488 Ok(VhostUserBlkBackendConfig { 489 path, 490 socket, 491 num_queues, 492 queue_size, 493 readonly, 494 direct, 495 poll_queue, 496 }) 497 } 498 } 499 500 pub fn start_block_backend(backend_command: &str) { 501 let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) { 502 Ok(config) => config, 503 Err(e) => { 504 println!("Failed parsing parameters {e:?}"); 505 process::exit(1); 506 } 507 }; 508 509 let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new()); 510 511 let blk_backend = Arc::new(RwLock::new( 512 VhostUserBlkBackend::new( 513 backend_config.path, 514 backend_config.num_queues, 515 backend_config.readonly, 516 backend_config.direct, 517 backend_config.poll_queue, 518 backend_config.queue_size, 519 mem.clone(), 520 ) 521 .unwrap(), 522 )); 523 524 debug!("blk_backend is created!\n"); 525 526 let listener = Listener::new(&backend_config.socket, true).unwrap(); 527 528 let name = "vhost-user-blk-backend"; 529 let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap(); 530 531 debug!("blk_daemon is created!\n"); 532 533 if let Err(e) = blk_daemon.start(listener) { 534 error!( 535 "Failed to start daemon for vhost-user-block with error: {:?}\n", 536 e 537 ); 538 process::exit(1); 539 } 540 541 if let Err(e) = blk_daemon.wait() { 542 error!("Error from the main thread: {:?}", e); 543 } 
544 545 for thread in blk_backend.read().unwrap().threads.iter() { 546 if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { 547 error!("Error shutting down worker thread: {:?}", e) 548 } 549 } 550 } 551