xref: /cloud-hypervisor/rate_limiter/src/lib.rs (revision 07d1208dd53a207a65b649b8952780dfd0ca59d9)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
//! notification will be dispatched as an event on the FD provided by the `AsRawFd`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46 #[macro_use]
47 extern crate log;
48 
49 use std::io;
50 use std::os::unix::io::{AsRawFd, RawFd};
51 use std::sync::atomic::{AtomicBool, Ordering};
52 use std::sync::Mutex;
53 use std::time::{Duration, Instant};
54 use vmm_sys_util::timerfd::TimerFd;
55 
#[derive(Debug)]
/// Describes the errors that may occur while handling rate limiter events.
pub enum Error {
    /// The event handler was called spuriously, i.e. without a pending timer event.
    /// The payload is a human-readable description of the spurious call.
    SpuriousRateLimiterEvent(&'static str),
    /// `TimerFd::wait()` failed with an unexpected I/O error while handling an event.
    TimerFdWaitError(std::io::Error),
}
64 
// Interval at which the refill timer will run when limiter is at capacity.
const REFILL_TIMER_INTERVAL_MS: u64 = 100;
// `Duration` form of the refill interval, used when arming the one-shot timerfd.
const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);

// Number of nanoseconds in one millisecond, used to convert refill times to ns.
const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
70 
// Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
//
// Recursive formulation: gcd(x, 0) = x, otherwise gcd(x, y) = gcd(y, x mod y).
fn gcd(x: u64, y: u64) -> u64 {
    if y == 0 {
        x
    } else {
        gcd(y, x % y)
    }
}
82 
/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
    /// There are not enough tokens to complete the operation.
    Failure,
    /// A part of the available tokens have been consumed.
    Success,
    /// A number of tokens `inner` times larger than the bucket size have been consumed.
    /// The `f64` payload is the ratio between the tokens left unfulfilled after
    /// emptying the bucket and the bucket size.
    OverConsumption(f64),
}
93 
/// TokenBucket provides a lower level interface to rate limiting with a
/// configurable capacity, refill-rate and initial burst.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TokenBucket {
    // Bucket defining traits.
    // Maximum number of tokens the bucket can hold.
    size: u64,
    // Initial burst size (number of free initial tokens, that can be consumed at no cost)
    one_time_burst: u64,
    // Complete refill time in milliseconds.
    refill_time: u64,

    // Internal state descriptors.
    // Tokens currently available for consumption; capped at `size` on refill.
    budget: u64,
    // Instant of the last budget/burst update, used to compute the passive refill.
    last_update: Instant,

    // Fields used for pre-processing optimizations.
    // `size` and `refill_time` (in ns) reduced by their greatest common divisor,
    // so that the refill computation
    // `(delta_ns * processed_capacity) / processed_refill_time`
    // works with smaller operands and is less prone to overflow.
    processed_capacity: u64,
    processed_refill_time: u64,
}
113 
impl TokenBucket {
    /// Creates a `TokenBucket` wrapped in an `Option`.
    ///
    /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms`
    /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
    /// extra credit on top of total capacity, that does not replenish and which can be used
    /// for an initial burst of data.
    ///
    /// If the `size` or the `complete refill time` are zero, then `None` is returned.
    pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
        // If either token bucket capacity or refill time is 0, disable limiting.
        if size == 0 || complete_refill_time_ms == 0 {
            return None;
        }
        // Formula for computing current refill amount:
        // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
        // In order to avoid overflows, simplify the fractions by computing greatest common divisor.

        let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
        // Get the greatest common factor between `size` and `complete_refill_time_ns`.
        let common_factor = gcd(size, complete_refill_time_ns);
        // The division will be exact since `common_factor` is a factor of `size`.
        let processed_capacity: u64 = size / common_factor;
        // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
        let processed_refill_time: u64 = complete_refill_time_ns / common_factor;

        Some(TokenBucket {
            size,
            one_time_burst,
            refill_time: complete_refill_time_ms,
            // Start off full.
            budget: size,
            // Last updated is now.
            last_update: Instant::now(),
            processed_capacity,
            processed_refill_time,
        })
    }

    /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
    ///
    /// The bucket is passively refilled (proportionally to the time elapsed since the
    /// last update) before the consumption is attempted.
    // TODO (Issue #259): handle cases where a single request is larger than the full capacity
    // for such cases we need to support partial fulfilment of requests
    pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
        // First things first: consume the one-time-burst budget.
        if self.one_time_burst > 0 {
            // We still have burst budget for *all* tokens requests.
            if self.one_time_burst >= tokens {
                self.one_time_burst -= tokens;
                self.last_update = Instant::now();
                // No need to continue to the refill process, we still have burst budget to consume from.
                return BucketReduction::Success;
            } else {
                // We still have burst budget for *some* of the tokens requests.
                // The tokens left unfulfilled will be consumed from current `self.budget`.
                tokens -= self.one_time_burst;
                self.one_time_burst = 0;
            }
        }

        // Compute time passed since last refill/update.
        let time_delta = self.last_update.elapsed().as_nanos() as u64;
        self.last_update = Instant::now();

        // At each 'time_delta' nanoseconds the bucket should refill with:
        // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
        // `processed_capacity` and `processed_refill_time` are the result of simplifying above
        // fraction formula with their greatest-common-factor.
        self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;

        // The bucket never replenishes above its size.
        if self.budget >= self.size {
            self.budget = self.size;
        }

        if tokens > self.budget {
            // This operation requests a bandwidth higher than the bucket size
            if tokens > self.size {
                error!(
                    "Consumed {} tokens from bucket of size {}",
                    tokens, self.size
                );
                // Empty the bucket and report an overconsumption of
                // (remaining tokens / size) times larger than the bucket size
                tokens -= self.budget;
                self.budget = 0;
                return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
            }
            // Not enough tokens: the request fails and the budget is left untouched.
            return BucketReduction::Failure;
        }

        self.budget -= tokens;
        BucketReduction::Success
    }

    /// "Manually" adds tokens to bucket.
    pub fn replenish(&mut self, tokens: u64) {
        // This means we are still during the burst interval.
        // Of course there is a very small chance that the last reduce() also used up burst
        // budget which should now be replenished, but for performance and code-complexity
        // reasons we're just gonna let that slide since it's practically inconsequential.
        if self.one_time_burst > 0 {
            self.one_time_burst += tokens;
            return;
        }
        self.budget = std::cmp::min(self.budget + tokens, self.size);
    }

    /// Returns the capacity of the token bucket.
    pub fn capacity(&self) -> u64 {
        self.size
    }

    /// Returns the remaining one time burst budget.
    pub fn one_time_burst(&self) -> u64 {
        self.one_time_burst
    }

    /// Returns the time in milliseconds required to completely fill the bucket.
    pub fn refill_time_ms(&self) -> u64 {
        self.refill_time
    }

    /// Returns the current budget (one time burst allowance notwithstanding).
    pub fn budget(&self) -> u64 {
        self.budget
    }
}
241 
/// Enum that describes the type of token used.
///
/// Derives `Clone`/`Copy` (the enum is a plain tag) and `Debug`/`PartialEq`/`Eq`
/// so the type can be logged and compared, like the other public types here.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TokenType {
    /// Token type used for bandwidth limiting.
    Bytes,
    /// Token type used for operations/second limiting.
    Ops,
}
249 
250 /// Enum that describes the type of token bucket update.
251 pub enum BucketUpdate {
252     /// No Update - same as before.
253     None,
254     /// Rate Limiting is disabled on this bucket.
255     Disabled,
256     /// Rate Limiting enabled with updated bucket.
257     Update(TokenBucket),
258 }
259 
/// Rate Limiter that works on both bandwidth and ops/s limiting.
///
/// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
///
/// Implementation uses a single timer through TimerFd to refresh either or
/// both token buckets.
///
/// Its internal buckets are 'passively' replenished as they're being used (as
/// part of `consume()` operations).
/// A timer is enabled and used to 'actively' replenish the token buckets when
/// limiting is in effect and `consume()` operations are disabled.
///
/// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
/// implementation. These events are meant to be consumed by the user of this struct.
/// On each such event, the user must call the `event_handler()` method.
pub struct RateLimiter {
    // Token buckets and the timerfd, protected by a mutex for shared use.
    inner: Mutex<RateLimiterInner>,

    // Internal flag that quickly determines timer state.
    // Set while the refill timer is armed (limiter blocked); read lock-free
    // by `is_blocked()`.
    timer_active: AtomicBool,
}
281 
// Mutex-protected state of a `RateLimiter`.
struct RateLimiterInner {
    // Bandwidth (bytes/s) token bucket; `None` when bandwidth limiting is disabled.
    bandwidth: Option<TokenBucket>,
    // Operations/s token bucket; `None` when ops limiting is disabled.
    ops: Option<TokenBucket>,

    // One-shot timer used to wake up blocked users once tokens have refilled.
    timer_fd: TimerFd,
}
288 
impl RateLimiterInner {
    // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once)
    // and mark the limiter as blocked by setting `flag` (the owner's `timer_active`).
    fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) {
        // Panic when failing to arm the timer (same handling in crate TimerFd::set_state())
        self.timer_fd
            .reset(dur, None)
            .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
        flag.store(true, Ordering::Relaxed)
    }
}
299 
impl RateLimiter {
    /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
    ///
    /// # Arguments
    ///
    /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
    /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
    ///   that does not replenish and which can be used for an initial burst of data.
    /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
    ///   token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
    /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
    /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
    ///   that does not replenish and which can be used for an initial burst of data.
    /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
    ///   bucket to go from zero Ops to `ops_total_capacity` Ops.
    ///
    /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
    /// is **disabled** for that respective token type.
    ///
    /// # Errors
    ///
    /// If the timerfd creation fails, an error is returned.
    pub fn new(
        bytes_total_capacity: u64,
        bytes_one_time_burst: u64,
        bytes_complete_refill_time_ms: u64,
        ops_total_capacity: u64,
        ops_one_time_burst: u64,
        ops_complete_refill_time_ms: u64,
    ) -> io::Result<Self> {
        // `TokenBucket::new` yields `None` (bucket disabled) when either the
        // capacity or the refill time is zero.
        let bytes_token_bucket = TokenBucket::new(
            bytes_total_capacity,
            bytes_one_time_burst,
            bytes_complete_refill_time_ms,
        );

        let ops_token_bucket = TokenBucket::new(
            ops_total_capacity,
            ops_one_time_burst,
            ops_complete_refill_time_ms,
        );

        // We'll need a timer_fd, even if our current config effectively disables rate limiting,
        // because `Self::update_buckets()` might re-enable it later, and we might be
        // seccomp-blocked from creating the timer_fd at that time.
        let timer_fd = TimerFd::new()?;
        // Note: vmm_sys_util::TimerFd::new() opens the fd w/o O_NONBLOCK. We manually add this
        // flag so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`.
        // SAFETY: FFI calls.
        let ret = unsafe {
            let fd = timer_fd.as_raw_fd();
            let mut flags = libc::fcntl(fd, libc::F_GETFL);
            flags |= libc::O_NONBLOCK;
            libc::fcntl(fd, libc::F_SETFL, flags)
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }

