1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 #![deny(missing_docs)] 5 //! # Rate Limiter 6 //! 7 //! Provides a rate limiter written in Rust useful for IO operations that need to 8 //! be throttled. 9 //! 10 //! ## Behavior 11 //! 12 //! The rate limiter starts off as 'unblocked' with two token buckets configured 13 //! with the values passed in the `RateLimiter::new()` constructor. 14 //! All subsequent accounting is done independently for each token bucket based 15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter 16 //! goes in the 'blocked' state. At this point an internal timer is set up which 17 //! will later 'wake up' the user in order to retry sending data. The 'wake up' 18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD` 19 //! trait implementation. 20 //! 21 //! The contract is that the user shall also call the `event_handler()` method on 22 //! receipt of such an event. 23 //! 24 //! The token buckets are replenished every time a `consume()` is called, before 25 //! actually trying to consume the requested amount of tokens. The amount of tokens 26 //! replenished is automatically calculated to respect the `complete_refill_time` 27 //! configuration parameter provided by the user. The token buckets will never 28 //! replenish above their respective `size`. 29 //! 30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity 31 //! on top of their `size`. This initial extra credit does not replenish and 32 //! can be used for an initial burst of data. 33 //! 34 //! The granularity for 'wake up' events when the rate limiter is blocked is 35 //! currently hardcoded to `100 milliseconds`. 36 //! 37 //! ## Limitations 38 //! 39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its 40 //! usage is limited to Linux systems. 41 //! 42 //! Another particularity of this implementation is that it is not self-driving. 43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd` 44 //! trait and provides an *event-handler* as part of its API. This *event-handler* 45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD. 46 #[macro_use] 47 extern crate log; 48 49 use std::io; 50 use std::os::unix::io::{AsRawFd, RawFd}; 51 use std::sync::atomic::{AtomicBool, Ordering}; 52 use std::sync::Mutex; 53 use std::time::{Duration, Instant}; 54 use vmm_sys_util::timerfd::TimerFd; 55 56 /// Module for group rate limiting. 57 pub mod group; 58 59 #[derive(Debug)] 60 /// Describes the errors that may occur while handling rate limiter events. 61 pub enum Error { 62 /// The event handler was called spuriously. 63 SpuriousRateLimiterEvent(&'static str), 64 /// The event handler encounters while TimerFd::wait() 65 TimerFdWaitError(std::io::Error), 66 } 67 68 // Interval at which the refill timer will run when limiter is at capacity. 69 const REFILL_TIMER_INTERVAL_MS: u64 = 100; 70 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS); 71 72 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000; 73 74 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor. 75 fn gcd(x: u64, y: u64) -> u64 { 76 let mut x = x; 77 let mut y = y; 78 while y != 0 { 79 let t = y; 80 y = x % y; 81 x = t; 82 } 83 x 84 } 85 86 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`. 87 #[derive(Clone, Debug, PartialEq)] 88 pub enum BucketReduction { 89 /// There are not enough tokens to complete the operation. 90 Failure, 91 /// A part of the available tokens have been consumed. 92 Success, 93 /// A number of tokens `inner` times larger than the bucket size have been consumed. 94 OverConsumption(f64), 95 } 96 97 /// TokenBucket provides a lower level interface to rate limiting with a 98 /// configurable capacity, refill-rate and initial burst. 99 #[derive(Clone, Debug, PartialEq, Eq)] 100 pub struct TokenBucket { 101 // Bucket defining traits. 102 size: u64, 103 // Initial burst size (number of free initial tokens, that can be consumed at no cost) 104 one_time_burst: u64, 105 // Complete refill time in milliseconds. 106 refill_time: u64, 107 108 // Internal state descriptors. 109 budget: u64, 110 last_update: Instant, 111 112 // Fields used for pre-processing optimizations. 113 processed_capacity: u64, 114 processed_refill_time: u64, 115 } 116 117 impl TokenBucket { 118 /// Creates a `TokenBucket` wrapped in an `Option`. 119 /// 120 /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms` 121 /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial 122 /// extra credit on top of total capacity, that does not replenish and which can be used 123 /// for an initial burst of data. 124 /// 125 /// If the `size` or the `complete refill time` are zero, then `None` is returned. 126 pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> { 127 // If either token bucket capacity or refill time is 0, disable limiting. 128 if size == 0 || complete_refill_time_ms == 0 { 129 return None; 130 } 131 // Formula for computing current refill amount: 132 // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000) 133 // In order to avoid overflows, simplify the fractions by computing greatest common divisor. 134 135 let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC; 136 // Get the greatest common factor between `size` and `complete_refill_time_ns`. 137 let common_factor = gcd(size, complete_refill_time_ns); 138 // The division will be exact since `common_factor` is a factor of `size`. 139 let processed_capacity: u64 = size / common_factor; 140 // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`. 141 let processed_refill_time: u64 = complete_refill_time_ns / common_factor; 142 143 Some(TokenBucket { 144 size, 145 one_time_burst, 146 refill_time: complete_refill_time_ms, 147 // Start off full. 148 budget: size, 149 // Last updated is now. 150 last_update: Instant::now(), 151 processed_capacity, 152 processed_refill_time, 153 }) 154 } 155 156 /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded. 157 // TODO (Issue #259): handle cases where a single request is larger than the full capacity 158 // for such cases we need to support partial fulfilment of requests 159 pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction { 160 // First things first: consume the one-time-burst budget. 161 if self.one_time_burst > 0 { 162 // We still have burst budget for *all* tokens requests. 163 if self.one_time_burst >= tokens { 164 self.one_time_burst -= tokens; 165 self.last_update = Instant::now(); 166 // No need to continue to the refill process, we still have burst budget to consume from. 167 return BucketReduction::Success; 168 } else { 169 // We still have burst budget for *some* of the tokens requests. 170 // The tokens left unfulfilled will be consumed from current `self.budget`. 171 tokens -= self.one_time_burst; 172 self.one_time_burst = 0; 173 } 174 } 175 176 // Compute time passed since last refill/update. 177 let time_delta = self.last_update.elapsed().as_nanos() as u64; 178 self.last_update = Instant::now(); 179 180 // At each 'time_delta' nanoseconds the bucket should refill with: 181 // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000) 182 // `processed_capacity` and `processed_refill_time` are the result of simplifying above 183 // fraction formula with their greatest-common-factor. 184 self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time; 185 186 if self.budget >= self.size { 187 self.budget = self.size; 188 } 189 190 if tokens > self.budget { 191 // This operation requests a bandwidth higher than the bucket size 192 if tokens > self.size { 193 error!( 194 "Consumed {} tokens from bucket of size {}", 195 tokens, self.size 196 ); 197 // Empty the bucket and report an overconsumption of 198 // (remaining tokens / size) times larger than the bucket size 199 tokens -= self.budget; 200 self.budget = 0; 201 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64); 202 } 203 // If not enough tokens consume() fails, return false. 204 return BucketReduction::Failure; 205 } 206 207 self.budget -= tokens; 208 BucketReduction::Success 209 } 210 211 /// "Manually" adds tokens to bucket. 212 pub fn replenish(&mut self, tokens: u64) { 213 // This means we are still during the burst interval. 214 // Of course there is a very small chance that the last reduce() also used up burst 215 // budget which should now be replenished, but for performance and code-complexity 216 // reasons we're just gonna let that slide since it's practically inconsequential. 217 if self.one_time_burst > 0 { 218 self.one_time_burst += tokens; 219 return; 220 } 221 self.budget = std::cmp::min(self.budget + tokens, self.size); 222 } 223 224 /// Returns the capacity of the token bucket. 225 pub fn capacity(&self) -> u64 { 226 self.size 227 } 228 229 /// Returns the remaining one time burst budget. 230 pub fn one_time_burst(&self) -> u64 { 231 self.one_time_burst 232 } 233 234 /// Returns the time in milliseconds required to to completely fill the bucket. 235 pub fn refill_time_ms(&self) -> u64 { 236 self.refill_time 237 } 238 239 /// Returns the current budget (one time burst allowance notwithstanding). 240 pub fn budget(&self) -> u64 { 241 self.budget 242 } 243 } 244 245 /// Enum that describes the type of token used. 246 pub enum TokenType { 247 /// Token type used for bandwidth limiting. 248 Bytes, 249 /// Token type used for operations/second limiting. 250 Ops, 251 } 252 253 /// Enum that describes the type of token bucket update. 254 pub enum BucketUpdate { 255 /// No Update - same as before. 256 None, 257 /// Rate Limiting is disabled on this bucket. 258 Disabled, 259 /// Rate Limiting enabled with updated bucket. 260 Update(TokenBucket), 261 } 262 263 /// Rate Limiter that works on both bandwidth and ops/s limiting. 264 /// 265 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually. 266 /// 267 /// Implementation uses a single timer through TimerFd to refresh either or 268 /// both token buckets. 269 /// 270 /// Its internal buckets are 'passively' replenished as they're being used (as 271 /// part of `consume()` operations). 272 /// A timer is enabled and used to 'actively' replenish the token buckets when 273 /// limiting is in effect and `consume()` operations are disabled. 274 /// 275 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait 276 /// implementation. These events are meant to be consumed by the user of this struct. 277 /// On each such event, the user must call the `event_handler()` method. 278 pub struct RateLimiter { 279 inner: Mutex<RateLimiterInner>, 280 281 // Internal flag that quickly determines timer state. 282 timer_active: AtomicBool, 283 } 284 285 struct RateLimiterInner { 286 bandwidth: Option<TokenBucket>, 287 ops: Option<TokenBucket>, 288 289 timer_fd: TimerFd, 290 } 291 292 impl RateLimiterInner { 293 // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once). 294 fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) { 295 // Panic when failing to arm the timer (same handling in crate TimerFd::set_state()) 296 self.timer_fd 297 .reset(dur, None) 298 .expect("Can't arm the timer (unexpected 'timerfd_settime' failure)."); 299 flag.store(true, Ordering::Relaxed) 300 } 301 } 302 303 impl RateLimiter { 304 /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s. 305 /// 306 /// # Arguments 307 /// 308 /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket. 309 /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`, 310 /// that does not replenish and which can be used for an initial burst of data. 311 /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes` 312 /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes. 313 /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket. 314 /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`, 315 /// that does not replenish and which can be used for an initial burst of data. 316 /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token 317 /// bucket to go from zero Ops to `ops_total_capacity` Ops. 318 /// 319 /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter 320 /// is **disabled** for that respective token type. 321 /// 322 /// # Errors 323 /// 324 /// If the timerfd creation fails, an error is returned. 325 pub fn new( 326 bytes_total_capacity: u64, 327 bytes_one_time_burst: u64, 328 bytes_complete_refill_time_ms: u64, 329 ops_total_capacity: u64, 330 ops_one_time_burst: u64, 331 ops_complete_refill_time_ms: u64, 332 ) -> io::Result<Self> { 333 let bytes_token_bucket = TokenBucket::new( 334 bytes_total_capacity, 335 bytes_one_time_burst, 336 bytes_complete_refill_time_ms, 337 ); 338 339 let ops_token_bucket = TokenBucket::new( 340 ops_total_capacity, 341 ops_one_time_burst, 342 ops_complete_refill_time_ms, 343 ); 344 345 // We'll need a timer_fd, even if our current config effectively disables rate limiting, 346 // because `Self::update_buckets()` might re-enable it later, and we might be 347 // seccomp-blocked from creating the timer_fd at that time. 348 let timer_fd = TimerFd::new()?; 349 // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag 350 // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`. 351 // SAFETY: FFI calls. 352 let ret = unsafe { 353 let fd = timer_fd.as_raw_fd(); 354 let mut flags = libc::fcntl(fd, libc::F_GETFL); 355 flags |= libc::O_NONBLOCK; 356 libc::fcntl(fd, libc::F_SETFL, flags) 357 }; 358 if ret < 0 { 359 return Err(std::io::Error::last_os_error()); 360 } 361 362 Ok(RateLimiter { 363 inner: Mutex::new(RateLimiterInner { 364 bandwidth: bytes_token_bucket, 365 ops: ops_token_bucket, 366 timer_fd, 367 }), 368 timer_active: AtomicBool::new(false), 369 }) 370 } 371 372 /// Attempts to consume tokens and returns whether that is possible. 373 /// 374 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 375 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool { 376 // If the timer is active, we can't consume tokens from any bucket and the function fails. 377 if self.is_blocked() { 378 return false; 379 } 380 let mut guard = self.inner.lock().unwrap(); 381 // Identify the required token bucket. 382 let token_bucket = match token_type { 383 TokenType::Bytes => guard.bandwidth.as_mut(), 384 TokenType::Ops => guard.ops.as_mut(), 385 }; 386 // Try to consume from the token bucket. 387 if let Some(bucket) = token_bucket { 388 let refill_time = bucket.refill_time_ms(); 389 match bucket.reduce(tokens) { 390 // When we report budget is over, there will be no further calls here, 391 // register a timer to replenish the bucket and resume processing; 392 // make sure there is only one running timer for this limiter. 393 BucketReduction::Failure => { 394 if !self.is_blocked() { 395 guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active); 396 } 397 false 398 } 399 // The operation succeeded and further calls can be made. 400 BucketReduction::Success => true, 401 // The operation succeeded as the tokens have been consumed 402 // but the timer still needs to be armed. 403 BucketReduction::OverConsumption(ratio) => { 404 // The operation "borrowed" a number of tokens `ratio` times 405 // greater than the size of the bucket, and since it takes 406 // `refill_time` milliseconds to fill an empty bucket, in 407 // order to enforce the bandwidth limit we need to prevent 408 // further calls to the rate limiter for 409 // `ratio * refill_time` milliseconds. 410 guard.activate_timer( 411 Duration::from_millis((ratio * refill_time as f64) as u64), 412 &self.timer_active, 413 ); 414 true 415 } 416 } 417 } else { 418 // If bucket is not present rate limiting is disabled on token type, 419 // consume() will always succeed. 420 true 421 } 422 } 423 424 /// Adds tokens of `token_type` to their respective bucket. 425 /// 426 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 427 /// `consume()` if needed. 428 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) { 429 let mut guard = self.inner.lock().unwrap(); 430 // Identify the required token bucket. 431 let token_bucket = match token_type { 432 TokenType::Bytes => guard.bandwidth.as_mut(), 433 TokenType::Ops => guard.ops.as_mut(), 434 }; 435 // Add tokens to the token bucket. 436 if let Some(bucket) = token_bucket { 437 bucket.replenish(tokens); 438 } 439 } 440 441 /// Returns whether this rate limiter is blocked. 442 /// 443 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 444 /// budget for it. 445 /// An event will be generated on the exported FD when the limiter 'unblocks'. 446 pub fn is_blocked(&self) -> bool { 447 self.timer_active.load(Ordering::Relaxed) 448 } 449 450 /// This function needs to be called every time there is an event on the 451 /// FD provided by this object's `AsRawFd` trait implementation. 452 /// 453 /// # Errors 454 /// 455 /// If the rate limiter is disabled or is not blocked, an error is returned. 456 pub fn event_handler(&self) -> Result<(), Error> { 457 let mut guard = self.inner.lock().unwrap(); 458 loop { 459 // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following 460 // `timer_fd::wait()` won't block (which is different from its default behavior.) 461 match guard.timer_fd.wait() { 462 Err(e) => { 463 let err: std::io::Error = e.into(); 464 match err.kind() { 465 std::io::ErrorKind::Interrupted => (), 466 std::io::ErrorKind::WouldBlock => { 467 return Err(Error::SpuriousRateLimiterEvent( 468 "Rate limiter event handler called without a present timer", 469 )) 470 } 471 _ => return Err(Error::TimerFdWaitError(err)), 472 } 473 } 474 _ => { 475 self.timer_active.store(false, Ordering::Relaxed); 476 return Ok(()); 477 } 478 } 479 } 480 } 481 482 /// Updates the parameters of the token buckets associated with this RateLimiter. 483 // TODO: Please note that, right now, the buckets become full after being updated. 484 pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) { 485 let mut guard = self.inner.lock().unwrap(); 486 match bytes { 487 BucketUpdate::Disabled => guard.bandwidth = None, 488 BucketUpdate::Update(tb) => guard.bandwidth = Some(tb), 489 BucketUpdate::None => (), 490 }; 491 match ops { 492 BucketUpdate::Disabled => guard.ops = None, 493 BucketUpdate::Update(tb) => guard.ops = Some(tb), 494 BucketUpdate::None => (), 495 }; 496 } 497 } 498 499 impl AsRawFd for RateLimiter { 500 /// Provides a FD which needs to be monitored for POLLIN events. 501 /// 502 /// This object's `event_handler()` method must be called on such events. 503 /// 504 /// Will return a negative value if rate limiting is disabled on both 505 /// token types. 506 fn as_raw_fd(&self) -> RawFd { 507 let guard = self.inner.lock().unwrap(); 508 guard.timer_fd.as_raw_fd() 509 } 510 } 511 512 impl Default for RateLimiter { 513 /// Default RateLimiter is a no-op limiter with infinite budget. 514 fn default() -> Self { 515 // Safe to unwrap since this will not attempt to create timer_fd. 516 RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter") 517 } 518 } 519 520 #[cfg(test)] 521 pub(crate) mod tests { 522 use super::*; 523 use std::fmt; 524 use std::thread; 525 526 impl TokenBucket { 527 // Resets the token bucket: budget set to max capacity and last-updated set to now. 528 fn reset(&mut self) { 529 self.budget = self.size; 530 self.last_update = Instant::now(); 531 } 532 533 fn get_last_update(&self) -> &Instant { 534 &self.last_update 535 } 536 537 fn get_processed_capacity(&self) -> u64 { 538 self.processed_capacity 539 } 540 541 fn get_processed_refill_time(&self) -> u64 { 542 self.processed_refill_time 543 } 544 545 // After a restore, we cannot be certain that the last_update field has the same value. 546 pub fn partial_eq(&self, other: &TokenBucket) -> bool { 547 (other.capacity() == self.capacity()) 548 && (other.one_time_burst() == self.one_time_burst()) 549 && (other.refill_time_ms() == self.refill_time_ms()) 550 && (other.budget() == self.budget()) 551 } 552 } 553 554 impl RateLimiter { 555 pub fn bandwidth(&self) -> Option<TokenBucket> { 556 let guard = self.inner.lock().unwrap(); 557 guard.bandwidth.clone() 558 } 559 560 pub fn ops(&self) -> Option<TokenBucket> { 561 let guard = self.inner.lock().unwrap(); 562 guard.ops.clone() 563 } 564 } 565 566 impl PartialEq for RateLimiter { 567 fn eq(&self, other: &RateLimiter) -> bool { 568 let self_guard = self.inner.lock().unwrap(); 569 let other_guard = other.inner.lock().unwrap(); 570 self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops 571 } 572 } 573 574 impl fmt::Debug for RateLimiter { 575 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 576 let guard = self.inner.lock().unwrap(); 577 write!( 578 f, 579 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 580 guard.bandwidth, guard.ops 581 ) 582 } 583 } 584 585 #[test] 586 fn test_token_bucket_create() { 587 let before = Instant::now(); 588 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 589 assert_eq!(tb.capacity(), 1000); 590 assert_eq!(tb.budget(), 1000); 591 assert!(*tb.get_last_update() >= before); 592 let after = Instant::now(); 593 assert!(*tb.get_last_update() <= after); 594 assert_eq!(tb.get_processed_capacity(), 1); 595 assert_eq!(tb.get_processed_refill_time(), 1_000_000); 596 597 // Verify invalid bucket configurations result in `None`. 598 assert!(TokenBucket::new(0, 1234, 1000).is_none()); 599 assert!(TokenBucket::new(100, 1234, 0).is_none()); 600 assert!(TokenBucket::new(0, 1234, 0).is_none()); 601 } 602 603 #[test] 604 fn test_token_bucket_preprocess() { 605 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 606 assert_eq!(tb.get_processed_capacity(), 1); 607 assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC); 608 609 let thousand = 1000; 610 let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap(); 611 assert_eq!(tb.get_processed_capacity(), 3 * 19); 612 assert_eq!( 613 tb.get_processed_refill_time(), 614 13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand) 615 ); 616 } 617 618 #[test] 619 fn test_token_bucket_reduce() { 620 // token bucket with capacity 1000 and refill time of 1000 milliseconds 621 // allowing rate of 1 token/ms. 622 let capacity = 1000; 623 let refill_ms = 1000; 624 let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap(); 625 626 assert_eq!(tb.reduce(123), BucketReduction::Success); 627 assert_eq!(tb.budget(), capacity - 123); 628 629 thread::sleep(Duration::from_millis(123)); 630 assert_eq!(tb.reduce(1), BucketReduction::Success); 631 assert_eq!(tb.budget(), capacity - 1); 632 assert_eq!(tb.reduce(100), BucketReduction::Success); 633 assert_eq!(tb.reduce(capacity), BucketReduction::Failure); 634 635 // token bucket with capacity 1000 and refill time of 1000 milliseconds 636 let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap(); 637 // safely assuming the thread can run these 3 commands in less than 500ms 638 assert_eq!(tb.reduce(1000), BucketReduction::Success); 639 assert_eq!(tb.one_time_burst(), 100); 640 assert_eq!(tb.reduce(500), BucketReduction::Success); 641 assert_eq!(tb.one_time_burst(), 0); 642 assert_eq!(tb.reduce(500), BucketReduction::Success); 643 assert_eq!(tb.reduce(500), BucketReduction::Failure); 644 thread::sleep(Duration::from_millis(500)); 645 assert_eq!(tb.reduce(500), BucketReduction::Success); 646 thread::sleep(Duration::from_millis(1000)); 647 assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5)); 648 649 let before = Instant::now(); 650 tb.reset(); 651 assert_eq!(tb.capacity(), 1000); 652 assert_eq!(tb.budget(), 1000); 653 assert!(*tb.get_last_update() >= before); 654 let after = Instant::now(); 655 assert!(*tb.get_last_update() <= after); 656 } 657 658 #[test] 659 fn test_rate_limiter_default() { 660 let l = RateLimiter::default(); 661 662 // limiter should not be blocked 663 assert!(!l.is_blocked()); 664 // limiter should be disabled so consume(whatever) should work 665 assert!(l.consume(u64::MAX, TokenType::Ops)); 666 assert!(l.consume(u64::MAX, TokenType::Bytes)); 667 // calling the handler without there having been an event should error 668 assert!(l.event_handler().is_err()); 669 assert_eq!( 670 format!("{:?}", l.event_handler().err().unwrap()), 671 "SpuriousRateLimiterEvent(\ 672 \"Rate limiter event handler called without a present timer\")" 673 ); 674 } 675 676 #[test] 677 fn test_rate_limiter_new() { 678 let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 679 let bw = l.bandwidth().unwrap(); 680 assert_eq!(bw.capacity(), 1000); 681 assert_eq!(bw.one_time_burst(), 1001); 682 assert_eq!(bw.refill_time_ms(), 1002); 683 assert_eq!(bw.budget(), 1000); 684 685 let ops = l.ops().unwrap(); 686 assert_eq!(ops.capacity(), 1003); 687 assert_eq!(ops.one_time_burst(), 1004); 688 assert_eq!(ops.refill_time_ms(), 1005); 689 assert_eq!(ops.budget(), 1003); 690 } 691 692 #[test] 693 fn test_rate_limiter_manual_replenish() { 694 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 695 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 696 697 // consume 123 bytes 698 assert!(l.consume(123, TokenType::Bytes)); 699 l.manual_replenish(23, TokenType::Bytes); 700 { 701 let bytes_tb = l.bandwidth().unwrap(); 702 assert_eq!(bytes_tb.budget(), 900); 703 } 704 // consume 123 ops 705 assert!(l.consume(123, TokenType::Ops)); 706 l.manual_replenish(23, TokenType::Ops); 707 { 708 let bytes_tb = l.ops().unwrap(); 709 assert_eq!(bytes_tb.budget(), 900); 710 } 711 } 712 713 #[test] 714 fn test_rate_limiter_bandwidth() { 715 // rate limiter with limit of 1000 bytes/s 716 let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap(); 717 718 // limiter should not be blocked 719 assert!(!l.is_blocked()); 720 // raw FD for this disabled should be valid 721 assert!(l.as_raw_fd() > 0); 722 723 // ops/s limiter should be disabled so consume(whatever) should work 724 assert!(l.consume(u64::MAX, TokenType::Ops)); 725 726 // do full 1000 bytes 727 assert!(l.consume(1000, TokenType::Bytes)); 728 // try and fail on another 100 729 assert!(!l.consume(100, TokenType::Bytes)); 730 // since consume failed, limiter should be blocked now 731 assert!(l.is_blocked()); 732 // wait half the timer period 733 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 734 // limiter should still be blocked 735 assert!(l.is_blocked()); 736 // wait the other half of the timer period 737 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 738 // the timer_fd should have an event on it by now 739 assert!(l.event_handler().is_ok()); 740 // limiter should now be unblocked 741 assert!(!l.is_blocked()); 742 // try and succeed on another 100 bytes this time 743 assert!(l.consume(100, TokenType::Bytes)); 744 } 745 746 #[test] 747 fn test_rate_limiter_ops() { 748 // rate limiter with limit of 1000 ops/s 749 let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap(); 750 751 // limiter should not be blocked 752 assert!(!l.is_blocked()); 753 // raw FD for this disabled should be valid 754 assert!(l.as_raw_fd() > 0); 755 756 // bytes/s limiter should be disabled so consume(whatever) should work 757 assert!(l.consume(u64::MAX, TokenType::Bytes)); 758 759 // do full 1000 ops 760 assert!(l.consume(1000, TokenType::Ops)); 761 // try and fail on another 100 762 assert!(!l.consume(100, TokenType::Ops)); 763 // since consume failed, limiter should be blocked now 764 assert!(l.is_blocked()); 765 // wait half the timer period 766 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 767 // limiter should still be blocked 768 assert!(l.is_blocked()); 769 // wait the other half of the timer period 770 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 771 // the timer_fd should have an event on it by now 772 assert!(l.event_handler().is_ok()); 773 // limiter should now be unblocked 774 assert!(!l.is_blocked()); 775 // try and succeed on another 100 ops this time 776 assert!(l.consume(100, TokenType::Ops)); 777 } 778 779 #[test] 780 fn test_rate_limiter_full() { 781 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 782 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 783 784 // limiter should not be blocked 785 assert!(!l.is_blocked()); 786 // raw FD for this disabled should be valid 787 assert!(l.as_raw_fd() > 0); 788 789 // do full 1000 bytes 790 assert!(l.consume(1000, TokenType::Ops)); 791 // do full 1000 bytes 792 assert!(l.consume(1000, TokenType::Bytes)); 793 // try and fail on another 100 ops 794 assert!(!l.consume(100, TokenType::Ops)); 795 // try and fail on another 100 bytes 796 assert!(!l.consume(100, TokenType::Bytes)); 797 // since consume failed, limiter should be blocked now 798 assert!(l.is_blocked()); 799 // wait half the timer period 800 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 801 // limiter should still be blocked 802 assert!(l.is_blocked()); 803 // wait the other half of the timer period 804 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 805 // the timer_fd should have an event on it by now 806 assert!(l.event_handler().is_ok()); 807 // limiter should now be unblocked 808 assert!(!l.is_blocked()); 809 // try and succeed on another 100 ops this time 810 assert!(l.consume(100, TokenType::Ops)); 811 // try and succeed on another 100 bytes this time 812 assert!(l.consume(100, TokenType::Bytes)); 813 } 814 815 #[test] 816 fn test_rate_limiter_overconsumption() { 817 // initialize the rate limiter 818 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 819 // try to consume 2.5x the bucket size 820 // we are "borrowing" 1.5x the bucket size in tokens since 821 // the bucket is full 822 assert!(l.consume(2500, TokenType::Bytes)); 823 824 // check that even after a whole second passes, the rate limiter 825 // is still blocked 826 thread::sleep(Duration::from_millis(1000)); 827 assert!(l.event_handler().is_err()); 828 assert!(l.is_blocked()); 829 830 // after 1.5x the replenish time has passed, the rate limiter 831 // is available again 832 thread::sleep(Duration::from_millis(500)); 833 assert!(l.event_handler().is_ok()); 834 assert!(!l.is_blocked()); 835 836 // reset the rate limiter 837 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 838 // try to consume 1.5x the bucket size 839 // we are "borrowing" 1.5x the bucket size in tokens since 840 // the bucket is full, should arm the timer to 0.5x replenish 841 // time, which is 500 ms 842 assert!(l.consume(1500, TokenType::Bytes)); 843 844 // check that after more than the minimum refill time, 845 // the rate limiter is still blocked 846 thread::sleep(Duration::from_millis(200)); 847 assert!(l.event_handler().is_err()); 848 assert!(l.is_blocked()); 849 850 // try to consume some tokens, which should fail as the timer 851 // is still active 852 assert!(!l.consume(100, TokenType::Bytes)); 853 assert!(l.event_handler().is_err()); 854 assert!(l.is_blocked()); 855 856 // check that after the minimum refill time, the timer was not 857 // overwritten and the rate limiter is still blocked from the 858 // borrowing we performed earlier 859 thread::sleep(Duration::from_millis(100)); 860 assert!(l.event_handler().is_err()); 861 assert!(l.is_blocked()); 862 assert!(!l.consume(100, TokenType::Bytes)); 863 864 // after waiting out the full duration, rate limiter should be 865 // available again 866 thread::sleep(Duration::from_millis(200)); 867 assert!(l.event_handler().is_ok()); 868 assert!(!l.is_blocked()); 869 assert!(l.consume(100, TokenType::Bytes)); 870 } 871 872 #[test] 873 fn test_update_buckets() { 874 let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap(); 875 876 let initial_bw = x.bandwidth(); 877 let initial_ops = x.ops(); 878 879 x.update_buckets(BucketUpdate::None, BucketUpdate::None); 880 assert_eq!(x.bandwidth(), initial_bw); 881 assert_eq!(x.ops(), initial_ops); 882 883 let new_bw = TokenBucket::new(123, 0, 57).unwrap(); 884 let new_ops = TokenBucket::new(321, 12346, 89).unwrap(); 885 x.update_buckets( 886 BucketUpdate::Update(new_bw.clone()), 887 BucketUpdate::Update(new_ops.clone()), 888 ); 889 890 { 891 let mut guard = x.inner.lock().unwrap(); 892 // We have manually adjust the last_update field, because it changes when update_buckets() 893 // constructs new buckets (and thus gets a different value for last_update). We do this so 894 // it makes sense to test the following assertions. 895 guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update; 896 guard.ops.as_mut().unwrap().last_update = new_ops.last_update; 897 } 898 899 assert_eq!(x.bandwidth(), Some(new_bw)); 900 assert_eq!(x.ops(), Some(new_ops)); 901 902 x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled); 903 assert_eq!(x.bandwidth(), None); 904 assert_eq!(x.ops(), None); 905 } 906 907 #[test] 908 fn test_rate_limiter_debug() { 909 let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap(); 910 assert_eq!( 911 format!("{l:?}"), 912 format!( 913 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 914 l.bandwidth(), 915 l.ops() 916 ), 917 ); 918 } 919 } 920