xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision 8803e4a2e7f8e9596b72f81d3c916390e5b10fbd)
1 // Copyright 2019 Red Hat, Inc. All Rights Reserved.
2 //
3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved.
4 //
5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6 //
7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
8 //
9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
10 
11 use block::{
12     build_serial,
13     qcow::{self, ImageType, QcowFile},
14     Request, VirtioBlockConfig,
15 };
16 use libc::EFD_NONBLOCK;
17 use log::*;
18 use option_parser::{OptionParser, OptionParserError, Toggle};
19 use std::fs::File;
20 use std::fs::OpenOptions;
21 use std::io::Read;
22 use std::io::{Seek, SeekFrom, Write};
23 use std::ops::Deref;
24 use std::ops::DerefMut;
25 use std::os::unix::fs::OpenOptionsExt;
26 use std::path::PathBuf;
27 use std::process;
28 use std::result;
29 use std::sync::atomic::{AtomicBool, Ordering};
30 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
31 use std::time::Instant;
32 use std::{convert, error, fmt, io};
33 use vhost::vhost_user::message::*;
34 use vhost::vhost_user::Listener;
35 use vhost_user_backend::{
36     bitmap::BitmapMmapRegion, VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT,
37 };
38 use virtio_bindings::virtio_blk::*;
39 use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
40 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
41 use virtio_queue::QueueT;
42 use vm_memory::GuestAddressSpace;
43 use vm_memory::{ByteValued, Bytes, GuestMemoryAtomic};
44 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
45 
/// Guest memory map used throughout the backend, built from `BitmapMmapRegion`
/// regions (presumably so writes can be tracked for dirty-page logging — confirm).
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;
47 
/// log2 of the sector size used to express the disk capacity.
const SECTOR_SHIFT: u8 = 9;
/// Sector size in bytes (512); the image length is divided by this to get the capacity.
const SECTOR_SIZE: u64 = 1 << SECTOR_SHIFT;
/// Block size advertised to the guest in the virtio-blk config space.
const BLK_SIZE: u32 = 512;
/// Microseconds `handle_event` keeps busy-polling a queue without seeing a new
/// request before it stops polling.
///
/// Rationale: current (2020) enterprise SSDs have a latency below 30us, so 50us
/// covers both the device latency and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;
55 
/// A backing disk: readable, writable, seekable and `Send`.
///
/// Blanket-implemented for every such type, so both raw files and qcow2 images
/// can be stored behind a single `Arc<Mutex<dyn DiskFile>>`.
trait DiskFile: Read + Write + Seek + Send {}

