xref: /cloud-hypervisor/rate_limiter/src/lib.rs (revision 19d36c765fdf00be749d95b3e61028bc302d6d73)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46 #[macro_use]
47 extern crate log;
48 
49 use std::io;
50 use std::os::unix::io::{AsRawFd, RawFd};
51 use std::sync::atomic::{AtomicBool, Ordering};
52 use std::sync::Mutex;
53 use std::time::{Duration, Instant};
54 
55 use vmm_sys_util::timerfd::TimerFd;
56 
57 /// Module for group rate limiting.
58 pub mod group;
59 
#[derive(Debug)]
/// Describes the errors that may occur while handling rate limiter events.
pub enum Error {
    /// The event handler was called spuriously, i.e. without a timer event
    /// actually being pending on the timer FD.
    SpuriousRateLimiterEvent(&'static str),
    /// The event handler encountered an unexpected error while calling `TimerFd::wait()`.
    TimerFdWaitError(std::io::Error),
}
68 
// Interval after which the refill timer fires once the limiter has run out of
// budget and become blocked (see `RateLimiter::consume`).
const REFILL_TIMER_INTERVAL_MS: u64 = 100;
// Same interval as a `Duration`, used when arming the one-shot timer.
const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);

// Nanoseconds per millisecond; used to convert the refill time for token arithmetic.
const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
74 
75 // Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
// Greatest common divisor via Euclid's algorithm, written recursively:
// gcd(x, 0) == x, otherwise gcd(x, y) == gcd(y, x mod y).
fn gcd(x: u64, y: u64) -> u64 {
    if y == 0 {
        x
    } else {
        gcd(y, x % y)
    }
}
86 
/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
    /// There are not enough tokens to complete the operation.
    Failure,
    /// A part of the available tokens have been consumed.
    Success,
    /// A number of tokens `inner` times larger than the bucket size have been consumed.
    /// The caller is expected to block further consumption proportionally
    /// (see `RateLimiter::consume`).
    OverConsumption(f64),
}
97 
/// TokenBucket provides a lower level interface to rate limiting with a
/// configurable capacity, refill-rate and initial burst.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TokenBucket {
    // Bucket defining traits.
    // Maximum token capacity (the budget never replenishes above this).
    size: u64,
    // Initial burst size (number of free initial tokens, that can be consumed at no cost)
    one_time_burst: u64,
    // Complete refill time in milliseconds.
    refill_time: u64,

    // Internal state descriptors.
    // Current token budget, excluding any remaining `one_time_burst` credit.
    budget: u64,
    // Timestamp of the last budget refill / burst consumption (set in `reduce()`).
    last_update: Instant,

    // Fields used for pre-processing optimizations.
    // `size` and the refill time in nanoseconds, both divided by their gcd,
    // so the refill computation in `reduce()` is less likely to overflow.
    processed_capacity: u64,
    processed_refill_time: u64,
}
117 
impl TokenBucket {
    /// Creates a `TokenBucket` wrapped in an `Option`.
    ///
    /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms`
    /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
    /// extra credit on top of total capacity, that does not replenish and which can be used
    /// for an initial burst of data.
    ///
    /// If the `size` or the `complete refill time` are zero, then `None` is returned.
    pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
        // If either token bucket capacity or refill time is 0, disable limiting.
        if size == 0 || complete_refill_time_ms == 0 {
            return None;
        }
        // Formula for computing current refill amount:
        // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
        // In order to avoid overflows, simplify the fractions by computing greatest common divisor.

        let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
        // Get the greatest common factor between `size` and `complete_refill_time_ns`.
        let common_factor = gcd(size, complete_refill_time_ns);
        // The division will be exact since `common_factor` is a factor of `size`.
        let processed_capacity: u64 = size / common_factor;
        // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
        let processed_refill_time: u64 = complete_refill_time_ns / common_factor;

        Some(TokenBucket {
            size,
            one_time_burst,
            refill_time: complete_refill_time_ms,
            // Start off full.
            budget: size,
            // Last updated is now.
            last_update: Instant::now(),
            processed_capacity,
            processed_refill_time,
        })
    }

    /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
    // TODO (Issue #259): handle cases where a single request is larger than the full capacity
    // for such cases we need to support partial fulfilment of requests
    pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
        // First things first: consume the one-time-burst budget.
        if self.one_time_burst > 0 {
            // We still have enough burst budget for the *entire* token request.
            if self.one_time_burst >= tokens {
                self.one_time_burst -= tokens;
                self.last_update = Instant::now();
                // No need to continue to the refill process, we still have burst budget to consume from.
                return BucketReduction::Success;
            } else {
                // We only have burst budget for *some* of the requested tokens.
                // The tokens left unfulfilled will be consumed from current `self.budget`.
                tokens -= self.one_time_burst;
                self.one_time_burst = 0;
            }
        }

        // Compute time passed since last refill/update.
        let time_delta = self.last_update.elapsed().as_nanos() as u64;
        self.last_update = Instant::now();

        // At each 'time_delta' nanoseconds the bucket should refill with:
        // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
        // `processed_capacity` and `processed_refill_time` are the result of simplifying above
        // fraction formula with their greatest-common-factor.
        // NOTE(review): `time_delta * processed_capacity` can still overflow u64 after a very
        // long idle period combined with a large bucket size — the gcd simplification
        // mitigates but does not eliminate this; confirm acceptable for expected configs.
        self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;

        // Cap the budget at the bucket size; it never replenishes above `size`.
        if self.budget >= self.size {
            self.budget = self.size;
        }

        if tokens > self.budget {
            // This operation requests a bandwidth higher than the bucket size
            if tokens > self.size {
                error!(
                    "Consumed {} tokens from bucket of size {}",
                    tokens, self.size
                );
                // Empty the bucket and report an overconsumption of
                // (remaining tokens / size) times larger than the bucket size
                tokens -= self.budget;
                self.budget = 0;
                return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
            }
            // Not enough tokens in the budget: the consume attempt fails.
            return BucketReduction::Failure;
        }

        self.budget -= tokens;
        BucketReduction::Success
    }

    /// "Manually" adds tokens to bucket.
    pub fn replenish(&mut self, tokens: u64) {
        // This means we are still during the burst interval.
        // Of course there is a very small chance that the last reduce() also used up burst
        // budget which should now be replenished, but for performance and code-complexity
        // reasons we're just gonna let that slide since it's practically inconsequential.
        if self.one_time_burst > 0 {
            self.one_time_burst += tokens;
            return;
        }
        // Regular budget is capped at the bucket size.
        self.budget = std::cmp::min(self.budget + tokens, self.size);
    }

    /// Returns the capacity of the token bucket.
    pub fn capacity(&self) -> u64 {
        self.size
    }

    /// Returns the remaining one time burst budget.
    pub fn one_time_burst(&self) -> u64 {
        self.one_time_burst
    }

    /// Returns the time in milliseconds required to completely fill the bucket.
    pub fn refill_time_ms(&self) -> u64 {
        self.refill_time
    }

    /// Returns the current budget (one time burst allowance notwithstanding).
    pub fn budget(&self) -> u64 {
        self.budget
    }
}
245 
246 /// Enum that describes the type of token used.
/// Enum that describes the type of token used.
///
/// Derives: a public, fieldless two-variant enum should be `Debug` for
/// diagnostics and is trivially `Copy`; `PartialEq`/`Eq` allow direct comparison.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TokenType {
    /// Token type used for bandwidth limiting.
    Bytes,
    /// Token type used for operations/second limiting.
    Ops,
}
253 
254 /// Enum that describes the type of token bucket update.
255 pub enum BucketUpdate {
256     /// No Update - same as before.
257     None,
258     /// Rate Limiting is disabled on this bucket.
259     Disabled,
260     /// Rate Limiting enabled with updated bucket.
261     Update(TokenBucket),
262 }
263 
264 /// Rate Limiter that works on both bandwidth and ops/s limiting.
265 ///
266 /// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
267 ///
268 /// Implementation uses a single timer through TimerFd to refresh either or
269 /// both token buckets.
270 ///
271 /// Its internal buckets are 'passively' replenished as they're being used (as
272 /// part of `consume()` operations).
273 /// A timer is enabled and used to 'actively' replenish the token buckets when
274 /// limiting is in effect and `consume()` operations are disabled.
275 ///
276 /// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
277 /// implementation. These events are meant to be consumed by the user of this struct.
278 /// On each such event, the user must call the `event_handler()` method.
pub struct RateLimiter {
    // State shared between `consume()` callers and the event handler,
    // protected by a mutex.
    inner: Mutex<RateLimiterInner>,

    // Internal flag that quickly determines timer state.
    // `true` while the refill timer is armed, i.e. while the limiter is blocked.
    timer_active: AtomicBool,
}
285 
struct RateLimiterInner {
    // Bytes/s token bucket; `None` when bandwidth limiting is disabled.
    bandwidth: Option<TokenBucket>,
    // Ops/s token bucket; `None` when ops limiting is disabled.
    ops: Option<TokenBucket>,

    // One-shot timer used to signal (via the exported FD) when a blocked
    // limiter may be retried.
    timer_fd: TimerFd,
}
292 
impl RateLimiterInner {
    // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once).
    // Also sets `flag` (the owner's `timer_active`) so fast-path checks see the limiter as blocked.
    fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) {
        // Panic when failing to arm the timer (same handling in crate TimerFd::set_state())
        self.timer_fd
            .reset(dur, None)
            .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
        // Mark the limiter as blocked until `event_handler()` clears the flag.
        flag.store(true, Ordering::Relaxed)
    }
}
303 
impl RateLimiter {
    /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
    ///
    /// # Arguments
    ///
    /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
    /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
    ///   that does not replenish and which can be used for an initial burst of data.
    /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
    ///   token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
    /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
    /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
    ///   that does not replenish and which can be used for an initial burst of data.
    /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
    ///   bucket to go from zero Ops to `ops_total_capacity` Ops.
    ///
    /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
    /// is **disabled** for that respective token type.
    ///
    /// # Errors
    ///
    /// If the timerfd creation fails, an error is returned.
    pub fn new(
        bytes_total_capacity: u64,
        bytes_one_time_burst: u64,
        bytes_complete_refill_time_ms: u64,
        ops_total_capacity: u64,
        ops_one_time_burst: u64,
        ops_complete_refill_time_ms: u64,
    ) -> io::Result<Self> {
        // `TokenBucket::new` returns `None` (bucket disabled) when size or refill time is 0.
        let bytes_token_bucket = TokenBucket::new(
            bytes_total_capacity,
            bytes_one_time_burst,
            bytes_complete_refill_time_ms,
        );

        let ops_token_bucket = TokenBucket::new(
            ops_total_capacity,
            ops_one_time_burst,
            ops_complete_refill_time_ms,
        );

        // We'll need a timer_fd, even if our current config effectively disables rate limiting,
        // because `Self::update_buckets()` might re-enable it later, and we might be
        // seccomp-blocked from creating the timer_fd at that time.
        let timer_fd = TimerFd::new()?;
        // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag
        // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`.
        // NOTE(review): the F_GETFL result is not checked for -1 before OR-ing in O_NONBLOCK;
        // only the final F_SETFL return value is validated below.
        // SAFETY: FFI calls.
        let ret = unsafe {
            let fd = timer_fd.as_raw_fd();
            let mut flags = libc::fcntl(fd, libc::F_GETFL);
            flags |= libc::O_NONBLOCK;
            libc::fcntl(fd, libc::F_SETFL, flags)
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }

