xref: /cloud-hypervisor/rate_limiter/src/lib.rs (revision 8f56de713b34820ab2f0b33bd0c02fde583e045c)
1 // Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
2 // SPDX-License-Identifier: Apache-2.0
3 
4 #![deny(missing_docs)]
5 //! # Rate Limiter
6 //!
7 //! Provides a rate limiter written in Rust useful for IO operations that need to
8 //! be throttled.
9 //!
10 //! ## Behavior
11 //!
12 //! The rate limiter starts off as 'unblocked' with two token buckets configured
13 //! with the values passed in the `RateLimiter::new()` constructor.
14 //! All subsequent accounting is done independently for each token bucket based
15 //! on the `TokenType` used. If any of the buckets runs out of budget, the limiter
16 //! goes in the 'blocked' state. At this point an internal timer is set up which
17 //! will later 'wake up' the user in order to retry sending data. The 'wake up'
18 //! notification will be dispatched as an event on the FD provided by the `AsRawFD`
19 //! trait implementation.
20 //!
21 //! The contract is that the user shall also call the `event_handler()` method on
22 //! receipt of such an event.
23 //!
24 //! The token buckets are replenished every time a `consume()` is called, before
25 //! actually trying to consume the requested amount of tokens. The amount of tokens
26 //! replenished is automatically calculated to respect the `complete_refill_time`
27 //! configuration parameter provided by the user. The token buckets will never
28 //! replenish above their respective `size`.
29 //!
30 //! Each token bucket can start off with a `one_time_burst` initial extra capacity
31 //! on top of their `size`. This initial extra credit does not replenish and
32 //! can be used for an initial burst of data.
33 //!
34 //! The granularity for 'wake up' events when the rate limiter is blocked is
35 //! currently hardcoded to `100 milliseconds`.
36 //!
37 //! ## Limitations
38 //!
39 //! This rate limiter implementation relies on the *Linux kernel's timerfd* so its
40 //! usage is limited to Linux systems.
41 //!
42 //! Another particularity of this implementation is that it is not self-driving.
43 //! It is meant to be used in an external event loop and thus implements the `AsRawFd`
44 //! trait and provides an *event-handler* as part of its API. This *event-handler*
45 //! needs to be called by the user on every event on the rate limiter's `AsRawFd` FD.
46 #[macro_use]
47 extern crate log;
48 
49 use std::io;
50 use std::os::unix::io::{AsRawFd, RawFd};
51 use std::sync::atomic::{AtomicBool, Ordering};
52 use std::sync::Mutex;
53 use std::time::{Duration, Instant};
54 
55 use thiserror::Error;
56 use vmm_sys_util::timerfd::TimerFd;
57 
58 /// Module for group rate limiting.
59 pub mod group;
60 
#[derive(Error, Debug)]
/// Describes the errors that may occur while handling rate limiter events.
pub enum Error {
    /// The event handler was called spuriously, i.e. without a pending timer event.
    #[error("Event handler was called spuriously: {0}")]
    SpuriousRateLimiterEvent(&'static str),
    /// The event handler encountered an error while waiting on `TimerFd::wait()`.
    #[error("Failed to wait for the timer")]
    TimerFdWaitError(#[source] std::io::Error),
}
71 
72 // Interval at which the refill timer will run when limiter is at capacity.
73 const REFILL_TIMER_INTERVAL_MS: u64 = 100;
74 const TIMER_REFILL_DUR: Duration = Duration::from_millis(REFILL_TIMER_INTERVAL_MS);
75 
76 const NANOSEC_IN_ONE_MILLISEC: u64 = 1_000_000;
77 
// Euclid's two-thousand-year-old algorithm for finding the greatest common divisor.
// Expressed recursively: gcd(x, y) == gcd(y, x % y), with gcd(x, 0) == x.
// Recursion depth is bounded (worst case ~90 frames for u64, hit on
// consecutive Fibonacci numbers), so this cannot overflow the stack.
fn gcd(x: u64, y: u64) -> u64 {
    if y == 0 {
        x
    } else {
        gcd(y, x % y)
    }
}
89 
/// Enum describing the outcomes of a `reduce()` call on a `TokenBucket`.
#[derive(Clone, Debug, PartialEq)]
pub enum BucketReduction {
    /// There are not enough tokens to complete the operation.
    Failure,
    /// A part of the available tokens have been consumed.
    Success,
    /// A number of tokens `inner` times larger than the bucket size have been consumed.
    /// The inner `f64` is the ratio between the tokens consumed beyond the
    /// current budget and the bucket size.
    OverConsumption(f64),
}
100 
/// TokenBucket provides a lower level interface to rate limiting with a
/// configurable capacity, refill-rate and initial burst.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TokenBucket {
    // Bucket defining traits.
    // Total capacity of the bucket; `budget` is clamped to this in `reduce()`.
    size: u64,
    // Initial burst size (number of free initial tokens, that can be consumed at no cost)
    one_time_burst: u64,
    // Complete refill time in milliseconds.
    refill_time: u64,

    // Internal state descriptors.
    // Tokens currently available for consumption.
    budget: u64,
    // Instant of the last budget update; used to compute the elapsed time for refills.
    last_update: Instant,

    // Fields used for pre-processing optimizations.
    // `size / gcd(size, refill_time_ns)` — reduced numerator of the refill fraction.
    processed_capacity: u64,
    // `refill_time_ns / gcd(size, refill_time_ns)` — reduced denominator of the refill fraction.
    processed_refill_time: u64,
}
120 
impl TokenBucket {
    /// Creates a `TokenBucket` wrapped in an `Option`.
    ///
    /// TokenBucket created is of `size` total capacity and takes `complete_refill_time_ms`
    /// milliseconds to go from zero tokens to total capacity. The `one_time_burst` is initial
    /// extra credit on top of total capacity, that does not replenish and which can be used
    /// for an initial burst of data.
    ///
    /// If the `size` or the `complete refill time` are zero, then `None` is returned.
    pub fn new(size: u64, one_time_burst: u64, complete_refill_time_ms: u64) -> Option<Self> {
        // If either token bucket capacity or refill time is 0, disable limiting.
        if size == 0 || complete_refill_time_ms == 0 {
            return None;
        }
        // Formula for computing current refill amount:
        // refill_token_count = (delta_time * size) / (complete_refill_time_ms * 1_000_000)
        // In order to avoid overflows, simplify the fractions by computing greatest common divisor.

        // NOTE(review): this multiplication can overflow u64 for extremely large
        // refill times (> ~584 years in ms); presumably configs are validated by
        // callers — confirm upstream.
        let complete_refill_time_ns = complete_refill_time_ms * NANOSEC_IN_ONE_MILLISEC;
        // Get the greatest common factor between `size` and `complete_refill_time_ns`.
        let common_factor = gcd(size, complete_refill_time_ns);
        // The division will be exact since `common_factor` is a factor of `size`.
        let processed_capacity: u64 = size / common_factor;
        // The division will be exact since `common_factor` is a factor of `complete_refill_time_ns`.
        let processed_refill_time: u64 = complete_refill_time_ns / common_factor;

        Some(TokenBucket {
            size,
            one_time_burst,
            refill_time: complete_refill_time_ms,
            // Start off full.
            budget: size,
            // Last updated is now.
            last_update: Instant::now(),
            processed_capacity,
            processed_refill_time,
        })
    }

