xref: /cloud-hypervisor/rate_limiter/src/lib.rs (revision 894a4dee6ec35bdd9df848a3a88bf5b96771c943)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
//! notification will be dispatched as an event on the FD provided by the `AsRawFd`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46 #[macro_use]
47 extern crate log;
48 
49 use std::io;
50 use std::os::unix::io::{AsRawFd, RawFd};
51 use std::sync::atomic::{AtomicBool, Ordering};
52 use std::sync::Mutex;
53 use std::time::{Duration, Instant};
54 
55 use vmm_sys_util::timerfd::TimerFd;
56 
57 /// Module for group rate limiting.
58 pub mod group;
59 
#[derive(Debug)]
/// Describes the errors that may occur while handling rate limiter events.
pub enum Error {
    /// The event handler was called spuriously (no timer event was pending).
    SpuriousRateLimiterEvent(&'static str),
    /// The event handler encountered an unexpected error while calling `TimerFd::wait()`.
    TimerFdWaitError(std::io::Error),
}
68 
69 // Interval at which the refill timer will run when limiter is at capacity.
70 const REFILL_TIMER_INTERVAL_MS: u64 = 100;
71 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);
72 
73 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
74 
// Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
// Written recursively: gcd(x, y) == gcd(y, x mod y), with gcd(x, 0) == x.
fn gcd(x: u64, y: u64) -> u64 {
    if y == 0 {
        x
    } else {
        gcd(y, x % y)
    }
}
86 
/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
    /// There are not enough tokens to complete the operation.
    Failure,
    /// A part of the available tokens have been consumed.
    Success,
    /// A number of tokens `inner` times larger than the bucket size have been consumed.
    /// The bucket is emptied and the caller is expected to block further requests
    /// proportionally to this ratio.
    OverConsumption(f64),
}
97 
/// TokenBucket provides a lower level interface to rate limiting with a
/// configurable capacity, refill-rate and initial burst.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TokenBucket {
    // Bucket defining traits.
    // Maximum number of tokens the bucket can hold.
    size: u64,
    // Initial burst size (number of free initial tokens, that can be consumed at no cost)
    one_time_burst: u64,
    // Complete refill time in milliseconds.
    refill_time: u64,

    // Internal state descriptors.
    // Number of tokens currently available for consumption.
    budget: u64,
    // Instant of the last accounting; refill amounts are computed from the time elapsed since.
    last_update: Instant,

    // Fields used for pre-processing optimizations.
    // `size` and the refill time in nanoseconds, both divided by their greatest common
    // divisor, so refill amounts can be computed with smaller operands (see `new()`).
    processed_capacity: u64,
    processed_refill_time: u64,
}
117 
118 impl TokenBucket {
119     /// Creates a `TokenBucket` wrapped in an `Option`.
120     ///
121     /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms`
122     /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
123     /// extra credit on top of total capacity, that does not replenish and which can be used
124     /// for an initial burst of data.
125     ///
126     /// If the `size` or the `complete refill time` are zero, then `None` is returned.
127     pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
128         // If either token bucket capacity or refill time is 0, disable limiting.
129         if size == 0 || complete_refill_time_ms == 0 {
130             return None;
131         }
132         // Formula for computing current refill amount:
133         // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
134         // In order to avoid overflows, simplify the fractions by computing greatest common divisor.
135 
136         let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
137         // Get the greatest common factor between `size` and `complete_refill_time_ns`.
138         let common_factor = gcd(size, complete_refill_time_ns);
139         // The division will be exact since `common_factor` is a factor of `size`.
140         let processed_capacity: u64 = size / common_factor;
141         // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
142         let processed_refill_time: u64 = complete_refill_time_ns / common_factor;
143 
144         Some(TokenBucket {
145             size,
146             one_time_burst,
147             refill_time: complete_refill_time_ms,
148             // Start off full.
149             budget: size,
150             // Last updated is now.
151             last_update: Instant::now(),
152             processed_capacity,
153             processed_refill_time,
154         })
155     }
156 
157     /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
158     // TODO (Issue #259): handle cases where a single request is larger than the full capacity
159     // for such cases we need to support partial fulfilment of requests
160     pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
161         // First things first: consume the one-time-burst budget.
162         if self.one_time_burst > 0 {
163             // We still have burst budget for *all* tokens requests.
164             if self.one_time_burst >= tokens {
165                 self.one_time_burst -= tokens;
166                 self.last_update = Instant::now();
167                 // No need to continue to the refill process, we still have burst budget to consume from.
168                 return BucketReduction::Success;
169             } else {
170                 // We still have burst budget for *some* of the tokens requests.
171                 // The tokens left unfulfilled will be consumed from current `self.budget`.
172                 tokens -= self.one_time_burst;
173                 self.one_time_burst = 0;
174             }
175         }
176 
177         // Compute time passed since last refill/update.
178         let time_delta = self.last_update.elapsed().as_nanos() as u64;
179         self.last_update = Instant::now();
180 
181         // At each 'time_delta' nanoseconds the bucket should refill with:
182         // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
183         // `processed_capacity` and `processed_refill_time` are the result of simplifying above
184         // fraction formula with their greatest-common-factor.
185         self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;
186 
187         if self.budget >= self.size {
188             self.budget = self.size;
189         }
190 
191         if tokens > self.budget {
192             // This operation requests a bandwidth higher than the bucket size
193             if tokens > self.size {
194                 error!(
195                     "Consumed {} tokens from bucket of size {}",
196                     tokens, self.size
197                 );
198                 // Empty the bucket and report an overconsumption of
199                 // (remaining tokens / size) times larger than the bucket size
200                 tokens -= self.budget;
201                 self.budget = 0;
202                 return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
203             }
204             // If not enough tokens consume() fails, return false.
205             return BucketReduction::Failure;
206         }
207 
208         self.budget -= tokens;
209         BucketReduction::Success
210     }
211 
212     /// "Manually" adds tokens to bucket.
213     pub fn replenish(&mut self, tokens: u64) {
214         // This means we are still during the burst interval.
215         // Of course there is a very small chance  that the last reduce() also used up burst
216         // budget which should now be replenished, but for performance and code-complexity
217         // reasons we're just gonna let that slide since it's practically inconsequential.
218         if self.one_time_burst > 0 {
219             self.one_time_burst += tokens;
220             return;
221         }
222         self.budget = std::cmp::min(self.budget + tokens, self.size);
223     }
224 
225     /// Returns the capacity of the token bucket.
226     pub fn capacity(&self) -> u64 {
227         self.size
228     }
229 
230     /// Returns the remaining one time burst budget.
231     pub fn one_time_burst(&self) -> u64 {
232         self.one_time_burst
233     }
234 
235     /// Returns the time in milliseconds required to to completely fill the bucket.
236     pub fn refill_time_ms(&self) -> u64 {
237         self.refill_time
238     }
239 
240     /// Returns the current budget (one time burst allowance notwithstanding).
241     pub fn budget(&self) -> u64 {
242         self.budget
243     }
244 }
245 
/// Enum that describes the type of token used.
pub enum TokenType {
    /// Token type used for bandwidth limiting.
    Bytes,
    /// Token type used for operations/second limiting.
    Ops,
}
253 
/// Enum that describes the type of token bucket update.
pub enum BucketUpdate {
    /// No Update - same as before.
    None,
    /// Rate Limiting is disabled on this bucket.
    Disabled,
    /// Rate Limiting enabled with updated bucket.
    Update(TokenBucket),
}
263 
/// Rate Limiter that works on both bandwidth and ops/s limiting.
///
/// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
///
/// Implementation uses a single timer through TimerFd to refresh either or
/// both token buckets.
///
/// Its internal buckets are 'passively' replenished as they're being used (as
/// part of `consume()` operations).
/// A timer is enabled and used to 'actively' replenish the token buckets when
/// limiting is in effect and `consume()` operations are disabled.
///
/// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
/// implementation. These events are meant to be consumed by the user of this struct.
/// On each such event, the user must call the `event_handler()` method.
pub struct RateLimiter {
    // Mutable limiter state (buckets + timerfd), shared behind a mutex.
    inner: Mutex<RateLimiterInner>,

    // Internal flag that quickly determines timer state.
    timer_active: AtomicBool,
}
285 
// State guarded by the `RateLimiter` mutex: the two optional buckets and the timer.
struct RateLimiterInner {
    // `None` means rate limiting is disabled for the respective token type.
    bandwidth: Option<TokenBucket>,
    ops: Option<TokenBucket>,

    // timerfd used to generate 'unblock' events when limiting is in effect.
    timer_fd: TimerFd,
}
292 
293 impl RateLimiterInner {
294     // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once).
295     fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) {
296         // Panic when failing to arm the timer (same handling in crate TimerFd::set_state())
297         self.timer_fd
298             .reset(dur, None)
299             .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
300         flag.store(true, Ordering::Relaxed)
301     }
302 }
303 
304 impl RateLimiter {
305     /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
306     ///
307     /// # Arguments
308     ///
309     /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
310     /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
311     ///   that does not replenish and which can be used for an initial burst of data.
312     /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
313     ///   token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
314     /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
315     /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
316     ///   that does not replenish and which can be used for an initial burst of data.
317     /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
318     ///   bucket to go from zero Ops to `ops_total_capacity` Ops.
319     ///
320     /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
321     /// is **disabled** for that respective token type.
322     ///
323     /// # Errors
324     ///
325     /// If the timerfd creation fails, an error is returned.
326     pub fn new(
327         bytes_total_capacity: u64,
328         bytes_one_time_burst: u64,
329         bytes_complete_refill_time_ms: u64,
330         ops_total_capacity: u64,
331         ops_one_time_burst: u64,
332         ops_complete_refill_time_ms: u64,
333     ) -> io::Result<Self> {
334         let bytes_token_bucket = TokenBucket::new(
335             bytes_total_capacity,
336             bytes_one_time_burst,
337             bytes_complete_refill_time_ms,
338         );
339 
340         let ops_token_bucket = TokenBucket::new(
341             ops_total_capacity,
342             ops_one_time_burst,
343             ops_complete_refill_time_ms,
344         );
345 
346         // We'll need a timer_fd, even if our current config effectively disables rate limiting,
347         // because `Self::update_buckets()` might re-enable it later, and we might be
348         // seccomp-blocked from creating the timer_fd at that time.
349         let timer_fd = TimerFd::new()?;
350         // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag
351         // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`.
352         // SAFETY: FFI calls.
353         let ret = unsafe {
354             let fd = timer_fd.as_raw_fd();
355             let mut flags = libc::fcntl(fd, libc::F_GETFL);
356             flags |= libc::O_NONBLOCK;
357             libc::fcntl(fd, libc::F_SETFL, flags)
358         };
359         if ret < 0 {
360             return Err(std::io::Error::last_os_error());
361         }
362 
363         Ok(RateLimiter {
364             inner: Mutex::new(RateLimiterInner {
365                 bandwidth: bytes_token_bucket,
366                 ops: ops_token_bucket,
367                 timer_fd,
368             }),
369             timer_active: AtomicBool::new(false),
370         })
371     }
372 
373     /// Attempts to consume tokens and returns whether that is possible.
374     ///
375     /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
376     pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
377         // If the timer is active, we can't consume tokens from any bucket and the function fails.
378         if self.is_blocked() {
379             return false;
380         }
381         let mut guard = self.inner.lock().unwrap();
382         // Identify the required token bucket.
383         let token_bucket = match token_type {
384             TokenType::Bytes => guard.bandwidth.as_mut(),
385             TokenType::Ops => guard.ops.as_mut(),
386         };
387         // Try to consume from the token bucket.
388         if let Some(bucket) = token_bucket {
389             let refill_time = bucket.refill_time_ms();
390             match bucket.reduce(tokens) {
391                 // When we report budget is over, there will be no further calls here,
392                 // register a timer to replenish the bucket and resume processing;
393                 // make sure there is only one running timer for this limiter.
394                 BucketReduction::Failure => {
395                     if !self.is_blocked() {
396                         guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active);
397                     }
398                     false
399                 }
400                 // The operation succeeded and further calls can be made.
401                 BucketReduction::Success => true,
402                 // The operation succeeded as the tokens have been consumed
403                 // but the timer still needs to be armed.
404                 BucketReduction::OverConsumption(ratio) => {
405                     // The operation "borrowed" a number of tokens `ratio` times
406                     // greater than the size of the bucket, and since it takes
407                     // `refill_time` milliseconds to fill an empty bucket, in
408                     // order to enforce the bandwidth limit we need to prevent
409                     // further calls to the rate limiter for
410                     // `ratio * refill_time` milliseconds.
411                     guard.activate_timer(
412                         Duration::from_millis((ratio * refill_time as f64) as u64),
413                         &self.timer_active,
414                     );
415                     true
416                 }
417             }
418         } else {
419             // If bucket is not present rate limiting is disabled on token type,
420             // consume() will always succeed.
421             true
422         }
423     }
424 
425     /// Adds tokens of `token_type` to their respective bucket.
426     ///
427     /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
428     /// `consume()` if needed.
429     pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
430         let mut guard = self.inner.lock().unwrap();
431         // Identify the required token bucket.
432         let token_bucket = match token_type {
433             TokenType::Bytes => guard.bandwidth.as_mut(),
434             TokenType::Ops => guard.ops.as_mut(),
435         };
436         // Add tokens to the token bucket.
437         if let Some(bucket) = token_bucket {
438             bucket.replenish(tokens);
439         }
440     }
441 
442     /// Returns whether this rate limiter is blocked.
443     ///
444     /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
445     /// budget for it.
446     /// An event will be generated on the exported FD when the limiter 'unblocks'.
447     pub fn is_blocked(&self) -> bool {
448         self.timer_active.load(Ordering::Relaxed)
449     }
450 
451     /// This function needs to be called every time there is an event on the
452     /// FD provided by this object's `AsRawFd` trait implementation.
453     ///
454     /// # Errors
455     ///
456     /// If the rate limiter is disabled or is not blocked, an error is returned.
457     pub fn event_handler(&self) -> Result<(), Error> {
458         let mut guard = self.inner.lock().unwrap();
459         loop {
460             // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
461             // `timer_fd::wait()` won't block (which is different from its default behavior.)
462             match guard.timer_fd.wait() {
463                 Err(e) => {
464                     let err: std::io::Error = e.into();
465                     match err.kind() {
466                         std::io::ErrorKind::Interrupted => (),
467                         std::io::ErrorKind::WouldBlock => {
468                             return Err(Error::SpuriousRateLimiterEvent(
469                                 "Rate limiter event handler called without a present timer",
470                             ))
471                         }
472                         _ => return Err(Error::TimerFdWaitError(err)),
473                     }
474                 }
475                 _ => {
476                     self.timer_active.store(false, Ordering::Relaxed);
477                     return Ok(());
478                 }
479             }
480         }
481     }
482 
483     /// Updates the parameters of the token buckets associated with this RateLimiter.
484     // TODO: Please note that, right now, the buckets become full after being updated.
485     pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
486         let mut guard = self.inner.lock().unwrap();
487         match bytes {
488             BucketUpdate::Disabled => guard.bandwidth = None,
489             BucketUpdate::Update(tb) => guard.bandwidth = Some(tb),
490             BucketUpdate::None => (),
491         };
492         match ops {
493             BucketUpdate::Disabled => guard.ops = None,
494             BucketUpdate::Update(tb) => guard.ops = Some(tb),
495             BucketUpdate::None => (),
496         };
497     }
498 }
499 
impl AsRawFd for RateLimiter {
    /// Provides a FD which needs to be monitored for POLLIN events.
    ///
    /// This object's `event_handler()` method must be called on such events.
    ///
    /// The FD is the internal timerfd, which `RateLimiter::new()` creates
    /// unconditionally, so it is valid even when rate limiting is disabled
    /// on both token types.
    fn as_raw_fd(&self) -> RawFd {
        let guard = self.inner.lock().unwrap();
        guard.timer_fd.as_raw_fd()
    }
}
512 
impl Default for RateLimiter {
    /// Default RateLimiter is a no-op limiter with infinite budget.
    fn default() -> Self {
        // Zero sizes/refill-times disable both buckets. A timerfd is still created
        // by `new()`, so the only possible failure here is timerfd creation itself.
        RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
    }
}
520 
521 #[cfg(test)]
522 pub(crate) mod tests {
523     use std::{fmt, thread};
524 
525     use super::*;
526 
    // Test-only accessors and helpers for `TokenBucket` internals.
    impl TokenBucket {
        // Resets the token bucket: budget set to max capacity and last-updated set to now.
        fn reset(&mut self) {
            self.budget = self.size;
            self.last_update = Instant::now();
        }

        // Exposes the private `last_update` timestamp for assertions.
        fn get_last_update(&self) -> &Instant {
            &self.last_update
        }

        // Exposes the gcd-simplified capacity for assertions.
        fn get_processed_capacity(&self) -> u64 {
            self.processed_capacity
        }

        // Exposes the gcd-simplified refill time for assertions.
        fn get_processed_refill_time(&self) -> u64 {
            self.processed_refill_time
        }

        // After a restore, we cannot be certain that the last_update field has the same value.
        #[allow(dead_code)]
        fn partial_eq(&self, other: &TokenBucket) -> bool {
            (other.capacity() == self.capacity())
                && (other.one_time_burst() == self.one_time_burst())
                && (other.refill_time_ms() == self.refill_time_ms())
                && (other.budget() == self.budget())
        }
    }
555 
    // Test-only accessors cloning the buckets out from behind the mutex.
    impl RateLimiter {
        fn bandwidth(&self) -> Option<TokenBucket> {
            let guard = self.inner.lock().unwrap();
            guard.bandwidth.clone()
        }

        fn ops(&self) -> Option<TokenBucket> {
            let guard = self.inner.lock().unwrap();
            guard.ops.clone()
        }
    }
567 
568     impl PartialEq for RateLimiter {
569         fn eq(&self, other: &RateLimiter) -> bool {
570             let self_guard = self.inner.lock().unwrap();
571             let other_guard = other.inner.lock().unwrap();
572             self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops
573         }
574     }
575 
576     impl fmt::Debug for RateLimiter {
577         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
578             let guard = self.inner.lock().unwrap();
579             write!(
580                 f,
581                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
582                 guard.bandwidth, guard.ops
583             )
584         }
585     }
586 
    #[test]
    fn test_token_bucket_create() {
        let before = Instant::now();
        let tb = TokenBucket::new(1000, 0, 1000).unwrap();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        // `last_update` must be stamped between the two surrounding `Instant::now()` probes.
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
        assert_eq!(tb.get_processed_capacity(), 1);
        assert_eq!(tb.get_processed_refill_time(), 1_000_000);

        // Verify invalid bucket configurations result in `None`.
        assert!(TokenBucket::new(0, 1234, 1000).is_none());
        assert!(TokenBucket::new(100, 1234, 0).is_none());
        assert!(TokenBucket::new(0, 1234, 0).is_none());
    }
