xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision eeae63b4595fbf0cc69f62b6e9d9a79c543c4ac7)
1 // Copyright 2019 Red Hat, Inc. All Rights Reserved.
2 //
3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved.
4 //
5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6 //
7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
8 //
9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
10 
11 use std::fs::{File, OpenOptions};
12 use std::io::{Read, Seek, SeekFrom, Write};
13 use std::ops::{Deref, DerefMut};
14 use std::os::unix::fs::OpenOptionsExt;
15 use std::path::PathBuf;
16 use std::sync::atomic::{AtomicBool, Ordering};
17 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
18 use std::time::Instant;
19 use std::{convert, error, fmt, io, process, result};
20 
21 use block::qcow::{self, ImageType, QcowFile};
22 use block::{build_serial, Request, VirtioBlockConfig};
23 use libc::EFD_NONBLOCK;
24 use log::*;
25 use option_parser::{OptionParser, OptionParserError, Toggle};
26 use vhost::vhost_user::message::*;
27 use vhost::vhost_user::Listener;
28 use vhost_user_backend::bitmap::BitmapMmapRegion;
29 use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
30 use virtio_bindings::virtio_blk::*;
31 use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
32 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
33 use virtio_queue::QueueT;
34 use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
35 use vmm_sys_util::epoll::EventSet;
36 use vmm_sys_util::eventfd::EventFd;
37 
/// Guest memory map type with dirty-bitmap tracking, as required by the
/// vhost-user-backend crate.
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;

const SECTOR_SHIFT: u8 = 9;
// 512-byte virtio sector size (1 << SECTOR_SHIFT).
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
// Logical block size advertised in the virtio config space.
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;

/// Object-safe alias for anything usable as a disk image backend
/// (raw file or qcow2 wrapper).
trait DiskFile: Read + Seek + Write + Send {}
// Blanket impl: every Read + Seek + Write + Send type is a DiskFile.
impl<D: Read + Seek + Write + Send> DiskFile for D {}

// Crate-local result type for backend errors.
type Result<T> = std::result::Result<T, Error>;
// Result type expected by the vhost-user-backend trait methods.
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;
53 
/// Errors the vhost-user-block backend can report during configuration
/// parsing and runtime event handling.
#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    /// Failed to create kill eventfd
    CreateKillEventFd(io::Error),
    /// Failed to parse configuration string
    FailedConfigParse(OptionParserError),
    /// Failed to handle event other than input event.
    HandleEventNotEpollIn,
    /// Failed to handle unknown event.
    HandleEventUnknownEvent,
    /// No path provided
    PathParameterMissing,
    /// No socket provided
    SocketParameterMissing,
}
70 
/// Help text describing the accepted backend parameter string.
pub const SYNTAX: &str = "vhost-user-block backend parameters \
 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\
 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\
 poll_queue=true|false\"";
75 
76 impl fmt::Display for Error {
77     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78         write!(f, "vhost_user_block_error: {self:?}")
79     }
80 }
81 
// Error carries no source/backtrace, so the default trait methods suffice.
impl error::Error for Error {}
83 
84 impl convert::From<Error> for io::Error {
85     fn from(e: Error) -> Self {
86         io::Error::new(io::ErrorKind::Other, e)
87     }
88 }
89 
/// Per-queue worker state: each virtqueue is served by its own thread.
struct VhostUserBlkThread {
    /// Shared handle to the backing disk image (raw or qcow2).
    disk_image: Arc<Mutex<dyn DiskFile>>,
    /// Device serial bytes handed to `Request::execute`.
    serial: Vec<u8>,
    /// Disk capacity in 512-byte sectors.
    disk_nsectors: u64,
    /// Whether VIRTIO_RING_F_EVENT_IDX was negotiated for this queue.
    event_idx: bool,
    /// Written by the main thread at shutdown to stop this worker.
    kill_evt: EventFd,
    /// Cache-mode flag shared with the backend (true = writeback).
    writeback: Arc<AtomicBool>,
    /// Guest memory the queue descriptors refer to.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
99 
impl VhostUserBlkThread {
    /// Create a per-queue worker.
    ///
    /// # Errors
    /// Fails only if the non-blocking kill eventfd cannot be created.
    fn new(
        disk_image: Arc<Mutex<dyn DiskFile>>,
        serial: Vec<u8>,
        disk_nsectors: u64,
        writeback: Arc<AtomicBool>,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        Ok(VhostUserBlkThread {
            disk_image,
            serial,
            disk_nsectors,
            // EVENT_IDX is off until the frontend negotiates it.
            event_idx: false,
            kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
            writeback,
            mem,
        })
    }

