// Copyright (c) 2020 Intel Corporation. All rights reserved.
//
// SPDX-License-Identifier: Apache-2.0 AND BSD-3-Clause

use super::{register_listener, unregister_listener, vnet_hdr_len, Tap};
use crate::GuestMemoryMmap;
use rate_limiter::{RateLimiter, TokenType};
use std::io;
use std::num::Wrapping;
use std::os::unix::io::{AsRawFd, RawFd};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use thiserror::Error;
use virtio_queue::{Queue, QueueOwnedT, QueueT};
use vm_memory::{Bytes, GuestMemory};
use vm_virtio::{AccessPlatform, Translatable};

#[derive(Clone)]
pub struct TxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for TxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl TxVirtio {
    pub fn new() -> Self {
        TxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut retry_write = false;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                queue.go_to_previous_position();
                break;
            }

            let mut next_desc = desc_chain.next();

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if !desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                let result = unsafe {
                    libc::writev(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr() as *const libc::iovec,
                        iovecs.len() as libc::c_int,
                    )
                };

                if result < 0 {
                    let e = std::io::Error::last_os_error();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        queue.go_to_previous_position();
                        retry_write = true;
                        break;
                    }
                    error!("net: tx: failed writing to tap: {}", e);
                    return Err(NetQueuePairError::WriteTap(e));
                }

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (similar to the RX rate limiting), we always
            // let the 'last' descriptor chain go through even if it was over the rate
            // limit, and simply stop processing incoming `avail_desc` if any.
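            // (The flag set below is only acted upon at the top of the loop: once it
            // is set, the next chain popped is immediately pushed back with
            // `go_to_previous_position()` and processing stops.)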
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(retry_write)
    }
}

#[derive(Clone)]
pub struct RxVirtio {
    pub counter_bytes: Wrapping<u64>,
    pub counter_frames: Wrapping<u64>,
}

impl Default for RxVirtio {
    fn default() -> Self {
        Self::new()
    }
}

impl RxVirtio {
    pub fn new() -> Self {
        RxVirtio {
            counter_bytes: Wrapping(0),
            counter_frames: Wrapping(0),
        }
    }

    pub fn process_desc_chain(
        &mut self,
        mem: &GuestMemoryMmap,
        tap: &mut Tap,
        queue: &mut Queue,
        rate_limiter: &mut Option<RateLimiter>,
        access_platform: Option<&Arc<dyn AccessPlatform>>,
    ) -> Result<bool, NetQueuePairError> {
        let mut exhausted_descs = true;
        let mut rate_limit_reached = false;

        while let Some(mut desc_chain) = queue.pop_descriptor_chain(mem) {
            if rate_limit_reached {
                exhausted_descs = false;
                queue.go_to_previous_position();
                break;
            }

            let desc = desc_chain
                .next()
                .ok_or(NetQueuePairError::DescriptorChainTooShort)?;

            let num_buffers_addr = desc_chain
                .memory()
                .checked_offset(
                    desc.addr()
                        .translate_gva(access_platform, desc.len() as usize),
                    10,
                )
                .unwrap();
            let mut next_desc = Some(desc);

            let mut iovecs = Vec::new();
            while let Some(desc) = next_desc {
                let desc_addr = desc
                    .addr()
                    .translate_gva(access_platform, desc.len() as usize);
                if desc.is_write_only() && desc.len() > 0 {
                    let buf = desc_chain
                        .memory()
                        .get_slice(desc_addr, desc.len() as usize)
                        .map_err(NetQueuePairError::GuestMemory)?
                        .as_ptr();
                    let iovec = libc::iovec {
                        iov_base: buf as *mut libc::c_void,
                        iov_len: desc.len() as libc::size_t,
                    };
                    iovecs.push(iovec);
                } else {
                    error!(
                        "Invalid descriptor chain: address = 0x{:x} length = {} write_only = {}",
                        desc_addr.0,
                        desc.len(),
                        desc.is_write_only()
                    );
                    return Err(NetQueuePairError::DescriptorChainInvalid);
                }
                next_desc = desc_chain.next();
            }

            let len = if !iovecs.is_empty() {
                let result = unsafe {
                    libc::readv(
                        tap.as_raw_fd() as libc::c_int,
                        iovecs.as_ptr() as *const libc::iovec,
                        iovecs.len() as libc::c_int,
                    )
                };
                if result < 0 {
                    let e = std::io::Error::last_os_error();
                    exhausted_descs = false;
                    queue.go_to_previous_position();

                    /* EAGAIN */
                    if e.kind() == std::io::ErrorKind::WouldBlock {
                        break;
                    }

                    error!("net: rx: failed reading from tap: {}", e);
                    return Err(NetQueuePairError::ReadTap(e));
                }

                // Write num_buffers to guest memory. We simply write 1 as we
                // never spread the frame over more than one descriptor chain.
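                // `num_buffers_addr` was computed above as the first descriptor's
                // address plus 10 bytes, which is the offset of the `num_buffers`
                // field in the virtio-net header used with merged RX buffers
                // (virtio_net_hdr_mrg_rxbuf).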
                desc_chain
                    .memory()
                    .write_obj(1u16, num_buffers_addr)
                    .map_err(NetQueuePairError::GuestMemory)?;

                self.counter_bytes += Wrapping(result as u64 - vnet_hdr_len() as u64);
                self.counter_frames += Wrapping(1);

                result as u32
            } else {
                0
            };

            // For the sake of simplicity (keeping the handling of RX_QUEUE_EVENT and
            // RX_TAP_EVENT totally asynchronous), we always let the 'last' descriptor
            // chain go through even if it was over the rate limit, and simply stop
            // processing incoming `avail_desc` if any.
            if let Some(rate_limiter) = rate_limiter {
                rate_limit_reached = !rate_limiter.consume(1, TokenType::Ops)
                    || !rate_limiter.consume(len as u64, TokenType::Bytes);
            }

            queue
                .add_used(desc_chain.memory(), desc_chain.head_index(), len)
                .map_err(NetQueuePairError::QueueAddUsed)?;

            if !queue
                .enable_notification(mem)
                .map_err(NetQueuePairError::QueueEnableNotification)?
            {
                break;
            }
        }

        Ok(exhausted_descs)
    }
}

#[derive(Default, Clone)]
pub struct NetCounters {
    pub tx_bytes: Arc<AtomicU64>,
    pub tx_frames: Arc<AtomicU64>,
    pub rx_bytes: Arc<AtomicU64>,
    pub rx_frames: Arc<AtomicU64>,
}

#[derive(Error, Debug)]
pub enum NetQueuePairError {
    #[error("No memory configured.")]
    NoMemoryConfigured,
    #[error("Error registering listener: {0}")]
    RegisterListener(io::Error),
    #[error("Error unregistering listener: {0}")]
    UnregisterListener(io::Error),
    #[error("Error writing to the TAP device: {0}")]
    WriteTap(io::Error),
    #[error("Error reading from the TAP device: {0}")]
    ReadTap(io::Error),
    #[error("Error related to guest memory: {0}")]
    GuestMemory(vm_memory::GuestMemoryError),
    #[error("Returned an error while iterating through the queue: {0}")]
    QueueIteratorFailed(virtio_queue::Error),
    #[error("Descriptor chain is too short.")]
    DescriptorChainTooShort,
    #[error("Descriptor chain does not contain valid descriptors.")]
    DescriptorChainInvalid,
    #[error("Failed to determine if queue needed notification: {0}")]
    QueueNeedsNotification(virtio_queue::Error),
    #[error("Failed to enable notification on the queue: {0}")]
    QueueEnableNotification(virtio_queue::Error),
    #[error("Failed to add used index to the queue: {0}")]
    QueueAddUsed(virtio_queue::Error),
}

pub struct NetQueuePair {
    pub tap: Tap,
    // With epoll each FD must be unique. So in order to filter the events we
    // need a second FD referring to the same device, allowing EPOLLOUT and
    // EPOLLIN to be registered as separate events.
    pub tap_for_write_epoll: Tap,
    pub rx: RxVirtio,
    pub tx: TxVirtio,
    pub epoll_fd: Option<RawFd>,
    pub rx_tap_listening: bool,
    pub tx_tap_listening: bool,
    pub counters: NetCounters,
    pub tap_rx_event_id: u16,
    pub tap_tx_event_id: u16,
    pub rx_desc_avail: bool,
    pub rx_rate_limiter: Option<RateLimiter>,
    pub tx_rate_limiter: Option<RateLimiter>,
    pub access_platform: Option<Arc<dyn AccessPlatform>>,
}

impl NetQueuePair {
    pub fn process_tx(
        &mut self,
        mem: &GuestMemoryMmap,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        let tx_tap_retry = self.tx.process_desc_chain(
            mem,
            &mut self.tap,
            queue,
            &mut self.tx_rate_limiter,
            self.access_platform.as_ref(),
        )?;

        // We got told to try again when writing to the tap.
        // Wait for the TAP to be writable.
        if tx_tap_retry && !self.tx_tap_listening {
            register_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::RegisterListener)?;
            self.tx_tap_listening = true;
            info!("Writing to TAP returned EAGAIN. Listening for TAP to become writable.");
        } else if !tx_tap_retry && self.tx_tap_listening {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap_for_write_epoll.as_raw_fd(),
                epoll::Events::EPOLLOUT,
                u64::from(self.tap_tx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.tx_tap_listening = false;
            info!("Writing to TAP succeeded. No longer listening for TAP to become writable.");
        }

        self.counters
            .tx_bytes
            .fetch_add(self.tx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .tx_frames
            .fetch_add(self.tx.counter_frames.0, Ordering::AcqRel);
        self.tx.counter_bytes = Wrapping(0);
        self.tx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }

    pub fn process_rx(
        &mut self,
        mem: &GuestMemoryMmap,
        queue: &mut Queue,
    ) -> Result<bool, NetQueuePairError> {
        self.rx_desc_avail = !self.rx.process_desc_chain(
            mem,
            &mut self.tap,
            queue,
            &mut self.rx_rate_limiter,
            self.access_platform.as_ref(),
        )?;
        let rate_limit_reached = self
            .rx_rate_limiter
            .as_ref()
            .map_or(false, |r| r.is_blocked());

        // Stop listening on the `RX_TAP_EVENT` when:
        // 1) there are no available descriptors, or
        // 2) the RX rate limit is reached.
        if self.rx_tap_listening && (!self.rx_desc_avail || rate_limit_reached) {
            unregister_listener(
                self.epoll_fd.unwrap(),
                self.tap.as_raw_fd(),
                epoll::Events::EPOLLIN,
                u64::from(self.tap_rx_event_id),
            )
            .map_err(NetQueuePairError::UnregisterListener)?;
            self.rx_tap_listening = false;
        }

        self.counters
            .rx_bytes
            .fetch_add(self.rx.counter_bytes.0, Ordering::AcqRel);
        self.counters
            .rx_frames
            .fetch_add(self.rx.counter_frames.0, Ordering::AcqRel);
        self.rx.counter_bytes = Wrapping(0);
        self.rx.counter_frames = Wrapping(0);

        queue
            .needs_notification(mem)
            .map_err(NetQueuePairError::QueueNeedsNotification)
    }
}
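
// Illustrative sketch (not part of this module): a caller owning the epoll loop
// is expected to drive a `NetQueuePair` roughly as follows when its events fire.
// The event identifiers, queue handles and the `signal_used_queue()` helper are
// assumptions used purely for illustration; the real wiring and error handling
// belong to the device implementation.
//
//     match event_id {
//         // The guest published new RX buffers, or the TAP has data pending.
//         RX_QUEUE_EVENT | RX_TAP_EVENT => {
//             if net_queue_pair.process_rx(&mem, &mut rx_queue)? {
//                 // process_rx() returned true: the guest needs a used-ring
//                 // notification for the RX queue.
//                 signal_used_queue(RX_QUEUE_INDEX);
//             }
//         }
//         // The guest queued TX frames, or the TAP became writable again after
//         // a previous EAGAIN (see `tx_tap_listening` above).
//         TX_QUEUE_EVENT | TX_TAP_EVENT => {
//             if net_queue_pair.process_tx(&mem, &mut tx_queue)? {
//                 signal_used_queue(TX_QUEUE_INDEX);
//             }
//         }
//         _ => {}
//     }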