xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision eb0b14f70ed5ed44b76579145fd2a741c0100ae4)
1 // Copyright 2019 Red Hat, Inc. All Rights Reserved.
2 //
3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved.
4 //
5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6 //
7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
8 //
9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
10 
11 use std::fs::{File, OpenOptions};
12 use std::io::{Read, Seek, SeekFrom, Write};
13 use std::ops::{Deref, DerefMut};
14 use std::os::unix::fs::OpenOptionsExt;
15 use std::path::PathBuf;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
18 use std::time::Instant;
19 use std::{convert, io, process, result};
20 
21 use block::qcow::{self, ImageType, QcowFile};
22 use block::{build_serial, Request, VirtioBlockConfig};
23 use libc::EFD_NONBLOCK;
24 use log::*;
25 use option_parser::{OptionParser, OptionParserError, Toggle};
26 use thiserror::Error;
27 use vhost::vhost_user::message::*;
28 use vhost::vhost_user::Listener;
29 use vhost_user_backend::bitmap::BitmapMmapRegion;
30 use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
31 use virtio_bindings::virtio_blk::*;
32 use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
33 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
34 use virtio_queue::QueueT;
35 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
36 use vmm_sys_util::epoll::EventSet;
37 use vmm_sys_util::eventfd::EventFd;
38 
// Guest memory map type used throughout this backend; the bitmap region
// lets the vhost-user-backend crate track dirty pages.
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;

// A virtio-blk sector is 512 bytes (1 << 9).
const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
// Logical block size reported to the guest; must stay in agreement with
// SECTOR_SIZE above (both are 512 bytes).
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;

// Object-safe trait for any readable, seekable, writable backing file
// (raw image or qcow2); the blanket impl covers every qualifying type.
trait DiskFile: Read + Seek + Write + Send {}
impl<D: Read + Seek + Write + Send> DiskFile for D {}

