xref: /cloud-hypervisor/net_util/src/queue_pair.rs (revision 5f814308d6b19037f2afb3d36fe49b0aa14c0b22)
// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
// Assumed import: in the full crate these macros come from `#[macro_use]` in lib.rs.
use log::{error, info};
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::bitmap::Bitmap;
use vm_memory::{Bytes, GuestMemory};
use vm_virtio::{AccessPlatform, Translatable};

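/// Transmit half of a virtio-net queue pair: per-flush byte/frame counters
/// plus a reusable iovec scratch buffer.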
#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
    iovecs: IovecBuffer,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
            iovecs: IovecBuffer::new(),
        }
    }

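    /// Drains the TX queue, writing each descriptor chain to the TAP device
    /// with a single writev(2). Returns Ok(true) when the TAP returned EAGAIN,
    /// i.e. the caller should wait for the FD to become writable and retry.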
    pub fn process_desc_chain<B: Bitmap + 'static>(
        &mut self,
        mem: &vm_memory::GuestMemoryMmap<B>,
        tap: &Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let mut next_desc = desc_chain.next();

            let mut iovecs = self.iovecs.borrow();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if !desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .ptr_guard_mut();
                    let iovec = libc::iovec {
                        iov_base: buf.as_ptr() as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go through even if it is over the rate
            // limit, and simply stop processing any further `avail_desc` entries.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}

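/// Receive half of a virtio-net queue pair: per-flush byte/frame counters
/// plus a reusable iovec scratch buffer.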
#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
    iovecs: IovecBuffer,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
            iovecs: IovecBuffer::new(),
        }
    }

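    /// Fills guest RX buffers from the TAP device, one readv(2) per descriptor
    /// chain. Returns Ok(true) when every available descriptor was consumed,
    /// i.e. the guest has run out of RX buffers.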
    pub fn process_desc_chain<B: Bitmap + 'static>(
        &mut self,
        mem: &vm_memory::GuestMemoryMmap<B>,
        tap: &Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let desc = desc_chain
                .next()
                .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

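            // The chain starts with a virtio-net header (struct
            // virtio_net_hdr_mrg_rxbuf), whose num_buffers field lives at
            // offset 10, after flags, gso_type, hdr_len, gso_size, csum_start
            // and csum_offset.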
            let num_buffers_addr = desc_chain
                .memory()
                .checked_offset(
                    desc.addr()
                        .translate_gva(access_platform, desc.len() as usize),
                    10,
                )
                .ok_or(NetQueuePairError::DescriptorInvalidHeader)?;
            let mut next_desc = Some(desc);

            let mut iovecs = self.iovecs.borrow();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .ptr_guard_mut();
                    let iovec = libc::iovec {
                        iov_base: buf.as_ptr() as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
                desc_chain
                    .memory()
                    .write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
            // chain go through even if it is over the rate limit, and simply stop
            // processing any further `avail_desc` entries.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}

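/// Reusable scratch buffer of iovecs, kept across process_desc_chain() calls
/// so the backing allocation is not redone for every frame.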
#[derive(Default, Clone)]
struct IovecBuffer(Vec<libc::iovec>);

// SAFETY: Implementing Send for IovecBuffer is safe as the pointers inside are
// iovecs. The iovecs are usually constructed from virtio descriptors, which are
// safe to send across threads.
unsafe impl Send for IovecBuffer {}
// SAFETY: Implementing Sync for IovecBuffer is safe as the pointers inside are
// iovecs. The iovecs are usually constructed from virtio descriptors, which are
// safe to access from multiple threads.
unsafe impl Sync for IovecBuffer {}

impl IovecBuffer {
    fn new() -> Self {
        // Here we use 4 as the default capacity because it is enough for most cases.
        const DEFAULT_CAPACITY: usize = 4;
        IovecBuffer(Vec::with_capacity(DEFAULT_CAPACITY))
    }

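    /// Hands out the underlying Vec behind a guard that clears it on drop, so
    /// stale guest-memory pointers never outlive a single call site.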
    fn borrow(&mut self) -> IovecBufferBorrowed<'_> {
        IovecBufferBorrowed(&mut self.0)
    }
}

struct IovecBufferBorrowed<'a>(&'a mut Vec<libc::iovec>);

impl<'a> std::ops::Deref for IovecBufferBorrowed<'a> {
    type Target = Vec<libc::iovec>;

    fn deref(&self) -> &Self::Target {
        self.0
    }
}

impl<'a> std::ops::DerefMut for IovecBufferBorrowed<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0
    }
}

impl Drop for IovecBufferBorrowed<'_> {
    fn drop(&mut self) {
        // Clear the buffer so that stale entries are not reused afterwards.
        self.0.clear();
    }
}

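/// Aggregate TX/RX statistics, shareable via Arc (e.g. with whatever exposes
/// the device counters).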
#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Error, Debug)]
pub enum NetQueuePairError {
    #[error("No memory configured")]
    NoMemoryConfigured,
    #[error("Error registering listener: {0}")]
    RegisterListener(io::Error),
    #[error("Error unregistering listener: {0}")]
    UnregisterListener(io::Error),
    #[error("Error writing to the TAP device: {0}")]
    WriteTap(io::Error),
    #[error("Error reading from the TAP device: {0}")]
    ReadTap(io::Error),
    #[error("Error related to guest memory: {0}")]
    GuestMemory(vm_memory::GuestMemoryError),
    #[error("Returned an error while iterating through the queue: {0}")]
    QueueIteratorFailed(virtio_queue::Error),
    #[error("Descriptor chain is too short")]
    DescriptorChainTooShort,
    #[error("Descriptor chain does not contain valid descriptors")]
    DescriptorChainInvalid,
    #[error("Failed to determine if queue needed notification: {0}")]
    QueueNeedsNotification(virtio_queue::Error),
    #[error("Failed to enable notification on the queue: {0}")]
    QueueEnableNotification(virtio_queue::Error),
    #[error("Failed to add used index to the queue: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Descriptor with invalid virtio-net header")]
    DescriptorInvalidHeader,
    #[error("Invalid virtio-net header")]
    InvalidVirtioNetHeader,
}

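/// A TX/RX virtqueue pair bound to one TAP device, together with the epoll
/// state and optional rate limiters used to drive it.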
pub struct NetQueuePair {
    pub tap: Tap,
    // With epoll, each registered FD must be unique. In order to tell read
    // events apart from write events we therefore register a second FD backed
    // by the same TAP device: one FD listens for EPOLLIN, the other for
    // EPOLLOUT.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}

impl NetQueuePair {
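    /// Processes the TX queue and (un)registers the EPOLLOUT listener
    /// depending on whether the TAP write needs to be retried. Returns whether
    /// the guest should be notified about used descriptors.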
    pub fn process_tx<B: Bitmap + 'static>(
        &mut self,
        mem: &vm_memory::GuestMemoryMmap<B>,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        let tx_tap_retry = self.tx.process_desc_chain(
            mem,
            &self.tap,
            queue,
            &mut self.tx_rate_limiter,
            self.access_platform.as_ref(),
        )?;

        // Writing to the TAP returned EAGAIN; wait for the TAP to become writable.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }

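    /// Processes the RX queue and stops listening on the TAP when either the
    /// guest has no RX descriptors left or the RX rate limit kicked in.
    /// Returns whether the guest should be notified about used descriptors.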
    pub fn process_rx<B: Bitmap + 'static>(
        &mut self,
        mem: &vm_memory::GuestMemoryMmap<B>,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        self.rx_desc_avail = !self.rx.process_desc_chain(
            mem,
            &self.tap,
            queue,
            &mut self.rx_rate_limiter,
            self.access_platform.as_ref(),
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}