    /// Attempts to consume `tokens` from the bucket and returns whether the action succeeded.
    ///
    /// The bucket is passively refilled, based on the time elapsed since
    /// `last_update`, before the consumption is attempted.
    // TODO (Issue #259): handle cases where a single request is larger than the full capacity
    // for such cases we need to support partial fulfilment of requests
    pub fn reduce(&mut self, mut tokens: u64) -> BucketReduction {
        // First things first: consume the one-time-burst budget.
        if self.one_time_burst > 0 {
            // We still have burst budget for *all* tokens requests.
            if self.one_time_burst >= tokens {
                self.one_time_burst -= tokens;
                self.last_update = Instant::now();
                // No need to continue to the refill process, we still have burst budget to consume from.
                return BucketReduction::Success;
            } else {
                // We still have burst budget for *some* of the tokens requests.
                // The tokens left unfulfilled will be consumed from current `self.budget`.
                tokens -= self.one_time_burst;
                self.one_time_burst = 0;
            }
        }

        // Compute time passed since last refill/update.
        let time_delta = self.last_update.elapsed().as_nanos() as u64;
        self.last_update = Instant::now();

        // At each 'time_delta' nanoseconds the bucket should refill with:
        // refill_amount = (time_delta * size) / (complete_refill_time_ms * 1_000_000)
        // `processed_capacity` and `processed_refill_time` are the result of simplifying above
        // fraction formula with their greatest-common-factor.
        self.budget += (time_delta * self.processed_capacity) / self.processed_refill_time;

        // The bucket never refills above its capacity.
        if self.budget >= self.size {
            self.budget = self.size;
        }

        if tokens > self.budget {
            // This operation requests a bandwidth higher than the bucket size
            if tokens > self.size {
                error!(
                    "Consumed {} tokens from bucket of size {}",
                    tokens, self.size
                );
                // Empty the bucket and report an overconsumption of
                // (remaining tokens / size) times larger than the bucket size
                tokens -= self.budget;
                self.budget = 0;
                return BucketReduction::OverConsumption(tokens as f64 / self.size as f64);
            }
            // If not enough tokens consume() fails, return false.
            return BucketReduction::Failure;
        }

        self.budget -= tokens;
        BucketReduction::Success
    }