        Ok(RateLimiter {
            inner: Mutex::new(RateLimiterInner {
                bandwidth: bytes_token_bucket,
                ops: ops_token_bucket,
                timer_fd,
            }),
            // Not blocked at creation time.
            timer_active: AtomicBool::new(false),
        })
    }

    /// Attempts to consume tokens and returns whether that is possible.
    ///
    /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
    pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
        // If the timer is active, we can't consume tokens from any bucket and the function fails.
        if self.is_blocked() {
            return false;
        }
        let mut guard = self.inner.lock().unwrap();
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => guard.bandwidth.as_mut(),
            TokenType::Ops => guard.ops.as_mut(),
        };
        // Try to consume from the token bucket.
        if let Some(bucket) = token_bucket {
            // Saved up front because `bucket` is mutably borrowed by `reduce()` below.
            let refill_time = bucket.refill_time_ms();
            match bucket.reduce(tokens) {
                // When we report budget is over, there will be no further calls here,
                // register a timer to replenish the bucket and resume processing;
                // make sure there is only one running timer for this limiter.
                BucketReduction::Failure => {
                    if !self.is_blocked() {
                        guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active);
                    }
                    false
                }
                // The operation succeeded and further calls can be made.
                BucketReduction::Success => true,
                // The operation succeeded as the tokens have been consumed
                // but the timer still needs to be armed.
                BucketReduction::OverConsumption(ratio) => {
                    // The operation "borrowed" a number of tokens `ratio` times
                    // greater than the size of the bucket, and since it takes
                    // `refill_time` milliseconds to fill an empty bucket, in
                    // order to enforce the bandwidth limit we need to prevent
                    // further calls to the rate limiter for
                    // `ratio * refill_time` milliseconds.
                    guard.activate_timer(
                        Duration::from_millis((ratio * refill_time as f64) as u64),
                        &self.timer_active,
                    );
                    true
                }
            }
        } else {
            // If bucket is not present rate limiting is disabled on token type,
            // consume() will always succeed.
            true
        }
    }

    /// Adds tokens of `token_type` to their respective bucket.
    ///
    /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
    /// `consume()` if needed.
    pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
        let mut guard = self.inner.lock().unwrap();
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => guard.bandwidth.as_mut(),
            TokenType::Ops => guard.ops.as_mut(),
        };
        // Add tokens to the token bucket.
        if let Some(bucket) = token_bucket {
            bucket.replenish(tokens);
        }
    }

    /// Returns whether this rate limiter is blocked.
    ///
    /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
    /// budget for it.
    /// An event will be generated on the exported FD when the limiter 'unblocks'.
    pub fn is_blocked(&self) -> bool {
        self.timer_active.load(Ordering::Relaxed)
    }

    /// This function needs to be called every time there is an event on the
    /// FD provided by this object's `AsRawFd` trait implementation.
    ///
    /// # Errors
    ///
    /// If the rate limiter is disabled or is not blocked, an error is returned.
    pub fn event_handler(&self) -> Result<(), Error> {
        let mut guard = self.inner.lock().unwrap();
        loop {
            // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
            // `timer_fd::wait()` won't block (which is different from its default behavior.)
            match guard.timer_fd.wait() {
                Err(e) => {
                    let err: std::io::Error = e.into();
                    match err.kind() {
                        // Interrupted by a signal: retry the wait.
                        std::io::ErrorKind::Interrupted => (),
                        // Nothing to read: the timer never fired, so the call was spurious.
                        std::io::ErrorKind::WouldBlock => {
                            return Err(Error::SpuriousRateLimiterEvent(
                                "Rate limiter event handler called without a present timer",
                            ))
                        }
                        _ => return Err(Error::TimerFdWaitError(err)),
                    }
                }
                _ => {
                    // Timer fired: clear the blocked flag so `consume()` may proceed again.
                    self.timer_active.store(false, Ordering::Relaxed);
                    return Ok(());
                }
            }
        }
    }

    /// Updates the parameters of the token buckets associated with this RateLimiter.
    // TODO: Please note that, right now, the buckets become full after being updated.
    pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
        let mut guard = self.inner.lock().unwrap();
        match bytes {
            BucketUpdate::Disabled => guard.bandwidth = None,
            BucketUpdate::Update(tb) => guard.bandwidth = Some(tb),
            BucketUpdate::None => (),
        };
        match ops {
            BucketUpdate::Disabled => guard.ops = None,
            BucketUpdate::Update(tb) => guard.ops = Some(tb),
            BucketUpdate::None => (),
        };
    }
}
499 
500 impl AsRawFd for RateLimiter {
501     /// Provides a FD which needs to be monitored for POLLIN events.
502     ///
503     /// This object's `event_handler()` method must be called on such events.
504     ///
505     /// Will return a negative value if rate limiting is disabled on both
506     /// token types.
507     fn as_raw_fd(&self) -> RawFd {
508         let guard = self.inner.lock().unwrap();
509         guard.timer_fd.as_raw_fd()
510     }
511 }
512 
impl Default for RateLimiter {
    /// Default RateLimiter is a no-op limiter with infinite budget.
    fn default() -> Self {
        // All-zero sizes/refill-times disable both token buckets (see `TokenBucket::new`).
        // Note that `RateLimiter::new` still creates a timer_fd even in this disabled
        // configuration, so the `expect` guards against an (unexpected) timerfd failure.
        RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
    }
}
520 
521 #[cfg(test)]
522 pub(crate) mod tests {
523     use std::{fmt, thread};
524 
525     use super::*;
526 
    // Test-only helpers exposing `TokenBucket` internals.
    impl TokenBucket {
        // Resets the token bucket: budget set to max capacity and last-updated set to now.
        fn reset(&mut self) {
            self.budget = self.size;
            self.last_update = Instant::now();
        }

        // Read-only access to the private `last_update` timestamp.
        fn get_last_update(&self) -> &Instant {
            &self.last_update
        }

        // Read-only access to the gcd-reduced capacity.
        fn get_processed_capacity(&self) -> u64 {
            self.processed_capacity
        }

        // Read-only access to the gcd-reduced refill time (in nanoseconds).
        fn get_processed_refill_time(&self) -> u64 {
            self.processed_refill_time
        }

        // After a restore, we cannot be certain that the last_update field has the same value.
        // Hence this comparison, which checks every field except `last_update`.
        pub fn partial_eq(&self, other: &TokenBucket) -> bool {
            (other.capacity() == self.capacity())
                && (other.one_time_burst() == self.one_time_burst())
                && (other.refill_time_ms() == self.refill_time_ms())
                && (other.budget() == self.budget())
        }
    }
554 
    // Test-only accessors returning clones of the internal buckets.
    impl RateLimiter {
        // Clone of the bandwidth (bytes/s) bucket, `None` if disabled.
        pub fn bandwidth(&self) -> Option<TokenBucket> {
            let guard = self.inner.lock().unwrap();
            guard.bandwidth.clone()
        }

        // Clone of the ops/s bucket, `None` if disabled.
        pub fn ops(&self) -> Option<TokenBucket> {
            let guard = self.inner.lock().unwrap();
            guard.ops.clone()
        }
    }
566 
    impl PartialEq for RateLimiter {
        // Equality compares only the buckets; the timer state is ignored.
        // NOTE(review): this locks both inner mutexes — comparing a limiter
        // with itself would deadlock/panic; acceptable for test-only use.
        fn eq(&self, other: &RateLimiter) -> bool {
            let self_guard = self.inner.lock().unwrap();
            let other_guard = other.inner.lock().unwrap();
            self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops
        }
    }
574 
    // Test-only Debug impl; prints the two buckets, omitting timer state.
    impl fmt::Debug for RateLimiter {
        fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
            let guard = self.inner.lock().unwrap();
            write!(
                f,
                "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
                guard.bandwidth, guard.ops
            )
        }
    }
