xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision b440cb7d2330770cd415b63544a371d4caa2db3a)
1 // Copyright 2019 Red Hat, Inc. All Rights Reserved.
2 //
3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved.
4 //
5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6 //
7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
8 //
9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
10 
11 #![allow(clippy::significant_drop_in_scrutinee)]
12 
13 use block_util::{build_disk_image_id, Request, VirtioBlockConfig};
14 use libc::EFD_NONBLOCK;
15 use log::*;
16 use option_parser::{OptionParser, OptionParserError, Toggle};
17 use qcow::{self, ImageType, QcowFile};
18 use std::fs::File;
19 use std::fs::OpenOptions;
20 use std::io::Read;
21 use std::io::{Seek, SeekFrom, Write};
22 use std::ops::DerefMut;
23 use std::os::unix::fs::OpenOptionsExt;
24 use std::path::PathBuf;
25 use std::process;
26 use std::result;
27 use std::sync::atomic::{AtomicBool, Ordering};
28 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
29 use std::time::Instant;
30 use std::vec::Vec;
31 use std::{convert, error, fmt, io};
32 use vhost::vhost_user::message::*;
33 use vhost::vhost_user::Listener;
34 use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
35 use virtio_bindings::bindings::virtio_blk::*;
36 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
37 use vm_memory::{bitmap::AtomicBitmap, ByteValued, Bytes, GuestMemoryAtomic};
38 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
39 
// Guest memory map specialised with an `AtomicBitmap` bitmap
// (used by vm-memory for page-level tracking).
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;

// A sector is 2^9 = 512 bytes, the traditional block-device unit.
const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
// Logical block size advertised to the guest via the virtio config space.
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;

/// Object-safe alias for any backing store the device can serve requests
/// from (raw file or qcow2 overlay): it only needs to be readable,
/// seekable, writable and sendable across threads.
trait DiskFile: Read + Seek + Write + Send {}
impl<D: Read + Seek + Write + Send> DiskFile for D {}

/// Result type for this crate's own fallible operations.
type Result<T> = std::result::Result<T, Error>;
/// Result type expected by the vhost-user-backend callback API.
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;
55 
/// Errors reported by the vhost-user-block backend.
#[derive(Debug)]
enum Error {
    /// Failed to create kill eventfd
    CreateKillEventFd(io::Error),
    /// Failed to parse configuration string
    FailedConfigParse(OptionParserError),
    /// Failed to handle event other than input event.
    HandleEventNotEpollIn,
    /// Failed to handle unknown event.
    HandleEventUnknownEvent,
    /// No path provided
    PathParameterMissing,
    /// No socket provided
    SocketParameterMissing,
}
71 
/// Help text describing the accepted backend parameter syntax.
pub const SYNTAX: &str = "vhost-user-block backend parameters \
 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\
 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\
 poll_queue=true|false\"";
76 
77 impl fmt::Display for Error {
78     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
79         write!(f, "vhost_user_block_error: {:?}", self)
80     }
81 }
82 
// Marker impl so `Error` can be used as a `dyn std::error::Error`.
impl error::Error for Error {}
84 
85 impl convert::From<Error> for io::Error {
86     fn from(e: Error) -> Self {
87         io::Error::new(io::ErrorKind::Other, e)
88     }
89 }
90 
/// Per-queue worker state: each virtqueue is served by one of these.
struct VhostUserBlkThread {
    // Shared handle to the backing disk (raw or qcow2).
    disk_image: Arc<Mutex<dyn DiskFile>>,
    // Device id bytes passed to request execution; built from the image
    // path (see `build_disk_image_id` in `VhostUserBlkBackend::new`).
    disk_image_id: Vec<u8>,
    // Disk capacity in 512-byte sectors.
    disk_nsectors: u64,
    // Whether VIRTIO_RING_F_EVENT_IDX was negotiated for this queue.
    event_idx: bool,
    // Written to by the shutdown path to make the worker exit.
    kill_evt: EventFd,
    // Cache-mode flag shared with the backend; true = writeback.
    writeback: Arc<AtomicBool>,
}
99 
impl VhostUserBlkThread {
    /// Build the per-queue worker state. Fails only if the kill eventfd
    /// cannot be created.
    fn new(
        disk_image: Arc<Mutex<dyn DiskFile>>,
        disk_image_id: Vec<u8>,
        disk_nsectors: u64,
        writeback: Arc<AtomicBool>,
    ) -> Result<Self> {
        Ok(VhostUserBlkThread {
            disk_image,
            disk_image_id,
            disk_nsectors,
            // EVENT_IDX stays off until the frontend negotiates it.
            event_idx: false,
            kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
            writeback,
        })
    }

