1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 #![deny(missing_docs)] 5 //! # Rate Limiter 6 //! 7 //! Provides a rate limiter written in Rust useful for IO operations that need to 8 //! be throttled. 9 //! 10 //! ## Behavior 11 //! 12 //! The rate limiter starts off as 'unblocked' with two token buckets configured 13 //! with the values passed in the `RateLimiter::new()` constructor. 14 //! All subsequent accounting is done independently for each token bucket based 15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter 16 //! goes in the 'blocked' state. At this point an internal timer is set up which 17 //! will later 'wake up' the user in order to retry sending data. The 'wake up' 18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD` 19 //! trait implementation. 20 //! 21 //! The contract is that the user shall also call the `event_handler()` method on 22 //! receipt of such an event. 23 //! 24 //! The token buckets are replenished every time a `consume()` is called, before 25 //! actually trying to consume the requested amount of tokens. The amount of tokens 26 //! replenished is automatically calculated to respect the `complete_refill_time` 27 //! configuration parameter provided by the user. The token buckets will never 28 //! replenish above their respective `size`. 29 //! 30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity 31 //! on top of their `size`. This initial extra credit does not replenish and 32 //! can be used for an initial burst of data. 33 //! 34 //! The granularity for 'wake up' events when the rate limiter is blocked is 35 //! currently hardcoded to `100 milliseconds`. 36 //! 37 //! ## Limitations 38 //! 39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its 40 //! usage is limited to Linux systems. 41 //! 42 //! Another particularity of this implementation is that it is not self-driving. 43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd` 44 //! trait and provides an *event-handler* as part of its API. This *event-handler* 45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD. 46 #[macro_use] 47 extern crate log; 48 49 use std::io; 50 use std::os::unix::io::{AsRawFd, RawFd}; 51 use std::sync::atomic::{AtomicBool, Ordering}; 52 use std::sync::Mutex; 53 use std::time::{Duration, Instant}; 54 55 use vmm_sys_util::timerfd::TimerFd; 56 57 /// Module for group rate limiting. 58 pub mod group; 59 60 #[derive(Debug)] 61 /// Describes the errors that may occur while handling rate limiter events. 62 pub enum Error { 63 /// The event handler was called spuriously. 64 SpuriousRateLimiterEvent(&'static str), 65 /// The event handler encounters while TimerFd::wait() 66 TimerFdWaitError(std::io::Error), 67 } 68 69 // Interval at which the refill timer will run when limiter is at capacity. 70 const REFILL_TIMER_INTERVAL_MS: u64 = 100; 71 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS); 72 73 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000; 74 75 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor. 76 fn gcd(x: u64, y: u64) -> u64 { 77 let mut x = x; 78 let mut y = y; 79 while y != 0 { 80 let t = y; 81 y = x % y; 82 x = t; 83 } 84 x 85 } 86 87 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`. 88 #[derive(Clone, Debug, PartialEq)] 89 pub enum BucketReduction { 90 /// There are not enough tokens to complete the operation. 91 Failure, 92 /// A part of the available tokens have been consumed. 93 Success, 94 /// A number of tokens `inner` times larger than the bucket size have been consumed. 95 OverConsumption(f64), 96 } 97 98 /// TokenBucket provides a lower level interface to rate limiting with a 99 /// configurable capacity, refill-rate and initial burst. 100 #[derive(Clone, Debug, PartialEq, Eq)] 101 pub struct TokenBucket { 102 // Bucket defining traits. 103 size: u64, 104 // Initial burst size (number of free initial tokens, that can be consumed at no cost) 105 one_time_burst: u64, 106 // Complete refill time in milliseconds. 107 refill_time: u64, 108 109 // Internal state descriptors. 110 budget: u64, 111 last_update: Instant, 112 113 // Fields used for pre-processing optimizations. 114 processed_capacity: u64, 115 processed_refill_time: u64, 116 } 117 118 impl TokenBucket { 119 /// Creates a `TokenBucket` wrapped in an `Option`. 120 /// 121 /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms` 122 /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial 123 /// extra credit on top of total capacity, that does not replenish and which can be used 124 /// for an initial burst of data. 125 /// 126 /// If the `size` or the `complete refill time` are zero, then `None` is returned. 127 pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> { 128 // If either token bucket capacity or refill time is 0, disable limiting. 129 if size == 0 || complete_refill_time_ms == 0 { 130 return None; 131 } 132 // Formula for computing current refill amount: 133 // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000) 134 // In order to avoid overflows, simplify the fractions by computing greatest common divisor. 135 136 let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC; 137 // Get the greatest common factor between `size` and `complete_refill_time_ns`. 138 let common_factor = gcd(size, complete_refill_time_ns); 139 // The division will be exact since `common_factor` is a factor of `size`. 140 let processed_capacity: u64 = size / common_factor; 141 // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`. 142 let processed_refill_time: u64 = complete_refill_time_ns / common_factor; 143 144 Some(TokenBucket { 145 size, 146 one_time_burst, 147 refill_time: complete_refill_time_ms, 148 // Start off full. 149 budget: size, 150 // Last updated is now. 151 last_update: Instant::now(), 152 processed_capacity, 153 processed_refill_time, 154 }) 155 } 156 157 /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded. 158 // TODO (Issue #259): handle cases where a single request is larger than the full capacity 159 // for such cases we need to support partial fulfilment of requests 160 pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction { 161 // First things first: consume the one-time-burst budget. 162 if self.one_time_burst > 0 { 163 // We still have burst budget for *all* tokens requests. 164 if self.one_time_burst >= tokens { 165 self.one_time_burst -= tokens; 166 self.last_update = Instant::now(); 167 // No need to continue to the refill process, we still have burst budget to consume from. 168 return BucketReduction::Success; 169 } else { 170 // We still have burst budget for *some* of the tokens requests. 171 // The tokens left unfulfilled will be consumed from current `self.budget`. 172 tokens -= self.one_time_burst; 173 self.one_time_burst = 0; 174 } 175 } 176 177 // Compute time passed since last refill/update. 178 let time_delta = self.last_update.elapsed().as_nanos() as u64; 179 self.last_update = Instant::now(); 180 181 // At each 'time_delta' nanoseconds the bucket should refill with: 182 // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000) 183 // `processed_capacity` and `processed_refill_time` are the result of simplifying above 184 // fraction formula with their greatest-common-factor. 185 self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time; 186 187 if self.budget >= self.size { 188 self.budget = self.size; 189 } 190 191 if tokens > self.budget { 192 // This operation requests a bandwidth higher than the bucket size 193 if tokens > self.size { 194 error!( 195 "Consumed {} tokens from bucket of size {}", 196 tokens, self.size 197 ); 198 // Empty the bucket and report an overconsumption of 199 // (remaining tokens / size) times larger than the bucket size 200 tokens -= self.budget; 201 self.budget = 0; 202 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64); 203 } 204 // If not enough tokens consume() fails, return false. 205 return BucketReduction::Failure; 206 } 207 208 self.budget -= tokens; 209 BucketReduction::Success 210 } 211 212 /// "Manually" adds tokens to bucket. 213 pub fn replenish(&mut self, tokens: u64) { 214 // This means we are still during the burst interval. 215 // Of course there is a very small chance that the last reduce() also used up burst 216 // budget which should now be replenished, but for performance and code-complexity 217 // reasons we're just gonna let that slide since it's practically inconsequential. 218 if self.one_time_burst > 0 { 219 self.one_time_burst += tokens; 220 return; 221 } 222 self.budget = std::cmp::min(self.budget + tokens, self.size); 223 } 224 225 /// Returns the capacity of the token bucket. 226 pub fn capacity(&self) -> u64 { 227 self.size 228 } 229 230 /// Returns the remaining one time burst budget. 231 pub fn one_time_burst(&self) -> u64 { 232 self.one_time_burst 233 } 234 235 /// Returns the time in milliseconds required to to completely fill the bucket. 236 pub fn refill_time_ms(&self) -> u64 { 237 self.refill_time 238 } 239 240 /// Returns the current budget (one time burst allowance notwithstanding). 241 pub fn budget(&self) -> u64 { 242 self.budget 243 } 244 } 245 246 /// Enum that describes the type of token used. 247 pub enum TokenType { 248 /// Token type used for bandwidth limiting. 249 Bytes, 250 /// Token type used for operations/second limiting. 251 Ops, 252 } 253 254 /// Enum that describes the type of token bucket update. 255 pub enum BucketUpdate { 256 /// No Update - same as before. 257 None, 258 /// Rate Limiting is disabled on this bucket. 259 Disabled, 260 /// Rate Limiting enabled with updated bucket. 261 Update(TokenBucket), 262 } 263 264 /// Rate Limiter that works on both bandwidth and ops/s limiting. 265 /// 266 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually. 267 /// 268 /// Implementation uses a single timer through TimerFd to refresh either or 269 /// both token buckets. 270 /// 271 /// Its internal buckets are 'passively' replenished as they're being used (as 272 /// part of `consume()` operations). 273 /// A timer is enabled and used to 'actively' replenish the token buckets when 274 /// limiting is in effect and `consume()` operations are disabled. 275 /// 276 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait 277 /// implementation. These events are meant to be consumed by the user of this struct. 278 /// On each such event, the user must call the `event_handler()` method. 279 pub struct RateLimiter { 280 inner: Mutex<RateLimiterInner>, 281 282 // Internal flag that quickly determines timer state. 283 timer_active: AtomicBool, 284 } 285 286 struct RateLimiterInner { 287 bandwidth: Option<TokenBucket>, 288 ops: Option<TokenBucket>, 289 290 timer_fd: TimerFd, 291 } 292 293 impl RateLimiterInner { 294 // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once). 295 fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) { 296 // Panic when failing to arm the timer (same handling in crate TimerFd::set_state()) 297 self.timer_fd 298 .reset(dur, None) 299 .expect("Can't arm the timer (unexpected 'timerfd_settime' failure)."); 300 flag.store(true, Ordering::Relaxed) 301 } 302 } 303 304 impl RateLimiter { 305 /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s. 306 /// 307 /// # Arguments 308 /// 309 /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket. 310 /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`, 311 /// that does not replenish and which can be used for an initial burst of data. 312 /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes` 313 /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes. 314 /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket. 315 /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`, 316 /// that does not replenish and which can be used for an initial burst of data. 317 /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token 318 /// bucket to go from zero Ops to `ops_total_capacity` Ops. 319 /// 320 /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter 321 /// is **disabled** for that respective token type. 322 /// 323 /// # Errors 324 /// 325 /// If the timerfd creation fails, an error is returned. 326 pub fn new( 327 bytes_total_capacity: u64, 328 bytes_one_time_burst: u64, 329 bytes_complete_refill_time_ms: u64, 330 ops_total_capacity: u64, 331 ops_one_time_burst: u64, 332 ops_complete_refill_time_ms: u64, 333 ) -> io::Result<Self> { 334 let bytes_token_bucket = TokenBucket::new( 335 bytes_total_capacity, 336 bytes_one_time_burst, 337 bytes_complete_refill_time_ms, 338 ); 339 340 let ops_token_bucket = TokenBucket::new( 341 ops_total_capacity, 342 ops_one_time_burst, 343 ops_complete_refill_time_ms, 344 ); 345 346 // We'll need a timer_fd, even if our current config effectively disables rate limiting, 347 // because `Self::update_buckets()` might re-enable it later, and we might be 348 // seccomp-blocked from creating the timer_fd at that time. 349 let timer_fd = TimerFd::new()?; 350 // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag 351 // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`. 352 // SAFETY: FFI calls. 353 let ret = unsafe { 354 let fd = timer_fd.as_raw_fd(); 355 let mut flags = libc::fcntl(fd, libc::F_GETFL); 356 flags |= libc::O_NONBLOCK; 357 libc::fcntl(fd, libc::F_SETFL, flags) 358 }; 359 if ret < 0 { 360 return Err(std::io::Error::last_os_error()); 361 } 362 363 Ok(RateLimiter { 364 inner: Mutex::new(RateLimiterInner { 365 bandwidth: bytes_token_bucket, 366 ops: ops_token_bucket, 367 timer_fd, 368 }), 369 timer_active: AtomicBool::new(false), 370 }) 371 } 372 373 /// Attempts to consume tokens and returns whether that is possible. 374 /// 375 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 376 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool { 377 // If the timer is active, we can't consume tokens from any bucket and the function fails. 378 if self.is_blocked() { 379 return false; 380 } 381 let mut guard = self.inner.lock().unwrap(); 382 // Identify the required token bucket. 383 let token_bucket = match token_type { 384 TokenType::Bytes => guard.bandwidth.as_mut(), 385 TokenType::Ops => guard.ops.as_mut(), 386 }; 387 // Try to consume from the token bucket. 388 if let Some(bucket) = token_bucket { 389 let refill_time = bucket.refill_time_ms(); 390 match bucket.reduce(tokens) { 391 // When we report budget is over, there will be no further calls here, 392 // register a timer to replenish the bucket and resume processing; 393 // make sure there is only one running timer for this limiter. 394 BucketReduction::Failure => { 395 if !self.is_blocked() { 396 guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active); 397 } 398 false 399 } 400 // The operation succeeded and further calls can be made. 401 BucketReduction::Success => true, 402 // The operation succeeded as the tokens have been consumed 403 // but the timer still needs to be armed. 404 BucketReduction::OverConsumption(ratio) => { 405 // The operation "borrowed" a number of tokens `ratio` times 406 // greater than the size of the bucket, and since it takes 407 // `refill_time` milliseconds to fill an empty bucket, in 408 // order to enforce the bandwidth limit we need to prevent 409 // further calls to the rate limiter for 410 // `ratio * refill_time` milliseconds. 411 guard.activate_timer( 412 Duration::from_millis((ratio * refill_time as f64) as u64), 413 &self.timer_active, 414 ); 415 true 416 } 417 } 418 } else { 419 // If bucket is not present rate limiting is disabled on token type, 420 // consume() will always succeed. 421 true 422 } 423 } 424 425 /// Adds tokens of `token_type` to their respective bucket. 426 /// 427 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 428 /// `consume()` if needed. 429 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) { 430 let mut guard = self.inner.lock().unwrap(); 431 // Identify the required token bucket. 432 let token_bucket = match token_type { 433 TokenType::Bytes => guard.bandwidth.as_mut(), 434 TokenType::Ops => guard.ops.as_mut(), 435 }; 436 // Add tokens to the token bucket. 437 if let Some(bucket) = token_bucket { 438 bucket.replenish(tokens); 439 } 440 } 441 442 /// Returns whether this rate limiter is blocked. 443 /// 444 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 445 /// budget for it. 446 /// An event will be generated on the exported FD when the limiter 'unblocks'. 447 pub fn is_blocked(&self) -> bool { 448 self.timer_active.load(Ordering::Relaxed) 449 } 450 451 /// This function needs to be called every time there is an event on the 452 /// FD provided by this object's `AsRawFd` trait implementation. 453 /// 454 /// # Errors 455 /// 456 /// If the rate limiter is disabled or is not blocked, an error is returned. 457 pub fn event_handler(&self) -> Result<(), Error> { 458 let mut guard = self.inner.lock().unwrap(); 459 loop { 460 // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following 461 // `timer_fd::wait()` won't block (which is different from its default behavior.) 462 match guard.timer_fd.wait() { 463 Err(e) => { 464 let err: std::io::Error = e.into(); 465 match err.kind() { 466 std::io::ErrorKind::Interrupted => (), 467 std::io::ErrorKind::WouldBlock => { 468 return Err(Error::SpuriousRateLimiterEvent( 469 "Rate limiter event handler called without a present timer", 470 )) 471 } 472 _ => return Err(Error::TimerFdWaitError(err)), 473 } 474 } 475 _ => { 476 self.timer_active.store(false, Ordering::Relaxed); 477 return Ok(()); 478 } 479 } 480 } 481 } 482 483 /// Updates the parameters of the token buckets associated with this RateLimiter. 484 // TODO: Please note that, right now, the buckets become full after being updated. 485 pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) { 486 let mut guard = self.inner.lock().unwrap(); 487 match bytes { 488 BucketUpdate::Disabled => guard.bandwidth = None, 489 BucketUpdate::Update(tb) => guard.bandwidth = Some(tb), 490 BucketUpdate::None => (), 491 }; 492 match ops { 493 BucketUpdate::Disabled => guard.ops = None, 494 BucketUpdate::Update(tb) => guard.ops = Some(tb), 495 BucketUpdate::None => (), 496 }; 497 } 498 } 499 500 impl AsRawFd for RateLimiter { 501 /// Provides a FD which needs to be monitored for POLLIN events. 502 /// 503 /// This object's `event_handler()` method must be called on such events. 504 /// 505 /// Will return a negative value if rate limiting is disabled on both 506 /// token types. 507 fn as_raw_fd(&self) -> RawFd { 508 let guard = self.inner.lock().unwrap(); 509 guard.timer_fd.as_raw_fd() 510 } 511 } 512 513 impl Default for RateLimiter { 514 /// Default RateLimiter is a no-op limiter with infinite budget. 515 fn default() -> Self { 516 // Safe to unwrap since this will not attempt to create timer_fd. 517 RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter") 518 } 519 } 520 521 #[cfg(test)] 522 pub(crate) mod tests { 523 use std::{fmt, thread}; 524 525 use super::*; 526 527 impl TokenBucket { 528 // Resets the token bucket: budget set to max capacity and last-updated set to now. 529 fn reset(&mut self) { 530 self.budget = self.size; 531 self.last_update = Instant::now(); 532 } 533 534 fn get_last_update(&self) -> &Instant { 535 &self.last_update 536 } 537 538 fn get_processed_capacity(&self) -> u64 { 539 self.processed_capacity 540 } 541 542 fn get_processed_refill_time(&self) -> u64 { 543 self.processed_refill_time 544 } 545 546 // After a restore, we cannot be certain that the last_update field has the same value. 547 pub fn partial_eq(&self, other: &TokenBucket) -> bool { 548 (other.capacity() == self.capacity()) 549 && (other.one_time_burst() == self.one_time_burst()) 550 && (other.refill_time_ms() == self.refill_time_ms()) 551 && (other.budget() == self.budget()) 552 } 553 } 554 555 impl RateLimiter { 556 pub fn bandwidth(&self) -> Option<TokenBucket> { 557 let guard = self.inner.lock().unwrap(); 558 guard.bandwidth.clone() 559 } 560 561 pub fn ops(&self) -> Option<TokenBucket> { 562 let guard = self.inner.lock().unwrap(); 563 guard.ops.clone() 564 } 565 } 566 567 impl PartialEq for RateLimiter { 568 fn eq(&self, other: &RateLimiter) -> bool { 569 let self_guard = self.inner.lock().unwrap(); 570 let other_guard = other.inner.lock().unwrap(); 571 self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops 572 } 573 } 574 575 impl fmt::Debug for RateLimiter { 576 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 577 let guard = self.inner.lock().unwrap(); 578 write!( 579 f, 580 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 581 guard.bandwidth, guard.ops 582 ) 583 } 584 } 585 586 #[test] 587 fn test_token_bucket_create() { 588 let before = Instant::now(); 589 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 590 assert_eq!(tb.capacity(), 1000); 591 assert_eq!(tb.budget(), 1000); 592 assert!(*tb.get_last_update() >= before); 593 let after = Instant::now(); 594 assert!(*tb.get_last_update() <= after); 595 assert_eq!(tb.get_processed_capacity(), 1); 596 assert_eq!(tb.get_processed_refill_time(), 1_000_000); 597 598 // Verify invalid bucket configurations result in `None`. 599 assert!(TokenBucket::new(0, 1234, 1000).is_none()); 600 assert!(TokenBucket::new(100, 1234, 0).is_none()); 601 assert!(TokenBucket::new(0, 1234, 0).is_none()); 602 } 603 604 #[test] 605 fn test_token_bucket_preprocess() { 606 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 607 assert_eq!(tb.get_processed_capacity(), 1); 608 assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC); 609 610 let thousand = 1000; 611 let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap(); 612 assert_eq!(tb.get_processed_capacity(), 3 * 19); 613 assert_eq!( 614 tb.get_processed_refill_time(), 615 13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand) 616 ); 617 } 618 619 #[test] 620 fn test_token_bucket_reduce() { 621 // token bucket with capacity 1000 and refill time of 1000 milliseconds 622 // allowing rate of 1 token/ms. 623 let capacity = 1000; 624 let refill_ms = 1000; 625 let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap(); 626 627 assert_eq!(tb.reduce(123), BucketReduction::Success); 628 assert_eq!(tb.budget(), capacity - 123); 629 630 thread::sleep(Duration::from_millis(123)); 631 assert_eq!(tb.reduce(1), BucketReduction::Success); 632 assert_eq!(tb.budget(), capacity - 1); 633 assert_eq!(tb.reduce(100), BucketReduction::Success); 634 assert_eq!(tb.reduce(capacity), BucketReduction::Failure); 635 636 // token bucket with capacity 1000 and refill time of 1000 milliseconds 637 let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap(); 638 // safely assuming the thread can run these 3 commands in less than 500ms 639 assert_eq!(tb.reduce(1000), BucketReduction::Success); 640 assert_eq!(tb.one_time_burst(), 100); 641 assert_eq!(tb.reduce(500), BucketReduction::Success); 642 assert_eq!(tb.one_time_burst(), 0); 643 assert_eq!(tb.reduce(500), BucketReduction::Success); 644 assert_eq!(tb.reduce(500), BucketReduction::Failure); 645 thread::sleep(Duration::from_millis(500)); 646 assert_eq!(tb.reduce(500), BucketReduction::Success); 647 thread::sleep(Duration::from_millis(1000)); 648 assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5)); 649 650 let before = Instant::now(); 651 tb.reset(); 652 assert_eq!(tb.capacity(), 1000); 653 assert_eq!(tb.budget(), 1000); 654 assert!(*tb.get_last_update() >= before); 655 let after = Instant::now(); 656 assert!(*tb.get_last_update() <= after); 657 } 658 659 #[test] 660 fn test_rate_limiter_default() { 661 let l = RateLimiter::default(); 662 663 // limiter should not be blocked 664 assert!(!l.is_blocked()); 665 // limiter should be disabled so consume(whatever) should work 666 assert!(l.consume(u64::MAX, TokenType::Ops)); 667 assert!(l.consume(u64::MAX, TokenType::Bytes)); 668 // calling the handler without there having been an event should error 669 assert!(l.event_handler().is_err()); 670 assert_eq!( 671 format!("{:?}", l.event_handler().err().unwrap()), 672 "SpuriousRateLimiterEvent(\ 673 \"Rate limiter event handler called without a present timer\")" 674 ); 675 } 676 677 #[test] 678 fn test_rate_limiter_new() { 679 let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 680 let bw = l.bandwidth().unwrap(); 681 assert_eq!(bw.capacity(), 1000); 682 assert_eq!(bw.one_time_burst(), 1001); 683 assert_eq!(bw.refill_time_ms(), 1002); 684 assert_eq!(bw.budget(), 1000); 685 686 let ops = l.ops().unwrap(); 687 assert_eq!(ops.capacity(), 1003); 688 assert_eq!(ops.one_time_burst(), 1004); 689 assert_eq!(ops.refill_time_ms(), 1005); 690 assert_eq!(ops.budget(), 1003); 691 } 692 693 #[test] 694 fn test_rate_limiter_manual_replenish() { 695 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 696 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 697 698 // consume 123 bytes 699 assert!(l.consume(123, TokenType::Bytes)); 700 l.manual_replenish(23, TokenType::Bytes); 701 { 702 let bytes_tb = l.bandwidth().unwrap(); 703 assert_eq!(bytes_tb.budget(), 900); 704 } 705 // consume 123 ops 706 assert!(l.consume(123, TokenType::Ops)); 707 l.manual_replenish(23, TokenType::Ops); 708 { 709 let bytes_tb = l.ops().unwrap(); 710 assert_eq!(bytes_tb.budget(), 900); 711 } 712 } 713 714 #[test] 715 fn test_rate_limiter_bandwidth() { 716 // rate limiter with limit of 1000 bytes/s 717 let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap(); 718 719 // limiter should not be blocked 720 assert!(!l.is_blocked()); 721 // raw FD for this disabled should be valid 722 assert!(l.as_raw_fd() > 0); 723 724 // ops/s limiter should be disabled so consume(whatever) should work 725 assert!(l.consume(u64::MAX, TokenType::Ops)); 726 727 // do full 1000 bytes 728 assert!(l.consume(1000, TokenType::Bytes)); 729 // try and fail on another 100 730 assert!(!l.consume(100, TokenType::Bytes)); 731 // since consume failed, limiter should be blocked now 732 assert!(l.is_blocked()); 733 // wait half the timer period 734 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 735 // limiter should still be blocked 736 assert!(l.is_blocked()); 737 // wait the other half of the timer period 738 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 739 // the timer_fd should have an event on it by now 740 assert!(l.event_handler().is_ok()); 741 // limiter should now be unblocked 742 assert!(!l.is_blocked()); 743 // try and succeed on another 100 bytes this time 744 assert!(l.consume(100, TokenType::Bytes)); 745 } 746 747 #[test] 748 fn test_rate_limiter_ops() { 749 // rate limiter with limit of 1000 ops/s 750 let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap(); 751 752 // limiter should not be blocked 753 assert!(!l.is_blocked()); 754 // raw FD for this disabled should be valid 755 assert!(l.as_raw_fd() > 0); 756 757 // bytes/s limiter should be disabled so consume(whatever) should work 758 assert!(l.consume(u64::MAX, TokenType::Bytes)); 759 760 // do full 1000 ops 761 assert!(l.consume(1000, TokenType::Ops)); 762 // try and fail on another 100 763 assert!(!l.consume(100, TokenType::Ops)); 764 // since consume failed, limiter should be blocked now 765 assert!(l.is_blocked()); 766 // wait half the timer period 767 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 768 // limiter should still be blocked 769 assert!(l.is_blocked()); 770 // wait the other half of the timer period 771 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 772 // the timer_fd should have an event on it by now 773 assert!(l.event_handler().is_ok()); 774 // limiter should now be unblocked 775 assert!(!l.is_blocked()); 776 // try and succeed on another 100 ops this time 777 assert!(l.consume(100, TokenType::Ops)); 778 } 779 780 #[test] 781 fn test_rate_limiter_full() { 782 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 783 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 784 785 // limiter should not be blocked 786 assert!(!l.is_blocked()); 787 // raw FD for this disabled should be valid 788 assert!(l.as_raw_fd() > 0); 789 790 // do full 1000 bytes 791 assert!(l.consume(1000, TokenType::Ops)); 792 // do full 1000 bytes 793 assert!(l.consume(1000, TokenType::Bytes)); 794 // try and fail on another 100 ops 795 assert!(!l.consume(100, TokenType::Ops)); 796 // try and fail on another 100 bytes 797 assert!(!l.consume(100, TokenType::Bytes)); 798 // since consume failed, limiter should be blocked now 799 assert!(l.is_blocked()); 800 // wait half the timer period 801 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 802 // limiter should still be blocked 803 assert!(l.is_blocked()); 804 // wait the other half of the timer period 805 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 806 // the timer_fd should have an event on it by now 807 assert!(l.event_handler().is_ok()); 808 // limiter should now be unblocked 809 assert!(!l.is_blocked()); 810 // try and succeed on another 100 ops this time 811 assert!(l.consume(100, TokenType::Ops)); 812 // try and succeed on another 100 bytes this time 813 assert!(l.consume(100, TokenType::Bytes)); 814 } 815 816 #[test] 817 fn test_rate_limiter_overconsumption() { 818 // initialize the rate limiter 819 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 820 // try to consume 2.5x the bucket size 821 // we are "borrowing" 1.5x the bucket size in tokens since 822 // the bucket is full 823 assert!(l.consume(2500, TokenType::Bytes)); 824 825 // check that even after a whole second passes, the rate limiter 826 // is still blocked 827 thread::sleep(Duration::from_millis(1000)); 828 assert!(l.event_handler().is_err()); 829 assert!(l.is_blocked()); 830 831 // after 1.5x the replenish time has passed, the rate limiter 832 // is available again 833 thread::sleep(Duration::from_millis(500)); 834 assert!(l.event_handler().is_ok()); 835 assert!(!l.is_blocked()); 836 837 // reset the rate limiter 838 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 839 // try to consume 1.5x the bucket size 840 // we are "borrowing" 1.5x the bucket size in tokens since 841 // the bucket is full, should arm the timer to 0.5x replenish 842 // time, which is 500 ms 843 assert!(l.consume(1500, TokenType::Bytes)); 844 845 // check that after more than the minimum refill time, 846 // the rate limiter is still blocked 847 thread::sleep(Duration::from_millis(200)); 848 assert!(l.event_handler().is_err()); 849 assert!(l.is_blocked()); 850 851 // try to consume some tokens, which should fail as the timer 852 // is still active 853 assert!(!l.consume(100, TokenType::Bytes)); 854 assert!(l.event_handler().is_err()); 855 assert!(l.is_blocked()); 856 857 // check that after the minimum refill time, the timer was not 858 // overwritten and the rate limiter is still blocked from the 859 // borrowing we performed earlier 860 thread::sleep(Duration::from_millis(100)); 861 assert!(l.event_handler().is_err()); 862 assert!(l.is_blocked()); 863 assert!(!l.consume(100, TokenType::Bytes)); 864 865 // after waiting out the full duration, rate limiter should be 866 // available again 867 thread::sleep(Duration::from_millis(200)); 868 assert!(l.event_handler().is_ok()); 869 assert!(!l.is_blocked()); 870 assert!(l.consume(100, TokenType::Bytes)); 871 } 872 873 #[test] 874 fn test_update_buckets() { 875 let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap(); 876 877 let initial_bw = x.bandwidth(); 878 let initial_ops = x.ops(); 879 880 x.update_buckets(BucketUpdate::None, BucketUpdate::None); 881 assert_eq!(x.bandwidth(), initial_bw); 882 assert_eq!(x.ops(), initial_ops); 883 884 let new_bw = TokenBucket::new(123, 0, 57).unwrap(); 885 let new_ops = TokenBucket::new(321, 12346, 89).unwrap(); 886 x.update_buckets( 887 BucketUpdate::Update(new_bw.clone()), 888 BucketUpdate::Update(new_ops.clone()), 889 ); 890 891 { 892 let mut guard = x.inner.lock().unwrap(); 893 // We have manually adjust the last_update field, because it changes when update_buckets() 894 // constructs new buckets (and thus gets a different value for last_update). We do this so 895 // it makes sense to test the following assertions. 896 guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update; 897 guard.ops.as_mut().unwrap().last_update = new_ops.last_update; 898 } 899 900 assert_eq!(x.bandwidth(), Some(new_bw)); 901 assert_eq!(x.ops(), Some(new_ops)); 902 903 x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled); 904 assert_eq!(x.bandwidth(), None); 905 assert_eq!(x.ops(), None); 906 } 907 908 #[test] 909 fn test_rate_limiter_debug() { 910 let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap(); 911 assert_eq!( 912 format!("{l:?}"), 913 format!( 914 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 915 l.bandwidth(), 916 l.ops() 917 ), 918 ); 919 } 920 } 921