585 
    #[test]
    fn test_token_bucket_create() {
        let before = Instant::now();
        let tb = TokenBucket::new(1000, 0, 1000).unwrap();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        // `last_update` must have been captured between `before` and `after`.
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
        // 1000 tokens over 1000ms * 1_000_000 ns/ms reduces via gcd to 1 / 1_000_000.
        assert_eq!(tb.get_processed_capacity(), 1);
        assert_eq!(tb.get_processed_refill_time(), 1_000_000);

        // Verify invalid bucket configurations result in `None`.
        assert!(TokenBucket::new(0, 1234, 1000).is_none());
        assert!(TokenBucket::new(100, 1234, 0).is_none());
        assert!(TokenBucket::new(0, 1234, 0).is_none());
    }
603 
    #[test]
    fn test_token_bucket_preprocess() {
        let tb = TokenBucket::new(1000, 0, 1000).unwrap();
        assert_eq!(tb.get_processed_capacity(), 1);
        assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);

        // The gcd of size = 3*7*11*19*1000 and refill_ns = 7*11*13*17 * 1e6
        // is 7*11*1000*1000, leaving 3*19 and 13*17*1000 respectively.
        let thousand = 1000;
        let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
        assert_eq!(tb.get_processed_capacity(), 3 * 19);
        assert_eq!(
            tb.get_processed_refill_time(),
            13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
        );
    }