    /// Drain and execute every available request on `vring`.
    ///
    /// Returns `true` when at least one descriptor chain was processed,
    /// so callers can keep polling while work keeps arriving.
    fn process_queue(
        &mut self,
        vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
    ) -> bool {
        // (head_index, written_len) pairs to return to the used ring.
        let mut used_desc_heads = Vec::new();

        for mut desc_chain in vring.get_queue_mut().iter().unwrap() {
            debug!("got an element in the queue");
            let len;
            match Request::parse(&mut desc_chain, None) {
                Ok(mut request) => {
                    debug!("element is a valid request");
                    // Honour the currently negotiated cache mode.
                    request.set_writeback(self.writeback.load(Ordering::Acquire));
                    let status = match request.execute(
                        &mut self.disk_image.lock().unwrap().deref_mut(),
                        self.disk_nsectors,
                        desc_chain.memory(),
                        &self.disk_image_id,
                    ) {
                        Ok(l) => {
                            len = l;
                            VIRTIO_BLK_S_OK
                        }
                        Err(e) => {
                            // One byte written: the status byte below.
                            len = 1;
                            e.status()
                        }
                    };
                    // Report the request status back in guest memory.
                    desc_chain
                        .memory()
                        .write_obj(status, request.status_addr)
                        .unwrap();
                }
                Err(err) => {
                    error!("failed to parse available descriptor chain: {:?}", err);
                    len = 0;
                }
            }

            used_desc_heads.push((desc_chain.head_index(), len));
        }

        let mut needs_signalling = false;
        for (desc_head, len) in used_desc_heads.iter() {
            if self.event_idx {
                let queue = vring.get_queue_mut();
                if queue.add_used(*desc_head, *len).is_ok() {
                    // With EVENT_IDX, only signal when the driver asked
                    // to be notified at this used index.
                    if queue.needs_notification().unwrap() {
                        debug!("signalling queue");
                        needs_signalling = true;
                    } else {
                        debug!("omitting signal (event_idx)");
                    }
                }
            } else {
                // Without EVENT_IDX, signal whenever something was used.
                debug!("signalling queue");
                vring.get_queue_mut().add_used(*desc_head, *len).unwrap();
                needs_signalling = true;
            }
        }

        if needs_signalling {
            vring.signal_used_queue().unwrap();
        }

        !used_desc_heads.is_empty()
    }
}
185 
/// vhost-user block backend: shared device state plus one worker per queue.
struct VhostUserBlkBackend {
    // One worker (and one kill eventfd) per virtqueue.
    threads: Vec<Mutex<VhostUserBlkThread>>,
    // Virtio config space exposed to the frontend.
    config: VirtioBlockConfig,
    // Image opened read-only; advertises VIRTIO_BLK_F_RO when set.
    rdonly: bool,
    // Busy-poll a kicked queue for POLL_QUEUE_US before sleeping again.
    poll_queue: bool,
    // Bitmap of queue indices served by each thread (bit i => queue i).
    queues_per_thread: Vec<u64>,
    // Maximum size of each virtqueue.
    queue_size: usize,
    // Feature bits acknowledged by the frontend.
    acked_features: u64,
    // Cache mode shared with the workers; true = writeback.
    writeback: Arc<AtomicBool>,
}
196 
impl VhostUserBlkBackend {
    /// Open `image_path` (raw or qcow2 auto-detected), size the virtio
    /// config space from it, and create one worker per queue.
    ///
    /// NOTE(review): open/detection failures panic via `unwrap()` rather
    /// than being surfaced as `Error` values — confirm this is intended
    /// for a standalone backend binary.
    fn new(
        image_path: String,
        num_queues: usize,
        rdonly: bool,
        direct: bool,
        poll_queue: bool,
        queue_size: usize,
    ) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.read(true);
        options.write(!rdonly);
        if direct {
            // Bypass the host page cache.
            options.custom_flags(libc::O_DIRECT);
        }
        let image: File = options.open(&image_path).unwrap();
        let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);

        let image_id = build_disk_image_id(&PathBuf::from(&image_path));
        let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
        let image = match image_type {
            ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
            ImageType::Qcow2 => {
                Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
            }
        };

        // Capacity in sectors, measured by seeking to the image's end.
        let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap() as u64) / SECTOR_SIZE;
        let config = VirtioBlockConfig {
            capacity: nsectors,
            blk_size: BLK_SIZE,
            size_max: 65535,
            seg_max: 128 - 2,
            min_io_size: 1,
            opt_io_size: 1,
            num_queues: num_queues as u16,
            // Start in writeback mode; feature negotiation may change it.
            writeback: 1,
            ..Default::default()
        };

        let mut queues_per_thread = Vec::new();
        let mut threads = Vec::new();
        let writeback = Arc::new(AtomicBool::new(true));
        for i in 0..num_queues {
            let thread = Mutex::new(VhostUserBlkThread::new(
                image.clone(),
                image_id.clone(),
                nsectors,
                writeback.clone(),
            )?);
            threads.push(thread);
            // Thread i serves exactly queue i.
            queues_per_thread.push(0b1 << i);
        }

        Ok(VhostUserBlkBackend {
            threads,
            config,
            rdonly,
            poll_queue,
            queues_per_thread,
            queue_size,
            acked_features: 0,
            writeback,
        })
    }

    /// Recompute the effective cache mode from the negotiated features
    /// and config space, and publish it to all workers.
    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback =
            if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
                self.config.writeback == 1
            } else {
                // Else check if VIRTIO_BLK_F_FLUSH negotiated
                self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
            };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}