604 
    #[test]
    fn test_token_bucket_preprocess() {
        // 1000 and 1000ms share all their factors: both simplified values collapse.
        let tb = TokenBucket::new(1000, 0, 1000).unwrap();
        assert_eq!(tb.get_processed_capacity(), 1);
        assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);

        // Only the common factors (7, 11 and `thousand`) cancel out of the fraction.
        let thousand = 1000;
        let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
        assert_eq!(tb.get_processed_capacity(), 3 * 19);
        assert_eq!(
            tb.get_processed_refill_time(),
            13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
        );
    }
619 
    #[test]
    fn test_token_bucket_reduce() {
        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        // allowing rate of 1 token/ms.
        let capacity = 1000;
        let refill_ms = 1000;
        let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();

        assert_eq!(tb.reduce(123), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 123);

        // After sleeping 123ms the bucket refills the 123 tokens consumed above.
        thread::sleep(Duration::from_millis(123));
        assert_eq!(tb.reduce(1), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 1);
        assert_eq!(tb.reduce(100), BucketReduction::Success);
        assert_eq!(tb.reduce(capacity), BucketReduction::Failure);

        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
        // safely assuming the thread can run these 3 commands in less than 500ms
        assert_eq!(tb.reduce(1000), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 100);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 0);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.reduce(500), BucketReduction::Failure);
        thread::sleep(Duration::from_millis(500));
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        thread::sleep(Duration::from_millis(1000));
        // Consuming 2500 from a full 1000-bucket leaves 1500 unfulfilled -> ratio 1.5.
        assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));

        let before = Instant::now();
        tb.reset();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
    }
