// Copyright 2019 Red Hat, Inc. All Rights Reserved.
//
// Portions Copyright 2019 Intel Corporation. All Rights Reserved.
//
// Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
//
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)

#![allow(clippy::significant_drop_in_scrutinee)]

use block_util::{build_disk_image_id, Request, VirtioBlockConfig};
use libc::EFD_NONBLOCK;
use log::*;
use option_parser::{OptionParser, OptionParserError, Toggle};
use qcow::{self, ImageType, QcowFile};
use std::fs::File;
use std::fs::OpenOptions;
use std::io::Read;
use std::io::{Seek, SeekFrom, Write};
use std::ops::DerefMut;
use std::os::unix::fs::OpenOptionsExt;
use std::path::PathBuf;
use std::process;
use std::result;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::time::Instant;
use std::vec::Vec;
use std::{convert, error, fmt, io};
use vhost::vhost_user::message::*;
use vhost::vhost_user::Listener;
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
use virtio_bindings::bindings::virtio_blk::*;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use vm_memory::{bitmap::AtomicBitmap, ByteValued, Bytes, GuestMemoryAtomic};
use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};

type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;

trait DiskFile: Read + Seek + Write + Send {}
impl<D: Read + Seek + Write + Send> DiskFile for D {}

type Result<T> = std::result::Result<T, Error>;
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;

#[derive(Debug)]
enum Error {
    /// Failed to create kill eventfd
    CreateKillEventFd(io::Error),
    /// Failed to parse configuration string
    FailedConfigParse(OptionParserError),
    /// Failed to handle event other than input event.
    HandleEventNotEpollIn,
    /// Failed to handle unknown event.
    HandleEventUnknownEvent,
    /// No path provided
    PathParameterMissing,
    /// No socket provided
    SocketParameterMissing,
}

pub const SYNTAX: &str = "vhost-user-block backend parameters \
\"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\
queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\
poll_queue=true|false\"";

impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "vhost_user_block_error: {:?}", self)
    }
}

impl error::Error for Error {}

impl convert::From<Error> for io::Error {
    fn from(e: Error) -> Self {
        io::Error::new(io::ErrorKind::Other, e)
    }
}

struct VhostUserBlkThread {
    disk_image: Arc<Mutex<dyn DiskFile>>,
    disk_image_id: Vec<u8>,
    disk_nsectors: u64,
    event_idx: bool,
    kill_evt: EventFd,
    writeback: Arc<AtomicBool>,
}

impl VhostUserBlkThread {
    fn new(
        disk_image: Arc<Mutex<dyn DiskFile>>,
        disk_image_id: Vec<u8>,
        disk_nsectors: u64,
        writeback: Arc<AtomicBool>,
    ) -> Result<Self> {
        Ok(VhostUserBlkThread {
            disk_image,
            disk_image_id,
            disk_nsectors,
            event_idx: false,
            kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
            writeback,
        })
    }

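    // Drain the available ring: parse each descriptor chain into a block
    // request, execute it against the disk image, write the status byte back
    // into guest memory, and return the descriptors as used. Returns true if
    // at least one descriptor was processed, so callers know whether polling
    // the queue again is worthwhile.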
    fn process_queue(
        &mut self,
        vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
    ) -> bool {
        let mut used_desc_heads = Vec::new();

        for mut desc_chain in vring.get_queue_mut().iter().unwrap() {
            debug!("got an element in the queue");
            let len;
            match Request::parse(&mut desc_chain, None) {
                Ok(mut request) => {
                    debug!("element is a valid request");
                    request.set_writeback(self.writeback.load(Ordering::Acquire));
                    let status = match request.execute(
                        &mut self.disk_image.lock().unwrap().deref_mut(),
                        self.disk_nsectors,
                        desc_chain.memory(),
                        &self.disk_image_id,
                    ) {
                        Ok(l) => {
                            len = l;
                            VIRTIO_BLK_S_OK
                        }
                        Err(e) => {
                            len = 1;
                            e.status()
                        }
                    };
                    desc_chain
                        .memory()
                        .write_obj(status, request.status_addr)
                        .unwrap();
                }
                Err(err) => {
                    error!("failed to parse available descriptor chain: {:?}", err);
                    len = 0;
                }
            }

            used_desc_heads.push((desc_chain.head_index(), len));
        }

        let mut needs_signalling = false;
        for (desc_head, len) in used_desc_heads.iter() {
            if self.event_idx {
                let queue = vring.get_queue_mut();
                if queue.add_used(*desc_head, *len).is_ok() {
                    if queue.needs_notification().unwrap() {
                        debug!("signalling queue");
                        needs_signalling = true;
                    } else {
                        debug!("omitting signal (event_idx)");
                    }
                }
            } else {
                debug!("signalling queue");
                vring.get_queue_mut().add_used(*desc_head, *len).unwrap();
                needs_signalling = true;
            }
        }

        if needs_signalling {
            vring.signal_used_queue().unwrap();
        }

        !used_desc_heads.is_empty()
    }
}

struct VhostUserBlkBackend {
    threads: Vec<Mutex<VhostUserBlkThread>>,
    config: VirtioBlockConfig,
    rdonly: bool,
    poll_queue: bool,
    queues_per_thread: Vec<u64>,
    queue_size: usize,
    acked_features: u64,
    writeback: Arc<AtomicBool>,
}

impl VhostUserBlkBackend {
    fn new(
        image_path: String,
        num_queues: usize,
        rdonly: bool,
        direct: bool,
        poll_queue: bool,
        queue_size: usize,
    ) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.read(true);
        options.write(!rdonly);
        if direct {
            options.custom_flags(libc::O_DIRECT);
        }
        let image: File = options.open(&image_path).unwrap();
        let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);

        let image_id = build_disk_image_id(&PathBuf::from(&image_path));
        let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
        let image = match image_type {
            ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
            ImageType::Qcow2 => {
                Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
            }
        };

        let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap() as u64) / SECTOR_SIZE;
        let config = VirtioBlockConfig {
            capacity: nsectors,
            blk_size: BLK_SIZE,
            size_max: 65535,
            seg_max: 128 - 2,
            min_io_size: 1,
            opt_io_size: 1,
            num_queues: num_queues as u16,
            writeback: 1,
            ..Default::default()
        };

        let mut queues_per_thread = Vec::new();
        let mut threads = Vec::new();
        let writeback = Arc::new(AtomicBool::new(true));
        for i in 0..num_queues {
            let thread = Mutex::new(VhostUserBlkThread::new(
                image.clone(),
                image_id.clone(),
                nsectors,
                writeback.clone(),
            )?);
            threads.push(thread);
            queues_per_thread.push(0b1 << i);
        }

        Ok(VhostUserBlkBackend {
            threads,
            config,
            rdonly,
            poll_queue,
            queues_per_thread,
            queue_size,
            acked_features: 0,
            writeback,
        })
    }

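    // Recompute the effective cache mode. If VIRTIO_BLK_F_CONFIG_WCE was
    // negotiated, the guest controls it through the `writeback` config field;
    // otherwise fall back to writeback only when VIRTIO_BLK_F_FLUSH was
    // negotiated. The result is published to the worker threads through the
    // shared AtomicBool.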
    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback =
            if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
                self.config.writeback == 1
            } else {
                // Else check if VIRTIO_BLK_F_FLUSH negotiated
                self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
            };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}

impl VhostUserBackendMut<VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>, AtomicBitmap>
    for VhostUserBlkBackend
{
    fn num_queues(&self) -> usize {
        self.config.num_queues as usize
    }

    fn max_queue_size(&self) -> usize {
        self.queue_size as usize
    }

    fn features(&self) -> u64 {
        let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
            | 1 << VIRTIO_BLK_F_BLK_SIZE
            | 1 << VIRTIO_BLK_F_FLUSH
            | 1 << VIRTIO_BLK_F_TOPOLOGY
            | 1 << VIRTIO_BLK_F_MQ
            | 1 << VIRTIO_BLK_F_CONFIG_WCE
            | 1 << VIRTIO_RING_F_EVENT_IDX
            | 1 << VIRTIO_F_VERSION_1
            | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();

        if self.rdonly {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        }
        avail_features
    }

    fn acked_features(&mut self, features: u64) {
        self.acked_features = features;
        self.update_writeback();
    }

    fn protocol_features(&self) -> VhostUserProtocolFeatures {
        VhostUserProtocolFeatures::CONFIG
            | VhostUserProtocolFeatures::MQ
            | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
    }

    fn set_event_idx(&mut self, enabled: bool) {
        for thread in self.threads.iter() {
            thread.lock().unwrap().event_idx = enabled;
        }
    }

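    // Invoked by the vhost-user-backend crate when the queue eventfd assigned
    // to this worker thread becomes readable. Each worker is assigned a single
    // queue via queues_per_thread(), so only device_event 0 is expected here.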
    fn handle_event(
        &mut self,
        device_event: u16,
        evset: EventSet,
        vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
        thread_id: usize,
    ) -> VhostUserBackendResult<bool> {
        if evset != EventSet::IN {
            return Err(Error::HandleEventNotEpollIn.into());
        }

        debug!("event received: {:?}", device_event);

        let mut thread = self.threads[thread_id].lock().unwrap();
        match device_event {
            0 => {
                let mut vring = vrings[0].get_mut();

                if self.poll_queue {
                    // Actively poll the queue until POLL_QUEUE_US has passed
                    // without seeing a new request.
                    let mut now = Instant::now();
                    loop {
                        if thread.process_queue(&mut vring) {
                            now = Instant::now();
                        } else if now.elapsed().as_micros() > POLL_QUEUE_US {
                            break;
                        }
                    }
                }

                if thread.event_idx {
                    // vm-virtio's Queue implementation only checks avail_index
                    // once, so to properly support EVENT_IDX we need to keep
                    // calling process_queue() until it stops finding new
                    // requests on the queue.
                    loop {
                        vring.get_queue_mut().enable_notification().unwrap();
                        if !thread.process_queue(&mut vring) {
                            break;
                        }
                    }
                } else {
                    // Without EVENT_IDX, a single call is enough.
                    thread.process_queue(&mut vring);
                }

                Ok(false)
            }
            _ => Err(Error::HandleEventUnknownEvent.into()),
        }
    }

    fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
        self.config.as_slice().to_vec()
    }

    fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
        let config_slice = self.config.as_mut_slice();
        let data_len = data.len() as u32;
        let config_len = config_slice.len() as u32;
        if offset + data_len > config_len {
            error!("Failed to write config space");
            return Err(io::Error::from_raw_os_error(libc::EINVAL));
        }
        let (_, right) = config_slice.split_at_mut(offset as usize);
        right.copy_from_slice(data);
        self.update_writeback();
        Ok(())
    }

    fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
        Some(
            self.threads[thread_index]
                .lock()
                .unwrap()
                .kill_evt
                .try_clone()
                .unwrap(),
        )
    }

    fn queues_per_thread(&self) -> Vec<u64> {
        self.queues_per_thread.clone()
    }

    fn update_memory(
        &mut self,
        _mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> VhostUserBackendResult<()> {
        Ok(())
    }
}

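// Backend options parsed from the command line string. `path` and `socket`
// are mandatory; the remaining options default to a single queue of depth
// 1024, read-write access, buffered (non-O_DIRECT) I/O, and queue polling
// enabled.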
struct VhostUserBlkBackendConfig {
    path: String,
    socket: String,
    num_queues: usize,
    queue_size: usize,
    readonly: bool,
    direct: bool,
    poll_queue: bool,
}

impl VhostUserBlkBackendConfig {
    fn parse(backend: &str) -> Result<Self> {
        let mut parser = OptionParser::new();
        parser
            .add("path")
            .add("readonly")
            .add("direct")
            .add("num_queues")
            .add("queue_size")
            .add("socket")
            .add("poll_queue");
        parser.parse(backend).map_err(Error::FailedConfigParse)?;

        let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
        let readonly = parser
            .convert::<Toggle>("readonly")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(false))
            .0;
        let direct = parser
            .convert::<Toggle>("direct")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(false))
            .0;
        let num_queues = parser
            .convert("num_queues")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(1);
        let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
        let poll_queue = parser
            .convert::<Toggle>("poll_queue")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(true))
            .0;
        let queue_size = parser
            .convert("queue_size")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(1024);

        Ok(VhostUserBlkBackendConfig {
            path,
            socket,
            num_queues,
            queue_size,
            readonly,
            direct,
            poll_queue,
        })
    }
}

pub fn start_block_backend(backend_command: &str) {
    let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
        Ok(config) => config,
        Err(e) => {
            println!("Failed parsing parameters {:?}", e);
            process::exit(1);
        }
    };

    let blk_backend = Arc::new(RwLock::new(
        VhostUserBlkBackend::new(
            backend_config.path,
            backend_config.num_queues,
            backend_config.readonly,
            backend_config.direct,
            backend_config.poll_queue,
            backend_config.queue_size,
        )
        .unwrap(),
    ));

    debug!("blk_backend is created!\n");

    let listener = Listener::new(&backend_config.socket, true).unwrap();

    let name = "vhost-user-blk-backend";
    let mut blk_daemon = VhostUserDaemon::new(
        name.to_string(),
        blk_backend.clone(),
        GuestMemoryAtomic::new(GuestMemoryMmap::new()),
    )
    .unwrap();

    debug!("blk_daemon is created!\n");

    if let Err(e) = blk_daemon.start(listener) {
        error!(
            "Failed to start daemon for vhost-user-block with error: {:?}\n",
            e
        );
        process::exit(1);
    }

    if let Err(e) = blk_daemon.wait() {
        error!("Error from the main thread: {:?}", e);
    }

    for thread in blk_backend.read().unwrap().threads.iter() {
        if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
            error!("Error shutting down worker thread: {:?}", e)
        }
    }
}