1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 // 4 // Copyright 2023 Crusoe Energy Systems LLC 5 // SPDX-License-Identifier: Apache-2.0 6 7 use crate::{RateLimiter, TokenType}; 8 use core::panic::AssertUnwindSafe; 9 use std::fs::File; 10 use std::io; 11 use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; 12 use std::result; 13 use std::sync::{Arc, Mutex}; 14 use std::thread; 15 use thiserror::Error; 16 use vmm_sys_util::eventfd::EventFd; 17 18 /// Errors associated with rate-limiter group. 19 #[derive(Debug, Error)] 20 pub enum Error { 21 /// Cannot create thread 22 #[error("Error spawning rate-limiter thread {0}")] 23 ThreadSpawn(#[source] io::Error), 24 25 /// Cannot create epoll context. 26 #[error("Error creating epoll context: {0}")] 27 Epoll(#[source] io::Error), 28 29 /// Cannot create EventFd. 30 #[error("Error creating EventFd: {0}")] 31 EventFd(#[source] io::Error), 32 33 /// Cannot create RateLimiter. 34 #[error("Error creating RateLimiter: {0}")] 35 RateLimiter(#[source] io::Error), 36 37 /// Cannot read from EventFd. 38 #[error("Error reading from EventFd: {0}")] 39 EventFdRead(#[source] io::Error), 40 41 /// Cannot write to EventFd. 42 #[error("Error writing to EventFd: {0}")] 43 EventFdWrite(#[source] io::Error), 44 } 45 46 /// The RateLimiterGroupHandle is a handle to a RateLimiterGroup that may be 47 /// used in exactly the same way as the RateLimiter type. When the RateLimiter 48 /// within a RateLimiterGroup is unblocked, each RateLimiterGroupHandle will 49 /// be notified. 50 pub struct RateLimiterGroupHandle { 51 eventfd: Arc<EventFd>, 52 inner: Arc<RateLimiterGroupInner>, 53 } 54 55 impl RateLimiterGroupHandle { 56 fn new(inner: Arc<RateLimiterGroupInner>) -> result::Result<Self, Error> { 57 let eventfd = Arc::new(EventFd::new(0).map_err(Error::EventFd)?); 58 inner.handles.lock().unwrap().push(eventfd.clone()); 59 Ok(Self { eventfd, inner }) 60 } 61 62 /// Attempts to consume tokens and returns whether that is possible. 63 /// 64 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 65 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool { 66 self.inner.rate_limiter.consume(tokens, token_type) 67 } 68 69 /// Adds tokens of `token_type` to their respective bucket. 70 /// 71 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 72 /// `consume()` if needed. 73 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) { 74 self.inner.rate_limiter.manual_replenish(tokens, token_type) 75 } 76 77 /// This function needs to be called every time there is an event on the 78 /// FD provided by this object's `AsRawFd` trait implementation. 79 /// 80 /// # Errors 81 /// 82 /// If the rate limiter is disabled or is not blocked, an error is returned. 83 pub fn event_handler(&self) -> Result<(), Error> { 84 self.eventfd.read().map_err(Error::EventFdRead).map(|_| ()) 85 } 86 87 /// Returns whether this rate limiter is blocked. 88 /// 89 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 90 /// budget for it. 91 /// An event will be generated on the exported FD when the limiter 'unblocks'. 92 pub fn is_blocked(&self) -> bool { 93 self.inner.rate_limiter.is_blocked() 94 } 95 } 96 97 impl Clone for RateLimiterGroupHandle { 98 fn clone(&self) -> Self { 99 RateLimiterGroupHandle::new(self.inner.clone()).unwrap() 100 } 101 } 102 103 impl AsRawFd for RateLimiterGroupHandle { 104 fn as_raw_fd(&self) -> RawFd { 105 self.eventfd.as_raw_fd() 106 } 107 } 108 109 impl Drop for RateLimiterGroupHandle { 110 fn drop(&mut self) { 111 let mut handles = self.inner.handles.lock().unwrap(); 112 let index = handles 113 .iter() 114 .position(|handle| handle.as_raw_fd() == self.eventfd.as_raw_fd()) 115 .expect("RateLimiterGroupHandle must be subscribed to RateLimiterGroup"); 116 handles.remove(index); 117 } 118 } 119 120 struct RateLimiterGroupInner { 121 id: String, 122 rate_limiter: RateLimiter, 123 handles: Mutex<Vec<Arc<EventFd>>>, 124 } 125 126 /// A RateLimiterGroup is an extension of RateLimiter that enables rate-limiting 127 /// the aggregate io consumption of multiple consumers. 128 pub struct RateLimiterGroup { 129 inner: Arc<RateLimiterGroupInner>, 130 epoll_file: File, 131 kill_evt: EventFd, 132 epoll_thread: Option<thread::JoinHandle<()>>, 133 } 134 135 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 136 #[repr(u64)] 137 enum EpollDispatch { 138 Kill = 1, 139 Unblocked = 2, 140 Unknown, 141 } 142 143 impl From<u64> for EpollDispatch { 144 fn from(v: u64) -> Self { 145 use EpollDispatch::*; 146 match v { 147 1 => Kill, 148 2 => Unblocked, 149 _ => Unknown, 150 } 151 } 152 } 153 154 impl RateLimiterGroup { 155 /// Create a new RateLimiterGroup. 156 pub fn new( 157 id: &str, 158 bytes_total_capacity: u64, 159 bytes_one_time_burst: u64, 160 bytes_complete_refill_time_ms: u64, 161 ops_total_capacity: u64, 162 ops_one_time_burst: u64, 163 ops_complete_refill_time_ms: u64, 164 ) -> result::Result<Self, Error> { 165 let rate_limiter = RateLimiter::new( 166 bytes_total_capacity, 167 bytes_one_time_burst, 168 bytes_complete_refill_time_ms, 169 ops_total_capacity, 170 ops_one_time_burst, 171 ops_complete_refill_time_ms, 172 ) 173 .map_err(Error::RateLimiter)?; 174 175 let epoll_fd = epoll::create(true).map_err(Error::Epoll)?; 176 let kill_evt = EventFd::new(0).map_err(Error::EventFd)?; 177 178 epoll::ctl( 179 epoll_fd, 180 epoll::ControlOptions::EPOLL_CTL_ADD, 181 kill_evt.as_raw_fd(), 182 epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Kill as u64), 183 ) 184 .map_err(Error::Epoll)?; 185 186 epoll::ctl( 187 epoll_fd, 188 epoll::ControlOptions::EPOLL_CTL_ADD, 189 rate_limiter.as_raw_fd(), 190 epoll::Event::new(epoll::Events::EPOLLIN, EpollDispatch::Unblocked as u64), 191 ) 192 .map_err(Error::Epoll)?; 193 194 // Use 'File' to enforce closing on 'epoll_fd' 195 // SAFETY: epoll_fd is valid 196 let epoll_file = unsafe { File::from_raw_fd(epoll_fd) }; 197 198 Ok(Self { 199 inner: Arc::new(RateLimiterGroupInner { 200 id: id.to_string(), 201 rate_limiter, 202 handles: Mutex::new(Vec::new()), 203 }), 204 epoll_file, 205 kill_evt, 206 epoll_thread: None, 207 }) 208 } 209 210 /// Create a new RateLimiterGroupHandle. 211 pub fn new_handle(&self) -> result::Result<RateLimiterGroupHandle, Error> { 212 RateLimiterGroupHandle::new(self.inner.clone()) 213 } 214 215 /// Start a worker thread to broadcast an event to each RateLimiterGroupHandle 216 /// when the RateLimiter becomes unblocked. 217 pub fn start_thread(&mut self, exit_evt: EventFd) -> result::Result<(), Error> { 218 let inner = self.inner.clone(); 219 let epoll_fd = self.epoll_file.as_raw_fd(); 220 thread::Builder::new() 221 .name(format!("rate-limit-group-{}", inner.id)) 222 .spawn(move || { 223 let res = std::panic::catch_unwind(AssertUnwindSafe(move || { 224 const EPOLL_EVENTS_LEN: usize = 2; 225 226 let mut events = 227 [epoll::Event::new(epoll::Events::empty(), 0); EPOLL_EVENTS_LEN]; 228 229 loop { 230 let num_events = match epoll::wait(epoll_fd, -1, &mut events[..]) { 231 Ok(res) => res, 232 Err(e) => { 233 if e.kind() == io::ErrorKind::Interrupted { 234 continue; 235 } else { 236 return Err(Error::Epoll(e)); 237 } 238 } 239 }; 240 241 for event in events.iter().take(num_events) { 242 let dispatch_event: EpollDispatch = event.data.into(); 243 match dispatch_event { 244 EpollDispatch::Unknown => { 245 let event = event.data; 246 warn!("Unknown rate-limiter loop event: {}", event); 247 } 248 EpollDispatch::Unblocked => { 249 inner.rate_limiter.event_handler().unwrap(); 250 let handles = inner.handles.lock().unwrap(); 251 for handle in handles.iter() { 252 handle.write(1).map_err(Error::EventFdWrite)? 253 } 254 } 255 EpollDispatch::Kill => { 256 info!( 257 "KILL_EVENT received, stopping rate-limit-group epoll loop" 258 ); 259 return Ok(()); 260 } 261 } 262 } 263 } 264 })); 265 266 match res { 267 Ok(res) => { 268 if let Err(e) = res { 269 error!("Error running rate-limit-group worker: {:?}", e); 270 exit_evt.write(1).unwrap(); 271 } 272 } 273 Err(_) => { 274 error!("rate-limit-group worker panicked"); 275 exit_evt.write(1).unwrap(); 276 } 277 }; 278 }) 279 .map(|thread| self.epoll_thread.insert(thread)) 280 .map_err(Error::ThreadSpawn)?; 281 282 Ok(()) 283 } 284 } 285 286 impl Drop for RateLimiterGroup { 287 fn drop(&mut self) { 288 self.kill_evt.write(1).unwrap(); 289 290 if let Some(t) = self.epoll_thread.take() { 291 if let Err(e) = t.join() { 292 error!("Error joining thread: {:?}", e); 293 } 294 } 295 } 296 } 297 298 #[cfg(test)] 299 pub(crate) mod tests { 300 use super::RateLimiterGroupHandle; 301 use crate::{group::RateLimiterGroup, TokenBucket, TokenType, REFILL_TIMER_INTERVAL_MS}; 302 use std::{os::fd::AsRawFd, thread, time::Duration}; 303 use vmm_sys_util::eventfd::EventFd; 304 305 impl RateLimiterGroupHandle { 306 pub fn bandwidth(&self) -> Option<TokenBucket> { 307 let guard = self.inner.rate_limiter.inner.lock().unwrap(); 308 guard.bandwidth.clone() 309 } 310 311 pub fn ops(&self) -> Option<TokenBucket> { 312 let guard = self.inner.rate_limiter.inner.lock().unwrap(); 313 guard.ops.clone() 314 } 315 } 316 317 #[test] 318 fn test_rate_limiter_group_new() { 319 let l = RateLimiterGroup::new("test", 1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 320 let h = l.new_handle().unwrap(); 321 let bw = h.bandwidth().unwrap(); 322 assert_eq!(bw.capacity(), 1000); 323 assert_eq!(bw.one_time_burst(), 1001); 324 assert_eq!(bw.refill_time_ms(), 1002); 325 assert_eq!(bw.budget(), 1000); 326 327 let ops = h.ops().unwrap(); 328 assert_eq!(ops.capacity(), 1003); 329 assert_eq!(ops.one_time_burst(), 1004); 330 assert_eq!(ops.refill_time_ms(), 1005); 331 assert_eq!(ops.budget(), 1003); 332 } 333 334 #[test] 335 fn test_rate_limiter_group_manual_replenish() { 336 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 337 let l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 338 let h = l.new_handle().unwrap(); 339 340 // consume 123 bytes 341 assert!(h.consume(123, TokenType::Bytes)); 342 h.manual_replenish(23, TokenType::Bytes); 343 { 344 let bytes_tb = h.bandwidth().unwrap(); 345 assert_eq!(bytes_tb.budget(), 900); 346 } 347 // consume 123 ops 348 assert!(h.consume(123, TokenType::Ops)); 349 h.manual_replenish(23, TokenType::Ops); 350 { 351 let bytes_tb = h.ops().unwrap(); 352 assert_eq!(bytes_tb.budget(), 900); 353 } 354 } 355 356 #[test] 357 fn test_rate_limiter_group_bandwidth() { 358 // rate limiter with limit of 1000 bytes/s 359 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 0, 0, 0).unwrap(); 360 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 361 362 let h = l.new_handle().unwrap(); 363 364 // limiter should not be blocked 365 assert!(!h.is_blocked()); 366 // raw FD for this disabled should be valid 367 assert!(h.as_raw_fd() > 0); 368 369 // ops/s limiter should be disabled so consume(whatever) should work 370 assert!(h.consume(u64::max_value(), TokenType::Ops)); 371 372 // do full 1000 bytes 373 assert!(h.consume(1000, TokenType::Bytes)); 374 // try and fail on another 100 375 assert!(!h.consume(100, TokenType::Bytes)); 376 // since consume failed, limiter should be blocked now 377 assert!(h.is_blocked()); 378 // wait half the timer period 379 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 380 // limiter should still be blocked 381 assert!(h.is_blocked()); 382 // wait the other half of the timer period 383 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 384 // the timer_fd should have an event on it by now 385 assert!(h.event_handler().is_ok()); 386 // limiter should now be unblocked 387 assert!(!h.is_blocked()); 388 // try and succeed on another 100 bytes this time 389 assert!(h.consume(100, TokenType::Bytes)); 390 } 391 392 #[test] 393 fn test_rate_limiter_group_ops() { 394 // rate limiter with limit of 1000 ops/s 395 let mut l = RateLimiterGroup::new("test", 0, 0, 0, 1000, 0, 1000).unwrap(); 396 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 397 398 let h = l.new_handle().unwrap(); 399 400 // limiter should not be blocked 401 assert!(!h.is_blocked()); 402 // raw FD for this disabled should be valid 403 assert!(h.as_raw_fd() > 0); 404 405 // bytes/s limiter should be disabled so consume(whatever) should work 406 assert!(h.consume(u64::max_value(), TokenType::Bytes)); 407 408 // do full 1000 ops 409 assert!(h.consume(1000, TokenType::Ops)); 410 // try and fail on another 100 411 assert!(!h.consume(100, TokenType::Ops)); 412 // since consume failed, limiter should be blocked now 413 assert!(h.is_blocked()); 414 // wait half the timer period 415 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 416 // limiter should still be blocked 417 assert!(h.is_blocked()); 418 // wait the other half of the timer period 419 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 420 // the timer_fd should have an event on it by now 421 assert!(h.event_handler().is_ok()); 422 // limiter should now be unblocked 423 assert!(!h.is_blocked()); 424 // try and succeed on another 100 ops this time 425 assert!(h.consume(100, TokenType::Ops)); 426 } 427 428 #[test] 429 fn test_rate_limiter_group_full() { 430 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 431 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 432 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 433 434 let h = l.new_handle().unwrap(); 435 436 // limiter should not be blocked 437 assert!(!h.is_blocked()); 438 // raw FD for this disabled should be valid 439 assert!(h.as_raw_fd() > 0); 440 441 // do full 1000 bytes 442 assert!(h.consume(1000, TokenType::Ops)); 443 // do full 1000 bytes 444 assert!(h.consume(1000, TokenType::Bytes)); 445 // try and fail on another 100 ops 446 assert!(!h.consume(100, TokenType::Ops)); 447 // try and fail on another 100 bytes 448 assert!(!h.consume(100, TokenType::Bytes)); 449 // since consume failed, limiter should be blocked now 450 assert!(h.is_blocked()); 451 // wait half the timer period 452 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 453 // limiter should still be blocked 454 assert!(h.is_blocked()); 455 // wait the other half of the timer period 456 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 457 // the timer_fd should have an event on it by now 458 assert!(h.event_handler().is_ok()); 459 // limiter should now be unblocked 460 assert!(!h.is_blocked()); 461 // try and succeed on another 100 ops this time 462 assert!(h.consume(100, TokenType::Ops)); 463 // try and succeed on another 100 bytes this time 464 assert!(h.consume(100, TokenType::Bytes)); 465 } 466 467 #[test] 468 fn test_rate_limiter_group_overconsumption() { 469 // initialize the rate limiter 470 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 471 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 472 473 let h = l.new_handle().unwrap(); 474 475 // try to consume 2.5x the bucket size 476 // we are "borrowing" 1.5x the bucket size in tokens since 477 // the bucket is full 478 assert!(h.consume(2500, TokenType::Bytes)); 479 480 // check that even after a whole second passes, the rate limiter 481 // is still blocked 482 thread::sleep(Duration::from_millis(1000)); 483 assert!(h.is_blocked()); 484 485 // after 1.5x the replenish time has passed, the rate limiter 486 // is available again 487 thread::sleep(Duration::from_millis(500)); 488 assert!(h.event_handler().is_ok()); 489 assert!(!h.is_blocked()); 490 491 // reset the rate limiter 492 let mut l = RateLimiterGroup::new("test", 1000, 0, 1000, 1000, 0, 1000).unwrap(); 493 l.start_thread(EventFd::new(0).unwrap()).unwrap(); 494 495 let h = l.new_handle().unwrap(); 496 // try to consume 1.5x the bucket size 497 // we are "borrowing" 1.5x the bucket size in tokens since 498 // the bucket is full, should arm the timer to 0.5x replenish 499 // time, which is 500 ms 500 assert!(h.consume(1500, TokenType::Bytes)); 501 502 // check that after more than the minimum refill time, 503 // the rate limiter is still blocked 504 thread::sleep(Duration::from_millis(200)); 505 assert!(h.is_blocked()); 506 507 // try to consume some tokens, which should fail as the timer 508 // is still active 509 assert!(!h.consume(100, TokenType::Bytes)); 510 assert!(h.is_blocked()); 511 512 // check that after the minimum refill time, the timer was not 513 // overwritten and the rate limiter is still blocked from the 514 // borrowing we performed earlier 515 thread::sleep(Duration::from_millis(100)); 516 assert!(h.is_blocked()); 517 assert!(!h.consume(100, TokenType::Bytes)); 518 519 // after waiting out the full duration, rate limiter should be 520 // available again 521 thread::sleep(Duration::from_millis(200)); 522 assert!(h.event_handler().is_ok()); 523 assert!(!h.is_blocked()); 524 assert!(h.consume(100, TokenType::Bytes)); 525 } 526 } 527