xref: /cloud-hypervisor/vhost_user_block/src/lib.rs (revision 6f8bd27cf7629733582d930519e98d19e90afb16)
// Copyright 2019 Red Hat, Inc. All Rights Reserved.
//
// Portions Copyright 2019 Intel Corporation. All Rights Reserved.
//
// Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
//
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)

#![allow(clippy::significant_drop_in_scrutinee)]

use block_util::{build_disk_image_id, Request, VirtioBlockConfig};
use libc::EFD_NONBLOCK;
use log::*;
use option_parser::{OptionParser, OptionParserError, Toggle};
use qcow::{self, ImageType, QcowFile};
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::OpenOptionsExt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::time::Instant;
use std::{convert, error, fmt, io, process, result};
use vhost::vhost_user::message::*;
use vhost::vhost_user::Listener;
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
use virtio_bindings::bindings::virtio_blk::*;
use virtio_bindings::bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::QueueT;
use vm_memory::bitmap::AtomicBitmap;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};

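// Guest memory backed by mmap, parameterized with an atomic dirty bitmap so
// writes performed by this external backend can be tracked (e.g. for
// vhost-user dirty logging during live migration).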
type GuestMemoryMmap = vm_memory::GuestMemoryMmap<AtomicBitmap>;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;

trait DiskFile: Read + Seek + Write + Send {}
impl<D: Read + Seek + Write + Send> DiskFile for D {}

type Result<T> = std::result::Result<T, Error>;
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;

#[derive(Debug)]
enum Error {
    /// Failed to create the kill eventfd.
    CreateKillEventFd(io::Error),
    /// Failed to parse the configuration string.
    FailedConfigParse(OptionParserError),
    /// Failed to handle an event other than an input event.
    HandleEventNotEpollIn,
    /// Failed to handle an unknown event.
    HandleEventUnknownEvent,
    /// No disk image path was provided.
    PathParameterMissing,
    /// No socket path was provided.
    SocketParameterMissing,
}

pub const SYNTAX: &str = "vhost-user-block backend parameters \
 \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\
 queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\
 poll_queue=true|false\"";
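// Example parameter string (the paths are hypothetical):
//   "path=/tmp/disk.raw,socket=/tmp/vub.sock,num_queues=2,queue_size=256"
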
impl fmt::Display for Error {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "vhost_user_block_error: {:?}", self)
    }
}

impl error::Error for Error {}

impl convert::From<Error> for io::Error {
    fn from(e: Error) -> Self {
        io::Error::new(io::ErrorKind::Other, e)
    }
}

/// Per-queue worker: each thread services exactly one virtqueue, sharing
/// the disk image with the other workers behind a mutex.
struct VhostUserBlkThread {
    disk_image: Arc<Mutex<dyn DiskFile>>,
    disk_image_id: Vec<u8>,
    disk_nsectors: u64,
    event_idx: bool,
    kill_evt: EventFd,
    writeback: Arc<AtomicBool>,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}

impl VhostUserBlkThread {
    fn new(
        disk_image: Arc<Mutex<dyn DiskFile>>,
        disk_image_id: Vec<u8>,
        disk_nsectors: u64,
        writeback: Arc<AtomicBool>,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        Ok(VhostUserBlkThread {
            disk_image,
            disk_image_id,
            disk_nsectors,
            event_idx: false,
            kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
            writeback,
            mem,
        })
    }

    fn process_queue(
        &mut self,
        vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
    ) -> bool {
        let mut used_descs = false;

        while let Some(mut desc_chain) = vring
            .get_queue_mut()
            .pop_descriptor_chain(self.mem.memory())
        {
            debug!("got an element in the queue");
            let len;
            match Request::parse(&mut desc_chain, None) {
                Ok(mut request) => {
                    debug!("element is a valid request");
                    request.set_writeback(self.writeback.load(Ordering::Acquire));
                    let status = match request.execute(
                        &mut self.disk_image.lock().unwrap().deref_mut(),
                        self.disk_nsectors,
                        desc_chain.memory(),
                        &self.disk_image_id,
                    ) {
                        Ok(l) => {
                            len = l;
                            VIRTIO_BLK_S_OK
                        }
                        Err(e) => {
                            // Only the status byte is written back on error.
                            len = 1;
                            e.status()
                        }
                    };
                    // Report the request status back to the guest.
                    desc_chain
                        .memory()
                        .write_obj(status, request.status_addr)
                        .unwrap();
                }
                Err(err) => {
                    error!("failed to parse available descriptor chain: {:?}", err);
                    len = 0;
                }
            }

            vring
                .get_queue_mut()
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .unwrap();
            used_descs = true;
        }

        let mut needs_signalling = false;
        if self.event_idx {
            // With EVENT_IDX negotiated, only signal the guest when it has
            // asked to be notified for the current used index.
            if vring
                .get_queue_mut()
                .needs_notification(self.mem.memory().deref())
                .unwrap()
            {
                debug!("signalling queue");
                needs_signalling = true;
            } else {
                debug!("omitting signal (event_idx)");
            }
        } else {
            debug!("signalling queue");
            needs_signalling = true;
        }

        if needs_signalling {
            vring.signal_used_queue().unwrap();
        }

        used_descs
    }
}

struct VhostUserBlkBackend {
    threads: Vec<Mutex<VhostUserBlkThread>>,
    config: VirtioBlockConfig,
    rdonly: bool,
    poll_queue: bool,
    queues_per_thread: Vec<u64>,
    queue_size: usize,
    acked_features: u64,
    writeback: Arc<AtomicBool>,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}

impl VhostUserBlkBackend {
    fn new(
        image_path: String,
        num_queues: usize,
        rdonly: bool,
        direct: bool,
        poll_queue: bool,
        queue_size: usize,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.read(true);
        options.write(!rdonly);
        if direct {
            options.custom_flags(libc::O_DIRECT);
        }
        let image: File = options.open(&image_path).unwrap();
        let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);

        let image_id = build_disk_image_id(&PathBuf::from(&image_path));
        let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
        let image = match image_type {
            ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
            ImageType::Qcow2 => {
                Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
            }
        };

        // The disk capacity, in 512-byte sectors, is derived from the image size.
        let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE;
        let config = VirtioBlockConfig {
            capacity: nsectors,
            blk_size: BLK_SIZE,
            size_max: 65535,
            // Two descriptors per request are reserved for the request
            // header and the status byte, hence "128 - 2".
            seg_max: 128 - 2,
            min_io_size: 1,
            opt_io_size: 1,
            num_queues: num_queues as u16,
            writeback: 1,
            ..Default::default()
        };

        let mut queues_per_thread = Vec::new();
        let mut threads = Vec::new();
        let writeback = Arc::new(AtomicBool::new(true));
        for i in 0..num_queues {
            let thread = Mutex::new(VhostUserBlkThread::new(
                image.clone(),
                image_id.clone(),
                nsectors,
                writeback.clone(),
                mem.clone(),
            )?);
            threads.push(thread);
            // One queue per worker thread, encoded as a bitmap of queue indices.
            queues_per_thread.push(0b1 << i);
        }

        Ok(VhostUserBlkBackend {
            threads,
            config,
            rdonly,
            poll_queue,
            queues_per_thread,
            queue_size,
            acked_features: 0,
            writeback,
            mem,
        })
    }

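    // Cache-mode policy: with VIRTIO_BLK_F_CONFIG_WCE negotiated, the guest
    // controls the mode through the `writeback` config field; otherwise a
    // negotiated VIRTIO_BLK_F_FLUSH implies writeback, and writethrough is
    // the fallback.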
    fn update_writeback(&mut self) {
        // Use the writeback value from the config space if
        // VIRTIO_BLK_F_CONFIG_WCE has been negotiated.
        let writeback =
            if self.acked_features & 1 << VIRTIO_BLK_F_CONFIG_WCE == 1 << VIRTIO_BLK_F_CONFIG_WCE {
                self.config.writeback == 1
            } else {
                // Otherwise, check whether VIRTIO_BLK_F_FLUSH has been negotiated.
                self.acked_features & 1 << VIRTIO_BLK_F_FLUSH == 1 << VIRTIO_BLK_F_FLUSH
            };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}

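// Wiring into the vhost-user-backend crate: this trait provides the queue
// topology, feature and protocol-feature negotiation, config-space access,
// and the per-thread event handler driven by the daemon's epoll loop.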
impl VhostUserBackendMut<VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>, AtomicBitmap>
    for VhostUserBlkBackend
{
    fn num_queues(&self) -> usize {
        self.config.num_queues as usize
    }

    fn max_queue_size(&self) -> usize {
        self.queue_size
    }

    fn features(&self) -> u64 {
        let mut avail_features = 1 << VIRTIO_BLK_F_SEG_MAX
            | 1 << VIRTIO_BLK_F_BLK_SIZE
            | 1 << VIRTIO_BLK_F_FLUSH
            | 1 << VIRTIO_BLK_F_TOPOLOGY
            | 1 << VIRTIO_BLK_F_MQ
            | 1 << VIRTIO_BLK_F_CONFIG_WCE
            | 1 << VIRTIO_RING_F_EVENT_IDX
            | 1 << VIRTIO_F_VERSION_1
            | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();

        if self.rdonly {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        }
        avail_features
    }

    fn acked_features(&mut self, features: u64) {
        self.acked_features = features;
        self.update_writeback();
    }

    fn protocol_features(&self) -> VhostUserProtocolFeatures {
        VhostUserProtocolFeatures::CONFIG
            | VhostUserProtocolFeatures::MQ
            | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
    }

    fn set_event_idx(&mut self, enabled: bool) {
        for thread in self.threads.iter() {
            thread.lock().unwrap().event_idx = enabled;
        }
    }

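    // Each worker thread services a single queue (see `queues_per_thread`),
    // so queue event 0 is the only device event expected on a thread.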
    fn handle_event(
        &mut self,
        device_event: u16,
        evset: EventSet,
        vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
        thread_id: usize,
    ) -> VhostUserBackendResult<bool> {
        if evset != EventSet::IN {
            return Err(Error::HandleEventNotEpollIn.into());
        }

        debug!("event received: {:?}", device_event);

        let mut thread = self.threads[thread_id].lock().unwrap();
        match device_event {
            0 => {
                let mut vring = vrings[0].get_mut();

                if self.poll_queue {
                    // Actively poll the queue until POLL_QUEUE_US has passed
                    // without seeing a new request.
                    let mut now = Instant::now();
                    loop {
                        if thread.process_queue(&mut vring) {
                            now = Instant::now();
                        } else if now.elapsed().as_micros() > POLL_QUEUE_US {
                            break;
                        }
                    }
                }

                if thread.event_idx {
                    // vm-virtio's Queue implementation only checks avail_index
                    // once, so to properly support EVENT_IDX we need to keep
                    // calling process_queue() until it stops finding new
                    // requests on the queue.
                    loop {
                        vring
                            .get_queue_mut()
                            .enable_notification(self.mem.memory().deref())
                            .unwrap();
                        if !thread.process_queue(&mut vring) {
                            break;
                        }
                    }
                } else {
                    // Without EVENT_IDX, a single call is enough.
                    thread.process_queue(&mut vring);
                }

                Ok(false)
            }
            _ => Err(Error::HandleEventUnknownEvent.into()),
        }
    }

    fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
        self.config.as_slice().to_vec()
    }

    fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
        let config_slice = self.config.as_mut_slice();
        let data_len = data.len() as u32;
        let config_len = config_slice.len() as u32;
        if offset + data_len > config_len {
            error!("Failed to write config space");
            return Err(io::Error::from_raw_os_error(libc::EINVAL));
        }
        // Only overwrite the requested range. Splitting the slice at `offset`
        // and copying into the whole tail would panic whenever `data` did not
        // extend to the end of the config space.
        let start = offset as usize;
        config_slice[start..start + data.len()].copy_from_slice(data);
        self.update_writeback();
        Ok(())
    }

    fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
        Some(
            self.threads[thread_index]
                .lock()
                .unwrap()
                .kill_evt
                .try_clone()
                .unwrap(),
        )
    }

    fn queues_per_thread(&self) -> Vec<u64> {
        self.queues_per_thread.clone()
    }

    fn update_memory(
        &mut self,
        _mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> VhostUserBackendResult<()> {
        Ok(())
    }
}

struct VhostUserBlkBackendConfig {
    path: String,
    socket: String,
    num_queues: usize,
    queue_size: usize,
    readonly: bool,
    direct: bool,
    poll_queue: bool,
}

impl VhostUserBlkBackendConfig {
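    /// Parse a comma-separated option string such as (hypothetical paths)
    /// "path=/tmp/disk.raw,socket=/tmp/vub.sock,readonly=true".
    /// Unset options fall back to defaults: num_queues=1, queue_size=1024,
    /// readonly=false, direct=false, poll_queue=true.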
    fn parse(backend: &str) -> Result<Self> {
        let mut parser = OptionParser::new();
        parser
            .add("path")
            .add("readonly")
            .add("direct")
            .add("num_queues")
            .add("queue_size")
            .add("socket")
            .add("poll_queue");
        parser.parse(backend).map_err(Error::FailedConfigParse)?;

        let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
        let readonly = parser
            .convert::<Toggle>("readonly")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(false))
            .0;
        let direct = parser
            .convert::<Toggle>("direct")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(false))
            .0;
        let num_queues = parser
            .convert("num_queues")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(1);
        let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
        let poll_queue = parser
            .convert::<Toggle>("poll_queue")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(true))
            .0;
        let queue_size = parser
            .convert("queue_size")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(1024);

        Ok(VhostUserBlkBackendConfig {
            path,
            socket,
            num_queues,
            queue_size,
            readonly,
            direct,
            poll_queue,
        })
    }
}

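/// Start the vhost-user-block daemon from a backend parameter string (see
/// [`SYNTAX`]) and block until it exits; parse and setup failures terminate
/// the process. A minimal usage sketch, with hypothetical paths:
///
/// ```ignore
/// vhost_user_block::start_block_backend(
///     "path=/tmp/disk.raw,socket=/tmp/vub.sock,num_queues=1",
/// );
/// ```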
pub fn start_block_backend(backend_command: &str) {
    let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
        Ok(config) => config,
        Err(e) => {
            println!("Failed to parse parameters {:?}", e);
            process::exit(1);
        }
    };

    let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());

    let blk_backend = Arc::new(RwLock::new(
        VhostUserBlkBackend::new(
            backend_config.path,
            backend_config.num_queues,
            backend_config.readonly,
            backend_config.direct,
            backend_config.poll_queue,
            backend_config.queue_size,
            mem.clone(),
        )
        .unwrap(),
    ));

    debug!("blk_backend is created!");

    let listener = Listener::new(&backend_config.socket, true).unwrap();

    let name = "vhost-user-blk-backend";
    let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap();

    debug!("blk_daemon is created!");

    if let Err(e) = blk_daemon.start(listener) {
        error!(
            "Failed to start daemon for vhost-user-block with error: {:?}",
            e
        );
        process::exit(1);
    }

    if let Err(e) = blk_daemon.wait() {
        error!("Error from the main thread: {:?}", e);
    }

    // Ask every worker thread to exit by signalling its kill eventfd.
    for thread in blk_backend.read().unwrap().threads.iter() {
        if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
            error!("Error shutting down worker thread: {:?}", e);
        }
    }
}