xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision 7d7bfb2034001d4cb15df2ddc56d2d350c8da30f)
1 // Copyright 2019 Red Hat, Inc. All Rights Reserved.
2 //
3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved.
4 //
5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6 //
7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
8 //
9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
10 
11 use block_util::{build_disk_image_id, Request, VirtioBlockConfig};
12 use libc::EFD_NONBLOCK;
13 use log::*;
14 use option_parser::{OptionParser, OptionParserError, Toggle};
15 use qcow::{self, ImageType, QcowFile};
16 use std::fs::File;
17 use std::fs::OpenOptions;
18 use std::io::Read;
19 use std::io::{Seek, SeekFrom, Write};
20 use std::ops::DerefMut;
21 use std::os::unix::fs::OpenOptionsExt;
22 use std::path::PathBuf;
23 use std::process;
24 use std::result;
25 use std::sync::atomic::{AtomicBool, Ordering};
26 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
27 use std::time::Instant;
28 use std::vec::Vec;
29 use std::{convert, error, fmt, io};
30 use vhost::vhost_user::message::*;
31 use vhost::vhost_user::Listener;
32 use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
33 use virtio_bindings::bindings::virtio_blk::*;
34 use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
35 use vm_memory::{bitmap::AtomicBitmap, ByteValued, Bytes, GuestMemoryAtomic};
36 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
37 
38 type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;
39 
/// log2 of the virtio-blk sector size.
const SECTOR_SHIFT: u8 = 9;
/// Size in bytes of one sector (`1 << SECTOR_SHIFT`, i.e. 512).
const SECTOR_SIZE: u64 = 1u64 << SECTOR_SHIFT;
/// Block size advertised in the virtio-blk config space (`blk_size`).
const BLK_SIZE: u32 = 512;
/// How long, in microseconds, a worker keeps busy-polling its queue after the
/// last request it found before it stops polling.
///
/// Current (2020) enterprise SSDs have a latency lower than 30us, so 50us is
/// enough to cover both the device latency and the emulation overhead.
const POLL_QUEUE_US: u128 = 50;

/// Any backing store usable as a disk image: readable, writable, seekable and
/// shareable between worker threads.
trait DiskFile: Read + Seek + Write + Send + Sync {}

impl<T> DiskFile for T where T: Read + Seek + Write + Send + Sync {}
50 
/// Result type for this backend's internal operations.
type Result<T> = std::result::Result<T, Error>;
/// Result type returned by this file's `VhostUserBackendMut` method
/// implementations (`handle_event`, `update_memory`).
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;