// Crate-local result type using this module's Error.
type Result<T> = std::result::Result<T, Error>;
// Result type expected by the vhost-user-backend trait methods.
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;
54 
// Not every variant is constructed on every configuration path, so allow
// dead_code rather than cfg-gating individual variants.
#[allow(dead_code)]
#[derive(Error, Debug)]
enum Error {
    /// Failed to create kill eventfd
    #[error("Failed to create kill eventfd: {0}")]
    CreateKillEventFd(#[source] io::Error),
    /// Failed to parse configuration string
    #[error("Failed to parse configuration string: {0}")]
    FailedConfigParse(#[source] OptionParserError),
    /// Failed to handle event other than input event.
    #[error("Failed to handle event other than input event")]
    HandleEventNotEpollIn,
    /// Failed to handle unknown event.
    #[error("Failed to handle unknown event")]
    HandleEventUnknownEvent,
    /// No path provided
    #[error("No path provided")]
    PathParameterMissing,
    /// No socket provided
    #[error("No socket provided")]
    SocketParameterMissing,
}
77 
/// Usage string describing the accepted backend parameters, shown to the
/// user when the command line cannot be parsed.
pub const SYNTAX: &str = "vhost-user-block backend parameters \
 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\
 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\
 poll_queue=true|false\"";
82 
83 impl convert::From<Error> for io::Error {
84     fn from(e: Error) -> Self {
85         io::Error::other(e)
86     }
87 }
88 
// Per-queue worker state: each vring is served by one of these, all sharing
// the same backing disk image behind a mutex.
struct VhostUserBlkThread {
    // Shared handle to the backing image (raw or qcow2).
    disk_image: Arc<Mutex<dyn DiskFile>>,
    // Serial bytes handed to request execution (built from the image path).
    serial: Vec<u8>,
    // Disk capacity in 512-byte sectors, passed to request execution.
    disk_nsectors: u64,
    // Whether VIRTIO_RING_F_EVENT_IDX was negotiated (set via set_event_idx).
    event_idx: bool,
    // Eventfd used to ask this worker to shut down.
    kill_evt: EventFd,
    // Cache-mode flag shared with the backend; updated by update_writeback().
    writeback: Arc<AtomicBool>,
    // Atomic handle to the guest memory map.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
98 
impl VhostUserBlkThread {
    /// Builds a worker for one queue. Fails only if the kill eventfd
    /// cannot be created.
    fn new(
        disk_image: Arc<Mutex<dyn DiskFile>>,
        serial: Vec<u8>,
        disk_nsectors: u64,
        writeback: Arc<AtomicBool>,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        Ok(VhostUserBlkThread {
            disk_image,
            serial,
            disk_nsectors,
            // EVENT_IDX stays off until negotiated via set_event_idx().
            event_idx: false,
            kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
            writeback,
            mem,
        })
    }

    /// Drains the vring: parses and executes every available descriptor
    /// chain, writes each request's status byte back into guest memory,
    /// returns the descriptors to the used ring, and signals the guest if
    /// required. Returns true if at least one descriptor was consumed.
    fn process_queue(
        &mut self,
        vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
    ) -> bool {
        let mut used_descs = false;

        while let Some(mut desc_chain) = vring
            .get_queue_mut()
            .pop_descriptor_chain(self.mem.memory())
        {
            debug!("got an element in the queue");
            let len;
            match Request::parse(&mut desc_chain, None) {
                Ok(mut request) => {
                    debug!("element is a valid request");
                    // Propagate the current cache mode so the request knows
                    // whether writes must be flushed.
                    request.set_writeback(self.writeback.load(Ordering::Acquire));
                    let status = match request.execute(
                        &mut self.disk_image.lock().unwrap().deref_mut(),
                        self.disk_nsectors,
                        desc_chain.memory(),
                        &self.serial,
                    ) {
                        Ok(l) => {
                            len = l;
                            VIRTIO_BLK_S_OK as u8
                        }
                        Err(e) => {
                            // On failure only the status byte is written back.
                            len = 1;
                            e.status()
                        }
                    };
                    desc_chain
                        .memory()
                        .write_obj(status, request.status_addr)
                        .unwrap();
                }
                Err(err) => {
                    error!("failed to parse available descriptor chain: {:?}", err);
                    len = 0;
                }
            }

            vring
                .get_queue_mut()
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .unwrap();
            used_descs = true;
        }

        let mut needs_signalling = false;
        if self.event_idx {
            // With EVENT_IDX negotiated, only interrupt the guest if it
            // asked to be notified for the descriptors we just used.
            if vring
                .get_queue_mut()
                .needs_notification(self.mem.memory().deref())
                .unwrap()
            {
                debug!("signalling queue");
                needs_signalling = true;
            } else {
                debug!("omitting signal (event_idx)");
            }
        } else {
            // Without EVENT_IDX, always signal.
            debug!("signalling queue");
            needs_signalling = true;
        }

        if needs_signalling {
            vring.signal_used_queue().unwrap();
        }

        used_descs
    }
}
191 
// The vhost-user block backend: owns one worker per queue plus the virtio
// configuration space exposed to the guest.
struct VhostUserBlkBackend {
    // One worker per queue; each mutex is normally taken only by the
    // thread serving that queue.
    threads: Vec<Mutex<VhostUserBlkThread>>,
    // Virtio-blk config space returned by get_config().
    config: VirtioBlockConfig,
    // Image opened read-only; advertises VIRTIO_BLK_F_RO when set.
    rdonly: bool,
    // Whether handle_event() busy-polls the queue before going idle.
    poll_queue: bool,
    // One bitmask per worker thread describing which queues it serves
    // (one queue per thread here).
    queues_per_thread: Vec<u64>,
    // Maximum queue size advertised to the frontend.
    queue_size: usize,
    // Feature bits acknowledged by the frontend.
    acked_features: u64,
    // Current cache mode, shared with all workers.
    writeback: Arc<AtomicBool>,
    // Atomic handle to the guest memory map.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
203 
204 impl VhostUserBlkBackend {
205     fn new(
206         image_path: String,
207         num_queues: usize,
208         rdonly: bool,
209         direct: bool,
210         poll_queue: bool,
211         queue_size: usize,
212         mem: GuestMemoryAtomic<GuestMemoryMmap>,
213     ) -> Result<Self> {
214         let mut options = OpenOptions::new();
215         options.read(true);
216         options.write(!rdonly);
217         if direct {
218             options.custom_flags(libc::O_DIRECT);
219         }
220         let image: File = options.open(&image_path).unwrap();
221         let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);
222 
223         let serial = build_serial(&PathBuf::from(&image_path));
224         let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
225         let image = match image_type {
226             ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
227             ImageType::Qcow2 => {
228                 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
229             }
230         };
231 
232         let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE;
233         let config = VirtioBlockConfig {
234             capacity: nsectors,
235             blk_size: BLK_SIZE,
236             size_max: 65535,
237             seg_max: 128 - 2,
238             min_io_size: 1,
239             opt_io_size: 1,
240             num_queues: num_queues as u16,
241             writeback: 1,
242             ..Default::default()
243         };
244 
245         let mut queues_per_thread = Vec::new();
246         let mut threads = Vec::new();
247         let writeback = Arc::new(AtomicBool::new(true));
248         for i in 0..num_queues {
249             let thread = Mutex::new(VhostUserBlkThread::new(
250                 image.clone(),
251                 serial.clone(),
252                 nsectors,
253                 writeback.clone(),
254                 mem.clone(),
255             )?);
256             threads.push(thread);
257             queues_per_thread.push(0b1 << i);
258         }
259 
260         Ok(VhostUserBlkBackend {
261             threads,
262             config,
263             rdonly,
264             poll_queue,
265             queues_per_thread,
266             queue_size,
267             acked_features: 0,
268             writeback,
269             mem,
270         })
271     }
272 
273     fn update_writeback(&mut self) {
274         // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
275         let writeback = if self.acked_features & (1 << VIRTIO_BLK_F_CONFIG_WCE)
276             == 1 << VIRTIO_BLK_F_CONFIG_WCE
277         {
278             self.config.writeback == 1
279         } else {
280             // Else check if VIRTIO_BLK_F_FLUSH negotiated
281             self.acked_features & (1 << VIRTIO_BLK_F_FLUSH) == 1 << VIRTIO_BLK_F_FLUSH
282         };
283 
284         info!(
285             "Changing cache mode to {}",
286             if writeback {
287                 "writeback"
288             } else {
289                 "writethrough"
290             }
291         );
292         self.writeback.store(writeback, Ordering::Release);
293     }
294 }
295 
296 impl VhostUserBackendMut for VhostUserBlkBackend {
297     type Bitmap = BitmapMmapRegion;
298     type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>;
299 
300     fn num_queues(&self) -> usize {
301         self.config.num_queues as usize
302     }
303 
304     fn max_queue_size(&self) -> usize {
305         self.queue_size
306     }
307 
308     fn features(&self) -> u64 {
309         let mut avail_features = (1 << VIRTIO_BLK_F_SEG_MAX)
310             | (1 << VIRTIO_BLK_F_BLK_SIZE)
311             | (1 << VIRTIO_BLK_F_FLUSH)
312             | (1 << VIRTIO_BLK_F_TOPOLOGY)
313             | (1 << VIRTIO_BLK_F_MQ)
314             | (1 << VIRTIO_BLK_F_CONFIG_WCE)
315             | (1 << VIRTIO_RING_F_EVENT_IDX)
316             | (1 << VIRTIO_F_VERSION_1)
317             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
318 
319         if self.rdonly {
320             avail_features |= 1 << VIRTIO_BLK_F_RO;
321         }
322         avail_features
323     }
324 
325     fn acked_features(&mut self, features: u64) {
326         self.acked_features = features;
327         self.update_writeback();
328     }
329 
330     fn protocol_features(&self) -> VhostUserProtocolFeatures {
331         VhostUserProtocolFeatures::CONFIG
332             | VhostUserProtocolFeatures::MQ
333             | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
334     }
335 
336     fn set_event_idx(&mut self, enabled: bool) {
337         for thread in self.threads.iter() {
338             thread.lock().unwrap().event_idx = enabled;
339         }
340     }
341 
342     fn handle_event(
343         &mut self,
344         device_event: u16,
345         evset: EventSet,
346         vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
347         thread_id: usize,
348     ) -> VhostUserBackendResult<()> {
349         if evset != EventSet::IN {
350             return Err(Error::HandleEventNotEpollIn.into());
351         }
352 
353         debug!("event received: {:?}", device_event);
354 
355         let mut thread = self.threads[thread_id].lock().unwrap();
356         match device_event {
357             0 => {
358                 let mut vring = vrings[0].get_mut();
359 
360                 if self.poll_queue {
361                     // Actively poll the queue until POLL_QUEUE_US has passed
362                     // without seeing a new request.
363                     let mut now = Instant::now();
364                     loop {
365                         if thread.process_queue(&mut vring) {
366                             now = Instant::now();
367                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
368                             break;
369                         }
370                     }
371                 }
372 
373                 if thread.event_idx {
374                     // vm-virtio's Queue implementation only checks avail_index
375                     // once, so to properly support EVENT_IDX we need to keep
376                     // calling process_queue() until it stops finding new
377                     // requests on the queue.
378                     loop {
379                         vring
380                             .get_queue_mut()
381                             .enable_notification(self.mem.memory().deref())
382                             .unwrap();
383                         if !thread.process_queue(&mut vring) {
384                             break;
385                         }
386                     }
387                 } else {
388                     // Without EVENT_IDX, a single call is enough.
389                     thread.process_queue(&mut vring);
390                 }
391 
392                 Ok(())
393             }
394             _ => Err(Error::HandleEventUnknownEvent.into()),
395         }
396     }
397 
398     fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
399         self.config.as_slice().to_vec()
400     }
401 
402     fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
403         let config_slice = self.config.as_mut_slice();
404         let data_len = data.len() as u32;
405         let config_len = config_slice.len() as u32;
406         if offset + data_len > config_len {
407             error!("Failed to write config space");
408             return Err(io::Error::from_raw_os_error(libc::EINVAL));
409         }
410         let (_, right) = config_slice.split_at_mut(offset as usize);
411         right.copy_from_slice(data);
412         self.update_writeback();
413         Ok(())
414     }
415 
416     fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
417         Some(
418             self.threads[thread_index]
419                 .lock()
420                 .unwrap()
421                 .kill_evt
422                 .try_clone()
423                 .unwrap(),
424         )
425     }
426 
427     fn queues_per_thread(&self) -> Vec<u64> {
428         self.queues_per_thread.clone()
429     }
430 
431     fn update_memory(
432         &mut self,
433         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
434     ) -> VhostUserBackendResult<()> {
435         Ok(())
436     }
437 }
438 
// Parsed command-line configuration for the backend; see SYNTAX for the
// accepted string format and VhostUserBlkBackendConfig::parse for defaults.
struct VhostUserBlkBackendConfig {
    // Path to the disk image (mandatory).
    path: String,
    // Path of the vhost-user Unix socket to listen on (mandatory).
    socket: String,
    // Number of virtqueues (default 1).
    num_queues: usize,
    // Size of each virtqueue (default 1024).
    queue_size: usize,
    // Open the image read-only (default false).
    readonly: bool,
    // Open the image with O_DIRECT (default false).
    direct: bool,
    // Busy-poll the queue after each event (default true).
    poll_queue: bool,
}
448 
449 impl VhostUserBlkBackendConfig {
450     fn parse(backend: &str) -> Result<Self> {
451         let mut parser = OptionParser::new();
452         parser
453             .add("path")
454             .add("readonly")
455             .add("direct")
456             .add("num_queues")
457             .add("queue_size")
458             .add("socket")
459             .add("poll_queue");
460         parser.parse(backend).map_err(Error::FailedConfigParse)?;
461 
462         let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
463         let readonly = parser
464             .convert::<Toggle>("readonly")
465             .map_err(Error::FailedConfigParse)?
466             .unwrap_or(Toggle(false))
467             .0;
468         let direct = parser
469             .convert::<Toggle>("direct")
470             .map_err(Error::FailedConfigParse)?
471             .unwrap_or(Toggle(false))
472             .0;
473         let num_queues = parser
474             .convert("num_queues")
475             .map_err(Error::FailedConfigParse)?
476             .unwrap_or(1);
477         let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
478         let poll_queue = parser
479             .convert::<Toggle>("poll_queue")
480             .map_err(Error::FailedConfigParse)?
481             .unwrap_or(Toggle(true))
482             .0;
483         let queue_size = parser
484             .convert("queue_size")
485             .map_err(Error::FailedConfigParse)?
486             .unwrap_or(1024);
487 
488         Ok(VhostUserBlkBackendConfig {
489             path,
490             socket,
491             num_queues,
492             queue_size,
493             readonly,
494             direct,
495             poll_queue,
496         })
497     }
498 }
499 
/// Entry point of the backend: parses `backend_command`, opens the image,
/// binds the vhost-user socket and runs the daemon until it terminates.
///
/// Exits the process with status 1 on a parameter parse error or if the
/// daemon fails to start.
pub fn start_block_backend(backend_command: &str) {
    let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
        Ok(config) => config,
        Err(e) => {
            println!("Failed parsing parameters {e:?}");
            process::exit(1);
        }
    };

    // Empty map for now; the vhost-user frontend shares its memory regions
    // with the daemon after connecting.
    let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());

    let blk_backend = Arc::new(RwLock::new(
        VhostUserBlkBackend::new(
            backend_config.path,
            backend_config.num_queues,
            backend_config.readonly,
            backend_config.direct,
            backend_config.poll_queue,
            backend_config.queue_size,
            mem.clone(),
        )
        .unwrap(),
    ));

    debug!("blk_backend is created!\n");

    let listener = Listener::new(&backend_config.socket, true).unwrap();

    let name = "vhost-user-blk-backend";
    let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap();

    debug!("blk_daemon is created!\n");

    if let Err(e) = blk_daemon.start(listener) {
        error!(
            "Failed to start daemon for vhost-user-block with error: {:?}\n",
            e
        );
        process::exit(1);
    }

    // Block until the daemon's worker threads terminate.
    if let Err(e) = blk_daemon.wait() {
        error!("Error from the main thread: {:?}", e);
    }

    // Ask every queue worker to shut down via its kill eventfd.
    for thread in blk_backend.read().unwrap().threads.iter() {
        if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
            error!("Error shutting down worker thread: {:?}", e)
        }
    }
}
551