xref: /cloud-hypervisor/rate_limiter/src/lib.rs (revision fa7a000dbe9637eb256af18ae8c3c4a8d5bf9c8f)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
//! notification will be dispatched as an event on the FD provided by the `AsRawFd`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46 #[macro_use]
47 extern crate log;
48 
49 use std::io;
50 use std::os::unix::io::{AsRawFd, RawFd};
51 use std::sync::atomic::{AtomicBool, Ordering};
52 use std::sync::Mutex;
53 use std::time::{Duration, Instant};
54 use vmm_sys_util::timerfd::TimerFd;
55 
56 /// Module for group rate limiting.
57 pub mod group;
58 
#[derive(Debug)]
/// Describes the errors that may occur while handling rate limiter events.
pub enum Error {
    /// The event handler was called spuriously, i.e. with no timer event pending.
    /// The payload is a static description of the spurious call.
    SpuriousRateLimiterEvent(&'static str),
    /// The event handler encountered an unexpected error while calling `TimerFd::wait()`.
    TimerFdWaitError(std::io::Error),
}
67 
// Interval at which the refill timer will run when limiter is at capacity.
const REFILL_TIMER_INTERVAL_MS: u64 = 100;
// The same interval expressed as a `Duration`, ready to be handed to the timer.
const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);

