xref: /cloud-hypervisor/net_util/src/queue_pair.rs (revision 7d7bfb2034001d4cb15df2ddc56d2d350c8da30f)
// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use crate::GuestMemoryMmap;
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use virtio_queue::Queue;
use vm_memory::{Bytes, GuestMemory, GuestMemoryAtomic};
use vm_virtio::{AccessPlatform, Translatable};

#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

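    // Process available TX descriptor chains: gather each chain's guest
    // buffers into an iovec array and write the frame (vnet header
    // included) to the tap with a single writev(2). Returns Ok(true) if
    // the tap returned EAGAIN and the write must be retried later.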
    pub fn process_desc_chain(
        &mut self,
        tap: &mut Tap,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        loop {
            let used_desc_head: (u16, u32);
            let mut avail_iter = queue
                .iter()
                .map_err(NetQueuePairError::QueueIteratorFailed)?;

            if let Some(mut desc_chain) = avail_iter.next() {
                if rate_limit_reached {
                    avail_iter.go_to_previous_position();
                    break;
                }

                let mut next_desc = desc_chain.next();

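                // Walk the chain, translating each guest address (through
                // the AccessPlatform when one is set, e.g. behind a vIOMMU)
                // and collecting the host-mapped buffers for one vectored
                // write.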
                let mut iovecs = Vec::new();
                while let Some(desc) = next_desc {
                    let desc_addr = desc
                        .addr()
                        .translate_gva(access_platform, desc.len() as usize);
                    if !desc.is_write_only() && desc.len() > 0 {
                        let buf = desc_chain
                            .memory()
                            .get_slice(desc_addr, desc.len() as usize)
                            .map_err(NetQueuePairError::GuestMemory)?
                            .as_ptr();
                        let iovec = libc::iovec {
                            iov_base: buf as *mut libc::c_void,
                            iov_len: desc.len() as libc::size_t,
                        };
                        iovecs.push(iovec);
                    } else {
                        error!(
                            "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                            desc_addr.0,
                            desc.len(),
                            desc.is_write_only()
                        );
                        return Err(NetQueuePairError::DescriptorChainInvalid);
                    }
                    next_desc = desc_chain.next();
                }

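                // Send the whole frame with a single writev(2) call. On
                // EAGAIN the chain is put back on the avail ring so it can
                // be retried once the tap becomes writable again.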
                let len = if !iovecs.is_empty() {
                    let result = unsafe {
                        libc::writev(
                            tap.as_raw_fd() as libc::c_int,
                            iovecs.as_ptr() as *const libc::iovec,
                            iovecs.len() as libc::c_int,
                        )
                    };

                    if result < 0 {
                        let e = std::io::Error::last_os_error();

                        /* EAGAIN */
                        if e.kind() == std::io::ErrorKind::WouldBlock {
                            avail_iter.go_to_previous_position();
                            retry_write = true;
                            break;
                        }
                        error!("net: tx: failed writing to tap: {}", e);
                        return Err(NetQueuePairError::WriteTap(e));
                    }

                    self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                    self.counter_frames += Wrapping(1);

                    result as u32
                } else {
                    0
                };

                used_desc_head = (desc_chain.head_index(), len);

                // For the sake of simplicity (similar to the RX rate limiting), we always
                // let the 'last' descriptor chain go through even if it was over the rate
                // limit, and simply stop processing incoming `avail_desc` if any.
                if let Some(rate_limiter) = rate_limiter {
                    rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                        || !rate_limiter.consume(len as u64, TokenType::Bytes);
                }
            } else {
                break;
            }

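            // Publish the processed chain to the used ring, then re-enable
            // notifications; enable_notification() returning true means the
            // driver queued more descriptors in the meantime, so keep going.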
            queue
                .add_used(used_desc_head.0, used_desc_head.1)
                .map_err(NetQueuePairError::QueueAddUsed)?;
            if !queue
                .enable_notification()
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}

#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

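    // Fill available RX descriptor chains, one tap frame per chain, using
    // a single readv(2) per frame. Returns Ok(true) when the avail ring
    // was fully drained, Ok(false) when processing stopped early (rate
    // limit hit or no more frames pending on the tap).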
    pub fn process_desc_chain(
        &mut self,
        tap: &mut Tap,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        loop {
            let used_desc_head: (u16, u32);
            let mut avail_iter = queue
                .iter()
                .map_err(NetQueuePairError::QueueIteratorFailed)?;

            if let Some(mut desc_chain) = avail_iter.next() {
                if rate_limit_reached {
                    exhausted_descs = false;
                    avail_iter.go_to_previous_position();
                    break;
                }

                let desc = desc_chain
                    .next()
                    .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

                // `num_buffers` lives at offset 10 of the virtio-net header
                // (struct virtio_net_hdr_mrg_rxbuf), right after flags,
                // gso_type, hdr_len, gso_size, csum_start and csum_offset.
                // Report an error instead of panicking if the address does
                // not fit in guest memory.
                let num_buffers_addr = desc_chain
                    .memory()
                    .checked_offset(
                        desc.addr()
                            .translate_gva(access_platform, desc.len() as usize),
                        10,
                    )
                    .ok_or(NetQueuePairError::DescriptorChainTooShort)?;
                let mut next_desc = Some(desc);

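                // Same gathering as on the TX path, except only write-only
                // descriptors are valid here: the tap frame is scattered
                // straight into the guest buffers.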
                let mut iovecs = Vec::new();
                while let Some(desc) = next_desc {
                    let desc_addr = desc
                        .addr()
                        .translate_gva(access_platform, desc.len() as usize);
                    if desc.is_write_only() && desc.len() > 0 {
                        let buf = desc_chain
                            .memory()
                            .get_slice(desc_addr, desc.len() as usize)
                            .map_err(NetQueuePairError::GuestMemory)?
                            .as_ptr();
                        let iovec = libc::iovec {
                            iov_base: buf as *mut libc::c_void,
                            iov_len: desc.len() as libc::size_t,
                        };
                        iovecs.push(iovec);
                    } else {
                        error!(
                            "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                            desc_addr.0,
                            desc.len(),
                            desc.is_write_only()
                        );
                        return Err(NetQueuePairError::DescriptorChainInvalid);
                    }
                    next_desc = desc_chain.next();
                }

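                // Read one frame from the tap into the chain's buffers. On
                // EAGAIN (no frame pending) the chain goes back on the avail
                // ring and processing stops until the next tap event.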
                let len = if !iovecs.is_empty() {
                    let result = unsafe {
                        libc::readv(
                            tap.as_raw_fd() as libc::c_int,
                            iovecs.as_ptr() as *const libc::iovec,
                            iovecs.len() as libc::c_int,
                        )
                    };
                    if result < 0 {
                        let e = std::io::Error::last_os_error();
                        exhausted_descs = false;
                        avail_iter.go_to_previous_position();

                        /* EAGAIN */
                        if e.kind() == std::io::ErrorKind::WouldBlock {
                            break;
                        }

                        error!("net: rx: failed reading from tap: {}", e);
                        return Err(NetQueuePairError::ReadTap(e));
                    }

                    // Write num_buffers to guest memory. We simply write 1 as we
                    // never spread the frame over more than one descriptor chain.
                    desc_chain
                        .memory()
                        .write_obj(1u16, num_buffers_addr)
                        .map_err(NetQueuePairError::GuestMemory)?;

                    self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                    self.counter_frames += Wrapping(1);

                    result as u32
                } else {
                    0
                };

                used_desc_head = (desc_chain.head_index(), len);

                // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
                // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
                // chain go through even if it was over the rate limit, and simply stop
                // processing incoming `avail_desc` if any.
                if let Some(rate_limiter) = rate_limiter {
                    rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                        || !rate_limiter.consume(len as u64, TokenType::Bytes);
                }
            } else {
                break;
            }

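            // As on the TX path: publish the chain to the used ring and
            // keep processing while enable_notification() reports more
            // pending descriptors.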
            queue
                .add_used(used_desc_head.0, used_desc_head.1)
                .map_err(NetQueuePairError::QueueAddUsed)?;
            if !queue
                .enable_notification()
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}

/// Aggregate RX/TX traffic counters, shared across threads via atomics.
#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Debug)]
pub enum NetQueuePairError {
    /// No memory configured
    NoMemoryConfigured,
    /// Error registering listener
    RegisterListener(io::Error),
    /// Error unregistering listener
    UnregisterListener(io::Error),
    /// Error writing to the TAP device
    WriteTap(io::Error),
    /// Error reading from the TAP device
    ReadTap(io::Error),
    /// Error related to guest memory
    GuestMemory(vm_memory::GuestMemoryError),
    /// Returned an error while iterating through the queue
    QueueIteratorFailed(virtio_queue::Error),
    /// Descriptor chain is too short
    DescriptorChainTooShort,
    /// Descriptor chain does not contain valid descriptors
    DescriptorChainInvalid,
    /// Failed to determine if queue needed notification
    QueueNeedsNotification(virtio_queue::Error),
    /// Failed to enable notification on the queue
    QueueEnableNotification(virtio_queue::Error),
    /// Failed to add used index to the queue
    QueueAddUsed(virtio_queue::Error),
}

pub struct NetQueuePair {
    pub tap: Tap,
    // An FD can only be registered once per epoll instance. To tell read
    // and write events apart, we register a second FD referring to the
    // same tap device and listen for EPOLLOUT on it, while the original
    // FD is used for EPOLLIN.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}

impl NetQueuePair {
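    // Drain the TX queue into the tap. When the tap signals EAGAIN, start
    // listening for it to become writable (EPOLLOUT); once writes succeed
    // again, stop listening. Returns whether the guest driver should be
    // notified.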
    pub fn process_tx(
        &mut self,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
    ) -> Result<bool, NetQueuePairError> {
        let tx_tap_retry = self.tx.process_desc_chain(
            &mut self.tap,
            queue,
            &mut self.tx_rate_limiter,
            self.access_platform.as_ref(),
        )?;

        // If the tap returned EAGAIN, wait for it to become writable before
        // retrying the write.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        queue
            .needs_notification()
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }

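    // Fill available RX descriptor chains with frames from the tap. Stops
    // listening on the tap when RX descriptors run out or the RX rate
    // limit is reached. Returns whether the guest driver should be
    // notified.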
    pub fn process_rx(
        &mut self,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
    ) -> Result<bool, NetQueuePairError> {
        self.rx_desc_avail = !self.rx.process_desc_chain(
            &mut self.tap,
            queue,
            &mut self.rx_rate_limiter,
            self.access_platform.as_ref(),
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification()
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}