    /// "Manually" adds tokens to bucket.
    pub fn replenish(&mut self, tokens: u64) {
        // This means we are still during the burst interval.
        // Of course there is a very small chance  that the last reduce() also used up burst
        // budget which should now be replenished, but for performance and code-complexity
        // reasons we're just gonna let that slide since it's practically inconsequential.
        if self.one_time_burst > 0 {
            self.one_time_burst += tokens;
            return;
        }
        // Regular budget is capped at bucket capacity.
        self.budget = std::cmp::min(self.budget + tokens, self.size);
    }

    /// Returns the capacity of the token bucket.
    pub fn capacity(&self) -> u64 {
        self.size
    }

    /// Returns the remaining one time burst budget.
    pub fn one_time_burst(&self) -> u64 {
        self.one_time_burst
    }

    /// Returns the time in milliseconds required to completely fill the bucket.
    pub fn refill_time_ms(&self) -> u64 {
        self.refill_time
    }

    /// Returns the current budget (one time burst allowance notwithstanding).
    pub fn budget(&self) -> u64 {
        self.budget
    }
}
248 
/// Enum that describes the type of token used.
// Public API enum: derive the standard traits so callers can debug-print,
// copy and compare token types. The variants carry no payload, so `Copy`
// is free and fully backward-compatible.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TokenType {
    /// Token type used for bandwidth limiting.
    Bytes,
    /// Token type used for operations/second limiting.
    Ops,
}
256 
/// Enum that describes the type of token bucket update.
///
/// Consumed by `RateLimiter::update_buckets()` to keep, disable or replace
/// each of the limiter's two token buckets independently.
pub enum BucketUpdate {
    /// No Update - same as before.
    None,
    /// Rate Limiting is disabled on this bucket.
    Disabled,
    /// Rate Limiting enabled with updated bucket.
    Update(TokenBucket),
}
266 
/// Rate Limiter that works on both bandwidth and ops/s limiting.
///
/// Bandwidth (bytes/s) and ops/s limiting can be used at the same time or individually.
///
/// Implementation uses a single timer through TimerFd to refresh either or
/// both token buckets.
///
/// Its internal buckets are 'passively' replenished as they're being used (as
/// part of `consume()` operations).
/// A timer is enabled and used to 'actively' replenish the token buckets when
/// limiting is in effect and `consume()` operations are disabled.
///
/// RateLimiters will generate events on the FDs provided by their `AsRawFd` trait
/// implementation. These events are meant to be consumed by the user of this struct.
/// On each such event, the user must call the `event_handler()` method.
pub struct RateLimiter {
    // All mutable state (token buckets and the timerfd) lives behind a mutex
    // so that `consume()` and `manual_replenish()` can take `&self`.
    inner: Mutex<RateLimiterInner>,

    // Internal flag that quickly determines timer state.
    // Kept outside the mutex so `is_blocked()` is a single atomic load.
    timer_active: AtomicBool,
}
288 
// Mutex-guarded state shared by all `RateLimiter` operations.
struct RateLimiterInner {
    // Bucket for bandwidth (bytes/s) limiting; `None` means unlimited.
    bandwidth: Option<TokenBucket>,
    // Bucket for operations/second limiting; `None` means unlimited.
    ops: Option<TokenBucket>,