    /// Drain and execute all requests currently available on `vring`.
    ///
    /// Each descriptor chain is parsed as a virtio-blk request, executed
    /// against the mutex-protected disk image, completed by writing the
    /// status byte, and returned to the used ring. The guest is then
    /// signalled unless EVENT_IDX suppression applies. Returns `true` when
    /// at least one descriptor was consumed.
    fn process_queue(
        &mut self,
        vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
    ) -> bool {
        let mut used_descs = false;

        while let Some(mut desc_chain) = vring
            .get_queue_mut()
            .pop_descriptor_chain(self.mem.memory())
        {
            debug!("got an element in the queue");
            let len;
            match Request::parse(&mut desc_chain, None) {
                Ok(mut request) => {
                    debug!("element is a valid request");
                    // Honor the cache mode currently configured by the guest.
                    request.set_writeback(self.writeback.load(Ordering::Acquire));
                    let status = match request.execute(
                        &mut self.disk_image.lock().unwrap().deref_mut(),
                        self.disk_nsectors,
                        desc_chain.memory(),
                        &self.serial,
                    ) {
                        Ok(l) => {
                            len = l;
                            VIRTIO_BLK_S_OK as u8
                        }
                        Err(e) => {
                            // On failure only the status byte is reported as used.
                            len = 1;
                            e.status()
                        }
                    };
                    // Complete the request by writing the status byte back
                    // into guest memory.
                    desc_chain
                        .memory()
                        .write_obj(status, request.status_addr)
                        .unwrap();
                }
                Err(err) => {
                    error!("failed to parse available descriptor chain: {:?}", err);
                    len = 0;
                }
            }

            vring
                .get_queue_mut()
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .unwrap();
            used_descs = true;
        }

        let mut needs_signalling = false;
        if self.event_idx {
            // With EVENT_IDX, only interrupt the guest if the used-ring event
            // index says it wants a notification.
            if vring
                .get_queue_mut()
                .needs_notification(self.mem.memory().deref())
                .unwrap()
            {
                debug!("signalling queue");
                needs_signalling = true;
            } else {
                debug!("omitting signal (event_idx)");
            }
        } else {
            debug!("signalling queue");
            needs_signalling = true;
        }

        if needs_signalling {
            vring.signal_used_queue().unwrap();
        }

        used_descs
    }
}
192 
/// vhost-user block backend: owns the virtio config space and one worker
/// per queue.
struct VhostUserBlkBackend {
    /// One worker per queue; index matches the daemon's thread id.
    threads: Vec<Mutex<VhostUserBlkThread>>,
    /// Virtio config space exposed to the guest.
    config: VirtioBlockConfig,
    /// Image opened read-only (adds VIRTIO_BLK_F_RO to the feature set).
    rdonly: bool,
    /// Keep polling the queue for POLL_QUEUE_US after the last request
    /// instead of sleeping immediately.
    poll_queue: bool,
    /// Bitmask of queues handled by each thread (one bit per thread here).
    queues_per_thread: Vec<u64>,
    /// Maximum size of each virtqueue.
    queue_size: usize,
    /// Feature bits acked by the driver.
    acked_features: u64,
    /// Cache-mode flag shared with all workers.
    writeback: Arc<AtomicBool>,
    /// Guest memory handle shared with workers.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
204 
impl VhostUserBlkBackend {
    /// Open the disk image and build the backend with one worker per queue.
    ///
    /// NOTE(review): the image open, image-type detection and qcow2 parsing
    /// all `unwrap()`, so a bad image path or corrupt image aborts the
    /// process instead of returning an error.
    fn new(
        image_path: String,
        num_queues: usize,
        rdonly: bool,
        direct: bool,
        poll_queue: bool,
        queue_size: usize,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.read(true);
        options.write(!rdonly);
        if direct {
            // O_DIRECT bypasses the host page cache.
            options.custom_flags(libc::O_DIRECT);
        }
        let image: File = options.open(&image_path).unwrap();
        let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);

        let serial = build_serial(&PathBuf::from(&image_path));
        let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
        // Wrap the file in the right adapter depending on the detected format.
        let image = match image_type {
            ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
            ImageType::Qcow2 => {
                Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
            }
        };

