1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 #![deny(missing_docs)] 5 //! # Rate Limiter 6 //! 7 //! Provides a rate limiter written in Rust useful for IO operations that need to 8 //! be throttled. 9 //! 10 //! ## Behavior 11 //! 12 //! The rate limiter starts off as 'unblocked' with two token buckets configured 13 //! with the values passed in the `RateLimiter::new()` constructor. 14 //! All subsequent accounting is done independently for each token bucket based 15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter 16 //! goes in the 'blocked' state. At this point an internal timer is set up which 17 //! will later 'wake up' the user in order to retry sending data. The 'wake up' 18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD` 19 //! trait implementation. 20 //! 21 //! The contract is that the user shall also call the `event_handler()` method on 22 //! receipt of such an event. 23 //! 24 //! The token buckets are replenished every time a `consume()` is called, before 25 //! actually trying to consume the requested amount of tokens. The amount of tokens 26 //! replenished is automatically calculated to respect the `complete_refill_time` 27 //! configuration parameter provided by the user. The token buckets will never 28 //! replenish above their respective `size`. 29 //! 30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity 31 //! on top of their `size`. This initial extra credit does not replenish and 32 //! can be used for an initial burst of data. 33 //! 34 //! The granularity for 'wake up' events when the rate limiter is blocked is 35 //! currently hardcoded to `100 milliseconds`. 36 //! 37 //! ## Limitations 38 //! 39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its 40 //! usage is limited to Linux systems. 41 //! 42 //! Another particularity of this implementation is that it is not self-driving. 43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd` 44 //! trait and provides an *event-handler* as part of its API. This *event-handler* 45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD. 46 #[macro_use] 47 extern crate log; 48 49 use std::os::unix::io::{AsRawFd, RawFd}; 50 use std::time::{Duration, Instant}; 51 use std::{fmt, io}; 52 use vmm_sys_util::timerfd::TimerFd; 53 54 #[derive(Debug)] 55 /// Describes the errors that may occur while handling rate limiter events. 56 pub enum Error { 57 /// The event handler was called spuriously. 58 SpuriousRateLimiterEvent(&'static str), 59 /// The event handler encounters while TimerFd::wait() 60 TimerFdWaitError(std::io::Error), 61 } 62 63 // Interval at which the refill timer will run when limiter is at capacity. 64 const REFILL_TIMER_INTERVAL_MS: u64 = 100; 65 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS); 66 67 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000; 68 69 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor. 70 fn gcd(x: u64, y: u64) -> u64 { 71 let mut x = x; 72 let mut y = y; 73 while y != 0 { 74 let t = y; 75 y = x % y; 76 x = t; 77 } 78 x 79 } 80 81 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`. 82 #[derive(Clone, Debug, PartialEq)] 83 pub enum BucketReduction { 84 /// There are not enough tokens to complete the operation. 85 Failure, 86 /// A part of the available tokens have been consumed. 87 Success, 88 /// A number of tokens `inner` times larger than the bucket size have been consumed. 89 OverConsumption(f64), 90 } 91 92 /// TokenBucket provides a lower level interface to rate limiting with a 93 /// configurable capacity, refill-rate and initial burst. 94 #[derive(Clone, Debug, PartialEq, Eq)] 95 pub struct TokenBucket { 96 // Bucket defining traits. 97 size: u64, 98 // Initial burst size (number of free initial tokens, that can be consumed at no cost) 99 one_time_burst: u64, 100 // Complete refill time in milliseconds. 101 refill_time: u64, 102 103 // Internal state descriptors. 104 budget: u64, 105 last_update: Instant, 106 107 // Fields used for pre-processing optimizations. 108 processed_capacity: u64, 109 processed_refill_time: u64, 110 } 111 112 impl TokenBucket { 113 /// Creates a `TokenBucket` wrapped in an `Option`. 114 /// 115 /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms` 116 /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial 117 /// extra credit on top of total capacity, that does not replenish and which can be used 118 /// for an initial burst of data. 119 /// 120 /// If the `size` or the `complete refill time` are zero, then `None` is returned. 121 pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> { 122 // If either token bucket capacity or refill time is 0, disable limiting. 123 if size == 0 || complete_refill_time_ms == 0 { 124 return None; 125 } 126 // Formula for computing current refill amount: 127 // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000) 128 // In order to avoid overflows, simplify the fractions by computing greatest common divisor. 129 130 let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC; 131 // Get the greatest common factor between `size` and `complete_refill_time_ns`. 132 let common_factor = gcd(size, complete_refill_time_ns); 133 // The division will be exact since `common_factor` is a factor of `size`. 134 let processed_capacity: u64 = size / common_factor; 135 // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`. 136 let processed_refill_time: u64 = complete_refill_time_ns / common_factor; 137 138 Some(TokenBucket { 139 size, 140 one_time_burst, 141 refill_time: complete_refill_time_ms, 142 // Start off full. 143 budget: size, 144 // Last updated is now. 145 last_update: Instant::now(), 146 processed_capacity, 147 processed_refill_time, 148 }) 149 } 150 151 /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded. 152 // TODO (Issue #259): handle cases where a single request is larger than the full capacity 153 // for such cases we need to support partial fulfilment of requests 154 pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction { 155 // First things first: consume the one-time-burst budget. 156 if self.one_time_burst > 0 { 157 // We still have burst budget for *all* tokens requests. 158 if self.one_time_burst >= tokens { 159 self.one_time_burst -= tokens; 160 self.last_update = Instant::now(); 161 // No need to continue to the refill process, we still have burst budget to consume from. 162 return BucketReduction::Success; 163 } else { 164 // We still have burst budget for *some* of the tokens requests. 165 // The tokens left unfulfilled will be consumed from current `self.budget`. 166 tokens -= self.one_time_burst; 167 self.one_time_burst = 0; 168 } 169 } 170 171 // Compute time passed since last refill/update. 172 let time_delta = self.last_update.elapsed().as_nanos() as u64; 173 self.last_update = Instant::now(); 174 175 // At each 'time_delta' nanoseconds the bucket should refill with: 176 // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000) 177 // `processed_capacity` and `processed_refill_time` are the result of simplifying above 178 // fraction formula with their greatest-common-factor. 179 self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time; 180 181 if self.budget >= self.size { 182 self.budget = self.size; 183 } 184 185 if tokens > self.budget { 186 // This operation requests a bandwidth higher than the bucket size 187 if tokens > self.size { 188 error!( 189 "Consumed {} tokens from bucket of size {}", 190 tokens, self.size 191 ); 192 // Empty the bucket and report an overconsumption of 193 // (remaining tokens / size) times larger than the bucket size 194 tokens -= self.budget; 195 self.budget = 0; 196 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64); 197 } 198 // If not enough tokens consume() fails, return false. 199 return BucketReduction::Failure; 200 } 201 202 self.budget -= tokens; 203 BucketReduction::Success 204 } 205 206 /// "Manually" adds tokens to bucket. 207 pub fn replenish(&mut self, tokens: u64) { 208 // This means we are still during the burst interval. 209 // Of course there is a very small chance that the last reduce() also used up burst 210 // budget which should now be replenished, but for performance and code-complexity 211 // reasons we're just gonna let that slide since it's practically inconsequential. 212 if self.one_time_burst > 0 { 213 self.one_time_burst += tokens; 214 return; 215 } 216 self.budget = std::cmp::min(self.budget + tokens, self.size); 217 } 218 219 /// Returns the capacity of the token bucket. 220 pub fn capacity(&self) -> u64 { 221 self.size 222 } 223 224 /// Returns the remaining one time burst budget. 225 pub fn one_time_burst(&self) -> u64 { 226 self.one_time_burst 227 } 228 229 /// Returns the time in milliseconds required to to completely fill the bucket. 230 pub fn refill_time_ms(&self) -> u64 { 231 self.refill_time 232 } 233 234 /// Returns the current budget (one time burst allowance notwithstanding). 235 pub fn budget(&self) -> u64 { 236 self.budget 237 } 238 } 239 240 /// Enum that describes the type of token used. 241 pub enum TokenType { 242 /// Token type used for bandwidth limiting. 243 Bytes, 244 /// Token type used for operations/second limiting. 245 Ops, 246 } 247 248 /// Enum that describes the type of token bucket update. 249 pub enum BucketUpdate { 250 /// No Update - same as before. 251 None, 252 /// Rate Limiting is disabled on this bucket. 253 Disabled, 254 /// Rate Limiting enabled with updated bucket. 255 Update(TokenBucket), 256 } 257 258 /// Rate Limiter that works on both bandwidth and ops/s limiting. 259 /// 260 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually. 261 /// 262 /// Implementation uses a single timer through TimerFd to refresh either or 263 /// both token buckets. 264 /// 265 /// Its internal buckets are 'passively' replenished as they're being used (as 266 /// part of `consume()` operations). 267 /// A timer is enabled and used to 'actively' replenish the token buckets when 268 /// limiting is in effect and `consume()` operations are disabled. 269 /// 270 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait 271 /// implementation. These events are meant to be consumed by the user of this struct. 272 /// On each such event, the user must call the `event_handler()` method. 273 pub struct RateLimiter { 274 bandwidth: Option<TokenBucket>, 275 ops: Option<TokenBucket>, 276 277 timer_fd: TimerFd, 278 // Internal flag that quickly determines timer state. 279 timer_active: bool, 280 } 281 282 impl PartialEq for RateLimiter { 283 fn eq(&self, other: &RateLimiter) -> bool { 284 self.bandwidth == other.bandwidth && self.ops == other.ops 285 } 286 } 287 288 impl fmt::Debug for RateLimiter { 289 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 290 write!( 291 f, 292 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 293 self.bandwidth, self.ops 294 ) 295 } 296 } 297 298 impl RateLimiter { 299 /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s. 300 /// 301 /// # Arguments 302 /// 303 /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket. 304 /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`, 305 /// that does not replenish and which can be used for an initial burst of data. 306 /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes` 307 /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes. 308 /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket. 309 /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`, 310 /// that does not replenish and which can be used for an initial burst of data. 311 /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token 312 /// bucket to go from zero Ops to `ops_total_capacity` Ops. 313 /// 314 /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter 315 /// is **disabled** for that respective token type. 316 /// 317 /// # Errors 318 /// 319 /// If the timerfd creation fails, an error is returned. 320 pub fn new( 321 bytes_total_capacity: u64, 322 bytes_one_time_burst: u64, 323 bytes_complete_refill_time_ms: u64, 324 ops_total_capacity: u64, 325 ops_one_time_burst: u64, 326 ops_complete_refill_time_ms: u64, 327 ) -> io::Result<Self> { 328 let bytes_token_bucket = TokenBucket::new( 329 bytes_total_capacity, 330 bytes_one_time_burst, 331 bytes_complete_refill_time_ms, 332 ); 333 334 let ops_token_bucket = TokenBucket::new( 335 ops_total_capacity, 336 ops_one_time_burst, 337 ops_complete_refill_time_ms, 338 ); 339 340 // We'll need a timer_fd, even if our current config effectively disables rate limiting, 341 // because `Self::update_buckets()` might re-enable it later, and we might be 342 // seccomp-blocked from creating the timer_fd at that time. 343 let timer_fd = TimerFd::new()?; 344 // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag 345 // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`. 346 // SAFETY: FFI calls. 347 let ret = unsafe { 348 let fd = timer_fd.as_raw_fd(); 349 let mut flags = libc::fcntl(fd, libc::F_GETFL); 350 flags |= libc::O_NONBLOCK; 351 libc::fcntl(fd, libc::F_SETFL, flags) 352 }; 353 if ret < 0 { 354 return Err(std::io::Error::last_os_error()); 355 } 356 357 Ok(RateLimiter { 358 bandwidth: bytes_token_bucket, 359 ops: ops_token_bucket, 360 timer_fd, 361 timer_active: false, 362 }) 363 } 364 365 // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once). 366 fn activate_timer(&mut self, dur: Duration) { 367 // Panic when failing to arm the timer (same handling in crate TimerFd::set_state()) 368 self.timer_fd 369 .reset(dur, None) 370 .expect("Can't arm the timer (unexpected 'timerfd_settime' failure)."); 371 self.timer_active = true; 372 } 373 374 /// Attempts to consume tokens and returns whether that is possible. 375 /// 376 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 377 pub fn consume(&mut self, tokens: u64, token_type: TokenType) -> bool { 378 // If the timer is active, we can't consume tokens from any bucket and the function fails. 379 if self.timer_active { 380 return false; 381 } 382 383 // Identify the required token bucket. 384 let token_bucket = match token_type { 385 TokenType::Bytes => self.bandwidth.as_mut(), 386 TokenType::Ops => self.ops.as_mut(), 387 }; 388 // Try to consume from the token bucket. 389 if let Some(bucket) = token_bucket { 390 let refill_time = bucket.refill_time_ms(); 391 match bucket.reduce(tokens) { 392 // When we report budget is over, there will be no further calls here, 393 // register a timer to replenish the bucket and resume processing; 394 // make sure there is only one running timer for this limiter. 395 BucketReduction::Failure => { 396 if !self.timer_active { 397 self.activate_timer(TIMER_REFILL_DUR); 398 } 399 false 400 } 401 // The operation succeeded and further calls can be made. 402 BucketReduction::Success => true, 403 // The operation succeeded as the tokens have been consumed 404 // but the timer still needs to be armed. 405 BucketReduction::OverConsumption(ratio) => { 406 // The operation "borrowed" a number of tokens `ratio` times 407 // greater than the size of the bucket, and since it takes 408 // `refill_time` milliseconds to fill an empty bucket, in 409 // order to enforce the bandwidth limit we need to prevent 410 // further calls to the rate limiter for 411 // `ratio * refill_time` milliseconds. 412 self.activate_timer(Duration::from_millis((ratio * refill_time as f64) as u64)); 413 true 414 } 415 } 416 } else { 417 // If bucket is not present rate limiting is disabled on token type, 418 // consume() will always succeed. 419 true 420 } 421 } 422 423 /// Adds tokens of `token_type` to their respective bucket. 424 /// 425 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 426 /// `consume()` if needed. 427 pub fn manual_replenish(&mut self, tokens: u64, token_type: TokenType) { 428 // Identify the required token bucket. 429 let token_bucket = match token_type { 430 TokenType::Bytes => self.bandwidth.as_mut(), 431 TokenType::Ops => self.ops.as_mut(), 432 }; 433 // Add tokens to the token bucket. 434 if let Some(bucket) = token_bucket { 435 bucket.replenish(tokens); 436 } 437 } 438 439 /// Returns whether this rate limiter is blocked. 440 /// 441 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 442 /// budget for it. 443 /// An event will be generated on the exported FD when the limiter 'unblocks'. 444 pub fn is_blocked(&self) -> bool { 445 self.timer_active 446 } 447 448 /// This function needs to be called every time there is an event on the 449 /// FD provided by this object's `AsRawFd` trait implementation. 450 /// 451 /// # Errors 452 /// 453 /// If the rate limiter is disabled or is not blocked, an error is returned. 454 pub fn event_handler(&mut self) -> Result<(), Error> { 455 loop { 456 // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following 457 // `timer_fd::wait()` won't block (which is different from its default behavior.) 458 match self.timer_fd.wait() { 459 Err(e) => { 460 let err: std::io::Error = e.into(); 461 match err.kind() { 462 std::io::ErrorKind::Interrupted => (), 463 std::io::ErrorKind::WouldBlock => { 464 return Err(Error::SpuriousRateLimiterEvent( 465 "Rate limiter event handler called without a present timer", 466 )) 467 } 468 _ => return Err(Error::TimerFdWaitError(err)), 469 } 470 } 471 _ => { 472 self.timer_active = false; 473 return Ok(()); 474 } 475 } 476 } 477 } 478 479 /// Updates the parameters of the token buckets associated with this RateLimiter. 480 // TODO: Please note that, right now, the buckets become full after being updated. 481 pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) { 482 match bytes { 483 BucketUpdate::Disabled => self.bandwidth = None, 484 BucketUpdate::Update(tb) => self.bandwidth = Some(tb), 485 BucketUpdate::None => (), 486 }; 487 match ops { 488 BucketUpdate::Disabled => self.ops = None, 489 BucketUpdate::Update(tb) => self.ops = Some(tb), 490 BucketUpdate::None => (), 491 }; 492 } 493 494 /// Returns an immutable view of the inner bandwidth token bucket. 495 pub fn bandwidth(&self) -> Option<&TokenBucket> { 496 self.bandwidth.as_ref() 497 } 498 499 /// Returns an immutable view of the inner ops token bucket. 500 pub fn ops(&self) -> Option<&TokenBucket> { 501 self.ops.as_ref() 502 } 503 } 504 505 impl AsRawFd for RateLimiter { 506 /// Provides a FD which needs to be monitored for POLLIN events. 507 /// 508 /// This object's `event_handler()` method must be called on such events. 509 /// 510 /// Will return a negative value if rate limiting is disabled on both 511 /// token types. 512 fn as_raw_fd(&self) -> RawFd { 513 self.timer_fd.as_raw_fd() 514 } 515 } 516 517 impl Default for RateLimiter { 518 /// Default RateLimiter is a no-op limiter with infinite budget. 519 fn default() -> Self { 520 // Safe to unwrap since this will not attempt to create timer_fd. 521 RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter") 522 } 523 } 524 525 #[cfg(test)] 526 pub(crate) mod tests { 527 use super::*; 528 use std::thread; 529 use std::time::Duration; 530 531 impl TokenBucket { 532 // Resets the token bucket: budget set to max capacity and last-updated set to now. 533 fn reset(&mut self) { 534 self.budget = self.size; 535 self.last_update = Instant::now(); 536 } 537 538 fn get_last_update(&self) -> &Instant { 539 &self.last_update 540 } 541 542 fn get_processed_capacity(&self) -> u64 { 543 self.processed_capacity 544 } 545 546 fn get_processed_refill_time(&self) -> u64 { 547 self.processed_refill_time 548 } 549 550 // After a restore, we cannot be certain that the last_update field has the same value. 551 pub fn partial_eq(&self, other: &TokenBucket) -> bool { 552 (other.capacity() == self.capacity()) 553 && (other.one_time_burst() == self.one_time_burst()) 554 && (other.refill_time_ms() == self.refill_time_ms()) 555 && (other.budget() == self.budget()) 556 } 557 } 558 559 impl RateLimiter { 560 fn get_token_bucket(&self, token_type: TokenType) -> Option<&TokenBucket> { 561 match token_type { 562 TokenType::Bytes => self.bandwidth.as_ref(), 563 TokenType::Ops => self.ops.as_ref(), 564 } 565 } 566 } 567 568 #[test] 569 fn test_token_bucket_create() { 570 let before = Instant::now(); 571 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 572 assert_eq!(tb.capacity(), 1000); 573 assert_eq!(tb.budget(), 1000); 574 assert!(*tb.get_last_update() >= before); 575 let after = Instant::now(); 576 assert!(*tb.get_last_update() <= after); 577 assert_eq!(tb.get_processed_capacity(), 1); 578 assert_eq!(tb.get_processed_refill_time(), 1_000_000); 579 580 // Verify invalid bucket configurations result in `None`. 581 assert!(TokenBucket::new(0, 1234, 1000).is_none()); 582 assert!(TokenBucket::new(100, 1234, 0).is_none()); 583 assert!(TokenBucket::new(0, 1234, 0).is_none()); 584 } 585 586 #[test] 587 fn test_token_bucket_preprocess() { 588 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 589 assert_eq!(tb.get_processed_capacity(), 1); 590 assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC); 591 592 let thousand = 1000; 593 let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap(); 594 assert_eq!(tb.get_processed_capacity(), 3 * 19); 595 assert_eq!( 596 tb.get_processed_refill_time(), 597 13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand) 598 ); 599 } 600 601 #[test] 602 fn test_token_bucket_reduce() { 603 // token bucket with capacity 1000 and refill time of 1000 milliseconds 604 // allowing rate of 1 token/ms. 605 let capacity = 1000; 606 let refill_ms = 1000; 607 let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap(); 608 609 assert_eq!(tb.reduce(123), BucketReduction::Success); 610 assert_eq!(tb.budget(), capacity - 123); 611 612 thread::sleep(Duration::from_millis(123)); 613 assert_eq!(tb.reduce(1), BucketReduction::Success); 614 assert_eq!(tb.budget(), capacity - 1); 615 assert_eq!(tb.reduce(100), BucketReduction::Success); 616 assert_eq!(tb.reduce(capacity), BucketReduction::Failure); 617 618 // token bucket with capacity 1000 and refill time of 1000 milliseconds 619 let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap(); 620 // safely assuming the thread can run these 3 commands in less than 500ms 621 assert_eq!(tb.reduce(1000), BucketReduction::Success); 622 assert_eq!(tb.one_time_burst(), 100); 623 assert_eq!(tb.reduce(500), BucketReduction::Success); 624 assert_eq!(tb.one_time_burst(), 0); 625 assert_eq!(tb.reduce(500), BucketReduction::Success); 626 assert_eq!(tb.reduce(500), BucketReduction::Failure); 627 thread::sleep(Duration::from_millis(500)); 628 assert_eq!(tb.reduce(500), BucketReduction::Success); 629 thread::sleep(Duration::from_millis(1000)); 630 assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5)); 631 632 let before = Instant::now(); 633 tb.reset(); 634 assert_eq!(tb.capacity(), 1000); 635 assert_eq!(tb.budget(), 1000); 636 assert!(*tb.get_last_update() >= before); 637 let after = Instant::now(); 638 assert!(*tb.get_last_update() <= after); 639 } 640 641 #[test] 642 fn test_rate_limiter_default() { 643 let mut l = RateLimiter::default(); 644 645 // limiter should not be blocked 646 assert!(!l.is_blocked()); 647 // limiter should be disabled so consume(whatever) should work 648 assert!(l.consume(u64::max_value(), TokenType::Ops)); 649 assert!(l.consume(u64::max_value(), TokenType::Bytes)); 650 // calling the handler without there having been an event should error 651 assert!(l.event_handler().is_err()); 652 assert_eq!( 653 format!("{:?}", l.event_handler().err().unwrap()), 654 "SpuriousRateLimiterEvent(\ 655 \"Rate limiter event handler called without a present timer\")" 656 ); 657 } 658 659 #[test] 660 fn test_rate_limiter_new() { 661 let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 662 663 let bw = l.bandwidth.unwrap(); 664 assert_eq!(bw.capacity(), 1000); 665 assert_eq!(bw.one_time_burst(), 1001); 666 assert_eq!(bw.refill_time_ms(), 1002); 667 assert_eq!(bw.budget(), 1000); 668 669 let ops = l.ops.unwrap(); 670 assert_eq!(ops.capacity(), 1003); 671 assert_eq!(ops.one_time_burst(), 1004); 672 assert_eq!(ops.refill_time_ms(), 1005); 673 assert_eq!(ops.budget(), 1003); 674 } 675 676 #[test] 677 fn test_rate_limiter_manual_replenish() { 678 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 679 let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 680 681 // consume 123 bytes 682 assert!(l.consume(123, TokenType::Bytes)); 683 l.manual_replenish(23, TokenType::Bytes); 684 { 685 let bytes_tb = l.get_token_bucket(TokenType::Bytes).unwrap(); 686 assert_eq!(bytes_tb.budget(), 900); 687 } 688 // consume 123 ops 689 assert!(l.consume(123, TokenType::Ops)); 690 l.manual_replenish(23, TokenType::Ops); 691 { 692 let bytes_tb = l.get_token_bucket(TokenType::Ops).unwrap(); 693 assert_eq!(bytes_tb.budget(), 900); 694 } 695 } 696 697 #[test] 698 fn test_rate_limiter_bandwidth() { 699 // rate limiter with limit of 1000 bytes/s 700 let mut l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap(); 701 702 // limiter should not be blocked 703 assert!(!l.is_blocked()); 704 // raw FD for this disabled should be valid 705 assert!(l.as_raw_fd() > 0); 706 707 // ops/s limiter should be disabled so consume(whatever) should work 708 assert!(l.consume(u64::max_value(), TokenType::Ops)); 709 710 // do full 1000 bytes 711 assert!(l.consume(1000, TokenType::Bytes)); 712 // try and fail on another 100 713 assert!(!l.consume(100, TokenType::Bytes)); 714 // since consume failed, limiter should be blocked now 715 assert!(l.is_blocked()); 716 // wait half the timer period 717 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 718 // limiter should still be blocked 719 assert!(l.is_blocked()); 720 // wait the other half of the timer period 721 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 722 // the timer_fd should have an event on it by now 723 assert!(l.event_handler().is_ok()); 724 // limiter should now be unblocked 725 assert!(!l.is_blocked()); 726 // try and succeed on another 100 bytes this time 727 assert!(l.consume(100, TokenType::Bytes)); 728 } 729 730 #[test] 731 fn test_rate_limiter_ops() { 732 // rate limiter with limit of 1000 ops/s 733 let mut l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap(); 734 735 // limiter should not be blocked 736 assert!(!l.is_blocked()); 737 // raw FD for this disabled should be valid 738 assert!(l.as_raw_fd() > 0); 739 740 // bytes/s limiter should be disabled so consume(whatever) should work 741 assert!(l.consume(u64::max_value(), TokenType::Bytes)); 742 743 // do full 1000 ops 744 assert!(l.consume(1000, TokenType::Ops)); 745 // try and fail on another 100 746 assert!(!l.consume(100, TokenType::Ops)); 747 // since consume failed, limiter should be blocked now 748 assert!(l.is_blocked()); 749 // wait half the timer period 750 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 751 // limiter should still be blocked 752 assert!(l.is_blocked()); 753 // wait the other half of the timer period 754 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 755 // the timer_fd should have an event on it by now 756 assert!(l.event_handler().is_ok()); 757 // limiter should now be unblocked 758 assert!(!l.is_blocked()); 759 // try and succeed on another 100 ops this time 760 assert!(l.consume(100, TokenType::Ops)); 761 } 762 763 #[test] 764 fn test_rate_limiter_full() { 765 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 766 let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 767 768 // limiter should not be blocked 769 assert!(!l.is_blocked()); 770 // raw FD for this disabled should be valid 771 assert!(l.as_raw_fd() > 0); 772 773 // do full 1000 bytes 774 assert!(l.consume(1000, TokenType::Ops)); 775 // do full 1000 bytes 776 assert!(l.consume(1000, TokenType::Bytes)); 777 // try and fail on another 100 ops 778 assert!(!l.consume(100, TokenType::Ops)); 779 // try and fail on another 100 bytes 780 assert!(!l.consume(100, TokenType::Bytes)); 781 // since consume failed, limiter should be blocked now 782 assert!(l.is_blocked()); 783 // wait half the timer period 784 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 785 // limiter should still be blocked 786 assert!(l.is_blocked()); 787 // wait the other half of the timer period 788 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 789 // the timer_fd should have an event on it by now 790 assert!(l.event_handler().is_ok()); 791 // limiter should now be unblocked 792 assert!(!l.is_blocked()); 793 // try and succeed on another 100 ops this time 794 assert!(l.consume(100, TokenType::Ops)); 795 // try and succeed on another 100 bytes this time 796 assert!(l.consume(100, TokenType::Bytes)); 797 } 798 799 #[test] 800 fn test_rate_limiter_overconsumption() { 801 // initialize the rate limiter 802 let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 803 // try to consume 2.5x the bucket size 804 // we are "borrowing" 1.5x the bucket size in tokens since 805 // the bucket is full 806 assert!(l.consume(2500, TokenType::Bytes)); 807 808 // check that even after a whole second passes, the rate limiter 809 // is still blocked 810 thread::sleep(Duration::from_millis(1000)); 811 assert!(l.event_handler().is_err()); 812 assert!(l.is_blocked()); 813 814 // after 1.5x the replenish time has passed, the rate limiter 815 // is available again 816 thread::sleep(Duration::from_millis(500)); 817 assert!(l.event_handler().is_ok()); 818 assert!(!l.is_blocked()); 819 820 // reset the rate limiter 821 let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 822 // try to consume 1.5x the bucket size 823 // we are "borrowing" 1.5x the bucket size in tokens since 824 // the bucket is full, should arm the timer to 0.5x replenish 825 // time, which is 500 ms 826 assert!(l.consume(1500, TokenType::Bytes)); 827 828 // check that after more than the minimum refill time, 829 // the rate limiter is still blocked 830 thread::sleep(Duration::from_millis(200)); 831 assert!(l.event_handler().is_err()); 832 assert!(l.is_blocked()); 833 834 // try to consume some tokens, which should fail as the timer 835 // is still active 836 assert!(!l.consume(100, TokenType::Bytes)); 837 assert!(l.event_handler().is_err()); 838 assert!(l.is_blocked()); 839 840 // check that after the minimum refill time, the timer was not 841 // overwritten and the rate limiter is still blocked from the 842 // borrowing we performed earlier 843 thread::sleep(Duration::from_millis(100)); 844 assert!(l.event_handler().is_err()); 845 assert!(l.is_blocked()); 846 assert!(!l.consume(100, TokenType::Bytes)); 847 848 // after waiting out the full duration, rate limiter should be 849 // availale again 850 thread::sleep(Duration::from_millis(200)); 851 assert!(l.event_handler().is_ok()); 852 assert!(!l.is_blocked()); 853 assert!(l.consume(100, TokenType::Bytes)); 854 } 855 856 #[test] 857 fn test_update_buckets() { 858 let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap(); 859 860 let initial_bw = x.bandwidth.clone(); 861 let initial_ops = x.ops.clone(); 862 863 x.update_buckets(BucketUpdate::None, BucketUpdate::None); 864 assert_eq!(x.bandwidth, initial_bw); 865 assert_eq!(x.ops, initial_ops); 866 867 let new_bw = TokenBucket::new(123, 0, 57).unwrap(); 868 let new_ops = TokenBucket::new(321, 12346, 89).unwrap(); 869 x.update_buckets( 870 BucketUpdate::Update(new_bw.clone()), 871 BucketUpdate::Update(new_ops.clone()), 872 ); 873 874 // We have manually adjust the last_update field, because it changes when update_buckets() 875 // constructs new buckets (and thus gets a different value for last_update). We do this so 876 // it makes sense to test the following assertions. 877 x.bandwidth.as_mut().unwrap().last_update = new_bw.last_update; 878 x.ops.as_mut().unwrap().last_update = new_ops.last_update; 879 880 assert_eq!(x.bandwidth, Some(new_bw)); 881 assert_eq!(x.ops, Some(new_ops)); 882 883 x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled); 884 assert_eq!(x.bandwidth, None); 885 assert_eq!(x.ops, None); 886 } 887 888 #[test] 889 fn test_rate_limiter_debug() { 890 let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap(); 891 assert_eq!( 892 format!("{l:?}"), 893 format!( 894 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 895 l.bandwidth(), 896 l.ops() 897 ), 898 ); 899 } 900 } 901