    // One-shot timer used to notify the user (via the `AsRawFd` FD) when a
    // blocked limiter may be retried.
    timer_fd: TimerFd,
}
295 
296 impl RateLimiterInner {
297     // Arm the timer of the rate limiter with the provided `Duration` (which will fire only once).
activate_timer(&mut self, dur: Duration, flag: &AtomicBool)298     fn activate_timer(&mut self, dur: Duration, flag: &AtomicBool) {
299         // Panic when failing to arm the timer (same handling in crate TimerFd::set_state())
300         self.timer_fd
301             .reset(dur, None)
302             .expect("Can't arm the timer (unexpected 'timerfd_settime' failure).");
303         flag.store(true, Ordering::Relaxed)
304     }
305 }
306 
impl RateLimiter {
    /// Creates a new Rate Limiter that can limit on both bytes/s and ops/s.
    ///
    /// # Arguments
    ///
    /// * `bytes_total_capacity` - the total capacity of the `TokenType::Bytes` token bucket.
    /// * `bytes_one_time_burst` - initial extra credit on top of `bytes_total_capacity`,
    ///   that does not replenish and which can be used for an initial burst of data.
    /// * `bytes_complete_refill_time_ms` - number of milliseconds for the `TokenType::Bytes`
    ///   token bucket to go from zero Bytes to `bytes_total_capacity` Bytes.
    /// * `ops_total_capacity` - the total capacity of the `TokenType::Ops` token bucket.
    /// * `ops_one_time_burst` - initial extra credit on top of `ops_total_capacity`,
    ///   that does not replenish and which can be used for an initial burst of data.
    /// * `ops_complete_refill_time_ms` - number of milliseconds for the `TokenType::Ops` token
    ///   bucket to go from zero Ops to `ops_total_capacity` Ops.
    ///
    /// If either bytes/ops *size* or *refill_time* are **zero**, the limiter
    /// is **disabled** for that respective token type.
    ///
    /// # Errors
    ///
    /// If the timerfd creation fails, an error is returned.
    pub fn new(
        bytes_total_capacity: u64,
        bytes_one_time_burst: u64,
        bytes_complete_refill_time_ms: u64,
        ops_total_capacity: u64,
        ops_one_time_burst: u64,
        ops_complete_refill_time_ms: u64,
    ) -> io::Result<Self> {
        // `TokenBucket::new` yields `None` (bucket disabled) when either the
        // capacity or the refill time is zero.
        let bytes_token_bucket = TokenBucket::new(
            bytes_total_capacity,
            bytes_one_time_burst,
            bytes_complete_refill_time_ms,
        );

        let ops_token_bucket = TokenBucket::new(
            ops_total_capacity,
            ops_one_time_burst,
            ops_complete_refill_time_ms,
        );

        // We'll need a timer_fd, even if our current config effectively disables rate limiting,
        // because `Self::update_buckets()` might re-enable it later, and we might be
        // seccomp-blocked from creating the timer_fd at that time.
        let timer_fd = TimerFd::new()?;
        // Note: vmm_sys_util::TimerFd::new() open the fd w/o O_NONBLOCK. We manually add this flag
        // so that `Self::event_handler` won't be blocked with `vmm_sys_util::TimerFd::wait()`.
        // SAFETY: FFI calls.
        let ret = unsafe {
            let fd = timer_fd.as_raw_fd();
            let mut flags = libc::fcntl(fd, libc::F_GETFL);
            flags |= libc::O_NONBLOCK;
            libc::fcntl(fd, libc::F_SETFL, flags)
        };
        if ret < 0 {
            return Err(std::io::Error::last_os_error());
        }

        Ok(RateLimiter {
            inner: Mutex::new(RateLimiterInner {
                bandwidth: bytes_token_bucket,
                ops: ops_token_bucket,
                timer_fd,
            }),
            timer_active: AtomicBool::new(false),
        })
    }