        // Capacity in 512-byte sectors, derived from the image length.
        let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE;
        let config = VirtioBlockConfig {
            capacity: nsectors,
            blk_size: BLK_SIZE,
            size_max: 65535,
            // NOTE(review): presumably 2 descriptors are reserved for the
            // request header and status byte — confirm against the virtio spec.
            seg_max: 128 - 2,
            min_io_size: 1,
            opt_io_size: 1,
            num_queues: num_queues as u16,
            // Start in writeback mode (matches the AtomicBool below).
            writeback: 1,
            ..Default::default()
        };

        let mut queues_per_thread = Vec::new();
        let mut threads = Vec::new();
        let writeback = Arc::new(AtomicBool::new(true));
        for i in 0..num_queues {
            let thread = Mutex::new(VhostUserBlkThread::new(
                image.clone(),
                serial.clone(),
                nsectors,
                writeback.clone(),
                mem.clone(),
            )?);
            threads.push(thread);
            // Each worker owns exactly one queue: bit i of the mask.
            queues_per_thread.push(0b1 << i);
        }

        Ok(VhostUserBlkBackend {
            threads,
            config,
            rdonly,
            poll_queue,
            queues_per_thread,
            queue_size,
            acked_features: 0,
            writeback,
            mem,
        })
    }

    /// Recompute the shared cache-mode flag from the negotiated features and
    /// the guest-visible config space, and publish it to the workers.
    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback =
            if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
                self.config.writeback == 1
            } else {
                // Else check if VIRTIO_BLK_F_FLUSH negotiated
                self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
            };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}
