xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision 88a9f799449c04180c6b9a21d3b9c0c4b57e2bd6)
1 // Copyright 2019 Red Hat, Inc. All Rights Reserved.
2 //
3 // Portions Copyright 2019 Intel Corporation. All Rights Reserved.
4 //
5 // Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
6 //
7 // Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
8 //
9 // SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)
10 
11 use std::fs::File;
12 use std::fs::OpenOptions;
13 use std::io::Read;
14 use std::io::{Seek, SeekFrom, Write};
15 use std::ops::Deref;
16 use std::ops::DerefMut;
17 use std::os::unix::fs::OpenOptionsExt;
18 use std::path::PathBuf;
19 use std::process;
20 use std::result;
21 use std::sync::atomic::{AtomicBool, Ordering};
22 use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
23 use std::time::Instant;
24 use std::{convert, error, fmt, io};
25 
26 use block::{
27     build_serial,
28     qcow::{self, ImageType, QcowFile},
29     Request, VirtioBlockConfig,
30 };
31 use libc::EFD_NONBLOCK;
32 use log::*;
33 use option_parser::{OptionParser, OptionParserError, Toggle};
34 use vhost::vhost_user::message::*;
35 use vhost::vhost_user::Listener;
36 use vhost_user_backend::{
37     bitmap::BitmapMmapRegion, VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT,
38 };
39 use virtio_bindings::virtio_blk::*;
40 use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
41 use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
42 use virtio_queue::QueueT;
43 use vm_memory::GuestAddressSpace;
44 use vm_memory::{ByteValued, Bytes, GuestMemoryAtomic};
45 use vmm_sys_util::{epoll::EventSet, eventfd::EventFd};
46 
/// Guest memory type used throughout this backend: `vm_memory`'s mmap-backed
/// guest memory parameterised with `vhost_user_backend`'s `BitmapMmapRegion`
/// (presumably for dirty-page tracking — confirm).
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;
48 
/// log2 of the sector size in which the disk capacity is expressed.
const SECTOR_SHIFT: u8 = 9;
/// Size in bytes of one sector (`1 << SECTOR_SHIFT`, i.e. 512).
const SECTOR_SIZE: u64 = 1 << SECTOR_SHIFT;
/// Block size advertised to the guest through the `blk_size` config field.
const BLK_SIZE: u32 = 512;
/// How long (in microseconds) a worker keeps actively polling its queue after
/// the last request it found. Current (2020) enterprise SSDs have a latency
/// below 30us, so 50us covers the device latency plus the overhead of the
/// emulation layer.
const POLL_QUEUE_US: u128 = 50;
56 
/// Anything that can back the virtual disk: readable, writable, seekable and
/// movable to another thread.
trait DiskFile: Read + Write + Seek + Send {}