/// Errors raised by the vhost-user-block backend.
///
/// They are wrapped into an `io::Error` (via the `From` impl in this file)
/// when they have to cross the vhost-user backend trait boundary.
#[derive(Debug)]
enum Error {
    /// Failed to create a worker's kill eventfd.
    CreateKillEventFd(io::Error),
    /// Failed to parse the backend option string.
    FailedConfigParse(OptionParserError),
    /// `handle_event` was called with an event set other than `EventSet::IN`.
    HandleEventNotEpollIn,
    /// `handle_event` was called with a device event index it does not handle.
    HandleEventUnknownEvent,
    /// The mandatory `path=` option was not provided.
    PathParameterMissing,
    /// The mandatory `socket=` option was not provided.
    SocketParameterMissing,
}
69 
/// Human-readable summary of the option string accepted by the backend; the
/// listed keys are the ones registered by `VhostUserBlkBackendConfig::parse`.
pub const SYNTAX: &str = concat!(
    "vhost-user-block backend parameters ",
    "\"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,",
    "queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,",
    "poll_queue=true|false\""
);
74 
75 impl fmt::Display for Error {
76     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
77         write!(f, "vhost_user_block_error: {:?}", self)
78     }
79 }
80 
81 impl error::Error for Error {}
82 
83 impl convert::From<Error> for io::Error {
84     fn from(e: Error) -> Self {
85         io::Error::new(io::ErrorKind::Other, e)
86     }
87 }
88 
89 struct VhostUserBlkThread {
90     disk_image: Arc<Mutex<dyn DiskFile>>,
91     disk_image_id: Vec<u8>,
92     disk_nsectors: u64,
93     event_idx: bool,
94     kill_evt: EventFd,
95     writeback: Arc<AtomicBool>,
96 }
97 
98 impl VhostUserBlkThread {
99     fn new(
100         disk_image: Arc<Mutex<dyn DiskFile>>,
101         disk_image_id: Vec<u8>,
102         disk_nsectors: u64,
103         writeback: Arc<AtomicBool>,
104     ) -> Result<Self> {
105         Ok(VhostUserBlkThread {
106             disk_image,
107             disk_image_id,
108             disk_nsectors,
109             event_idx: false,
110             kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
111             writeback,
112         })
113     }
114 
115     fn process_queue(
116         &mut self,
117         vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
118     ) -> bool {
119         let mut used_desc_heads = Vec::new();
120 
121         for mut desc_chain in vring.get_queue_mut().iter().unwrap() {
122             debug!("got an element in the queue");
123             let len;
124             match Request::parse(&mut desc_chain, None) {
125                 Ok(mut request) => {
126                     debug!("element is a valid request");
127                     request.set_writeback(self.writeback.load(Ordering::Acquire));
128                     let status = match request.execute(
129                         &mut self.disk_image.lock().unwrap().deref_mut(),
130                         self.disk_nsectors,
131                         desc_chain.memory(),
132                         &self.disk_image_id,
133                     ) {
134                         Ok(l) => {
135                             len = l;
136                             VIRTIO_BLK_S_OK
137                         }
138                         Err(e) => {
139                             len = 1;
140                             e.status()
141                         }
142                     };
143                     desc_chain
144                         .memory()
145                         .write_obj(status, request.status_addr)
146                         .unwrap();
147                 }
148                 Err(err) => {
149                     error!("failed to parse available descriptor chain: {:?}", err);
150                     len = 0;
151                 }
152             }
153 
154             used_desc_heads.push((desc_chain.head_index(), len));
155         }
156 
157         let mut needs_signalling = false;
158         for (desc_head, len) in used_desc_heads.iter() {
159             if self.event_idx {
160                 let queue = vring.get_queue_mut();
161                 if queue.add_used(*desc_head, *len).is_ok() {
162                     if queue.needs_notification().unwrap() {
163                         debug!("signalling queue");
164                         needs_signalling = true;
165                     } else {
166                         debug!("omitting signal (event_idx)");
167                     }
168                 }
169             } else {
170                 debug!("signalling queue");
171                 vring.get_queue_mut().add_used(*desc_head, *len).unwrap();
172                 needs_signalling = true;
173             }
174         }
175 
176         if needs_signalling {
177             vring.signal_used_queue().unwrap();
178         }
179 
180         !used_desc_heads.is_empty()
181     }
182 }
183 
184 struct VhostUserBlkBackend {
185     threads: Vec<Mutex<VhostUserBlkThread>>,
186     config: VirtioBlockConfig,
187     rdonly: bool,
188     poll_queue: bool,
189     queues_per_thread: Vec<u64>,
190     queue_size: usize,
191     acked_features: u64,
192     writeback: Arc<AtomicBool>,
193 }
194 
195 impl VhostUserBlkBackend {
196     fn new(
197         image_path: String,
198         num_queues: usize,
199         rdonly: bool,
200         direct: bool,
201         poll_queue: bool,
202         queue_size: usize,
203     ) -> Result<Self> {
204         let mut options = OpenOptions::new();
205         options.read(true);
206         options.write(!rdonly);
207         if direct {
208             options.custom_flags(libc::O_DIRECT);
209         }
210         let image: File = options.open(&image_path).unwrap();
211         let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);
212 
213         let image_id = build_disk_image_id(&PathBuf::from(&image_path));
214         let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
215         let image = match image_type {
216             ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
217             ImageType::Qcow2 => {
218                 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
219             }
220         };
221 
222         let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap() as u64) / SECTOR_SIZE;
223         let config = VirtioBlockConfig {
224             capacity: nsectors,
225             blk_size: BLK_SIZE,
226             size_max: 65535,
227             seg_max: 128 - 2,
228             min_io_size: 1,
229             opt_io_size: 1,
230             num_queues: num_queues as u16,
231             writeback: 1,
232             ..Default::default()
233         };
234 
235         let mut queues_per_thread = Vec::new();
236         let mut threads = Vec::new();
237         let writeback = Arc::new(AtomicBool::new(true));
238         for i in 0..num_queues {
239             let thread = Mutex::new(VhostUserBlkThread::new(
240                 image.clone(),
241                 image_id.clone(),
242                 nsectors,
243                 writeback.clone(),
244             )?);
245             threads.push(thread);
246             queues_per_thread.push(0b1 << i);
247         }
248 
249         Ok(VhostUserBlkBackend {
250             threads,
251             config,
252             rdonly,
253             poll_queue,
254             queues_per_thread,
255             queue_size,
256             acked_features: 0,
257             writeback,
258         })
259     }
260 
261     fn update_writeback(&mut self) {
262         // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
263         let writeback =
264             if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
265                 self.config.writeback == 1
266             } else {
267                 // Else check if VIRTIO_BLK_F_FLUSH negotiated
268                 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
269             };
270 
271         info!(
272             "Changing cache mode to {}",
273             if writeback {
274                 "writeback"
275             } else {
276                 "writethrough"
277             }
278         );
279         self.writeback.store(writeback, Ordering::Release);
280     }
281 }
282 
283 impl VhostUserBackendMut<VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>, AtomicBitmap>
284     for VhostUserBlkBackend
285 {
286     fn num_queues(&self) -> usize {
287         self.config.num_queues as usize
288     }
289 
290     fn max_queue_size(&self) -> usize {
291         self.queue_size as usize
292     }
293 
294     fn features(&self) -> u64 {
295         let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
296             | 1 << VIRTIO_BLK_F_BLK_SIZE
297             | 1 << VIRTIO_BLK_F_FLUSH
298             | 1 << VIRTIO_BLK_F_TOPOLOGY
299             | 1 << VIRTIO_BLK_F_MQ
300             | 1 << VIRTIO_BLK_F_CONFIG_WCE
301             | 1 << VIRTIO_RING_F_EVENT_IDX
302             | 1 << VIRTIO_F_VERSION_1
303             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
304 
305         if self.rdonly {
306             avail_features |= 1 << VIRTIO_BLK_F_RO;
307         }
308         avail_features
309     }
310 
311     fn acked_features(&mut self, features: u64) {
312         self.acked_features = features;
313         self.update_writeback();
314     }
315 
316     fn protocol_features(&self) -> VhostUserProtocolFeatures {
317         VhostUserProtocolFeatures::CONFIG
318             | VhostUserProtocolFeatures::MQ
319             | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
320     }
321 
322     fn set_event_idx(&mut self, enabled: bool) {
323         for thread in self.threads.iter() {
324             thread.lock().unwrap().event_idx = enabled;
325         }
326     }
327 
328     fn handle_event(
329         &mut self,
330         device_event: u16,
331         evset: EventSet,
332         vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
333         thread_id: usize,
334     ) -> VhostUserBackendResult<bool> {
335         if evset != EventSet::IN {
336             return Err(Error::HandleEventNotEpollIn.into());
337         }
338 
339         debug!("event received: {:?}", device_event);
340 
341         let mut thread = self.threads[thread_id].lock().unwrap();
342         match device_event {
343             0 => {
344                 let mut vring = vrings[0].get_mut();
345 
346                 if self.poll_queue {
347                     // Actively poll the queue until POLL_QUEUE_US has passed
348                     // without seeing a new request.
349                     let mut now = Instant::now();
350                     loop {
351                         if thread.process_queue(&mut vring) {
352                             now = Instant::now();
353                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
354                             break;
355                         }
356                     }
357                 }
358 
359                 if thread.event_idx {
360                     // vm-virtio's Queue implementation only checks avail_index
361                     // once, so to properly support EVENT_IDX we need to keep
362                     // calling process_queue() until it stops finding new
363                     // requests on the queue.
364                     loop {
365                         vring.get_queue_mut().enable_notification().unwrap();
366                         if !thread.process_queue(&mut vring) {
367                             break;
368                         }
369                     }
370                 } else {
371                     // Without EVENT_IDX, a single call is enough.
372                     thread.process_queue(&mut vring);
373                 }
374 
375                 Ok(false)
376             }
377             _ => Err(Error::HandleEventUnknownEvent.into()),
378         }
379     }
380 
381     fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
382         self.config.as_slice().to_vec()
383     }
384 
385     fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
386         let config_slice = self.config.as_mut_slice();
387         let data_len = data.len() as u32;
388         let config_len = config_slice.len() as u32;
389         if offset + data_len > config_len {
390             error!("Failed to write config space");
391             return Err(io::Error::from_raw_os_error(libc::EINVAL));
392         }
393         let (_, right) = config_slice.split_at_mut(offset as usize);
394         right.copy_from_slice(data);
395         self.update_writeback();
396         Ok(())
397     }
398 
399     fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
400         Some(
401             self.threads[thread_index]
402                 .lock()
403                 .unwrap()
404                 .kill_evt
405                 .try_clone()
406                 .unwrap(),
407         )
408     }
409 
410     fn queues_per_thread(&self) -> Vec<u64> {
411         self.queues_per_thread.clone()
412     }
413 
414     fn update_memory(
415         &mut self,
416         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
417     ) -> VhostUserBackendResult<()> {
418         Ok(())
419     }
420 }
421 
422 struct VhostUserBlkBackendConfig {
423     path: String,
424     socket: String,
425     num_queues: usize,
426     queue_size: usize,
427     readonly: bool,
428     direct: bool,
429     poll_queue: bool,
430 }
431 
432 impl VhostUserBlkBackendConfig {
433     fn parse(backend: &str) -> Result<Self> {
434         let mut parser = OptionParser::new();
435         parser
436             .add("path")
437             .add("readonly")
438             .add("direct")
439             .add("num_queues")
440             .add("queue_size")
441             .add("socket")
442             .add("poll_queue");
443         parser.parse(backend).map_err(Error::FailedConfigParse)?;
444 
445         let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
446         let readonly = parser
447             .convert::<Toggle>("readonly")
448             .map_err(Error::FailedConfigParse)?
449             .unwrap_or(Toggle(false))
450             .0;
451         let direct = parser
452             .convert::<Toggle>("direct")
453             .map_err(Error::FailedConfigParse)?
454             .unwrap_or(Toggle(false))
455             .0;
456         let num_queues = parser
457             .convert("num_queues")
458             .map_err(Error::FailedConfigParse)?
459             .unwrap_or(1);
460         let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
461         let poll_queue = parser
462             .convert::<Toggle>("poll_queue")
463             .map_err(Error::FailedConfigParse)?
464             .unwrap_or(Toggle(true))
465             .0;
466         let queue_size = parser
467             .convert("queue_size")
468             .map_err(Error::FailedConfigParse)?
469             .unwrap_or(1024);
470 
471         Ok(VhostUserBlkBackendConfig {
472             path,
473             socket,
474             num_queues,
475             queue_size,
476             readonly,
477             direct,
478             poll_queue,
479         })
480     }
481 }
482 
483 pub fn start_block_backend(backend_command: &str) {
484     let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
485         Ok(config) => config,
486         Err(e) => {
487             println!("Failed parsing parameters {:?}", e);
488             process::exit(1);
489         }
490     };
491 
492     let blk_backend = Arc::new(RwLock::new(
493         VhostUserBlkBackend::new(
494             backend_config.path,
495             backend_config.num_queues,
496             backend_config.readonly,
497             backend_config.direct,
498             backend_config.poll_queue,
499             backend_config.queue_size,
500         )
501         .unwrap(),
502     ));
503 
504     debug!("blk_backend is created!\n");
505 
506     let listener = Listener::new(&backend_config.socket, true).unwrap();
507 
508     let name = "vhost-user-blk-backend";
509     let mut blk_daemon = VhostUserDaemon::new(
510         name.to_string(),
511         blk_backend.clone(),
512         GuestMemoryAtomic::new(GuestMemoryMmap::new()),
513     )
514     .unwrap();
515 
516     debug!("blk_daemon is created!\n");
517 
518     if let Err(e) = blk_daemon.start(listener) {
519         error!(
520             "Failed to start daemon for vhost-user-block with error: {:?}\n",
521             e
522         );
523         process::exit(1);
524     }
525 
526     if let Err(e) = blk_daemon.wait() {
527         error!("Error from the main thread: {:?}", e);
528     }
529 
530     for thread in blk_backend.read().unwrap().threads.iter() {
531         if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
532             error!("Error shutting down worker thread: {:?}", e)
533         }
534     }
535 }
536