// Nanoseconds per millisecond; used to convert refill times from ms to ns.
const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
73 
// Greatest common divisor of `x` and `y`, computed with Euclid's algorithm
// written in its recursive formulation. gcd(x, 0) == x by convention.
fn gcd(x: u64, y: u64) -> u64 {
    match y {
        0 => x,
        _ => gcd(y, x % y),
    }
}
85 
/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
    /// There are not enough tokens to complete the operation.
    Failure,
    /// A part of the available tokens have been consumed.
    Success,
    /// A number of tokens `inner` times larger than the bucket size have been consumed.
    /// The caller is expected to block further consumption proportionally longer.
    OverConsumption(f64),
}
96 
/// TokenBucket provides a lower level interface to rate limiting with a
/// configurable capacity, refill-rate and initial burst.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TokenBucket {
    // Bucket defining traits.
    // Maximum number of tokens the bucket can hold; also the refill target.
    size: u64,
    // Initial burst size (number of free initial tokens, that can be consumed at no cost)
    one_time_burst: u64,
    // Complete refill time in milliseconds.
    refill_time: u64,

    // Internal state descriptors.
    // Tokens currently available for consumption; kept <= `size` by `reduce()`/`replenish()`.
    budget: u64,
    // Timestamp of the last budget update, used to compute the passive refill amount.
    last_update: Instant,

    // Fields used for pre-processing optimizations.
    // `size` and the refill time (in ns) divided by their greatest common divisor,
    // so the refill fraction can be computed with smaller, overflow-resistant factors.
    processed_capacity: u64,
    processed_refill_time: u64,
}
116 
117 impl TokenBucket {
118     /// Creates a `TokenBucket` wrapped in an `Option`.
119     ///
120     /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms`
121     /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
122     /// extra credit on top of total capacity, that does not replenish and which can be used
123     /// for an initial burst of data.
124     ///
125     /// If the `size` or the `complete refill time` are zero, then `None` is returned.
126     pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
127         // If either token bucket capacity or refill time is 0, disable limiting.
128         if size == 0 || complete_refill_time_ms == 0 {
129             return None;
130         }
131         // Formula for computing current refill amount:
132         // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
133         // In order to avoid overflows, simplify the fractions by computing greatest common divisor.
134 
135         let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
136         // Get the greatest common factor between `size` and `complete_refill_time_ns`.
137         let common_factor = gcd(size, complete_refill_time_ns);
138         // The division will be exact since `common_factor` is a factor of `size`.
139         let processed_capacity: u64 = size / common_factor;
140         // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
141         let processed_refill_time: u64 = complete_refill_time_ns / common_factor;
142 
143         Some(TokenBucket {
144             size,
145             one_time_burst,
146             refill_time: complete_refill_time_ms,
147             // Start off full.
148             budget: size,
149             // Last updated is now.
150             last_update: Instant::now(),
151             processed_capacity,
152             processed_refill_time,
153         })
154     }
155 
156     /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
157     // TODO (Issue #259): handle cases where a single request is larger than the full capacity
158     // for such cases we need to support partial fulfilment of requests
159     pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
160         // First things first: consume the one-time-burst budget.
161         if self.one_time_burst > 0 {
162             // We still have burst budget for *all* tokens requests.
163             if self.one_time_burst >= tokens {
164                 self.one_time_burst -= tokens;
165                 self.last_update = Instant::now();
166                 // No need to continue to the refill process, we still have burst budget to consume from.
167                 return BucketReduction::Success;
168             } else {
169                 // We still have burst budget for *some* of the tokens requests.
170                 // The tokens left unfulfilled will be consumed from current `self.budget`.
171                 tokens -= self.one_time_burst;
172                 self.one_time_burst = 0;
173             }
174         }
175 
176         // Compute time passed since last refill/update.
177         let time_delta = self.last_update.elapsed().as_nanos() as u64;
178         self.last_update = Instant::now();
179 
180         // At each 'time_delta' nanoseconds the bucket should refill with:
181         // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
182         // `processed_capacity` and `processed_refill_time` are the result of simplifying above
183         // fraction formula with their greatest-common-factor.
184         self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;
185 
186         if self.budget >= self.size {
187             self.budget = self.size;
188         }
189 
190         if tokens > self.budget {
191             // This operation requests a bandwidth higher than the bucket size
192             if tokens > self.size {
193                 error!(
194                     "Consumed {} tokens from bucket of size {}",
195                     tokens, self.size
196                 );
197                 // Empty the bucket and report an overconsumption of
198                 // (remaining tokens / size) times larger than the bucket size
199                 tokens -= self.budget;
200                 self.budget = 0;
201                 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
202             }
203             // If not enough tokens consume() fails, return false.
204             return BucketReduction::Failure;
205         }
206 
207         self.budget -= tokens;
208         BucketReduction::Success
209     }
210 
211     /// "Manually" adds tokens to bucket.
212     pub fn replenish(&mut self, tokens: u64) {
213         // This means we are still during the burst interval.
214         // Of course there is a very small chance  that the last reduce() also used up burst
215         // budget which should now be replenished, but for performance and code-complexity
216         // reasons we're just gonna let that slide since it's practically inconsequential.
217         if self.one_time_burst > 0 {
218             self.one_time_burst += tokens;
219             return;
220         }
221         self.budget = std::cmp::min(self.budget + tokens, self.size);
222     }
223 
224     /// Returns the capacity of the token bucket.
225     pub fn capacity(&self) -> u64 {
226         self.size
227     }
228 
229     /// Returns the remaining one time burst budget.
230     pub fn one_time_burst(&self) -> u64 {
231         self.one_time_burst
232     }
233 
234     /// Returns the time in milliseconds required to to completely fill the bucket.
235     pub fn refill_time_ms(&self) -> u64 {
236         self.refill_time
237     }
238 
239     /// Returns the current budget (one time burst allowance notwithstanding).
240     pub fn budget(&self) -> u64 {
241         self.budget
242     }
243 }
244 
/// Enum that describes the type of token used.
///
/// Selects which of the limiter's two buckets a `consume()` or
/// `manual_replenish()` call targets.
pub enum TokenType {
    /// Token type used for bandwidth limiting.
    Bytes,
    /// Token type used for operations/second limiting.
    Ops,
}
252 
/// Enum that describes the type of token bucket update.
///
/// Consumed by `RateLimiter::update_buckets()` to selectively keep, disable
/// or replace each of the two token buckets.
pub enum BucketUpdate {
    /// No Update - same as before.
    None,
    /// Rate Limiting is disabled on this bucket.
    Disabled,
    /// Rate Limiting enabled with updated bucket.
    Update(TokenBucket),
}
262 
/// Rate Limiter that works on both bandwidth and ops/s limiting.
///
/// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
///
/// Implementation uses a single timer through TimerFd to refresh either or
/// both token buckets.
///
/// Its internal buckets are 'passively' replenished as they're being used (as
/// part of `consume()` operations).
/// A timer is enabled and used to 'actively' replenish the token buckets when
/// limiting is in effect and `consume()` operations are disabled.
///
/// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
/// implementation. These events are meant to be consumed by the user of this struct.
/// On each such event, the user must call the `event_handler()` method.
pub struct RateLimiter {
    // Token buckets plus the timer fd, guarded by a single lock so that bucket
    // accounting and timer arming stay consistent.
    inner: Mutex<RateLimiterInner>,

    // Internal flag that quickly determines timer state.
    // `true` while the refill timer is armed (limiter blocked); read lock-free
    // by `is_blocked()` and `consume()`.
    timer_active: AtomicBool,
}
284 
// Lock-protected state of a `RateLimiter`.
struct RateLimiterInner {
    // Bandwidth (bytes/s) bucket; `None` means bytes limiting is disabled.
    bandwidth: Option<TokenBucket>,
    // Ops/s bucket; `None` means ops limiting is disabled.
    ops: Option<TokenBucket>,

    // Timer used to wake up the user once the buckets have had time to refill.
    timer_fd: TimerFd,
}
291 
292 impl RateLimiterInner {
293     // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once).
294     fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) {
295         // Panic when failing to arm the timer (same handling in crate TimerFd::set_state())
296         self.timer_fd
297             .reset(dur, None)
298             .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
299         flag.store(true, Ordering::Relaxed)
300     }
301 }
302 
303 impl RateLimiter {
304     /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
305     ///
306     /// # Arguments
307     ///
308     /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
309     /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
310     /// that does not replenish and which can be used for an initial burst of data.
311     /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
312     /// token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
313     /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
314     /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
315     /// that does not replenish and which can be used for an initial burst of data.
316     /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
317     /// bucket to go from zero Ops to `ops_total_capacity` Ops.
318     ///
319     /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
320     /// is **disabled** for that respective token type.
321     ///
322     /// # Errors
323     ///
324     /// If the timerfd creation fails, an error is returned.
325     pub fn new(
326         bytes_total_capacity: u64,
327         bytes_one_time_burst: u64,
328         bytes_complete_refill_time_ms: u64,
329         ops_total_capacity: u64,
330         ops_one_time_burst: u64,
331         ops_complete_refill_time_ms: u64,
332     ) -> io::Result<Self> {
333         let bytes_token_bucket = TokenBucket::new(
334             bytes_total_capacity,
335             bytes_one_time_burst,
336             bytes_complete_refill_time_ms,
337         );
338 
339         let ops_token_bucket = TokenBucket::new(
340             ops_total_capacity,
341             ops_one_time_burst,
342             ops_complete_refill_time_ms,
343         );
344 
345         // We'll need a timer_fd, even if our current config effectively disables rate limiting,
346         // because `Self::update_buckets()` might re-enable it later, and we might be
347         // seccomp-blocked from creating the timer_fd at that time.
348         let timer_fd = TimerFd::new()?;
349         // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag
350         // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`.
351         // SAFETY: FFI calls.
352         let ret = unsafe {
353             let fd = timer_fd.as_raw_fd();
354             let mut flags = libc::fcntl(fd, libc::F_GETFL);
355             flags |= libc::O_NONBLOCK;
356             libc::fcntl(fd, libc::F_SETFL, flags)
357         };
358         if ret < 0 {
359             return Err(std::io::Error::last_os_error());
360         }
361 
362         Ok(RateLimiter {
363             inner: Mutex::new(RateLimiterInner {
364                 bandwidth: bytes_token_bucket,
365                 ops: ops_token_bucket,
366                 timer_fd,
367             }),
368             timer_active: AtomicBool::new(false),
369         })
370     }
371 
372     /// Attempts to consume tokens and returns whether that is possible.
373     ///
374     /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
375     pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
376         // If the timer is active, we can't consume tokens from any bucket and the function fails.
377         if self.is_blocked() {
378             return false;
379         }
380         let mut guard = self.inner.lock().unwrap();
381         // Identify the required token bucket.
382         let token_bucket = match token_type {
383             TokenType::Bytes => guard.bandwidth.as_mut(),
384             TokenType::Ops => guard.ops.as_mut(),
385         };
386         // Try to consume from the token bucket.
387         if let Some(bucket) = token_bucket {
388             let refill_time = bucket.refill_time_ms();
389             match bucket.reduce(tokens) {
390                 // When we report budget is over, there will be no further calls here,
391                 // register a timer to replenish the bucket and resume processing;
392                 // make sure there is only one running timer for this limiter.
393                 BucketReduction::Failure => {
394                     if !self.is_blocked() {
395                         guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active);
396                     }
397                     false
398                 }
399                 // The operation succeeded and further calls can be made.
400                 BucketReduction::Success => true,
401                 // The operation succeeded as the tokens have been consumed
402                 // but the timer still needs to be armed.
403                 BucketReduction::OverConsumption(ratio) => {
404                     // The operation "borrowed" a number of tokens `ratio` times
405                     // greater than the size of the bucket, and since it takes
406                     // `refill_time` milliseconds to fill an empty bucket, in
407                     // order to enforce the bandwidth limit we need to prevent
408                     // further calls to the rate limiter for
409                     // `ratio * refill_time` milliseconds.
410                     guard.activate_timer(
411                         Duration::from_millis((ratio * refill_time as f64) as u64),
412                         &self.timer_active,
413                     );
414                     true
415                 }
416             }
417         } else {
418             // If bucket is not present rate limiting is disabled on token type,
419             // consume() will always succeed.
420             true
421         }
422     }
423 
424     /// Adds tokens of `token_type` to their respective bucket.
425     ///
426     /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
427     /// `consume()` if needed.
428     pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
429         let mut guard = self.inner.lock().unwrap();
430         // Identify the required token bucket.
431         let token_bucket = match token_type {
432             TokenType::Bytes => guard.bandwidth.as_mut(),
433             TokenType::Ops => guard.ops.as_mut(),
434         };
435         // Add tokens to the token bucket.
436         if let Some(bucket) = token_bucket {
437             bucket.replenish(tokens);
438         }
439     }
440 
441     /// Returns whether this rate limiter is blocked.
442     ///
443     /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
444     /// budget for it.
445     /// An event will be generated on the exported FD when the limiter 'unblocks'.
446     pub fn is_blocked(&self) -> bool {
447         self.timer_active.load(Ordering::Relaxed)
448     }
449 
450     /// This function needs to be called every time there is an event on the
451     /// FD provided by this object's `AsRawFd` trait implementation.
452     ///
453     /// # Errors
454     ///
455     /// If the rate limiter is disabled or is not blocked, an error is returned.
456     pub fn event_handler(&self) -> Result<(), Error> {
457         let mut guard = self.inner.lock().unwrap();
458         loop {
459             // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
460             // `timer_fd::wait()` won't block (which is different from its default behavior.)
461             match guard.timer_fd.wait() {
462                 Err(e) => {
463                     let err: std::io::Error = e.into();
464                     match err.kind() {
465                         std::io::ErrorKind::Interrupted => (),
466                         std::io::ErrorKind::WouldBlock => {
467                             return Err(Error::SpuriousRateLimiterEvent(
468                                 "Rate limiter event handler called without a present timer",
469                             ))
470                         }
471                         _ => return Err(Error::TimerFdWaitError(err)),
472                     }
473                 }
474                 _ => {
475                     self.timer_active.store(false, Ordering::Relaxed);
476                     return Ok(());
477                 }
478             }
479         }
480     }
481 
482     /// Updates the parameters of the token buckets associated with this RateLimiter.
483     // TODO: Please note that, right now, the buckets become full after being updated.
484     pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
485         let mut guard = self.inner.lock().unwrap();
486         match bytes {
487             BucketUpdate::Disabled => guard.bandwidth = None,
488             BucketUpdate::Update(tb) => guard.bandwidth = Some(tb),
489             BucketUpdate::None => (),
490         };
491         match ops {
492             BucketUpdate::Disabled => guard.ops = None,
493             BucketUpdate::Update(tb) => guard.ops = Some(tb),
494             BucketUpdate::None => (),
495         };
496     }
497 }
498 
impl AsRawFd for RateLimiter {
    /// Provides a FD which needs to be monitored for POLLIN events.
    ///
    /// This object's `event_handler()` method must be called on such events.
    ///
    /// The FD is always valid: `RateLimiter::new()` creates the underlying
    /// timerfd even when rate limiting is disabled for both token types.
    fn as_raw_fd(&self) -> RawFd {
        let guard = self.inner.lock().unwrap();
        guard.timer_fd.as_raw_fd()
    }
}
511 
impl Default for RateLimiter {
    /// Default RateLimiter is a no-op limiter with infinite budget.
    fn default() -> Self {
        // `new()` still creates the timer_fd even for this no-op configuration,
        // so this only panics if timerfd creation or its non-blocking setup
        // fails — unexpected conditions at process startup.
        RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
    }
}
519 
520 #[cfg(test)]
521 pub(crate) mod tests {
522     use super::*;
523     use std::fmt;
524     use std::thread;
525 
526     impl TokenBucket {
527         // Resets the token bucket: budget set to max capacity and last-updated set to now.
528         fn reset(&mut self) {
529             self.budget = self.size;
530             self.last_update = Instant::now();
531         }
532 
533         fn get_last_update(&self) -> &Instant {
534             &self.last_update
535         }
536 
537         fn get_processed_capacity(&self) -> u64 {
538             self.processed_capacity
539         }
540 
541         fn get_processed_refill_time(&self) -> u64 {
542             self.processed_refill_time
543         }
544 
545         // After a restore, we cannot be certain that the last_update field has the same value.
546         pub fn partial_eq(&self, other: &TokenBucket) -> bool {
547             (other.capacity() == self.capacity())
548                 && (other.one_time_burst() == self.one_time_burst())
549                 && (other.refill_time_ms() == self.refill_time_ms())
550                 && (other.budget() == self.budget())
551         }
552     }
553 
554     impl RateLimiter {
555         pub fn bandwidth(&self) -> Option<TokenBucket> {
556             let guard = self.inner.lock().unwrap();
557             guard.bandwidth.clone()
558         }
559 
560         pub fn ops(&self) -> Option<TokenBucket> {
561             let guard = self.inner.lock().unwrap();
562             guard.ops.clone()
563         }
564     }
565 
566     impl PartialEq for RateLimiter {
567         fn eq(&self, other: &RateLimiter) -> bool {
568             let self_guard = self.inner.lock().unwrap();
569             let other_guard = other.inner.lock().unwrap();
570             self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops
571         }
572     }
573 
574     impl fmt::Debug for RateLimiter {
575         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
576             let guard = self.inner.lock().unwrap();
577             write!(
578                 f,
579                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
580                 guard.bandwidth, guard.ops
581             )
582         }
583     }
584 
585     #[test]
586     fn test_token_bucket_create() {
587         let before = Instant::now();
588         let tb = TokenBucket::new(1000, 0, 1000).unwrap();
589         assert_eq!(tb.capacity(), 1000);
590         assert_eq!(tb.budget(), 1000);
591         assert!(*tb.get_last_update() >= before);
592         let after = Instant::now();
593         assert!(*tb.get_last_update() <= after);
594         assert_eq!(tb.get_processed_capacity(), 1);
595         assert_eq!(tb.get_processed_refill_time(), 1_000_000);
596 
597         // Verify invalid bucket configurations result in `None`.
598         assert!(TokenBucket::new(0, 1234, 1000).is_none());
599         assert!(TokenBucket::new(100, 1234, 0).is_none());
600         assert!(TokenBucket::new(0, 1234, 0).is_none());
601     }
602 
603     #[test]
604     fn test_token_bucket_preprocess() {
605         let tb = TokenBucket::new(1000, 0, 1000).unwrap();
606         assert_eq!(tb.get_processed_capacity(), 1);
607         assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);
608 
609         let thousand = 1000;
610         let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
611         assert_eq!(tb.get_processed_capacity(), 3 * 19);
612         assert_eq!(
613             tb.get_processed_refill_time(),
614             13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
615         );
616     }
617 
618     #[test]
619     fn test_token_bucket_reduce() {
620         // token bucket with capacity 1000 and refill time of 1000 milliseconds
621         // allowing rate of 1 token/ms.
622         let capacity = 1000;
623         let refill_ms = 1000;
624         let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();
625 
626         assert_eq!(tb.reduce(123), BucketReduction::Success);
627         assert_eq!(tb.budget(), capacity - 123);
628 
629         thread::sleep(Duration::from_millis(123));
630         assert_eq!(tb.reduce(1), BucketReduction::Success);
631         assert_eq!(tb.budget(), capacity - 1);
632         assert_eq!(tb.reduce(100), BucketReduction::Success);
633         assert_eq!(tb.reduce(capacity), BucketReduction::Failure);
634 
635         // token bucket with capacity 1000 and refill time of 1000 milliseconds
636         let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
637         // safely assuming the thread can run these 3 commands in less than 500ms
638         assert_eq!(tb.reduce(1000), BucketReduction::Success);
639         assert_eq!(tb.one_time_burst(), 100);
640         assert_eq!(tb.reduce(500), BucketReduction::Success);
641         assert_eq!(tb.one_time_burst(), 0);
642         assert_eq!(tb.reduce(500), BucketReduction::Success);
643         assert_eq!(tb.reduce(500), BucketReduction::Failure);
644         thread::sleep(Duration::from_millis(500));
645         assert_eq!(tb.reduce(500), BucketReduction::Success);
646         thread::sleep(Duration::from_millis(1000));
647         assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));
648 
649         let before = Instant::now();
650         tb.reset();
651         assert_eq!(tb.capacity(), 1000);
652         assert_eq!(tb.budget(), 1000);
653         assert!(*tb.get_last_update() >= before);
654         let after = Instant::now();
655         assert!(*tb.get_last_update() <= after);
656     }
657 
658     #[test]
659     fn test_rate_limiter_default() {
660         let l = RateLimiter::default();
661 
662         // limiter should not be blocked
663         assert!(!l.is_blocked());
664         // limiter should be disabled so consume(whatever) should work
665         assert!(l.consume(u64::max_value(), TokenType::Ops));
666         assert!(l.consume(u64::max_value(), TokenType::Bytes));
667         // calling the handler without there having been an event should error
668         assert!(l.event_handler().is_err());
669         assert_eq!(
670             format!("{:?}", l.event_handler().err().unwrap()),
671             "SpuriousRateLimiterEvent(\
672              \"Rate limiter event handler called without a present timer\")"
673         );
674     }
675 
676     #[test]
677     fn test_rate_limiter_new() {
678         let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
679         let bw = l.bandwidth().unwrap();
680         assert_eq!(bw.capacity(), 1000);
681         assert_eq!(bw.one_time_burst(), 1001);
682         assert_eq!(bw.refill_time_ms(), 1002);
683         assert_eq!(bw.budget(), 1000);
684 
685         let ops = l.ops().unwrap();
686         assert_eq!(ops.capacity(), 1003);
687         assert_eq!(ops.one_time_burst(), 1004);
688         assert_eq!(ops.refill_time_ms(), 1005);
689         assert_eq!(ops.budget(), 1003);
690     }
691 
692     #[test]
693     fn test_rate_limiter_manual_replenish() {
694         // rate limiter with limit of 1000 bytes/s and 1000 ops/s
695         let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
696 
697         // consume 123 bytes
698         assert!(l.consume(123, TokenType::Bytes));
699         l.manual_replenish(23, TokenType::Bytes);
700         {
701             let bytes_tb = l.bandwidth().unwrap();
702             assert_eq!(bytes_tb.budget(), 900);
703         }
704         // consume 123 ops
705         assert!(l.consume(123, TokenType::Ops));
706         l.manual_replenish(23, TokenType::Ops);
707         {
708             let bytes_tb = l.ops().unwrap();
709             assert_eq!(bytes_tb.budget(), 900);
710         }
711     }
712 
713     #[test]
714     fn test_rate_limiter_bandwidth() {
715         // rate limiter with limit of 1000 bytes/s
716         let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
717 
718         // limiter should not be blocked
719         assert!(!l.is_blocked());
720         // raw FD for this disabled should be valid
721         assert!(l.as_raw_fd() > 0);
722 
723         // ops/s limiter should be disabled so consume(whatever) should work
724         assert!(l.consume(u64::max_value(), TokenType::Ops));
725 
726         // do full 1000 bytes
727         assert!(l.consume(1000, TokenType::Bytes));
728         // try and fail on another 100
729         assert!(!l.consume(100, TokenType::Bytes));
730         // since consume failed, limiter should be blocked now
731         assert!(l.is_blocked());
732         // wait half the timer period
733         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
734         // limiter should still be blocked
735         assert!(l.is_blocked());
736         // wait the other half of the timer period
737         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
738         // the timer_fd should have an event on it by now
739         assert!(l.event_handler().is_ok());
740         // limiter should now be unblocked
741         assert!(!l.is_blocked());
742         // try and succeed on another 100 bytes this time
743         assert!(l.consume(100, TokenType::Bytes));
744     }
745 
746     #[test]
747     fn test_rate_limiter_ops() {
748         // rate limiter with limit of 1000 ops/s
749         let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();
750 
751         // limiter should not be blocked
752         assert!(!l.is_blocked());
753         // raw FD for this disabled should be valid
754         assert!(l.as_raw_fd() > 0);
755 
756         // bytes/s limiter should be disabled so consume(whatever) should work
757         assert!(l.consume(u64::max_value(), TokenType::Bytes));
758 
759         // do full 1000 ops
760         assert!(l.consume(1000, TokenType::Ops));
761         // try and fail on another 100
762         assert!(!l.consume(100, TokenType::Ops));
763         // since consume failed, limiter should be blocked now
764         assert!(l.is_blocked());
765         // wait half the timer period
766         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
767         // limiter should still be blocked
768         assert!(l.is_blocked());
769         // wait the other half of the timer period
770         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
771         // the timer_fd should have an event on it by now
772         assert!(l.event_handler().is_ok());
773         // limiter should now be unblocked
774         assert!(!l.is_blocked());
775         // try and succeed on another 100 ops this time
776         assert!(l.consume(100, TokenType::Ops));
777     }
778 
    #[test]
    fn test_rate_limiter_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the raw timer FD backing the limiter should be valid
        assert!(l.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!l.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
814 
    #[test]
    fn test_rate_limiter_overconsumption() {
        // Checks the "borrowing" behavior: a consume larger than the bucket
        // succeeds, but blocks the limiter until the deficit is repaid.
        // NOTE(review): this test is wall-clock sensitive; the sleep ordering
        // below must not be changed.
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(l.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked (the 1500-token deficit needs 1.5s to repay)
        thread::sleep(Duration::from_millis(1000));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());

        // reset the rate limiter
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 0.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(l.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time
        // (REFILL_TIMER_INTERVAL_MS = 100ms), the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!l.consume(100, TokenType::Bytes));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten (by the failed consume above) and the rate limiter is
        // still blocked from the borrowing we performed earlier
        thread::sleep(Duration::from_millis(100));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());
        assert!(!l.consume(100, TokenType::Bytes));

        // after waiting out the full 500ms duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());
        assert!(l.consume(100, TokenType::Bytes));
    }
871 
872     #[test]
873     fn test_update_buckets() {
874         let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();
875 
876         let initial_bw = x.bandwidth();
877         let initial_ops = x.ops();
878 
879         x.update_buckets(BucketUpdate::None, BucketUpdate::None);
880         assert_eq!(x.bandwidth(), initial_bw);
881         assert_eq!(x.ops(), initial_ops);
882 
883         let new_bw = TokenBucket::new(123, 0, 57).unwrap();
884         let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
885         x.update_buckets(
886             BucketUpdate::Update(new_bw.clone()),
887             BucketUpdate::Update(new_ops.clone()),
888         );
889 
890         {
891             let mut guard = x.inner.lock().unwrap();
892             // We have manually adjust the last_update field, because it changes when update_buckets()
893             // constructs new buckets (and thus gets a different value for last_update). We do this so
894             // it makes sense to test the following assertions.
895             guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
896             guard.ops.as_mut().unwrap().last_update = new_ops.last_update;
897         }
898 
899         assert_eq!(x.bandwidth(), Some(new_bw));
900         assert_eq!(x.ops(), Some(new_ops));
901 
902         x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
903         assert_eq!(x.bandwidth(), None);
904         assert_eq!(x.ops(), None);
905     }
906 
907     #[test]
908     fn test_rate_limiter_debug() {
909         let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
910         assert_eq!(
911             format!("{l:?}"),
912             format!(
913                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
914                 l.bandwidth(),
915                 l.ops()
916             ),
917         );
918     }
919 }
920