1 // Copyright 2019 Red Hat, Inc. All Rights Reserved. 2 // 3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved. 4 // 5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 6 // 7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved. 8 // 9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause) 10 11 use block_util::{build_disk_image_id, Request, VirtioBlockConfig}; 12 use libc::EFD_NONBLOCK; 13 use log::*; 14 use option_parser::{OptionParser, OptionParserError, Toggle}; 15 use qcow::{self, ImageType, QcowFile}; 16 use std::fs::File; 17 use std::fs::OpenOptions; 18 use std::io::Read; 19 use std::io::{Seek, SeekFrom, Write}; 20 use std::ops::DerefMut; 21 use std::os::unix::fs::OpenOptionsExt; 22 use std::path::PathBuf; 23 use std::process; 24 use std::result; 25 use std::sync::atomic::{AtomicBool, Ordering}; 26 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard}; 27 use std::time::Instant; 28 use std::vec::Vec; 29 use std::{convert, error, fmt, io}; 30 use vhost::vhost_user::message::*; 31 use vhost::vhost_user::Listener; 32 use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT}; 33 use virtio_bindings::bindings::virtio_blk::*; 34 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX; 35 use vm_memory::{bitmap::AtomicBitmap, ByteValued, Bytes, GuestMemoryAtomic}; 36 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd}; 37 38 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>; 39 40 const SECTOR_SHIFT: u8 = 9; 41 const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT; 42 const BLK_SIZE: u32 = 512; 43 // Current (2020) enterprise SSDs have a latency lower than 30us. 44 // Polling for 50us should be enough to cover for the device latency 45 // and the overhead of the emulation layer. 46 const POLL_QUEUE_US: u128 = 50; 47 48 trait DiskFile: Read + Seek + Write + Send + Sync {} 49 impl<D: Read + Seek + Write + Send + Sync> DiskFile for D {} 50 51 type Result<T> = std::result::Result<T, Error>; 52 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>; 53 54 #[derive(Debug)] 55 enum Error { 56 /// Failed to create kill eventfd 57 CreateKillEventFd(io::Error), 58 /// Failed to parse configuration string 59 FailedConfigParse(OptionParserError), 60 /// Failed to handle event other than input event. 61 HandleEventNotEpollIn, 62 /// Failed to handle unknown event. 63 HandleEventUnknownEvent, 64 /// No path provided 65 PathParameterMissing, 66 /// No socket provided 67 SocketParameterMissing, 68 } 69 70 pub const SYNTAX: &str = "vhost-user-block backend parameters \ 71 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\ 72 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\ 73 poll_queue=true|false\""; 74 75 impl fmt::Display for Error { 76 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 77 write!(f, "vhost_user_block_error: {:?}", self) 78 } 79 } 80 81 impl error::Error for Error {} 82 83 impl convert::From<Error> for io::Error { 84 fn from(e: Error) -> Self { 85 io::Error::new(io::ErrorKind::Other, e) 86 } 87 } 88 89 struct VhostUserBlkThread { 90 disk_image: Arc<Mutex<dyn DiskFile>>, 91 disk_image_id: Vec<u8>, 92 disk_nsectors: u64, 93 event_idx: bool, 94 kill_evt: EventFd, 95 writeback: Arc<AtomicBool>, 96 } 97 98 impl VhostUserBlkThread { 99 fn new( 100 disk_image: Arc<Mutex<dyn DiskFile>>, 101 disk_image_id: Vec<u8>, 102 disk_nsectors: u64, 103 writeback: Arc<AtomicBool>, 104 ) -> Result<Self> { 105 Ok(VhostUserBlkThread { 106 disk_image, 107 disk_image_id, 108 disk_nsectors, 109 event_idx: false, 110 kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?, 111 writeback, 112 }) 113 } 114 115 fn process_queue( 116 &mut self, 117 vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>, 118 ) -> bool { 119 let mut used_desc_heads = Vec::new(); 120 121 for mut desc_chain in vring.get_queue_mut().iter().unwrap() { 122 debug!("got an element in the queue"); 123 let len; 124 match Request::parse(&mut desc_chain, None) { 125 Ok(mut request) => { 126 debug!("element is a valid request"); 127 request.set_writeback(self.writeback.load(Ordering::Acquire)); 128 let status = match request.execute( 129 &mut self.disk_image.lock().unwrap().deref_mut(), 130 self.disk_nsectors, 131 desc_chain.memory(), 132 &self.disk_image_id, 133 ) { 134 Ok(l) => { 135 len = l; 136 VIRTIO_BLK_S_OK 137 } 138 Err(e) => { 139 len = 1; 140 e.status() 141 } 142 }; 143 desc_chain 144 .memory() 145 .write_obj(status, request.status_addr) 146 .unwrap(); 147 } 148 Err(err) => { 149 error!("failed to parse available descriptor chain: {:?}", err); 150 len = 0; 151 } 152 } 153 154 used_desc_heads.push((desc_chain.head_index(), len)); 155 } 156 157 let mut needs_signalling = false; 158 for (desc_head, len) in used_desc_heads.iter() { 159 if self.event_idx { 160 let queue = vring.get_queue_mut(); 161 if queue.add_used(*desc_head, *len).is_ok() { 162 if queue.needs_notification().unwrap() { 163 debug!("signalling queue"); 164 needs_signalling = true; 165 } else { 166 debug!("omitting signal (event_idx)"); 167 } 168 } 169 } else { 170 debug!("signalling queue"); 171 vring.get_queue_mut().add_used(*desc_head, *len).unwrap(); 172 needs_signalling = true; 173 } 174 } 175 176 if needs_signalling { 177 vring.signal_used_queue().unwrap(); 178 } 179 180 !used_desc_heads.is_empty() 181 } 182 } 183 184 struct VhostUserBlkBackend { 185 threads: Vec<Mutex<VhostUserBlkThread>>, 186 config: VirtioBlockConfig, 187 rdonly: bool, 188 poll_queue: bool, 189 queues_per_thread: Vec<u64>, 190 queue_size: usize, 191 acked_features: u64, 192 writeback: Arc<AtomicBool>, 193 } 194 195 impl VhostUserBlkBackend { 196 fn new( 197 image_path: String, 198 num_queues: usize, 199 rdonly: bool, 200 direct: bool, 201 poll_queue: bool, 202 queue_size: usize, 203 ) -> Result<Self> { 204 let mut options = OpenOptions::new(); 205 options.read(true); 206 options.write(!rdonly); 207 if direct { 208 options.custom_flags(libc::O_DIRECT); 209 } 210 let image: File = options.open(&image_path).unwrap(); 211 let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct); 212 213 let image_id = build_disk_image_id(&PathBuf::from(&image_path)); 214 let image_type = qcow::detect_image_type(&mut raw_img).unwrap(); 215 let image = match image_type { 216 ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>, 217 ImageType::Qcow2 => { 218 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>> 219 } 220 }; 221 222 let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap() as u64) / SECTOR_SIZE; 223 let config = VirtioBlockConfig { 224 capacity: nsectors, 225 blk_size: BLK_SIZE, 226 size_max: 65535, 227 seg_max: 128 - 2, 228 min_io_size: 1, 229 opt_io_size: 1, 230 num_queues: num_queues as u16, 231 writeback: 1, 232 ..Default::default() 233 }; 234 235 let mut queues_per_thread = Vec::new(); 236 let mut threads = Vec::new(); 237 let writeback = Arc::new(AtomicBool::new(true)); 238 for i in 0..num_queues { 239 let thread = Mutex::new(VhostUserBlkThread::new( 240 image.clone(), 241 image_id.clone(), 242 nsectors, 243 writeback.clone(), 244 )?); 245 threads.push(thread); 246 queues_per_thread.push(0b1 << i); 247 } 248 249 Ok(VhostUserBlkBackend { 250 threads, 251 config, 252 rdonly, 253 poll_queue, 254 queues_per_thread, 255 queue_size, 256 acked_features: 0, 257 writeback, 258 }) 259 } 260 261 fn update_writeback(&mut self) { 262 // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE 263 let writeback = 264 if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE { 265 self.config.writeback == 1 266 } else { 267 // Else check if VIRTIO_BLK_F_FLUSH negotiated 268 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH 269 }; 270 271 info!( 272 "Changing cache mode to {}", 273 if writeback { 274 "writeback" 275 } else { 276 "writethrough" 277 } 278 ); 279 self.writeback.store(writeback, Ordering::Release); 280 } 281 } 282 283 impl VhostUserBackendMut<VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>, AtomicBitmap> 284 for VhostUserBlkBackend 285 { 286 fn num_queues(&self) -> usize { 287 self.config.num_queues as usize 288 } 289 290 fn max_queue_size(&self) -> usize { 291 self.queue_size as usize 292 } 293 294 fn features(&self) -> u64 { 295 let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX 296 | 1 << VIRTIO_BLK_F_BLK_SIZE 297 | 1 << VIRTIO_BLK_F_FLUSH 298 | 1 << VIRTIO_BLK_F_TOPOLOGY 299 | 1 << VIRTIO_BLK_F_MQ 300 | 1 << VIRTIO_BLK_F_CONFIG_WCE 301 | 1 << VIRTIO_RING_F_EVENT_IDX 302 | 1 << VIRTIO_F_VERSION_1 303 | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits(); 304 305 if self.rdonly { 306 avail_features |= 1 << VIRTIO_BLK_F_RO; 307 } 308 avail_features 309 } 310 311 fn acked_features(&mut self, features: u64) { 312 self.acked_features = features; 313 self.update_writeback(); 314 } 315 316 fn protocol_features(&self) -> VhostUserProtocolFeatures { 317 VhostUserProtocolFeatures::CONFIG 318 | VhostUserProtocolFeatures::MQ 319 | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS 320 } 321 322 fn set_event_idx(&mut self, enabled: bool) { 323 for thread in self.threads.iter() { 324 thread.lock().unwrap().event_idx = enabled; 325 } 326 } 327 328 fn handle_event( 329 &mut self, 330 device_event: u16, 331 evset: EventSet, 332 vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>], 333 thread_id: usize, 334 ) -> VhostUserBackendResult<bool> { 335 if evset != EventSet::IN { 336 return Err(Error::HandleEventNotEpollIn.into()); 337 } 338 339 debug!("event received: {:?}", device_event); 340 341 let mut thread = self.threads[thread_id].lock().unwrap(); 342 match device_event { 343 0 => { 344 let mut vring = vrings[0].get_mut(); 345 346 if self.poll_queue { 347 // Actively poll the queue until POLL_QUEUE_US has passed 348 // without seeing a new request. 349 let mut now = Instant::now(); 350 loop { 351 if thread.process_queue(&mut vring) { 352 now = Instant::now(); 353 } else if now.elapsed().as_micros() > POLL_QUEUE_US { 354 break; 355 } 356 } 357 } 358 359 if thread.event_idx { 360 // vm-virtio's Queue implementation only checks avail_index 361 // once, so to properly support EVENT_IDX we need to keep 362 // calling process_queue() until it stops finding new 363 // requests on the queue. 364 loop { 365 vring.get_queue_mut().enable_notification().unwrap(); 366 if !thread.process_queue(&mut vring) { 367 break; 368 } 369 } 370 } else { 371 // Without EVENT_IDX, a single call is enough. 372 thread.process_queue(&mut vring); 373 } 374 375 Ok(false) 376 } 377 _ => Err(Error::HandleEventUnknownEvent.into()), 378 } 379 } 380 381 fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> { 382 self.config.as_slice().to_vec() 383 } 384 385 fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> { 386 let config_slice = self.config.as_mut_slice(); 387 let data_len = data.len() as u32; 388 let config_len = config_slice.len() as u32; 389 if offset + data_len > config_len { 390 error!("Failed to write config space"); 391 return Err(io::Error::from_raw_os_error(libc::EINVAL)); 392 } 393 let (_, right) = config_slice.split_at_mut(offset as usize); 394 right.copy_from_slice(data); 395 self.update_writeback(); 396 Ok(()) 397 } 398 399 fn exit_event(&self, thread_index: usize) -> Option<EventFd> { 400 Some( 401 self.threads[thread_index] 402 .lock() 403 .unwrap() 404 .kill_evt 405 .try_clone() 406 .unwrap(), 407 ) 408 } 409 410 fn queues_per_thread(&self) -> Vec<u64> { 411 self.queues_per_thread.clone() 412 } 413 414 fn update_memory( 415 &mut self, 416 _mem: GuestMemoryAtomic<GuestMemoryMmap>, 417 ) -> VhostUserBackendResult<()> { 418 Ok(()) 419 } 420 } 421 422 struct VhostUserBlkBackendConfig { 423 path: String, 424 socket: String, 425 num_queues: usize, 426 queue_size: usize, 427 readonly: bool, 428 direct: bool, 429 poll_queue: bool, 430 } 431 432 impl VhostUserBlkBackendConfig { 433 fn parse(backend: &str) -> Result<Self> { 434 let mut parser = OptionParser::new(); 435 parser 436 .add("path") 437 .add("readonly") 438 .add("direct") 439 .add("num_queues") 440 .add("queue_size") 441 .add("socket") 442 .add("poll_queue"); 443 parser.parse(backend).map_err(Error::FailedConfigParse)?; 444 445 let path = parser.get("path").ok_or(Error::PathParameterMissing)?; 446 let readonly = parser 447 .convert::<Toggle>("readonly") 448 .map_err(Error::FailedConfigParse)? 449 .unwrap_or(Toggle(false)) 450 .0; 451 let direct = parser 452 .convert::<Toggle>("direct") 453 .map_err(Error::FailedConfigParse)? 454 .unwrap_or(Toggle(false)) 455 .0; 456 let num_queues = parser 457 .convert("num_queues") 458 .map_err(Error::FailedConfigParse)? 459 .unwrap_or(1); 460 let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?; 461 let poll_queue = parser 462 .convert::<Toggle>("poll_queue") 463 .map_err(Error::FailedConfigParse)? 464 .unwrap_or(Toggle(true)) 465 .0; 466 let queue_size = parser 467 .convert("queue_size") 468 .map_err(Error::FailedConfigParse)? 469 .unwrap_or(1024); 470 471 Ok(VhostUserBlkBackendConfig { 472 path, 473 socket, 474 num_queues, 475 queue_size, 476 readonly, 477 direct, 478 poll_queue, 479 }) 480 } 481 } 482 483 pub fn start_block_backend(backend_command: &str) { 484 let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) { 485 Ok(config) => config, 486 Err(e) => { 487 println!("Failed parsing parameters {:?}", e); 488 process::exit(1); 489 } 490 }; 491 492 let blk_backend = Arc::new(RwLock::new( 493 VhostUserBlkBackend::new( 494 backend_config.path, 495 backend_config.num_queues, 496 backend_config.readonly, 497 backend_config.direct, 498 backend_config.poll_queue, 499 backend_config.queue_size, 500 ) 501 .unwrap(), 502 )); 503 504 debug!("blk_backend is created!\n"); 505 506 let listener = Listener::new(&backend_config.socket, true).unwrap(); 507 508 let name = "vhost-user-blk-backend"; 509 let mut blk_daemon = VhostUserDaemon::new( 510 name.to_string(), 511 blk_backend.clone(), 512 GuestMemoryAtomic::new(GuestMemoryMmap::new()), 513 ) 514 .unwrap(); 515 516 debug!("blk_daemon is created!\n"); 517 518 if let Err(e) = blk_daemon.start(listener) { 519 error!( 520 "Failed to start daemon for vhost-user-block with error: {:?}\n", 521 e 522 ); 523 process::exit(1); 524 } 525 526 if let Err(e) = blk_daemon.wait() { 527 error!("Error from the main thread: {:?}", e); 528 } 529 530 for thread in blk_backend.read().unwrap().threads.iter() { 531 if let Err(e) = thread.lock().unwrap().kill_evt.write(1) { 532 error!("Error shutting down worker thread: {:?}", e) 533 } 534 } 535 } 536