1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 // Copyright 2023 Crusoe Energy Systems LLC 5 // SPDX-License-Identifier: Apache-2.0 6 7 use core::panic::AssertUnwindSafe; 8 use std::fs::File; 9 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; 10 use std::sync::{Arc, Mutex}; 11 use std::{io, result, thread}; 12 13 use thiserror::Error; 14 use vmm_sys_util::eventfd::EventFd; 15 16 use crate::{RateLimiter, TokenType}; 17 18 /// Errors associated with rate-limiter group. 19 #[derive(Debug, Error)] 20 pub enum Error { 21 /// Cannot create thread 22 #[error("Error spawning rate-limiter thread {0}")] 23 ThreadSpawn(#[source] io::Error), 24 25 /// Cannot create epoll context. 26 #[error("Error creating epoll context: {0}")] 27 Epoll(#[source] io::Error), 28 29 /// Cannot create EventFd. 30 #[error("Error creating EventFd: {0}")] 31 EventFd(#[source] io::Error), 32 33 /// Cannot create RateLimiter. 34 #[error("Error creating RateLimiter: {0}")] 35 RateLimiter(#[source] io::Error), 36 37 /// Cannot read from EventFd. 38 #[error("Error reading from EventFd: {0}")] 39 EventFdRead(#[source] io::Error), 40 41 /// Cannot write to EventFd. 42 #[error("Error writing to EventFd: {0}")] 43 EventFdWrite(#[source] io::Error), 44 } 45 46 /// Handle to a RateLimiterGroup 47 /// 48 /// The RateLimiterGroupHandle may be used in exactly the same way as 49 /// the RateLimiter type. When the RateLimiter within a RateLimiterGroup 50 /// is unblocked, each RateLimiterGroupHandle will be notified. 51 pub struct RateLimiterGroupHandle { 52 eventfd: Arc<EventFd>, 53 inner: Arc<RateLimiterGroupInner>, 54 } 55 56 impl RateLimiterGroupHandle { 57 fn new(inner: Arc<RateLimiterGroupInner>) -> result::Result<Self, Error> { 58 let eventfd = Arc::new(EventFd::new(0).map_err(Error::EventFd)?); 59 inner.handles.lock().unwrap().push(eventfd.clone()); 60 Ok(Self { eventfd, inner }) 61 } 62 63 /// Attempts to consume tokens and returns whether that is possible. 64 /// 65 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 66 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool { 67 self.inner.rate_limiter.consume(tokens, token_type) 68 } 69 70 /// Adds tokens of `token_type` to their respective bucket. 71 /// 72 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 73 /// `consume()` if needed. 74 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) { 75 self.inner.rate_limiter.manual_replenish(tokens, token_type) 76 } 77 78 /// This function needs to be called every time there is an event on the 79 /// FD provided by this object's `AsRawFd` trait implementation. 80 /// 81 /// # Errors 82 /// 83 /// If the rate limiter is disabled or is not blocked, an error is returned. 84 pub fn event_handler(&self) -> Result<(), Error> { 85 self.eventfd.read().map_err(Error::EventFdRead).map(|_| ()) 86 } 87 88 /// Returns whether this rate limiter is blocked. 89 /// 90 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 91 /// budget for it. 92 /// An event will be generated on the exported FD when the limiter 'unblocks'. 93 pub fn is_blocked(&self) -> bool { 94 self.inner.rate_limiter.is_blocked() 95 } 96 } 97 98 impl Clone for RateLimiterGroupHandle { 99 fn clone(&self) -> Self { 100 RateLimiterGroupHandle::new(self.inner.clone()).unwrap() 101 } 102 } 103 104 impl AsRawFd for RateLimiterGroupHandle { 105 fn as_raw_fd(&self) -> RawFd { 106 self.eventfd.as_raw_fd() 107 } 108 } 109 110 impl Drop for RateLimiterGroupHandle { 111 fn drop(&mut self) { 112 let mut handles = self.inner.handles.lock().unwrap(); 113 let index = handles 114 .iter() 115 .position(|handle| handle.as_raw_fd() == self.eventfd.as_raw_fd()) 116 .expect("RateLimiterGroupHandle must be subscribed to RateLimiterGroup"); 117 handles.remove(index); 118 } 119 } 120 121 struct RateLimiterGroupInner { 122 id: String, 123 rate_limiter: RateLimiter, 124 handles: Mutex<Vec<Arc<EventFd>>>, 125 } 126 127 /// A RateLimiterGroup is an extension of RateLimiter that enables rate-limiting 128 /// the aggregate io consumption of multiple consumers. 129 pub struct RateLimiterGroup { 130 inner: Arc<RateLimiterGroupInner>, 131 epoll_file: File, 132 kill_evt: EventFd, 133 epoll_thread: Option<thread::JoinHandle<()>>, 134 } 135 136 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 137 #[repr(u64)] 138 enum EpollDispatch { 139 Kill = 1, 140 Unblocked = 2, 141 Unknown, 142 } 143 144 impl From<u64> for EpollDispatch { 145 fn from(v: u64) -> Self { 146 use EpollDispatch::*; 147 match v { 148 1 => Kill, 149 2 => Unblocked, 150 _ => Unknown, 151 } 152 } 153 } 154 155 impl RateLimiterGroup { 156 /// Create a new RateLimiterGroup. 157 pub fn new( 158 id: &str, 159 bytes_total_capacity: u64, 160 bytes_one_time_burst: u64, 161 bytes_complete_refill_time_ms: u64, 162 ops_total_capacity: u64, 163 ops_one_time_burst: u64, 164 ops_complete_refill_time_ms: u64, 165 ) -> result::Result<Self, Error> { 166 let rate_limiter = RateLimiter::new( 167 bytes_total_capacity, 168 bytes_one_time_burst, 169 bytes_complete_refill_time_ms, 170 ops_total_capacity, 171 ops_one_time_burst, 172 ops_complete_refill_time_ms, 173 ) 174 .map_err(Error::RateLimiter)?; 175 176 let epoll_fd = epoll::create(true).map_err(Error::Epoll)?; 177 let kill_evt = EventFd::new(0).map_err(Error::EventFd)?; 178 179 epoll::ctl( 180 epoll_fd, 181 epoll::ControlOptions::EPOLL_CTL_ADD, 182 kill_evt.as_raw_fd(), 183 epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Kill as u64), 184 ) 185 .map_err(Error::Epoll)?; 186 187 epoll::ctl( 188 epoll_fd, 189 epoll::ControlOptions::EPOLL_CTL_ADD, 190 rate_limiter.as_raw_fd(), 191 epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Unblocked as u64), 192 ) 193 .map_err(Error::Epoll)?; 194 195 // Use 'File' to enforce closing on 'epoll_fd' 196 // SAFETY: epoll_fd is valid 197 let epoll_file = unsafe { File::from_raw_fd(epoll_fd) }; 198 199 Ok(Self { 200 inner: Arc::new(RateLimiterGroupInner { 201 id: id.to_string(), 202 rate_limiter, 203 handles: Mutex::new(Vec::new()), 204 }), 205 epoll_file, 206 kill_evt, 207 epoll_thread: None, 208 }) 209 } 210 211 /// Create a new RateLimiterGroupHandle. 212 pub fn new_handle(&self) -> result::Result<RateLimiterGroupHandle, Error> { 213 RateLimiterGroupHandle::new(self.inner.clone()) 214 } 215 216 /// Start a worker thread to broadcast an event to each RateLimiterGroupHandle 217 /// when the RateLimiter becomes unblocked. 218 pub fn start_thread(&mut self, exit_evt: EventFd) -> result::Result<(), Error> { 219 let inner = self.inner.clone(); 220 let epoll_fd = self.epoll_file.as_raw_fd(); 221 thread::Builder::new() 222 .name(format!("rate-limit-group-{}", inner.id)) 223 .spawn(move || { 224 let res = std::panic::catch_unwind(AssertUnwindSafe(move || { 225 const EPOLL_EVENTS_LEN: usize = 2; 226 227 let mut events = 228 [epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN]; 229 230 loop { 231 let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) { 232 Ok(res) => res, 233 Err(e) => { 234 if e.kind() == io::ErrorKind::Interrupted { 235 continue; 236 } else { 237 return Err(Error::Epoll(e)); 238 } 239 } 240 }; 241 242 for event in events.iter().take(num_events) { 243 let dispatch_event: EpollDispatch = event.data.into(); 244 match dispatch_event { 245 EpollDispatch::Unknown => { 246 let event = event.data; 247 warn!("Unknown rate-limiter loop event: {}", event); 248 } 249 EpollDispatch::Unblocked => { 250 inner.rate_limiter.event_handler().unwrap(); 251 let handles = inner.handles.lock().unwrap(); 252 for handle in handles.iter() { 253 handle.write(1).map_err(Error::EventFdWrite)? 254 } 255 } 256 EpollDispatch::Kill => { 257 info!( 258 "KILL_EVENT received, stopping rate-limit-group epoll loop" 259 ); 260 return Ok(()); 261 } 262 } 263 } 264 } 265 })); 266 267 match res { 268 Ok(res) => { 269 if let Err(e) = res { 270 error!("Error running rate-limit-group worker: {:?}", e); 271 exit_evt.write(1).unwrap(); 272 } 273 } 274 Err(_) => { 275 error!("rate-limit-group worker panicked"); 276 exit_evt.write(1).unwrap(); 277 } 278 }; 279 }) 280 .map(|thread| self.epoll_thread.insert(thread)) 281 .map_err(Error::ThreadSpawn)?; 282 283 Ok(()) 284 } 285 } 286 287 impl Drop for RateLimiterGroup { 288 fn drop(&mut self) { 289 self.kill_evt.write(1).unwrap(); 290 291 if let Some(t) = self.epoll_thread.take() { 292 if let Err(e) = t.join() { 293 error!("Error joining thread: {:?}", e); 294 } 295 } 296 } 297 } 298 299 #[cfg(test)] 300 pub(crate) mod tests { 301 use std::os::fd::AsRawFd; 302 use std::thread; 303 use std::time::Duration; 304 305 use vmm_sys_util::eventfd::EventFd; 306 307 use super::RateLimiterGroupHandle; 308 use crate::group::RateLimiterGroup; 309 use crate::{TokenBucket, TokenType, REFILL_TIMER_INTERVAL_MS}; 310 311 impl RateLimiterGroupHandle { 312 fn bandwidth(&self) -> Option<TokenBucket> { 313 let guard = self.inner.rate_limiter.inner.lock().unwrap(); 314 guard.bandwidth.clone() 315 } 316 317 fn ops(&self) -> Option<TokenBucket> { 318 let guard = self.inner.rate_limiter.inner.lock().unwrap(); 319 guard.ops.clone() 320 } 321 } 322 323 #[test] 324 fn test_rate_limiter_group_new() { 325 let l = RateLimiterGroup::new("test", 1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 326 let h = l.new_handle().unwrap(); 327 let bw = h.bandwidth().unwrap(); 328 assert_eq!(bw.capacity(), 1000); 329 assert_eq!(bw.one_time_burst(), 1001); 330 assert_eq!(bw.refill_time_ms(), 1002); 331 assert_eq!(bw.budget(), 1000); 332 333 let ops = h.ops().unwrap(); 334 assert_eq!(ops.capacity(), 1003); 335 assert_eq!(ops.one_time_burst(), 1004); 336 assert_eq!(ops.refill_time_ms(), 1005); 337 assert_eq!(ops.budget(), 1003); 338 } 339 340 #[test] 341 fn test_rate_limiter_group_manual_replenish() { 342 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 343 let l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 344 let h = l.new_handle().unwrap(); 345 346 // consume 123 bytes 347 assert!(h.consume(123, TokenType::Bytes)); 348 h.manual_replenish(23, TokenType::Bytes); 349 { 350 let bytes_tb = h.bandwidth().unwrap(); 351 assert_eq!(bytes_tb.budget(), 900); 352 } 353 // consume 123 ops 354 assert!(h.consume(123, TokenType::Ops)); 355 h.manual_replenish(23, TokenType::Ops); 356 { 357 let bytes_tb = h.ops().unwrap(); 358 assert_eq!(bytes_tb.budget(), 900); 359 } 360 } 361 362 #[test] 363 fn test_rate_limiter_group_bandwidth() { 364 // rate limiter with limit of 1000 bytes/s 365 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 0, 0, 0).unwrap(); 366 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 367 368 let h = l.new_handle().unwrap(); 369 370 // limiter should not be blocked 371 assert!(!h.is_blocked()); 372 // raw FD for this disabled should be valid 373 assert!(h.as_raw_fd() > 0); 374 375 // ops/s limiter should be disabled so consume(whatever) should work 376 assert!(h.consume(u64::MAX, TokenType::Ops)); 377 378 // do full 1000 bytes 379 assert!(h.consume(1000, TokenType::Bytes)); 380 // try and fail on another 100 381 assert!(!h.consume(100, TokenType::Bytes)); 382 // since consume failed, limiter should be blocked now 383 assert!(h.is_blocked()); 384 // wait half the timer period 385 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 386 // limiter should still be blocked 387 assert!(h.is_blocked()); 388 // wait the other half of the timer period 389 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 390 // the timer_fd should have an event on it by now 391 h.event_handler().unwrap(); 392 // limiter should now be unblocked 393 assert!(!h.is_blocked()); 394 // try and succeed on another 100 bytes this time 395 assert!(h.consume(100, TokenType::Bytes)); 396 } 397 398 #[test] 399 fn test_rate_limiter_group_ops() { 400 // rate limiter with limit of 1000 ops/s 401 let mut l = RateLimiterGroup::new("test", 0, 0, 0, 1000, 0, 1000).unwrap(); 402 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 403 404 let h = l.new_handle().unwrap(); 405 406 // limiter should not be blocked 407 assert!(!h.is_blocked()); 408 // raw FD for this disabled should be valid 409 assert!(h.as_raw_fd() > 0); 410 411 // bytes/s limiter should be disabled so consume(whatever) should work 412 assert!(h.consume(u64::MAX, TokenType::Bytes)); 413 414 // do full 1000 ops 415 assert!(h.consume(1000, TokenType::Ops)); 416 // try and fail on another 100 417 assert!(!h.consume(100, TokenType::Ops)); 418 // since consume failed, limiter should be blocked now 419 assert!(h.is_blocked()); 420 // wait half the timer period 421 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 422 // limiter should still be blocked 423 assert!(h.is_blocked()); 424 // wait the other half of the timer period 425 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 426 // the timer_fd should have an event on it by now 427 h.event_handler().unwrap(); 428 // limiter should now be unblocked 429 assert!(!h.is_blocked()); 430 // try and succeed on another 100 ops this time 431 assert!(h.consume(100, TokenType::Ops)); 432 } 433 434 #[test] 435 fn test_rate_limiter_group_full() { 436 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 437 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 438 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 439 440 let h = l.new_handle().unwrap(); 441 442 // limiter should not be blocked 443 assert!(!h.is_blocked()); 444 // raw FD for this disabled should be valid 445 assert!(h.as_raw_fd() > 0); 446 447 // do full 1000 bytes 448 assert!(h.consume(1000, TokenType::Ops)); 449 // do full 1000 bytes 450 assert!(h.consume(1000, TokenType::Bytes)); 451 // try and fail on another 100 ops 452 assert!(!h.consume(100, TokenType::Ops)); 453 // try and fail on another 100 bytes 454 assert!(!h.consume(100, TokenType::Bytes)); 455 // since consume failed, limiter should be blocked now 456 assert!(h.is_blocked()); 457 // wait half the timer period 458 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 459 // limiter should still be blocked 460 assert!(h.is_blocked()); 461 // wait the other half of the timer period 462 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 463 // the timer_fd should have an event on it by now 464 h.event_handler().unwrap(); 465 // limiter should now be unblocked 466 assert!(!h.is_blocked()); 467 // try and succeed on another 100 ops this time 468 assert!(h.consume(100, TokenType::Ops)); 469 // try and succeed on another 100 bytes this time 470 assert!(h.consume(100, TokenType::Bytes)); 471 } 472 473 #[test] 474 fn test_rate_limiter_group_overconsumption() { 475 // initialize the rate limiter 476 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 477 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 478 479 let h = l.new_handle().unwrap(); 480 481 // try to consume 2.5x the bucket size 482 // we are "borrowing" 1.5x the bucket size in tokens since 483 // the bucket is full 484 assert!(h.consume(2500, TokenType::Bytes)); 485 486 // check that even after a whole second passes, the rate limiter 487 // is still blocked 488 thread::sleep(Duration::from_millis(1000)); 489 assert!(h.is_blocked()); 490 491 // after 1.5x the replenish time has passed, the rate limiter 492 // is available again 493 thread::sleep(Duration::from_millis(500)); 494 h.event_handler().unwrap(); 495 assert!(!h.is_blocked()); 496 497 // reset the rate limiter 498 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 499 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 500 501 let h = l.new_handle().unwrap(); 502 // try to consume 1.5x the bucket size 503 // we are "borrowing" 1.5x the bucket size in tokens since 504 // the bucket is full, should arm the timer to 0.5x replenish 505 // time, which is 500 ms 506 assert!(h.consume(1500, TokenType::Bytes)); 507 508 // check that after more than the minimum refill time, 509 // the rate limiter is still blocked 510 thread::sleep(Duration::from_millis(200)); 511 assert!(h.is_blocked()); 512 513 // try to consume some tokens, which should fail as the timer 514 // is still active 515 assert!(!h.consume(100, TokenType::Bytes)); 516 assert!(h.is_blocked()); 517 518 // check that after the minimum refill time, the timer was not 519 // overwritten and the rate limiter is still blocked from the 520 // borrowing we performed earlier 521 thread::sleep(Duration::from_millis(100)); 522 assert!(h.is_blocked()); 523 assert!(!h.consume(100, TokenType::Bytes)); 524 525 // after waiting out the full duration, rate limiter should be 526 // available again 527 thread::sleep(Duration::from_millis(200)); 528 h.event_handler().unwrap(); 529 assert!(!h.is_blocked()); 530 assert!(h.consume(100, TokenType::Bytes)); 531 } 532 } 533