        Ok(RateLimiter {
            inner: Mutex::new(RateLimiterInner {
                bandwidth: bytes_token_bucket,
                ops: ops_token_bucket,
                timer_fd,
            }),
            timer_active: AtomicBool::new(false),
        })
    }

    /// Attempts to consume tokens and returns whether that is possible.
    ///
    /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
    pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
        // If the timer is active, we can't consume tokens from any bucket and the function fails.
        if self.is_blocked() {
            return false;
        }
        let mut guard = self.inner.lock().unwrap();
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => guard.bandwidth.as_mut(),
            TokenType::Ops => guard.ops.as_mut(),
        };
        // Try to consume from the token bucket.
        if let Some(bucket) = token_bucket {
            let refill_time = bucket.refill_time_ms();
            match bucket.reduce(tokens) {
                // When we report budget is over, there will be no further calls here,
                // register a timer to replenish the bucket and resume processing;
                // make sure there is only one running timer for this limiter.
                BucketReduction::Failure => {
                    if !self.is_blocked() {
                        guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active);
                    }
                    false
                }
                // The operation succeeded and further calls can be made.
                BucketReduction::Success => true,
                // The operation succeeded as the tokens have been consumed
                // but the timer still needs to be armed.
                BucketReduction::OverConsumption(ratio) => {
                    // The operation "borrowed" a number of tokens `ratio` times
                    // greater than the size of the bucket, and since it takes
                    // `refill_time` milliseconds to fill an empty bucket, in
                    // order to enforce the bandwidth limit we need to prevent
                    // further calls to the rate limiter for
                    // `ratio * refill_time` milliseconds.
                    guard.activate_timer(
                        Duration::from_millis((ratio * refill_time as f64) as u64),
                        &self.timer_active,
                    );
                    true
                }
            }
        } else {
            // If bucket is not present rate limiting is disabled on token type,
            // consume() will always succeed.
            true
        }
    }

    /// Adds tokens of `token_type` to their respective bucket.
    ///
    /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
    /// `consume()` if needed.
    pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
        let mut guard = self.inner.lock().unwrap();
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => guard.bandwidth.as_mut(),
            TokenType::Ops => guard.ops.as_mut(),
        };
        // Add tokens to the token bucket.
        if let Some(bucket) = token_bucket {
            bucket.replenish(tokens);
        }
    }

    /// Returns whether this rate limiter is blocked.
    ///
    /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
    /// budget for it.
    /// An event will be generated on the exported FD when the limiter 'unblocks'.
    pub fn is_blocked(&self) -> bool {
        self.timer_active.load(Ordering::Relaxed)
    }

    /// This function needs to be called every time there is an event on the
    /// FD provided by this object's `AsRawFd` trait implementation.
    ///
    /// # Errors
    ///
    /// If the rate limiter is disabled or is not blocked, an error is returned.
    pub fn event_handler(&self) -> Result<(), Error> {
        let mut guard = self.inner.lock().unwrap();
        loop {
            // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
            // `timer_fd::wait()` won't block (which is different from its default behavior.)
            match guard.timer_fd.wait() {
                Err(e) => {
                    let err: std::io::Error = e.into();
                    match err.kind() {
                        // Interrupted by a signal: retry the wait.
                        std::io::ErrorKind::Interrupted => (),
                        // Nothing to read: the timer never fired, so this call was spurious.
                        std::io::ErrorKind::WouldBlock => {
                            return Err(Error::SpuriousRateLimiterEvent(
                                "Rate limiter event handler called without a present timer",
                            ))
                        }
                        _ => return Err(Error::TimerFdWaitError(err)),
                    }
                }
                _ => {
                    // Timer fired and was consumed: the limiter is unblocked again.
                    self.timer_active.store(false, Ordering::Relaxed);
                    return Ok(());
                }
            }
        }
    }

    /// Updates the parameters of the token buckets associated with this RateLimiter.
    // TODO: Please note that, right now, the buckets become full after being updated.
    pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
        let mut guard = self.inner.lock().unwrap();
        match bytes {
            BucketUpdate::Disabled => guard.bandwidth = None,
            BucketUpdate::Update(tb) => guard.bandwidth = Some(tb),
            BucketUpdate::None => (),
        };
        match ops {
            BucketUpdate::Disabled => guard.ops = None,
            BucketUpdate::Update(tb) => guard.ops = Some(tb),
            BucketUpdate::None => (),
        };
    }
}
495 
impl AsRawFd for RateLimiter {
    /// Provides a FD which needs to be monitored for POLLIN events.
    ///
    /// This object's `event_handler()` method must be called on such events.
    ///
    /// The returned FD is always valid: the internal timerfd is created
    /// unconditionally by `RateLimiter::new()`, even when rate limiting is
    /// disabled on both token types.
    fn as_raw_fd(&self) -> RawFd {
        let guard = self.inner.lock().unwrap();
        guard.timer_fd.as_raw_fd()
    }
}
508 
impl Default for RateLimiter {
    /// Default RateLimiter is a no-op limiter with infinite budget.
    fn default() -> Self {
        // All-zero parameters disable both token buckets, so this limiter never
        // throttles. Note that `RateLimiter::new()` still creates a timerfd even
        // for a disabled limiter, so the expect() can only trip if timerfd
        // creation itself fails.
        RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
    }
}
516 
517 #[cfg(test)]
518 pub(crate) mod tests {
519     use super::*;
520     use std::fmt;
521     use std::thread;
522     use std::time::Duration;
523 
    // Test-only helpers exposing `TokenBucket` internals.
    impl TokenBucket {
        // Resets the token bucket: budget set to max capacity and last-updated set to now.
        fn reset(&mut self) {
            self.budget = self.size;
            self.last_update = Instant::now();
        }

        // Accessor for the instant of the last budget/burst update.
        fn get_last_update(&self) -> &Instant {
            &self.last_update
        }

        // Accessor for the GCD-reduced capacity used in refill math.
        fn get_processed_capacity(&self) -> u64 {
            self.processed_capacity
        }

        // Accessor for the GCD-reduced refill time (nanoseconds) used in refill math.
        fn get_processed_refill_time(&self) -> u64 {
            self.processed_refill_time
        }

        // After a restore, we cannot be certain that the last_update field has the same value.
        // Hence this comparison covers every defining field except `last_update`.
        pub fn partial_eq(&self, other: &TokenBucket) -> bool {
            (other.capacity() == self.capacity())
                && (other.one_time_burst() == self.one_time_burst())
                && (other.refill_time_ms() == self.refill_time_ms())
                && (other.budget() == self.budget())
        }
    }
551 
    // Test-only accessors returning clones of the internal buckets.
    impl RateLimiter {
        // Returns a clone of the bandwidth bucket, or `None` if bytes limiting is disabled.
        pub fn bandwidth(&self) -> Option<TokenBucket> {
            let guard = self.inner.lock().unwrap();
            guard.bandwidth.clone()
        }

        // Returns a clone of the ops bucket, or `None` if ops limiting is disabled.
        pub fn ops(&self) -> Option<TokenBucket> {
            let guard = self.inner.lock().unwrap();
            guard.ops.clone()
        }
    }
563 
    // Test-only equality: compares the two buckets, ignoring timer state.
    // NOTE(review): locks `self` then `other`; fine for single-threaded tests,
    // but could deadlock if two threads compared limiters in opposite order.
    impl PartialEq for RateLimiter {
        fn eq(&self, other: &RateLimiter) -> bool {
            let self_guard = self.inner.lock().unwrap();
            let other_guard = other.inner.lock().unwrap();
            self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops
        }
    }
571 
    // Test-only Debug formatting: shows only the two buckets.
    impl fmt::Debug for RateLimiter {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            let guard = self.inner.lock().unwrap();
            write!(
                f,
                "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
                guard.bandwidth, guard.ops
            )
        }
    }
