xref: /cloud-hypervisor/net_util/src/queue_pair.rs (revision 2571e59438597f53aa4993cd70d6462fe1364ba7)
1 // Copyright (c) 2020 Intel Corporation. All rights reserved.
2 //
3 // SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause
4 
5 use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
6 use crate::GuestMemoryMmap;
7 use rate_limiter::{RateLimiter, TokenType};
8 use std::io;
9 use std::num::Wrapping;
10 use std::os::unix::io::{AsRawFd, RawFd};
11 use std::sync::atomic::{AtomicU64, Ordering};
12 use std::sync::Arc;
13 use thiserror::Error;
14 use virtio_queue::{Queue, QueueOwnedT, QueueT};
15 use vm_memory::{Bytes, GuestMemory};
16 use vm_virtio::{AccessPlatform, Translatable};
17 
/// Per-queue TX state: traffic counters accumulated while processing
/// descriptor chains, flushed into the shared `NetCounters` by
/// `NetQueuePair::process_tx` (which resets them to zero afterwards).
#[derive(Clone)]
pub struct TxVirtio {
    /// Bytes transmitted since the last flush, virtio-net header excluded.
    pub counter_bytes: Wrapping<u64>,
    /// Frames transmitted since the last flush (one per successful writev).
    pub counter_frames: Wrapping<u64>,
}
23 
24 impl Default for TxVirtio {
25     fn default() -> Self {
26         Self::new()
27     }
28 }
29 
impl TxVirtio {
    /// Creates a TX handler with both counters zeroed.
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    /// Drains available descriptor chains from the TX queue, writing each
    /// chain to the TAP device with a single vectored `writev(2)` call.
    ///
    /// Returns `Ok(true)` if a write hit EAGAIN (the chain was pushed back
    /// with `go_to_previous_position()` and the caller should wait for the
    /// TAP to become writable), `Ok(false)` otherwise.
    ///
    /// Rate limiting is best-effort: the chain that exhausts the budget is
    /// still sent, and processing stops before the next one.
    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            // The previous iteration exhausted the rate-limit budget: push
            // this chain back onto the queue and stop.
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let mut next_desc = desc_chain.next();

            // Gather every readable descriptor of the chain into an iovec
            // list so the whole frame goes out in one writev call.
            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                // Translate the guest address in case an AccessPlatform
                // (e.g. IOMMU-style translation) is configured.
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                // TX descriptors must be readable (not write-only) and
                // non-empty; anything else is a malformed chain.
                if !desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        // TAP not writable right now: rewind so the chain is
                        // reprocessed once EPOLLOUT fires, and tell the
                        // caller to register for that event.
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                // A frame shorter than the virtio-net header can't be valid.
                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                // Account payload bytes only (header stripped from the count).
                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go-through even if it was over the rate
            // limit, and simply stop processing oncoming `avail_desc` if any.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            // enable_notification() returning false means no new descriptors
            // were made available meanwhile, so we can stop polling the queue.
            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}
143 
/// Per-queue RX state: traffic counters accumulated while processing
/// descriptor chains, flushed into the shared `NetCounters` by
/// `NetQueuePair::process_rx` (which resets them to zero afterwards).
#[derive(Clone)]
pub struct RxVirtio {
    /// Bytes received since the last flush, virtio-net header excluded.
    pub counter_bytes: Wrapping<u64>,
    /// Frames received since the last flush (one per successful readv).
    pub counter_frames: Wrapping<u64>,
}
149 
150 impl Default for RxVirtio {
151     fn default() -> Self {
152         Self::new()
153     }
154 }
155 
impl RxVirtio {
    /// Creates an RX handler with both counters zeroed.
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    /// Reads frames from the TAP device into available RX descriptor chains,
    /// one frame per chain, using a single vectored `readv(2)` call each.
    ///
    /// Returns `Ok(true)` if the available ring was fully drained (no more
    /// descriptors to fill), `Ok(false)` if processing stopped early (EAGAIN,
    /// read error path, or rate limit) with the current chain pushed back.
    ///
    /// Rate limiting is best-effort: the chain that exhausts the budget is
    /// still filled, and processing stops before the next one.
    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            // The previous iteration exhausted the rate-limit budget: push
            // this chain back onto the queue and stop.
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let desc = desc_chain
                .next()
                .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

            // Locate the `num_buffers` field inside the virtio-net header
            // written at the head of the first descriptor: offset 10 into
            // the header (see the write_obj(1u16, ...) below). checked_offset
            // also validates the header fits in guest memory.
            let num_buffers_addr = desc_chain
                .memory()
                .checked_offset(
                    desc.addr()
                        .translate_gva(access_platform, desc.len() as usize),
                    10,
                )
                .ok_or(NetQueuePairError::DescriptorInvalidHeader)?;
            let mut next_desc = Some(desc);

            // Gather every writable descriptor of the chain into an iovec
            // list so the whole frame is read in one readv call.
            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                // RX descriptors must be writable and non-empty; anything
                // else is a malformed chain.
                if desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    // Nothing was consumed from the TAP: hand the chain back
                    // to the queue whether we break (EAGAIN) or bail (error).
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                // A frame shorter than the virtio-net header can't be valid.
                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
                desc_chain
                    .memory()
                    .write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                // Account payload bytes only (header stripped from the count).
                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
            // chain go-through even if it was over the rate limit, and simply stop
            // processing oncoming `avail_desc` if any.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            // enable_notification() returning false means no new descriptors
            // were made available meanwhile, so we can stop polling the queue.
            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}