    /// Attempts to consume tokens and returns whether that is possible.
    ///
    /// If rate limiting is disabled on provided `token_type`, this function will always succeed.
    pub fn consume(&self, tokens: u64, token_type: TokenType) -> bool {
        // If the timer is active, we can't consume tokens from any bucket and the function fails.
        if self.is_blocked() {
            return false;
        }
        let mut guard = self.inner.lock().unwrap();
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => guard.bandwidth.as_mut(),
            TokenType::Ops => guard.ops.as_mut(),
        };
        // Try to consume from the token bucket.
        if let Some(bucket) = token_bucket {
            // Capture the refill time up front: `bucket` is mutably borrowed by
            // `reduce()` below, and the value is needed in the over-consumption arm.
            let refill_time = bucket.refill_time_ms();
            match bucket.reduce(tokens) {
                // When we report budget is over, there will be no further calls here,
                // register a timer to replenish the bucket and resume processing;
                // make sure there is only one running timer for this limiter.
                BucketReduction::Failure => {
                    if !self.is_blocked() {
                        guard.activate_timer(TIMER_REFILL_DUR, &self.timer_active);
                    }
                    false
                }
                // The operation succeeded and further calls can be made.
                BucketReduction::Success => true,
                // The operation succeeded as the tokens have been consumed
                // but the timer still needs to be armed.
                BucketReduction::OverConsumption(ratio) => {
                    // The operation "borrowed" a number of tokens `ratio` times
                    // greater than the size of the bucket, and since it takes
                    // `refill_time` milliseconds to fill an empty bucket, in
                    // order to enforce the bandwidth limit we need to prevent
                    // further calls to the rate limiter for
                    // `ratio * refill_time` milliseconds.
                    guard.activate_timer(
                        Duration::from_millis((ratio * refill_time as f64) as u64),
                        &self.timer_active,
                    );
                    true
                }
            }
        } else {
            // If bucket is not present rate limiting is disabled on token type,
            // consume() will always succeed.
            true
        }
    }

    /// Adds tokens of `token_type` to their respective bucket.
    ///
    /// Can be used to *manually* add tokens to a bucket. Useful for reverting a
    /// `consume()` if needed.
    pub fn manual_replenish(&self, tokens: u64, token_type: TokenType) {
        let mut guard = self.inner.lock().unwrap();
        // Identify the required token bucket.
        let token_bucket = match token_type {
            TokenType::Bytes => guard.bandwidth.as_mut(),
            TokenType::Ops => guard.ops.as_mut(),
        };
        // Add tokens to the token bucket.
        if let Some(bucket) = token_bucket {
            bucket.replenish(tokens);
        }
    }

    /// Returns whether this rate limiter is blocked.
    ///
    /// The limiter 'blocks' when a `consume()` operation fails because there was not enough
    /// budget for it.
    /// An event will be generated on the exported FD when the limiter 'unblocks'.
    pub fn is_blocked(&self) -> bool {
        self.timer_active.load(Ordering::Relaxed)
    }

    /// This function needs to be called every time there is an event on the
    /// FD provided by this object's `AsRawFd` trait implementation.
    ///
    /// # Errors
    ///
    /// If the rate limiter is disabled or is not blocked, an error is returned.
    pub fn event_handler(&self) -> Result<(), Error> {
        let mut guard = self.inner.lock().unwrap();
        loop {
            // Note: As we manually added the `O_NONBLOCK` flag to the FD, the following
            // `timer_fd::wait()` won't block (which is different from its default behavior.)
            match guard.timer_fd.wait() {
                Err(e) => {
                    let err: std::io::Error = e.into();
                    match err.kind() {
                        // Interrupted by a signal: loop and retry the wait.
                        std::io::ErrorKind::Interrupted => (),
                        // `WouldBlock` means the timer has not fired: the caller
                        // invoked us without a pending timer event.
                        std::io::ErrorKind::WouldBlock => {
                            return Err(Error::SpuriousRateLimiterEvent(
                                "Rate limiter event handler called without a present timer",
                            ))
                        }
                        _ => return Err(Error::TimerFdWaitError(err)),
                    }
                }
                _ => {
                    // Timer event consumed: mark the limiter unblocked.
                    self.timer_active.store(false, Ordering::Relaxed);
                    return Ok(());
                }
            }
        }
    }

    /// Updates the parameters of the token buckets associated with this RateLimiter.
    // TODO: Please note that, right now, the buckets become full after being updated.
    // NOTE(review): `&mut self` is stricter than strictly needed since state is
    // mutex-guarded, but relaxing it would be an API change — leave as is.
    pub fn update_buckets(&mut self, bytes: BucketUpdate, ops: BucketUpdate) {
        let mut guard = self.inner.lock().unwrap();
        match bytes {
            BucketUpdate::Disabled => guard.bandwidth = None,
            BucketUpdate::Update(tb) => guard.bandwidth = Some(tb),
            BucketUpdate::None => (),
        };
        match ops {
            BucketUpdate::Disabled => guard.ops = None,
            BucketUpdate::Update(tb) => guard.ops = Some(tb),
            BucketUpdate::None => (),
        };
    }
}
502 
impl AsRawFd for RateLimiter {
    /// Provides a FD which needs to be monitored for POLLIN events.
    ///
    /// This object's `event_handler()` method must be called on such events.
    ///
    /// The returned FD is always valid — `RateLimiter::new()` creates the
    /// internal timerfd even when rate limiting is disabled on both token
    /// types (so that `update_buckets()` can re-enable limiting later).
    fn as_raw_fd(&self) -> RawFd {
        let guard = self.inner.lock().unwrap();
        guard.timer_fd.as_raw_fd()
    }
}
515 
impl Default for RateLimiter {
    /// Default RateLimiter is a no-op limiter with infinite budget.
    fn default() -> Self {
        // `new()` still creates the internal timerfd even for a fully disabled
        // limiter; timerfd creation is the only fallible step, so a failure
        // here is unexpected and worth panicking on.
        RateLimiter::new(0, 0, 0, 0, 0, 0).expect("Failed to build default RateLimiter")
    }
}
523 
524 #[cfg(test)]
525 pub(crate) mod tests {
526     use std::{fmt, thread};
527 
528     use super::*;
529 
530     impl TokenBucket {
531         // Resets the token bucket: budget set to max capacity and last-updated set to now.
reset(&mut self)532         fn reset(&mut self) {
533             self.budget = self.size;
534             self.last_update = Instant::now();
535         }
536 
get_last_update(&self) -> &Instant537         fn get_last_update(&self) -> &Instant {
538             &self.last_update
539         }
540 
get_processed_capacity(&self) -> u64541         fn get_processed_capacity(&self) -> u64 {
542             self.processed_capacity
543         }
544 
get_processed_refill_time(&self) -> u64545         fn get_processed_refill_time(&self) -> u64 {
546             self.processed_refill_time
547         }
548 
549         // After a restore, we cannot be certain that the last_update field has the same value.
550         #[allow(dead_code)]
partial_eq(&self, other: &TokenBucket) -> bool551         fn partial_eq(&self, other: &TokenBucket) -> bool {
552             (other.capacity() == self.capacity())
553                 && (other.one_time_burst() == self.one_time_burst())
554                 && (other.refill_time_ms() == self.refill_time_ms())
555                 && (other.budget() == self.budget())
556         }
557     }
558 
559     impl RateLimiter {
bandwidth(&self) -> Option<TokenBucket>560         fn bandwidth(&self) -> Option<TokenBucket> {
561             let guard = self.inner.lock().unwrap();
562             guard.bandwidth.clone()
563         }
564 
ops(&self) -> Option<TokenBucket>565         fn ops(&self) -> Option<TokenBucket> {
566             let guard = self.inner.lock().unwrap();
567             guard.ops.clone()
568         }
569     }
570 
571     impl PartialEq for RateLimiter {
eq(&self, other: &RateLimiter) -> bool572         fn eq(&self, other: &RateLimiter) -> bool {
573             let self_guard = self.inner.lock().unwrap();
574             let other_guard = other.inner.lock().unwrap();
575             self_guard.bandwidth == other_guard.bandwidth && self_guard.ops == other_guard.ops
576         }
577     }
578 
579     impl fmt::Debug for RateLimiter {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result580         fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
581             let guard = self.inner.lock().unwrap();
582             write!(
583                 f,
584                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
585                 guard.bandwidth, guard.ops
586             )
587         }
588     }
589 
590     #[test]
test_token_bucket_create()591     fn test_token_bucket_create() {
592         let before = Instant::now();
593         let tb = TokenBucket::new(1000, 0, 1000).unwrap();
594         assert_eq!(tb.capacity(), 1000);
595         assert_eq!(tb.budget(), 1000);
596         assert!(*tb.get_last_update() >= before);
597         let after = Instant::now();
598         assert!(*tb.get_last_update() <= after);
599         assert_eq!(tb.get_processed_capacity(), 1);
600         assert_eq!(tb.get_processed_refill_time(), 1_000_000);
601 
602         // Verify invalid bucket configurations result in `None`.
603         assert!(TokenBucket::new(0, 1234, 1000).is_none());
604         assert!(TokenBucket::new(100, 1234, 0).is_none());
605         assert!(TokenBucket::new(0, 1234, 0).is_none());
606     }
607 
608     #[test]
test_token_bucket_preprocess()609     fn test_token_bucket_preprocess() {
610         let tb = TokenBucket::new(1000, 0, 1000).unwrap();
611         assert_eq!(tb.get_processed_capacity(), 1);
612         assert_eq!(tb.get_processed_refill_time(), NANOSEC_IN_ONE_MILLISEC);
613 
614         let thousand = 1000;
615         let tb = TokenBucket::new(3 * 7 * 11 * 19 * thousand, 0, 7 * 11 * 13 * 17).unwrap();
616         assert_eq!(tb.get_processed_capacity(), 3 * 19);
617         assert_eq!(
618             tb.get_processed_refill_time(),
619             13 * 17 * (NANOSEC_IN_ONE_MILLISEC / thousand)
620         );
621     }
622 
623     #[test]
test_token_bucket_reduce()624     fn test_token_bucket_reduce() {
625         // token bucket with capacity 1000 and refill time of 1000 milliseconds
626         // allowing rate of 1 token/ms.
627         let capacity = 1000;
628         let refill_ms = 1000;
629         let mut tb = TokenBucket::new(capacity, 0, refill_ms as u64).unwrap();
630 
631         assert_eq!(tb.reduce(123), BucketReduction::Success);
632         assert_eq!(tb.budget(), capacity - 123);
633 
634         thread::sleep(Duration::from_millis(123));
635         assert_eq!(tb.reduce(1), BucketReduction::Success);
636         assert_eq!(tb.budget(), capacity - 1);
637         assert_eq!(tb.reduce(100), BucketReduction::Success);
638         assert_eq!(tb.reduce(capacity), BucketReduction::Failure);
639 
640         // token bucket with capacity 1000 and refill time of 1000 milliseconds
641         let mut tb = TokenBucket::new(1000, 1100, 1000).unwrap();
642         // safely assuming the thread can run these 3 commands in less than 500ms
643         assert_eq!(tb.reduce(1000), BucketReduction::Success);
644         assert_eq!(tb.one_time_burst(), 100);
645         assert_eq!(tb.reduce(500), BucketReduction::Success);
646         assert_eq!(tb.one_time_burst(), 0);
647         assert_eq!(tb.reduce(500), BucketReduction::Success);
648         assert_eq!(tb.reduce(500), BucketReduction::Failure);
649         thread::sleep(Duration::from_millis(500));
650         assert_eq!(tb.reduce(500), BucketReduction::Success);
651         thread::sleep(Duration::from_millis(1000));
652         assert_eq!(tb.reduce(2500), BucketReduction::OverConsumption(1.5));
653 
654         let before = Instant::now();
655         tb.reset();
656         assert_eq!(tb.capacity(), 1000);
657         assert_eq!(tb.budget(), 1000);
658         assert!(*tb.get_last_update() >= before);
659         let after = Instant::now();
660         assert!(*tb.get_last_update() <= after);
661     }
662 
663     #[test]
test_rate_limiter_default()664     fn test_rate_limiter_default() {
665         let l = RateLimiter::default();
666 
667         // limiter should not be blocked
668         assert!(!l.is_blocked());
669         // limiter should be disabled so consume(whatever) should work
670         assert!(l.consume(u64::MAX, TokenType::Ops));
671         assert!(l.consume(u64::MAX, TokenType::Bytes));
672         // calling the handler without there having been an event should error
673         l.event_handler().unwrap_err();
674         assert_eq!(
675             format!("{:?}", l.event_handler().err().unwrap()),
676             "SpuriousRateLimiterEvent(\
677              \"Rate limiter event handler called without a present timer\")"
678         );
679     }
680 
681     #[test]
test_rate_limiter_new()682     fn test_rate_limiter_new() {
683         let l = RateLimiter::new(1000, 1001, 1002, 1003, 1004, 1005).unwrap();
684         let bw = l.bandwidth().unwrap();
685         assert_eq!(bw.capacity(), 1000);
686         assert_eq!(bw.one_time_burst(), 1001);
687         assert_eq!(bw.refill_time_ms(), 1002);
688         assert_eq!(bw.budget(), 1000);
689 
690         let ops = l.ops().unwrap();
691         assert_eq!(ops.capacity(), 1003);
692         assert_eq!(ops.one_time_burst(), 1004);
693         assert_eq!(ops.refill_time_ms(), 1005);
694         assert_eq!(ops.budget(), 1003);
695     }
696 
697     #[test]
test_rate_limiter_manual_replenish()698     fn test_rate_limiter_manual_replenish() {
699         // rate limiter with limit of 1000 bytes/s and 1000 ops/s
700         let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
701 
702         // consume 123 bytes
703         assert!(l.consume(123, TokenType::Bytes));
704         l.manual_replenish(23, TokenType::Bytes);
705         {
706             let bytes_tb = l.bandwidth().unwrap();
707             assert_eq!(bytes_tb.budget(), 900);
708         }
709         // consume 123 ops
710         assert!(l.consume(123, TokenType::Ops));
711         l.manual_replenish(23, TokenType::Ops);
712         {
713             let bytes_tb = l.ops().unwrap();
714             assert_eq!(bytes_tb.budget(), 900);
715         }
716     }
717 
718     #[test]
test_rate_limiter_bandwidth()719     fn test_rate_limiter_bandwidth() {
720         // rate limiter with limit of 1000 bytes/s
721         let l = RateLimiter::new(1000, 0, 1000, 0, 0, 0).unwrap();
722 
723         // limiter should not be blocked
724         assert!(!l.is_blocked());
725         // raw FD for this disabled should be valid
726         assert!(l.as_raw_fd() > 0);
727 
728         // ops/s limiter should be disabled so consume(whatever) should work
729         assert!(l.consume(u64::MAX, TokenType::Ops));
730 
731         // do full 1000 bytes
732         assert!(l.consume(1000, TokenType::Bytes));
733         // try and fail on another 100
734         assert!(!l.consume(100, TokenType::Bytes));
735         // since consume failed, limiter should be blocked now
736         assert!(l.is_blocked());
737         // wait half the timer period
738         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
739         // limiter should still be blocked
740         assert!(l.is_blocked());
741         // wait the other half of the timer period
742         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
743         // the timer_fd should have an event on it by now
744         l.event_handler().unwrap();
745         // limiter should now be unblocked
746         assert!(!l.is_blocked());
747         // try and succeed on another 100 bytes this time
748         assert!(l.consume(100, TokenType::Bytes));
749     }
750 
751     #[test]
test_rate_limiter_ops()752     fn test_rate_limiter_ops() {
753         // rate limiter with limit of 1000 ops/s
754         let l = RateLimiter::new(0, 0, 0, 1000, 0, 1000).unwrap();
755 
756         // limiter should not be blocked
757         assert!(!l.is_blocked());
758         // raw FD for this disabled should be valid
759         assert!(l.as_raw_fd() > 0);
760 
761         // bytes/s limiter should be disabled so consume(whatever) should work
762         assert!(l.consume(u64::MAX, TokenType::Bytes));
763 
764         // do full 1000 ops
765         assert!(l.consume(1000, TokenType::Ops));
766         // try and fail on another 100
767         assert!(!l.consume(100, TokenType::Ops));
768         // since consume failed, limiter should be blocked now
769         assert!(l.is_blocked());
770         // wait half the timer period
771         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
772         // limiter should still be blocked
773         assert!(l.is_blocked());
774         // wait the other half of the timer period
775         thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
776         // the timer_fd should have an event on it by now
777         l.event_handler().unwrap();
778         // limiter should now be unblocked
779         assert!(!l.is_blocked());
780         // try and succeed on another 100 ops this time
781         assert!(l.consume(100, TokenType::Ops));
782     }
783 
    #[test]
    fn test_rate_limiter_full() {
        // Exercise both buckets at once: 1000 bytes/s and 1000 ops/s.
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();

        // limiter should not be blocked
        assert!(!l.is_blocked());
        // the limiter's raw timer FD should be valid
        assert!(l.as_raw_fd() > 0);

        // do full 1000 ops
        assert!(l.consume(1000, TokenType::Ops));
        // do full 1000 bytes
        assert!(l.consume(1000, TokenType::Bytes));
        // try and fail on another 100 ops
        assert!(!l.consume(100, TokenType::Ops));
        // try and fail on another 100 bytes
        assert!(!l.consume(100, TokenType::Bytes));
        // since consume failed, limiter should be blocked now
        assert!(l.is_blocked());
        // wait half the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // limiter should still be blocked
        assert!(l.is_blocked());
        // wait the other half of the timer period
        thread::sleep(Duration::from_millis(REFILL_TIMER_INTERVAL_MS / 2));
        // the timer_fd should have an event on it by now
        l.event_handler().unwrap();
        // limiter should now be unblocked
        assert!(!l.is_blocked());
        // try and succeed on another 100 ops this time
        assert!(l.consume(100, TokenType::Ops));
        // try and succeed on another 100 bytes this time
        assert!(l.consume(100, TokenType::Bytes));
    }
