1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46 #[macro_use]
47 extern crate log;
48
49 use std::io;
50 use std::os::unix::io::{AsRawFd, RawFd};
51 use std::sync::atomic::{AtomicBool, Ordering};
52 use std::sync::Mutex;
53 use std::time::{Duration, Instant};
54
55 use thiserror::Error;
56 use vmm_sys_util::timerfd::TimerFd;
57
58 /// Module for group rate limiting.
59 pub mod group;
60
61 #[derive(Error, Debug)]
62 /// Describes the errors that may occur while handling rate limiter events.
63 pub enum Error {
64 /// The event handler was called spuriously.
65 #[error("Event handler was called spuriously: {0}")]
66 SpuriousRateLimiterEvent(&'static str),
67 /// The event handler encounters while TimerFd::wait()
68 #[error("Failed to wait for the timer")]
69 TimerFdWaitError(#[source] std::io::Error),
70 }
71
72 // Interval at which the refill timer will run when limiter is at capacity.
73 const REFILL_TIMER_INTERVAL_MS: u64 = 100;
74 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);
75
76 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
77
78 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
gcd(x: u64, y: u64) -> u6479 fn gcd(x: u64, y: u64) -> u64 {
80 let mut x = x;
81 let mut y = y;
82 while y != 0 {
83 let t = y;
84 y = x % y;
85 x = t;
86 }
87 x
88 }
89
90 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
91 #[derive(Clone, Debug, PartialEq)]
92 pub enum BucketReduction {
93 /// There are not enough tokens to complete the operation.
94 Failure,
95 /// A part of the available tokens have been consumed.
96 Success,
97 /// A number of tokens `inner` times larger than the bucket size have been consumed.
98 OverConsumption(f64),
99 }
100
101 /// TokenBucket provides a lower level interface to rate limiting with a
102 /// configurable capacity, refill-rate and initial burst.
103 #[derive(Clone, Debug, PartialEq, Eq)]
104 pub struct TokenBucket {
105 // Bucket defining traits.
106 size: u64,
107 // Initial burst size (number of free initial tokens, that can be consumed at no cost)
108 one_time_burst: u64,
109 // Complete refill time in milliseconds.
110 refill_time: u64,
111
112 // Internal state descriptors.
113 budget: u64,
114 last_update: Instant,
115
116 // Fields used for pre-processing optimizations.
117 processed_capacity: u64,
118 processed_refill_time: u64,
119 }
120
121 impl TokenBucket {
122 /// Creates a `TokenBucket` wrapped in an `Option`.
123 ///
124 /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms`
125 /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
126 /// extra credit on top of total capacity, that does not replenish and which can be used
127 /// for an initial burst of data.
128 ///
129 /// If the `size` or the `complete refill time` are zero, then `None` is returned.
new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self>130 pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
131 // If either token bucket capacity or refill time is 0, disable limiting.
132 if size == 0 || complete_refill_time_ms == 0 {
133 return None;
134 }
135 // Formula for computing current refill amount:
136 // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
137 // In order to avoid overflows, simplify the fractions by computing greatest common divisor.
138
139 let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
140 // Get the greatest common factor between `size` and `complete_refill_time_ns`.
141 let common_factor = gcd(size, complete_refill_time_ns);
142 // The division will be exact since `common_factor` is a factor of `size`.
143 let processed_capacity: u64 = size / common_factor;
144 // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
145 let processed_refill_time: u64 = complete_refill_time_ns / common_factor;
146
147 Some(TokenBucket {
148 size,
149 one_time_burst,
150 refill_time: complete_refill_time_ms,
151 // Start off full.
152 budget: size,
153 // Last updated is now.
154 last_update: Instant::now(),
155 processed_capacity,
156 processed_refill_time,
157 })
158 }
159
160 /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
161 // TODO (Issue #259): handle cases where a single request is larger than the full capacity
162 // for such cases we need to support partial fulfilment of requests
reduce(&mut self, mut tokens: u64) -> BucketReduction163 pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
164 // First things first: consume the one-time-burst budget.
165 if self.one_time_burst > 0 {
166 // We still have burst budget for *all* tokens requests.
167 if self.one_time_burst >= tokens {
168 self.one_time_burst -= tokens;
169 self.last_update = Instant::now();
170 // No need to continue to the refill process, we still have burst budget to consume from.
171 return BucketReduction::Success;
172 } else {
173 // We still have burst budget for *some* of the tokens requests.
174 // The tokens left unfulfilled will be consumed from current `self.budget`.
175 tokens -= self.one_time_burst;
176 self.one_time_burst = 0;
177 }
178 }
179
180 // Compute time passed since last refill/update.
181 let time_delta = self.last_update.elapsed().as_nanos() as u64;
182 self.last_update = Instant::now();
183
184 // At each 'time_delta' nanoseconds the bucket should refill with:
185 // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
186 // `processed_capacity` and `processed_refill_time` are the result of simplifying above
187 // fraction formula with their greatest-common-factor.
188 self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;
189
190 if self.budget >= self.size {
191 self.budget = self.size;
192 }
193
194 if tokens > self.budget {
195 // This operation requests a bandwidth higher than the bucket size
196 if tokens > self.size {
197 error!(
198 "Consumed {} tokens from bucket of size {}",
199 tokens, self.size
200 );
201 // Empty the bucket and report an overconsumption of
202 // (remaining tokens / size) times larger than the bucket size
203 tokens -= self.budget;
204 self.budget = 0;
205 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
206 }
207 // If not enough tokens consume() fails, return false.
208 return BucketReduction::Failure;
209 }
210
211 self.budget -= tokens;
212 BucketReduction::Success
213 }
214
215 /// "Manually" adds tokens to bucket.
replenish(&mut self, tokens: u64)216 pub fn replenish(&mut self, tokens: u64) {
217 // This means we are still during the burst interval.
218 // Of course there is a very small chance that the last reduce() also used up burst
219 // budget which should now be replenished, but for performance and code-complexity
220 // reasons we're just gonna let that slide since it's practically inconsequential.
221 if self.one_time_burst > 0 {
222 self.one_time_burst += tokens;
223 return;
224 }
225 self.budget = std::cmp::min(self.budget + tokens, self.size);
226 }
227
228 /// Returns the capacity of the token bucket.
capacity(&self) -> u64229 pub fn capacity(&self) -> u64 {
230 self.size
231 }
232
233 /// Returns the remaining one time burst budget.
one_time_burst(&self) -> u64234 pub fn one_time_burst(&self) -> u64 {
235 self.one_time_burst
236 }
237
238 /// Returns the time in milliseconds required to to completely fill the bucket.
refill_time_ms(&self) -> u64239 pub fn refill_time_ms(&self) -> u64 {
240 self.refill_time
241 }
242
243 /// Returns the current budget (one time burst allowance notwithstanding).
budget(&self) -> u64244 pub fn budget(&self) -> u64 {
245 self.budget
246 }
247 }
248
249 /// Enum that describes the type of token used.
250 pub enum TokenType {
251 /// Token type used for bandwidth limiting.
252 Bytes,
253 /// Token type used for operations/second limiting.
254 Ops,
255 }
256
257 /// Enum that describes the type of token bucket update.
258 pub enum BucketUpdate {
259 /// No Update - same as before.
260 None,
261 /// Rate Limiting is disabled on this bucket.
262 Disabled,
263 /// Rate Limiting enabled with updated bucket.
264 Update(TokenBucket),
265 }
266
267 /// Rate Limiter that works on both bandwidth and ops/s limiting.
268 ///
269 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
270 ///
271 /// Implementation uses a single timer through TimerFd to refresh either or
272 /// both token buckets.
273 ///
274 /// Its internal buckets are 'passively' replenished as they're being used (as
275 /// part of `consume()` operations).
276 /// A timer is enabled and used to 'actively' replenish the token buckets when
277 /// limiting is in effect and `consume()` operations are disabled.
278 ///
279 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
280 /// implementation. These events are meant to be consumed by the user of this struct.
281 /// On each such event, the user must call the `event_handler()` method.
282 pub struct RateLimiter {
283 inner: Mutex<RateLimiterInner>,
284
285 // Internal flag that quickly determines timer state.
286 timer_active: AtomicBool,
287 }
288
289 struct RateLimiterInner {
290 bandwidth: Option<TokenBucket>,
291 ops: Option<TokenBucket>,
292
293 timer_fd: TimerFd,
294 }
295
296 impl RateLimiterInner {
297 // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once).
activate_timer(&mut self, dur: Duration, flag: &AtomicBool)298 fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) {
299 // Panic when failing to arm the timer (same handling in crate TimerFd::set_state())
300 self.timer_fd
301 .reset(dur, None)
302 .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
303 flag.store(true, Ordering::Relaxed)
304 }
305 }
306
307 impl RateLimiter {
308 /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
309 ///
310 /// # Arguments
311 ///
312 /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
313 /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
314 /// that does not replenish and which can be used for an initial burst of data.
315 /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
316 /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
317 /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
318 /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
319 /// that does not replenish and which can be used for an initial burst of data.
320 /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
321 /// bucket to go from zero Ops to `ops_total_capacity` Ops.
322 ///
323 /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
324 /// is **disabled** for that respective token type.
325 ///
326 /// # Errors
327 ///
328 /// If the timerfd creation fails, an error is returned.
new( bytes_total_capacity: u64, bytes_one_time_burst: u64, bytes_complete_refill_time_ms: u64, ops_total_capacity: u64, ops_one_time_burst: u64, ops_complete_refill_time_ms: u64, ) -> io::Result<Self>329 pub fn new(
330 bytes_total_capacity: u64,
331 bytes_one_time_burst: u64,
332 bytes_complete_refill_time_ms: u64,
333 ops_total_capacity: u64,
334 ops_one_time_burst: u64,
335 ops_complete_refill_time_ms: u64,
336 ) -> io::Result<Self> {
337 let bytes_token_bucket = TokenBucket::new(
338 bytes_total_capacity,
339 bytes_one_time_burst,
340 bytes_complete_refill_time_ms,
341 );
342
343 let ops_token_bucket = TokenBucket::new(
344 ops_total_capacity,
345 ops_one_time_burst,
346 ops_complete_refill_time_ms,
347 );
348
349 // We'll need a timer_fd, even if our current config effectively disables rate limiting,
350 // because `Self::update_buckets()` might re-enable it later, and we might be
351 // seccomp-blocked from creating the timer_fd at that time.
352 let timer_fd = TimerFd::new()?;
353 // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag
354 // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`.
355 // SAFETY: FFI calls.
356 let ret = unsafe {
357 let fd = timer_fd.as_raw_fd();
358 let mut flags = libc::fcntl(fd, libc::F_GETFL);
359 flags |= libc::O_NONBLOCK;
360 libc::fcntl(fd, libc::F_SETFL, flags)
361 };
362 if ret < 0 {
363 return Err(std::io::Error::last_os_error());
364 }
365
366 Ok(RateLimiter {
367 inner: Mutex::new(RateLimiterInner {
368 bandwidth: bytes_token_bucket,
369 ops: ops_token_bucket,
370 timer_fd,
371 }),
372 timer_active: AtomicBool::new(false),
373 })
374 }
375
376 /// Attempts to consume tokens and returns whether that is possible.
377 ///
378 /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
consume(&self, tokens: u64, token_type: TokenType) -> bool379 pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
380 // If the timer is active, we can't consume tokens from any bucket and the function fails.
381 if self.is_blocked() {
382 return false;
383 }
384 let mut guard = self.inner.lock().unwrap();
385 // Identify the required token bucket.
386 let token_bucket = match token_type {
387 TokenType::Bytes => guard.bandwidth.as_mut(),
388 TokenType::Ops => guard.ops.as_mut(),
389 };
390 // Try to consume from the token bucket.
391 if let Some(bucket) = token_bucket {
392 let refill_time = bucket.refill_time_ms();
393 match bucket.reduce(tokens) {
394 // When we report budget is over, there will be no further calls here,
395 // register a timer to replenish the bucket and resume processing;
396 // make sure there is only one running timer for this limiter.
397 BucketReduction::Failure => {
398 if !self.is_blocked() {
399 guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active);
400 }
401 false
402 }
403 // The operation succeeded and further calls can be made.
404 BucketReduction::Success => true,
405 // The operation succeeded as the tokens have been consumed
406 // but the timer still needs to be armed.
407 BucketReduction::OverConsumption(ratio) => {
408 // The operation "borrowed" a number of tokens `ratio` times
409 // greater than the size of the bucket, and since it takes
410 // `refill_time` milliseconds to fill an empty bucket, in
411 // order to enforce the bandwidth limit we need to prevent
412 // further calls to the rate limiter for
413 // `ratio * refill_time` milliseconds.
414 guard.activate_timer(
415 Duration::from_millis((ratio * refill_time as f64) as u64),
416 &self.timer_active,
417 );
418 true
419 }
420 }
421 } else {
422 // If bucket is not present rate limiting is disabled on token type,
423 // consume() will always succeed.
424 true
425 }
426 }
427
428 /// Adds tokens of `token_type` to their respective bucket.
429 ///
430 /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
431 /// `consume()` if needed.
manual_replenish(&self, tokens: u64, token_type: TokenType)432 pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
433 let mut guard = self.inner.lock().unwrap();
434 // Identify the required token bucket.
435 let token_bucket = match token_type {
436 TokenType::Bytes => guard.bandwidth.as_mut(),
437 TokenType::Ops => guard.ops.as_mut(),
438 };
439 // Add tokens to the token bucket.
440 if let Some(bucket) = token_bucket {
441 bucket.replenish(tokens);
442 }
443 }
444
445 /// Returns whether this rate limiter is blocked.
446 ///
447 /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
448 /// budget for it.
449 /// An event will be generated on the exported FD when the limiter 'unblocks'.
is_blocked(&self) -> bool450 pub fn is_blocked(&self) -> bool {
451 self.timer_active.load(Ordering::Relaxed)
452 }
453
454 /// This function needs to be called every time there is an event on the
455 /// FD provided by this object's `AsRawFd` trait implementation.
456 ///
457 /// # Errors
458 ///
459 /// If the rate limiter is disabled or is not blocked, an error is returned.
event_handler(&self) -> Result<(), Error>460 pub fn event_handler(&self) -> Result<(), Error> {
461 let mut guard = self.inner.lock().unwrap();
462 loop {
463 // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
464 // `timer_fd::wait()` won't block (which is different from its default behavior.)
465 match guard.timer_fd.wait() {
466 Err(e) => {
467 let err: std::io::Error = e.into();
468 match err.kind() {
469 std::io::ErrorKind::Interrupted => (),
470 std::io::ErrorKind::WouldBlock => {
471 return Err(Error::SpuriousRateLimiterEvent(
472 "Rate limiter event handler called without a present timer",
473 ))
474 }
475 _ => return Err(Error::TimerFdWaitError(err)),
476 }
477 }
478 _ => {
479 self.timer_active.store(false, Ordering::Relaxed);
480 return Ok(());
481 }
482 }
483 }
484 }
485
486 /// Updates the parameters of the token buckets associated with this RateLimiter.
487 // TODO: Please note that, right now, the buckets become full after being updated.
update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate)488 pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
489 let mut guard = self.inner.lock().unwrap();
490 match bytes {
491 BucketUpdate::Disabled => guard.bandwidth = None,
492 BucketUpdate::Update(tb) => guard.bandwidth = Some(tb),
493 BucketUpdate::None => (),
494 };
495 match ops {
496 BucketUpdate::Disabled => guard.ops = None,
497 BucketUpdate::Update(tb) => guard.ops = Some(tb),
498 BucketUpdate::None => (),
499 };
500 }
501 }
502
503 impl AsRawFd for RateLimiter {
504 /// Provides a FD which needs to be monitored for POLLIN events.
505 ///
506 /// This object's `event_handler()` method must be called on such events.
507 ///
508 /// Will return a negative value if rate limiting is disabled on both
509 /// token types.
as_raw_fd(&self) -> RawFd510 fn as_raw_fd(&self) -> RawFd {
511 let guard = self.inner.lock().unwrap();
512 guard.timer_fd.as_raw_fd()
513 }
514 }
515
516 impl Default for RateLimiter {
517 /// Default RateLimiter is a no-op limiter with infinite budget.
default() -> Self518 fn default() -> Self {
519 // Safe to unwrap since this will not attempt to create timer_fd.
520 RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
521 }
522 }
523
524 #[cfg(test)]
525 pub(crate) mod tests {
526 use std::{fmt, thread};
527
528 use super::*;
529
530 impl TokenBucket {
531 // Resets the token bucket: budget set to max capacity and last-updated set to now.
reset(&mut self)532 fn reset(&mut self) {
533 self.budget = self.size;
534 self.last_update = Instant::now();
535 }
536
get_last_update(&self) -> &Instant537 fn get_last_update(&self) -> &Instant {
538 &self.last_update
539 }
540
get_processed_capacity(&self) -> u64541 fn get_processed_capacity(&self) -> u64 {
542 self.processed_capacity
543 }
544
get_processed_refill_time(&self) -> u64545 fn get_processed_refill_time(&self) -> u64 {
546 self.processed_refill_time
547 }
548
549 // After a restore, we cannot be certain that the last_update field has the same value.
550 #[allow(dead_code)]
partial_eq(&self, other: &TokenBucket) -> bool551 fn partial_eq(&self, other: &TokenBucket) -> bool {
552 (other.capacity() == self.capacity())
553 && (other.one_time_burst() == self.one_time_burst())
554 && (other.refill_time_ms() == self.refill_time_ms())
555 && (other.budget() == self.budget())
556 }
557 }
558
559 impl RateLimiter {
bandwidth(&self) -> Option<TokenBucket>560 fn bandwidth(&self) -> Option<TokenBucket> {
561 let guard = self.inner.lock().unwrap();
562 guard.bandwidth.clone()
563 }
564
ops(&self) -> Option<TokenBucket>565 fn ops(&self) -> Option<TokenBucket> {
566 let guard = self.inner.lock().unwrap();
567 guard.ops.clone()
568 }
569 }
570
571 impl PartialEq for RateLimiter {
eq(&self, other: &RateLimiter) -> bool572 fn eq(&self, other: &RateLimiter) -> bool {
573 let self_guard = self.inner.lock().unwrap();
574 let other_guard = other.inner.lock().unwrap();
575 self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops
576 }
577 }
578
579 impl fmt::Debug for RateLimiter {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result580 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
581 let guard = self.inner.lock().unwrap();
582 write!(
583 f,
584 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
585 guard.bandwidth, guard.ops
586 )
587 }
588 }
589
590 #[test]
test_token_bucket_create()591 fn test_token_bucket_create() {
592 let before = Instant::now();
593 let tb = TokenBucket::new(1000, 0, 1000).unwrap();
594 assert_eq!(tb.capacity(), 1000);
595 assert_eq!(tb.budget(), 1000);
596 assert!(*tb.get_last_update() >= before);
597 let after = Instant::now();
598 assert!(*tb.get_last_update() <= after);
599 assert_eq!(tb.get_processed_capacity(), 1);
600 assert_eq!(tb.get_processed_refill_time(), 1_000_000);
601
602 // Verify invalid bucket configurations result in `None`.
603 assert!(TokenBucket::new(0, 1234, 1000).is_none());
604 assert!(TokenBucket::new(100, 1234, 0).is_none());
605 assert!(TokenBucket::new(0, 1234, 0).is_none());
606 }
607
608 #[test]
test_token_bucket_preprocess()609 fn test_token_bucket_preprocess() {
610 let tb = TokenBucket::new(1000, 0, 1000).unwrap();
611 assert_eq!(tb.get_processed_capacity(), 1);
612 assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);
613
614 let thousand = 1000;
615 let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
616 assert_eq!(tb.get_processed_capacity(), 3 * 19);
617 assert_eq!(
618 tb.get_processed_refill_time(),
619 13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
620 );
621 }
622
623 #[test]
test_token_bucket_reduce()624 fn test_token_bucket_reduce() {
625 // token bucket with capacity 1000 and refill time of 1000 milliseconds
626 // allowing rate of 1 token/ms.
627 let capacity = 1000;
628 let refill_ms = 1000;
629 let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();
630
631 assert_eq!(tb.reduce(123), BucketReduction::Success);
632 assert_eq!(tb.budget(), capacity - 123);
633
634 thread::sleep(Duration::from_millis(123));
635 assert_eq!(tb.reduce(1), BucketReduction::Success);
636 assert_eq!(tb.budget(), capacity - 1);
637 assert_eq!(tb.reduce(100), BucketReduction::Success);
638 assert_eq!(tb.reduce(capacity), BucketReduction::Failure);
639
640 // token bucket with capacity 1000 and refill time of 1000 milliseconds
641 let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
642 // safely assuming the thread can run these 3 commands in less than 500ms
643 assert_eq!(tb.reduce(1000), BucketReduction::Success);
644 assert_eq!(tb.one_time_burst(), 100);
645 assert_eq!(tb.reduce(500), BucketReduction::Success);
646 assert_eq!(tb.one_time_burst(), 0);
647 assert_eq!(tb.reduce(500), BucketReduction::Success);
648 assert_eq!(tb.reduce(500), BucketReduction::Failure);
649 thread::sleep(Duration::from_millis(500));
650 assert_eq!(tb.reduce(500), BucketReduction::Success);
651 thread::sleep(Duration::from_millis(1000));
652 assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));
653
654 let before = Instant::now();
655 tb.reset();
656 assert_eq!(tb.capacity(), 1000);
657 assert_eq!(tb.budget(), 1000);
658 assert!(*tb.get_last_update() >= before);
659 let after = Instant::now();
660 assert!(*tb.get_last_update() <= after);
661 }
662
663 #[test]
test_rate_limiter_default()664 fn test_rate_limiter_default() {
665 let l = RateLimiter::default();
666
667 // limiter should not be blocked
668 assert!(!l.is_blocked());
669 // limiter should be disabled so consume(whatever) should work
670 assert!(l.consume(u64::MAX, TokenType::Ops));
671 assert!(l.consume(u64::MAX, TokenType::Bytes));
672 // calling the handler without there having been an event should error
673 l.event_handler().unwrap_err();
674 assert_eq!(
675 format!("{:?}", l.event_handler().err().unwrap()),
676 "SpuriousRateLimiterEvent(\
677 \"Rate limiter event handler called without a present timer\")"
678 );
679 }
680
681 #[test]
test_rate_limiter_new()682 fn test_rate_limiter_new() {
683 let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
684 let bw = l.bandwidth().unwrap();
685 assert_eq!(bw.capacity(), 1000);
686 assert_eq!(bw.one_time_burst(), 1001);
687 assert_eq!(bw.refill_time_ms(), 1002);
688 assert_eq!(bw.budget(), 1000);
689
690 let ops = l.ops().unwrap();
691 assert_eq!(ops.capacity(), 1003);
692 assert_eq!(ops.one_time_burst(), 1004);
693 assert_eq!(ops.refill_time_ms(), 1005);
694 assert_eq!(ops.budget(), 1003);
695 }
696
697 #[test]
test_rate_limiter_manual_replenish()698 fn test_rate_limiter_manual_replenish() {
699 // rate limiter with limit of 1000 bytes/s and 1000 ops/s
700 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
701
702 // consume 123 bytes
703 assert!(l.consume(123, TokenType::Bytes));
704 l.manual_replenish(23, TokenType::Bytes);
705 {
706 let bytes_tb = l.bandwidth().unwrap();
707 assert_eq!(bytes_tb.budget(), 900);
708 }
709 // consume 123 ops
710 assert!(l.consume(123, TokenType::Ops));
711 l.manual_replenish(23, TokenType::Ops);
712 {
713 let bytes_tb = l.ops().unwrap();
714 assert_eq!(bytes_tb.budget(), 900);
715 }
716 }
717
718 #[test]
test_rate_limiter_bandwidth()719 fn test_rate_limiter_bandwidth() {
720 // rate limiter with limit of 1000 bytes/s
721 let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
722
723 // limiter should not be blocked
724 assert!(!l.is_blocked());
725 // raw FD for this disabled should be valid
726 assert!(l.as_raw_fd() > 0);
727
728 // ops/s limiter should be disabled so consume(whatever) should work
729 assert!(l.consume(u64::MAX, TokenType::Ops));
730
731 // do full 1000 bytes
732 assert!(l.consume(1000, TokenType::Bytes));
733 // try and fail on another 100
734 assert!(!l.consume(100, TokenType::Bytes));
735 // since consume failed, limiter should be blocked now
736 assert!(l.is_blocked());
737 // wait half the timer period
738 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
739 // limiter should still be blocked
740 assert!(l.is_blocked());
741 // wait the other half of the timer period
742 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
743 // the timer_fd should have an event on it by now
744 l.event_handler().unwrap();
745 // limiter should now be unblocked
746 assert!(!l.is_blocked());
747 // try and succeed on another 100 bytes this time
748 assert!(l.consume(100, TokenType::Bytes));
749 }
750
751 #[test]
test_rate_limiter_ops()752 fn test_rate_limiter_ops() {
753 // rate limiter with limit of 1000 ops/s
754 let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();
755
756 // limiter should not be blocked
757 assert!(!l.is_blocked());
758 // raw FD for this disabled should be valid
759 assert!(l.as_raw_fd() > 0);
760
761 // bytes/s limiter should be disabled so consume(whatever) should work
762 assert!(l.consume(u64::MAX, TokenType::Bytes));
763
764 // do full 1000 ops
765 assert!(l.consume(1000, TokenType::Ops));
766 // try and fail on another 100
767 assert!(!l.consume(100, TokenType::Ops));
768 // since consume failed, limiter should be blocked now
769 assert!(l.is_blocked());
770 // wait half the timer period
771 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
772 // limiter should still be blocked
773 assert!(l.is_blocked());
774 // wait the other half of the timer period
775 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
776 // the timer_fd should have an event on it by now
777 l.event_handler().unwrap();
778 // limiter should now be unblocked
779 assert!(!l.is_blocked());
780 // try and succeed on another 100 ops this time
781 assert!(l.consume(100, TokenType::Ops));
782 }
783
784 #[test]
test_rate_limiter_full()785 fn test_rate_limiter_full() {
786 // rate limiter with limit of 1000 bytes/s and 1000 ops/s
787 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
788
789 // limiter should not be blocked
790 assert!(!l.is_blocked());
791 // raw FD for this disabled should be valid
792 assert!(l.as_raw_fd() > 0);
793
794 // do full 1000 bytes
795 assert!(l.consume(1000, TokenType::Ops));
796 // do full 1000 bytes
797 assert!(l.consume(1000, TokenType::Bytes));
798 // try and fail on another 100 ops
799 assert!(!l.consume(100, TokenType::Ops));
800 // try and fail on another 100 bytes
801 assert!(!l.consume(100, TokenType::Bytes));
802 // since consume failed, limiter should be blocked now
803 assert!(l.is_blocked());
804 // wait half the timer period
805 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
806 // limiter should still be blocked
807 assert!(l.is_blocked());
808 // wait the other half of the timer period
809 thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
810 // the timer_fd should have an event on it by now
811 l.event_handler().unwrap();
812 // limiter should now be unblocked
813 assert!(!l.is_blocked());
814 // try and succeed on another 100 ops this time
815 assert!(l.consume(100, TokenType::Ops));
816 // try and succeed on another 100 bytes this time
817 assert!(l.consume(100, TokenType::Bytes));
818 }
819
820 #[test]
test_rate_limiter_overconsumption()821 fn test_rate_limiter_overconsumption() {
822 // initialize the rate limiter
823 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
824 // try to consume 2.5x the bucket size
825 // we are "borrowing" 1.5x the bucket size in tokens since
826 // the bucket is full
827 assert!(l.consume(2500, TokenType::Bytes));
828
829 // check that even after a whole second passes, the rate limiter
830 // is still blocked
831 thread::sleep(Duration::from_millis(1000));
832 l.event_handler().unwrap_err();
833 assert!(l.is_blocked());
834
835 // after 1.5x the replenish time has passed, the rate limiter
836 // is available again
837 thread::sleep(Duration::from_millis(500));
838 l.event_handler().unwrap();
839 assert!(!l.is_blocked());
840
841 // reset the rate limiter
842 let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
843 // try to consume 1.5x the bucket size
844 // we are "borrowing" 1.5x the bucket size in tokens since
845 // the bucket is full, should arm the timer to 0.5x replenish
846 // time, which is 500 ms
847 assert!(l.consume(1500, TokenType::Bytes));
848
849 // check that after more than the minimum refill time,
850 // the rate limiter is still blocked
851 thread::sleep(Duration::from_millis(200));
852 l.event_handler().unwrap_err();
853 assert!(l.is_blocked());
854
855 // try to consume some tokens, which should fail as the timer
856 // is still active
857 assert!(!l.consume(100, TokenType::Bytes));
858 l.event_handler().unwrap_err();
859 assert!(l.is_blocked());
860
861 // check that after the minimum refill time, the timer was not
862 // overwritten and the rate limiter is still blocked from the
863 // borrowing we performed earlier
864 thread::sleep(Duration::from_millis(100));
865 l.event_handler().unwrap_err();
866 assert!(l.is_blocked());
867 assert!(!l.consume(100, TokenType::Bytes));
868
869 // after waiting out the full duration, rate limiter should be
870 // available again
871 thread::sleep(Duration::from_millis(200));
872 l.event_handler().unwrap();
873 assert!(!l.is_blocked());
874 assert!(l.consume(100, TokenType::Bytes));
875 }
876
877 #[test]
test_update_buckets()878 fn test_update_buckets() {
879 let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();
880
881 let initial_bw = x.bandwidth();
882 let initial_ops = x.ops();
883
884 x.update_buckets(BucketUpdate::None, BucketUpdate::None);
885 assert_eq!(x.bandwidth(), initial_bw);
886 assert_eq!(x.ops(), initial_ops);
887
888 let new_bw = TokenBucket::new(123, 0, 57).unwrap();
889 let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
890 x.update_buckets(
891 BucketUpdate::Update(new_bw.clone()),
892 BucketUpdate::Update(new_ops.clone()),
893 );
894
895 {
896 let mut guard = x.inner.lock().unwrap();
897 // We have manually adjust the last_update field, because it changes when update_buckets()
898 // constructs new buckets (and thus gets a different value for last_update). We do this so
899 // it makes sense to test the following assertions.
900 guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
901 guard.ops.as_mut().unwrap().last_update = new_ops.last_update;
902 }
903
904 assert_eq!(x.bandwidth(), Some(new_bw));
905 assert_eq!(x.ops(), Some(new_ops));
906
907 x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
908 assert_eq!(x.bandwidth(), None);
909 assert_eq!(x.ops(), None);
910 }
911
912 #[test]
test_rate_limiter_debug()913 fn test_rate_limiter_debug() {
914 let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
915 assert_eq!(
916 format!("{l:?}"),
917 format!(
918 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
919 l.bandwidth(),
920 l.ops()
921 ),
922 );
923 }
924 }
925