impl<T> DiskFile for T where T: Read + Write + Seek + Send {}
58 
59 type Result<T> = std::result::Result<T, Error>;
60 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;
61 
/// Errors produced while configuring or running the vhost-user-block backend.
///
/// NOTE(review): `dead_code` is presumably allowed because the payloads of
/// `CreateKillEventFd` / `FailedConfigParse` are only ever read through the
/// derived `Debug` impl (which `Display` uses) — confirm before removing.
#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    /// Failed to create the per-worker kill eventfd.
    CreateKillEventFd(io::Error),
    /// Failed to parse the backend configuration string.
    FailedConfigParse(OptionParserError),
    /// `handle_event` received an event set other than exactly `EventSet::IN`.
    HandleEventNotEpollIn,
    /// `handle_event` received a device event index other than 0.
    HandleEventUnknownEvent,
    /// The mandatory `path=` option was not provided.
    PathParameterMissing,
    /// The mandatory `socket=` option was not provided.
    SocketParameterMissing,
}
78 
/// Human-readable usage string listing every option understood by the
/// backend's configuration parser (`path`, `socket`, `num_queues`,
/// `queue_size`, `readonly`, `direct`, `poll_queue`).
pub const SYNTAX: &str = concat!(
    "vhost-user-block backend parameters ",
    "\"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,",
    "queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,",
    "poll_queue=true|false\""
);
83 
84 impl fmt::Display for Error {
85     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
86         write!(f, "vhost_user_block_error: {self:?}")
87     }
88 }
89 
// Marker impl (Debug + Display already exist) so `Error` can be boxed into the
// `io::Error` produced by the `From<Error> for io::Error` conversion.
impl error::Error for Error {}
91 
92 impl convert::From<Error> for io::Error {
93     fn from(e: Error) -> Self {
94         io::Error::new(io::ErrorKind::Other, e)
95     }
96 }
97 
/// State of one worker thread. The backend creates one of these per queue,
/// each servicing a single virtqueue.
struct VhostUserBlkThread {
    // Disk image (raw or qcow2) shared by all workers; the mutex serializes access.
    disk_image: Arc<Mutex<dyn DiskFile>>,
    // Device serial derived from the image path; passed to `Request::execute`
    // (presumably to answer GET_ID requests — confirm).
    serial: Vec<u8>,
    // Disk capacity in 512-byte sectors.
    disk_nsectors: u64,
    // Whether VIRTIO_RING_F_EVENT_IDX was negotiated (set via `set_event_idx`).
    event_idx: bool,
    // Eventfd handed to the daemon through `exit_event`; written to request shutdown.
    kill_evt: EventFd,
    // Cache-mode flag shared with the backend: true = writeback, false = writethrough.
    writeback: Arc<AtomicBool>,
    // Guest memory used to walk descriptor chains.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
107 
108 impl VhostUserBlkThread {
109     fn new(
110         disk_image: Arc<Mutex<dyn DiskFile>>,
111         serial: Vec<u8>,
112         disk_nsectors: u64,
113         writeback: Arc<AtomicBool>,
114         mem: GuestMemoryAtomic<GuestMemoryMmap>,
115     ) -> Result<Self> {
116         Ok(VhostUserBlkThread {
117             disk_image,
118             serial,
119             disk_nsectors,
120             event_idx: false,
121             kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
122             writeback,
123             mem,
124         })
125     }
126 
127     fn process_queue(
128         &mut self,
129         vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
130     ) -> bool {
131         let mut used_descs = false;
132 
133         while let Some(mut desc_chain) = vring
134             .get_queue_mut()
135             .pop_descriptor_chain(self.mem.memory())
136         {
137             debug!("got an element in the queue");
138             let len;
139             match Request::parse(&mut desc_chain, None) {
140                 Ok(mut request) => {
141                     debug!("element is a valid request");
142                     request.set_writeback(self.writeback.load(Ordering::Acquire));
143                     let status = match request.execute(
144                         &mut self.disk_image.lock().unwrap().deref_mut(),
145                         self.disk_nsectors,
146                         desc_chain.memory(),
147                         &self.serial,
148                     ) {
149                         Ok(l) => {
150                             len = l;
151                             VIRTIO_BLK_S_OK as u8
152                         }
153                         Err(e) => {
154                             len = 1;
155                             e.status()
156                         }
157                     };
158                     desc_chain
159                         .memory()
160                         .write_obj(status, request.status_addr)
161                         .unwrap();
162                 }
163                 Err(err) => {
164                     error!("failed to parse available descriptor chain: {:?}", err);
165                     len = 0;
166                 }
167             }
168 
169             vring
170                 .get_queue_mut()
171                 .add_used(desc_chain.memory(), desc_chain.head_index(), len)
172                 .unwrap();
173             used_descs = true;
174         }
175 
176         let mut needs_signalling = false;
177         if self.event_idx {
178             if vring
179                 .get_queue_mut()
180                 .needs_notification(self.mem.memory().deref())
181                 .unwrap()
182             {
183                 debug!("signalling queue");
184                 needs_signalling = true;
185             } else {
186                 debug!("omitting signal (event_idx)");
187             }
188         } else {
189             debug!("signalling queue");
190             needs_signalling = true;
191         }
192 
193         if needs_signalling {
194             vring.signal_used_queue().unwrap();
195         }
196 
197         used_descs
198     }
199 }
200 
/// The vhost-user block device backend: device configuration plus one worker per queue.
struct VhostUserBlkBackend {
    // One worker per queue, indexed by the daemon's thread id in `handle_event`.
    threads: Vec<Mutex<VhostUserBlkThread>>,
    // virtio-blk config space exposed through `get_config` / `set_config`.
    config: VirtioBlockConfig,
    // Image opened read-only; advertises VIRTIO_BLK_F_RO.
    rdonly: bool,
    // Busy-poll the queue for up to POLL_QUEUE_US after each kick.
    poll_queue: bool,
    // Bitmask of queues handled by each worker (bit i for worker i).
    queues_per_thread: Vec<u64>,
    // Maximum size of each virtqueue.
    queue_size: usize,
    // Feature bits acked by the frontend; used to derive the cache mode.
    acked_features: u64,
    // Cache-mode flag shared with every worker (true = writeback).
    writeback: Arc<AtomicBool>,
    // Guest memory handle, cloned into each worker.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
212 
213 impl VhostUserBlkBackend {
214     fn new(
215         image_path: String,
216         num_queues: usize,
217         rdonly: bool,
218         direct: bool,
219         poll_queue: bool,
220         queue_size: usize,
221         mem: GuestMemoryAtomic<GuestMemoryMmap>,
222     ) -> Result<Self> {
223         let mut options = OpenOptions::new();
224         options.read(true);
225         options.write(!rdonly);
226         if direct {
227             options.custom_flags(libc::O_DIRECT);
228         }
229         let image: File = options.open(&image_path).unwrap();
230         let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);
231 
232         let serial = build_serial(&PathBuf::from(&image_path));
233         let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
234         let image = match image_type {
235             ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
236             ImageType::Qcow2 => {
237                 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
238             }
239         };
240 
241         let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE;
242         let config = VirtioBlockConfig {
243             capacity: nsectors,
244             blk_size: BLK_SIZE,
245             size_max: 65535,
246             seg_max: 128 - 2,
247             min_io_size: 1,
248             opt_io_size: 1,
249             num_queues: num_queues as u16,
250             writeback: 1,
251             ..Default::default()
252         };
253 
254         let mut queues_per_thread = Vec::new();
255         let mut threads = Vec::new();
256         let writeback = Arc::new(AtomicBool::new(true));
257         for i in 0..num_queues {
258             let thread = Mutex::new(VhostUserBlkThread::new(
259                 image.clone(),
260                 serial.clone(),
261                 nsectors,
262                 writeback.clone(),
263                 mem.clone(),
264             )?);
265             threads.push(thread);
266             queues_per_thread.push(0b1 << i);
267         }
268 
269         Ok(VhostUserBlkBackend {
270             threads,
271             config,
272             rdonly,
273             poll_queue,
274             queues_per_thread,
275             queue_size,
276             acked_features: 0,
277             writeback,
278             mem,
279         })
280     }
281 
282     fn update_writeback(&mut self) {
283         // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
284         let writeback =
285             if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
286                 self.config.writeback == 1
287             } else {
288                 // Else check if VIRTIO_BLK_F_FLUSH negotiated
289                 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
290             };
291 
292         info!(
293             "Changing cache mode to {}",
294             if writeback {
295                 "writeback"
296             } else {
297                 "writethrough"
298             }
299         );
300         self.writeback.store(writeback, Ordering::Release);
301     }
302 }
303 
304 impl VhostUserBackendMut for VhostUserBlkBackend {
305     type Bitmap = BitmapMmapRegion;
306     type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>;
307 
308     fn num_queues(&self) -> usize {
309         self.config.num_queues as usize
310     }
311 
312     fn max_queue_size(&self) -> usize {
313         self.queue_size
314     }
315 
316     fn features(&self) -> u64 {
317         let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
318             | 1 << VIRTIO_BLK_F_BLK_SIZE
319             | 1 << VIRTIO_BLK_F_FLUSH
320             | 1 << VIRTIO_BLK_F_TOPOLOGY
321             | 1 << VIRTIO_BLK_F_MQ
322             | 1 << VIRTIO_BLK_F_CONFIG_WCE
323             | 1 << VIRTIO_RING_F_EVENT_IDX
324             | 1 << VIRTIO_F_VERSION_1
325             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
326 
327         if self.rdonly {
328             avail_features |= 1 << VIRTIO_BLK_F_RO;
329         }
330         avail_features
331     }
332 
333     fn acked_features(&mut self, features: u64) {
334         self.acked_features = features;
335         self.update_writeback();
336     }
337 
338     fn protocol_features(&self) -> VhostUserProtocolFeatures {
339         VhostUserProtocolFeatures::CONFIG
340             | VhostUserProtocolFeatures::MQ
341             | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
342     }
343 
344     fn set_event_idx(&mut self, enabled: bool) {
345         for thread in self.threads.iter() {
346             thread.lock().unwrap().event_idx = enabled;
347         }
348     }
349 
350     fn handle_event(
351         &mut self,
352         device_event: u16,
353         evset: EventSet,
354         vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
355         thread_id: usize,
356     ) -> VhostUserBackendResult<()> {
357         if evset != EventSet::IN {
358             return Err(Error::HandleEventNotEpollIn.into());
359         }
360 
361         debug!("event received: {:?}", device_event);
362 
363         let mut thread = self.threads[thread_id].lock().unwrap();
364         match device_event {
365             0 => {
366                 let mut vring = vrings[0].get_mut();
367 
368                 if self.poll_queue {
369                     // Actively poll the queue until POLL_QUEUE_US has passed
370                     // without seeing a new request.
371                     let mut now = Instant::now();
372                     loop {
373                         if thread.process_queue(&mut vring) {
374                             now = Instant::now();
375                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
376                             break;
377                         }
378                     }
379                 }
380 
381                 if thread.event_idx {
382                     // vm-virtio's Queue implementation only checks avail_index
383                     // once, so to properly support EVENT_IDX we need to keep
384                     // calling process_queue() until it stops finding new
385                     // requests on the queue.
386                     loop {
387                         vring
388                             .get_queue_mut()
389                             .enable_notification(self.mem.memory().deref())
390                             .unwrap();
391                         if !thread.process_queue(&mut vring) {
392                             break;
393                         }
394                     }
395                 } else {
396                     // Without EVENT_IDX, a single call is enough.
397                     thread.process_queue(&mut vring);
398                 }
399 
400                 Ok(())
401             }
402             _ => Err(Error::HandleEventUnknownEvent.into()),
403         }
404     }
405 
406     fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
407         self.config.as_slice().to_vec()
408     }
409 
410     fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
411         let config_slice = self.config.as_mut_slice();
412         let data_len = data.len() as u32;
413         let config_len = config_slice.len() as u32;
414         if offset + data_len > config_len {
415             error!("Failed to write config space");
416             return Err(io::Error::from_raw_os_error(libc::EINVAL));
417         }
418         let (_, right) = config_slice.split_at_mut(offset as usize);
419         right.copy_from_slice(data);
420         self.update_writeback();
421         Ok(())
422     }
423 
424     fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
425         Some(
426             self.threads[thread_index]
427                 .lock()
428                 .unwrap()
429                 .kill_evt
430                 .try_clone()
431                 .unwrap(),
432         )
433     }
434 
435     fn queues_per_thread(&self) -> Vec<u64> {
436         self.queues_per_thread.clone()
437     }
438 
439     fn update_memory(
440         &mut self,
441         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
442     ) -> VhostUserBackendResult<()> {
443         Ok(())
444     }
445 }
446 
/// Backend options parsed from the command-line string described by `SYNTAX`.
struct VhostUserBlkBackendConfig {
    // Disk image path (`path=`, required).
    path: String,
    // vhost-user socket path the listener binds (`socket=`, required).
    socket: String,
    // Number of virtqueues (`num_queues=`, default 1).
    num_queues: usize,
    // Size of each virtqueue (`queue_size=`, default 1024).
    queue_size: usize,
    // Open the image read-only (`readonly=`, default false).
    readonly: bool,
    // Open the image with O_DIRECT (`direct=`, default false).
    direct: bool,
    // Busy-poll queues after a kick (`poll_queue=`, default true).
    poll_queue: bool,
}
456 
457 impl VhostUserBlkBackendConfig {
458     fn parse(backend: &str) -> Result<Self> {
459         let mut parser = OptionParser::new();
460         parser
461             .add("path")
462             .add("readonly")
463             .add("direct")
464             .add("num_queues")
465             .add("queue_size")
466             .add("socket")
467             .add("poll_queue");
468         parser.parse(backend).map_err(Error::FailedConfigParse)?;
469 
470         let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
471         let readonly = parser
472             .convert::<Toggle>("readonly")
473             .map_err(Error::FailedConfigParse)?
474             .unwrap_or(Toggle(false))
475             .0;
476         let direct = parser
477             .convert::<Toggle>("direct")
478             .map_err(Error::FailedConfigParse)?
479             .unwrap_or(Toggle(false))
480             .0;
481         let num_queues = parser
482             .convert("num_queues")
483             .map_err(Error::FailedConfigParse)?
484             .unwrap_or(1);
485         let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
486         let poll_queue = parser
487             .convert::<Toggle>("poll_queue")
488             .map_err(Error::FailedConfigParse)?
489             .unwrap_or(Toggle(true))
490             .0;
491         let queue_size = parser
492             .convert("queue_size")
493             .map_err(Error::FailedConfigParse)?
494             .unwrap_or(1024);
495 
496         Ok(VhostUserBlkBackendConfig {
497             path,
498             socket,
499             num_queues,
500             queue_size,
501             readonly,
502             direct,
503             poll_queue,
504         })
505     }
506 }
507 
508 pub fn start_block_backend(backend_command: &str) {
509     let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
510         Ok(config) => config,
511         Err(e) => {
512             println!("Failed parsing parameters {e:?}");
513             process::exit(1);
514         }
515     };
516 
517     let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
518 
519     let blk_backend = Arc::new(RwLock::new(
520         VhostUserBlkBackend::new(
521             backend_config.path,
522             backend_config.num_queues,
523             backend_config.readonly,
524             backend_config.direct,
525             backend_config.poll_queue,
526             backend_config.queue_size,
527             mem.clone(),
528         )
529         .unwrap(),
530     ));
531 
532     debug!("blk_backend is created!\n");
533 
534     let listener = Listener::new(&backend_config.socket, true).unwrap();
535 
536     let name = "vhost-user-blk-backend";
537     let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap();
538 
539     debug!("blk_daemon is created!\n");
540 
541     if let Err(e) = blk_daemon.start(listener) {
542         error!(
543             "Failed to start daemon for vhost-user-block with error: {:?}\n",
544             e
545         );
546         process::exit(1);
547     }
548 
549     if let Err(e) = blk_daemon.wait() {
550         error!("Error from the main thread: {:?}", e);
551     }
552 
553     for thread in blk_backend.read().unwrap().threads.iter() {
554         if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
555             error!("Error shutting down worker thread: {:?}", e)
556         }
557     }
558 }
559