1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. 2 // SPDX-License-Identifier: Apache-2.0 3 4 #![deny(missing_docs)] 5 //! # Rate Limiter 6 //! 7 //! Provides a rate limiter written in Rust useful for IO operations that need to 8 //! be throttled. 9 //! 10 //! ## Behavior 11 //! 12 //! The rate limiter starts off as 'unblocked' with two token buckets configured 13 //! with the values passed in the `RateLimiter::new()` constructor. 14 //! All subsequent accounting is done independently for each token bucket based 15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter 16 //! goes in the 'blocked' state. At this point an internal timer is set up which 17 //! will later 'wake up' the user in order to retry sending data. The 'wake up' 18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD` 19 //! trait implementation. 20 //! 21 //! The contract is that the user shall also call the `event_handler()` method on 22 //! receipt of such an event. 23 //! 24 //! The token buckets are replenished every time a `consume()` is called, before 25 //! actually trying to consume the requested amount of tokens. The amount of tokens 26 //! replenished is automatically calculated to respect the `complete_refill_time` 27 //! configuration parameter provided by the user. The token buckets will never 28 //! replenish above their respective `size`. 29 //! 30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity 31 //! on top of their `size`. This initial extra credit does not replenish and 32 //! can be used for an initial burst of data. 33 //! 34 //! The granularity for 'wake up' events when the rate limiter is blocked is 35 //! currently hardcoded to `100 milliseconds`. 36 //! 37 //! ## Limitations 38 //! 39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its 40 //! usage is limited to Linux systems. 41 //! 42 //! Another particularity of this implementation is that it is not self-driving. 43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd` 44 //! trait and provides an *event-handler* as part of its API. This *event-handler* 45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD. 46 #[macro_use] 47 extern crate log; 48 49 use std::io; 50 use std::os::unix::io::{AsRawFd, RawFd}; 51 use std::sync::atomic::{AtomicBool, Ordering}; 52 use std::sync::Mutex; 53 use std::time::{Duration, Instant}; 54 55 use thiserror::Error; 56 use vmm_sys_util::timerfd::TimerFd; 57 58 /// Module for group rate limiting. 59 pub mod group; 60 61 #[derive(Error, Debug)] 62 /// Describes the errors that may occur while handling rate limiter events. 63 pub enum Error { 64 /// The event handler was called spuriously. 65 #[error("Event handler was called spuriously: {0}")] 66 SpuriousRateLimiterEvent(&'static str), 67 /// The event handler encounters while TimerFd::wait() 68 #[error("Failed to wait for the timer")] 69 TimerFdWaitError(#[source] std::io::Error), 70 } 71 72 // Interval at which the refill timer will run when limiter is at capacity. 73 const REFILL_TIMER_INTERVAL_MS: u64 = 100; 74 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS); 75 76 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000; 77 78 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor. 79 fn gcd(x: u64, y: u64) -> u64 { 80 let mut x = x; 81 let mut y = y; 82 while y != 0 { 83 let t = y; 84 y = x % y; 85 x = t; 86 } 87 x 88 } 89 90 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`. 91 #[derive(Clone, Debug, PartialEq)] 92 pub enum BucketReduction { 93 /// There are not enough tokens to complete the operation. 94 Failure, 95 /// A part of the available tokens have been consumed. 96 Success, 97 /// A number of tokens `inner` times larger than the bucket size have been consumed. 98 OverConsumption(f64), 99 } 100 101 /// TokenBucket provides a lower level interface to rate limiting with a 102 /// configurable capacity, refill-rate and initial burst. 103 #[derive(Clone, Debug, PartialEq, Eq)] 104 pub struct TokenBucket { 105 // Bucket defining traits. 106 size: u64, 107 // Initial burst size (number of free initial tokens, that can be consumed at no cost) 108 one_time_burst: u64, 109 // Complete refill time in milliseconds. 110 refill_time: u64, 111 112 // Internal state descriptors. 113 budget: u64, 114 last_update: Instant, 115 116 // Fields used for pre-processing optimizations. 117 processed_capacity: u64, 118 processed_refill_time: u64, 119 } 120 121 impl TokenBucket { 122 /// Creates a `TokenBucket` wrapped in an `Option`. 123 /// 124 /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms` 125 /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial 126 /// extra credit on top of total capacity, that does not replenish and which can be used 127 /// for an initial burst of data. 128 /// 129 /// If the `size` or the `complete refill time` are zero, then `None` is returned. 130 pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> { 131 // If either token bucket capacity or refill time is 0, disable limiting. 132 if size == 0 || complete_refill_time_ms == 0 { 133 return None; 134 } 135 // Formula for computing current refill amount: 136 // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000) 137 // In order to avoid overflows, simplify the fractions by computing greatest common divisor. 138 139 let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC; 140 // Get the greatest common factor between `size` and `complete_refill_time_ns`. 141 let common_factor = gcd(size, complete_refill_time_ns); 142 // The division will be exact since `common_factor` is a factor of `size`. 143 let processed_capacity: u64 = size / common_factor; 144 // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`. 145 let processed_refill_time: u64 = complete_refill_time_ns / common_factor; 146 147 Some(TokenBucket { 148 size, 149 one_time_burst, 150 refill_time: complete_refill_time_ms, 151 // Start off full. 152 budget: size, 153 // Last updated is now. 154 last_update: Instant::now(), 155 processed_capacity, 156 processed_refill_time, 157 }) 158 } 159 160 /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded. 161 // TODO (Issue #259): handle cases where a single request is larger than the full capacity 162 // for such cases we need to support partial fulfilment of requests 163 pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction { 164 // First things first: consume the one-time-burst budget. 165 if self.one_time_burst > 0 { 166 // We still have burst budget for *all* tokens requests. 167 if self.one_time_burst >= tokens { 168 self.one_time_burst -= tokens; 169 self.last_update = Instant::now(); 170 // No need to continue to the refill process, we still have burst budget to consume from. 171 return BucketReduction::Success; 172 } else { 173 // We still have burst budget for *some* of the tokens requests. 174 // The tokens left unfulfilled will be consumed from current `self.budget`. 175 tokens -= self.one_time_burst; 176 self.one_time_burst = 0; 177 } 178 } 179 180 // Compute time passed since last refill/update. 181 let time_delta = self.last_update.elapsed().as_nanos() as u64; 182 self.last_update = Instant::now(); 183 184 // At each 'time_delta' nanoseconds the bucket should refill with: 185 // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000) 186 // `processed_capacity` and `processed_refill_time` are the result of simplifying above 187 // fraction formula with their greatest-common-factor. 188 self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time; 189 190 if self.budget >= self.size { 191 self.budget = self.size; 192 } 193 194 if tokens > self.budget { 195 // This operation requests a bandwidth higher than the bucket size 196 if tokens > self.size { 197 error!( 198 "Consumed {} tokens from bucket of size {}", 199 tokens, self.size 200 ); 201 // Empty the bucket and report an overconsumption of 202 // (remaining tokens / size) times larger than the bucket size 203 tokens -= self.budget; 204 self.budget = 0; 205 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64); 206 } 207 // If not enough tokens consume() fails, return false. 208 return BucketReduction::Failure; 209 } 210 211 self.budget -= tokens; 212 BucketReduction::Success 213 } 214 215 /// "Manually" adds tokens to bucket. 216 pub fn replenish(&mut self, tokens: u64) { 217 // This means we are still during the burst interval. 218 // Of course there is a very small chance that the last reduce() also used up burst 219 // budget which should now be replenished, but for performance and code-complexity 220 // reasons we're just gonna let that slide since it's practically inconsequential. 221 if self.one_time_burst > 0 { 222 self.one_time_burst += tokens; 223 return; 224 } 225 self.budget = std::cmp::min(self.budget + tokens, self.size); 226 } 227 228 /// Returns the capacity of the token bucket. 229 pub fn capacity(&self) -> u64 { 230 self.size 231 } 232 233 /// Returns the remaining one time burst budget. 234 pub fn one_time_burst(&self) -> u64 { 235 self.one_time_burst 236 } 237 238 /// Returns the time in milliseconds required to to completely fill the bucket. 239 pub fn refill_time_ms(&self) -> u64 { 240 self.refill_time 241 } 242 243 /// Returns the current budget (one time burst allowance notwithstanding). 244 pub fn budget(&self) -> u64 { 245 self.budget 246 } 247 } 248 249 /// Enum that describes the type of token used. 250 pub enum TokenType { 251 /// Token type used for bandwidth limiting. 252 Bytes, 253 /// Token type used for operations/second limiting. 254 Ops, 255 } 256 257 /// Enum that describes the type of token bucket update. 258 pub enum BucketUpdate { 259 /// No Update - same as before. 260 None, 261 /// Rate Limiting is disabled on this bucket. 262 Disabled, 263 /// Rate Limiting enabled with updated bucket. 264 Update(TokenBucket), 265 } 266 267 /// Rate Limiter that works on both bandwidth and ops/s limiting. 268 /// 269 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually. 270 /// 271 /// Implementation uses a single timer through TimerFd to refresh either or 272 /// both token buckets. 273 /// 274 /// Its internal buckets are 'passively' replenished as they're being used (as 275 /// part of `consume()` operations). 276 /// A timer is enabled and used to 'actively' replenish the token buckets when 277 /// limiting is in effect and `consume()` operations are disabled. 278 /// 279 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait 280 /// implementation. These events are meant to be consumed by the user of this struct. 281 /// On each such event, the user must call the `event_handler()` method. 282 pub struct RateLimiter { 283 inner: Mutex<RateLimiterInner>, 284 285 // Internal flag that quickly determines timer state. 286 timer_active: AtomicBool, 287 } 288 289 struct RateLimiterInner { 290 bandwidth: Option<TokenBucket>, 291 ops: Option<TokenBucket>, 292 293 timer_fd: TimerFd, 294 } 295 296 impl RateLimiterInner { 297 // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once). 298 fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) { 299 // Panic when failing to arm the timer (same handling in crate TimerFd::set_state()) 300 self.timer_fd 301 .reset(dur, None) 302 .expect("Can't arm the timer (unexpected 'timerfd_settime' failure)."); 303 flag.store(true, Ordering::Relaxed) 304 } 305 } 306 307 impl RateLimiter { 308 /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s. 309 /// 310 /// # Arguments 311 /// 312 /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket. 313 /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`, 314 /// that does not replenish and which can be used for an initial burst of data. 315 /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes` 316 /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes. 317 /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket. 318 /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`, 319 /// that does not replenish and which can be used for an initial burst of data. 320 /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token 321 /// bucket to go from zero Ops to `ops_total_capacity` Ops. 322 /// 323 /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter 324 /// is **disabled** for that respective token type. 325 /// 326 /// # Errors 327 /// 328 /// If the timerfd creation fails, an error is returned. 329 pub fn new( 330 bytes_total_capacity: u64, 331 bytes_one_time_burst: u64, 332 bytes_complete_refill_time_ms: u64, 333 ops_total_capacity: u64, 334 ops_one_time_burst: u64, 335 ops_complete_refill_time_ms: u64, 336 ) -> io::Result<Self> { 337 let bytes_token_bucket = TokenBucket::new( 338 bytes_total_capacity, 339 bytes_one_time_burst, 340 bytes_complete_refill_time_ms, 341 ); 342 343 let ops_token_bucket = TokenBucket::new( 344 ops_total_capacity, 345 ops_one_time_burst, 346 ops_complete_refill_time_ms, 347 ); 348 349 // We'll need a timer_fd, even if our current config effectively disables rate limiting, 350 // because `Self::update_buckets()` might re-enable it later, and we might be 351 // seccomp-blocked from creating the timer_fd at that time. 352 let timer_fd = TimerFd::new()?; 353 // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag 354 // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`. 355 // SAFETY: FFI calls. 356 let ret = unsafe { 357 let fd = timer_fd.as_raw_fd(); 358 let mut flags = libc::fcntl(fd, libc::F_GETFL); 359 flags |= libc::O_NONBLOCK; 360 libc::fcntl(fd, libc::F_SETFL, flags) 361 }; 362 if ret < 0 { 363 return Err(std::io::Error::last_os_error()); 364 } 365 366 Ok(RateLimiter { 367 inner: Mutex::new(RateLimiterInner { 368 bandwidth: bytes_token_bucket, 369 ops: ops_token_bucket, 370 timer_fd, 371 }), 372 timer_active: AtomicBool::new(false), 373 }) 374 } 375 376 /// Attempts to consume tokens and returns whether that is possible. 377 /// 378 /// If rate limiting is disabled on provided `token_type`, this function will always succeed. 379 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool { 380 // If the timer is active, we can't consume tokens from any bucket and the function fails. 381 if self.is_blocked() { 382 return false; 383 } 384 let mut guard = self.inner.lock().unwrap(); 385 // Identify the required token bucket. 386 let token_bucket = match token_type { 387 TokenType::Bytes => guard.bandwidth.as_mut(), 388 TokenType::Ops => guard.ops.as_mut(), 389 }; 390 // Try to consume from the token bucket. 391 if let Some(bucket) = token_bucket { 392 let refill_time = bucket.refill_time_ms(); 393 match bucket.reduce(tokens) { 394 // When we report budget is over, there will be no further calls here, 395 // register a timer to replenish the bucket and resume processing; 396 // make sure there is only one running timer for this limiter. 397 BucketReduction::Failure => { 398 if !self.is_blocked() { 399 guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active); 400 } 401 false 402 } 403 // The operation succeeded and further calls can be made. 404 BucketReduction::Success => true, 405 // The operation succeeded as the tokens have been consumed 406 // but the timer still needs to be armed. 407 BucketReduction::OverConsumption(ratio) => { 408 // The operation "borrowed" a number of tokens `ratio` times 409 // greater than the size of the bucket, and since it takes 410 // `refill_time` milliseconds to fill an empty bucket, in 411 // order to enforce the bandwidth limit we need to prevent 412 // further calls to the rate limiter for 413 // `ratio * refill_time` milliseconds. 414 guard.activate_timer( 415 Duration::from_millis((ratio * refill_time as f64) as u64), 416 &self.timer_active, 417 ); 418 true 419 } 420 } 421 } else { 422 // If bucket is not present rate limiting is disabled on token type, 423 // consume() will always succeed. 424 true 425 } 426 } 427 428 /// Adds tokens of `token_type` to their respective bucket. 429 /// 430 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a 431 /// `consume()` if needed. 432 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) { 433 let mut guard = self.inner.lock().unwrap(); 434 // Identify the required token bucket. 435 let token_bucket = match token_type { 436 TokenType::Bytes => guard.bandwidth.as_mut(), 437 TokenType::Ops => guard.ops.as_mut(), 438 }; 439 // Add tokens to the token bucket. 440 if let Some(bucket) = token_bucket { 441 bucket.replenish(tokens); 442 } 443 } 444 445 /// Returns whether this rate limiter is blocked. 446 /// 447 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough 448 /// budget for it. 449 /// An event will be generated on the exported FD when the limiter 'unblocks'. 450 pub fn is_blocked(&self) -> bool { 451 self.timer_active.load(Ordering::Relaxed) 452 } 453 454 /// This function needs to be called every time there is an event on the 455 /// FD provided by this object's `AsRawFd` trait implementation. 456 /// 457 /// # Errors 458 /// 459 /// If the rate limiter is disabled or is not blocked, an error is returned. 460 pub fn event_handler(&self) -> Result<(), Error> { 461 let mut guard = self.inner.lock().unwrap(); 462 loop { 463 // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following 464 // `timer_fd::wait()` won't block (which is different from its default behavior.) 465 match guard.timer_fd.wait() { 466 Err(e) => { 467 let err: std::io::Error = e.into(); 468 match err.kind() { 469 std::io::ErrorKind::Interrupted => (), 470 std::io::ErrorKind::WouldBlock => { 471 return Err(Error::SpuriousRateLimiterEvent( 472 "Rate limiter event handler called without a present timer", 473 )) 474 } 475 _ => return Err(Error::TimerFdWaitError(err)), 476 } 477 } 478 _ => { 479 self.timer_active.store(false, Ordering::Relaxed); 480 return Ok(()); 481 } 482 } 483 } 484 } 485 486 /// Updates the parameters of the token buckets associated with this RateLimiter. 487 // TODO: Please note that, right now, the buckets become full after being updated. 488 pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) { 489 let mut guard = self.inner.lock().unwrap(); 490 match bytes { 491 BucketUpdate::Disabled => guard.bandwidth = None, 492 BucketUpdate::Update(tb) => guard.bandwidth = Some(tb), 493 BucketUpdate::None => (), 494 }; 495 match ops { 496 BucketUpdate::Disabled => guard.ops = None, 497 BucketUpdate::Update(tb) => guard.ops = Some(tb), 498 BucketUpdate::None => (), 499 }; 500 } 501 } 502 503 impl AsRawFd for RateLimiter { 504 /// Provides a FD which needs to be monitored for POLLIN events. 505 /// 506 /// This object's `event_handler()` method must be called on such events. 507 /// 508 /// Will return a negative value if rate limiting is disabled on both 509 /// token types. 510 fn as_raw_fd(&self) -> RawFd { 511 let guard = self.inner.lock().unwrap(); 512 guard.timer_fd.as_raw_fd() 513 } 514 } 515 516 impl Default for RateLimiter { 517 /// Default RateLimiter is a no-op limiter with infinite budget. 518 fn default() -> Self { 519 // Safe to unwrap since this will not attempt to create timer_fd. 520 RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter") 521 } 522 } 523 524 #[cfg(test)] 525 pub(crate) mod tests { 526 use std::{fmt, thread}; 527 528 use super::*; 529 530 impl TokenBucket { 531 // Resets the token bucket: budget set to max capacity and last-updated set to now. 532 fn reset(&mut self) { 533 self.budget = self.size; 534 self.last_update = Instant::now(); 535 } 536 537 fn get_last_update(&self) -> &Instant { 538 &self.last_update 539 } 540 541 fn get_processed_capacity(&self) -> u64 { 542 self.processed_capacity 543 } 544 545 fn get_processed_refill_time(&self) -> u64 { 546 self.processed_refill_time 547 } 548 549 // After a restore, we cannot be certain that the last_update field has the same value. 550 #[allow(dead_code)] 551 fn partial_eq(&self, other: &TokenBucket) -> bool { 552 (other.capacity() == self.capacity()) 553 && (other.one_time_burst() == self.one_time_burst()) 554 && (other.refill_time_ms() == self.refill_time_ms()) 555 && (other.budget() == self.budget()) 556 } 557 } 558 559 impl RateLimiter { 560 fn bandwidth(&self) -> Option<TokenBucket> { 561 let guard = self.inner.lock().unwrap(); 562 guard.bandwidth.clone() 563 } 564 565 fn ops(&self) -> Option<TokenBucket> { 566 let guard = self.inner.lock().unwrap(); 567 guard.ops.clone() 568 } 569 } 570 571 impl PartialEq for RateLimiter { 572 fn eq(&self, other: &RateLimiter) -> bool { 573 let self_guard = self.inner.lock().unwrap(); 574 let other_guard = other.inner.lock().unwrap(); 575 self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops 576 } 577 } 578 579 impl fmt::Debug for RateLimiter { 580 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 581 let guard = self.inner.lock().unwrap(); 582 write!( 583 f, 584 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 585 guard.bandwidth, guard.ops 586 ) 587 } 588 } 589 590 #[test] 591 fn test_token_bucket_create() { 592 let before = Instant::now(); 593 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 594 assert_eq!(tb.capacity(), 1000); 595 assert_eq!(tb.budget(), 1000); 596 assert!(*tb.get_last_update() >= before); 597 let after = Instant::now(); 598 assert!(*tb.get_last_update() <= after); 599 assert_eq!(tb.get_processed_capacity(), 1); 600 assert_eq!(tb.get_processed_refill_time(), 1_000_000); 601 602 // Verify invalid bucket configurations result in `None`. 603 assert!(TokenBucket::new(0, 1234, 1000).is_none()); 604 assert!(TokenBucket::new(100, 1234, 0).is_none()); 605 assert!(TokenBucket::new(0, 1234, 0).is_none()); 606 } 607 608 #[test] 609 fn test_token_bucket_preprocess() { 610 let tb = TokenBucket::new(1000, 0, 1000).unwrap(); 611 assert_eq!(tb.get_processed_capacity(), 1); 612 assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC); 613 614 let thousand = 1000; 615 let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap(); 616 assert_eq!(tb.get_processed_capacity(), 3 * 19); 617 assert_eq!( 618 tb.get_processed_refill_time(), 619 13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand) 620 ); 621 } 622 623 #[test] 624 fn test_token_bucket_reduce() { 625 // token bucket with capacity 1000 and refill time of 1000 milliseconds 626 // allowing rate of 1 token/ms. 627 let capacity = 1000; 628 let refill_ms = 1000; 629 let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap(); 630 631 assert_eq!(tb.reduce(123), BucketReduction::Success); 632 assert_eq!(tb.budget(), capacity - 123); 633 634 thread::sleep(Duration::from_millis(123)); 635 assert_eq!(tb.reduce(1), BucketReduction::Success); 636 assert_eq!(tb.budget(), capacity - 1); 637 assert_eq!(tb.reduce(100), BucketReduction::Success); 638 assert_eq!(tb.reduce(capacity), BucketReduction::Failure); 639 640 // token bucket with capacity 1000 and refill time of 1000 milliseconds 641 let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap(); 642 // safely assuming the thread can run these 3 commands in less than 500ms 643 assert_eq!(tb.reduce(1000), BucketReduction::Success); 644 assert_eq!(tb.one_time_burst(), 100); 645 assert_eq!(tb.reduce(500), BucketReduction::Success); 646 assert_eq!(tb.one_time_burst(), 0); 647 assert_eq!(tb.reduce(500), BucketReduction::Success); 648 assert_eq!(tb.reduce(500), BucketReduction::Failure); 649 thread::sleep(Duration::from_millis(500)); 650 assert_eq!(tb.reduce(500), BucketReduction::Success); 651 thread::sleep(Duration::from_millis(1000)); 652 assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5)); 653 654 let before = Instant::now(); 655 tb.reset(); 656 assert_eq!(tb.capacity(), 1000); 657 assert_eq!(tb.budget(), 1000); 658 assert!(*tb.get_last_update() >= before); 659 let after = Instant::now(); 660 assert!(*tb.get_last_update() <= after); 661 } 662 663 #[test] 664 fn test_rate_limiter_default() { 665 let l = RateLimiter::default(); 666 667 // limiter should not be blocked 668 assert!(!l.is_blocked()); 669 // limiter should be disabled so consume(whatever) should work 670 assert!(l.consume(u64::MAX, TokenType::Ops)); 671 assert!(l.consume(u64::MAX, TokenType::Bytes)); 672 // calling the handler without there having been an event should error 673 l.event_handler().unwrap_err(); 674 assert_eq!( 675 format!("{:?}", l.event_handler().err().unwrap()), 676 "SpuriousRateLimiterEvent(\ 677 \"Rate limiter event handler called without a present timer\")" 678 ); 679 } 680 681 #[test] 682 fn test_rate_limiter_new() { 683 let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap(); 684 let bw = l.bandwidth().unwrap(); 685 assert_eq!(bw.capacity(), 1000); 686 assert_eq!(bw.one_time_burst(), 1001); 687 assert_eq!(bw.refill_time_ms(), 1002); 688 assert_eq!(bw.budget(), 1000); 689 690 let ops = l.ops().unwrap(); 691 assert_eq!(ops.capacity(), 1003); 692 assert_eq!(ops.one_time_burst(), 1004); 693 assert_eq!(ops.refill_time_ms(), 1005); 694 assert_eq!(ops.budget(), 1003); 695 } 696 697 #[test] 698 fn test_rate_limiter_manual_replenish() { 699 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 700 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 701 702 // consume 123 bytes 703 assert!(l.consume(123, TokenType::Bytes)); 704 l.manual_replenish(23, TokenType::Bytes); 705 { 706 let bytes_tb = l.bandwidth().unwrap(); 707 assert_eq!(bytes_tb.budget(), 900); 708 } 709 // consume 123 ops 710 assert!(l.consume(123, TokenType::Ops)); 711 l.manual_replenish(23, TokenType::Ops); 712 { 713 let bytes_tb = l.ops().unwrap(); 714 assert_eq!(bytes_tb.budget(), 900); 715 } 716 } 717 718 #[test] 719 fn test_rate_limiter_bandwidth() { 720 // rate limiter with limit of 1000 bytes/s 721 let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap(); 722 723 // limiter should not be blocked 724 assert!(!l.is_blocked()); 725 // raw FD for this disabled should be valid 726 assert!(l.as_raw_fd() > 0); 727 728 // ops/s limiter should be disabled so consume(whatever) should work 729 assert!(l.consume(u64::MAX, TokenType::Ops)); 730 731 // do full 1000 bytes 732 assert!(l.consume(1000, TokenType::Bytes)); 733 // try and fail on another 100 734 assert!(!l.consume(100, TokenType::Bytes)); 735 // since consume failed, limiter should be blocked now 736 assert!(l.is_blocked()); 737 // wait half the timer period 738 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 739 // limiter should still be blocked 740 assert!(l.is_blocked()); 741 // wait the other half of the timer period 742 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 743 // the timer_fd should have an event on it by now 744 l.event_handler().unwrap(); 745 // limiter should now be unblocked 746 assert!(!l.is_blocked()); 747 // try and succeed on another 100 bytes this time 748 assert!(l.consume(100, TokenType::Bytes)); 749 } 750 751 #[test] 752 fn test_rate_limiter_ops() { 753 // rate limiter with limit of 1000 ops/s 754 let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap(); 755 756 // limiter should not be blocked 757 assert!(!l.is_blocked()); 758 // raw FD for this disabled should be valid 759 assert!(l.as_raw_fd() > 0); 760 761 // bytes/s limiter should be disabled so consume(whatever) should work 762 assert!(l.consume(u64::MAX, TokenType::Bytes)); 763 764 // do full 1000 ops 765 assert!(l.consume(1000, TokenType::Ops)); 766 // try and fail on another 100 767 assert!(!l.consume(100, TokenType::Ops)); 768 // since consume failed, limiter should be blocked now 769 assert!(l.is_blocked()); 770 // wait half the timer period 771 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 772 // limiter should still be blocked 773 assert!(l.is_blocked()); 774 // wait the other half of the timer period 775 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 776 // the timer_fd should have an event on it by now 777 l.event_handler().unwrap(); 778 // limiter should now be unblocked 779 assert!(!l.is_blocked()); 780 // try and succeed on another 100 ops this time 781 assert!(l.consume(100, TokenType::Ops)); 782 } 783 784 #[test] 785 fn test_rate_limiter_full() { 786 // rate limiter with limit of 1000 bytes/s and 1000 ops/s 787 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 788 789 // limiter should not be blocked 790 assert!(!l.is_blocked()); 791 // raw FD for this disabled should be valid 792 assert!(l.as_raw_fd() > 0); 793 794 // do full 1000 bytes 795 assert!(l.consume(1000, TokenType::Ops)); 796 // do full 1000 bytes 797 assert!(l.consume(1000, TokenType::Bytes)); 798 // try and fail on another 100 ops 799 assert!(!l.consume(100, TokenType::Ops)); 800 // try and fail on another 100 bytes 801 assert!(!l.consume(100, TokenType::Bytes)); 802 // since consume failed, limiter should be blocked now 803 assert!(l.is_blocked()); 804 // wait half the timer period 805 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 806 // limiter should still be blocked 807 assert!(l.is_blocked()); 808 // wait the other half of the timer period 809 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2)); 810 // the timer_fd should have an event on it by now 811 l.event_handler().unwrap(); 812 // limiter should now be unblocked 813 assert!(!l.is_blocked()); 814 // try and succeed on another 100 ops this time 815 assert!(l.consume(100, TokenType::Ops)); 816 // try and succeed on another 100 bytes this time 817 assert!(l.consume(100, TokenType::Bytes)); 818 } 819 820 #[test] 821 fn test_rate_limiter_overconsumption() { 822 // initialize the rate limiter 823 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 824 // try to consume 2.5x the bucket size 825 // we are "borrowing" 1.5x the bucket size in tokens since 826 // the bucket is full 827 assert!(l.consume(2500, TokenType::Bytes)); 828 829 // check that even after a whole second passes, the rate limiter 830 // is still blocked 831 thread::sleep(Duration::from_millis(1000)); 832 l.event_handler().unwrap_err(); 833 assert!(l.is_blocked()); 834 835 // after 1.5x the replenish time has passed, the rate limiter 836 // is available again 837 thread::sleep(Duration::from_millis(500)); 838 l.event_handler().unwrap(); 839 assert!(!l.is_blocked()); 840 841 // reset the rate limiter 842 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap(); 843 // try to consume 1.5x the bucket size 844 // we are "borrowing" 1.5x the bucket size in tokens since 845 // the bucket is full, should arm the timer to 0.5x replenish 846 // time, which is 500 ms 847 assert!(l.consume(1500, TokenType::Bytes)); 848 849 // check that after more than the minimum refill time, 850 // the rate limiter is still blocked 851 thread::sleep(Duration::from_millis(200)); 852 l.event_handler().unwrap_err(); 853 assert!(l.is_blocked()); 854 855 // try to consume some tokens, which should fail as the timer 856 // is still active 857 assert!(!l.consume(100, TokenType::Bytes)); 858 l.event_handler().unwrap_err(); 859 assert!(l.is_blocked()); 860 861 // check that after the minimum refill time, the timer was not 862 // overwritten and the rate limiter is still blocked from the 863 // borrowing we performed earlier 864 thread::sleep(Duration::from_millis(100)); 865 l.event_handler().unwrap_err(); 866 assert!(l.is_blocked()); 867 assert!(!l.consume(100, TokenType::Bytes)); 868 869 // after waiting out the full duration, rate limiter should be 870 // available again 871 thread::sleep(Duration::from_millis(200)); 872 l.event_handler().unwrap(); 873 assert!(!l.is_blocked()); 874 assert!(l.consume(100, TokenType::Bytes)); 875 } 876 877 #[test] 878 fn test_update_buckets() { 879 let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap(); 880 881 let initial_bw = x.bandwidth(); 882 let initial_ops = x.ops(); 883 884 x.update_buckets(BucketUpdate::None, BucketUpdate::None); 885 assert_eq!(x.bandwidth(), initial_bw); 886 assert_eq!(x.ops(), initial_ops); 887 888 let new_bw = TokenBucket::new(123, 0, 57).unwrap(); 889 let new_ops = TokenBucket::new(321, 12346, 89).unwrap(); 890 x.update_buckets( 891 BucketUpdate::Update(new_bw.clone()), 892 BucketUpdate::Update(new_ops.clone()), 893 ); 894 895 { 896 let mut guard = x.inner.lock().unwrap(); 897 // We have manually adjust the last_update field, because it changes when update_buckets() 898 // constructs new buckets (and thus gets a different value for last_update). We do this so 899 // it makes sense to test the following assertions. 900 guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update; 901 guard.ops.as_mut().unwrap().last_update = new_ops.last_update; 902 } 903 904 assert_eq!(x.bandwidth(), Some(new_bw)); 905 assert_eq!(x.ops(), Some(new_ops)); 906 907 x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled); 908 assert_eq!(x.bandwidth(), None); 909 assert_eq!(x.ops(), None); 910 } 911 912 #[test] 913 fn test_rate_limiter_debug() { 914 let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap(); 915 assert_eq!( 916 format!("{l:?}"), 917 format!( 918 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}", 919 l.bandwidth(), 920 l.ops() 921 ), 922 ); 923 } 924 } 925