284 
285 impl VhostUserBackendMut<VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>, AtomicBitmap>
286     for VhostUserBlkBackend
287 {
288     fn num_queues(&self) -> usize {
289         self.config.num_queues as usize
290     }
291 
292     fn max_queue_size(&self) -> usize {
293         self.queue_size as usize
294     }
295 
296     fn features(&self) -> u64 {
297         let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
298             | 1 << VIRTIO_BLK_F_BLK_SIZE
299             | 1 << VIRTIO_BLK_F_FLUSH
300             | 1 << VIRTIO_BLK_F_TOPOLOGY
301             | 1 << VIRTIO_BLK_F_MQ
302             | 1 << VIRTIO_BLK_F_CONFIG_WCE
303             | 1 << VIRTIO_RING_F_EVENT_IDX
304             | 1 << VIRTIO_F_VERSION_1
305             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
306 
307         if self.rdonly {
308             avail_features |= 1 << VIRTIO_BLK_F_RO;
309         }
310         avail_features
311     }
312 
313     fn acked_features(&mut self, features: u64) {
314         self.acked_features = features;
315         self.update_writeback();
316     }
317 
318     fn protocol_features(&self) -> VhostUserProtocolFeatures {
319         VhostUserProtocolFeatures::CONFIG
320             | VhostUserProtocolFeatures::MQ
321             | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
322     }
323 
324     fn set_event_idx(&mut self, enabled: bool) {
325         for thread in self.threads.iter() {
326             thread.lock().unwrap().event_idx = enabled;
327         }
328     }
329 
330     fn handle_event(
331         &mut self,
332         device_event: u16,
333         evset: EventSet,
334         vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
335         thread_id: usize,
336     ) -> VhostUserBackendResult<bool> {
337         if evset != EventSet::IN {
338             return Err(Error::HandleEventNotEpollIn.into());
339         }
340 
341         debug!("event received: {:?}", device_event);
342 
343         let mut thread = self.threads[thread_id].lock().unwrap();
344         match device_event {
345             0 => {
346                 let mut vring = vrings[0].get_mut();
347 
348                 if self.poll_queue {
349                     // Actively poll the queue until POLL_QUEUE_US has passed
350                     // without seeing a new request.
351                     let mut now = Instant::now();
352                     loop {
353                         if thread.process_queue(&mut vring) {
354                             now = Instant::now();
355                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
356                             break;
357                         }
358                     }
359                 }
360 
361                 if thread.event_idx {
362                     // vm-virtio's Queue implementation only checks avail_index
363                     // once, so to properly support EVENT_IDX we need to keep
364                     // calling process_queue() until it stops finding new
365                     // requests on the queue.
366                     loop {
367                         vring.get_queue_mut().enable_notification().unwrap();
368                         if !thread.process_queue(&mut vring) {
369                             break;
370                         }
371                     }
372                 } else {
373                     // Without EVENT_IDX, a single call is enough.
374                     thread.process_queue(&mut vring);
375                 }
376 
377                 Ok(false)
378             }
379             _ => Err(Error::HandleEventUnknownEvent.into()),
380         }
381     }
382 
383     fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
384         self.config.as_slice().to_vec()
385     }
386 
387     fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
388         let config_slice = self.config.as_mut_slice();
389         let data_len = data.len() as u32;
390         let config_len = config_slice.len() as u32;
391         if offset + data_len > config_len {
392             error!("Failed to write config space");
393             return Err(io::Error::from_raw_os_error(libc::EINVAL));
394         }
395         let (_, right) = config_slice.split_at_mut(offset as usize);
396         right.copy_from_slice(data);
397         self.update_writeback();
398         Ok(())
399     }
400 
401     fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
402         Some(
403             self.threads[thread_index]
404                 .lock()
405                 .unwrap()
406                 .kill_evt
407                 .try_clone()
408                 .unwrap(),
409         )
410     }
411 
412     fn queues_per_thread(&self) -> Vec<u64> {
413         self.queues_per_thread.clone()
414     }
415 
416     fn update_memory(
417         &mut self,
418         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
419     ) -> VhostUserBackendResult<()> {
420         Ok(())
421     }
422 }
423 
/// Parsed command-line parameters for the block backend.
struct VhostUserBlkBackendConfig {
    // Path to the disk image (raw or qcow2).
    path: String,
    // Path of the vhost-user Unix socket to listen on.
    socket: String,
    // Number of virtqueues (default 1).
    num_queues: usize,
    // Size of each virtqueue (default 1024).
    queue_size: usize,
    // Open the image read-only (default false).
    readonly: bool,
    // Open the image with O_DIRECT (default false).
    direct: bool,
    // Busy-poll queues after a kick (default true).
    poll_queue: bool,
}
433 
434 impl VhostUserBlkBackendConfig {
435     fn parse(backend: &str) -> Result<Self> {
436         let mut parser = OptionParser::new();
437         parser
438             .add("path")
439             .add("readonly")
440             .add("direct")
441             .add("num_queues")
442             .add("queue_size")
443             .add("socket")
444             .add("poll_queue");
445         parser.parse(backend).map_err(Error::FailedConfigParse)?;
446 
447         let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
448         let readonly = parser
449             .convert::<Toggle>("readonly")
450             .map_err(Error::FailedConfigParse)?
451             .unwrap_or(Toggle(false))
452             .0;
453         let direct = parser
454             .convert::<Toggle>("direct")
455             .map_err(Error::FailedConfigParse)?
456             .unwrap_or(Toggle(false))
457             .0;
458         let num_queues = parser
459             .convert("num_queues")
460             .map_err(Error::FailedConfigParse)?
461             .unwrap_or(1);
462         let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
463         let poll_queue = parser
464             .convert::<Toggle>("poll_queue")
465             .map_err(Error::FailedConfigParse)?
466             .unwrap_or(Toggle(true))
467             .0;
468         let queue_size = parser
469             .convert("queue_size")
470             .map_err(Error::FailedConfigParse)?
471             .unwrap_or(1024);
472 
473         Ok(VhostUserBlkBackendConfig {
474             path,
475             socket,
476             num_queues,
477             queue_size,
478             readonly,
479             direct,
480             poll_queue,
481         })
482     }
483 }
484 
485 pub fn start_block_backend(backend_command: &str) {
486     let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
487         Ok(config) => config,
488         Err(e) => {
489             println!("Failed parsing parameters {:?}", e);
490             process::exit(1);
491         }
492     };
493 
494     let blk_backend = Arc::new(RwLock::new(
495         VhostUserBlkBackend::new(
496             backend_config.path,
497             backend_config.num_queues,
498             backend_config.readonly,
499             backend_config.direct,
500             backend_config.poll_queue,
501             backend_config.queue_size,
502         )
503         .unwrap(),
504     ));
505 
506     debug!("blk_backend is created!\n");
507 
508     let listener = Listener::new(&backend_config.socket, true).unwrap();
509 
510     let name = "vhost-user-blk-backend";
511     let mut blk_daemon = VhostUserDaemon::new(
512         name.to_string(),
513         blk_backend.clone(),
514         GuestMemoryAtomic::new(GuestMemoryMmap::new()),
515     )
516     .unwrap();
517 
518     debug!("blk_daemon is created!\n");
519 
520     if let Err(e) = blk_daemon.start(listener) {
521         error!(
522             "Failed to start daemon for vhost-user-block with error: {:?}\n",
523             e
524         );
525         process::exit(1);
526     }
527 
528     if let Err(e) = blk_daemon.wait() {
529         error!("Error from the main thread: {:?}", e);
530     }
531 
532     for thread in blk_backend.read().unwrap().threads.iter() {
533         if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
534             error!("Error shutting down worker thread: {:?}", e)
535         }
536     }
537 }
538