// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use crate::GuestMemoryMmap;
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use vm_memory::{Bytes, GuestAddressSpace, GuestMemory, GuestMemoryAtomic};
use vm_virtio::Queue;

#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;
        while let Some(avail_desc) = queue.iter(mem).next() {
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let head_index = avail_desc.index;
            let mut next_desc = Some(avail_desc);

            // Gather the device-readable buffers of the chain into an iovec
            // list so the whole frame can be sent with a single writev().
            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                if !desc.is_write_only() && desc.len > 0 {
                    let buf = mem
                        .get_slice(desc.addr, desc.len as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len as libc::size_t,
                    };
                    iovecs.push(iovec);
                }
                next_desc = desc.next_descriptor();
            }

            let len = if !iovecs.is_empty() {
                // The iovecs refer to guest memory slices validated by
                // get_slice() above, which remain mapped for the duration of
                // the call.
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr() as *const libc::iovec,
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            queue.add_used(mem, head_index, 0);
            queue.update_avail_event(mem);

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go through even if it was over the rate
            // limit, and simply stop processing further `avail_desc`, if any.
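            // `consume()` returns false when the token bucket does not hold
            // enough budget; the current chain has already been sent at this
            // point, so the flag only makes the next loop iteration return
            // the descriptor it just popped and break out.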
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }
        }

        Ok(retry_write)
    }
}

#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(avail_desc) = queue.iter(mem).next() {
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let head_index = avail_desc.index;
            // Offset 10 into the first descriptor is the `num_buffers` field
            // of the virtio-net header (struct virtio_net_hdr_mrg_rxbuf).
            let num_buffers_addr = mem.checked_offset(avail_desc.addr, 10).unwrap();
            let mut next_desc = Some(avail_desc);

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                if desc.is_write_only() && desc.len > 0 {
                    let buf = mem
                        .get_slice(desc.addr, desc.len as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len as libc::size_t,
                    };
                    iovecs.push(iovec);
                }
                next_desc = desc.next_descriptor();
            }

            let len = if !iovecs.is_empty() {
                // The iovecs refer to guest memory slices validated by
                // get_slice() above.
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr() as *const libc::iovec,
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
                mem.write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            queue.add_used(mem, head_index, len);
            queue.update_avail_event(mem);

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
            // chain go through even if it was over the rate limit, and simply stop
            // processing further `avail_desc`, if any.
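            // `len` is the full amount read from the TAP, including the
            // virtio-net header; it is reported to the guest through
            // `add_used()` above and charged against the bytes budget below.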
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }
        }

        Ok(exhausted_descs)
    }
}

#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Debug)]
pub enum NetQueuePairError {
    /// No memory configured
    NoMemoryConfigured,
    /// Error registering listener
    RegisterListener(io::Error),
    /// Error unregistering listener
    UnregisterListener(io::Error),
    /// Error writing to the TAP device
    WriteTap(io::Error),
    /// Error reading from the TAP device
    ReadTap(io::Error),
    /// Error related to guest memory
    GuestMemory(vm_memory::GuestMemoryError),
}

pub struct NetQueuePair {
    pub mem: Option<GuestMemoryAtomic<GuestMemoryMmap>>,
    pub tap: Tap,
    // With epoll, every registered FD must be unique. In order to tell the
    // events apart we use a second FD referring to the same TAP device, so
    // that EPOLLOUT and EPOLLIN can be delivered as separate events.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
}

impl NetQueuePair {
    pub fn process_tx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        let mem = self
            .mem
            .as_ref()
            .ok_or(NetQueuePairError::NoMemoryConfigured)
            .map(|m| m.memory())?;

        let tx_tap_retry = self.tx.process_desc_chain(
            &mem,
            &mut self.tap,
            &mut queue,
            &mut self.tx_rate_limiter,
        )?;

        // We were told to try again when writing to the TAP. Wait for it to
        // become writable before retrying.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }
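        // Fold the bytes/frames accumulated during this pass into the shared
        // statistics counters, then reset the local wrapping counters.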
        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        Ok(queue.needs_notification(&mem, queue.next_used))
    }

    pub fn process_rx(&mut self, mut queue: &mut Queue) -> Result<bool, NetQueuePairError> {
        let mem = self
            .mem
            .as_ref()
            .ok_or(NetQueuePairError::NoMemoryConfigured)
            .map(|m| m.memory())?;

        self.rx_desc_avail = !self.rx.process_desc_chain(
            &mem,
            &mut self.tap,
            &mut queue,
            &mut self.rx_rate_limiter,
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        Ok(queue.needs_notification(&mem, queue.next_used))
    }
}
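// A minimal test sketch, added as an illustration and not part of the
// original module: it only exercises the constructors and the wrapping
// counter fields, since driving the full TX/RX queue-processing paths would
// require a real TAP device and guest memory setup.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn tx_rx_counters_start_at_zero() {
        let tx = TxVirtio::new();
        assert_eq!(tx.counter_bytes, Wrapping(0));
        assert_eq!(tx.counter_frames, Wrapping(0));

        let rx = RxVirtio::new();
        assert_eq!(rx.counter_bytes, Wrapping(0));
        assert_eq!(rx.counter_frames, Wrapping(0));
    }
}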