582 
    #[test]
    fn test_token_bucket_create() {
        // A freshly created bucket starts full, with a creation timestamp in [before, after].
        let before = Instant::now();
        let tb = TokenBucket::new(1000, 0, 1000).unwrap();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
        // 1000 tokens / 10^9 ns reduce by their GCD (1000) to 1 / 1_000_000.
        assert_eq!(tb.get_processed_capacity(), 1);
        assert_eq!(tb.get_processed_refill_time(), 1_000_000);

        // Verify invalid bucket configurations result in `None`.
        assert!(TokenBucket::new(0, 1234, 1000).is_none());
        assert!(TokenBucket::new(100, 1234, 0).is_none());
        assert!(TokenBucket::new(0, 1234, 0).is_none());
    }
600 
    #[test]
    fn test_token_bucket_preprocess() {
        // Simple case: GCD(1000, 10^9) = 1000, leaving 1 token per 1_000_000 ns.
        let tb = TokenBucket::new(1000, 0, 1000).unwrap();
        assert_eq!(tb.get_processed_capacity(), 1);
        assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);

        // Composite case: the shared factors 7, 11 and `thousand` cancel out,
        // leaving 3*19 tokens per 13*17*(10^6/1000) ns.
        let thousand = 1000;
        let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
        assert_eq!(tb.get_processed_capacity(), 3 * 19);
        assert_eq!(
            tb.get_processed_refill_time(),
            13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
        );
    }