290 
/// Cumulative traffic counters for a network device, updated by
/// `NetQueuePair::process_tx`/`process_rx`. Each counter is wrapped in
/// `Arc<AtomicU64>` so clones of this struct share the same underlying
/// values and can be read concurrently (e.g. for statistics reporting).
#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}
298 
/// Errors produced while processing a virtio-net queue pair.
///
/// Display strings come from the `thiserror` attributes below; they are part
/// of the user-visible error output and must not be changed casually.
#[derive(Error, Debug)]
pub enum NetQueuePairError {
    #[error("No memory configured")]
    NoMemoryConfigured,
    #[error("Error registering listener: {0}")]
    RegisterListener(io::Error),
    #[error("Error unregistering listener: {0}")]
    UnregisterListener(io::Error),
    #[error("Error writing to the TAP device: {0}")]
    WriteTap(io::Error),
    #[error("Error reading from the TAP device: {0}")]
    ReadTap(io::Error),
    #[error("Error related to guest memory: {0}")]
    GuestMemory(vm_memory::GuestMemoryError),
    #[error("Returned an error while iterating through the queue: {0}")]
    QueueIteratorFailed(virtio_queue::Error),
    #[error("Descriptor chain is too short")]
    DescriptorChainTooShort,
    #[error("Descriptor chain does not contain valid descriptors")]
    DescriptorChainInvalid,
    #[error("Failed to determine if queue needed notification: {0}")]
    QueueNeedsNotification(virtio_queue::Error),
    #[error("Failed to enable notification on the queue: {0}")]
    QueueEnableNotification(virtio_queue::Error),
    #[error("Failed to add used index to the queue: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Descriptor with invalid virtio-net header")]
    DescriptorInvalidHeader,
    #[error("Invalid virtio-net header")]
    InvalidVirtioNetHeader,
}
330 
/// State for one RX/TX virtio-net queue pair backed by a TAP device,
/// including epoll registration bookkeeping and optional rate limiters.
pub struct NetQueuePair {
    /// TAP device used for both reading (RX) and writing (TX).
    pub tap: Tap,
    // With epoll each FD must be unique. So in order to filter the
    // events we need a second FD referring to the same underlying
    // device, letting EPOLLOUT and EPOLLIN arrive as separate events.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    // NOTE(review): process_tx/process_rx call `.unwrap()` on this, so it
    // must be set before any queue processing — confirm with the owner of
    // this struct's construction path.
    pub epoll_fd: Option<RawFd>,
    /// True while the TAP fd is registered for EPOLLIN (RX) events.
    pub rx_tap_listening: bool,
    /// True while the write-side TAP fd is registered for EPOLLOUT events.
    pub tx_tap_listening: bool,
    /// Shared counters flushed from `rx`/`tx` after each processing pass.
    pub counters: NetCounters,
    /// epoll event id used when (un)registering the RX TAP listener.
    pub tap_rx_event_id: u16,
    /// epoll event id used when (un)registering the TX TAP listener.
    pub tap_tx_event_id: u16,
    /// Whether the RX queue still had available descriptors after the last
    /// `process_rx` pass (set from the negation of its return value).
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
    /// Optional address translation layer applied to guest addresses.
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}
351 
impl NetQueuePair {
    /// Drives the TX queue: sends pending frames to the TAP, toggles the
    /// EPOLLOUT registration used to retry after an EAGAIN, and flushes the
    /// per-pass byte/frame counters into the shared `NetCounters`.
    ///
    /// Returns the queue's `needs_notification` result, i.e. whether the
    /// caller should signal the guest.
    pub fn process_tx(
        &mut self,
        mem: &GuestMemoryMmap,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        let tx_tap_retry = self.tx.process_desc_chain(
            mem,
            &mut self.tap,
            queue,
            &mut self.tx_rate_limiter,
            self.access_platform.as_ref(),
        )?;

        // We got told to try again when writing to the tap. Wait for the TAP to be writable
        if tx_tap_retry && !self.tx_tap_listening {
            // NOTE(review): unwrap() panics if epoll_fd was never set — this
            // assumes the owner always configures it before TX processing.
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            // The write went through again: stop watching for writability.
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        // Flush the per-pass counters into the shared totals and reset them.
        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }

    /// Drives the RX queue: fills available descriptor chains with frames
    /// read from the TAP, updates the RX TAP listener registration, and
    /// flushes the per-pass byte/frame counters into the shared `NetCounters`.
    ///
    /// Returns the queue's `needs_notification` result, i.e. whether the
    /// caller should signal the guest.
    pub fn process_rx(
        &mut self,
        mem: &GuestMemoryMmap,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        // process_desc_chain returns true when the avail ring was exhausted,
        // so rx_desc_avail is its negation.
        self.rx_desc_avail = !self.rx.process_desc_chain(
            mem,
            &mut self.tap,
            queue,
            &mut self.rx_rate_limiter,
            self.access_platform.as_ref(),
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            // NOTE(review): unwrap() panics if epoll_fd was never set — same
            // assumption as in process_tx.
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        // Flush the per-pass counters into the shared totals and reset them.
        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}
448