// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::bitmap::Bitmap;
use vm_memory::{Bytes, GuestMemory};
use vm_virtio::{AccessPlatform, Translatable};

#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
    iovecs: IovecBuffer,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
            iovecs: IovecBuffer::new(),
        }
    }

    pub fn process_desc_chain<B: Bitmap + 'static>(
        &mut self,
        mem: &vm_memory::GuestMemoryMmap<B>,
        tap: &Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let mut next_desc = desc_chain.next();

            let mut iovecs = self.iovecs.borrow();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if !desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .ptr_guard_mut();
                    let iovec = libc::iovec {
                        iov_base: buf.as_ptr() as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go through even if it was over the rate
            // limit, and simply stop processing incoming `avail_desc` if any.
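            // `consume()` draws from the limiter's token buckets: one token for
            // the operation plus `len` tokens for the bytes written. It returns
            // false when a bucket runs dry, in which case the flag set here
            // stops the loop at the top of the next iteration.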
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}

#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
    iovecs: IovecBuffer,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
            iovecs: IovecBuffer::new(),
        }
    }

    pub fn process_desc_chain<B: Bitmap + 'static>(
        &mut self,
        mem: &vm_memory::GuestMemoryMmap<B>,
        tap: &Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let desc = desc_chain
                .next()
                .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

            let num_buffers_addr = desc_chain
                .memory()
                .checked_offset(
                    desc.addr()
                        .translate_gva(access_platform, desc.len() as usize),
                    10,
                )
                .ok_or(NetQueuePairError::DescriptorInvalidHeader)?;
            let mut next_desc = Some(desc);

            let mut iovecs = self.iovecs.borrow();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .ptr_guard_mut();
                    let iovec = libc::iovec {
                        iov_base: buf.as_ptr() as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                // SAFETY: FFI call with correct arguments
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr(),
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                if (result as usize) < vnet_hdr_len() {
                    return Err(NetQueuePairError::InvalidVirtioNetHeader);
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
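                // `num_buffers_addr` (computed above as offset 10 into the first
                // descriptor) points at the `num_buffers` field of the
                // virtio_net_hdr_mrg_rxbuf header: it follows the u8 flags and
                // gso_type fields and the u16 hdr_len, gso_size, csum_start and
                // csum_offset fields defined by the virtio spec.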
                desc_chain
                    .memory()
                    .write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
            // chain go through even if it was over the rate limit, and simply stop
            // processing incoming `avail_desc` if any.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}

#[derive(Default, Clone)]
struct IovecBuffer(Vec<libc::iovec>);

// SAFETY: Implementing Send for IovecBuffer is safe as the pointer inside is iovec.
// The iovecs are usually constructed from virtio descriptors, which are safe to send across
// threads.
unsafe impl Send for IovecBuffer {}
// SAFETY: Implementing Sync for IovecBuffer is safe as the pointer inside is iovec.
// The iovecs are usually constructed from virtio descriptors, which are safe to access from
// multiple threads.
unsafe impl Sync for IovecBuffer {}

impl IovecBuffer {
    fn new() -> Self {
        // Here we use 4 as the default capacity because it is enough for most cases.
        const DEFAULT_CAPACITY: usize = 4;
        IovecBuffer(Vec::with_capacity(DEFAULT_CAPACITY))
    }

    // Returns a scoped view over the vector that clears it when dropped.
    fn borrow(&mut self) -> IovecBufferBorrowed<'_> {
        IovecBufferBorrowed(&mut self.0)
    }
}

struct IovecBufferBorrowed<'a>(&'a mut Vec<libc::iovec>);

impl<'a> std::ops::Deref for IovecBufferBorrowed<'a> {
    type Target = Vec<libc::iovec>;

    fn deref(&self) -> &Self::Target {
        self.0
    }
}

impl<'a> std::ops::DerefMut for IovecBufferBorrowed<'a> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.0
    }
}

impl Drop for IovecBufferBorrowed<'_> {
    fn drop(&mut self) {
        // Clear the buffer so stale values are not reused after the borrow ends.
        self.0.clear();
    }
}

#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Error, Debug)]
pub enum NetQueuePairError {
    #[error("No memory configured")]
    NoMemoryConfigured,
    #[error("Error registering listener: {0}")]
    RegisterListener(io::Error),
    #[error("Error unregistering listener: {0}")]
    UnregisterListener(io::Error),
    #[error("Error writing to the TAP device: {0}")]
    WriteTap(io::Error),
    #[error("Error reading from the TAP device: {0}")]
    ReadTap(io::Error),
    #[error("Error related to guest memory: {0}")]
    GuestMemory(vm_memory::GuestMemoryError),
    #[error("Returned an error while iterating through the queue: {0}")]
    QueueIteratorFailed(virtio_queue::Error),
    #[error("Descriptor chain is too short")]
    DescriptorChainTooShort,
    #[error("Descriptor chain does not contain valid descriptors")]
    DescriptorChainInvalid,
#[error("Failed to determine if queue needed notification: {0}")] 371 QueueNeedsNotification(virtio_queue::Error), 372 #[error("Failed to enable notification on the queue: {0}")] 373 QueueEnableNotification(virtio_queue::Error), 374 #[error("Failed to add used index to the queue: {0}")] 375 QueueAddUsed(virtio_queue::Error), 376 #[error("Descriptor with invalid virtio-net header")] 377 DescriptorInvalidHeader, 378 #[error("Invalid virtio-net header")] 379 InvalidVirtioNetHeader, 380 } 381 382 pub struct NetQueuePair { 383 pub tap: Tap, 384 // With epoll each FD must be unique. So in order to filter the 385 // events we need to get a second FD responding to the original 386 // device so that we can send EPOLLOUT and EPOLLIN to separate 387 // events. 388 pub tap_for_write_epoll: Tap, 389 pub rx: RxVirtio, 390 pub tx: TxVirtio, 391 pub epoll_fd: Option<RawFd>, 392 pub rx_tap_listening: bool, 393 pub tx_tap_listening: bool, 394 pub counters: NetCounters, 395 pub tap_rx_event_id: u16, 396 pub tap_tx_event_id: u16, 397 pub rx_desc_avail: bool, 398 pub rx_rate_limiter: Option<RateLimiter>, 399 pub tx_rate_limiter: Option<RateLimiter>, 400 pub access_platform: Option<Arc<dyn AccessPlatform>>, 401 } 402 403 impl NetQueuePair { 404 pub fn process_tx<B: Bitmap + 'static>( 405 &mut self, 406 mem: &vm_memory::GuestMemoryMmap<B>, 407 queue: &mut Queue, 408 ) -> Result<bool, NetQueuePairError> { 409 let tx_tap_retry = self.tx.process_desc_chain( 410 mem, 411 &self.tap, 412 queue, 413 &mut self.tx_rate_limiter, 414 self.access_platform.as_ref(), 415 )?; 416 417 // We got told to try again when writing to the tap. Wait for the TAP to be writable 418 if tx_tap_retry && !self.tx_tap_listening { 419 register_listener( 420 self.epoll_fd.unwrap(), 421 self.tap_for_write_epoll.as_raw_fd(), 422 epoll::Events::EPOLLOUT, 423 u64::from(self.tap_tx_event_id), 424 ) 425 .map_err(NetQueuePairError::RegisterListener)?; 426 self.tx_tap_listening = true; 427 info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable."); 428 } else if !tx_tap_retry && self.tx_tap_listening { 429 unregister_listener( 430 self.epoll_fd.unwrap(), 431 self.tap_for_write_epoll.as_raw_fd(), 432 epoll::Events::EPOLLOUT, 433 u64::from(self.tap_tx_event_id), 434 ) 435 .map_err(NetQueuePairError::UnregisterListener)?; 436 self.tx_tap_listening = false; 437 info!("Writing to TAP succeeded. No longer listening for TAP to become writable."); 438 } 439 440 self.counters 441 .tx_bytes 442 .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel); 443 self.counters 444 .tx_frames 445 .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel); 446 self.tx.counter_bytes = Wrapping(0); 447 self.tx.counter_frames = Wrapping(0); 448 449 queue 450 .needs_notification(mem) 451 .map_err(NetQueuePairError::QueueNeedsNotification) 452 } 453 454 pub fn process_rx<B: Bitmap + 'static>( 455 &mut self, 456 mem: &vm_memory::GuestMemoryMmap<B>, 457 queue: &mut Queue, 458 ) -> Result<bool, NetQueuePairError> { 459 self.rx_desc_avail = !self.rx.process_desc_chain( 460 mem, 461 &self.tap, 462 queue, 463 &mut self.rx_rate_limiter, 464 self.access_platform.as_ref(), 465 )?; 466 let rate_limit_reached = self 467 .rx_rate_limiter 468 .as_ref() 469 .map_or(false, |r| r.is_blocked()); 470 471 // Stop listening on the `RX_TAP_EVENT` when: 472 // 1) there is no available describes, or 473 // 2) the RX rate limit is reached. 
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}
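
#[cfg(test)]
mod tests {
    use super::IovecBuffer;

    // A minimal sketch exercising the `IovecBuffer` borrow contract shown
    // above: dropping the borrowed view clears any stale iovecs before the
    // next descriptor chain is processed, while the underlying allocation is
    // kept for reuse.
    #[test]
    fn iovec_buffer_clears_on_drop_but_keeps_capacity() {
        let mut buffer = IovecBuffer::new();

        {
            let mut iovecs = buffer.borrow();
            iovecs.push(libc::iovec {
                iov_base: std::ptr::null_mut(),
                iov_len: 0,
            });
            assert_eq!(iovecs.len(), 1);
        } // `IovecBufferBorrowed` is dropped here, clearing the Vec.

        let iovecs = buffer.borrow();
        assert!(iovecs.is_empty());
        // `Vec::clear` does not shrink: the default capacity (4) remains.
        assert!(iovecs.capacity() >= 4);
    }
}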