295 
296 impl VhostUserBackendMut for VhostUserBlkBackend {
297     type Bitmap = BitmapMmapRegion;
298     type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>;
299 
300     fn num_queues(&self) -> usize {
301         self.config.num_queues as usize
302     }
303 
304     fn max_queue_size(&self) -> usize {
305         self.queue_size
306     }
307 
308     fn features(&self) -> u64 {
309         let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
310             | 1 << VIRTIO_BLK_F_BLK_SIZE
311             | 1 << VIRTIO_BLK_F_FLUSH
312             | 1 << VIRTIO_BLK_F_TOPOLOGY
313             | 1 << VIRTIO_BLK_F_MQ
314             | 1 << VIRTIO_BLK_F_CONFIG_WCE
315             | 1 << VIRTIO_RING_F_EVENT_IDX
316             | 1 << VIRTIO_F_VERSION_1
317             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
318 
319         if self.rdonly {
320             avail_features |= 1 << VIRTIO_BLK_F_RO;
321         }
322         avail_features
323     }
324 
325     fn acked_features(&mut self, features: u64) {
326         self.acked_features = features;
327         self.update_writeback();
328     }
329 
330     fn protocol_features(&self) -> VhostUserProtocolFeatures {
331         VhostUserProtocolFeatures::CONFIG
332             | VhostUserProtocolFeatures::MQ
333             | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
334     }
335 
336     fn set_event_idx(&mut self, enabled: bool) {
337         for thread in self.threads.iter() {
338             thread.lock().unwrap().event_idx = enabled;
339         }
340     }
341 
342     fn handle_event(
343         &mut self,
344         device_event: u16,
345         evset: EventSet,
346         vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
347         thread_id: usize,
348     ) -> VhostUserBackendResult<()> {
349         if evset != EventSet::IN {
350             return Err(Error::HandleEventNotEpollIn.into());
351         }
352 
353         debug!("event received: {:?}", device_event);
354 
355         let mut thread = self.threads[thread_id].lock().unwrap();
356         match device_event {
357             0 => {
358                 let mut vring = vrings[0].get_mut();
359 
360                 if self.poll_queue {
361                     // Actively poll the queue until POLL_QUEUE_US has passed
362                     // without seeing a new request.
363                     let mut now = Instant::now();
364                     loop {
365                         if thread.process_queue(&mut vring) {
366                             now = Instant::now();
367                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
368                             break;
369                         }
370                     }
371                 }
372 
373                 if thread.event_idx {
374                     // vm-virtio's Queue implementation only checks avail_index
375                     // once, so to properly support EVENT_IDX we need to keep
376                     // calling process_queue() until it stops finding new
377                     // requests on the queue.
378                     loop {
379                         vring
380                             .get_queue_mut()
381                             .enable_notification(self.mem.memory().deref())
382                             .unwrap();
383                         if !thread.process_queue(&mut vring) {
384                             break;
385                         }
386                     }
387                 } else {
388                     // Without EVENT_IDX, a single call is enough.
389                     thread.process_queue(&mut vring);
390                 }
391 
392                 Ok(())
393             }
394             _ => Err(Error::HandleEventUnknownEvent.into()),
395         }
396     }
397 
398     fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
399         self.config.as_slice().to_vec()
400     }
401 
402     fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
403         let config_slice = self.config.as_mut_slice();
404         let data_len = data.len() as u32;
405         let config_len = config_slice.len() as u32;
406         if offset + data_len > config_len {
407             error!("Failed to write config space");
408             return Err(io::Error::from_raw_os_error(libc::EINVAL));
409         }
410         let (_, right) = config_slice.split_at_mut(offset as usize);
411         right.copy_from_slice(data);
412         self.update_writeback();
413         Ok(())
414     }
415 
416     fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
417         Some(
418             self.threads[thread_index]
419                 .lock()
420                 .unwrap()
421                 .kill_evt
422                 .try_clone()
423                 .unwrap(),
424         )
425     }
426 
427     fn queues_per_thread(&self) -> Vec<u64> {
428         self.queues_per_thread.clone()
429     }
430 
431     fn update_memory(
432         &mut self,
433         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
434     ) -> VhostUserBackendResult<()> {
435         Ok(())
436     }
437 }
438 
/// Parsed command-line configuration for the block backend.
struct VhostUserBlkBackendConfig {
    /// Path to the disk image (raw or qcow2).
    path: String,
    /// Path of the vhost-user Unix socket to listen on.
    socket: String,
    /// Number of virtqueues (default 1).
    num_queues: usize,
    /// Size of each virtqueue (default 1024).
    queue_size: usize,
    /// Open the image read-only (default false).
    readonly: bool,
    /// Open the image with O_DIRECT (default false).
    direct: bool,
    /// Poll the queue after the last request (default true).
    poll_queue: bool,
}
448 
449 impl VhostUserBlkBackendConfig {
450     fn parse(backend: &str) -> Result<Self> {
451         let mut parser = OptionParser::new();
452         parser
453             .add("path")
454             .add("readonly")
455             .add("direct")
456             .add("num_queues")
457             .add("queue_size")
458             .add("socket")
459             .add("poll_queue");
460         parser.parse(backend).map_err(Error::FailedConfigParse)?;
461 
462         let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
463         let readonly = parser
464             .convert::<Toggle>("readonly")
465             .map_err(Error::FailedConfigParse)?
466             .unwrap_or(Toggle(false))
467             .0;
468         let direct = parser
469             .convert::<Toggle>("direct")
470             .map_err(Error::FailedConfigParse)?
471             .unwrap_or(Toggle(false))
472             .0;
473         let num_queues = parser
474             .convert("num_queues")
475             .map_err(Error::FailedConfigParse)?
476             .unwrap_or(1);
477         let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
478         let poll_queue = parser
479             .convert::<Toggle>("poll_queue")
480             .map_err(Error::FailedConfigParse)?
481             .unwrap_or(Toggle(true))
482             .0;
483         let queue_size = parser
484             .convert("queue_size")
485             .map_err(Error::FailedConfigParse)?
486             .unwrap_or(1024);
487 
488         Ok(VhostUserBlkBackendConfig {
489             path,
490             socket,
491             num_queues,
492             queue_size,
493             readonly,
494             direct,
495             poll_queue,
496         })
497     }
498 }
499 
500 pub fn start_block_backend(backend_command: &str) {
501     let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
502         Ok(config) => config,
503         Err(e) => {
504             println!("Failed parsing parameters {e:?}");
505             process::exit(1);
506         }
507     };
508 
509     let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
510 
511     let blk_backend = Arc::new(RwLock::new(
512         VhostUserBlkBackend::new(
513             backend_config.path,
514             backend_config.num_queues,
515             backend_config.readonly,
516             backend_config.direct,
517             backend_config.poll_queue,
518             backend_config.queue_size,
519             mem.clone(),
520         )
521         .unwrap(),
522     ));
523 
524     debug!("blk_backend is created!\n");
525 
526     let listener = Listener::new(&backend_config.socket, true).unwrap();
527 
528     let name = "vhost-user-blk-backend";
529     let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap();
530 
531     debug!("blk_daemon is created!\n");
532 
533     if let Err(e) = blk_daemon.start(listener) {
534         error!(
535             "Failed to start daemon for vhost-user-block with error: {:?}\n",
536             e
537         );
538         process::exit(1);
539     }
540 
541     if let Err(e) = blk_daemon.wait() {
542         error!("Error from the main thread: {:?}", e);
543     }
544 
545     for thread in blk_backend.read().unwrap().threads.iter() {
546         if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
547             error!("Error shutting down worker thread: {:?}", e)
548         }
549     }
550 }
551