1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 #![deny(missing_docs)] 5 //! # Rate Limiter 6 //! 7 //! Provides a rate limiter written in Rust useful for IO operations that need to 8 //! be throttled. 9 //! 10 //! ## Behavior 11 //! 12 //! The rate limiter starts off as 'unblocked' with two token buckets configured 13 //! with the values passed in the `RateLimiter::new()` constructor. 14 //! All subsequent accounting is done independently for each token bucket based 15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter 16 //! goes in the 'blocked' state. At this point an internal timer is set up which 17 //! will later 'wake up' the user in order to retry sending data. The 'wake up' 18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD` 19 //! trait implementation. 20 //! 21 //! The contract is that the user shall also call the `event_handler()` method on 22 //! receipt of such an event. 23 //! 24 //! The token buckets are replenished every time a `consume()` is called, before 25 //! actually trying to consume the requested amount of tokens. The amount of tokens 26 //! replenished is automatically calculated to respect the `complete_refill_time` 27 //! configuration parameter provided by the user. The token buckets will never 28 //! replenish above their respective `size`. 29 //! 30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity 31 //! on top of their `size`. This initial extra credit does not replenish and 32 //! can be used for an initial burst of data. 33 //! 34 //! The granularity for 'wake up' events when the rate limiter is blocked is 35 //! currently hardcoded to `100 milliseconds`. 36 //! 37 //! ## Limitations 38 //! 39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its 40 //! usage is limited to Linux systems. 41 //! 42 //! Another particularity of this implementation is that it is not self-driving. 43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd` 44 //! trait and provides an *event-handler* as part of its API. This *event-handler* 45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD. 46 #[macro_use] 47 extern crate log; 48 49 use std::io; 50 use std::os::unix::io::{AsRawFd, RawFd}; 51 use std::sync::atomic::{AtomicBool, Ordering}; 52 use std::sync::Mutex; 53 use std::time::{Duration, Instant}; 54 use vmm_sys_util::timerfd::TimerFd; 55 56 #[derive(Debug)] 57 /// Describes the errors that may occur while handling rate limiter events. 58 pub enum Error { 59 /// The event handler was called spuriously. 60 SpuriousRateLimiterEvent(&'static str), 61 /// The event handler encounters while TimerFd::wait() 62 TimerFdWaitError(std::io::Error), 63 } 64 65 // Interval at which the refill timer will run when limiter is at capacity. 66 const REFILL_TIMER_INTERVAL_MS: u64 = 100; 67 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS); 68 69 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000; 70 71 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor. 72 fn gcd(x: u64, y: u64) -> u64 { 73 let mut x = x; 74 let mut y = y; 75 while y != 0 { 76 let t = y; 77 y = x % y; 78 x = t; 79 } 80 x 81 } 82 83 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`. 84 #[derive(Clone, Debug, PartialEq)] 85 pub enum BucketReduction { 86 /// There are not enough tokens to complete the operation. 87 Failure, 88 /// A part of the available tokens have been consumed. 89 Success, 90 /// A number of tokens `inner` times larger than the bucket size have been consumed. 91 OverConsumption(f64), 92 } 93 94 /// TokenBucket provides a lower level interface to rate limiting with a 95 /// configurable capacity, refill-rate and initial burst. 96 #[derive(Clone, Debug, PartialEq, Eq)] 97 pub struct TokenBucket { 98 // Bucket defining traits. 99 size: u64, 100 // Initial burst size (number of free initial tokens, that can be consumed at no cost) 101 one_time_burst: u64, 102 // Complete refill time in milliseconds. 103 refill_time: u64, 104 105 // Internal state descriptors. 106 budget: u64, 107 last_update: Instant, 108 109 // Fields used for pre-processing optimizations. 110 processed_capacity: u64, 111 processed_refill_time: u64, 112 } 113 114 impl TokenBucket { 115 /// Creates a `TokenBucket` wrapped in an `Option`. 116 /// 117 /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms` 118 /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial 119 /// extra credit on top of total capacity, that does not replenish and which can be used 120 /// for an initial burst of data. 121 /// 122 /// If the `size` or the `complete refill time` are zero, then `None` is returned. 123 pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> { 124 // If either token bucket capacity or refill time is 0, disable limiting. 125 if size == 0 || complete_refill_time_ms == 0 { 126 return None; 127 } 128 // Formula for computing current refill amount: 129 // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000) 130 // In order to avoid overflows, simplify the fractions by computing greatest common divisor. 131 132 let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC; 133 // Get the greatest common factor between `size` and `complete_refill_time_ns`. 134 let common_factor = gcd(size, complete_refill_time_ns); 135 // The division will be exact since `common_factor` is a factor of `size`. 136 let processed_capacity: u64 = size / common_factor; 137 // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`. 138 let processed_refill_time: u64 = complete_refill_time_ns / common_factor; 139 140 Some(TokenBucket { 141 size, 142 one_time_burst, 143 refill_time: complete_refill_time_ms, 144 // Start off full. 145 budget: size, 146 // Last updated is now. 147 last_update: Instant::now(), 148 processed_capacity, 149 processed_refill_time, 150 }) 151 } 152 153 /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded. 154 // TODO (Issue #259): handle cases where a single request is larger than the full capacity 155 // for such cases we need to support partial fulfilment of requests 156 pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction { 157 // First things first: consume the one-time-burst budget. 158 if self.one_time_burst > 0 { 159 // We still have burst budget for *all* tokens requests. 160 if self.one_time_burst >= tokens { 161 self.one_time_burst -= tokens; 162 self.last_update = Instant::now(); 163 // No need to continue to the refill process, we still have burst budget to consume from. 164 return BucketReduction::Success; 165 } else { 166 // We still have burst budget for *some* of the tokens requests. 167 // The tokens left unfulfilled will be consumed from current `self.budget`. 168 tokens -= self.one_time_burst; 169 self.one_time_burst = 0; 170 } 171 } 172 173 // Compute time passed since last refill/update. 174 let time_delta = self.last_update.elapsed().as_nanos() as u64; 175 self.last_update = Instant::now(); 176 177 // At each 'time_delta' nanoseconds the bucket should refill with: 178 // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000) 179 // `processed_capacity` and `processed_refill_time` are the result of simplifying above 180 // fraction formula with their greatest-common-factor. 181 self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time; 182 183 if self.budget >= self.size { 184 self.budget = self.size; 185 } 186 187 if tokens > self.budget { 188 // This operation requests a bandwidth higher than the bucket size 189 if tokens > self.size { 190 error!( 191 "Consumed {} tokens from bucket of size {}", 192 tokens, self.size 193 ); 194 // Empty the bucket and report an overconsumption of 195 // (remaining tokens / size) times larger than the bucket size 196 tokens -= self.budget; 197 self.budget = 0; 198 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64); 199 } 200 // If not enough tokens consume() fails, return false. 201 return BucketReduction::Failure; 202 } 203 204 self.budget -= tokens; 205 BucketReduction::Success 206 } 207 208 /// "Manually" adds tokens to bucket. 209 pub fn replenish(&mut self, tokens: u64) { 210 // This means we are still during the burst interval. 211 // Of course there is a very small chance that the last reduce() also used up burst 212 // budget which should now be replenished, but for performance and code-complexity 213 // reasons we're just gonna let that slide since it's practically inconsequential. 214 if self.one_time_burst > 0 { 215 self.one_time_burst += tokens; 216 return; 217 } 218 self.budget = std::cmp::min(self.budget + tokens, self.size); 219 } 220 221 /// Returns the capacity of the token bucket. 222 pub fn capacity(&self) -> u64 { 223 self.size 224 } 225 226 /// Returns the remaining one time burst budget. 227 pub fn one_time_burst(&self) -> u64 { 228 self.one_time_burst 229 } 230 231 /// Returns the time in milliseconds required to to completely fill the bucket. 232 pub fn refill_time_ms(&self) -> u64 { 233 self.refill_time 234 } 235 236 /// Returns the current budget (one time burst allowance notwithstanding). 237 pub fn budget(&self) -> u64 { 238 self.budget 239 } 240 } 241 242 /// Enum that describes the type of token used. 243 pub enum TokenType { 244 /// Token type used for bandwidth limiting. 245 Bytes, 246 /// Token type used for operations/second limiting. 247 Ops, 248 } 249 250 /// Enum that describes the type of token bucket update. 251 pub enum BucketUpdate { 252 /// No Update - same as before. 253 None, 254 /// Rate Limiting is disabled on this bucket. 255 Disabled, 256 /// Rate Limiting enabled with updated bucket. 257 Update(TokenBucket), 258 } 259 260 /// Rate Limiter that works on both bandwidth and ops/s limiting. 261 /// 262 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually. 263 /// 264 /// Implementation uses a single timer through TimerFd to refresh either or 265 /// both token buckets. 266 /// 267 /// Its internal buckets are 'passively' replenished as they're being used (as 268 /// part of `consume()` operations). 269 /// A timer is enabled and used to 'actively' replenish the token buckets when 270 /// limiting is in effect and `consume()` operations are disabled. 271 /// 272 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait 273 /// implementation. These events are meant to be consumed by the user of this struct. 274 /// On each such event, the user must call the `event_handler()` method. 275 pub struct RateLimiter { 276 inner: Mutex<RateLimiterInner>, 277 278 // Internal flag that quickly determines timer state. 279 timer_active: AtomicBool, 280 } 281 282 struct RateLimiterInner { 283 bandwidth: Option<TokenBucket>, 284 ops: Option<TokenBucket>, 285 286 timer_fd: TimerFd, 287 } 288 289 impl RateLimiterInner { 290 // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once). 291 fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) { 292 // Panic when failing to arm the timer (same handling in crate TimerFd::set_state()) 293 self.timer_fd 294 .reset(dur, None) 295 .expect("Can't arm the timer (unexpected 'timerfd_settime' failure)."); 296 flag.store(true, Ordering::Relaxed) 297 } 298 } 299 300 impl RateLimiter { 301 /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s. 302 /// 303 /// # Arguments 304 /// 305 /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket. 306 /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`, 307 /// that does not replenish and which can be used for an initial burst of data. 308 /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes` 309 /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes. 310 /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket. 311 /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`, 312 /// that does not replenish and which can be used for an initial burst of data. 313 /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token 314 /// bucket to go from zero Ops to `ops_total_capacity` Ops. 315 /// 316 /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter 317 /// is **disabled** for that respective token type. 318 /// 319 /// # Errors 320 /// 321 /// If the timerfd creation fails, an error is returned. 322 pub fn new( 323 bytes_total_capacity: u64, 324 bytes_one_time_burst: u64, 325 bytes_complete_refill_time_ms: u64, 326 ops_total_capacity: u64, 327 ops_one_time_burst: u64, 328 ops_complete_refill_time_ms: u64, 329 ) -> io::Result<Self> { 330 let bytes_token_bucket = TokenBucket::new( 331 bytes_total_capacity, 332 bytes_one_time_burst, 333 bytes_complete_refill_time_ms, 334 ); 335 336 let ops_token_bucket = TokenBucket::new( 337 ops_total_capacity, 338 ops_one_time_burst, 339 ops_complete_refill_time_ms, 340 ); 341 342 // We'll need a timer_fd, even if our current config effectively disables rate limiting, 343 // because `Self::update_buckets()` might re-enable it later, and we might be 344 // seccomp-blocked from creating the timer_fd at that time. 345 let timer_fd = TimerFd::new()?; 346 // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag 347 // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`. 348 // SAFETY: FFI calls. 349 let ret = unsafe { 350 let fd = timer_fd.as_raw_fd(); 351 let mut flags = libc::fcntl(fd, libc::F_GETFL); 352 flags |= libc::O_NONBLOCK; 353 libc::fcntl(fd, libc::F_SETFL, flags) 354 }; 355 if ret < 0 { 356 return Err(std::io::Error::last_os_error()); 357 } 358 359 Ok(RateLimiter { 360 inner: Mutex::new(RateLimiterInner { 361 bandwidth: bytes_token_bucket, 362 ops: ops_token_bucket, 363 timer_fd, 364 }), 365 timer_active: AtomicBool::new(false), 366 }) 367 } 368 369 /// Attempts to consume tokens and returns whether that is possible. 370 /// 371 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 372 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool { 373 // If the timer is active, we can't consume tokens from any bucket and the function fails. 374 if self.is_blocked() { 375 return false; 376 } 377 let mut guard = self.inner.lock().unwrap(); 378 // Identify the required token bucket. 379 let token_bucket = match token_type { 380 TokenType::Bytes => guard.bandwidth.as_mut(), 381 TokenType::Ops => guard.ops.as_mut(), 382 }; 383 // Try to consume from the token bucket. 384 if let Some(bucket) = token_bucket { 385 let refill_time = bucket.refill_time_ms(); 386 match bucket.reduce(tokens) { 387 // When we report budget is over, there will be no further calls here, 388 // register a timer to replenish the bucket and resume processing; 389 // make sure there is only one running timer for this limiter. 390 BucketReduction::Failure => { 391 if !self.is_blocked() { 392 guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active); 393 } 394 false 395 } 396 // The operation succeeded and further calls can be made. 397 BucketReduction::Success => true, 398 // The operation succeeded as the tokens have been consumed 399 // but the timer still needs to be armed. 400 BucketReduction::OverConsumption(ratio) => { 401 // The operation "borrowed" a number of tokens `ratio` times 402 // greater than the size of the bucket, and since it takes 403 // `refill_time` milliseconds to fill an empty bucket, in 404 // order to enforce the bandwidth limit we need to prevent 405 // further calls to the rate limiter for 406 // `ratio * refill_time` milliseconds. 407 guard.activate_timer( 408 Duration::from_millis((ratio * refill_time as f64) as u64), 409 &self.timer_active, 410 ); 411 true 412 } 413 } 414 } else { 415 // If bucket is not present rate limiting is disabled on token type, 416 // consume() will always succeed. 417 true 418 } 419 } 420 421 /// Adds tokens of `token_type` to their respective bucket. 422 /// 423 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 424 /// `consume()` if needed. 425 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) { 426 let mut guard = self.inner.lock().unwrap(); 427 // Identify the required token bucket. 428 let token_bucket = match token_type { 429 TokenType::Bytes => guard.bandwidth.as_mut(), 430 TokenType::Ops => guard.ops.as_mut(), 431 }; 432 // Add tokens to the token bucket. 433 if let Some(bucket) = token_bucket { 434 bucket.replenish(tokens); 435 } 436 } 437 438 /// Returns whether this rate limiter is blocked. 439 /// 440 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 441 /// budget for it. 442 /// An event will be generated on the exported FD when the limiter 'unblocks'. 443 pub fn is_blocked(&self) -> bool { 444 self.timer_active.load(Ordering::Relaxed) 445 } 446 447 /// This function needs to be called every time there is an event on the 448 /// FD provided by this object's `AsRawFd` trait implementation. 449 /// 450 /// # Errors 451 /// 452 /// If the rate limiter is disabled or is not blocked, an error is returned. 453 pub fn event_handler(&self) -> Result<(), Error> { 454 let mut guard = self.inner.lock().unwrap(); 455 loop { 456 // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following 457 // `timer_fd::wait()` won't block (which is different from its default behavior.) 458 match guard.timer_fd.wait() { 459 Err(e) => { 460 let err: std::io::Error = e.into(); 461 match err.kind() { 462 std::io::ErrorKind::Interrupted => (), 463 std::io::ErrorKind::WouldBlock => { 464 return Err(Error::SpuriousRateLimiterEvent( 465 "Rate limiter event handler called without a present timer", 466 )) 467 } 468 _ => return Err(Error::TimerFdWaitError(err)), 469 } 470 } 471 _ => { 472 self.timer_active.store(false, Ordering::Relaxed); 473 return Ok(()); 474 } 475 } 476 } 477 } 478 479 /// Updates the parameters of the token buckets associated with this RateLimiter. 480 // TODO: Please note that, right now, the buckets become full after being updated. 481 pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) { 482 let mut guard = self.inner.lock().unwrap(); 483 match bytes { 484 BucketUpdate::Disabled => guard.bandwidth = None, 485 BucketUpdate::Update(tb) => guard.bandwidth = Some(tb), 486 BucketUpdate::None => (), 487 }; 488 match ops { 489 BucketUpdate::Disabled => guard.ops = None, 490 BucketUpdate::Update(tb) => guard.ops = Some(tb), 491 BucketUpdate::None => (), 492 }; 493 } 494 } 495 496 impl AsRawFd for RateLimiter { 497 /// Provides a FD which needs to be monitored for POLLIN events. 498 /// 499 /// This object's `event_handler()` method must be called on such events. 500 /// 501 /// Will return a negative value if rate limiting is disabled on both 502 /// token types. 503 fn as_raw_fd(&self) -> RawFd { 504 let guard = self.inner.lock().unwrap(); 505 guard.timer_fd.as_raw_fd() 506 } 507 } 508 509 impl Default for RateLimiter { 510 /// Default RateLimiter is a no-op limiter with infinite budget. 511 fn default() -> Self { 512 // Safe to unwrap since this will not attempt to create timer_fd. 513 RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter") 514 } 515 } 516 517 #[cfg(test)] 518 pub(crate) mod tests { 519 use super::*; 520 use std::fmt; 521 use std::thread; 522 use std::time::Duration; 523 524 impl TokenBucket { 525 // Resets the token bucket: budget set to max capacity and last-updated set to now. 526 fn reset(&mut self) { 527 self.budget = self.size; 528 self.last_update = Instant::now(); 529 } 530 531 fn get_last_update(&self) -> &Instant { 532 &self.last_update 533 } 534 535 fn get_processed_capacity(&self) -> u64 { 536 self.processed_capacity 537 } 538 539 fn get_processed_refill_time(&self) -> u64 { 540 self.processed_refill_time 541 } 542 543 // After a restore, we cannot be certain that the last_update field has the same value. 544 pub fn partial_eq(&self, other: &TokenBucket) -> bool { 545 (other.capacity() == self.capacity()) 546 && (other.one_time_burst() == self.one_time_burst()) 547 && (other.refill_time_ms() == self.refill_time_ms()) 548 && (other.budget() == self.budget()) 549 } 550 } 551 552 impl RateLimiter { 553 pub fn bandwidth(&self) -> Option<TokenBucket> { 554 let guard = self.inner.lock().unwrap(); 555 guard.bandwidth.clone() 556 } 557 558 pub fn ops(&self) -> Option<TokenBucket> { 559 let guard = self.inner.lock().unwrap(); 560 guard.ops.clone() 561 } 562 } 563 564 impl PartialEq for RateLimiter { 565 fn eq(&self, other: &RateLimiter) -> bool { 566 let self_guard = self.inner.lock().unwrap(); 567 let other_guard = other.inner.lock().unwrap(); 568 self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops 569 } 570 } 571 572 impl fmt::Debug for RateLimiter { 573 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 574 let guard = self.inner.lock().unwrap(); 575 write!( 576 f, 577 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 578 guard.bandwidth, guard.ops 579 ) 580 } 581 } 582 583 #[test] 584 fn test_token_bucket_create() { 585 let before = Instant::now(); 586 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 587 assert_eq!(tb.capacity(), 1000); 588 assert_eq!(tb.budget(), 1000); 589 assert!(*tb.get_last_update() >= before); 590 let after = Instant::now(); 591 assert!(*tb.get_last_update() <= after); 592 assert_eq!(tb.get_processed_capacity(), 1); 593 assert_eq!(tb.get_processed_refill_time(), 1_000_000); 594 595 // Verify invalid bucket configurations result in `None`. 596 assert!(TokenBucket::new(0, 1234, 1000).is_none()); 597 assert!(TokenBucket::new(100, 1234, 0).is_none()); 598 assert!(TokenBucket::new(0, 1234, 0).is_none()); 599 } 600 601 #[test] 602 fn test_token_bucket_preprocess() { 603 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 604 assert_eq!(tb.get_processed_capacity(), 1); 605 assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC); 606 607 let thousand = 1000; 608 let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap(); 609 assert_eq!(tb.get_processed_capacity(), 3 * 19); 610 assert_eq!( 611 tb.get_processed_refill_time(), 612 13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand) 613 ); 614 } 615 616 #[test] 617 fn test_token_bucket_reduce() { 618 // token bucket with capacity 1000 and refill time of 1000 milliseconds 619 // allowing rate of 1 token/ms. 620 let capacity = 1000; 621 let refill_ms = 1000; 622 let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap(); 623 624 assert_eq!(tb.reduce(123), BucketReduction::Success); 625 assert_eq!(tb.budget(), capacity - 123); 626 627 thread::sleep(Duration::from_millis(123)); 628 assert_eq!(tb.reduce(1), BucketReduction::Success); 629 assert_eq!(tb.budget(), capacity - 1); 630 assert_eq!(tb.reduce(100), BucketReduction::Success); 631 assert_eq!(tb.reduce(capacity), BucketReduction::Failure); 632 633 // token bucket with capacity 1000 and refill time of 1000 milliseconds 634 let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap(); 635 // safely assuming the thread can run these 3 commands in less than 500ms 636 assert_eq!(tb.reduce(1000), BucketReduction::Success); 637 assert_eq!(tb.one_time_burst(), 100); 638 assert_eq!(tb.reduce(500), BucketReduction::Success); 639 assert_eq!(tb.one_time_burst(), 0); 640 assert_eq!(tb.reduce(500), BucketReduction::Success); 641 assert_eq!(tb.reduce(500), BucketReduction::Failure); 642 thread::sleep(Duration::from_millis(500)); 643 assert_eq!(tb.reduce(500), BucketReduction::Success); 644 thread::sleep(Duration::from_millis(1000)); 645 assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5)); 646 647 let before = Instant::now(); 648 tb.reset(); 649 assert_eq!(tb.capacity(), 1000); 650 assert_eq!(tb.budget(), 1000); 651 assert!(*tb.get_last_update() >= before); 652 let after = Instant::now(); 653 assert!(*tb.get_last_update() <= after); 654 } 655 656 #[test] 657 fn test_rate_limiter_default() { 658 let l = RateLimiter::default(); 659 660 // limiter should not be blocked 661 assert!(!l.is_blocked()); 662 // limiter should be disabled so consume(whatever) should work 663 assert!(l.consume(u64::max_value(), TokenType::Ops)); 664 assert!(l.consume(u64::max_value(), TokenType::Bytes)); 665 // calling the handler without there having been an event should error 666 assert!(l.event_handler().is_err()); 667 assert_eq!( 668 format!("{:?}", l.event_handler().err().unwrap()), 669 "SpuriousRateLimiterEvent(\ 670 \"Rate limiter event handler called without a present timer\")" 671 ); 672 } 673 674 #[test] 675 fn test_rate_limiter_new() { 676 let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 677 let bw = l.bandwidth().unwrap(); 678 assert_eq!(bw.capacity(), 1000); 679 assert_eq!(bw.one_time_burst(), 1001); 680 assert_eq!(bw.refill_time_ms(), 1002); 681 assert_eq!(bw.budget(), 1000); 682 683 let ops = l.ops().unwrap(); 684 assert_eq!(ops.capacity(), 1003); 685 assert_eq!(ops.one_time_burst(), 1004); 686 assert_eq!(ops.refill_time_ms(), 1005); 687 assert_eq!(ops.budget(), 1003); 688 } 689 690 #[test] 691 fn test_rate_limiter_manual_replenish() { 692 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 693 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 694 695 // consume 123 bytes 696 assert!(l.consume(123, TokenType::Bytes)); 697 l.manual_replenish(23, TokenType::Bytes); 698 { 699 let bytes_tb = l.bandwidth().unwrap(); 700 assert_eq!(bytes_tb.budget(), 900); 701 } 702 // consume 123 ops 703 assert!(l.consume(123, TokenType::Ops)); 704 l.manual_replenish(23, TokenType::Ops); 705 { 706 let bytes_tb = l.ops().unwrap(); 707 assert_eq!(bytes_tb.budget(), 900); 708 } 709 } 710 711 #[test] 712 fn test_rate_limiter_bandwidth() { 713 // rate limiter with limit of 1000 bytes/s 714 let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap(); 715 716 // limiter should not be blocked 717 assert!(!l.is_blocked()); 718 // raw FD for this disabled should be valid 719 assert!(l.as_raw_fd() > 0); 720 721 // ops/s limiter should be disabled so consume(whatever) should work 722 assert!(l.consume(u64::max_value(), TokenType::Ops)); 723 724 // do full 1000 bytes 725 assert!(l.consume(1000, TokenType::Bytes)); 726 // try and fail on another 100 727 assert!(!l.consume(100, TokenType::Bytes)); 728 // since consume failed, limiter should be blocked now 729 assert!(l.is_blocked()); 730 // wait half the timer period 731 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 732 // limiter should still be blocked 733 assert!(l.is_blocked()); 734 // wait the other half of the timer period 735 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 736 // the timer_fd should have an event on it by now 737 assert!(l.event_handler().is_ok()); 738 // limiter should now be unblocked 739 assert!(!l.is_blocked()); 740 // try and succeed on another 100 bytes this time 741 assert!(l.consume(100, TokenType::Bytes)); 742 } 743 744 #[test] 745 fn test_rate_limiter_ops() { 746 // rate limiter with limit of 1000 ops/s 747 let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap(); 748 749 // limiter should not be blocked 750 assert!(!l.is_blocked()); 751 // raw FD for this disabled should be valid 752 assert!(l.as_raw_fd() > 0); 753 754 // bytes/s limiter should be disabled so consume(whatever) should work 755 assert!(l.consume(u64::max_value(), TokenType::Bytes)); 756 757 // do full 1000 ops 758 assert!(l.consume(1000, TokenType::Ops)); 759 // try and fail on another 100 760 assert!(!l.consume(100, TokenType::Ops)); 761 // since consume failed, limiter should be blocked now 762 assert!(l.is_blocked()); 763 // wait half the timer period 764 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 765 // limiter should still be blocked 766 assert!(l.is_blocked()); 767 // wait the other half of the timer period 768 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 769 // the timer_fd should have an event on it by now 770 assert!(l.event_handler().is_ok()); 771 // limiter should now be unblocked 772 assert!(!l.is_blocked()); 773 // try and succeed on another 100 ops this time 774 assert!(l.consume(100, TokenType::Ops)); 775 } 776 777 #[test] 778 fn test_rate_limiter_full() { 779 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 780 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 781 782 // limiter should not be blocked 783 assert!(!l.is_blocked()); 784 // raw FD for this disabled should be valid 785 assert!(l.as_raw_fd() > 0); 786 787 // do full 1000 bytes 788 assert!(l.consume(1000, TokenType::Ops)); 789 // do full 1000 bytes 790 assert!(l.consume(1000, TokenType::Bytes)); 791 // try and fail on another 100 ops 792 assert!(!l.consume(100, TokenType::Ops)); 793 // try and fail on another 100 bytes 794 assert!(!l.consume(100, TokenType::Bytes)); 795 // since consume failed, limiter should be blocked now 796 assert!(l.is_blocked()); 797 // wait half the timer period 798 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 799 // limiter should still be blocked 800 assert!(l.is_blocked()); 801 // wait the other half of the timer period 802 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 803 // the timer_fd should have an event on it by now 804 assert!(l.event_handler().is_ok()); 805 // limiter should now be unblocked 806 assert!(!l.is_blocked()); 807 // try and succeed on another 100 ops this time 808 assert!(l.consume(100, TokenType::Ops)); 809 // try and succeed on another 100 bytes this time 810 assert!(l.consume(100, TokenType::Bytes)); 811 } 812 813 #[test] 814 fn test_rate_limiter_overconsumption() { 815 // initialize the rate limiter 816 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 817 // try to consume 2.5x the bucket size 818 // we are "borrowing" 1.5x the bucket size in tokens since 819 // the bucket is full 820 assert!(l.consume(2500, TokenType::Bytes)); 821 822 // check that even after a whole second passes, the rate limiter 823 // is still blocked 824 thread::sleep(Duration::from_millis(1000)); 825 assert!(l.event_handler().is_err()); 826 assert!(l.is_blocked()); 827 828 // after 1.5x the replenish time has passed, the rate limiter 829 // is available again 830 thread::sleep(Duration::from_millis(500)); 831 assert!(l.event_handler().is_ok()); 832 assert!(!l.is_blocked()); 833 834 // reset the rate limiter 835 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 836 // try to consume 1.5x the bucket size 837 // we are "borrowing" 1.5x the bucket size in tokens since 838 // the bucket is full, should arm the timer to 0.5x replenish 839 // time, which is 500 ms 840 assert!(l.consume(1500, TokenType::Bytes)); 841 842 // check that after more than the minimum refill time, 843 // the rate limiter is still blocked 844 thread::sleep(Duration::from_millis(200)); 845 assert!(l.event_handler().is_err()); 846 assert!(l.is_blocked()); 847 848 // try to consume some tokens, which should fail as the timer 849 // is still active 850 assert!(!l.consume(100, TokenType::Bytes)); 851 assert!(l.event_handler().is_err()); 852 assert!(l.is_blocked()); 853 854 // check that after the minimum refill time, the timer was not 855 // overwritten and the rate limiter is still blocked from the 856 // borrowing we performed earlier 857 thread::sleep(Duration::from_millis(100)); 858 assert!(l.event_handler().is_err()); 859 assert!(l.is_blocked()); 860 assert!(!l.consume(100, TokenType::Bytes)); 861 862 // after waiting out the full duration, rate limiter should be 863 // availale again 864 thread::sleep(Duration::from_millis(200)); 865 assert!(l.event_handler().is_ok()); 866 assert!(!l.is_blocked()); 867 assert!(l.consume(100, TokenType::Bytes)); 868 } 869 870 #[test] 871 fn test_update_buckets() { 872 let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap(); 873 874 let initial_bw = x.bandwidth(); 875 let initial_ops = x.ops(); 876 877 x.update_buckets(BucketUpdate::None, BucketUpdate::None); 878 assert_eq!(x.bandwidth(), initial_bw); 879 assert_eq!(x.ops(), initial_ops); 880 881 let new_bw = TokenBucket::new(123, 0, 57).unwrap(); 882 let new_ops = TokenBucket::new(321, 12346, 89).unwrap(); 883 x.update_buckets( 884 BucketUpdate::Update(new_bw.clone()), 885 BucketUpdate::Update(new_ops.clone()), 886 ); 887 888 { 889 let mut guard = x.inner.lock().unwrap(); 890 // We have manually adjust the last_update field, because it changes when update_buckets() 891 // constructs new buckets (and thus gets a different value for last_update). We do this so 892 // it makes sense to test the following assertions. 893 guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update; 894 guard.ops.as_mut().unwrap().last_update = new_ops.last_update; 895 } 896 897 assert_eq!(x.bandwidth(), Some(new_bw)); 898 assert_eq!(x.ops(), Some(new_ops)); 899 900 x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled); 901 assert_eq!(x.bandwidth(), None); 902 assert_eq!(x.ops(), None); 903 } 904 905 #[test] 906 fn test_rate_limiter_debug() { 907 let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap(); 908 assert_eq!( 909 format!("{l:?}"), 910 format!( 911 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 912 l.bandwidth(), 913 l.ops() 914 ), 915 ); 916 } 917 } 918