// Copyright 2019 Red Hat, Inc. All Rights Reserved.
//
// Portions Copyright 2019 Intel Corporation. All Rights Reserved.
//
// Portions Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Portions Copyright 2017 The Chromium OS Authors. All rights reserved.
//
// SPDX-License-Identifier: (Apache-2.0 AND BSD-3-Clause)

use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::ops::{Deref, DerefMut};
use std::os::unix::fs::OpenOptionsExt;
use std::path::PathBuf;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock, RwLockWriteGuard};
use std::time::Instant;
use std::{convert, io, process, result};

use block::qcow::{self, ImageType, QcowFile};
use block::{build_serial, Request, VirtioBlockConfig};
use libc::EFD_NONBLOCK;
use log::*;
use option_parser::{OptionParser, OptionParserError, Toggle};
use thiserror::Error;
use vhost::vhost_user::message::*;
use vhost::vhost_user::Listener;
use vhost_user_backend::bitmap::BitmapMmapRegion;
use vhost_user_backend::{VhostUserBackendMut, VhostUserDaemon, VringRwLock, VringState, VringT};
use virtio_bindings::virtio_blk::*;
use virtio_bindings::virtio_config::VIRTIO_F_VERSION_1;
use virtio_bindings::virtio_ring::VIRTIO_RING_F_EVENT_IDX;
use virtio_queue::QueueT;
use vm_memory::{ByteValued, Bytes, GuestAddressSpace, GuestMemoryAtomic};
use vmm_sys_util::epoll::EventSet;
use vmm_sys_util::eventfd::EventFd;

type GuestMemoryMmap = vm_memory::GuestMemoryMmap<BitmapMmapRegion>;

const SECTOR_SHIFT: u8 = 9;
const SECTOR_SIZE: u64 = 0x01 << SECTOR_SHIFT;
const BLK_SIZE: u32 = 512;
// Current (2020) enterprise SSDs have a latency lower than 30us.
// Polling for 50us should be enough to cover for the device latency
// and the overhead of the emulation layer.
const POLL_QUEUE_US: u128 = 50;

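// A disk image can be backed by anything that is readable, seekable, writable
// and sendable across threads; both raw files and qcow2 images qualify.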
trait DiskFile: Read + Seek + Write + Send {}
impl<D: Read + Seek + Write + Send> DiskFile for D {}

type Result<T> = std::result::Result<T, Error>;
type VhostUserBackendResult<T> = std::result::Result<T, std::io::Error>;

#[allow(dead_code)]
#[derive(Error, Debug)]
enum Error {
    /// Failed to create kill eventfd
    #[error("Failed to create kill eventfd")]
    CreateKillEventFd(#[source] io::Error),
    /// Failed to parse configuration string
    #[error("Failed to parse configuration string")]
    FailedConfigParse(#[source] OptionParserError),
    /// Failed to handle event other than input event.
    #[error("Failed to handle event other than input event")]
    HandleEventNotEpollIn,
    /// Failed to handle unknown event.
    #[error("Failed to handle unknown event")]
    HandleEventUnknownEvent,
    /// No path provided
    #[error("No path provided")]
    PathParameterMissing,
    /// No socket provided
    #[error("No socket provided")]
    SocketParameterMissing,
}

pub const SYNTAX: &str = "vhost-user-block backend parameters \
    \"path=<image_path>,socket=<socket_path>,num_queues=<number_of_queues>,\
    queue_size=<size_of_each_queue>,readonly=true|false,direct=true|false,\
    poll_queue=true|false\"";

impl convert::From<Error> for io::Error {
    fn from(e: Error) -> Self {
        io::Error::other(e)
    }
}

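/// Per-queue worker context: each queue's thread shares the disk image behind
/// a mutex and services the requests arriving on its own virtqueue.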
struct VhostUserBlkThread {
    disk_image: Arc<Mutex<dyn DiskFile>>,
    serial: Vec<u8>,
    disk_nsectors: u64,
    event_idx: bool,
    kill_evt: EventFd,
    writeback: Arc<AtomicBool>,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}

impl VhostUserBlkThread {
    fn new(
        disk_image: Arc<Mutex<dyn DiskFile>>,
        serial: Vec<u8>,
        disk_nsectors: u64,
        writeback: Arc<AtomicBool>,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        Ok(VhostUserBlkThread {
            disk_image,
            serial,
            disk_nsectors,
            event_idx: false,
            kill_evt: EventFd::new(EFD_NONBLOCK).map_err(Error::CreateKillEventFd)?,
            writeback,
            mem,
        })
    }

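    // Drain the available ring: parse each descriptor chain into a block
    // request, execute it against the disk image, write the status byte back
    // to guest memory and return the chain on the used ring. The guest is
    // notified unless EVENT_IDX allows the signal to be suppressed. Returns
    // true if at least one descriptor chain was processed.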
    fn process_queue(
        &mut self,
        vring: &mut RwLockWriteGuard<VringState<GuestMemoryAtomic<GuestMemoryMmap>>>,
    ) -> bool {
        let mut used_descs = false;

        while let Some(mut desc_chain) = vring
            .get_queue_mut()
            .pop_descriptor_chain(self.mem.memory())
        {
            debug!("got an element in the queue");
            let len;
            match Request::parse(&mut desc_chain, None) {
                Ok(mut request) => {
                    debug!("element is a valid request");
                    request.set_writeback(self.writeback.load(Ordering::Acquire));
                    let status = match request.execute(
                        &mut self.disk_image.lock().unwrap().deref_mut(),
                        self.disk_nsectors,
                        desc_chain.memory(),
                        &self.serial,
                    ) {
                        Ok(l) => {
                            len = l;
                            VIRTIO_BLK_S_OK as u8
                        }
                        Err(e) => {
                            len = 1;
                            e.status()
                        }
                    };
                    desc_chain
                        .memory()
                        .write_obj(status, request.status_addr)
                        .unwrap();
                }
                Err(err) => {
                    error!("failed to parse available descriptor chain: {:?}", err);
                    len = 0;
                }
            }

            vring
                .get_queue_mut()
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .unwrap();
            used_descs = true;
        }

        let mut needs_signalling = false;
        if self.event_idx {
            if vring
                .get_queue_mut()
                .needs_notification(self.mem.memory().deref())
                .unwrap()
            {
                debug!("signalling queue");
                needs_signalling = true;
            } else {
                debug!("omitting signal (event_idx)");
            }
        } else {
            debug!("signalling queue");
            needs_signalling = true;
        }

        if needs_signalling {
            vring.signal_used_queue().unwrap();
        }

        used_descs
    }
}

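/// The vhost-user block backend: shared disk state, the virtio configuration
/// space exposed to the guest, and one worker context per virtqueue.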
struct VhostUserBlkBackend {
    threads: Vec<Mutex<VhostUserBlkThread>>,
    config: VirtioBlockConfig,
    rdonly: bool,
    poll_queue: bool,
    queues_per_thread: Vec<u64>,
    queue_size: usize,
    acked_features: u64,
    writeback: Arc<AtomicBool>,
    mem: GuestMemoryAtomic<GuestMemoryMmap>,
}

impl VhostUserBlkBackend {
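    // Open the disk image (optionally with O_DIRECT), detect whether it is raw
    // or qcow2, size the virtio-blk configuration from the image, and create
    // one worker context per virtqueue.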
    fn new(
        image_path: String,
        num_queues: usize,
        rdonly: bool,
        direct: bool,
        poll_queue: bool,
        queue_size: usize,
        mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> Result<Self> {
        let mut options = OpenOptions::new();
        options.read(true);
        options.write(!rdonly);
        if direct {
            options.custom_flags(libc::O_DIRECT);
        }
        let image: File = options.open(&image_path).unwrap();
        let mut raw_img: qcow::RawFile = qcow::RawFile::new(image, direct);

        let serial = build_serial(&PathBuf::from(&image_path));
        let image_type = qcow::detect_image_type(&mut raw_img).unwrap();
        let image = match image_type {
            ImageType::Raw => Arc::new(Mutex::new(raw_img)) as Arc<Mutex<dyn DiskFile>>,
            ImageType::Qcow2 => {
                Arc::new(Mutex::new(QcowFile::from(raw_img).unwrap())) as Arc<Mutex<dyn DiskFile>>
            }
        };

        let nsectors = (image.lock().unwrap().seek(SeekFrom::End(0)).unwrap()) / SECTOR_SIZE;
        let config = VirtioBlockConfig {
            capacity: nsectors,
            blk_size: BLK_SIZE,
            size_max: 65535,
            seg_max: 128 - 2,
            min_io_size: 1,
            opt_io_size: 1,
            num_queues: num_queues as u16,
            writeback: 1,
            ..Default::default()
        };

        let mut queues_per_thread = Vec::new();
        let mut threads = Vec::new();
        let writeback = Arc::new(AtomicBool::new(true));
        for i in 0..num_queues {
            let thread = Mutex::new(VhostUserBlkThread::new(
                image.clone(),
                serial.clone(),
                nsectors,
                writeback.clone(),
                mem.clone(),
            )?);
            threads.push(thread);
            queues_per_thread.push(0b1 << i);
        }

        Ok(VhostUserBlkBackend {
            threads,
            config,
            rdonly,
            poll_queue,
            queues_per_thread,
            queue_size,
            acked_features: 0,
            writeback,
            mem,
        })
    }

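    // Recompute the effective cache mode from the negotiated features:
    // VIRTIO_BLK_F_CONFIG_WCE lets the guest toggle writeback through the
    // config space at runtime, otherwise writeback is enabled only if
    // VIRTIO_BLK_F_FLUSH was negotiated.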
    fn update_writeback(&mut self) {
        // Use writeback from config if VIRTIO_BLK_F_CONFIG_WCE
        let writeback = if self.acked_features & (1 << VIRTIO_BLK_F_CONFIG_WCE)
            == 1 << VIRTIO_BLK_F_CONFIG_WCE
        {
            self.config.writeback == 1
        } else {
            // Else check if VIRTIO_BLK_F_FLUSH negotiated
            self.acked_features & (1 << VIRTIO_BLK_F_FLUSH) == 1 << VIRTIO_BLK_F_FLUSH
        };

        info!(
            "Changing cache mode to {}",
            if writeback {
                "writeback"
            } else {
                "writethrough"
            }
        );
        self.writeback.store(writeback, Ordering::Release);
    }
}

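// vhost-user backend implementation: the daemon uses these callbacks to
// negotiate features, expose the config space, map queues to worker threads
// and dispatch queue events.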
impl VhostUserBackendMut for VhostUserBlkBackend {
    type Bitmap = BitmapMmapRegion;
    type Vring = VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>;

    fn num_queues(&self) -> usize {
        self.config.num_queues as usize
    }

    fn max_queue_size(&self) -> usize {
        self.queue_size
    }

    fn features(&self) -> u64 {
        let mut avail_features = (1 << VIRTIO_BLK_F_SEG_MAX)
            | (1 << VIRTIO_BLK_F_BLK_SIZE)
            | (1 << VIRTIO_BLK_F_FLUSH)
            | (1 << VIRTIO_BLK_F_TOPOLOGY)
            | (1 << VIRTIO_BLK_F_MQ)
            | (1 << VIRTIO_BLK_F_CONFIG_WCE)
            | (1 << VIRTIO_RING_F_EVENT_IDX)
            | (1 << VIRTIO_F_VERSION_1)
            | VhostUserVirtioFeatures::PROTOCOL_FEATURES.bits();

        if self.rdonly {
            avail_features |= 1 << VIRTIO_BLK_F_RO;
        }
        avail_features
    }

    fn acked_features(&mut self, features: u64) {
        self.acked_features = features;
        self.update_writeback();
    }

    fn protocol_features(&self) -> VhostUserProtocolFeatures {
        VhostUserProtocolFeatures::CONFIG
            | VhostUserProtocolFeatures::MQ
            | VhostUserProtocolFeatures::CONFIGURE_MEM_SLOTS
    }

    fn set_event_idx(&mut self, enabled: bool) {
        for thread in self.threads.iter() {
            thread.lock().unwrap().event_idx = enabled;
        }
    }

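    // Called by the daemon when a queue eventfd fires. Each worker thread owns
    // a single queue, so only device_event 0 is expected. Optionally busy-polls
    // the queue for POLL_QUEUE_US before going back to sleep, and keeps
    // re-processing the queue in a loop when EVENT_IDX is negotiated.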
    fn handle_event(
        &mut self,
        device_event: u16,
        evset: EventSet,
        vrings: &[VringRwLock<GuestMemoryAtomic<GuestMemoryMmap>>],
        thread_id: usize,
    ) -> VhostUserBackendResult<()> {
        if evset != EventSet::IN {
            return Err(Error::HandleEventNotEpollIn.into());
        }

        debug!("event received: {:?}", device_event);

        let mut thread = self.threads[thread_id].lock().unwrap();
        match device_event {
            0 => {
                let mut vring = vrings[0].get_mut();

                if self.poll_queue {
                    // Actively poll the queue until POLL_QUEUE_US has passed
                    // without seeing a new request.
                    let mut now = Instant::now();
                    loop {
                        if thread.process_queue(&mut vring) {
                            now = Instant::now();
                        } else if now.elapsed().as_micros() > POLL_QUEUE_US {
                            break;
                        }
                    }
                }

                if thread.event_idx {
                    // vm-virtio's Queue implementation only checks avail_index
                    // once, so to properly support EVENT_IDX we need to keep
                    // calling process_queue() until it stops finding new
                    // requests on the queue.
                    loop {
                        vring
                            .get_queue_mut()
                            .enable_notification(self.mem.memory().deref())
                            .unwrap();
                        if !thread.process_queue(&mut vring) {
                            break;
                        }
                    }
                } else {
                    // Without EVENT_IDX, a single call is enough.
                    thread.process_queue(&mut vring);
                }

                Ok(())
            }
            _ => Err(Error::HandleEventUnknownEvent.into()),
        }
    }

    fn get_config(&self, _offset: u32, _size: u32) -> Vec<u8> {
        self.config.as_slice().to_vec()
    }

    fn set_config(&mut self, offset: u32, data: &[u8]) -> result::Result<(), io::Error> {
        let config_slice = self.config.as_mut_slice();
        let data_len = data.len() as u32;
        let config_len = config_slice.len() as u32;
        if offset + data_len > config_len {
            error!("Failed to write config space");
            return Err(io::Error::from_raw_os_error(libc::EINVAL));
        }
        let (_, right) = config_slice.split_at_mut(offset as usize);
        right.copy_from_slice(data);
        self.update_writeback();
        Ok(())
    }

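    // Give the daemon a clone of this thread's kill eventfd so its epoll loop
    // can be woken up on shutdown.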
    fn exit_event(&self, thread_index: usize) -> Option<EventFd> {
        Some(
            self.threads[thread_index]
                .lock()
                .unwrap()
                .kill_evt
                .try_clone()
                .unwrap(),
        )
    }

    fn queues_per_thread(&self) -> Vec<u64> {
        self.queues_per_thread.clone()
    }

    fn update_memory(
        &mut self,
        _mem: GuestMemoryAtomic<GuestMemoryMmap>,
    ) -> VhostUserBackendResult<()> {
        Ok(())
    }
}

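/// Backend parameters parsed from the command line string (see `SYNTAX`).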
struct VhostUserBlkBackendConfig {
    path: String,
    socket: String,
    num_queues: usize,
    queue_size: usize,
    readonly: bool,
    direct: bool,
    poll_queue: bool,
}

impl VhostUserBlkBackendConfig {
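    // Parse the comma-separated `key=value` backend string. `path` and
    // `socket` are mandatory; the other options default to num_queues=1,
    // queue_size=1024, readonly=false, direct=false and poll_queue=true.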
    fn parse(backend: &str) -> Result<Self> {
        let mut parser = OptionParser::new();
        parser
            .add("path")
            .add("readonly")
            .add("direct")
            .add("num_queues")
            .add("queue_size")
            .add("socket")
            .add("poll_queue");
        parser.parse(backend).map_err(Error::FailedConfigParse)?;

        let path = parser.get("path").ok_or(Error::PathParameterMissing)?;
        let readonly = parser
            .convert::<Toggle>("readonly")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(false))
            .0;
        let direct = parser
            .convert::<Toggle>("direct")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(false))
            .0;
        let num_queues = parser
            .convert("num_queues")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(1);
        let socket = parser.get("socket").ok_or(Error::SocketParameterMissing)?;
        let poll_queue = parser
            .convert::<Toggle>("poll_queue")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(Toggle(true))
            .0;
        let queue_size = parser
            .convert("queue_size")
            .map_err(Error::FailedConfigParse)?
            .unwrap_or(1024);

        Ok(VhostUserBlkBackendConfig {
            path,
            socket,
            num_queues,
            queue_size,
            readonly,
            direct,
            poll_queue,
        })
    }
}

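/// Entry point for the vhost-user-block backend: parse the parameter string,
/// build the backend, bind the vhost-user listener socket and run the daemon
/// until it exits, then signal every worker thread to stop.
///
/// Example parameter string (hypothetical paths, following `SYNTAX` above):
/// `path=/tmp/disk.img,socket=/tmp/vhost-blk.sock,num_queues=2,queue_size=1024`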
pub fn start_block_backend(backend_command: &str) {
    let backend_config = match VhostUserBlkBackendConfig::parse(backend_command) {
        Ok(config) => config,
        Err(e) => {
            println!("Failed parsing parameters {e:?}");
            process::exit(1);
        }
    };

    let mem = GuestMemoryAtomic::new(GuestMemoryMmap::new());

    let blk_backend = Arc::new(RwLock::new(
        VhostUserBlkBackend::new(
            backend_config.path,
            backend_config.num_queues,
            backend_config.readonly,
            backend_config.direct,
            backend_config.poll_queue,
            backend_config.queue_size,
            mem.clone(),
        )
        .unwrap(),
    ));

    debug!("blk_backend is created!\n");

    let listener = Listener::new(&backend_config.socket, true).unwrap();

    let name = "vhost-user-blk-backend";
    let mut blk_daemon = VhostUserDaemon::new(name.to_string(), blk_backend.clone(), mem).unwrap();

    debug!("blk_daemon is created!\n");

    if let Err(e) = blk_daemon.start(listener) {
        error!(
            "Failed to start daemon for vhost-user-block with error: {:?}\n",
            e
        );
        process::exit(1);
    }

    if let Err(e) = blk_daemon.wait() {
        error!("Error from the main thread: {:?}", e);
    }

    for thread in blk_backend.read().unwrap().threads.iter() {
        if let Err(e) = thread.lock().unwrap().kill_evt.write(1) {
            error!("Error shutting down worker thread: {:?}", e)
        }
    }
}