impl<T> DiskFile for T where T: Read + Write + Seek + Send {}
59 
60 type Result<T> = std::result::Result<T, Error>;
61 type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;
62 
/// Errors produced while configuring or running the vhost-user-block backend.
// NOTE(review): presumably needed because the `io::Error` / `OptionParserError`
// payloads are only ever read through the derived `Debug` impl (which `Display`
// uses), and the dead-code lint does not count that as a read — confirm.
#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    /// Failed to create a worker thread's kill eventfd.
    CreateKillEventFd(io::Error),
    /// Failed to parse the backend configuration string.
    FailedConfigParse(OptionParserError),
    /// Received an event other than an input (`EPOLLIN`) event.
    HandleEventNotEpollIn,
    /// Received an event for a device event index this backend does not know.
    HandleEventUnknownEvent,
    /// The required `path=` option was not provided.
    PathParameterMissing,
    /// The required `socket=` option was not provided.
    SocketParameterMissing,
}
79 
/// Usage string describing the comma-separated options accepted by
/// `start_block_backend`.
pub const SYNTAX: &str = concat!(
    "vhost-user-block backend parameters ",
    "\"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,",
    "queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,",
    "poll_queue=true|false\""
);
84 
/// Renders the error as `vhost_user_block_error: ` followed by the variant's
/// `Debug` representation.
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "vhost_user_block_error: {self:?}")
    }
}
90 
// Marker impl so `Error` can be wrapped by `io::Error::new` (see the `From`
// impl) and used as a `dyn std::error::Error`; no `source()` is provided.
impl error::Error for Error {}
92 
93 impl convert::From<Error> for io::Error {
94     fn from(e: Error) -> Self {
95         io::Error::new(io::ErrorKind::Other, e)
96     }
97 }
98 
/// Per-worker state: each worker thread serves one virtqueue against the
/// disk image shared by all workers.
struct VhostUserBlkThread {
    /// Disk image shared by every worker; locked for the duration of each request.
    disk_image: Arc<Mutex<dyn DiskFile>>,
    /// Serial bytes handed to `Request::execute` (derived from the image path).
    serial: Vec<u8>,
    /// Disk capacity in `SECTOR_SIZE`-byte sectors.
    disk_nsectors: u64,
    /// Whether VIRTIO_RING_F_EVENT_IDX is in use; set via `set_event_idx`.
    event_idx: bool,
    /// Eventfd written at shutdown to stop this worker; cloned out by `exit_event`.
    kill_evt: EventFd,
    /// Cache mode shared with the backend (true = writeback, false = writethrough).
    writeback: Arc<AtomicBool>,
    /// Guest memory used to walk descriptor chains.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
108 
109 impl VhostUserBlkThread {
110     fn new(
111         disk_image: Arc<Mutex<dyn DiskFile>>,
112         serial: Vec<u8>,
113         disk_nsectors: u64,
114         writeback: Arc<AtomicBool>,
115         mem: GuestMemoryAtomic<GuestMemoryMmap>,
116     ) -> Result<Self> {
117         Ok(VhostUserBlkThread {
118             disk_image,
119             serial,
120             disk_nsectors,
121             event_idx: false,
122             kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
123             writeback,
124             mem,
125         })
126     }
127 
128     fn process_queue(
129         &mut self,
130         vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
131     ) -> bool {
132         let mut used_descs = false;
133 
134         while let Some(mut desc_chain) = vring
135             .get_queue_mut()
136             .pop_descriptor_chain(self.mem.memory())
137         {
138             debug!("got an element in the queue");
139             let len;
140             match Request::parse(&mut desc_chain, None) {
141                 Ok(mut request) => {
142                     debug!("element is a valid request");
143                     request.set_writeback(self.writeback.load(Ordering::Acquire));
144                     let status = match request.execute(
145                         &mut self.disk_image.lock().unwrap().deref_mut(),
146                         self.disk_nsectors,
147                         desc_chain.memory(),
148                         &self.serial,
149                     ) {
150                         Ok(l) => {
151                             len = l;
152                             VIRTIO_BLK_S_OK as u8
153                         }
154                         Err(e) => {
155                             len = 1;
156                             e.status()
157                         }
158                     };
159                     desc_chain
160                         .memory()
161                         .write_obj(status, request.status_addr)
162                         .unwrap();
163                 }
164                 Err(err) => {
165                     error!("failed to parse available descriptor chain: {:?}", err);
166                     len = 0;
167                 }
168             }
169 
170             vring
171                 .get_queue_mut()
172                 .add_used(desc_chain.memory(), desc_chain.head_index(), len)
173                 .unwrap();
174             used_descs = true;
175         }
176 
177         let mut needs_signalling = false;
178         if self.event_idx {
179             if vring
180                 .get_queue_mut()
181                 .needs_notification(self.mem.memory().deref())
182                 .unwrap()
183             {
184                 debug!("signalling queue");
185                 needs_signalling = true;
186             } else {
187                 debug!("omitting signal (event_idx)");
188             }
189         } else {
190             debug!("signalling queue");
191             needs_signalling = true;
192         }
193 
194         if needs_signalling {
195             vring.signal_used_queue().unwrap();
196         }
197 
198         used_descs
199     }
200 }
201 
/// vhost-user-blk device backend: owns the per-queue workers and the virtio
/// block configuration space.
struct VhostUserBlkBackend {
    /// One worker per virtqueue, indexed by vhost-user thread id.
    threads: Vec<Mutex<VhostUserBlkThread>>,
    /// Virtio block configuration space exposed to the frontend.
    config: VirtioBlockConfig,
    /// Whether the image was opened read-only (advertises `VIRTIO_BLK_F_RO`).
    rdonly: bool,
    /// Whether workers actively poll their queue for `POLL_QUEUE_US` after a kick.
    poll_queue: bool,
    /// Queue bitmask handled by each worker thread (one bit per thread).
    queues_per_thread: Vec<u64>,
    /// Maximum size of each virtqueue.
    queue_size: usize,
    /// Features acked by the frontend; used to derive the cache mode.
    acked_features: u64,
    /// Cache mode flag shared with every worker.
    writeback: Arc<AtomicBool>,
    /// Guest memory handle.
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}
213 
214 impl VhostUserBlkBackend {
215     fn new(
216         image_path: String,
217         num_queues: usize,
218         rdonly: bool,
219         direct: bool,
220         poll_queue: bool,
221         queue_size: usize,
222         mem: GuestMemoryAtomic<GuestMemoryMmap>,
223     ) -> Result<Self> {
224         let mut options = OpenOptions::new();
225         options.read(true);
226         options.write(!rdonly);
227         if direct {
228             options.custom_flags(libc::O_DIRECT);
229         }
230         let image: File = options.open(&image_path).unwrap();
231         let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);
232 
233         let serial = build_serial(&PathBuf::from(&image_path));
234         let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
235         let image = match image_type {
236             ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
237             ImageType::Qcow2 => {
238                 Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
239             }
240         };
241 
242         let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE;
243         let config = VirtioBlockConfig {
244             capacity: nsectors,
245             blk_size: BLK_SIZE,
246             size_max: 65535,
247             seg_max: 128 - 2,
248             min_io_size: 1,
249             opt_io_size: 1,
250             num_queues: num_queues as u16,
251             writeback: 1,
252             ..Default::default()
253         };
254 
255         let mut queues_per_thread = Vec::new();
256         let mut threads = Vec::new();
257         let writeback = Arc::new(AtomicBool::new(true));
258         for i in 0..num_queues {
259             let thread = Mutex::new(VhostUserBlkThread::new(
260                 image.clone(),
261                 serial.clone(),
262                 nsectors,
263                 writeback.clone(),
264                 mem.clone(),
265             )?);
266             threads.push(thread);
267             queues_per_thread.push(0b1 << i);
268         }
269 
270         Ok(VhostUserBlkBackend {
271             threads,
272             config,
273             rdonly,
274             poll_queue,
275             queues_per_thread,
276             queue_size,
277             acked_features: 0,
278             writeback,
279             mem,
280         })
281     }
282 
283     fn update_writeback(&mut self) {
284         // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
285         let writeback =
286             if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
287                 self.config.writeback == 1
288             } else {
289                 // Else check if VIRTIO_BLK_F_FLUSH negotiated
290                 self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
291             };
292 
293         info!(
294             "Changing cache mode to {}",
295             if writeback {
296                 "writeback"
297             } else {
298                 "writethrough"
299             }
300         );
301         self.writeback.store(writeback, Ordering::Release);
302     }
303 }
304 
305 impl VhostUserBackendMut for VhostUserBlkBackend {
306     type Bitmap = BitmapMmapRegion;
307     type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>;
308 
309     fn num_queues(&self) -> usize {
310         self.config.num_queues as usize
311     }
312 
313     fn max_queue_size(&self) -> usize {
314         self.queue_size
315     }
316 
317     fn features(&self) -> u64 {
318         let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
319             | 1 << VIRTIO_BLK_F_BLK_SIZE
320             | 1 << VIRTIO_BLK_F_FLUSH
321             | 1 << VIRTIO_BLK_F_TOPOLOGY
322             | 1 << VIRTIO_BLK_F_MQ
323             | 1 << VIRTIO_BLK_F_CONFIG_WCE
324             | 1 << VIRTIO_RING_F_EVENT_IDX
325             | 1 << VIRTIO_F_VERSION_1
326             | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();
327 
328         if self.rdonly {
329             avail_features |= 1 << VIRTIO_BLK_F_RO;
330         }
331         avail_features
332     }
333 
334     fn acked_features(&mut self, features: u64) {
335         self.acked_features = features;
336         self.update_writeback();
337     }
338 
339     fn protocol_features(&self) -> VhostUserProtocolFeatures {
340         VhostUserProtocolFeatures::CONFIG
341             | VhostUserProtocolFeatures::MQ
342             | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
343     }
344 
345     fn set_event_idx(&mut self, enabled: bool) {
346         for thread in self.threads.iter() {
347             thread.lock().unwrap().event_idx = enabled;
348         }
349     }
350 
351     fn handle_event(
352         &mut self,
353         device_event: u16,
354         evset: EventSet,
355         vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
356         thread_id: usize,
357     ) -> VhostUserBackendResult<()> {
358         if evset != EventSet::IN {
359             return Err(Error::HandleEventNotEpollIn.into());
360         }
361 
362         debug!("event received: {:?}", device_event);
363 
364         let mut thread = self.threads[thread_id].lock().unwrap();
365         match device_event {
366             0 => {
367                 let mut vring = vrings[0].get_mut();
368 
369                 if self.poll_queue {
370                     // Actively poll the queue until POLL_QUEUE_US has passed
371                     // without seeing a new request.
372                     let mut now = Instant::now();
373                     loop {
374                         if thread.process_queue(&mut vring) {
375                             now = Instant::now();
376                         } else if now.elapsed().as_micros() > POLL_QUEUE_US {
377                             break;
378                         }
379                     }
380                 }
381 
382                 if thread.event_idx {
383                     // vm-virtio's Queue implementation only checks avail_index
384                     // once, so to properly support EVENT_IDX we need to keep
385                     // calling process_queue() until it stops finding new
386                     // requests on the queue.
387                     loop {
388                         vring
389                             .get_queue_mut()
390                             .enable_notification(self.mem.memory().deref())
391                             .unwrap();
392                         if !thread.process_queue(&mut vring) {
393                             break;
394                         }
395                     }
396                 } else {
397                     // Without EVENT_IDX, a single call is enough.
398                     thread.process_queue(&mut vring);
399                 }
400 
401                 Ok(())
402             }
403             _ => Err(Error::HandleEventUnknownEvent.into()),
404         }
405     }
406 
407     fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
408         self.config.as_slice().to_vec()
409     }
410 
411     fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
412         let config_slice = self.config.as_mut_slice();
413         let data_len = data.len() as u32;
414         let config_len = config_slice.len() as u32;
415         if offset + data_len > config_len {
416             error!("Failed to write config space");
417             return Err(io::Error::from_raw_os_error(libc::EINVAL));
418         }
419         let (_, right) = config_slice.split_at_mut(offset as usize);
420         right.copy_from_slice(data);
421         self.update_writeback();
422         Ok(())
423     }
424 
425     fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
426         Some(
427             self.threads[thread_index]
428                 .lock()
429                 .unwrap()
430                 .kill_evt
431                 .try_clone()
432                 .unwrap(),
433         )
434     }
435 
436     fn queues_per_thread(&self) -> Vec<u64> {
437         self.queues_per_thread.clone()
438     }
439 
440     fn update_memory(
441         &mut self,
442         _mem: GuestMemoryAtomic<GuestMemoryMmap>,
443     ) -> VhostUserBackendResult<()> {
444         Ok(())
445     }
446 }
447 
/// Options parsed from the backend command string (format described by `SYNTAX`).
struct VhostUserBlkBackendConfig {
    /// Disk image path (`path=`, required).
    path: String,
    /// vhost-user socket path to listen on (`socket=`, required).
    socket: String,
    /// Number of virtqueues (`num_queues=`, default 1).
    num_queues: usize,
    /// Maximum size of each virtqueue (`queue_size=`, default 1024).
    queue_size: usize,
    /// Open the image read-only (`readonly=`, default false).
    readonly: bool,
    /// Open the image with `O_DIRECT` (`direct=`, default false).
    direct: bool,
    /// Actively poll queues after a kick (`poll_queue=`, default true).
    poll_queue: bool,
}
457 
458 impl VhostUserBlkBackendConfig {
459     fn parse(backend: &str) -> Result<Self> {
460         let mut parser = OptionParser::new();
461         parser
462             .add("path")
463             .add("readonly")
464             .add("direct")
465             .add("num_queues")
466             .add("queue_size")
467             .add("socket")
468             .add("poll_queue");
469         parser.parse(backend).map_err(Error::FailedConfigParse)?;
470 
471         let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
472         let readonly = parser
473             .convert::<Toggle>("readonly")
474             .map_err(Error::FailedConfigParse)?
475             .unwrap_or(Toggle(false))
476             .0;
477         let direct = parser
478             .convert::<Toggle>("direct")
479             .map_err(Error::FailedConfigParse)?
480             .unwrap_or(Toggle(false))
481             .0;
482         let num_queues = parser
483             .convert("num_queues")
484             .map_err(Error::FailedConfigParse)?
485             .unwrap_or(1);
486         let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
487         let poll_queue = parser
488             .convert::<Toggle>("poll_queue")
489             .map_err(Error::FailedConfigParse)?
490             .unwrap_or(Toggle(true))
491             .0;
492         let queue_size = parser
493             .convert("queue_size")
494             .map_err(Error::FailedConfigParse)?
495             .unwrap_or(1024);
496 
497         Ok(VhostUserBlkBackendConfig {
498             path,
499             socket,
500             num_queues,
501             queue_size,
502             readonly,
503             direct,
504             poll_queue,
505         })
506     }
507 }
508 
509 pub fn start_block_backend(backend_command: &str) {
510     let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
511         Ok(config) => config,
512         Err(e) => {
513             println!("Failed parsing parameters {e:?}");
514             process::exit(1);
515         }
516     };
517 
518     let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());
519 
520     let blk_backend = Arc::new(RwLock::new(
521         VhostUserBlkBackend::new(
522             backend_config.path,
523             backend_config.num_queues,
524             backend_config.readonly,
525             backend_config.direct,
526             backend_config.poll_queue,
527             backend_config.queue_size,
528             mem.clone(),
529         )
530         .unwrap(),
531     ));
532 
533     debug!("blk_backend is created!\n");
534 
535     let listener = Listener::new(&backend_config.socket, true).unwrap();
536 
537     let name = "vhost-user-blk-backend";
538     let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap();
539 
540     debug!("blk_daemon is created!\n");
541 
542     if let Err(e) = blk_daemon.start(listener) {
543         error!(
544             "Failed to start daemon for vhost-user-block with error: {:?}\n",
545             e
546         );
547         process::exit(1);
548     }
549 
550     if let Err(e) = blk_daemon.wait() {
551         error!("Error from the main thread: {:?}", e);
552     }
553 
554     for thread in blk_backend.read().unwrap().threads.iter() {
555         if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
556             error!("Error shutting down worker thread: {:?}", e)
557         }
558     }
559 }
560