659 
    #[test]
    fn test_rate_limiter_default() {
        let l = RateLimiter::default();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::MAX, TokenType::Ops));
        assert!(l.consume(u64::MAX, TokenType::Bytes));
        // calling the handler without there having been an event should error
        l.event_handler().unwrap_err();
        assert_eq!(
            format!("{:?}", l.event_handler().err().unwrap()),
            "SpuriousRateLimiterEvent(\
             \"Rate limiter event handler called without a present timer\")"
        );
    }
677 
    #[test]
    fn test_rate_limiter_new() {
        // Distinct values per parameter so a swapped argument would be detected.
        let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
        let bw = l.bandwidth().unwrap();
        assert_eq!(bw.capacity(), 1000);
        assert_eq!(bw.one_time_burst(), 1001);
        assert_eq!(bw.refill_time_ms(), 1002);
        assert_eq!(bw.budget(), 1000);

        let ops = l.ops().unwrap();
        assert_eq!(ops.capacity(), 1003);
        assert_eq!(ops.one_time_burst(), 1004);
        assert_eq!(ops.refill_time_ms(), 1005);
        assert_eq!(ops.budget(), 1003);
    }
693 
    #[test]
    fn test_rate_limiter_manual_replenish() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // consume 123 bytes, then hand back 23: budget should land on 900
        assert!(l.consume(123, TokenType::Bytes));
        l.manual_replenish(23, TokenType::Bytes);
        {
            let bytes_tb = l.bandwidth().unwrap();
            assert_eq!(bytes_tb.budget(), 900);
        }
        // consume 123 ops, then hand back 23: budget should land on 900
        assert!(l.consume(123, TokenType::Ops));
        l.manual_replenish(23, TokenType::Ops);
        {
            let bytes_tb = l.ops().unwrap();
            assert_eq!(bytes_tb.budget(), 900);
        }
    }