618 
    #[test]
    fn test_token_bucket_reduce() {
        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        // allowing rate of 1 token/ms.
        let capacity = 1000;
        let refill_ms = 1000;
        let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();

        assert_eq!(tb.reduce(123), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 123);

        // After sleeping ~123ms at 1 token/ms the 123 tokens have been refilled.
        thread::sleep(Duration::from_millis(123));
        assert_eq!(tb.reduce(1), BucketReduction::Success);
        assert_eq!(tb.budget(), capacity - 1);
        assert_eq!(tb.reduce(100), BucketReduction::Success);
        assert_eq!(tb.reduce(capacity), BucketReduction::Failure);

        // token bucket with capacity 1000 and refill time of 1000 milliseconds
        let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
        // safely assuming the thread can run these 3 commands in less than 500ms
        assert_eq!(tb.reduce(1000), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 100);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.one_time_burst(), 0);
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        assert_eq!(tb.reduce(500), BucketReduction::Failure);
        thread::sleep(Duration::from_millis(500));
        assert_eq!(tb.reduce(500), BucketReduction::Success);
        thread::sleep(Duration::from_millis(1000));
        // Budget refilled to the full 1000 by now; 2500 > size empties the
        // bucket and reports (2500 - 1000) / 1000 = 1.5 sizes of overconsumption.
        assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));

        let before = Instant::now();
        tb.reset();
        assert_eq!(tb.capacity(), 1000);
        assert_eq!(tb.budget(), 1000);
        assert!(*tb.get_last_update() >= before);
        let after = Instant::now();
        assert!(*tb.get_last_update() <= after);
    }
