xref: /cloud-hypervisor/rate_limiter/src/lib.rs (revision 5e52729453cb62edbe4fb3a4aa24f8cca31e667e)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
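//!
//! ## Example
//!
//! A minimal usage sketch (assuming this crate is built under the name
//! `rate_limiter`): limit bandwidth to 1000 bytes/s, leave ops/s unlimited,
//! and handle the 'blocked' case.
//!
//! ```no_run
//! use rate_limiter::{RateLimiter, TokenType};
//!
//! // 1000-byte bucket, no one-time burst, refilled over 1000 ms; ops limiting disabled.
//! let mut limiter = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
//!
//! // Drain the whole budget, then observe the limiter blocking.
//! assert!(limiter.consume(1000, TokenType::Bytes));
//! if !limiter.consume(100, TokenType::Bytes) {
//!     // Blocked: register the limiter's `AsRawFd` FD (POLLIN) with an event loop,
//!     // call `event_handler()` when the timer fires, then retry the `consume()`.
//! }
//! ```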
46 #[macro_use]
47 extern crate log;
48 
49 use std::os::unix::io::{AsRawFd, RawFd};
50 use std::time::{Duration, Instant};
51 use std::{fmt, io};
52 use vmm_sys_util::timerfd::TimerFd;
53 
54 #[derive(Debug)]
55 /// Describes the errors that may occur while handling rate limiter events.
56 pub enum Error {
57     /// The event handler was called spuriously.
58     SpuriousRateLimiterEvent(&'static str),
59     /// The event handler encountered an error while calling `TimerFd::wait()`.
60     TimerFdWaitError(std::io::Error),
61 }
62 
63 // Interval at which the refill timer will run when the limiter is blocked (out of budget).
64 const REFILL_TIMER_INTERVAL_MS: u64 = 100;
65 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);
66 
67 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
68 
69 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
70 fn gcd(x: u64, y: u64) -> u64 {
71     let mut x = x;
72     let mut y = y;
73     while y != 0 {
74         let t = y;
75         y = x % y;
76         x = t;
77     }
78     x
79 }
80 
81 /// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
82 #[derive(Clone, Debug, PartialEq)]
83 pub enum BucketReduction {
84     /// There are not enough tokens to complete the operation.
85     Failure,
86     /// A part of the available tokens has been consumed.
87     Success,
88     /// A number of tokens `inner` times larger than the bucket size have been consumed.
89     OverConsumption(f64),
90 }
91 
92 /// TokenBucket provides a lower level interface to rate limiting with a
93 /// configurable capacity, refill-rate and initial burst.
94 #[derive(Clone, Debug, PartialEq, Eq)]
95 pub struct TokenBucket {
96     // Bucket defining traits.
97     size: u64,
98     // Initial burst size (number of free initial tokens that can be consumed at no cost)
99     one_time_burst: u64,
100     // Complete refill time in milliseconds.
101     refill_time: u64,
102 
103     // Internal state descriptors.
104     budget: u64,
105     last_update: Instant,
106 
107     // Fields used for pre-processing optimizations.
108     processed_capacity: u64,
109     processed_refill_time: u64,
110 }
111 
112 impl TokenBucket {
113     /// Creates a `TokenBucket` wrapped in an `Option`.
114     ///
115     /// The created `TokenBucket` has `size` total capacity and takes `complete_refill_time_ms`
116     /// milliseconds to go from zero tokens to full capacity. The `one_time_burst` is initial
117     /// extra credit on top of the total capacity, which does not replenish and which can be used
118     /// for an initial burst of data.
119     ///
120     /// If either `size` or `complete_refill_time_ms` is zero, then `None` is returned.
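    ///
    /// # Example
    ///
    /// An illustrative sketch (assuming this crate is built under the name `rate_limiter`):
    ///
    /// ```no_run
    /// use rate_limiter::{BucketReduction, TokenBucket};
    ///
    /// // 1000-token bucket, no one-time burst, refilled over 1000 ms (i.e. 1 token/ms).
    /// let mut bucket = TokenBucket::new(1000, 0, 1000).unwrap();
    /// assert_eq!(bucket.reduce(100), BucketReduction::Success);
    /// assert_eq!(bucket.budget(), 900);
    ///
    /// // A zero `size` or refill time disables limiting.
    /// assert!(TokenBucket::new(0, 0, 1000).is_none());
    /// ```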
121     pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
122         // If either token bucket capacity or refill time is 0, disable limiting.
123         if size == 0 || complete_refill_time_ms == 0 {
124             return None;
125         }
126         // Formula for computing current refill amount:
127         // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
128         // In order to avoid overflows, simplify the fraction using the greatest common divisor.
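        //
        // Illustrative worked example: size = 1000 tokens, complete_refill_time_ms = 1000 ms:
        //   complete_refill_time_ns = 1_000_000_000
        //   common_factor           = gcd(1000, 1_000_000_000) = 1000
        //   processed_capacity      = 1000 / 1000 = 1
        //   processed_refill_time   = 1_000_000_000 / 1000 = 1_000_000
        // so the bucket refills by `time_delta / 1_000_000` tokens, i.e. 1 token per millisecond.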
129 
130         let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
131         // Get the greatest common factor between `size` and `complete_refill_time_ns`.
132         let common_factor = gcd(size, complete_refill_time_ns);
133         // The division will be exact since `common_factor` is a factor of `size`.
134         let processed_capacity: u64 = size / common_factor;
135         // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
136         let processed_refill_time: u64 = complete_refill_time_ns / common_factor;
137 
138         Some(TokenBucket {
139             size,
140             one_time_burst,
141             refill_time: complete_refill_time_ms,
142             // Start off full.
143             budget: size,
144             // Last updated is now.
145             last_update: Instant::now(),
146             processed_capacity,
147             processed_refill_time,
148         })
149     }
150 
151     /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
152     // TODO (Issue #259): handle cases where a single request is larger than the full capacity
153     // for such cases we need to support partial fulfilment of requests
154     pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
155         // First things first: consume the one-time-burst budget.
156         if self.one_time_burst > 0 {
157             // We still have enough burst budget for *all* of the requested tokens.
158             if self.one_time_burst >= tokens {
159                 self.one_time_burst -= tokens;
160                 self.last_update = Instant::now();
161                 // No need to continue with the refill process; we still have burst budget to consume from.
162                 return BucketReduction::Success;
163             } else {
164                 // We only have burst budget for *some* of the requested tokens.
165                 // The remainder will be consumed from the current `self.budget`.
166                 tokens -= self.one_time_burst;
167                 self.one_time_burst = 0;
168             }
169         }
170 
171         // Compute time passed since last refill/update.
172         let time_delta = self.last_update.elapsed().as_nanos() as u64;
173         self.last_update = Instant::now();
174 
175         // At each 'time_delta' nanoseconds the bucket should refill with:
176         // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
177         // `processed_capacity` and `processed_refill_time` are the result of simplifying the above
178         // fraction formula by their greatest common factor.
179         self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;
180 
181         if self.budget >= self.size {
182             self.budget = self.size;
183         }
184 
185         if tokens > self.budget {
186             // This operation requests more tokens than the entire bucket can hold.
187             if tokens > self.size {
188                 error!(
189                     "Consumed {} tokens from bucket of size {}",
190                     tokens, self.size
191                 );
192                 // Empty the bucket and report an overconsumption of
193                 // (remaining tokens / size) times larger than the bucket size
194                 tokens -= self.budget;
195                 self.budget = 0;
196                 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
197             }
198             // Not enough tokens: reduce() fails, and the caller's consume() will return false.
199             return BucketReduction::Failure;
200         }
201 
202         self.budget -= tokens;
203         BucketReduction::Success
204     }
205 
206     /// "Manually" adds tokens to bucket.
207     pub fn replenish(&mut self, tokens: u64) {
208         // If there is one-time burst budget left, we are still within the burst interval.
209         // Of course there is a very small chance that the last reduce() also used up burst
210         // budget which should now be replenished, but for performance and code-complexity
211         // reasons we're just going to let that slide since it's practically inconsequential.
212         if self.one_time_burst > 0 {
213             self.one_time_burst += tokens;
214             return;
215         }
216         self.budget = std::cmp::min(self.budget + tokens, self.size);
217     }
218 
219     /// Returns the capacity of the token bucket.
220     pub fn capacity(&self) -> u64 {
221         self.size
222     }
223 
224     /// Returns the remaining one time burst budget.
225     pub fn one_time_burst(&self) -> u64 {
226         self.one_time_burst
227     }
228 
229     /// Returns the time in milliseconds required to completely fill the bucket.
230     pub fn refill_time_ms(&self) -> u64 {
231         self.refill_time
232     }
233 
234     /// Returns the current budget (one time burst allowance notwithstanding).
235     pub fn budget(&self) -> u64 {
236         self.budget
237     }
238 }
239 
240 /// Enum that describes the type of token used.
241 pub enum TokenType {
242     /// Token type used for bandwidth limiting.
243     Bytes,
244     /// Token type used for operations/second limiting.
245     Ops,
246 }
247 
248 /// Enum that describes the type of token bucket update.
249 pub enum BucketUpdate {
250     /// No Update - same as before.
251     None,
252     /// Rate Limiting is disabled on this bucket.
253     Disabled,
254     /// Rate Limiting enabled with updated bucket.
255     Update(TokenBucket),
256 }
257 
258 /// Rate Limiter that works on both bandwidth and ops/s limiting.
259 ///
260 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
261 ///
262 /// Implementation uses a single timer through TimerFd to refresh either or
263 /// both token buckets.
264 ///
265 /// Its internal buckets are 'passively' replenished as they're being used (as
266 /// part of `consume()` operations).
267 /// A timer is enabled and used to 'actively' replenish the token buckets when
268 /// limiting is in effect and `consume()` operations are disabled.
269 ///
270 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
271 /// implementation. These events are meant to be consumed by the user of this struct.
272 /// On each such event, the user must call the `event_handler()` method.
273 pub struct RateLimiter {
274     bandwidth: Option<TokenBucket>,
275     ops: Option<TokenBucket>,
276 
277     timer_fd: TimerFd,
278     // Internal flag that quickly determines timer state.
279     timer_active: bool,
280 }
281 
282 impl PartialEq for RateLimiter {
283     fn eq(&self, other: &RateLimiter) -> bool {
284         self.bandwidth == other.bandwidth && self.ops == other.ops
285     }
286 }
287 
288 impl fmt::Debug for RateLimiter {
289     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
290         write!(
291             f,
292             "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
293             self.bandwidth, self.ops
294         )
295     }
296 }
297 
298 impl RateLimiter {
299     /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
300     ///
301     /// # Arguments
302     ///
303     /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
304     /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
305     /// that does not replenish and which can be used for an initial burst of data.
306     /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
307     /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
308     /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
309     /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
310     /// that does not replenish and which can be used for an initial burst of operations.
311     /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
312     /// bucket to go from zero Ops to `ops_total_capacity` Ops.
313     ///
314     /// If either the bytes/ops *size* or *refill_time* is **zero**, the limiter
315     /// is **disabled** for that respective token type.
316     ///
317     /// # Errors
318     ///
319     /// If the timerfd creation fails, an error is returned.
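    ///
    /// # Example
    ///
    /// An illustrative sketch (assuming this crate is built under the name `rate_limiter`):
    ///
    /// ```no_run
    /// use rate_limiter::RateLimiter;
    ///
    /// // 1000 bytes/s with no burst; ops/s limiting disabled (zero size and refill time).
    /// let limiter = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
    /// assert!(limiter.bandwidth().is_some());
    /// assert!(limiter.ops().is_none());
    /// ```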
320     pub fn new(
321         bytes_total_capacity: u64,
322         bytes_one_time_burst: u64,
323         bytes_complete_refill_time_ms: u64,
324         ops_total_capacity: u64,
325         ops_one_time_burst: u64,
326         ops_complete_refill_time_ms: u64,
327     ) -> io::Result<Self> {
328         let bytes_token_bucket = TokenBucket::new(
329             bytes_total_capacity,
330             bytes_one_time_burst,
331             bytes_complete_refill_time_ms,
332         );
333 
334         let ops_token_bucket = TokenBucket::new(
335             ops_total_capacity,
336             ops_one_time_burst,
337             ops_complete_refill_time_ms,
338         );
339 
340         // We'll need a timer_fd, even if our current config effectively disables rate limiting,
341         // because `Self::update_buckets()` might re-enable it later, and we might be
342         // seccomp-blocked from creating the timer_fd at that time.
343         let timer_fd = TimerFd::new()?;
344         // Note: vmm_sys_util::TimerFd::new() opens the fd without O_NONBLOCK. We manually add this
345         // flag so that `Self::event_handler` won't block in `vmm_sys_util::TimerFd::wait()`.
346         // SAFETY: FFI calls.
347         let ret = unsafe {
348             let fd = timer_fd.as_raw_fd();
349             let mut flags = libc::fcntl(fd, libc::F_GETFL);
350             flags |= libc::O_NONBLOCK;
351             libc::fcntl(fd, libc::F_SETFL, flags)
352         };
353         if ret < 0 {
354             return Err(std::io::Error::last_os_error());
355         }
356 
357         Ok(RateLimiter {
358             bandwidth: bytes_token_bucket,
359             ops: ops_token_bucket,
360             timer_fd,
361             timer_active: false,
362         })
363     }
364 
365     // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once).
366     fn activate_timer(&mut self, dur: Duration) {
367         // Panic when failing to arm the timer (same handling as in `TimerFd::set_state()`).
368         self.timer_fd
369             .reset(dur, None)
370             .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
371         self.timer_active = true;
372     }
373 
374     /// Attempts to consume tokens and returns whether that is possible.
375     ///
376     /// If rate limiting is disabled for the provided `token_type`, this function will always succeed.
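    ///
    /// # Example
    ///
    /// An illustrative sketch (assuming this crate is built under the name `rate_limiter`):
    ///
    /// ```no_run
    /// use rate_limiter::{RateLimiter, TokenType};
    ///
    /// let mut limiter = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
    /// assert!(limiter.consume(1000, TokenType::Bytes));
    /// // The ops bucket is disabled, so any amount can be consumed.
    /// assert!(limiter.consume(u64::MAX, TokenType::Ops));
    /// ```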
377     pub fn consume(&mut self, tokens: u64, token_type: TokenType) -> bool {
378         // If the timer is active, we can't consume tokens from any bucket and the function fails.
379         if self.timer_active {
380             return false;
381         }
382 
383         // Identify the required token bucket.
384         let token_bucket = match token_type {
385             TokenType::Bytes => self.bandwidth.as_mut(),
386             TokenType::Ops => self.ops.as_mut(),
387         };
388         // Try to consume from the token bucket.
389         if let Some(bucket) = token_bucket {
390             let refill_time = bucket.refill_time_ms();
391             match bucket.reduce(tokens) {
392                 // When we report the budget is exhausted, there will be no further calls here;
393                 // register a timer to replenish the bucket and resume processing.
394                 // Make sure there is only one running timer for this limiter.
395                 BucketReduction::Failure => {
396                     if !self.timer_active {
397                         self.activate_timer(TIMER_REFILL_DUR);
398                     }
399                     false
400                 }
401                 // The operation succeeded and further calls can be made.
402                 BucketReduction::Success => true,
403                 // The operation succeeded as the tokens have been consumed
404                 // but the timer still needs to be armed.
405                 BucketReduction::OverConsumption(ratio) => {
406                     // The operation "borrowed" a number of tokens `ratio` times
407                     // greater than the size of the bucket, and since it takes
408                     // `refill_time` milliseconds to fill an empty bucket, in
409                     // order to enforce the bandwidth limit we need to prevent
410                     // further calls to the rate limiter for
411                     // `ratio * refill_time` milliseconds.
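                    // Illustrative example: consuming 2500 tokens from a full 1000-token
                    // bucket with a 1000 ms refill time leaves a deficit of 1.5x the bucket
                    // size, so the timer is armed for 1.5 * 1000 = 1500 ms.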
412                     self.activate_timer(Duration::from_millis((ratio * refill_time as f64) as u64));
413                     true
414                 }
415             }
416         } else {
417             // If the bucket is not present, rate limiting is disabled for this token type, and
418             // consume() will always succeed.
419             true
420         }
421     }
422 
423     /// Adds tokens of `token_type` to their respective bucket.
424     ///
425     /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
426     /// `consume()` if needed.
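    ///
    /// # Example
    ///
    /// An illustrative sketch (assuming this crate is built under the name `rate_limiter`):
    ///
    /// ```no_run
    /// use rate_limiter::{RateLimiter, TokenType};
    ///
    /// let mut limiter = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
    /// assert!(limiter.consume(100, TokenType::Bytes));
    /// // Revert the consume, e.g. because the accounted I/O was not actually performed.
    /// limiter.manual_replenish(100, TokenType::Bytes);
    /// ```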
427     pub fn manual_replenish(&mut self, tokens: u64, token_type: TokenType) {
428         // Identify the required token bucket.
429         let token_bucket = match token_type {
430             TokenType::Bytes => self.bandwidth.as_mut(),
431             TokenType::Ops => self.ops.as_mut(),
432         };
433         // Add tokens to the token bucket.
434         if let Some(bucket) = token_bucket {
435             bucket.replenish(tokens);
436         }
437     }
438 
439     /// Returns whether this rate limiter is blocked.
440     ///
441     /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
442     /// budget for it.
443     /// An event will be generated on the exported FD when the limiter 'unblocks'.
444     pub fn is_blocked(&self) -> bool {
445         self.timer_active
446     }
447 
448     /// This function needs to be called every time there is an event on the
449     /// FD provided by this object's `AsRawFd` trait implementation.
450     ///
451     /// # Errors
452     ///
453     /// If the rate limiter is disabled or is not blocked, an error is returned.
454     pub fn event_handler(&mut self) -> Result<(), Error> {
455         loop {
456             // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
457             // `TimerFd::wait()` won't block (which differs from its default behavior).
458             match self.timer_fd.wait() {
459                 Err(e) => {
460                     let err: std::io::Error = e.into();
461                     match err.kind() {
462                         std::io::ErrorKind::Interrupted => (),
463                         std::io::ErrorKind::WouldBlock => {
464                             return Err(Error::SpuriousRateLimiterEvent(
465                                 "Rate limiter event handler called without a present timer",
466                             ))
467                         }
468                         _ => return Err(Error::TimerFdWaitError(err)),
469                     }
470                 }
471                 _ => {
472                     self.timer_active = false;
473                     return Ok(());
474                 }
475             }
476         }
477     }
478 
479     /// Updates the parameters of the token buckets associated with this RateLimiter.
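    ///
    /// # Example
    ///
    /// An illustrative sketch (assuming this crate is built under the name `rate_limiter`):
    ///
    /// ```no_run
    /// use rate_limiter::{BucketUpdate, RateLimiter, TokenBucket};
    ///
    /// let mut limiter = RateLimiter::new(1000, 0, 1000, 10, 0, 1000).unwrap();
    /// // Replace the bandwidth bucket and disable ops limiting.
    /// limiter.update_buckets(
    ///     BucketUpdate::Update(TokenBucket::new(2000, 0, 1000).unwrap()),
    ///     BucketUpdate::Disabled,
    /// );
    /// ```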
480     // TODO: Please note that, right now, the buckets become full after being updated.
481     pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
482         match bytes {
483             BucketUpdate::Disabled => self.bandwidth = None,
484             BucketUpdate::Update(tb) => self.bandwidth = Some(tb),
485             BucketUpdate::None => (),
486         };
487         match ops {
488             BucketUpdate::Disabled => self.ops = None,
489             BucketUpdate::Update(tb) => self.ops = Some(tb),
490             BucketUpdate::None => (),
491         };
492     }
493 
494     /// Returns an immutable view of the inner bandwidth token bucket.
495     pub fn bandwidth(&self) -> Option<&TokenBucket> {
496         self.bandwidth.as_ref()
497     }
498 
499     /// Returns an immutable view of the inner ops token bucket.
500     pub fn ops(&self) -> Option<&TokenBucket> {
501         self.ops.as_ref()
502     }
503 }
504 
505 impl AsRawFd for RateLimiter {
506     /// Provides an FD which needs to be monitored for POLLIN events.
507     ///
508     /// This object's `event_handler()` method must be called on such events.
509     ///
510     /// The FD is valid even when rate limiting is disabled on both token
511     /// types, since the internal timer is always created.
512     fn as_raw_fd(&self) -> RawFd {
513         self.timer_fd.as_raw_fd()
514     }
515 }
516 
517 impl Default for RateLimiter {
518     /// Default RateLimiter is a no-op limiter with infinite budget.
519     fn default() -> Self {
520         // All limits are zero, so rate limiting is disabled; `RateLimiter::new()` still creates a timer_fd, which is not expected to fail.
521         RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
522     }
523 }
524 
525 #[cfg(test)]
526 pub(crate) mod tests {
527     use super::*;
528     use std::thread;
529     use std::time::Duration;
530 
531     impl TokenBucket {
532         // Resets the token bucket: budget set to max capacity and last-updated set to now.
533         fn reset(&mut self) {
534             self.budget = self.size;
535             self.last_update = Instant::now();
536         }
537 
538         fn get_last_update(&self) -> &Instant {
539             &self.last_update
540         }
541 
542         fn get_processed_capacity(&self) -> u64 {
543             self.processed_capacity
544         }
545 
546         fn get_processed_refill_time(&self) -> u64 {
547             self.processed_refill_time
548         }
549 
550         // After a restore, we cannot be certain that the last_update field has the same value.
551         pub fn partial_eq(&self, other: &TokenBucket) -> bool {
552             (other.capacity() == self.capacity())
553                 && (other.one_time_burst() == self.one_time_burst())
554                 && (other.refill_time_ms() == self.refill_time_ms())
555                 && (other.budget() == self.budget())
556         }
557     }
558 
559     impl RateLimiter {
560         fn get_token_bucket(&self, token_type: TokenType) -> Option<&TokenBucket> {
561             match token_type {
562                 TokenType::Bytes => self.bandwidth.as_ref(),
563                 TokenType::Ops => self.ops.as_ref(),
564             }
565         }
566     }
567 
568     #[test]
569     fn test_token_bucket_create() {
570         let before = Instant::now();
571         let tb = TokenBucket::new(1000, 0, 1000).unwrap();
572         assert_eq!(tb.capacity(), 1000);
573         assert_eq!(tb.budget(), 1000);
574         assert!(*tb.get_last_update() >= before);
575         let after = Instant::now();
576         assert!(*tb.get_last_update() <= after);
577         assert_eq!(tb.get_processed_capacity(), 1);
578         assert_eq!(tb.get_processed_refill_time(), 1_000_000);
579 
580         // Verify invalid bucket configurations result in `None`.
581         assert!(TokenBucket::new(0, 1234, 1000).is_none());
582         assert!(TokenBucket::new(100, 1234, 0).is_none());
583         assert!(TokenBucket::new(0, 1234, 0).is_none());
584     }
585 
586     #[test]
587     fn test_token_bucket_preprocess() {
588         let tb = TokenBucket::new(1000, 0, 1000).unwrap();
589         assert_eq!(tb.get_processed_capacity(), 1);
590         assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);
591 
592         let thousand = 1000;
593         let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
594         assert_eq!(tb.get_processed_capacity(), 3 * 19);
595         assert_eq!(
596             tb.get_processed_refill_time(),
597             13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
598         );
599     }
600 
601     #[test]
602     fn test_token_bucket_reduce() {
603         // token bucket with capacity 1000 and refill time of 1000 milliseconds
604         // allowing rate of 1 token/ms.
605         let capacity = 1000;
606         let refill_ms = 1000;
607         let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();
608 
609         assert_eq!(tb.reduce(123), BucketReduction::Success);
610         assert_eq!(tb.budget(), capacity - 123);
611 
612         thread::sleep(Duration::from_millis(123));
613         assert_eq!(tb.reduce(1), BucketReduction::Success);
614         assert_eq!(tb.budget(), capacity - 1);
615         assert_eq!(tb.reduce(100), BucketReduction::Success);
616         assert_eq!(tb.reduce(capacity), BucketReduction::Failure);
617 
618         // token bucket with capacity 1000 and refill time of 1000 milliseconds
619         let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
620         // safely assuming the thread can run these 3 commands in less than 500ms
621         assert_eq!(tb.reduce(1000), BucketReduction::Success);
622         assert_eq!(tb.one_time_burst(), 100);
623         assert_eq!(tb.reduce(500), BucketReduction::Success);
624         assert_eq!(tb.one_time_burst(), 0);
625         assert_eq!(tb.reduce(500), BucketReduction::Success);
626         assert_eq!(tb.reduce(500), BucketReduction::Failure);
627         thread::sleep(Duration::from_millis(500));
628         assert_eq!(tb.reduce(500), BucketReduction::Success);
629         thread::sleep(Duration::from_millis(1000));
630         assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));
631 
632         let before = Instant::now();
633         tb.reset();
634         assert_eq!(tb.capacity(), 1000);
635         assert_eq!(tb.budget(), 1000);
636         assert!(*tb.get_last_update() >= before);
637         let after = Instant::now();
638         assert!(*tb.get_last_update() <= after);
639     }
640 
641     #[test]
642     fn test_rate_limiter_default() {
643         let mut l = RateLimiter::default();
644 
645         // limiter should not be blocked
646         assert!(!l.is_blocked());
647         // limiter should be disabled so consume(whatever) should work
648         assert!(l.consume(u64::max_value(), TokenType::Ops));
649         assert!(l.consume(u64::max_value(), TokenType::Bytes));
650         // calling the handler without there having been an event should error
651         assert!(l.event_handler().is_err());
652         assert_eq!(
653             format!("{:?}", l.event_handler().err().unwrap()),
654             "SpuriousRateLimiterEvent(\
655              \"Rate limiter event handler called without a present timer\")"
656         );
657     }
658 
659     #[test]
660     fn test_rate_limiter_new() {
661         let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
662 
663         let bw = l.bandwidth.unwrap();
664         assert_eq!(bw.capacity(), 1000);
665         assert_eq!(bw.one_time_burst(), 1001);
666         assert_eq!(bw.refill_time_ms(), 1002);
667         assert_eq!(bw.budget(), 1000);
668 
669         let ops = l.ops.unwrap();
670         assert_eq!(ops.capacity(), 1003);
671         assert_eq!(ops.one_time_burst(), 1004);
672         assert_eq!(ops.refill_time_ms(), 1005);
673         assert_eq!(ops.budget(), 1003);
674     }
675 
676     #[test]
677     fn test_rate_limiter_manual_replenish() {
678         // rate limiter with limit of 1000 bytes/s and 1000 ops/s
679         let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
680 
681         // consume 123 bytes
682         assert!(l.consume(123, TokenType::Bytes));
683         l.manual_replenish(23, TokenType::Bytes);
684         {
685             let bytes_tb = l.get_token_bucket(TokenType::Bytes).unwrap();
686             assert_eq!(bytes_tb.budget(), 900);
687         }
688         // consume 123 ops
689         assert!(l.consume(123, TokenType::Ops));
690         l.manual_replenish(23, TokenType::Ops);
691         {
692             let bytes_tb = l.get_token_bucket(TokenType::Ops).unwrap();
693             assert_eq!(bytes_tb.budget(), 900);
694         }
695     }
696 
697     #[test]
698     fn test_rate_limiter_bandwidth() {
699         // rate limiter with limit of 1000 bytes/s
700         let mut l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
701 
702         // limiter should not be blocked
703         assert!(!l.is_blocked());
704         // the raw FD for this limiter should be valid
705         assert!(l.as_raw_fd() > 0);
706 
707         // ops/s limiter should be disabled so consume(whatever) should work
708         assert!(l.consume(u64::max_value(), TokenType::Ops));
709 
710         // do full 1000 bytes
711         assert!(l.consume(1000, TokenType::Bytes));
712         // try and fail on another 100
713         assert!(!l.consume(100, TokenType::Bytes));
714         // since consume failed, limiter should be blocked now
715         assert!(l.is_blocked());
716         // wait half the timer period
717         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
718         // limiter should still be blocked
719         assert!(l.is_blocked());
720         // wait the other half of the timer period
721         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
722         // the timer_fd should have an event on it by now
723         assert!(l.event_handler().is_ok());
724         // limiter should now be unblocked
725         assert!(!l.is_blocked());
726         // try and succeed on another 100 bytes this time
727         assert!(l.consume(100, TokenType::Bytes));
728     }
729 
730     #[test]
731     fn test_rate_limiter_ops() {
732         // rate limiter with limit of 1000 ops/s
733         let mut l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();
734 
735         // limiter should not be blocked
736         assert!(!l.is_blocked());
737         // the raw FD for this limiter should be valid
738         assert!(l.as_raw_fd() > 0);
739 
740         // bytes/s limiter should be disabled so consume(whatever) should work
741         assert!(l.consume(u64::max_value(), TokenType::Bytes));
742 
743         // do full 1000 ops
744         assert!(l.consume(1000, TokenType::Ops));
745         // try and fail on another 100
746         assert!(!l.consume(100, TokenType::Ops));
747         // since consume failed, limiter should be blocked now
748         assert!(l.is_blocked());
749         // wait half the timer period
750         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
751         // limiter should still be blocked
752         assert!(l.is_blocked());
753         // wait the other half of the timer period
754         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
755         // the timer_fd should have an event on it by now
756         assert!(l.event_handler().is_ok());
757         // limiter should now be unblocked
758         assert!(!l.is_blocked());
759         // try and succeed on another 100 ops this time
760         assert!(l.consume(100, TokenType::Ops));
761     }
762 
763     #[test]
764     fn test_rate_limiter_full() {
765         // rate limiter with limit of 1000 bytes/s and 1000 ops/s
766         let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
767 
768         // limiter should not be blocked
769         assert!(!l.is_blocked());
770         // the raw FD for this limiter should be valid
771         assert!(l.as_raw_fd() > 0);
772 
773         // do full 1000 ops
774         assert!(l.consume(1000, TokenType::Ops));
775         // do full 1000 bytes
776         assert!(l.consume(1000, TokenType::Bytes));
777         // try and fail on another 100 ops
778         assert!(!l.consume(100, TokenType::Ops));
779         // try and fail on another 100 bytes
780         assert!(!l.consume(100, TokenType::Bytes));
781         // since consume failed, limiter should be blocked now
782         assert!(l.is_blocked());
783         // wait half the timer period
784         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
785         // limiter should still be blocked
786         assert!(l.is_blocked());
787         // wait the other half of the timer period
788         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
789         // the timer_fd should have an event on it by now
790         assert!(l.event_handler().is_ok());
791         // limiter should now be unblocked
792         assert!(!l.is_blocked());
793         // try and succeed on another 100 ops this time
794         assert!(l.consume(100, TokenType::Ops));
795         // try and succeed on another 100 bytes this time
796         assert!(l.consume(100, TokenType::Bytes));
797     }
798 
799     #[test]
800     fn test_rate_limiter_overconsumption() {
801         // initialize the rate limiter
802         let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
803         // try to consume 2.5x the bucket size
804         // we are "borrowing" 1.5x the bucket size in tokens since
805         // the bucket is full
806         assert!(l.consume(2500, TokenType::Bytes));
807 
808         // check that even after a whole second passes, the rate limiter
809         // is still blocked
810         thread::sleep(Duration::from_millis(1000));
811         assert!(l.event_handler().is_err());
812         assert!(l.is_blocked());
813 
814         // after 1.5x the replenish time has passed, the rate limiter
815         // is available again
816         thread::sleep(Duration::from_millis(500));
817         assert!(l.event_handler().is_ok());
818         assert!(!l.is_blocked());
819 
820         // reset the rate limiter
821         let mut l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
822         // try to consume 1.5x the bucket size
823         // we are "borrowing" 1.5x the bucket size in tokens since
824         // the bucket is full, should arm the timer to 0.5x replenish
825         // time, which is 500 ms
826         assert!(l.consume(1500, TokenType::Bytes));
827 
828         // check that after more than the minimum refill time,
829         // the rate limiter is still blocked
830         thread::sleep(Duration::from_millis(200));
831         assert!(l.event_handler().is_err());
832         assert!(l.is_blocked());
833 
834         // try to consume some tokens, which should fail as the timer
835         // is still active
836         assert!(!l.consume(100, TokenType::Bytes));
837         assert!(l.event_handler().is_err());
838         assert!(l.is_blocked());
839 
840         // check that after the minimum refill time, the timer was not
841         // overwritten and the rate limiter is still blocked from the
842         // borrowing we performed earlier
843         thread::sleep(Duration::from_millis(100));
844         assert!(l.event_handler().is_err());
845         assert!(l.is_blocked());
846         assert!(!l.consume(100, TokenType::Bytes));
847 
848         // after waiting out the full duration, rate limiter should be
849         // available again
850         thread::sleep(Duration::from_millis(200));
851         assert!(l.event_handler().is_ok());
852         assert!(!l.is_blocked());
853         assert!(l.consume(100, TokenType::Bytes));
854     }
855 
856     #[test]
857     fn test_update_buckets() {
858         let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();
859 
860         let initial_bw = x.bandwidth.clone();
861         let initial_ops = x.ops.clone();
862 
863         x.update_buckets(BucketUpdate::None, BucketUpdate::None);
864         assert_eq!(x.bandwidth, initial_bw);
865         assert_eq!(x.ops, initial_ops);
866 
867         let new_bw = TokenBucket::new(123, 0, 57).unwrap();
868         let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
869         x.update_buckets(
870             BucketUpdate::Update(new_bw.clone()),
871             BucketUpdate::Update(new_ops.clone()),
872         );
873 
874         // We have to manually adjust the last_update field, because it changes when update_buckets()
875         // constructs new buckets (and thus gets a different value for last_update). We do this so
876         // it makes sense to test the following assertions.
877         x.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
878         x.ops.as_mut().unwrap().last_update = new_ops.last_update;
879 
880         assert_eq!(x.bandwidth, Some(new_bw));
881         assert_eq!(x.ops, Some(new_ops));
882 
883         x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
884         assert_eq!(x.bandwidth, None);
885         assert_eq!(x.ops, None);
886     }
887 
888     #[test]
889     fn test_rate_limiter_debug() {
890         let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
891         assert_eq!(
892             format!("{l:?}"),
893             format!(
894                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
895                 l.bandwidth(),
896                 l.ops()
897             ),
898         );
899     }
900 }
901