714 
    #[test]
    fn test_rate_limiter_bandwidth() {
        // rate limiter with limit of 1000 bytes/s
        let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the raw timer FD should be valid even with ops limiting disabled
        assert!(l.as_raw_fd() > 0);

        // ops/s limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::MAX, TokenType::Ops));

        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        l.event_handler().unwrap();
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
747 
748     #[test]
749     fn test_rate_limiter_ops() {
750         // rate limiter with limit of 1000 ops/s
751         let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();
752 
753         // limiter should not be blocked
754         assert!(!l.is_blocked());
755         // raw FD for this disabled should be valid
756         assert!(l.as_raw_fd() > 0);
757 
758         // bytes/s limiter should be disabled so consume(whatever) should work
759         assert!(l.consume(u64::MAX, TokenType::Bytes));
760 
761         // do full 1000 ops
762         assert!(l.consume(1000, TokenType::Ops));
763         // try and fail on another 100
764         assert!(!l.consume(100, TokenType::Ops));
765         // since consume failed, limiter should be blocked now
766         assert!(l.is_blocked());
767         // wait half the timer period
768         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
769         // limiter should still be blocked
770         assert!(l.is_blocked());
771         // wait the other half of the timer period
772         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
773         // the timer_fd should have an event on it by now
774         l.event_handler().unwrap();
775         // limiter should now be unblocked
776         assert!(!l.is_blocked());
777         // try and succeed on another 100 ops this time
778         assert!(l.consume(100, TokenType::Ops));
779     }
780 
    #[test]
    fn test_rate_limiter_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // raw FD for this limiter should be valid
        assert!(l.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!l.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        l.event_handler().unwrap();
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
816 
    #[test]
    fn test_rate_limiter_overconsumption() {
        // initialize the rate limiter: 1000 bytes/s and 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full; paying that debt back takes 1.5x the
        // refill time, i.e. 1500 ms
        assert!(l.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked (the timer has not fired yet, hence the
        // expected Err from event_handler())
        thread::sleep(Duration::from_millis(1000));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        l.event_handler().unwrap();
        assert!(!l.is_blocked());

        // reset the rate limiter
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 0.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(l.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked (200 ms < 500 ms)
        thread::sleep(Duration::from_millis(200));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!l.consume(100, TokenType::Bytes));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier (300 ms elapsed so far < 500 ms)
        thread::sleep(Duration::from_millis(100));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());
        assert!(!l.consume(100, TokenType::Bytes));

        // after waiting out the full duration (500 ms total), rate
        // limiter should be available again
        thread::sleep(Duration::from_millis(200));
        l.event_handler().unwrap();
        assert!(!l.is_blocked());
        assert!(l.consume(100, TokenType::Bytes));
    }
873 
874     #[test]
875     fn test_update_buckets() {
876         let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();
877 
878         let initial_bw = x.bandwidth();
879         let initial_ops = x.ops();
880 
881         x.update_buckets(BucketUpdate::None, BucketUpdate::None);
882         assert_eq!(x.bandwidth(), initial_bw);
883         assert_eq!(x.ops(), initial_ops);
884 
885         let new_bw = TokenBucket::new(123, 0, 57).unwrap();
886         let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
887         x.update_buckets(
888             BucketUpdate::Update(new_bw.clone()),
889             BucketUpdate::Update(new_ops.clone()),
890         );
891 
892         {
893             let mut guard = x.inner.lock().unwrap();
894             // We have manually adjust the last_update field, because it changes when update_buckets()
895             // constructs new buckets (and thus gets a different value for last_update). We do this so
896             // it makes sense to test the following assertions.
897             guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
898             guard.ops.as_mut().unwrap().last_update = new_ops.last_update;
899         }
900 
901         assert_eq!(x.bandwidth(), Some(new_bw));
902         assert_eq!(x.ops(), Some(new_ops));
903 
904         x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
905         assert_eq!(x.bandwidth(), None);
906         assert_eq!(x.ops(), None);
907     }
908 
909     #[test]
910     fn test_rate_limiter_debug() {
911         let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
912         assert_eq!(
913             format!("{l:?}"),
914             format!(
915                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
916                 l.bandwidth(),
917                 l.ops()
918             ),
919         );
920     }
921 }
922