xref: /cloud-hypervisor/net_util/src/queue_pair.rs (revision 9af2968a7dc47b89bf07ea9dc5e735084efcfa3a)
// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use crate::GuestMemoryMmap;
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use vm_memory::{Bytes, GuestAddressSpace, GuestMemory, GuestMemoryAtomic};
use vm_virtio::Queue;

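/// Per-queue transmit state: byte and frame counters accumulated while
/// draining descriptor chains, later flushed into `NetCounters`.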
#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

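    /// Drain the available ring, writing each descriptor chain to the TAP
    /// device with a single `writev()`. Returns `Ok(true)` when the TAP
    /// returned EAGAIN and the caller should retry once the TAP is writable.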
    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;
        while let Some(avail_desc) = queue.iter(mem).next() {
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let head_index = avail_desc.index;
            let mut next_desc = Some(avail_desc);

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                if !desc.is_write_only() && desc.len > 0 {
                    let buf = mem
                        .get_slice(desc.addr, desc.len as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len as libc::size_t,
                    };
                    iovecs.push(iovec);
                }
                next_desc = desc.next_descriptor();
            }

            let len = if !iovecs.is_empty() {
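                // SAFETY: each iovec points into a guest memory slice that was
                // validated by `get_slice()` above and remains mapped for the
                // duration of this call.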
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr() as *const libc::iovec,
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            queue.add_used(mem, head_index, 0);
            queue.update_avail_event(mem);

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go through even if it exceeds the rate
            // limit, and simply stop processing any remaining `avail_desc`.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }
        }

        Ok(retry_write)
    }
}

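/// Per-queue receive state: byte and frame counters accumulated while
/// filling descriptor chains, later flushed into `NetCounters`.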
#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

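    /// Fill available descriptor chains with frames read from the TAP device,
    /// one `readv()` per chain. Returns `Ok(true)` when the available ring was
    /// fully drained, `Ok(false)` when processing stopped early because the
    /// TAP had no more data (EAGAIN) or the rate limit was reached.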
    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(avail_desc) = queue.iter(mem).next() {
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let head_index = avail_desc.index;
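            // Offset 10 is the position of the `num_buffers` field in the
            // virtio-net header (virtio_net_hdr_mrg_rxbuf), which the first
            // buffer of this chain is expected to start with.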
            let num_buffers_addr = mem.checked_offset(avail_desc.addr, 10).unwrap();
            let mut next_desc = Some(avail_desc);

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                if desc.is_write_only() && desc.len > 0 {
                    let buf = mem
                        .get_slice(desc.addr, desc.len as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len as libc::size_t,
                    };
                    iovecs.push(iovec);
                }
                next_desc = desc.next_descriptor();
            }

            let len = if !iovecs.is_empty() {
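                // SAFETY: each iovec points into a writable guest memory slice
                // that was validated by `get_slice()` above and remains mapped
                // for the duration of this call.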
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr() as *const libc::iovec,
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
                mem.write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            queue.add_used(mem, head_index, len);
            queue.update_avail_event(mem);

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT fully asynchronous), we always let the 'last' descriptor
            // chain go through even if it exceeds the rate limit, and simply stop
            // processing any remaining `avail_desc`.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }
        }

        Ok(exhausted_descs)
    }
}

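/// Shared, thread-safe counters exposed for device statistics reporting;
/// updated from the per-queue `TxVirtio`/`RxVirtio` counters.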
#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Debug)]
pub enum NetQueuePairError {
    /// No memory configured
    NoMemoryConfigured,
    /// Error registering listener
    RegisterListener(io::Error),
    /// Error unregistering listener
    UnregisterListener(io::Error),
    /// Error writing to the TAP device
    WriteTap(io::Error),
    /// Error reading from the TAP device
    ReadTap(io::Error),
    /// Error related to guest memory
    GuestMemory(vm_memory::GuestMemoryError),
}

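/// A TX/RX queue pair bound to a single TAP device, together with the epoll
/// bookkeeping and optional rate limiters used to drive it.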
pub struct NetQueuePair {
    pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
    pub tap: Tap,
    // With epoll each FD must be unique. In order to tell read and write
    // readiness apart, we use a second FD referring to the same TAP device,
    // so that EPOLLIN and EPOLLOUT can be registered as separate events.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
}

impl NetQueuePair {
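    /// Process the TX queue: write pending frames to the TAP, toggle the
    /// EPOLLOUT listener depending on whether the TAP returned EAGAIN, and
    /// flush the per-call counters. Returns whether the guest needs to be
    /// notified about used descriptors.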
    pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        let mem = self
            .mem
            .as_ref()
            .ok_or(NetQueuePairError::NoMemoryConfigured)
            .map(|m| m.memory())?;

        let tx_tap_retry = self.tx.process_desc_chain(
            &mem,
            &mut self.tap,
            &mut queue,
            &mut self.tx_rate_limiter,
        )?;

        // We were told to try again when writing to the TAP. Wait for the TAP to become writable.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        Ok(queue.needs_notification(&mem, queue.next_used))
    }

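    /// Process the RX queue: read frames from the TAP into available
    /// descriptor chains, stop listening on the TAP when descriptors run out
    /// or the rate limit is hit, and flush the per-call counters. Returns
    /// whether the guest needs to be notified about used descriptors.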
    pub fn process_rx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        let mem = self
            .mem
            .as_ref()
            .ok_or(NetQueuePairError::NoMemoryConfigured)
            .map(|m| m.memory())?;

        self.rx_desc_avail = !self.rx.process_desc_chain(
            &mem,
            &mut self.tap,
            &mut queue,
            &mut self.rx_rate_limiter,
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        Ok(queue.needs_notification(&mem, queue.next_used))
    }
}