819 
    #[test]
    fn test_rate_limiter_overconsumption() {
        // Verifies the "borrowing" behavior: a consume() larger than the
        // bucket succeeds once (draining the bucket below zero conceptually)
        // but blocks the limiter until the borrowed tokens are paid back.
        //
        // initialize the rate limiter: 1000 bytes/s, 1000 ops/s
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 2.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full
        assert!(l.consume(2500, TokenType::Bytes));

        // check that even after a whole second passes, the rate limiter
        // is still blocked (the 1.5x debt needs 1.5x the refill time)
        thread::sleep(Duration::from_millis(1000));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());

        // after 1.5x the replenish time has passed, the rate limiter
        // is available again
        thread::sleep(Duration::from_millis(500));
        l.event_handler().unwrap();
        assert!(!l.is_blocked());

        // reset the rate limiter (fresh instance, full bucket again)
        let l = RateLimiter::new(1000, 0, 1000, 1000, 0, 1000).unwrap();
        // try to consume 1.5x the bucket size
        // we are "borrowing" 1.5x the bucket size in tokens since
        // the bucket is full, should arm the timer to 0.5x replenish
        // time, which is 500 ms
        assert!(l.consume(1500, TokenType::Bytes));

        // check that after more than the minimum refill time,
        // the rate limiter is still blocked
        thread::sleep(Duration::from_millis(200));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());

        // try to consume some tokens, which should fail as the timer
        // is still active
        assert!(!l.consume(100, TokenType::Bytes));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());

        // check that after the minimum refill time, the timer was not
        // overwritten and the rate limiter is still blocked from the
        // borrowing we performed earlier
        thread::sleep(Duration::from_millis(100));
        l.event_handler().unwrap_err();
        assert!(l.is_blocked());
        assert!(!l.consume(100, TokenType::Bytes));

        // after waiting out the full duration, rate limiter should be
        // available again
        thread::sleep(Duration::from_millis(200));
        l.event_handler().unwrap();
        assert!(!l.is_blocked());
        assert!(l.consume(100, TokenType::Bytes));
    }