615 
    #[test]
    fn test_token_bucket_reduce() {
        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        // allowing rate of 1 token/ms.
        let capacity = 1000;
        let refill_ms = 1000;
        let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();

        assert_eq!(tb.reduce(123), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 123);

        // Sleeping 123ms at 1 token/ms replenishes the 123 tokens consumed above.
        thread::sleep(Duration::from_millis(123));
        assert_eq!(tb.reduce(1), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 1);
        assert_eq!(tb.reduce(100), BucketReduction::Success);
        assert_eq!(tb.reduce(capacity), BucketReduction::Failure);

        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
        // safely assuming the thread can run these 3 commands in less than 500ms
        assert_eq!(tb.reduce(1000), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 100);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 0);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.reduce(500), BucketReduction::Failure);
        thread::sleep(Duration::from_millis(500));
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        thread::sleep(Duration::from_millis(1000));
        // Taking 2500 from a full bucket of 1000 over-consumes by
        // (2500 - 1000) / 1000 = 1.5 bucket sizes.
        assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));

        // Reset restores a full budget and refreshes the update timestamp.
        let before = Instant::now();
        tb.reset();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
    }
655 
    #[test]
    fn test_rate_limiter_default() {
        // The default limiter has both buckets disabled and never throttles.
        let l = RateLimiter::default();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::max_value(), TokenType::Ops));
        assert!(l.consume(u64::max_value(), TokenType::Bytes));
        // calling the handler without there having been an event should error
        assert!(l.event_handler().is_err());
        assert_eq!(
            format!("{:?}", l.event_handler().err().unwrap()),
            "SpuriousRateLimiterEvent(\
             \"Rate limiter event handler called without a present timer\")"
        );
    }