658 
    #[test]
    fn test_rate_limiter_default() {
        let l = RateLimiter::default();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::MAX, TokenType::Ops));
        assert!(l.consume(u64::MAX, TokenType::Bytes));
        // calling the handler without there having been an event should error
        // (the non-blocking timer_fd wait reports WouldBlock -> spurious event)
        assert!(l.event_handler().is_err());
        assert_eq!(
            format!("{:?}", l.event_handler().err().unwrap()),
            "SpuriousRateLimiterEvent(\
             \"Rate limiter event handler called without a present timer\")"
        );
    }
676 
    #[test]
    fn test_rate_limiter_new() {
        // Distinct values per parameter so each field can be checked independently.
        let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
        let bw = l.bandwidth().unwrap();
        assert_eq!(bw.capacity(), 1000);
        assert_eq!(bw.one_time_burst(), 1001);
        assert_eq!(bw.refill_time_ms(), 1002);
        // Buckets start off full.
        assert_eq!(bw.budget(), 1000);

        let ops = l.ops().unwrap();
        assert_eq!(ops.capacity(), 1003);
        assert_eq!(ops.one_time_burst(), 1004);
        assert_eq!(ops.refill_time_ms(), 1005);
        assert_eq!(ops.budget(), 1003);
    }
692 
693     #[test]
694     fn test_rate_limiter_manual_replenish() {
695         // rate limiter with limit of 1000 bytes/s and 1000 ops/s
696         let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
697 
698         // consume 123 bytes
699         assert!(l.consume(123, TokenType::Bytes));
700         l.manual_replenish(23, TokenType::Bytes);
701         {
702             let bytes_tb = l.bandwidth().unwrap();
703             assert_eq!(bytes_tb.budget(), 900);
704         }
705         // consume 123 ops
706         assert!(l.consume(123, TokenType::Ops));
707         l.manual_replenish(23, TokenType::Ops);
708         {
709             let bytes_tb = l.ops().unwrap();
710             assert_eq!(bytes_tb.budget(), 900);
711         }
712     }
713 
    #[test]
    fn test_rate_limiter_bandwidth() {
        // rate limiter with limit of 1000 bytes/s
        let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the exported raw FD should be valid even while the limiter is unblocked
        assert!(l.as_raw_fd() > 0);

        // ops/s limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::MAX, TokenType::Ops));

        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
746 
    #[test]
    fn test_rate_limiter_ops() {
        // rate limiter with limit of 1000 ops/s (bytes bucket disabled)
        let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the raw FD backing the limiter's timer should be valid
        assert!(l.as_raw_fd() > 0);

        // bytes/s limiter should be disabled so consume(whatever) should work
        assert!(l.consume(u64::MAX, TokenType::Bytes));

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // try and fail on another 100
        assert!(!l.consume(100, TokenType::Ops));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
    }
779 
    #[test]
    fn test_rate_limiter_full() {
        // rate limiter with limit of 1000 bytes/s and 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the raw FD backing the limiter's timer should be valid
        assert!(l.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!l.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        assert!(l.event_handler().is_ok());
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
815 
    #[test]
    fn test_rate_limiter_overconsumption() {
        // NOTE: this test relies on real wall-clock sleeps against a 1000 ms
        // refill time, so the assertion sequence below is order-sensitive.
        // initialize the rate limiter
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(l.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked (the borrowed 1500 tokens need 1.5 s to refill)
        thread::sleep(Duration::from_millis(1000));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());

        // reset the rate limiter
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(l.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!l.consume(100, TokenType::Bytes));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier (300 ms elapsed of the 500 ms arm)
        thread::sleep(Duration::from_millis(100));
        assert!(l.event_handler().is_err());
        assert!(l.is_blocked());
        assert!(!l.consume(100, TokenType::Bytes));

        // after waiting out the full duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(200));
        assert!(l.event_handler().is_ok());
        assert!(!l.is_blocked());
        assert!(l.consume(100, TokenType::Bytes));
    }
872 
873     #[test]
874     fn test_update_buckets() {
875         let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();
876 
877         let initial_bw = x.bandwidth();
878         let initial_ops = x.ops();
879 
880         x.update_buckets(BucketUpdate::None, BucketUpdate::None);
881         assert_eq!(x.bandwidth(), initial_bw);
882         assert_eq!(x.ops(), initial_ops);
883 
884         let new_bw = TokenBucket::new(123, 0, 57).unwrap();
885         let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
886         x.update_buckets(
887             BucketUpdate::Update(new_bw.clone()),
888             BucketUpdate::Update(new_ops.clone()),
889         );
890 
891         {
892             let mut guard = x.inner.lock().unwrap();
893             // We have manually adjust the last_update field, because it changes when update_buckets()
894             // constructs new buckets (and thus gets a different value for last_update). We do this so
895             // it makes sense to test the following assertions.
896             guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
897             guard.ops.as_mut().unwrap().last_update = new_ops.last_update;
898         }
899 
900         assert_eq!(x.bandwidth(), Some(new_bw));
901         assert_eq!(x.ops(), Some(new_ops));
902 
903         x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
904         assert_eq!(x.bandwidth(), None);
905         assert_eq!(x.ops(), None);
906     }
907 
908     #[test]
909     fn test_rate_limiter_debug() {
910         let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
911         assert_eq!(
912             format!("{l:?}"),
913             format!(
914                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
915                 l.bandwidth(),
916                 l.ops()
917             ),
918         );
919     }
920 }
921