876 
877     #[test]
test_update_buckets()878     fn test_update_buckets() {
879         let mut x = RateLimiter::new(1000, 2000, 1000, 10, 20, 1000).unwrap();
880 
881         let initial_bw = x.bandwidth();
882         let initial_ops = x.ops();
883 
884         x.update_buckets(BucketUpdate::None, BucketUpdate::None);
885         assert_eq!(x.bandwidth(), initial_bw);
886         assert_eq!(x.ops(), initial_ops);
887 
888         let new_bw = TokenBucket::new(123, 0, 57).unwrap();
889         let new_ops = TokenBucket::new(321, 12346, 89).unwrap();
890         x.update_buckets(
891             BucketUpdate::Update(new_bw.clone()),
892             BucketUpdate::Update(new_ops.clone()),
893         );
894 
895         {
896             let mut guard = x.inner.lock().unwrap();
897             // We have manually adjust the last_update field, because it changes when update_buckets()
898             // constructs new buckets (and thus gets a different value for last_update). We do this so
899             // it makes sense to test the following assertions.
900             guard.bandwidth.as_mut().unwrap().last_update = new_bw.last_update;
901             guard.ops.as_mut().unwrap().last_update = new_ops.last_update;
902         }
903 
904         assert_eq!(x.bandwidth(), Some(new_bw));
905         assert_eq!(x.ops(), Some(new_ops));
906 
907         x.update_buckets(BucketUpdate::Disabled, BucketUpdate::Disabled);
908         assert_eq!(x.bandwidth(), None);
909         assert_eq!(x.ops(), None);
910     }
911 
912     #[test]
test_rate_limiter_debug()913     fn test_rate_limiter_debug() {
914         let l = RateLimiter::new(1, 2, 3, 4, 5, 6).unwrap();
915         assert_eq!(
916             format!("{l:?}"),
917             format!(
918                 "RateLimiter {{ bandwidth: {:?}, ops: {:?} }}",
919                 l.bandwidth(),
920                 l.ops()
921             ),
922         );
923     }
924 }
925