// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use crate::GuestMemoryMmap;
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use virtio_queue::Queue;
use vm_memory::{Bytes, GuestMemory, GuestMemoryAtomic};
use vm_virtio::{AccessPlatform, Translatable};

#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        tap: &mut Tap,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        loop {
            let used_desc_head: (u16, u32);
            let mut avail_iter = queue
                .iter()
                .map_err(NetQueuePairError::QueueIteratorFailed)?;

            if let Some(mut desc_chain) = avail_iter.next() {
                if rate_limit_reached {
                    avail_iter.go_to_previous_position();
                    break;
                }

                let mut next_desc = desc_chain.next();

                let mut iovecs = Vec::new();
                while let Some(desc) = next_desc {
                    let desc_addr = desc
                        .addr()
                        .translate_gva(access_platform, desc.len() as usize);
                    if !desc.is_write_only() && desc.len() > 0 {
                        let buf = desc_chain
                            .memory()
                            .get_slice(desc_addr, desc.len() as usize)
                            .map_err(NetQueuePairError::GuestMemory)?
                            .as_ptr();
                        let iovec = libc::iovec {
                            iov_base: buf as *mut libc::c_void,
                            iov_len: desc.len() as libc::size_t,
                        };
                        iovecs.push(iovec);
                    } else {
                        error!(
                            "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                            desc_addr.0,
                            desc.len(),
                            desc.is_write_only()
                        );
                        return Err(NetQueuePairError::DescriptorChainInvalid);
                    }
                    next_desc = desc_chain.next();
                }

                let len = if !iovecs.is_empty() {
                    let result = unsafe {
                        libc::writev(
                            tap.as_raw_fd() as libc::c_int,
                            iovecs.as_ptr() as *const libc::iovec,
                            iovecs.len() as libc::c_int,
                        )
                    };

                    if result < 0 {
                        let e = std::io::Error::last_os_error();

                        /* EAGAIN */
                        if e.kind() == std::io::ErrorKind::WouldBlock {
                            avail_iter.go_to_previous_position();
                            retry_write = true;
                            break;
                        }
                        error!("net: tx: failed writing to tap: {}", e);
                        return Err(NetQueuePairError::WriteTap(e));
                    }

                    self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                    self.counter_frames += Wrapping(1);

                    result as u32
                } else {
                    0
                };

                used_desc_head = (desc_chain.head_index(), len);

                // For the sake of simplicity (similar to the RX rate limiting), we always
                // let the 'last' descriptor chain go through even if it was over the rate
                // limit, and simply stop processing incoming `avail_desc` if any.
                if let Some(rate_limiter) = rate_limiter {
                    rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                        || !rate_limiter.consume(len as u64, TokenType::Bytes);
                }
            } else {
                break;
            }

            queue
                .add_used(used_desc_head.0, used_desc_head.1)
                .map_err(NetQueuePairError::QueueAddUsed)?;
            if !queue
                .enable_notification()
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}
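
// A minimal, self-contained sketch (not part of the device path) of the
// scatter-gather pattern `TxVirtio::process_desc_chain()` relies on: each
// guest buffer becomes one `libc::iovec`, and a single writev() submits the
// whole frame to the tap in one syscall. A pipe stands in for the tap fd
// here, purely for illustration.
#[cfg(test)]
mod writev_gather_example {
    #[test]
    fn writev_submits_multiple_buffers_at_once() {
        let mut fds: [libc::c_int; 2] = [-1; 2];
        assert_eq!(unsafe { libc::pipe(fds.as_mut_ptr()) }, 0);

        // Two separate buffers, analogous to a virtio-net header descriptor
        // followed by a payload descriptor.
        let hdr = [0u8; 12];
        let payload = b"frame payload";
        let iovecs = [
            libc::iovec {
                iov_base: hdr.as_ptr() as *mut libc::c_void,
                iov_len: hdr.len(),
            },
            libc::iovec {
                iov_base: payload.as_ptr() as *mut libc::c_void,
                iov_len: payload.len(),
            },
        ];

        let written =
            unsafe { libc::writev(fds[1], iovecs.as_ptr(), iovecs.len() as libc::c_int) };
        assert_eq!(written as usize, hdr.len() + payload.len());

        unsafe {
            libc::close(fds[0]);
            libc::close(fds[1]);
        }
    }
}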

#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        tap: &mut Tap,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        loop {
            let used_desc_head: (u16, u32);
            let mut avail_iter = queue
                .iter()
                .map_err(NetQueuePairError::QueueIteratorFailed)?;

            if let Some(mut desc_chain) = avail_iter.next() {
                if rate_limit_reached {
                    exhausted_descs = false;
                    avail_iter.go_to_previous_position();
                    break;
                }

                let desc = desc_chain
                    .next()
                    .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

                // Guest address of the `num_buffers` field, located at offset 10
                // of the virtio-net header at the start of the first descriptor.
                let num_buffers_addr = desc_chain
                    .memory()
                    .checked_offset(
                        desc.addr()
                            .translate_gva(access_platform, desc.len() as usize),
                        10,
                    )
                    .unwrap();
                let mut next_desc = Some(desc);

                let mut iovecs = Vec::new();
                while let Some(desc) = next_desc {
                    let desc_addr = desc
                        .addr()
                        .translate_gva(access_platform, desc.len() as usize);
                    if desc.is_write_only() && desc.len() > 0 {
                        let buf = desc_chain
                            .memory()
                            .get_slice(desc_addr, desc.len() as usize)
                            .map_err(NetQueuePairError::GuestMemory)?
                            .as_ptr();
                        let iovec = libc::iovec {
                            iov_base: buf as *mut libc::c_void,
                            iov_len: desc.len() as libc::size_t,
                        };
                        iovecs.push(iovec);
                    } else {
                        error!(
                            "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                            desc_addr.0,
                            desc.len(),
                            desc.is_write_only()
                        );
                        return Err(NetQueuePairError::DescriptorChainInvalid);
                    }
                    next_desc = desc_chain.next();
                }

                let len = if !iovecs.is_empty() {
                    let result = unsafe {
                        libc::readv(
                            tap.as_raw_fd() as libc::c_int,
                            iovecs.as_ptr() as *const libc::iovec,
                            iovecs.len() as libc::c_int,
                        )
                    };
                    if result < 0 {
                        let e = std::io::Error::last_os_error();
                        exhausted_descs = false;
                        avail_iter.go_to_previous_position();

                        /* EAGAIN */
                        if e.kind() == std::io::ErrorKind::WouldBlock {
                            break;
                        }

                        error!("net: rx: failed reading from tap: {}", e);
                        return Err(NetQueuePairError::ReadTap(e));
                    }

                    // Write num_buffers to guest memory. We simply write 1 as we
                    // never spread the frame over more than one descriptor chain.
                    desc_chain
                        .memory()
                        .write_obj(1u16, num_buffers_addr)
                        .map_err(NetQueuePairError::GuestMemory)?;

                    self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                    self.counter_frames += Wrapping(1);

                    result as u32
                } else {
                    0
                };

                used_desc_head = (desc_chain.head_index(), len);

                // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
                // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
                // chain go through even if it was over the rate limit, and simply stop
                // processing incoming `avail_desc` if any.
                if let Some(rate_limiter) = rate_limiter {
                    rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                        || !rate_limiter.consume(len as u64, TokenType::Bytes);
                }
            } else {
                break;
            }

            queue
                .add_used(used_desc_head.0, used_desc_head.1)
                .map_err(NetQueuePairError::QueueAddUsed)?;
            if !queue
                .enable_notification()
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}
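
// The `10` passed to `checked_offset()` in `RxVirtio::process_desc_chain()`
// is the offset of the `num_buffers` field within the virtio-net header used
// with mergeable RX buffers. A sketch of that layout for reference; this is a
// local illustration only, not a type used elsewhere in this module.
#[cfg(test)]
mod mrg_rxbuf_header_example {
    #[allow(dead_code)]
    #[repr(C, packed)]
    struct VirtioNetHdrMrgRxbuf {
        flags: u8,        // offset 0
        gso_type: u8,     // offset 1
        hdr_len: u16,     // offset 2
        gso_size: u16,    // offset 4
        csum_start: u16,  // offset 6
        csum_offset: u16, // offset 8
        num_buffers: u16, // offset 10
    }

    #[test]
    fn num_buffers_is_at_offset_10() {
        let hdr = VirtioNetHdrMrgRxbuf {
            flags: 0,
            gso_type: 0,
            hdr_len: 0,
            gso_size: 0,
            csum_start: 0,
            csum_offset: 0,
            num_buffers: 0,
        };
        let base = &hdr as *const VirtioNetHdrMrgRxbuf as usize;
        let field = std::ptr::addr_of!(hdr.num_buffers) as usize;
        assert_eq!(field - base, 10);
        assert_eq!(std::mem::size_of::<VirtioNetHdrMrgRxbuf>(), 12);
    }
}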

#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Debug)]
pub enum NetQueuePairError {
    /// No memory configured
    NoMemoryConfigured,
    /// Error registering listener
    RegisterListener(io::Error),
    /// Error unregistering listener
    UnregisterListener(io::Error),
    /// Error writing to the TAP device
    WriteTap(io::Error),
    /// Error reading from the TAP device
    ReadTap(io::Error),
    /// Error related to guest memory
    GuestMemory(vm_memory::GuestMemoryError),
    /// Failed to iterate over the queue
    QueueIteratorFailed(virtio_queue::Error),
    /// Descriptor chain is too short
    DescriptorChainTooShort,
    /// Descriptor chain does not contain valid descriptors
    DescriptorChainInvalid,
    /// Failed to determine if the queue needed notification
    QueueNeedsNotification(virtio_queue::Error),
    /// Failed to enable notification on the queue
    QueueEnableNotification(virtio_queue::Error),
    /// Failed to add used index to the queue
    QueueAddUsed(virtio_queue::Error),
}

pub struct NetQueuePair {
    pub tap: Tap,
    // With epoll, each FD registered on an epoll instance must be unique. To
    // tell read and write events apart we therefore keep a second FD referring
    // to the same tap device, so that EPOLLIN and EPOLLOUT can be registered
    // (and reported) as separate events.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}
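
// `tap_for_write_epoll` exists because an epoll instance refuses to register
// the same FD twice: a second EPOLL_CTL_ADD on an already-registered FD fails
// with EEXIST. Registering EPOLLIN and EPOLLOUT under distinct event ids
// therefore requires two FDs referring to the same device. A minimal
// illustration of that constraint, using a pipe FD in place of the tap.
#[cfg(test)]
mod epoll_unique_fd_example {
    #[test]
    fn same_fd_cannot_be_registered_twice() {
        unsafe {
            let epfd = libc::epoll_create1(libc::EPOLL_CLOEXEC);
            assert!(epfd >= 0);
            let mut fds: [libc::c_int; 2] = [-1; 2];
            assert_eq!(libc::pipe(fds.as_mut_ptr()), 0);

            let mut read_ev = libc::epoll_event {
                events: libc::EPOLLIN as u32,
                u64: 1,
            };
            assert_eq!(
                libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut read_ev),
                0
            );

            // Adding a separate EPOLLOUT interest on the same FD is rejected,
            // hence the dedicated write-side tap FD.
            let mut write_ev = libc::epoll_event {
                events: libc::EPOLLOUT as u32,
                u64: 2,
            };
            assert_eq!(
                libc::epoll_ctl(epfd, libc::EPOLL_CTL_ADD, fds[0], &mut write_ev),
                -1
            );
            assert_eq!(
                std::io::Error::last_os_error().raw_os_error(),
                Some(libc::EEXIST)
            );

            libc::close(fds[0]);
            libc::close(fds[1]);
            libc::close(epfd);
        }
    }
}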

impl NetQueuePair {
    pub fn process_tx(
        &mut self,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
    ) -> Result<bool, NetQueuePairError> {
        let tx_tap_retry = self.tx.process_desc_chain(
            &mut self.tap,
            queue,
            &mut self.tx_rate_limiter,
            self.access_platform.as_ref(),
        )?;

        // We were told to try again when writing to the tap. Wait for the TAP
        // to become writable.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        queue
            .needs_notification()
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }

    pub fn process_rx(
        &mut self,
        queue: &mut Queue<GuestMemoryAtomic<GuestMemoryMmap>>,
    ) -> Result<bool, NetQueuePairError> {
        self.rx_desc_avail = !self.rx.process_desc_chain(
            &mut self.tap,
            queue,
            &mut self.rx_rate_limiter,
            self.access_platform.as_ref(),
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit has been reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification()
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}
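
// A small sketch of the RateLimiter behaviour the queue handlers rely on,
// assuming the Firecracker-derived `rate_limiter` crate API
// (`RateLimiter::new(bytes_size, bytes_one_time_burst, bytes_refill_ms,
// ops_size, ops_one_time_burst, ops_refill_ms)`, where a size of 0 disables
// that token type). The figures below are illustrative only.
#[cfg(test)]
mod rate_limiter_example {
    use rate_limiter::{RateLimiter, TokenType};

    #[test]
    fn consume_until_blocked() {
        // 1000 bytes of budget refilled over 100 ms; ops left unlimited.
        let mut limiter = RateLimiter::new(1000, 0, 100, 0, 0, 0).unwrap();

        // process_desc_chain() charges one Ops token per descriptor chain and
        // `len` Bytes tokens per frame. Once consume() returns false the loop
        // stops pulling descriptors, and process_rx()/process_tx() use
        // is_blocked() to decide whether to keep listening on the tap.
        assert!(limiter.consume(800, TokenType::Bytes));
        assert!(!limiter.consume(800, TokenType::Bytes));
        assert!(limiter.is_blocked());
    }
}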