xref: /cloud-hypervisor/block/src/raw_async.rs (revision eb0b14f70ed5ed44b76579145fd2a741c0100ae4)
// Copyright © 2021 Intel Corporation
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

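//! Asynchronous, `io_uring`-based access to raw disk images.
//!
//! A minimal usage sketch (not part of the original file); it assumes the
//! crate is named `block`, that this module is exported as `block::raw_async`,
//! and that a readable image exists at the given (placeholder) path:
//!
//! ```ignore
//! use block::async_io::DiskFile;
//! use block::raw_async::RawFileDisk;
//!
//! let file = std::fs::File::open("/path/to/disk.img").unwrap();
//! let mut disk = RawFileDisk::new(file);
//! let size = disk.size().unwrap();              // disk size in bytes
//! let mut io = disk.new_async_io(128).unwrap(); // 128-entry io_uring
//! ```
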
use std::fs::File;
use std::io::{Error, Seek, SeekFrom};
use std::os::unix::io::{AsRawFd, RawFd};

use io_uring::{opcode, types, IoUring};
use vmm_sys_util::eventfd::EventFd;

use crate::async_io::{
    AsyncIo, AsyncIoError, AsyncIoResult, BorrowedDiskFd, DiskFile, DiskFileError, DiskFileResult,
};
use crate::DiskTopology;

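/// A raw disk image backed by a regular file or block device, whose
/// asynchronous I/O context is implemented on top of `io_uring`.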
pub struct RawFileDisk {
    file: File,
}

impl RawFileDisk {
    pub fn new(file: File) -> Self {
        RawFileDisk { file }
    }
}

impl DiskFile for RawFileDisk {
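    // Report the disk size in bytes by seeking to the end of the backing file.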
    fn size(&mut self) -> DiskFileResult<u64> {
        self.file
            .seek(SeekFrom::End(0))
            .map_err(DiskFileError::Size)
    }

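    // Create an asynchronous I/O context backed by its own io_uring instance
    // of `ring_depth` entries.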
    fn new_async_io(&self, ring_depth: u32) -> DiskFileResult<Box<dyn AsyncIo>> {
        Ok(Box::new(
            RawFileAsync::new(self.file.as_raw_fd(), ring_depth)
                .map_err(DiskFileError::NewAsyncIo)?,
        ) as Box<dyn AsyncIo>)
    }

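    // Probe the backing file for its I/O topology, falling back to the
    // default topology when probing fails.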
    fn topology(&mut self) -> DiskTopology {
        if let Ok(topology) = DiskTopology::probe(&self.file) {
            topology
        } else {
            warn!("Unable to get device topology. Using default topology");
            DiskTopology::default()
        }
    }

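    // Expose the underlying file descriptor as a borrowed handle.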
    fn fd(&mut self) -> BorrowedDiskFd {
        BorrowedDiskFd::new(self.file.as_raw_fd())
    }
}

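/// io_uring-based asynchronous I/O context operating on a raw file descriptor.
///
/// Completions are signalled through the registered `EventFd`, which callers
/// obtain via `AsyncIo::notifier()` and can poll from their event loop.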
pub struct RawFileAsync {
    fd: RawFd,
    io_uring: IoUring,
    eventfd: EventFd,
}

impl RawFileAsync {
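    /// Creates a new asynchronous context with an io_uring ring of
    /// `ring_depth` entries, registering a non-blocking eventfd so the kernel
    /// can signal completion queue readiness.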
    pub fn new(fd: RawFd, ring_depth: u32) -> std::io::Result<Self> {
        let io_uring = IoUring::new(ring_depth)?;
        let eventfd = EventFd::new(libc::EFD_NONBLOCK)?;

        // Register the io_uring eventfd that will notify when something in
        // the completion queue is ready.
        io_uring.submitter().register_eventfd(eventfd.as_raw_fd())?;

        Ok(RawFileAsync {
            fd,
            io_uring,
            eventfd,
        })
    }
}

impl AsyncIo for RawFileAsync {
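    // EventFd signalled by the kernel whenever a completion queue entry
    // becomes available.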
    fn notifier(&self) -> &EventFd {
        &self.eventfd
    }

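    // Queue a vectored read from `offset` into the guest buffers described by
    // `iovecs`; `user_data` is returned as-is with the completion entry.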
    fn read_vectored(
        &mut self,
        offset: libc::off_t,
        iovecs: &[libc::iovec],
        user_data: u64,
    ) -> AsyncIoResult<()> {
        let (submitter, mut sq, _) = self.io_uring.split();

        // SAFETY: we know the file descriptor is valid and we
        // relied on vm-memory to provide the buffer address.
        unsafe {
            sq.push(
                &opcode::Readv::new(types::Fd(self.fd), iovecs.as_ptr(), iovecs.len() as u32)
                    .offset(offset.try_into().unwrap())
                    .build()
                    .user_data(user_data),
            )
            .map_err(|_| AsyncIoError::ReadVectored(Error::other("Submission queue is full")))?
        };

        // Update the submission queue and submit new operations to the
        // io_uring instance.
        sq.sync();
        submitter.submit().map_err(AsyncIoError::ReadVectored)?;

        Ok(())
    }

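    // Queue a vectored write to `offset` from the guest buffers described by
    // `iovecs`; `user_data` is returned as-is with the completion entry.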
    fn write_vectored(
        &mut self,
        offset: libc::off_t,
        iovecs: &[libc::iovec],
        user_data: u64,
    ) -> AsyncIoResult<()> {
        let (submitter, mut sq, _) = self.io_uring.split();

        // SAFETY: we know the file descriptor is valid and we
        // relied on vm-memory to provide the buffer address.
        unsafe {
            sq.push(
                &opcode::Writev::new(types::Fd(self.fd), iovecs.as_ptr(), iovecs.len() as u32)
                    .offset(offset.try_into().unwrap())
                    .build()
                    .user_data(user_data),
            )
            .map_err(|_| AsyncIoError::WriteVectored(Error::other("Submission queue is full")))?
        };

        // Update the submission queue and submit new operations to the
        // io_uring instance.
        sq.sync();
        submitter.submit().map_err(AsyncIoError::WriteVectored)?;

        Ok(())
    }

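    // Flush the file: asynchronously through the ring when `user_data` is
    // provided, otherwise synchronously via fsync(2).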
    fn fsync(&mut self, user_data: Option<u64>) -> AsyncIoResult<()> {
        if let Some(user_data) = user_data {
            let (submitter, mut sq, _) = self.io_uring.split();

            // SAFETY: we know the file descriptor is valid.
            unsafe {
                sq.push(
                    &opcode::Fsync::new(types::Fd(self.fd))
                        .build()
                        .user_data(user_data),
                )
                .map_err(|_| AsyncIoError::Fsync(Error::other("Submission queue is full")))?
            };

            // Update the submission queue and submit new operations to the
            // io_uring instance.
            sq.sync();
            submitter.submit().map_err(AsyncIoError::Fsync)?;
        } else {
            // SAFETY: FFI call with a valid fd
            unsafe { libc::fsync(self.fd) };
        }

        Ok(())
    }

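    // Pop the next completion queue entry, if any, as a (user_data, result)
    // pair, where `result` follows the io_uring convention of bytes
    // transferred or a negative errno value.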
    fn next_completed_request(&mut self) -> Option<(u64, i32)> {
        self.io_uring
            .completion()
            .next()
            .map(|entry| (entry.user_data(), entry.result()))
    }
}
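
// A minimal end-to-end sketch (not part of the original file) showing how a
// read can be driven through the ring. It assumes `vmm_sys_util::tempfile::TempFile`
// is available and that the test host kernel supports io_uring; real callers
// would wait on `notifier()` instead of busy-polling the completion queue.
#[cfg(test)]
mod example {
    use std::io::Write;
    use std::os::unix::io::AsRawFd;

    use vmm_sys_util::tempfile::TempFile;

    use super::RawFileAsync;
    use crate::async_io::AsyncIo;

    #[test]
    #[ignore] // requires a kernel with io_uring support
    fn read_through_ring() {
        // Prepare a 512-byte backing file filled with a known pattern.
        let temp = TempFile::new().unwrap();
        let mut backing = temp.as_file();
        backing.write_all(&[0xAAu8; 512]).unwrap();

        let mut async_io = RawFileAsync::new(backing.as_raw_fd(), 32).unwrap();

        // Describe a single destination buffer for the vectored read.
        let mut buf = vec![0u8; 512];
        let iovec = libc::iovec {
            iov_base: buf.as_mut_ptr() as *mut libc::c_void,
            iov_len: buf.len(),
        };

        async_io.read_vectored(0, &[iovec], 42).unwrap();

        // Busy-poll until the completion shows up; the user_data tag and the
        // byte count come back with the completion entry.
        loop {
            if let Some((user_data, result)) = async_io.next_completed_request() {
                assert_eq!(user_data, 42);
                assert_eq!(result, 512);
                break;
            }
        }
        assert_eq!(buf, vec![0xAAu8; 512]);
    }
}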