673 
    #[test]
    fn test_rate_limiter_new() {
        // Constructor parameters must land in the corresponding bucket fields,
        // with both budgets starting off full.
        let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
        let bw = l.bandwidth().unwrap();
        assert_eq!(bw.capacity(), 1000);
        assert_eq!(bw.one_time_burst(), 1001);
        assert_eq!(bw.refill_time_ms(), 1002);
        assert_eq!(bw.budget(), 1000);

        let ops = l.ops().unwrap();
        assert_eq!(ops.capacity(), 1003);
        assert_eq!(ops.one_time_burst(), 1004);
        assert_eq!(ops.refill_time_ms(), 1005);
        assert_eq!(ops.budget(), 1003);
    }
689 
690     #[test]
691     fn test_rate_limiter_manual_replenish() {
692         // rate limiter with limit of 1000 bytes/s and 1000 ops/s
693         let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
694 
695         // consume 123 bytes
696         assert!(l.consume(123, TokenType::Bytes));
697         l.manual_replenish(23, TokenType::Bytes);
698         {
699             let bytes_tb = l.bandwidth().unwrap();
700             assert_eq!(bytes_tb.budget(), 900);
701         }
702         // consume 123 ops
703         assert!(l.consume(123, TokenType::Ops));
704         l.manual_replenish(23, TokenType::Ops);
705         {
706             let bytes_tb = l.ops().unwrap();
707             assert_eq!(bytes_tb.budget(), 900);
708         }
709     }
710 
    #[test]
    fn test_rate_limiter_bandwidth() {
        // rate limiter with limit of 1000 bytes/s
        let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the raw FD should be valid even with ops limiting disabled
        assert!(l.as_raw_fd() > 0);

        // ops/s limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::max_value(), TokenType::Ops));

        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
743 
744     #[test]
745     fn test_rate_limiter_ops() {
746         // rate limiter with limit of 1000 ops/s
747         let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();
748 
749         // limiter should not be blocked
750         assert!(!l.is_blocked());
751         // raw FD for this disabled should be valid
752         assert!(l.as_raw_fd() > 0);
753 
754         // bytes/s limiter should be disabled so consume(whatever) should work
755         assert!(l.consume(u64::max_value(), TokenType::Bytes));
756 
757         // do full 1000 ops
758         assert!(l.consume(1000, TokenType::Ops));
759         // try and fail on another 100
760         assert!(!l.consume(100, TokenType::Ops));
761         // since consume failed, limiter should be blocked now
762         assert!(l.is_blocked());
763         // wait half the timer period
764         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
765         // limiter should still be blocked
766         assert!(l.is_blocked());
767         // wait the other half of the timer period
768         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
769         // the timer_fd should have an event on it by now
770         assert!(l.event_handler().is_ok());
771         // limiter should now be unblocked
772         assert!(!l.is_blocked());
773         // try and succeed on another 100 ops this time
774         assert!(l.consume(100, TokenType::Ops));
775     }
776 
    #[test]
    fn test_rate_limiter_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // raw FD for this disabled should be valid
        assert!(l.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!l.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
812 
    #[test]
    fn test_rate_limiter_overconsumption() {
        // initialize the rate limiter
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(l.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked
        thread::sleep(Duration::from_millis(1000));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());

        // reset the rate limiter
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 0.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(l.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!l.consume(100, TokenType::Bytes));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier
        thread::sleep(Duration::from_millis(100));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());
        assert!(!l.consume(100, TokenType::Bytes));

        // after waiting out the full duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());
        assert!(l.consume(100, TokenType::Bytes));
    }
869 
    #[test]
    fn test_update_buckets() {
        let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();

        let initial_bw = x.bandwidth();
        let initial_ops = x.ops();

        // a no-op update must leave both buckets untouched
        x.update_buckets(BucketUpdate::None, BucketUpdate::None);
        assert_eq!(x.bandwidth(), initial_bw);
        assert_eq!(x.ops(), initial_ops);

        let new_bw = TokenBucket::new(123, 0, 57).unwrap();
        let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
        x.update_buckets(
            BucketUpdate::Update(new_bw.clone()),
            BucketUpdate::Update(new_ops.clone()),
        );

        {
            let mut guard = x.inner.lock().unwrap();
            // We have to manually adjust the last_update field, because it changes when update_buckets()
            // constructs new buckets (and thus gets a different value for last_update). We do this so
            // it makes sense to test the following assertions.
            guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
            guard.ops.as_mut().unwrap().last_update = new_ops.last_update;
        }

        assert_eq!(x.bandwidth(), Some(new_bw));
        assert_eq!(x.ops(), Some(new_ops));

        // disabling both buckets must remove them entirely
        x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
        assert_eq!(x.bandwidth(), None);
        assert_eq!(x.ops(), None);
    }
904 
905     #[test]
906     fn test_rate_limiter_debug() {
907         let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
908         assert_eq!(
909             format!("{l:?}"),
910             format!(
911                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
912                 l.bandwidth(),
913                 l.ops()
914             ),
915         );
916     }
917 }
918