// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use crate::GuestMemoryMmap;
use log::{error, info};
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{Bytes, GuestMemory};
use vm_virtio::{AccessPlatform, Translatable};

#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let mut next_desc = desc_chain.next();

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if !desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .ptr_guard_mut();
                    let iovec = libc::iovec {
                        iov_base: buf.as_ptr() as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go through even if it was over the rate
            // limit, and simply stop processing upcoming `avail_desc` entries, if any.
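            // For illustration: with one op token and zero byte tokens left,
            // `consume(1, TokenType::Ops)` below succeeds but
            // `consume(len, TokenType::Bytes)` fails (for len > 0), so
            // `rate_limit_reached` becomes true; the *next* loop iteration then
            // pushes the chain back via `go_to_previous_position()` and breaks.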
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}

#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let desc = desc_chain
                .next()
                .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

            // `num_buffers` lives at offset 10 of the virtio-net header
            // (virtio_net_hdr_mrg_rxbuf) that starts the first descriptor.
            let num_buffers_addr = desc_chain
                .memory()
                .checked_offset(
                    desc.addr()
                        .translate_gva(access_platform, desc.len() as usize),
                    10,
                )
                .ok_or(NetQueuePairError::DescriptorInvalidHeader)?;
            let mut next_desc = Some(desc);

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .ptr_guard_mut();
                    let iovec = libc::iovec {
                        iov_base: buf.as_ptr() as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
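                // For reference, the merged-rxbuf virtio-net header layout from
                // the virtio specification, which is where the fixed offset 10
                // used for `num_buffers_addr` above comes from:
                //
                //   struct virtio_net_hdr_mrg_rxbuf {
                //       u8   flags;       // offset 0
                //       u8   gso_type;    // offset 1
                //       le16 hdr_len;     // offset 2
                //       le16 gso_size;    // offset 4
                //       le16 csum_start;  // offset 6
                //       le16 csum_offset; // offset 8
                //       le16 num_buffers; // offset 10
                //   };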
                desc_chain
                    .memory()
                    .write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT fully asynchronous), we always let the 'last' descriptor
            // chain go through even if it was over the rate limit, and simply stop
            // processing upcoming `avail_desc` entries, if any.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}

#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Error, Debug)]
pub enum NetQueuePairError {
    #[error("No memory configured")]
    NoMemoryConfigured,
    #[error("Error registering listener: {0}")]
    RegisterListener(io::Error),
    #[error("Error unregistering listener: {0}")]
    UnregisterListener(io::Error),
    #[error("Error writing to the TAP device: {0}")]
    WriteTap(io::Error),
    #[error("Error reading from the TAP device: {0}")]
    ReadTap(io::Error),
    #[error("Error related to guest memory: {0}")]
    GuestMemory(vm_memory::GuestMemoryError),
    #[error("Error while iterating through the queue: {0}")]
    QueueIteratorFailed(virtio_queue::Error),
    #[error("Descriptor chain is too short")]
    DescriptorChainTooShort,
    #[error("Descriptor chain does not contain valid descriptors")]
    DescriptorChainInvalid,
    #[error("Failed to determine if the queue needed notification: {0}")]
    QueueNeedsNotification(virtio_queue::Error),
    #[error("Failed to enable notification on the queue: {0}")]
    QueueEnableNotification(virtio_queue::Error),
    #[error("Failed to add used index to the queue: {0}")]
    QueueAddUsed(virtio_queue::Error),
    #[error("Descriptor with invalid virtio-net header")]
    DescriptorInvalidHeader,
    #[error("Invalid virtio-net header")]
    InvalidVirtioNetHeader,
}

pub struct NetQueuePair {
    pub tap: Tap,
    // With epoll, each registered FD must be unique. In order to tell the
    // events apart we need a second FD referring to the same TAP device, so
    // that EPOLLIN and EPOLLOUT can be delivered as separate events.
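    // Concretely, `tap` is registered for EPOLLIN (tap_rx_event_id) while
    // `tap_for_write_epoll` is registered for EPOLLOUT (tap_tx_event_id); see
    // `process_rx()` and `process_tx()` below.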
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}

impl NetQueuePair {
    pub fn process_tx(
        &mut self,
        mem: &GuestMemoryMmap,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        let tx_tap_retry = self.tx.process_desc_chain(
            mem,
            &self.tap,
            queue,
            &mut self.tx_rate_limiter,
            self.access_platform.as_ref(),
        )?;

        // We were told to try again when writing to the tap. Wait for the TAP
        // to become writable.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }

    pub fn process_rx(
        &mut self,
        mem: &GuestMemoryMmap,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        self.rx_desc_avail = !self.rx.process_desc_chain(
            mem,
            &self.tap,
            queue,
            &mut self.rx_rate_limiter,
            self.access_platform.as_ref(),
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}
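#[cfg(test)]
mod tests {
    use super::*;

    // An illustrative sanity check of the counter bookkeeping: the per-queue
    // counters use `Wrapping<u64>`, so they overflow silently instead of
    // panicking in debug builds before being folded into the shared
    // `NetCounters` atomics.
    #[test]
    fn virtio_counters_wrap_on_overflow() {
        let mut tx = TxVirtio::new();
        tx.counter_bytes = Wrapping(u64::MAX);
        tx.counter_bytes += Wrapping(1);
        assert_eq!(tx.counter_bytes.0, 0);

        let mut rx = RxVirtio::default();
        rx.counter_frames += Wrapping(1);
        assert_eq!(rx.counter_frames.0, 1);
    }
}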